diff --git a/src/main/kotlin/drills/drill09/average/AverageNode.kt b/src/main/kotlin/drills/drill09/average/AverageNode.kt index a1dc3dc..d421291 100644 --- a/src/main/kotlin/drills/drill09/average/AverageNode.kt +++ b/src/main/kotlin/drills/drill09/average/AverageNode.kt @@ -21,26 +21,35 @@ class AverageNode( if (packet.isFeedback) changeValue(payload) else { - val newValue = (nodeValue + payload) / 2 + computeAverage(payload) + sendFeedback(packet.senderAddress) + spreadNewValue() + } + } - changeValue(newValue) + private fun computeAverage(payload: Double) { + val newValue = (nodeValue + payload) / 2 + changeValue(newValue) + } - send( - FeedbackEpidemicPacket( - address, - packet.senderAddress, - newValue, - if (infectStrategy == InfectStrategy.PUSH) - EpidemicPacket.Type.PUSH else EpidemicPacket.Type.REPLY, - true - ) + private fun sendFeedback(receiverAddress: Address) { + send( + FeedbackEpidemicPacket( + address, + receiverAddress, + nodeValue, + if (infectStrategy == InfectStrategy.PUSH) + EpidemicPacket.Type.PUSH else EpidemicPacket.Type.REPLY, + true ) + ) + } - when (infectStrategy) { - InfectStrategy.PUSH -> sendPush() - InfectStrategy.PULL -> sendPull() - InfectStrategy.PUSHPULL -> sendPushPull() - } + private fun spreadNewValue() { + when (infectStrategy) { + InfectStrategy.PUSH -> sendPushToRandom() + InfectStrategy.PULL -> sendPullToRandom() + InfectStrategy.PUSHPULL -> sendPushPullToRandom() } } @@ -95,4 +104,4 @@ class AverageNode( bufferSize: Int = 1 ) = AverageNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize) } -} \ No newline at end of file +} diff --git a/src/main/kotlin/drills/drill09/average/TestPull.kt b/src/main/kotlin/drills/drill09/average/TestPull.kt index 7fed07d..4488c14 100644 --- a/src/main/kotlin/drills/drill09/average/TestPull.kt +++ b/src/main/kotlin/drills/drill09/average/TestPull.kt @@ -32,4 +32,4 @@ fun main() { network.forEach { println(1.0 / (it as AverageNode).nodeValue) } -} \ No newline at end of file +} diff --git a/src/main/kotlin/drills/drill09/average/TestPush.kt b/src/main/kotlin/drills/drill09/average/TestPush.kt index a196d3f..83ced50 100644 --- a/src/main/kotlin/drills/drill09/average/TestPush.kt +++ b/src/main/kotlin/drills/drill09/average/TestPush.kt @@ -32,4 +32,4 @@ fun main() { network.forEach { println(1.0 / (it as AverageNode).nodeValue) } -} \ No newline at end of file +} diff --git a/src/main/kotlin/drills/drill09/average/TestPushPull.kt b/src/main/kotlin/drills/drill09/average/TestPushPull.kt index b34c7c1..93d3b70 100644 --- a/src/main/kotlin/drills/drill09/average/TestPushPull.kt +++ b/src/main/kotlin/drills/drill09/average/TestPushPull.kt @@ -32,4 +32,4 @@ fun main() { network.forEach { println(1.0 / (it as AverageNode).nodeValue) } -} \ No newline at end of file +} diff --git a/src/main/kotlin/drills/drill09/dbreplica/DbNode.kt b/src/main/kotlin/drills/drill09/dbreplica/DbNode.kt index 757b9a3..5f2908d 100644 --- a/src/main/kotlin/drills/drill09/dbreplica/DbNode.kt +++ b/src/main/kotlin/drills/drill09/dbreplica/DbNode.kt @@ -37,9 +37,9 @@ class DbNode( override fun infect() { when (infectStrategy) { - InfectStrategy.PUSH -> sendPush() - InfectStrategy.PULL -> sendPull() - InfectStrategy.PUSHPULL -> sendPushPull() + InfectStrategy.PUSH -> sendPushToRandom() + InfectStrategy.PULL -> sendPullToRandom() + InfectStrategy.PUSHPULL -> sendPushPullToRandom() } Thread.sleep(DELTA) @@ -81,4 +81,4 @@ class DbNode( bufferSize: Int = 1 ) = DbNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize) } -} \ No newline at end of file +} diff --git a/src/main/kotlin/drills/drill09/dbreplica/TestPull.kt b/src/main/kotlin/drills/drill09/dbreplica/TestPull.kt index a28c7d2..c82d709 100644 --- a/src/main/kotlin/drills/drill09/dbreplica/TestPull.kt +++ b/src/main/kotlin/drills/drill09/dbreplica/TestPull.kt @@ -4,7 +4,6 @@ import util.network.epidemic.TimedPayload import util.network.simulator.Address import util.network.simulator.Network - fun main() { val network = Network() @@ -29,4 +28,3 @@ fun main() { println((it as DbNode).nodeValue) } } - diff --git a/src/main/kotlin/drills/drill09/dbreplica/TestPush.kt b/src/main/kotlin/drills/drill09/dbreplica/TestPush.kt index 628e235..4157744 100644 --- a/src/main/kotlin/drills/drill09/dbreplica/TestPush.kt +++ b/src/main/kotlin/drills/drill09/dbreplica/TestPush.kt @@ -4,7 +4,6 @@ import util.network.epidemic.TimedPayload import util.network.simulator.Address import util.network.simulator.Network - fun main() { val network = Network() @@ -29,4 +28,3 @@ fun main() { println((it as DbNode).nodeValue) } } - diff --git a/src/main/kotlin/drills/drill09/dbreplica/TestPushPull.kt b/src/main/kotlin/drills/drill09/dbreplica/TestPushPull.kt index 65882d8..3954d8f 100644 --- a/src/main/kotlin/drills/drill09/dbreplica/TestPushPull.kt +++ b/src/main/kotlin/drills/drill09/dbreplica/TestPushPull.kt @@ -4,7 +4,6 @@ import util.network.epidemic.TimedPayload import util.network.simulator.Address import util.network.simulator.Network - fun main() { val network = Network() @@ -29,4 +28,3 @@ fun main() { println((it as DbNode).nodeValue) } } - diff --git a/src/main/kotlin/drills/drill09/gossip/QueryNode.kt b/src/main/kotlin/drills/drill09/gossip/QueryNode.kt index 781e7bd..e41e6b2 100644 --- a/src/main/kotlin/drills/drill09/gossip/QueryNode.kt +++ b/src/main/kotlin/drills/drill09/gossip/QueryNode.kt @@ -11,6 +11,7 @@ class QueryNode( address: Address, network: Network, initialValue: T, + private val demotivatingFactor: Int = 10, bufferSize: Int = 1 ) : EpidemicNode>( address, network, initialValue, bufferSize @@ -32,26 +33,25 @@ class QueryNode( } override fun receivePushOrReply(packet: FeedbackEpidemicPacket) { - //println(packet) - if (nodeState == State.REMOVED) - return + when (nodeState) { + State.REMOVED -> return + State.INFECTED -> whenInfected(packet) + State.SUSCEPTIBLE -> whenSusceptible(packet) + } + } - if (nodeState == State.SUSCEPTIBLE) { - // println("susceptible") - nodeState = State.INFECTED + private fun whenSusceptible(packet: FeedbackEpidemicPacket) { + nodeState = State.INFECTED + queryValue = packet.payload - queryValue = packet.payload - sendPush(1) + sendPushToRandom(2) - if (packet.payload == nodeValue) - println("find on node $address") - } else { - if (packet.isFeedback && random.nextBoolean()) { - nodeState = State.REMOVED - println("removed") - - } + if (packet.payload == nodeValue) + println("find ${packet.payload} on node $address") + } + private fun whenInfected(packet: FeedbackEpidemicPacket) { + if (!packet.isFeedback) send( FeedbackEpidemicPacket( address, @@ -61,7 +61,9 @@ class QueryNode( true ) ) - } + + if (packet.isFeedback && random.nextDouble(1.0) < 1 / demotivatingFactor) + nodeState = State.REMOVED } override fun receivePull(packet: FeedbackEpidemicPacket) { @@ -71,4 +73,4 @@ class QueryNode( override fun infect() { Thread.interrupted() } -} \ No newline at end of file +} diff --git a/src/main/kotlin/drills/drill09/gossip/TestPush.kt b/src/main/kotlin/drills/drill09/gossip/TestPush.kt index 0660b2e..83eb42a 100644 --- a/src/main/kotlin/drills/drill09/gossip/TestPush.kt +++ b/src/main/kotlin/drills/drill09/gossip/TestPush.kt @@ -5,12 +5,11 @@ import util.network.epidemic.packet.FeedbackEpidemicPacket import util.network.simulator.Address import util.network.simulator.Network - fun main() { val network = Network() - repeat(10) { - QueryNode(Address(it), network, it*2) + repeat(20) { + QueryNode(Address(it), network, it * 2) } val alpha = network.getRandomNode() as QueryNode @@ -19,7 +18,7 @@ fun main() { val packet = FeedbackEpidemicPacket( alpha.address, beta.address, - 14, + 24, EpidemicPacket.Type.PUSH, false ) @@ -28,4 +27,3 @@ fun main() { network.start() } - diff --git a/src/main/kotlin/util/network/epidemic/EpidemicNode.kt b/src/main/kotlin/util/network/epidemic/EpidemicNode.kt index de5c20d..6c0de80 100644 --- a/src/main/kotlin/util/network/epidemic/EpidemicNode.kt +++ b/src/main/kotlin/util/network/epidemic/EpidemicNode.kt @@ -5,7 +5,7 @@ import util.network.simulator.Address import util.network.simulator.Network import util.network.simulator.Node -abstract class EpidemicNode>( +abstract class EpidemicNode>( address: Address, network: Network, initialValue: T, @@ -41,21 +41,21 @@ abstract class EpidemicNode>( type: EpidemicPacket.Type ): EpidemicPacket - protected fun sendPush(numberOfReceiver: Int = 1) { + protected fun sendPushToRandom(numberOfReceiver: Int = 1) { sendToRandom( EpidemicPacket.Type.PUSH, numberOfReceiver ) } - protected fun sendPull(numberOfReceiver: Int = 1) { + protected fun sendPullToRandom(numberOfReceiver: Int = 1) { sendToRandom( EpidemicPacket.Type.PULL, numberOfReceiver ) } - protected fun sendPushPull(numberOfReceiver: Int = 1) { + protected fun sendPushPullToRandom(numberOfReceiver: Int = 1) { sendToRandom( EpidemicPacket.Type.PUSHPULL, numberOfReceiver @@ -69,8 +69,6 @@ abstract class EpidemicNode>( protected abstract fun infect() protected fun onReceive(packet: P) { - //println(packet.type) - when (packet.type) { EpidemicPacket.Type.PUSH -> receivePushOrReply(packet) EpidemicPacket.Type.PULL -> receivePull(packet) @@ -125,5 +123,4 @@ abstract class EpidemicNode>( return EpidemicSender(threadGroup) } } - -} \ No newline at end of file +} diff --git a/src/main/kotlin/util/network/epidemic/TimedPayload.kt b/src/main/kotlin/util/network/epidemic/TimedPayload.kt index 9177dd0..75915ba 100644 --- a/src/main/kotlin/util/network/epidemic/TimedPayload.kt +++ b/src/main/kotlin/util/network/epidemic/TimedPayload.kt @@ -6,4 +6,4 @@ data class TimedPayload(val value: T, val timestamp: Long) { return TimedPayload(value, System.nanoTime()) } } -} \ No newline at end of file +} diff --git a/src/main/kotlin/util/network/epidemic/packet/EpidemicPacket.kt b/src/main/kotlin/util/network/epidemic/packet/EpidemicPacket.kt index ee11230..b2cca97 100644 --- a/src/main/kotlin/util/network/epidemic/packet/EpidemicPacket.kt +++ b/src/main/kotlin/util/network/epidemic/packet/EpidemicPacket.kt @@ -2,12 +2,10 @@ package util.network.epidemic.packet import util.network.simulator.packet.PayloadPacket -interface EpidemicPacket: PayloadPacket { +interface EpidemicPacket : PayloadPacket { val type: Type enum class Type { PUSH, PULL, PUSHPULL, REPLY } - } - diff --git a/src/main/kotlin/util/network/epidemic/packet/FeedbackEpidemicPacket.kt b/src/main/kotlin/util/network/epidemic/packet/FeedbackEpidemicPacket.kt index f0ac662..60cc3df 100644 --- a/src/main/kotlin/util/network/epidemic/packet/FeedbackEpidemicPacket.kt +++ b/src/main/kotlin/util/network/epidemic/packet/FeedbackEpidemicPacket.kt @@ -10,4 +10,3 @@ data class FeedbackEpidemicPacket( override val type: EpidemicPacket.Type, override val isFeedback: Boolean ) : EpidemicPacket, FeedbackPacket - diff --git a/src/main/kotlin/util/network/epidemic/packet/TimedEpidemicPacket.kt b/src/main/kotlin/util/network/epidemic/packet/TimedEpidemicPacket.kt index c8942f1..eb7b9fd 100644 --- a/src/main/kotlin/util/network/epidemic/packet/TimedEpidemicPacket.kt +++ b/src/main/kotlin/util/network/epidemic/packet/TimedEpidemicPacket.kt @@ -8,4 +8,4 @@ data class TimedEpidemicPacket( override val receiverAddress: Address, override val payload: TimedPayload, override val type: EpidemicPacket.Type -): EpidemicPacket> \ No newline at end of file +) : EpidemicPacket> diff --git a/src/main/kotlin/util/network/simulator/Network.kt b/src/main/kotlin/util/network/simulator/Network.kt index c640f22..59f268c 100644 --- a/src/main/kotlin/util/network/simulator/Network.kt +++ b/src/main/kotlin/util/network/simulator/Network.kt @@ -17,7 +17,7 @@ class Network(threadGrupName: String = "network") { addresses.add(node.address) } - fun getRandomAddress() : Address { + fun getRandomAddress(): Address { val index = random.nextInt(addresses.size) return addresses[index] diff --git a/src/main/kotlin/util/network/simulator/Test.kt b/src/main/kotlin/util/network/simulator/Test.kt index 1344765..d20846d 100644 --- a/src/main/kotlin/util/network/simulator/Test.kt +++ b/src/main/kotlin/util/network/simulator/Test.kt @@ -12,12 +12,12 @@ fun main() { network.start() } -class TestNode(address: Address, network: Network): Node(address, network) { +class TestNode(address: Address, network: Network) : Node(address, network) { override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory { return TestCommunicatorFactory(threadGroup) } - private inner class TestReceiver(threadGroup: ThreadGroup): Receiver(threadGroup) { + private inner class TestReceiver(threadGroup: ThreadGroup) : Receiver(threadGroup) { override fun run() { if (address != Address(0)) return @@ -28,7 +28,7 @@ class TestNode(address: Address, network: Network): Node(address, network) { } } - private inner class TestSender(threadGroup: ThreadGroup): Sender(threadGroup) { + private inner class TestSender(threadGroup: ThreadGroup) : Sender(threadGroup) { override fun run() { if (address != Address(1)) return @@ -39,7 +39,8 @@ class TestNode(address: Address, network: Network): Node(address, network) { } } - private inner class TestCommunicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory(threadGroup) { + private inner class TestCommunicatorFactory(threadGroup: ThreadGroup) : + CommunicatorFactory(threadGroup) { override fun createReceiver(): Receiver { return TestReceiver(threadGroup) } @@ -48,4 +49,4 @@ class TestNode(address: Address, network: Network): Node(address, network) { return TestSender(threadGroup) } } -} \ No newline at end of file +} diff --git a/src/main/kotlin/util/network/simulator/packet/FeedbackPacket.kt b/src/main/kotlin/util/network/simulator/packet/FeedbackPacket.kt index 21be67a..2577c2b 100644 --- a/src/main/kotlin/util/network/simulator/packet/FeedbackPacket.kt +++ b/src/main/kotlin/util/network/simulator/packet/FeedbackPacket.kt @@ -1,5 +1,5 @@ package util.network.simulator.packet -interface FeedbackPacket: Packet { +interface FeedbackPacket : Packet { val isFeedback: Boolean -} \ No newline at end of file +} diff --git a/src/main/kotlin/util/network/simulator/packet/NetworkPacket.kt b/src/main/kotlin/util/network/simulator/packet/NetworkPacket.kt index 8ffa831..5ea8125 100644 --- a/src/main/kotlin/util/network/simulator/packet/NetworkPacket.kt +++ b/src/main/kotlin/util/network/simulator/packet/NetworkPacket.kt @@ -6,4 +6,3 @@ data class NetworkPacket( override val senderAddress: Address, override val receiverAddress: Address ) : Packet - diff --git a/src/main/kotlin/util/network/simulator/packet/PayloadPacket.kt b/src/main/kotlin/util/network/simulator/packet/PayloadPacket.kt index 669abee..51283c1 100644 --- a/src/main/kotlin/util/network/simulator/packet/PayloadPacket.kt +++ b/src/main/kotlin/util/network/simulator/packet/PayloadPacket.kt @@ -1,5 +1,5 @@ package util.network.simulator.packet -interface PayloadPacket: Packet { +interface PayloadPacket : Packet { val payload: T }