diff --git a/src/main/kotlin/util/network/epidemic/Content.kt b/src/main/kotlin/util/network/epidemic/Content.kt new file mode 100644 index 0000000..224b9c5 --- /dev/null +++ b/src/main/kotlin/util/network/epidemic/Content.kt @@ -0,0 +1,9 @@ +package util.network.epidemic + +data class Content(val value: T, val timestamp: Long) { + companion object { + fun createContent(value: T): Content { + return Content(value, System.nanoTime()) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/util/network/epidemic/EpidemicNode.kt b/src/main/kotlin/util/network/epidemic/EpidemicNode.kt new file mode 100644 index 0000000..f3ec03c --- /dev/null +++ b/src/main/kotlin/util/network/epidemic/EpidemicNode.kt @@ -0,0 +1,132 @@ +package util.network.epidemic + +import util.network.simulator.Address +import util.network.simulator.Network +import util.network.simulator.Node + +class EpidemicNode( + address: Address, + network: Network, + initialValue: Content, + bufferSize: Int = 1 +) : Node(address, network, bufferSize) { + @Volatile + var nodeValue = initialValue + private set + private var nodeState = State.SUSSCETIBLE + + @Synchronized + fun changeValue(value: Content) { + nodeValue = value + } + + private fun send(type: EpidemicPacket.Type, numberOfReceiver: Int = 1) { + val toExclude = mutableListOf(address) + repeat(numberOfReceiver) { + var receiverAddress = network.getRandomAddress() + while (toExclude.contains(receiverAddress)) + receiverAddress = network.getRandomAddress() + toExclude.add(receiverAddress) + + val packet = EpidemicPacket( + address, + receiverAddress, + nodeValue, + type + ) + + send(packet) + } + } + + private fun sendPush(numberOfReceiver: Int = 1) { + send(EpidemicPacket.Type.PUSH, numberOfReceiver) + } + + private fun sendPull(numberOfReceiver: Int = 1) { + send(EpidemicPacket.Type.PULL, numberOfReceiver) + } + + private fun sendPushPull(numberOfReceiver: Int = 1) { + send(EpidemicPacket.Type.PUSHPULL, numberOfReceiver) + } + + private fun receivePushOrReply(packet: EpidemicPacket>) { + val receivedValue = packet.payload + + if (receivedValue.timestamp > nodeValue.timestamp) + nodeValue = receivedValue + } + + private fun receivePull(packet: EpidemicPacket>) { + val receivedValue = packet.payload + + if (nodeValue.timestamp > receivedValue.timestamp) + send( + EpidemicPacket( + address, + packet.senderAddress, + nodeValue, + EpidemicPacket.Type.REPLY + ) + ) + } + + override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory { + return EpidemicCommunicatorFactory(threadGroup) + } + + private enum class State { + SUSSCETIBLE, INFECTED, REMOVED + } + + protected inner class EpidemicReceiver( + threadGroup: ThreadGroup + ) : Node.Receiver(threadGroup) { + override fun run() { + while (!isInterrupted) + onReceive() + } + + private fun onReceive() { + val packet = receive() as EpidemicPacket> + + when (packet.type) { + EpidemicPacket.Type.PUSH -> receivePushOrReply(packet) + EpidemicPacket.Type.PULL -> receivePull(packet) + EpidemicPacket.Type.REPLY -> receivePushOrReply(packet) + EpidemicPacket.Type.PUSHPULL -> { + receivePull(packet) + receivePushOrReply(packet) + } + } + } + } + + protected inner class EpidemicSender( + threadGroup: ThreadGroup + ) : Node.Sender(threadGroup) { + override fun run() { + while (!isInterrupted) + infect() + } + + private fun infect() { + sendPushPull() + sleep(100) + } + } + + protected inner class EpidemicCommunicatorFactory( + threadGroup: ThreadGroup + ) : Node.CommunicatorFactory(threadGroup) { + override fun createReceiver(): Receiver { + return EpidemicReceiver(threadGroup) + } + + override fun createSender(): Sender { + return EpidemicSender(threadGroup) + } + } + +} \ No newline at end of file diff --git a/src/main/kotlin/util/network/epidemic/EpidemicPacket.kt b/src/main/kotlin/util/network/epidemic/EpidemicPacket.kt new file mode 100644 index 0000000..125b295 --- /dev/null +++ b/src/main/kotlin/util/network/epidemic/EpidemicPacket.kt @@ -0,0 +1,48 @@ +package util.network.epidemic + +import util.network.simulator.Address +import util.network.simulator.packet.Packet +import util.network.simulator.packet.PayloadPacket + +data class EpidemicPacket>( + override val senderAddress: Address, + override val receiverAddress: Address, + override val payload: T, + val type: Type +) : PayloadPacket { + + fun clone( + senderAddress: Address = this.senderAddress, + receiverAddress: Address = this.receiverAddress, + payload: T = this.payload, + type: Type = this.type + ): EpidemicPacket = + EpidemicPacket(senderAddress, receiverAddress, payload, type) + + enum class Type { + PUSH, PULL, PUSHPULL, REPLY + } + + companion object { + fun createFromNetworkPacket( + packet: Packet, + payload: Content, + type: Type + ): EpidemicPacket> = EpidemicPacket( + packet.senderAddress, + packet.receiverAddress, + payload, + type + ) + + fun createFromPayloadPacket( + packet: PayloadPacket>, + type: Type + ): EpidemicPacket> = EpidemicPacket( + packet.senderAddress, + packet.receiverAddress, + packet.payload, + type + ) + } +} diff --git a/src/main/kotlin/util/network/epidemic/Test.kt b/src/main/kotlin/util/network/epidemic/Test.kt new file mode 100644 index 0000000..99e6d5f --- /dev/null +++ b/src/main/kotlin/util/network/epidemic/Test.kt @@ -0,0 +1,31 @@ +package util.network.epidemic + +import util.network.simulator.Address +import util.network.simulator.Network + + +fun main() { + val network = Network() + + repeat(10) { + EpidemicNode(Address(it), network, Content(0, 0)) + } + + val alpha = network.getRandomNode() as EpidemicNode + + network.start() + + Thread.sleep(1000) + + network.forEach { + println((it as EpidemicNode).nodeValue) + } + + alpha.changeValue(Content(42, System.nanoTime())) + Thread.sleep(2000) + + network.forEach { + println((it as EpidemicNode).nodeValue) + } +} + diff --git a/src/main/kotlin/util/network/simulator/Address.kt b/src/main/kotlin/util/network/simulator/Address.kt new file mode 100644 index 0000000..09a6b7f --- /dev/null +++ b/src/main/kotlin/util/network/simulator/Address.kt @@ -0,0 +1,3 @@ +package util.network.simulator + +inline class Address(val value: Int) diff --git a/src/main/kotlin/util/network/simulator/Network.kt b/src/main/kotlin/util/network/simulator/Network.kt new file mode 100644 index 0000000..c640f22 --- /dev/null +++ b/src/main/kotlin/util/network/simulator/Network.kt @@ -0,0 +1,41 @@ +package util.network.simulator + +import kotlin.random.Random + +class Network(threadGrupName: String = "network") { + private val nodes = mutableMapOf() + private val addresses = mutableListOf
() + val threadGroup = ThreadGroup(threadGrupName) + private val random by lazy { Random(System.currentTimeMillis()) } + + operator fun get(address: Address): Node? { + return nodes[address] + } + + fun addNode(node: Node) { + nodes[node.address] = node + addresses.add(node.address) + } + + fun getRandomAddress() : Address { + val index = random.nextInt(addresses.size) + + return addresses[index] + } + + fun getRandomNode(): Node { + return nodes[getRandomAddress()]!! + } + + fun start() { + nodes.forEach { (_, node) -> + node.start() + } + } + + fun forEach(action: (Node) -> Unit) { + nodes.forEach { (_, node) -> + action(node) + } + } +} diff --git a/src/main/kotlin/util/network/simulator/Node.kt b/src/main/kotlin/util/network/simulator/Node.kt new file mode 100644 index 0000000..e0f03a8 --- /dev/null +++ b/src/main/kotlin/util/network/simulator/Node.kt @@ -0,0 +1,61 @@ +package util.network.simulator + +import util.network.simulator.packet.Packet +import java.util.concurrent.LinkedBlockingQueue + +abstract class Node( + val address: Address, + protected val network: Network, + bufferSize: Int = 1 +) { + private val buffer = LinkedBlockingQueue(bufferSize) + private val receiver: Receiver + private val sender: Sender + + init { + network.addNode(this) + val factory = communicatorFactory(network.threadGroup) + receiver = factory.createReceiver() + sender = factory.createSender() + } + + fun start() { + receiver.start() + sender.start() + } + + private fun setPacket(packet: Packet) { + buffer.put(packet) + } + + private fun getPacket(): Packet { + return buffer.take() + } + + protected fun send(packet: Packet) { + network[packet.receiverAddress]?.setPacket(packet) + } + + protected fun receive(): Packet = getPacket() + + protected abstract fun communicatorFactory( + threadGroup: ThreadGroup + ): CommunicatorFactory + + protected abstract inner class CommunicatorFactory( + protected val threadGroup: ThreadGroup + ) { + abstract fun createReceiver(): Receiver + abstract fun createSender(): Sender + } + + protected abstract inner class Receiver(threadGroup: ThreadGroup) : Thread( + threadGroup, + "${threadGroup.name}-receiver" + ) + + protected abstract inner class Sender(threadGroup: ThreadGroup) : Thread( + threadGroup, + "${threadGroup.name}-sender" + ) +} diff --git a/src/main/kotlin/util/network/simulator/Test.kt b/src/main/kotlin/util/network/simulator/Test.kt new file mode 100644 index 0000000..1344765 --- /dev/null +++ b/src/main/kotlin/util/network/simulator/Test.kt @@ -0,0 +1,51 @@ +package util.network.simulator + +import util.network.simulator.packet.NetworkPacket + +fun main() { + val network = Network() + + repeat(2) { + TestNode(Address(it), network) + } + + network.start() +} + +class TestNode(address: Address, network: Network): Node(address, network) { + override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory { + return TestCommunicatorFactory(threadGroup) + } + + private inner class TestReceiver(threadGroup: ThreadGroup): Receiver(threadGroup) { + override fun run() { + if (address != Address(0)) + return + + val packet = receive() + + println(packet) + } + } + + private inner class TestSender(threadGroup: ThreadGroup): Sender(threadGroup) { + override fun run() { + if (address != Address(1)) + return + + val packet = NetworkPacket(address, Address(0)) + + send(packet) + } + } + + private inner class TestCommunicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory(threadGroup) { + override fun createReceiver(): Receiver { + return TestReceiver(threadGroup) + } + + override fun createSender(): Sender { + return TestSender(threadGroup) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/util/network/simulator/packet/Packet.kt b/src/main/kotlin/util/network/simulator/packet/Packet.kt new file mode 100644 index 0000000..5c23d3f --- /dev/null +++ b/src/main/kotlin/util/network/simulator/packet/Packet.kt @@ -0,0 +1,13 @@ +package util.network.simulator.packet + +import util.network.simulator.Address + +interface Packet { + val senderAddress: Address + val receiverAddress: Address +} + +data class NetworkPacket( + override val senderAddress: Address, + override val receiverAddress: Address +) : Packet \ No newline at end of file diff --git a/src/main/kotlin/util/network/simulator/packet/PayloadPacket.kt b/src/main/kotlin/util/network/simulator/packet/PayloadPacket.kt new file mode 100644 index 0000000..75f1c49 --- /dev/null +++ b/src/main/kotlin/util/network/simulator/packet/PayloadPacket.kt @@ -0,0 +1,5 @@ +package util.network.simulator.packet + +interface PayloadPacket: Packet { + val payload: T +} \ No newline at end of file