diff --git a/src/main/kotlin/drills/drill09/dbreplica/TestPull.kt b/src/main/kotlin/drills/drill09/dbreplica/TestPull.kt index b840212..a28c7d2 100644 --- a/src/main/kotlin/drills/drill09/dbreplica/TestPull.kt +++ b/src/main/kotlin/drills/drill09/dbreplica/TestPull.kt @@ -22,7 +22,7 @@ fun main() { println((it as DbNode).nodeValue) } - alpha.changeValue(TimedPayload(42, System.nanoTime())) + alpha.changeValue(TimedPayload.createContent(42)) Thread.sleep(2000) network.forEach { diff --git a/src/main/kotlin/drills/drill09/dbreplica/TestPush.kt b/src/main/kotlin/drills/drill09/dbreplica/TestPush.kt index e396619..628e235 100644 --- a/src/main/kotlin/drills/drill09/dbreplica/TestPush.kt +++ b/src/main/kotlin/drills/drill09/dbreplica/TestPush.kt @@ -22,7 +22,7 @@ fun main() { println((it as DbNode).nodeValue) } - alpha.changeValue(TimedPayload(42, System.nanoTime())) + alpha.changeValue(TimedPayload.createContent(42)) Thread.sleep(2000) network.forEach { diff --git a/src/main/kotlin/drills/drill09/dbreplica/TestPushPull.kt b/src/main/kotlin/drills/drill09/dbreplica/TestPushPull.kt index 15a4c0d..65882d8 100644 --- a/src/main/kotlin/drills/drill09/dbreplica/TestPushPull.kt +++ b/src/main/kotlin/drills/drill09/dbreplica/TestPushPull.kt @@ -22,7 +22,7 @@ fun main() { println((it as DbNode).nodeValue) } - alpha.changeValue(TimedPayload(42, System.nanoTime())) + alpha.changeValue(TimedPayload.createContent(42)) Thread.sleep(2000) network.forEach { diff --git a/src/main/kotlin/drills/drill09/gossip/QueryNode.kt b/src/main/kotlin/drills/drill09/gossip/QueryNode.kt new file mode 100644 index 0000000..781e7bd --- /dev/null +++ b/src/main/kotlin/drills/drill09/gossip/QueryNode.kt @@ -0,0 +1,74 @@ +package drills.drill09.gossip + +import util.network.epidemic.EpidemicNode +import util.network.epidemic.packet.EpidemicPacket +import util.network.epidemic.packet.FeedbackEpidemicPacket +import util.network.simulator.Address +import util.network.simulator.Network +import kotlin.random.Random + +class QueryNode( + address: Address, + network: Network, + initialValue: T, + bufferSize: Int = 1 +) : EpidemicNode>( + address, network, initialValue, bufferSize +) { + private var queryValue: T = nodeValue + private var random = Random(System.currentTimeMillis()) + + override fun makeInfectionPacket( + receiverAddress: Address, + type: EpidemicPacket.Type + ): EpidemicPacket { + return FeedbackEpidemicPacket( + address, + receiverAddress, + queryValue, + type, + false + ) + } + + override fun receivePushOrReply(packet: FeedbackEpidemicPacket) { + //println(packet) + if (nodeState == State.REMOVED) + return + + if (nodeState == State.SUSCEPTIBLE) { + // println("susceptible") + nodeState = State.INFECTED + + queryValue = packet.payload + sendPush(1) + + if (packet.payload == nodeValue) + println("find on node $address") + } else { + if (packet.isFeedback && random.nextBoolean()) { + nodeState = State.REMOVED + println("removed") + + } + + send( + FeedbackEpidemicPacket( + address, + packet.senderAddress, + queryValue, + EpidemicPacket.Type.PUSH, + true + ) + ) + } + } + + override fun receivePull(packet: FeedbackEpidemicPacket) { + TODO("Not yet implemented") + } + + override fun infect() { + Thread.interrupted() + } +} \ No newline at end of file diff --git a/src/main/kotlin/drills/drill09/gossip/TestPush.kt b/src/main/kotlin/drills/drill09/gossip/TestPush.kt new file mode 100644 index 0000000..0660b2e --- /dev/null +++ b/src/main/kotlin/drills/drill09/gossip/TestPush.kt @@ -0,0 +1,31 @@ +package drills.drill09.gossip + +import util.network.epidemic.packet.EpidemicPacket +import util.network.epidemic.packet.FeedbackEpidemicPacket +import util.network.simulator.Address +import util.network.simulator.Network + + +fun main() { + val network = Network() + + repeat(10) { + QueryNode(Address(it), network, it*2) + } + + val alpha = network.getRandomNode() as QueryNode + val beta = network.getRandomNode() as QueryNode + + val packet = FeedbackEpidemicPacket( + alpha.address, + beta.address, + 14, + EpidemicPacket.Type.PUSH, + false + ) + + alpha.send(packet) + + network.start() +} + diff --git a/src/main/kotlin/util/network/epidemic/EpidemicNode.kt b/src/main/kotlin/util/network/epidemic/EpidemicNode.kt index a45aa35..de5c20d 100644 --- a/src/main/kotlin/util/network/epidemic/EpidemicNode.kt +++ b/src/main/kotlin/util/network/epidemic/EpidemicNode.kt @@ -14,7 +14,7 @@ abstract class EpidemicNode>( @Volatile var nodeValue = initialValue private set - private var nodeState = State.SUSCETIBLE + protected var nodeState = State.SUSCEPTIBLE @Synchronized fun changeValue(value: T) { @@ -68,12 +68,26 @@ abstract class EpidemicNode>( protected abstract fun infect() + protected fun onReceive(packet: P) { + //println(packet.type) + + when (packet.type) { + EpidemicPacket.Type.PUSH -> receivePushOrReply(packet) + EpidemicPacket.Type.PULL -> receivePull(packet) + EpidemicPacket.Type.REPLY -> receivePushOrReply(packet) + EpidemicPacket.Type.PUSHPULL -> { + receivePull(packet) + receivePushOrReply(packet) + } + } + } + override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory { return EpidemicCommunicatorFactory(threadGroup) } - private enum class State { - SUSCETIBLE, INFECTED, REMOVED + protected enum class State { + SUSCEPTIBLE, INFECTED, REMOVED } enum class InfectStrategy { @@ -84,22 +98,9 @@ abstract class EpidemicNode>( threadGroup: ThreadGroup ) : Node.Receiver(threadGroup) { override fun run() { - while (!isInterrupted) - onReceive() - } - - private fun onReceive() { - val packet = receive() as P - //println(packet.type) - - when (packet.type) { - EpidemicPacket.Type.PUSH -> receivePushOrReply(packet) - EpidemicPacket.Type.PULL -> receivePull(packet) - EpidemicPacket.Type.REPLY -> receivePushOrReply(packet) - EpidemicPacket.Type.PUSHPULL -> { - receivePull(packet) - receivePushOrReply(packet) - } + while (!isInterrupted) { + val packet = receive() as P + onReceive(packet) } } }