diff --git a/src/main/kotlin/util/network/epidemic/Content.kt b/src/main/kotlin/util/network/epidemic/Content.kt deleted file mode 100644 index 224b9c5..0000000 --- a/src/main/kotlin/util/network/epidemic/Content.kt +++ /dev/null @@ -1,9 +0,0 @@ -package util.network.epidemic - -data class Content(val value: T, val timestamp: Long) { - companion object { - fun createContent(value: T): Content { - return Content(value, System.nanoTime()) - } - } -} \ No newline at end of file diff --git a/src/main/kotlin/util/network/epidemic/EpidemicNode.kt b/src/main/kotlin/util/network/epidemic/EpidemicNode.kt index f3ec03c..02d1c23 100644 --- a/src/main/kotlin/util/network/epidemic/EpidemicNode.kt +++ b/src/main/kotlin/util/network/epidemic/EpidemicNode.kt @@ -4,23 +4,26 @@ import util.network.simulator.Address import util.network.simulator.Network import util.network.simulator.Node -class EpidemicNode( +abstract class EpidemicNode>( address: Address, network: Network, - initialValue: Content, + initialValue: T, bufferSize: Int = 1 ) : Node(address, network, bufferSize) { @Volatile var nodeValue = initialValue private set - private var nodeState = State.SUSSCETIBLE + private var nodeState = State.SUSCETIBLE @Synchronized - fun changeValue(value: Content) { + fun changeValue(value: T) { nodeValue = value } - private fun send(type: EpidemicPacket.Type, numberOfReceiver: Int = 1) { + private fun sendToRandom( + packetPrototype: EpidemicPacketPrototype, + numberOfReceiver: Int = 1 + ) { val toExclude = mutableListOf(address) repeat(numberOfReceiver) { var receiverAddress = network.getRandomAddress() @@ -28,56 +31,47 @@ class EpidemicNode( receiverAddress = network.getRandomAddress() toExclude.add(receiverAddress) - val packet = EpidemicPacket( - address, - receiverAddress, - nodeValue, - type - ) - - send(packet) + send(packetPrototype.makePacket(receiverAddress)) } } - private fun sendPush(numberOfReceiver: Int = 1) { - send(EpidemicPacket.Type.PUSH, numberOfReceiver) + abstract fun makeInfectionPrototypePacket( + type: EpidemicPacket.Type + ): EpidemicPacketPrototype + + protected fun sendPush(numberOfReceiver: Int = 1) { + sendToRandom( + makeInfectionPrototypePacket(EpidemicPacket.Type.PUSH), + numberOfReceiver + ) } - private fun sendPull(numberOfReceiver: Int = 1) { - send(EpidemicPacket.Type.PULL, numberOfReceiver) + protected fun sendPull(numberOfReceiver: Int = 1) { + sendToRandom( + makeInfectionPrototypePacket(EpidemicPacket.Type.PULL), + numberOfReceiver + ) } - private fun sendPushPull(numberOfReceiver: Int = 1) { - send(EpidemicPacket.Type.PUSHPULL, numberOfReceiver) + protected fun sendPushPull(numberOfReceiver: Int = 1) { + sendToRandom( + makeInfectionPrototypePacket(EpidemicPacket.Type.PUSHPULL), + numberOfReceiver + ) } - private fun receivePushOrReply(packet: EpidemicPacket>) { - val receivedValue = packet.payload + protected abstract fun receivePushOrReply(packet: P) - if (receivedValue.timestamp > nodeValue.timestamp) - nodeValue = receivedValue - } + protected abstract fun receivePull(packet: P) - private fun receivePull(packet: EpidemicPacket>) { - val receivedValue = packet.payload - - if (nodeValue.timestamp > receivedValue.timestamp) - send( - EpidemicPacket( - address, - packet.senderAddress, - nodeValue, - EpidemicPacket.Type.REPLY - ) - ) - } + protected abstract fun infect() override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory { return EpidemicCommunicatorFactory(threadGroup) } private enum class State { - SUSSCETIBLE, INFECTED, REMOVED + SUSCETIBLE, INFECTED, REMOVED } protected inner class EpidemicReceiver( @@ -89,7 +83,8 @@ class EpidemicNode( } private fun onReceive() { - val packet = receive() as EpidemicPacket> + val packet = receive() as P + //println(packet.type) when (packet.type) { EpidemicPacket.Type.PUSH -> receivePushOrReply(packet) @@ -110,11 +105,6 @@ class EpidemicNode( while (!isInterrupted) infect() } - - private fun infect() { - sendPushPull() - sleep(100) - } } protected inner class EpidemicCommunicatorFactory( diff --git a/src/main/kotlin/util/network/epidemic/EpidemicPacket.kt b/src/main/kotlin/util/network/epidemic/EpidemicPacket.kt index 125b295..5429f2c 100644 --- a/src/main/kotlin/util/network/epidemic/EpidemicPacket.kt +++ b/src/main/kotlin/util/network/epidemic/EpidemicPacket.kt @@ -3,8 +3,9 @@ package util.network.epidemic import util.network.simulator.Address import util.network.simulator.packet.Packet import util.network.simulator.packet.PayloadPacket +import util.network.simulator.packet.PayloadPacketPrototype -data class EpidemicPacket>( +open class EpidemicPacket( override val senderAddress: Address, override val receiverAddress: Address, override val payload: T, @@ -26,9 +27,9 @@ data class EpidemicPacket>( companion object { fun createFromNetworkPacket( packet: Packet, - payload: Content, + payload: TimePayload, type: Type - ): EpidemicPacket> = EpidemicPacket( + ): EpidemicPacket> = EpidemicPacket( packet.senderAddress, packet.receiverAddress, payload, @@ -36,9 +37,9 @@ data class EpidemicPacket>( ) fun createFromPayloadPacket( - packet: PayloadPacket>, + packet: PayloadPacket>, type: Type - ): EpidemicPacket> = EpidemicPacket( + ): EpidemicPacket> = EpidemicPacket( packet.senderAddress, packet.receiverAddress, packet.payload, @@ -46,3 +47,13 @@ data class EpidemicPacket>( ) } } + +open class EpidemicPacketPrototype( + override val senderAddress: Address, + override val payload: T, + val type: EpidemicPacket.Type +) : PayloadPacketPrototype { + open fun makePacket(receiverAddress: Address): EpidemicPacket { + return EpidemicPacket(senderAddress, receiverAddress, payload, type) + } +} diff --git a/src/main/kotlin/util/network/epidemic/ReplayPayload.kt b/src/main/kotlin/util/network/epidemic/ReplayPayload.kt new file mode 100644 index 0000000..d4532f1 --- /dev/null +++ b/src/main/kotlin/util/network/epidemic/ReplayPayload.kt @@ -0,0 +1,6 @@ +package util.network.epidemic + +data class ReplayPayload( + val value: T, + val isReplay: Boolean = true +) \ No newline at end of file diff --git a/src/main/kotlin/util/network/epidemic/Test.kt b/src/main/kotlin/util/network/epidemic/Test.kt index 99e6d5f..625f0de 100644 --- a/src/main/kotlin/util/network/epidemic/Test.kt +++ b/src/main/kotlin/util/network/epidemic/Test.kt @@ -1,5 +1,6 @@ package util.network.epidemic +import util.network.epidemic.consinstence.DbNode import util.network.simulator.Address import util.network.simulator.Network @@ -8,24 +9,24 @@ fun main() { val network = Network() repeat(10) { - EpidemicNode(Address(it), network, Content(0, 0)) + DbNode(Address(it), network, TimePayload(0, 0)) } - val alpha = network.getRandomNode() as EpidemicNode + val alpha = network.getRandomNode() as DbNode network.start() Thread.sleep(1000) network.forEach { - println((it as EpidemicNode).nodeValue) + println((it as DbNode).nodeValue) } - alpha.changeValue(Content(42, System.nanoTime())) + alpha.changeValue(TimePayload(42, System.nanoTime())) Thread.sleep(2000) network.forEach { - println((it as EpidemicNode).nodeValue) + println((it as DbNode).nodeValue) } } diff --git a/src/main/kotlin/util/network/epidemic/TimePayload.kt b/src/main/kotlin/util/network/epidemic/TimePayload.kt new file mode 100644 index 0000000..209ff21 --- /dev/null +++ b/src/main/kotlin/util/network/epidemic/TimePayload.kt @@ -0,0 +1,9 @@ +package util.network.epidemic + +data class TimePayload(val value: T, val timestamp: Long) { + companion object { + fun createContent(value: T): TimePayload { + return TimePayload(value, System.nanoTime()) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/util/network/epidemic/consinstence/AverageNode.kt b/src/main/kotlin/util/network/epidemic/consinstence/AverageNode.kt new file mode 100644 index 0000000..e6089cc --- /dev/null +++ b/src/main/kotlin/util/network/epidemic/consinstence/AverageNode.kt @@ -0,0 +1,59 @@ +package util.network.epidemic.consinstence + +import util.network.epidemic.EpidemicNode +import util.network.epidemic.EpidemicPacket +import util.network.epidemic.EpidemicPacketPrototype +import util.network.epidemic.replay.ReplayPacket +import util.network.epidemic.replay.ReplayPacketPrototype +import util.network.simulator.Address +import util.network.simulator.Network + +class AverageNode( + address: Address, + network: Network, + initialValue: Double, + bufferSize: Int = 1 +) : EpidemicNode>(address, network, initialValue, bufferSize) { + override fun receivePushOrReply(packet: ReplayPacket) { + val payload = packet.payload + + if (packet.isReplay) + changeValue(payload) + else { + val newValue = (nodeValue + payload) / 2 + + changeValue(newValue) + + send( + ReplayPacket( + address, + packet.senderAddress, + newValue, + EpidemicPacket.Type.PUSH, + true + ) + ) + + sendPush() + } + } + + override fun receivePull(packet: ReplayPacket) { + TODO("Not yet implemented") + } + + override fun infect() { + Thread.interrupted() + } + + override fun makeInfectionPrototypePacket( + type: EpidemicPacket.Type + ): EpidemicPacketPrototype { + return ReplayPacketPrototype( + address, + nodeValue, + type, + false + ) + } +} \ No newline at end of file diff --git a/src/main/kotlin/util/network/epidemic/consinstence/DbNode.kt b/src/main/kotlin/util/network/epidemic/consinstence/DbNode.kt new file mode 100644 index 0000000..5a06bff --- /dev/null +++ b/src/main/kotlin/util/network/epidemic/consinstence/DbNode.kt @@ -0,0 +1,52 @@ +package util.network.epidemic.consinstence + +import util.network.epidemic.TimePayload +import util.network.epidemic.EpidemicNode +import util.network.epidemic.EpidemicPacket +import util.network.epidemic.EpidemicPacketPrototype +import util.network.epidemic.replay.ReplayPacketPrototype +import util.network.simulator.Address +import util.network.simulator.Network + +class DbNode( + address: Address, + network: Network, + initialValue: TimePayload, + bufferSize: Int = 1 +) : EpidemicNode, EpidemicPacket>>( + address, network, initialValue, bufferSize +) { + override fun receivePushOrReply(packet: EpidemicPacket>) { + val receivedValue = packet.payload + + if (nodeValue.timestamp < receivedValue.timestamp) + changeValue(receivedValue) + } + + override fun receivePull(packet: EpidemicPacket>) { + val receivedValue = packet.payload + + if (nodeValue.timestamp > receivedValue.timestamp) + send( + EpidemicPacket( + address, + packet.senderAddress, + nodeValue, + EpidemicPacket.Type.REPLY + ) + ) + } + + override fun infect() { + sendPushPull() + Thread.sleep(100) + } + + override fun makeInfectionPrototypePacket(type: EpidemicPacket.Type): EpidemicPacketPrototype> { + return EpidemicPacketPrototype( + address, + nodeValue, + type + ) + } +} \ No newline at end of file diff --git a/src/main/kotlin/util/network/epidemic/consinstence/Test.kt b/src/main/kotlin/util/network/epidemic/consinstence/Test.kt new file mode 100644 index 0000000..9db4cb1 --- /dev/null +++ b/src/main/kotlin/util/network/epidemic/consinstence/Test.kt @@ -0,0 +1,35 @@ +package util.network.epidemic.consinstence + +import util.network.epidemic.EpidemicPacket +import util.network.epidemic.replay.ReplayPacket +import util.network.simulator.Address +import util.network.simulator.Network + +fun main() { + val network = Network() + + repeat(17) { + AverageNode(Address(it), network, 0.0) + } + + val alpha = network.getRandomNode() as AverageNode + val beta = network.getRandomNode() as AverageNode + + val packet = ReplayPacket( + alpha.address, + beta.address, + 1, + EpidemicPacket.Type.PUSH, + false + ) + + alpha.send(packet) + + network.start() + + Thread.sleep(2000) + + network.forEach { + println(1.0 / (it as AverageNode).nodeValue) + } +} \ No newline at end of file diff --git a/src/main/kotlin/util/network/epidemic/replay/ReplayPacket.kt b/src/main/kotlin/util/network/epidemic/replay/ReplayPacket.kt new file mode 100644 index 0000000..4d11798 --- /dev/null +++ b/src/main/kotlin/util/network/epidemic/replay/ReplayPacket.kt @@ -0,0 +1,30 @@ +package util.network.epidemic.replay + +import util.network.epidemic.EpidemicPacket +import util.network.epidemic.EpidemicPacketPrototype +import util.network.simulator.Address + +class ReplayPacket( + senderAddress: Address, + receiverAddress: Address, + payload: T, + type: Type, + val isReplay: Boolean +) : EpidemicPacket(senderAddress, receiverAddress, payload, type) + +class ReplayPacketPrototype ( + senderAddress: Address, + payload: T, + type: EpidemicPacket.Type, + val isReplay: Boolean +) : EpidemicPacketPrototype(senderAddress, payload, type) { + override fun makePacket(receiverAddress: Address): EpidemicPacket { + return ReplayPacket( + senderAddress, + receiverAddress, + payload, + type, + isReplay + ) + } +} \ No newline at end of file diff --git a/src/main/kotlin/util/network/simulator/Node.kt b/src/main/kotlin/util/network/simulator/Node.kt index e0f03a8..11898a1 100644 --- a/src/main/kotlin/util/network/simulator/Node.kt +++ b/src/main/kotlin/util/network/simulator/Node.kt @@ -32,7 +32,7 @@ abstract class Node( return buffer.take() } - protected fun send(packet: Packet) { + fun send(packet: Packet) { network[packet.receiverAddress]?.setPacket(packet) } diff --git a/src/main/kotlin/util/network/simulator/packet/NetworkPacket.kt b/src/main/kotlin/util/network/simulator/packet/NetworkPacket.kt new file mode 100644 index 0000000..f1d9e37 --- /dev/null +++ b/src/main/kotlin/util/network/simulator/packet/NetworkPacket.kt @@ -0,0 +1,16 @@ +package util.network.simulator.packet + +import util.network.simulator.Address + +data class NetworkPacket( + override val senderAddress: Address, + override val receiverAddress: Address +) : Packet + +data class NetworkPacketPrototype( + override val senderAddress: Address +): PacketPrototype { + fun makePacket(receiverAddress: Address): NetworkPacket { + return NetworkPacket(senderAddress, receiverAddress) + } +} diff --git a/src/main/kotlin/util/network/simulator/packet/Packet.kt b/src/main/kotlin/util/network/simulator/packet/Packet.kt index 5c23d3f..8083556 100644 --- a/src/main/kotlin/util/network/simulator/packet/Packet.kt +++ b/src/main/kotlin/util/network/simulator/packet/Packet.kt @@ -7,7 +7,6 @@ interface Packet { val receiverAddress: Address } -data class NetworkPacket( - override val senderAddress: Address, - override val receiverAddress: Address -) : Packet \ No newline at end of file +interface PacketPrototype { + val senderAddress: Address +} diff --git a/src/main/kotlin/util/network/simulator/packet/PayloadPacket.kt b/src/main/kotlin/util/network/simulator/packet/PayloadPacket.kt index 75f1c49..bc23b2c 100644 --- a/src/main/kotlin/util/network/simulator/packet/PayloadPacket.kt +++ b/src/main/kotlin/util/network/simulator/packet/PayloadPacket.kt @@ -2,4 +2,8 @@ package util.network.simulator.packet interface PayloadPacket: Packet { val payload: T +} + +interface PayloadPacketPrototype : PacketPrototype { + val payload: T } \ No newline at end of file