lm-sistemi-software-distrib.../src/main/kotlin/util/network/epidemic/EpidemicNode.kt

132 lines
3.6 KiB
Kotlin

package util.network.epidemic
import util.network.simulator.Address
import util.network.simulator.Network
import util.network.simulator.Node
class EpidemicNode<T>(
address: Address,
network: Network,
initialValue: Content<T>,
bufferSize: Int = 1
) : Node(address, network, bufferSize) {
@Volatile
var nodeValue = initialValue
private set
private var nodeState = State.SUSSCETIBLE
@Synchronized
fun changeValue(value: Content<T>) {
nodeValue = value
}
private fun send(type: EpidemicPacket.Type, numberOfReceiver: Int = 1) {
val toExclude = mutableListOf(address)
repeat(numberOfReceiver) {
var receiverAddress = network.getRandomAddress()
while (toExclude.contains(receiverAddress))
receiverAddress = network.getRandomAddress()
toExclude.add(receiverAddress)
val packet = EpidemicPacket(
address,
receiverAddress,
nodeValue,
type
)
send(packet)
}
}
private fun sendPush(numberOfReceiver: Int = 1) {
send(EpidemicPacket.Type.PUSH, numberOfReceiver)
}
private fun sendPull(numberOfReceiver: Int = 1) {
send(EpidemicPacket.Type.PULL, numberOfReceiver)
}
private fun sendPushPull(numberOfReceiver: Int = 1) {
send(EpidemicPacket.Type.PUSHPULL, numberOfReceiver)
}
private fun receivePushOrReply(packet: EpidemicPacket<Content<T>>) {
val receivedValue = packet.payload
if (receivedValue.timestamp > nodeValue.timestamp)
nodeValue = receivedValue
}
private fun receivePull(packet: EpidemicPacket<Content<T>>) {
val receivedValue = packet.payload
if (nodeValue.timestamp > receivedValue.timestamp)
send(
EpidemicPacket(
address,
packet.senderAddress,
nodeValue,
EpidemicPacket.Type.REPLY
)
)
}
override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory {
return EpidemicCommunicatorFactory(threadGroup)
}
private enum class State {
SUSSCETIBLE, INFECTED, REMOVED
}
protected inner class EpidemicReceiver(
threadGroup: ThreadGroup
) : Node.Receiver(threadGroup) {
override fun run() {
while (!isInterrupted)
onReceive()
}
private fun onReceive() {
val packet = receive() as EpidemicPacket<Content<T>>
when (packet.type) {
EpidemicPacket.Type.PUSH -> receivePushOrReply(packet)
EpidemicPacket.Type.PULL -> receivePull(packet)
EpidemicPacket.Type.REPLY -> receivePushOrReply(packet)
EpidemicPacket.Type.PUSHPULL -> {
receivePull(packet)
receivePushOrReply(packet)
}
}
}
}
protected inner class EpidemicSender(
threadGroup: ThreadGroup
) : Node.Sender(threadGroup) {
override fun run() {
while (!isInterrupted)
infect()
}
private fun infect() {
sendPushPull()
sleep(100)
}
}
protected inner class EpidemicCommunicatorFactory(
threadGroup: ThreadGroup
) : Node.CommunicatorFactory(threadGroup) {
override fun createReceiver(): Receiver {
return EpidemicReceiver(threadGroup)
}
override fun createSender(): Sender {
return EpidemicSender(threadGroup)
}
}
}