From 581d9365ee6e31af27ba2ab6b36bd6b2b6d754ed Mon Sep 17 00:00:00 2001 From: norangebit Date: Sun, 3 May 2020 14:57:03 +0200 Subject: [PATCH] add average and db push, pull and pushpull --- .../drills/drill09/average/AverageNode.kt | 61 +++++++++++++++---- .../kotlin/drills/drill09/average/TestPull.kt | 35 +++++++++++ .../drill09/average/{Test.kt => TestPush.kt} | 8 +-- .../drills/drill09/average/TestPushPull.kt | 35 +++++++++++ .../kotlin/drills/drill09/dbreplica/DbNode.kt | 56 +++++++++++++---- .../dbreplica/{Test.kt => TestPull.kt} | 6 +- .../drills/drill09/dbreplica/TestPush.kt | 32 ++++++++++ .../drills/drill09/dbreplica/TestPushPull.kt | 32 ++++++++++ .../util/network/epidemic/EpidemicNode.kt | 5 ++ .../util/network/epidemic/EpidemicPacket.kt | 18 ------ .../util/network/epidemic/ReplayPayload.kt | 6 -- .../util/network/epidemic/TimePayload.kt | 9 --- .../util/network/epidemic/TimedPayload.kt | 9 +++ .../network/epidemic/packet/EpidemicPacket.kt | 13 ++++ .../epidemic/packet/FeedbackEpidemicPacket.kt | 13 ++++ .../epidemic/packet/TimedEpidemicPacket.kt | 11 ++++ .../network/epidemic/replay/ReplayPacket.kt | 13 ---- .../simulator/packet/FeedbackPacket.kt | 5 ++ 18 files changed, 290 insertions(+), 77 deletions(-) create mode 100644 src/main/kotlin/drills/drill09/average/TestPull.kt rename src/main/kotlin/drills/drill09/average/{Test.kt => TestPush.kt} (71%) create mode 100644 src/main/kotlin/drills/drill09/average/TestPushPull.kt rename src/main/kotlin/drills/drill09/dbreplica/{Test.kt => TestPull.kt} (72%) create mode 100644 src/main/kotlin/drills/drill09/dbreplica/TestPush.kt create mode 100644 src/main/kotlin/drills/drill09/dbreplica/TestPushPull.kt delete mode 100644 src/main/kotlin/util/network/epidemic/EpidemicPacket.kt delete mode 100644 src/main/kotlin/util/network/epidemic/ReplayPayload.kt delete mode 100644 src/main/kotlin/util/network/epidemic/TimePayload.kt create mode 100644 src/main/kotlin/util/network/epidemic/TimedPayload.kt create mode 100644 src/main/kotlin/util/network/epidemic/packet/EpidemicPacket.kt create mode 100644 src/main/kotlin/util/network/epidemic/packet/FeedbackEpidemicPacket.kt create mode 100644 src/main/kotlin/util/network/epidemic/packet/TimedEpidemicPacket.kt delete mode 100644 src/main/kotlin/util/network/epidemic/replay/ReplayPacket.kt create mode 100644 src/main/kotlin/util/network/simulator/packet/FeedbackPacket.kt diff --git a/src/main/kotlin/drills/drill09/average/AverageNode.kt b/src/main/kotlin/drills/drill09/average/AverageNode.kt index 80d4bbc..a1dc3dc 100644 --- a/src/main/kotlin/drills/drill09/average/AverageNode.kt +++ b/src/main/kotlin/drills/drill09/average/AverageNode.kt @@ -1,8 +1,8 @@ package drills.drill09.average import util.network.epidemic.EpidemicNode -import util.network.epidemic.EpidemicPacket -import util.network.epidemic.replay.ReplayPacket +import util.network.epidemic.packet.EpidemicPacket +import util.network.epidemic.packet.FeedbackEpidemicPacket import util.network.simulator.Address import util.network.simulator.Network @@ -10,12 +10,15 @@ class AverageNode( address: Address, network: Network, initialValue: Double, + val infectStrategy: InfectStrategy = InfectStrategy.PUSH, bufferSize: Int = 1 -) : EpidemicNode>(address, network, initialValue, bufferSize) { - override fun receivePushOrReply(packet: ReplayPacket) { +) : EpidemicNode>( + address, network, initialValue, bufferSize +) { + override fun receivePushOrReply(packet: FeedbackEpidemicPacket) { val payload = packet.payload - if (packet.isReplay) + if (packet.isFeedback) changeValue(payload) else { val newValue = (nodeValue + payload) / 2 @@ -23,21 +26,34 @@ class AverageNode( changeValue(newValue) send( - ReplayPacket( + FeedbackEpidemicPacket( address, packet.senderAddress, newValue, - EpidemicPacket.Type.PUSH, + if (infectStrategy == InfectStrategy.PUSH) + EpidemicPacket.Type.PUSH else EpidemicPacket.Type.REPLY, true ) ) - sendPush() + when (infectStrategy) { + InfectStrategy.PUSH -> sendPush() + InfectStrategy.PULL -> sendPull() + InfectStrategy.PUSHPULL -> sendPushPull() + } } } - override fun receivePull(packet: ReplayPacket) { - TODO("Not yet implemented") + override fun receivePull(packet: FeedbackEpidemicPacket) { + if (packet.type == EpidemicPacket.Type.PUSHPULL) + return + + send( + makeInfectionPacket( + packet.senderAddress, + EpidemicPacket.Type.REPLY + ) + ) } override fun infect() { @@ -48,7 +64,7 @@ class AverageNode( receiverAddress: Address, type: EpidemicPacket.Type ): EpidemicPacket { - return ReplayPacket( + return FeedbackEpidemicPacket( address, receiverAddress, nodeValue, @@ -56,4 +72,27 @@ class AverageNode( false ) } + + companion object { + fun createPushNode( + address: Address, + network: Network, + initialValue: Double, + bufferSize: Int = 1 + ) = AverageNode(address, network, initialValue, InfectStrategy.PUSH, bufferSize) + + fun createPullNode( + address: Address, + network: Network, + initialValue: Double, + bufferSize: Int = 1 + ) = AverageNode(address, network, initialValue, InfectStrategy.PULL, bufferSize) + + fun createPushPullNode( + address: Address, + network: Network, + initialValue: Double, + bufferSize: Int = 1 + ) = AverageNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize) + } } \ No newline at end of file diff --git a/src/main/kotlin/drills/drill09/average/TestPull.kt b/src/main/kotlin/drills/drill09/average/TestPull.kt new file mode 100644 index 0000000..7fed07d --- /dev/null +++ b/src/main/kotlin/drills/drill09/average/TestPull.kt @@ -0,0 +1,35 @@ +package drills.drill09.average + +import util.network.epidemic.packet.EpidemicPacket +import util.network.epidemic.packet.FeedbackEpidemicPacket +import util.network.simulator.Address +import util.network.simulator.Network + +fun main() { + val network = Network() + + repeat(17) { + AverageNode.createPullNode(Address(it), network, 0.0) + } + + val alpha = network.getRandomNode() as AverageNode + val beta = network.getRandomNode() as AverageNode + + val packet = FeedbackEpidemicPacket( + alpha.address, + beta.address, + 1, + EpidemicPacket.Type.REPLY, + false + ) + + alpha.send(packet) + + network.start() + + Thread.sleep(2000) + + network.forEach { + println(1.0 / (it as AverageNode).nodeValue) + } +} \ No newline at end of file diff --git a/src/main/kotlin/drills/drill09/average/Test.kt b/src/main/kotlin/drills/drill09/average/TestPush.kt similarity index 71% rename from src/main/kotlin/drills/drill09/average/Test.kt rename to src/main/kotlin/drills/drill09/average/TestPush.kt index ae5aef9..a196d3f 100644 --- a/src/main/kotlin/drills/drill09/average/Test.kt +++ b/src/main/kotlin/drills/drill09/average/TestPush.kt @@ -1,7 +1,7 @@ package drills.drill09.average -import util.network.epidemic.EpidemicPacket -import util.network.epidemic.replay.ReplayPacket +import util.network.epidemic.packet.EpidemicPacket +import util.network.epidemic.packet.FeedbackEpidemicPacket import util.network.simulator.Address import util.network.simulator.Network @@ -9,13 +9,13 @@ fun main() { val network = Network() repeat(17) { - AverageNode(Address(it), network, 0.0) + AverageNode.createPushNode(Address(it), network, 0.0) } val alpha = network.getRandomNode() as AverageNode val beta = network.getRandomNode() as AverageNode - val packet = ReplayPacket( + val packet = FeedbackEpidemicPacket( alpha.address, beta.address, 1, diff --git a/src/main/kotlin/drills/drill09/average/TestPushPull.kt b/src/main/kotlin/drills/drill09/average/TestPushPull.kt new file mode 100644 index 0000000..b34c7c1 --- /dev/null +++ b/src/main/kotlin/drills/drill09/average/TestPushPull.kt @@ -0,0 +1,35 @@ +package drills.drill09.average + +import util.network.epidemic.packet.EpidemicPacket +import util.network.epidemic.packet.FeedbackEpidemicPacket +import util.network.simulator.Address +import util.network.simulator.Network + +fun main() { + val network = Network() + + repeat(17) { + AverageNode.createPushPullNode(Address(it), network, 0.0) + } + + val alpha = network.getRandomNode() as AverageNode + val beta = network.getRandomNode() as AverageNode + + val packet = FeedbackEpidemicPacket( + alpha.address, + beta.address, + 1, + EpidemicPacket.Type.REPLY, + false + ) + + alpha.send(packet) + + network.start() + + Thread.sleep(2000) + + network.forEach { + println(1.0 / (it as AverageNode).nodeValue) + } +} \ No newline at end of file diff --git a/src/main/kotlin/drills/drill09/dbreplica/DbNode.kt b/src/main/kotlin/drills/drill09/dbreplica/DbNode.kt index 9e3b300..757b9a3 100644 --- a/src/main/kotlin/drills/drill09/dbreplica/DbNode.kt +++ b/src/main/kotlin/drills/drill09/dbreplica/DbNode.kt @@ -1,54 +1,84 @@ package drills.drill09.dbreplica -import util.network.epidemic.TimePayload +import util.network.epidemic.TimedPayload import util.network.epidemic.EpidemicNode -import util.network.epidemic.EpidemicPacket +import util.network.epidemic.packet.EpidemicPacket +import util.network.epidemic.packet.TimedEpidemicPacket import util.network.simulator.Address import util.network.simulator.Network class DbNode( address: Address, network: Network, - initialValue: TimePayload, + initialValue: TimedPayload, + val infectStrategy: InfectStrategy = InfectStrategy.PUSHPULL, bufferSize: Int = 1 -) : EpidemicNode, EpidemicPacket>>( +) : EpidemicNode, TimedEpidemicPacket>( address, network, initialValue, bufferSize ) { - override fun receivePushOrReply(packet: EpidemicPacket>) { + override fun receivePushOrReply(packet: TimedEpidemicPacket) { val receivedValue = packet.payload if (nodeValue.timestamp < receivedValue.timestamp) changeValue(receivedValue) } - override fun receivePull(packet: EpidemicPacket>) { + override fun receivePull(packet: TimedEpidemicPacket) { val receivedValue = packet.payload if (nodeValue.timestamp > receivedValue.timestamp) send( - EpidemicPacket( - address, + makeInfectionPacket( packet.senderAddress, - nodeValue, EpidemicPacket.Type.REPLY ) ) } override fun infect() { - sendPushPull() - Thread.sleep(100) + when (infectStrategy) { + InfectStrategy.PUSH -> sendPush() + InfectStrategy.PULL -> sendPull() + InfectStrategy.PUSHPULL -> sendPushPull() + } + + Thread.sleep(DELTA) } override fun makeInfectionPacket( receiverAddress: Address, type: EpidemicPacket.Type - ): EpidemicPacket> { - return EpidemicPacket( + ): EpidemicPacket> { + return TimedEpidemicPacket( address, receiverAddress, nodeValue, type ) } + + companion object { + const val DELTA = 100L + + fun createPushNode( + address: Address, + network: Network, + initialValue: TimedPayload, + bufferSize: Int = 1 + ) = DbNode(address, network, initialValue, InfectStrategy.PUSH, bufferSize) + + fun createPullNode( + address: Address, + network: Network, + initialValue: TimedPayload, + bufferSize: Int = 1 + ) = DbNode(address, network, initialValue, InfectStrategy.PULL, bufferSize) + + fun createPushPullNode( + address: Address, + network: Network, + initialValue: TimedPayload, + bufferSize: Int = 1 + ) = DbNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize) + } } \ No newline at end of file diff --git a/src/main/kotlin/drills/drill09/dbreplica/Test.kt b/src/main/kotlin/drills/drill09/dbreplica/TestPull.kt similarity index 72% rename from src/main/kotlin/drills/drill09/dbreplica/Test.kt rename to src/main/kotlin/drills/drill09/dbreplica/TestPull.kt index 0c80018..b840212 100644 --- a/src/main/kotlin/drills/drill09/dbreplica/Test.kt +++ b/src/main/kotlin/drills/drill09/dbreplica/TestPull.kt @@ -1,6 +1,6 @@ package drills.drill09.dbreplica -import util.network.epidemic.TimePayload +import util.network.epidemic.TimedPayload import util.network.simulator.Address import util.network.simulator.Network @@ -9,7 +9,7 @@ fun main() { val network = Network() repeat(10) { - DbNode(Address(it), network, TimePayload(0, 0)) + DbNode.createPullNode(Address(it), network, TimedPayload(0, 0)) } val alpha = network.getRandomNode() as DbNode @@ -22,7 +22,7 @@ fun main() { println((it as DbNode).nodeValue) } - alpha.changeValue(TimePayload(42, System.nanoTime())) + alpha.changeValue(TimedPayload(42, System.nanoTime())) Thread.sleep(2000) network.forEach { diff --git a/src/main/kotlin/drills/drill09/dbreplica/TestPush.kt b/src/main/kotlin/drills/drill09/dbreplica/TestPush.kt new file mode 100644 index 0000000..e396619 --- /dev/null +++ b/src/main/kotlin/drills/drill09/dbreplica/TestPush.kt @@ -0,0 +1,32 @@ +package drills.drill09.dbreplica + +import util.network.epidemic.TimedPayload +import util.network.simulator.Address +import util.network.simulator.Network + + +fun main() { + val network = Network() + + repeat(10) { + DbNode.createPushNode(Address(it), network, TimedPayload(0, 0)) + } + + val alpha = network.getRandomNode() as DbNode + + network.start() + + Thread.sleep(1000) + + network.forEach { + println((it as DbNode).nodeValue) + } + + alpha.changeValue(TimedPayload(42, System.nanoTime())) + Thread.sleep(2000) + + network.forEach { + println((it as DbNode).nodeValue) + } +} + diff --git a/src/main/kotlin/drills/drill09/dbreplica/TestPushPull.kt b/src/main/kotlin/drills/drill09/dbreplica/TestPushPull.kt new file mode 100644 index 0000000..15a4c0d --- /dev/null +++ b/src/main/kotlin/drills/drill09/dbreplica/TestPushPull.kt @@ -0,0 +1,32 @@ +package drills.drill09.dbreplica + +import util.network.epidemic.TimedPayload +import util.network.simulator.Address +import util.network.simulator.Network + + +fun main() { + val network = Network() + + repeat(10) { + DbNode.createPushPullNode(Address(it), network, TimedPayload(0, 0)) + } + + val alpha = network.getRandomNode() as DbNode + + network.start() + + Thread.sleep(1000) + + network.forEach { + println((it as DbNode).nodeValue) + } + + alpha.changeValue(TimedPayload(42, System.nanoTime())) + Thread.sleep(2000) + + network.forEach { + println((it as DbNode).nodeValue) + } +} + diff --git a/src/main/kotlin/util/network/epidemic/EpidemicNode.kt b/src/main/kotlin/util/network/epidemic/EpidemicNode.kt index 5c2d0c6..a45aa35 100644 --- a/src/main/kotlin/util/network/epidemic/EpidemicNode.kt +++ b/src/main/kotlin/util/network/epidemic/EpidemicNode.kt @@ -1,5 +1,6 @@ package util.network.epidemic +import util.network.epidemic.packet.EpidemicPacket import util.network.simulator.Address import util.network.simulator.Network import util.network.simulator.Node @@ -75,6 +76,10 @@ abstract class EpidemicNode>( SUSCETIBLE, INFECTED, REMOVED } + enum class InfectStrategy { + PUSH, PULL, PUSHPULL + } + protected inner class EpidemicReceiver( threadGroup: ThreadGroup ) : Node.Receiver(threadGroup) { diff --git a/src/main/kotlin/util/network/epidemic/EpidemicPacket.kt b/src/main/kotlin/util/network/epidemic/EpidemicPacket.kt deleted file mode 100644 index 469f2b5..0000000 --- a/src/main/kotlin/util/network/epidemic/EpidemicPacket.kt +++ /dev/null @@ -1,18 +0,0 @@ -package util.network.epidemic - -import util.network.simulator.Address -import util.network.simulator.packet.PayloadPacket - -open class EpidemicPacket( - override val senderAddress: Address, - override val receiverAddress: Address, - override val payload: T, - val type: Type -) : PayloadPacket { - - enum class Type { - PUSH, PULL, PUSHPULL, REPLY - } - -} - diff --git a/src/main/kotlin/util/network/epidemic/ReplayPayload.kt b/src/main/kotlin/util/network/epidemic/ReplayPayload.kt deleted file mode 100644 index d4532f1..0000000 --- a/src/main/kotlin/util/network/epidemic/ReplayPayload.kt +++ /dev/null @@ -1,6 +0,0 @@ -package util.network.epidemic - -data class ReplayPayload( - val value: T, - val isReplay: Boolean = true -) \ No newline at end of file diff --git a/src/main/kotlin/util/network/epidemic/TimePayload.kt b/src/main/kotlin/util/network/epidemic/TimePayload.kt deleted file mode 100644 index 209ff21..0000000 --- a/src/main/kotlin/util/network/epidemic/TimePayload.kt +++ /dev/null @@ -1,9 +0,0 @@ -package util.network.epidemic - -data class TimePayload(val value: T, val timestamp: Long) { - companion object { - fun createContent(value: T): TimePayload { - return TimePayload(value, System.nanoTime()) - } - } -} \ No newline at end of file diff --git a/src/main/kotlin/util/network/epidemic/TimedPayload.kt b/src/main/kotlin/util/network/epidemic/TimedPayload.kt new file mode 100644 index 0000000..9177dd0 --- /dev/null +++ b/src/main/kotlin/util/network/epidemic/TimedPayload.kt @@ -0,0 +1,9 @@ +package util.network.epidemic + +data class TimedPayload(val value: T, val timestamp: Long) { + companion object { + fun createContent(value: T): TimedPayload { + return TimedPayload(value, System.nanoTime()) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/util/network/epidemic/packet/EpidemicPacket.kt b/src/main/kotlin/util/network/epidemic/packet/EpidemicPacket.kt new file mode 100644 index 0000000..ee11230 --- /dev/null +++ b/src/main/kotlin/util/network/epidemic/packet/EpidemicPacket.kt @@ -0,0 +1,13 @@ +package util.network.epidemic.packet + +import util.network.simulator.packet.PayloadPacket + +interface EpidemicPacket: PayloadPacket { + val type: Type + + enum class Type { + PUSH, PULL, PUSHPULL, REPLY + } + +} + diff --git a/src/main/kotlin/util/network/epidemic/packet/FeedbackEpidemicPacket.kt b/src/main/kotlin/util/network/epidemic/packet/FeedbackEpidemicPacket.kt new file mode 100644 index 0000000..f0ac662 --- /dev/null +++ b/src/main/kotlin/util/network/epidemic/packet/FeedbackEpidemicPacket.kt @@ -0,0 +1,13 @@ +package util.network.epidemic.packet + +import util.network.simulator.Address +import util.network.simulator.packet.FeedbackPacket + +data class FeedbackEpidemicPacket( + override val senderAddress: Address, + override val receiverAddress: Address, + override val payload: T, + override val type: EpidemicPacket.Type, + override val isFeedback: Boolean +) : EpidemicPacket, FeedbackPacket + diff --git a/src/main/kotlin/util/network/epidemic/packet/TimedEpidemicPacket.kt b/src/main/kotlin/util/network/epidemic/packet/TimedEpidemicPacket.kt new file mode 100644 index 0000000..c8942f1 --- /dev/null +++ b/src/main/kotlin/util/network/epidemic/packet/TimedEpidemicPacket.kt @@ -0,0 +1,11 @@ +package util.network.epidemic.packet + +import util.network.epidemic.TimedPayload +import util.network.simulator.Address + +data class TimedEpidemicPacket( + override val senderAddress: Address, + override val receiverAddress: Address, + override val payload: TimedPayload, + override val type: EpidemicPacket.Type +): EpidemicPacket> \ No newline at end of file diff --git a/src/main/kotlin/util/network/epidemic/replay/ReplayPacket.kt b/src/main/kotlin/util/network/epidemic/replay/ReplayPacket.kt deleted file mode 100644 index b3eb725..0000000 --- a/src/main/kotlin/util/network/epidemic/replay/ReplayPacket.kt +++ /dev/null @@ -1,13 +0,0 @@ -package util.network.epidemic.replay - -import util.network.epidemic.EpidemicPacket -import util.network.simulator.Address - -class ReplayPacket( - senderAddress: Address, - receiverAddress: Address, - payload: T, - type: Type, - val isReplay: Boolean -) : EpidemicPacket(senderAddress, receiverAddress, payload, type) - diff --git a/src/main/kotlin/util/network/simulator/packet/FeedbackPacket.kt b/src/main/kotlin/util/network/simulator/packet/FeedbackPacket.kt new file mode 100644 index 0000000..21be67a --- /dev/null +++ b/src/main/kotlin/util/network/simulator/packet/FeedbackPacket.kt @@ -0,0 +1,5 @@ +package util.network.simulator.packet + +interface FeedbackPacket: Packet { + val isFeedback: Boolean +} \ No newline at end of file