123 lines
3.3 KiB
Kotlin
123 lines
3.3 KiB
Kotlin
package util.network.epidemic
|
|
|
|
import util.network.simulator.Address
|
|
import util.network.simulator.Network
|
|
import util.network.simulator.Node
|
|
|
|
abstract class EpidemicNode<T, P: EpidemicPacket<T>>(
|
|
address: Address,
|
|
network: Network,
|
|
initialValue: T,
|
|
bufferSize: Int = 1
|
|
) : Node(address, network, bufferSize) {
|
|
@Volatile
|
|
var nodeValue = initialValue
|
|
private set
|
|
private var nodeState = State.SUSCETIBLE
|
|
|
|
@Synchronized
|
|
fun changeValue(value: T) {
|
|
nodeValue = value
|
|
}
|
|
|
|
private fun sendToRandom(
|
|
type: EpidemicPacket.Type,
|
|
numberOfReceiver: Int = 1
|
|
) {
|
|
val toExclude = mutableListOf(address)
|
|
repeat(numberOfReceiver) {
|
|
var receiverAddress = network.getRandomAddress()
|
|
while (toExclude.contains(receiverAddress))
|
|
receiverAddress = network.getRandomAddress()
|
|
toExclude.add(receiverAddress)
|
|
|
|
send(makeInfectionPacket(receiverAddress, type))
|
|
}
|
|
}
|
|
|
|
abstract fun makeInfectionPacket(
|
|
receiverAddress: Address,
|
|
type: EpidemicPacket.Type
|
|
): EpidemicPacket<T>
|
|
|
|
protected fun sendPush(numberOfReceiver: Int = 1) {
|
|
sendToRandom(
|
|
EpidemicPacket.Type.PUSH,
|
|
numberOfReceiver
|
|
)
|
|
}
|
|
|
|
protected fun sendPull(numberOfReceiver: Int = 1) {
|
|
sendToRandom(
|
|
EpidemicPacket.Type.PULL,
|
|
numberOfReceiver
|
|
)
|
|
}
|
|
|
|
protected fun sendPushPull(numberOfReceiver: Int = 1) {
|
|
sendToRandom(
|
|
EpidemicPacket.Type.PUSHPULL,
|
|
numberOfReceiver
|
|
)
|
|
}
|
|
|
|
protected abstract fun receivePushOrReply(packet: P)
|
|
|
|
protected abstract fun receivePull(packet: P)
|
|
|
|
protected abstract fun infect()
|
|
|
|
override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory {
|
|
return EpidemicCommunicatorFactory(threadGroup)
|
|
}
|
|
|
|
private enum class State {
|
|
SUSCETIBLE, INFECTED, REMOVED
|
|
}
|
|
|
|
protected inner class EpidemicReceiver(
|
|
threadGroup: ThreadGroup
|
|
) : Node.Receiver(threadGroup) {
|
|
override fun run() {
|
|
while (!isInterrupted)
|
|
onReceive()
|
|
}
|
|
|
|
private fun onReceive() {
|
|
val packet = receive() as P
|
|
//println(packet.type)
|
|
|
|
when (packet.type) {
|
|
EpidemicPacket.Type.PUSH -> receivePushOrReply(packet)
|
|
EpidemicPacket.Type.PULL -> receivePull(packet)
|
|
EpidemicPacket.Type.REPLY -> receivePushOrReply(packet)
|
|
EpidemicPacket.Type.PUSHPULL -> {
|
|
receivePull(packet)
|
|
receivePushOrReply(packet)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
protected inner class EpidemicSender(
|
|
threadGroup: ThreadGroup
|
|
) : Node.Sender(threadGroup) {
|
|
override fun run() {
|
|
while (!isInterrupted)
|
|
infect()
|
|
}
|
|
}
|
|
|
|
protected inner class EpidemicCommunicatorFactory(
|
|
threadGroup: ThreadGroup
|
|
) : Node.CommunicatorFactory(threadGroup) {
|
|
override fun createReceiver(): Receiver {
|
|
return EpidemicReceiver(threadGroup)
|
|
}
|
|
|
|
override fun createSender(): Sender {
|
|
return EpidemicSender(threadGroup)
|
|
}
|
|
}
|
|
|
|
} |