lm-sistemi-software-distrib.../src/main/kotlin/util/network/epidemic/EpidemicNode.kt

127 lines
3.4 KiB
Kotlin

package util.network.epidemic
import util.network.epidemic.packet.EpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
import util.network.simulator.Node
abstract class EpidemicNode<T, P : EpidemicPacket<T>>(
address: Address,
network: Network,
initialValue: T,
bufferSize: Int = 1
) : Node(address, network, bufferSize) {
@Volatile
var nodeValue = initialValue
private set
protected var nodeState = State.SUSCEPTIBLE
@Synchronized
fun changeValue(value: T) {
nodeValue = value
}
private fun sendToRandom(
type: EpidemicPacket.Type,
numberOfReceiver: Int = 1
) {
val toExclude = mutableListOf(address)
repeat(numberOfReceiver) {
var receiverAddress = network.getRandomAddress()
while (toExclude.contains(receiverAddress))
receiverAddress = network.getRandomAddress()
toExclude.add(receiverAddress)
send(makeInfectionPacket(receiverAddress, type))
}
}
abstract fun makeInfectionPacket(
receiverAddress: Address,
type: EpidemicPacket.Type
): EpidemicPacket<T>
protected fun sendPushToRandom(numberOfReceiver: Int = 1) {
sendToRandom(
EpidemicPacket.Type.PUSH,
numberOfReceiver
)
}
protected fun sendPullToRandom(numberOfReceiver: Int = 1) {
sendToRandom(
EpidemicPacket.Type.PULL,
numberOfReceiver
)
}
protected fun sendPushPullToRandom(numberOfReceiver: Int = 1) {
sendToRandom(
EpidemicPacket.Type.PUSHPULL,
numberOfReceiver
)
}
protected abstract fun receivePushOrReply(packet: P)
protected abstract fun receivePull(packet: P)
protected abstract fun infect()
protected fun onReceive(packet: P) {
when (packet.type) {
EpidemicPacket.Type.PUSH -> receivePushOrReply(packet)
EpidemicPacket.Type.PULL -> receivePull(packet)
EpidemicPacket.Type.REPLY -> receivePushOrReply(packet)
EpidemicPacket.Type.PUSHPULL -> {
receivePull(packet)
receivePushOrReply(packet)
}
}
}
override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory {
return EpidemicCommunicatorFactory(threadGroup)
}
protected enum class State {
SUSCEPTIBLE, INFECTED, REMOVED
}
enum class InfectStrategy {
PUSH, PULL, PUSHPULL
}
protected inner class EpidemicReceiver(
threadGroup: ThreadGroup
) : Node.Receiver(threadGroup) {
override fun run() {
while (!isInterrupted) {
val packet = receive() as P
onReceive(packet)
}
}
}
protected inner class EpidemicSender(
threadGroup: ThreadGroup
) : Node.Sender(threadGroup) {
override fun run() {
while (!isInterrupted)
infect()
}
}
protected inner class EpidemicCommunicatorFactory(
threadGroup: ThreadGroup
) : Node.CommunicatorFactory(threadGroup) {
override fun createReceiver(): Receiver {
return EpidemicReceiver(threadGroup)
}
override fun createSender(): Sender {
return EpidemicSender(threadGroup)
}
}
}