Compare commits
6 Commits
Author | SHA1 | Date | |
---|---|---|---|
b50890b44f | |||
711da44f53 | |||
581d9365ee | |||
32ca1dd9a0 | |||
5688bd653a | |||
9690167176 |
107
src/main/kotlin/drills/drill09/average/AverageNode.kt
Normal file
107
src/main/kotlin/drills/drill09/average/AverageNode.kt
Normal file
@ -0,0 +1,107 @@
|
|||||||
|
package drills.drill09.average
|
||||||
|
|
||||||
|
import util.network.epidemic.EpidemicNode
|
||||||
|
import util.network.epidemic.packet.EpidemicPacket
|
||||||
|
import util.network.epidemic.packet.FeedbackEpidemicPacket
|
||||||
|
import util.network.simulator.Address
|
||||||
|
import util.network.simulator.Network
|
||||||
|
|
||||||
|
class AverageNode(
|
||||||
|
address: Address,
|
||||||
|
network: Network,
|
||||||
|
initialValue: Double,
|
||||||
|
val infectStrategy: InfectStrategy = InfectStrategy.PUSH,
|
||||||
|
bufferSize: Int = 1
|
||||||
|
) : EpidemicNode<Double, FeedbackEpidemicPacket<Double>>(
|
||||||
|
address, network, initialValue, bufferSize
|
||||||
|
) {
|
||||||
|
override fun receivePushOrReply(packet: FeedbackEpidemicPacket<Double>) {
|
||||||
|
val payload = packet.payload
|
||||||
|
|
||||||
|
if (packet.isFeedback)
|
||||||
|
changeValue(payload)
|
||||||
|
else {
|
||||||
|
computeAverage(payload)
|
||||||
|
sendFeedback(packet.senderAddress)
|
||||||
|
spreadNewValue()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun computeAverage(payload: Double) {
|
||||||
|
val newValue = (nodeValue + payload) / 2
|
||||||
|
changeValue(newValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun sendFeedback(receiverAddress: Address) {
|
||||||
|
send(
|
||||||
|
FeedbackEpidemicPacket(
|
||||||
|
address,
|
||||||
|
receiverAddress,
|
||||||
|
nodeValue,
|
||||||
|
if (infectStrategy == InfectStrategy.PUSH)
|
||||||
|
EpidemicPacket.Type.PUSH else EpidemicPacket.Type.REPLY,
|
||||||
|
true
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun spreadNewValue() {
|
||||||
|
when (infectStrategy) {
|
||||||
|
InfectStrategy.PUSH -> sendPushToRandom()
|
||||||
|
InfectStrategy.PULL -> sendPullToRandom()
|
||||||
|
InfectStrategy.PUSHPULL -> sendPushPullToRandom()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun receivePull(packet: FeedbackEpidemicPacket<Double>) {
|
||||||
|
if (packet.type == EpidemicPacket.Type.PUSHPULL)
|
||||||
|
return
|
||||||
|
|
||||||
|
send(
|
||||||
|
makeInfectionPacket(
|
||||||
|
packet.senderAddress,
|
||||||
|
EpidemicPacket.Type.REPLY
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun infect() {
|
||||||
|
Thread.interrupted()
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun makeInfectionPacket(
|
||||||
|
receiverAddress: Address,
|
||||||
|
type: EpidemicPacket.Type
|
||||||
|
): EpidemicPacket<Double> {
|
||||||
|
return FeedbackEpidemicPacket(
|
||||||
|
address,
|
||||||
|
receiverAddress,
|
||||||
|
nodeValue,
|
||||||
|
type,
|
||||||
|
false
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
fun createPushNode(
|
||||||
|
address: Address,
|
||||||
|
network: Network,
|
||||||
|
initialValue: Double,
|
||||||
|
bufferSize: Int = 1
|
||||||
|
) = AverageNode(address, network, initialValue, InfectStrategy.PUSH, bufferSize)
|
||||||
|
|
||||||
|
fun createPullNode(
|
||||||
|
address: Address,
|
||||||
|
network: Network,
|
||||||
|
initialValue: Double,
|
||||||
|
bufferSize: Int = 1
|
||||||
|
) = AverageNode(address, network, initialValue, InfectStrategy.PULL, bufferSize)
|
||||||
|
|
||||||
|
fun createPushPullNode(
|
||||||
|
address: Address,
|
||||||
|
network: Network,
|
||||||
|
initialValue: Double,
|
||||||
|
bufferSize: Int = 1
|
||||||
|
) = AverageNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize)
|
||||||
|
}
|
||||||
|
}
|
35
src/main/kotlin/drills/drill09/average/TestPull.kt
Normal file
35
src/main/kotlin/drills/drill09/average/TestPull.kt
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
package drills.drill09.average
|
||||||
|
|
||||||
|
import util.network.epidemic.packet.EpidemicPacket
|
||||||
|
import util.network.epidemic.packet.FeedbackEpidemicPacket
|
||||||
|
import util.network.simulator.Address
|
||||||
|
import util.network.simulator.Network
|
||||||
|
|
||||||
|
fun main() {
|
||||||
|
val network = Network()
|
||||||
|
|
||||||
|
repeat(17) {
|
||||||
|
AverageNode.createPullNode(Address(it), network, 0.0)
|
||||||
|
}
|
||||||
|
|
||||||
|
val alpha = network.getRandomNode() as AverageNode
|
||||||
|
val beta = network.getRandomNode() as AverageNode
|
||||||
|
|
||||||
|
val packet = FeedbackEpidemicPacket(
|
||||||
|
alpha.address,
|
||||||
|
beta.address,
|
||||||
|
1,
|
||||||
|
EpidemicPacket.Type.REPLY,
|
||||||
|
false
|
||||||
|
)
|
||||||
|
|
||||||
|
alpha.send(packet)
|
||||||
|
|
||||||
|
network.start()
|
||||||
|
|
||||||
|
Thread.sleep(2000)
|
||||||
|
|
||||||
|
network.forEach {
|
||||||
|
println(1.0 / (it as AverageNode).nodeValue)
|
||||||
|
}
|
||||||
|
}
|
35
src/main/kotlin/drills/drill09/average/TestPush.kt
Normal file
35
src/main/kotlin/drills/drill09/average/TestPush.kt
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
package drills.drill09.average
|
||||||
|
|
||||||
|
import util.network.epidemic.packet.EpidemicPacket
|
||||||
|
import util.network.epidemic.packet.FeedbackEpidemicPacket
|
||||||
|
import util.network.simulator.Address
|
||||||
|
import util.network.simulator.Network
|
||||||
|
|
||||||
|
fun main() {
|
||||||
|
val network = Network()
|
||||||
|
|
||||||
|
repeat(17) {
|
||||||
|
AverageNode.createPushNode(Address(it), network, 0.0)
|
||||||
|
}
|
||||||
|
|
||||||
|
val alpha = network.getRandomNode() as AverageNode
|
||||||
|
val beta = network.getRandomNode() as AverageNode
|
||||||
|
|
||||||
|
val packet = FeedbackEpidemicPacket(
|
||||||
|
alpha.address,
|
||||||
|
beta.address,
|
||||||
|
1,
|
||||||
|
EpidemicPacket.Type.PUSH,
|
||||||
|
false
|
||||||
|
)
|
||||||
|
|
||||||
|
alpha.send(packet)
|
||||||
|
|
||||||
|
network.start()
|
||||||
|
|
||||||
|
Thread.sleep(2000)
|
||||||
|
|
||||||
|
network.forEach {
|
||||||
|
println(1.0 / (it as AverageNode).nodeValue)
|
||||||
|
}
|
||||||
|
}
|
35
src/main/kotlin/drills/drill09/average/TestPushPull.kt
Normal file
35
src/main/kotlin/drills/drill09/average/TestPushPull.kt
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
package drills.drill09.average
|
||||||
|
|
||||||
|
import util.network.epidemic.packet.EpidemicPacket
|
||||||
|
import util.network.epidemic.packet.FeedbackEpidemicPacket
|
||||||
|
import util.network.simulator.Address
|
||||||
|
import util.network.simulator.Network
|
||||||
|
|
||||||
|
fun main() {
|
||||||
|
val network = Network()
|
||||||
|
|
||||||
|
repeat(17) {
|
||||||
|
AverageNode.createPushPullNode(Address(it), network, 0.0)
|
||||||
|
}
|
||||||
|
|
||||||
|
val alpha = network.getRandomNode() as AverageNode
|
||||||
|
val beta = network.getRandomNode() as AverageNode
|
||||||
|
|
||||||
|
val packet = FeedbackEpidemicPacket(
|
||||||
|
alpha.address,
|
||||||
|
beta.address,
|
||||||
|
1,
|
||||||
|
EpidemicPacket.Type.REPLY,
|
||||||
|
false
|
||||||
|
)
|
||||||
|
|
||||||
|
alpha.send(packet)
|
||||||
|
|
||||||
|
network.start()
|
||||||
|
|
||||||
|
Thread.sleep(2000)
|
||||||
|
|
||||||
|
network.forEach {
|
||||||
|
println(1.0 / (it as AverageNode).nodeValue)
|
||||||
|
}
|
||||||
|
}
|
84
src/main/kotlin/drills/drill09/dbreplica/DbNode.kt
Normal file
84
src/main/kotlin/drills/drill09/dbreplica/DbNode.kt
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
package drills.drill09.dbreplica
|
||||||
|
|
||||||
|
import util.network.epidemic.TimedPayload
|
||||||
|
import util.network.epidemic.EpidemicNode
|
||||||
|
import util.network.epidemic.packet.EpidemicPacket
|
||||||
|
import util.network.epidemic.packet.TimedEpidemicPacket
|
||||||
|
import util.network.simulator.Address
|
||||||
|
import util.network.simulator.Network
|
||||||
|
|
||||||
|
class DbNode<T>(
|
||||||
|
address: Address,
|
||||||
|
network: Network,
|
||||||
|
initialValue: TimedPayload<T>,
|
||||||
|
val infectStrategy: InfectStrategy = InfectStrategy.PUSHPULL,
|
||||||
|
bufferSize: Int = 1
|
||||||
|
) : EpidemicNode<TimedPayload<T>, TimedEpidemicPacket<T>>(
|
||||||
|
address, network, initialValue, bufferSize
|
||||||
|
) {
|
||||||
|
override fun receivePushOrReply(packet: TimedEpidemicPacket<T>) {
|
||||||
|
val receivedValue = packet.payload
|
||||||
|
|
||||||
|
if (nodeValue.timestamp < receivedValue.timestamp)
|
||||||
|
changeValue(receivedValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun receivePull(packet: TimedEpidemicPacket<T>) {
|
||||||
|
val receivedValue = packet.payload
|
||||||
|
|
||||||
|
if (nodeValue.timestamp > receivedValue.timestamp)
|
||||||
|
send(
|
||||||
|
makeInfectionPacket(
|
||||||
|
packet.senderAddress,
|
||||||
|
EpidemicPacket.Type.REPLY
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun infect() {
|
||||||
|
when (infectStrategy) {
|
||||||
|
InfectStrategy.PUSH -> sendPushToRandom()
|
||||||
|
InfectStrategy.PULL -> sendPullToRandom()
|
||||||
|
InfectStrategy.PUSHPULL -> sendPushPullToRandom()
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.sleep(DELTA)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun makeInfectionPacket(
|
||||||
|
receiverAddress: Address,
|
||||||
|
type: EpidemicPacket.Type
|
||||||
|
): EpidemicPacket<TimedPayload<T>> {
|
||||||
|
return TimedEpidemicPacket(
|
||||||
|
address,
|
||||||
|
receiverAddress,
|
||||||
|
nodeValue,
|
||||||
|
type
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
const val DELTA = 100L
|
||||||
|
|
||||||
|
fun <T> createPushNode(
|
||||||
|
address: Address,
|
||||||
|
network: Network,
|
||||||
|
initialValue: TimedPayload<T>,
|
||||||
|
bufferSize: Int = 1
|
||||||
|
) = DbNode(address, network, initialValue, InfectStrategy.PUSH, bufferSize)
|
||||||
|
|
||||||
|
fun <T> createPullNode(
|
||||||
|
address: Address,
|
||||||
|
network: Network,
|
||||||
|
initialValue: TimedPayload<T>,
|
||||||
|
bufferSize: Int = 1
|
||||||
|
) = DbNode(address, network, initialValue, InfectStrategy.PULL, bufferSize)
|
||||||
|
|
||||||
|
fun <T> createPushPullNode(
|
||||||
|
address: Address,
|
||||||
|
network: Network,
|
||||||
|
initialValue: TimedPayload<T>,
|
||||||
|
bufferSize: Int = 1
|
||||||
|
) = DbNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize)
|
||||||
|
}
|
||||||
|
}
|
30
src/main/kotlin/drills/drill09/dbreplica/TestPull.kt
Normal file
30
src/main/kotlin/drills/drill09/dbreplica/TestPull.kt
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
package drills.drill09.dbreplica
|
||||||
|
|
||||||
|
import util.network.epidemic.TimedPayload
|
||||||
|
import util.network.simulator.Address
|
||||||
|
import util.network.simulator.Network
|
||||||
|
|
||||||
|
fun main() {
|
||||||
|
val network = Network()
|
||||||
|
|
||||||
|
repeat(10) {
|
||||||
|
DbNode.createPullNode(Address(it), network, TimedPayload(0, 0))
|
||||||
|
}
|
||||||
|
|
||||||
|
val alpha = network.getRandomNode() as DbNode<Int>
|
||||||
|
|
||||||
|
network.start()
|
||||||
|
|
||||||
|
Thread.sleep(1000)
|
||||||
|
|
||||||
|
network.forEach {
|
||||||
|
println((it as DbNode<Int>).nodeValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
alpha.changeValue(TimedPayload.createContent(42))
|
||||||
|
Thread.sleep(2000)
|
||||||
|
|
||||||
|
network.forEach {
|
||||||
|
println((it as DbNode<Int>).nodeValue)
|
||||||
|
}
|
||||||
|
}
|
30
src/main/kotlin/drills/drill09/dbreplica/TestPush.kt
Normal file
30
src/main/kotlin/drills/drill09/dbreplica/TestPush.kt
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
package drills.drill09.dbreplica
|
||||||
|
|
||||||
|
import util.network.epidemic.TimedPayload
|
||||||
|
import util.network.simulator.Address
|
||||||
|
import util.network.simulator.Network
|
||||||
|
|
||||||
|
fun main() {
|
||||||
|
val network = Network()
|
||||||
|
|
||||||
|
repeat(10) {
|
||||||
|
DbNode.createPushNode(Address(it), network, TimedPayload(0, 0))
|
||||||
|
}
|
||||||
|
|
||||||
|
val alpha = network.getRandomNode() as DbNode<Int>
|
||||||
|
|
||||||
|
network.start()
|
||||||
|
|
||||||
|
Thread.sleep(1000)
|
||||||
|
|
||||||
|
network.forEach {
|
||||||
|
println((it as DbNode<Int>).nodeValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
alpha.changeValue(TimedPayload.createContent(42))
|
||||||
|
Thread.sleep(2000)
|
||||||
|
|
||||||
|
network.forEach {
|
||||||
|
println((it as DbNode<Int>).nodeValue)
|
||||||
|
}
|
||||||
|
}
|
30
src/main/kotlin/drills/drill09/dbreplica/TestPushPull.kt
Normal file
30
src/main/kotlin/drills/drill09/dbreplica/TestPushPull.kt
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
package drills.drill09.dbreplica
|
||||||
|
|
||||||
|
import util.network.epidemic.TimedPayload
|
||||||
|
import util.network.simulator.Address
|
||||||
|
import util.network.simulator.Network
|
||||||
|
|
||||||
|
fun main() {
|
||||||
|
val network = Network()
|
||||||
|
|
||||||
|
repeat(10) {
|
||||||
|
DbNode.createPushPullNode(Address(it), network, TimedPayload(0, 0))
|
||||||
|
}
|
||||||
|
|
||||||
|
val alpha = network.getRandomNode() as DbNode<Int>
|
||||||
|
|
||||||
|
network.start()
|
||||||
|
|
||||||
|
Thread.sleep(1000)
|
||||||
|
|
||||||
|
network.forEach {
|
||||||
|
println((it as DbNode<Int>).nodeValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
alpha.changeValue(TimedPayload.createContent(42))
|
||||||
|
Thread.sleep(2000)
|
||||||
|
|
||||||
|
network.forEach {
|
||||||
|
println((it as DbNode<Int>).nodeValue)
|
||||||
|
}
|
||||||
|
}
|
76
src/main/kotlin/drills/drill09/gossip/QueryNode.kt
Normal file
76
src/main/kotlin/drills/drill09/gossip/QueryNode.kt
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
package drills.drill09.gossip
|
||||||
|
|
||||||
|
import util.network.epidemic.EpidemicNode
|
||||||
|
import util.network.epidemic.packet.EpidemicPacket
|
||||||
|
import util.network.epidemic.packet.FeedbackEpidemicPacket
|
||||||
|
import util.network.simulator.Address
|
||||||
|
import util.network.simulator.Network
|
||||||
|
import kotlin.random.Random
|
||||||
|
|
||||||
|
class QueryNode<T>(
|
||||||
|
address: Address,
|
||||||
|
network: Network,
|
||||||
|
initialValue: T,
|
||||||
|
private val demotivatingFactor: Int = 10,
|
||||||
|
bufferSize: Int = 1
|
||||||
|
) : EpidemicNode<T, FeedbackEpidemicPacket<T>>(
|
||||||
|
address, network, initialValue, bufferSize
|
||||||
|
) {
|
||||||
|
private var queryValue: T = nodeValue
|
||||||
|
private var random = Random(System.currentTimeMillis())
|
||||||
|
|
||||||
|
override fun makeInfectionPacket(
|
||||||
|
receiverAddress: Address,
|
||||||
|
type: EpidemicPacket.Type
|
||||||
|
): EpidemicPacket<T> {
|
||||||
|
return FeedbackEpidemicPacket(
|
||||||
|
address,
|
||||||
|
receiverAddress,
|
||||||
|
queryValue,
|
||||||
|
type,
|
||||||
|
false
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun receivePushOrReply(packet: FeedbackEpidemicPacket<T>) {
|
||||||
|
when (nodeState) {
|
||||||
|
State.REMOVED -> return
|
||||||
|
State.INFECTED -> whenInfected(packet)
|
||||||
|
State.SUSCEPTIBLE -> whenSusceptible(packet)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun whenSusceptible(packet: FeedbackEpidemicPacket<T>) {
|
||||||
|
nodeState = State.INFECTED
|
||||||
|
queryValue = packet.payload
|
||||||
|
|
||||||
|
sendPushToRandom(2)
|
||||||
|
|
||||||
|
if (packet.payload == nodeValue)
|
||||||
|
println("find ${packet.payload} on node $address")
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun whenInfected(packet: FeedbackEpidemicPacket<T>) {
|
||||||
|
if (!packet.isFeedback)
|
||||||
|
send(
|
||||||
|
FeedbackEpidemicPacket(
|
||||||
|
address,
|
||||||
|
packet.senderAddress,
|
||||||
|
queryValue,
|
||||||
|
EpidemicPacket.Type.PUSH,
|
||||||
|
true
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
if (packet.isFeedback && random.nextDouble(1.0) < 1 / demotivatingFactor)
|
||||||
|
nodeState = State.REMOVED
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun receivePull(packet: FeedbackEpidemicPacket<T>) {
|
||||||
|
TODO("Not yet implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun infect() {
|
||||||
|
Thread.interrupted()
|
||||||
|
}
|
||||||
|
}
|
29
src/main/kotlin/drills/drill09/gossip/TestPush.kt
Normal file
29
src/main/kotlin/drills/drill09/gossip/TestPush.kt
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
package drills.drill09.gossip
|
||||||
|
|
||||||
|
import util.network.epidemic.packet.EpidemicPacket
|
||||||
|
import util.network.epidemic.packet.FeedbackEpidemicPacket
|
||||||
|
import util.network.simulator.Address
|
||||||
|
import util.network.simulator.Network
|
||||||
|
|
||||||
|
fun main() {
|
||||||
|
val network = Network()
|
||||||
|
|
||||||
|
repeat(20) {
|
||||||
|
QueryNode(Address(it), network, it * 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
val alpha = network.getRandomNode() as QueryNode<Int>
|
||||||
|
val beta = network.getRandomNode() as QueryNode<Int>
|
||||||
|
|
||||||
|
val packet = FeedbackEpidemicPacket(
|
||||||
|
alpha.address,
|
||||||
|
beta.address,
|
||||||
|
24,
|
||||||
|
EpidemicPacket.Type.PUSH,
|
||||||
|
false
|
||||||
|
)
|
||||||
|
|
||||||
|
alpha.send(packet)
|
||||||
|
|
||||||
|
network.start()
|
||||||
|
}
|
126
src/main/kotlin/util/network/epidemic/EpidemicNode.kt
Normal file
126
src/main/kotlin/util/network/epidemic/EpidemicNode.kt
Normal file
@ -0,0 +1,126 @@
|
|||||||
|
package util.network.epidemic
|
||||||
|
|
||||||
|
import util.network.epidemic.packet.EpidemicPacket
|
||||||
|
import util.network.simulator.Address
|
||||||
|
import util.network.simulator.Network
|
||||||
|
import util.network.simulator.Node
|
||||||
|
|
||||||
|
abstract class EpidemicNode<T, P : EpidemicPacket<T>>(
|
||||||
|
address: Address,
|
||||||
|
network: Network,
|
||||||
|
initialValue: T,
|
||||||
|
bufferSize: Int = 1
|
||||||
|
) : Node(address, network, bufferSize) {
|
||||||
|
@Volatile
|
||||||
|
var nodeValue = initialValue
|
||||||
|
private set
|
||||||
|
protected var nodeState = State.SUSCEPTIBLE
|
||||||
|
|
||||||
|
@Synchronized
|
||||||
|
fun changeValue(value: T) {
|
||||||
|
nodeValue = value
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun sendToRandom(
|
||||||
|
type: EpidemicPacket.Type,
|
||||||
|
numberOfReceiver: Int = 1
|
||||||
|
) {
|
||||||
|
val toExclude = mutableListOf(address)
|
||||||
|
repeat(numberOfReceiver) {
|
||||||
|
var receiverAddress = network.getRandomAddress()
|
||||||
|
while (toExclude.contains(receiverAddress))
|
||||||
|
receiverAddress = network.getRandomAddress()
|
||||||
|
toExclude.add(receiverAddress)
|
||||||
|
|
||||||
|
send(makeInfectionPacket(receiverAddress, type))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract fun makeInfectionPacket(
|
||||||
|
receiverAddress: Address,
|
||||||
|
type: EpidemicPacket.Type
|
||||||
|
): EpidemicPacket<T>
|
||||||
|
|
||||||
|
protected fun sendPushToRandom(numberOfReceiver: Int = 1) {
|
||||||
|
sendToRandom(
|
||||||
|
EpidemicPacket.Type.PUSH,
|
||||||
|
numberOfReceiver
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
protected fun sendPullToRandom(numberOfReceiver: Int = 1) {
|
||||||
|
sendToRandom(
|
||||||
|
EpidemicPacket.Type.PULL,
|
||||||
|
numberOfReceiver
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
protected fun sendPushPullToRandom(numberOfReceiver: Int = 1) {
|
||||||
|
sendToRandom(
|
||||||
|
EpidemicPacket.Type.PUSHPULL,
|
||||||
|
numberOfReceiver
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract fun receivePushOrReply(packet: P)
|
||||||
|
|
||||||
|
protected abstract fun receivePull(packet: P)
|
||||||
|
|
||||||
|
protected abstract fun infect()
|
||||||
|
|
||||||
|
protected fun onReceive(packet: P) {
|
||||||
|
when (packet.type) {
|
||||||
|
EpidemicPacket.Type.PUSH -> receivePushOrReply(packet)
|
||||||
|
EpidemicPacket.Type.PULL -> receivePull(packet)
|
||||||
|
EpidemicPacket.Type.REPLY -> receivePushOrReply(packet)
|
||||||
|
EpidemicPacket.Type.PUSHPULL -> {
|
||||||
|
receivePull(packet)
|
||||||
|
receivePushOrReply(packet)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory {
|
||||||
|
return EpidemicCommunicatorFactory(threadGroup)
|
||||||
|
}
|
||||||
|
|
||||||
|
protected enum class State {
|
||||||
|
SUSCEPTIBLE, INFECTED, REMOVED
|
||||||
|
}
|
||||||
|
|
||||||
|
enum class InfectStrategy {
|
||||||
|
PUSH, PULL, PUSHPULL
|
||||||
|
}
|
||||||
|
|
||||||
|
protected inner class EpidemicReceiver(
|
||||||
|
threadGroup: ThreadGroup
|
||||||
|
) : Node.Receiver(threadGroup) {
|
||||||
|
override fun run() {
|
||||||
|
while (!isInterrupted) {
|
||||||
|
val packet = receive() as P
|
||||||
|
onReceive(packet)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected inner class EpidemicSender(
|
||||||
|
threadGroup: ThreadGroup
|
||||||
|
) : Node.Sender(threadGroup) {
|
||||||
|
override fun run() {
|
||||||
|
while (!isInterrupted)
|
||||||
|
infect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected inner class EpidemicCommunicatorFactory(
|
||||||
|
threadGroup: ThreadGroup
|
||||||
|
) : Node.CommunicatorFactory(threadGroup) {
|
||||||
|
override fun createReceiver(): Receiver {
|
||||||
|
return EpidemicReceiver(threadGroup)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun createSender(): Sender {
|
||||||
|
return EpidemicSender(threadGroup)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
9
src/main/kotlin/util/network/epidemic/TimedPayload.kt
Normal file
9
src/main/kotlin/util/network/epidemic/TimedPayload.kt
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
package util.network.epidemic
|
||||||
|
|
||||||
|
data class TimedPayload<T>(val value: T, val timestamp: Long) {
|
||||||
|
companion object {
|
||||||
|
fun <T> createContent(value: T): TimedPayload<T> {
|
||||||
|
return TimedPayload(value, System.nanoTime())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,11 @@
|
|||||||
|
package util.network.epidemic.packet
|
||||||
|
|
||||||
|
import util.network.simulator.packet.PayloadPacket
|
||||||
|
|
||||||
|
interface EpidemicPacket<T> : PayloadPacket<T> {
|
||||||
|
val type: Type
|
||||||
|
|
||||||
|
enum class Type {
|
||||||
|
PUSH, PULL, PUSHPULL, REPLY
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,12 @@
|
|||||||
|
package util.network.epidemic.packet
|
||||||
|
|
||||||
|
import util.network.simulator.Address
|
||||||
|
import util.network.simulator.packet.FeedbackPacket
|
||||||
|
|
||||||
|
data class FeedbackEpidemicPacket<T>(
|
||||||
|
override val senderAddress: Address,
|
||||||
|
override val receiverAddress: Address,
|
||||||
|
override val payload: T,
|
||||||
|
override val type: EpidemicPacket.Type,
|
||||||
|
override val isFeedback: Boolean
|
||||||
|
) : EpidemicPacket<T>, FeedbackPacket
|
@ -0,0 +1,11 @@
|
|||||||
|
package util.network.epidemic.packet
|
||||||
|
|
||||||
|
import util.network.epidemic.TimedPayload
|
||||||
|
import util.network.simulator.Address
|
||||||
|
|
||||||
|
data class TimedEpidemicPacket<T>(
|
||||||
|
override val senderAddress: Address,
|
||||||
|
override val receiverAddress: Address,
|
||||||
|
override val payload: TimedPayload<T>,
|
||||||
|
override val type: EpidemicPacket.Type
|
||||||
|
) : EpidemicPacket<TimedPayload<T>>
|
3
src/main/kotlin/util/network/simulator/Address.kt
Normal file
3
src/main/kotlin/util/network/simulator/Address.kt
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
package util.network.simulator
|
||||||
|
|
||||||
|
inline class Address(val value: Int)
|
41
src/main/kotlin/util/network/simulator/Network.kt
Normal file
41
src/main/kotlin/util/network/simulator/Network.kt
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
package util.network.simulator
|
||||||
|
|
||||||
|
import kotlin.random.Random
|
||||||
|
|
||||||
|
class Network(threadGrupName: String = "network") {
|
||||||
|
private val nodes = mutableMapOf<Address, Node>()
|
||||||
|
private val addresses = mutableListOf<Address>()
|
||||||
|
val threadGroup = ThreadGroup(threadGrupName)
|
||||||
|
private val random by lazy { Random(System.currentTimeMillis()) }
|
||||||
|
|
||||||
|
operator fun get(address: Address): Node? {
|
||||||
|
return nodes[address]
|
||||||
|
}
|
||||||
|
|
||||||
|
fun addNode(node: Node) {
|
||||||
|
nodes[node.address] = node
|
||||||
|
addresses.add(node.address)
|
||||||
|
}
|
||||||
|
|
||||||
|
fun getRandomAddress(): Address {
|
||||||
|
val index = random.nextInt(addresses.size)
|
||||||
|
|
||||||
|
return addresses[index]
|
||||||
|
}
|
||||||
|
|
||||||
|
fun getRandomNode(): Node {
|
||||||
|
return nodes[getRandomAddress()]!!
|
||||||
|
}
|
||||||
|
|
||||||
|
fun start() {
|
||||||
|
nodes.forEach { (_, node) ->
|
||||||
|
node.start()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fun forEach(action: (Node) -> Unit) {
|
||||||
|
nodes.forEach { (_, node) ->
|
||||||
|
action(node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
61
src/main/kotlin/util/network/simulator/Node.kt
Normal file
61
src/main/kotlin/util/network/simulator/Node.kt
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
package util.network.simulator
|
||||||
|
|
||||||
|
import util.network.simulator.packet.Packet
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue
|
||||||
|
|
||||||
|
abstract class Node(
|
||||||
|
val address: Address,
|
||||||
|
protected val network: Network,
|
||||||
|
bufferSize: Int = 1
|
||||||
|
) {
|
||||||
|
private val buffer = LinkedBlockingQueue<Packet>(bufferSize)
|
||||||
|
private val receiver: Receiver
|
||||||
|
private val sender: Sender
|
||||||
|
|
||||||
|
init {
|
||||||
|
network.addNode(this)
|
||||||
|
val factory = communicatorFactory(network.threadGroup)
|
||||||
|
receiver = factory.createReceiver()
|
||||||
|
sender = factory.createSender()
|
||||||
|
}
|
||||||
|
|
||||||
|
fun start() {
|
||||||
|
receiver.start()
|
||||||
|
sender.start()
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun setPacket(packet: Packet) {
|
||||||
|
buffer.put(packet)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun getPacket(): Packet {
|
||||||
|
return buffer.take()
|
||||||
|
}
|
||||||
|
|
||||||
|
fun send(packet: Packet) {
|
||||||
|
network[packet.receiverAddress]?.setPacket(packet)
|
||||||
|
}
|
||||||
|
|
||||||
|
protected fun receive(): Packet = getPacket()
|
||||||
|
|
||||||
|
protected abstract fun communicatorFactory(
|
||||||
|
threadGroup: ThreadGroup
|
||||||
|
): CommunicatorFactory
|
||||||
|
|
||||||
|
protected abstract inner class CommunicatorFactory(
|
||||||
|
protected val threadGroup: ThreadGroup
|
||||||
|
) {
|
||||||
|
abstract fun createReceiver(): Receiver
|
||||||
|
abstract fun createSender(): Sender
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract inner class Receiver(threadGroup: ThreadGroup) : Thread(
|
||||||
|
threadGroup,
|
||||||
|
"${threadGroup.name}-receiver"
|
||||||
|
)
|
||||||
|
|
||||||
|
protected abstract inner class Sender(threadGroup: ThreadGroup) : Thread(
|
||||||
|
threadGroup,
|
||||||
|
"${threadGroup.name}-sender"
|
||||||
|
)
|
||||||
|
}
|
52
src/main/kotlin/util/network/simulator/Test.kt
Normal file
52
src/main/kotlin/util/network/simulator/Test.kt
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
package util.network.simulator
|
||||||
|
|
||||||
|
import util.network.simulator.packet.NetworkPacket
|
||||||
|
|
||||||
|
fun main() {
|
||||||
|
val network = Network()
|
||||||
|
|
||||||
|
repeat(2) {
|
||||||
|
TestNode(Address(it), network)
|
||||||
|
}
|
||||||
|
|
||||||
|
network.start()
|
||||||
|
}
|
||||||
|
|
||||||
|
class TestNode(address: Address, network: Network) : Node(address, network) {
|
||||||
|
override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory {
|
||||||
|
return TestCommunicatorFactory(threadGroup)
|
||||||
|
}
|
||||||
|
|
||||||
|
private inner class TestReceiver(threadGroup: ThreadGroup) : Receiver(threadGroup) {
|
||||||
|
override fun run() {
|
||||||
|
if (address != Address(0))
|
||||||
|
return
|
||||||
|
|
||||||
|
val packet = receive()
|
||||||
|
|
||||||
|
println(packet)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private inner class TestSender(threadGroup: ThreadGroup) : Sender(threadGroup) {
|
||||||
|
override fun run() {
|
||||||
|
if (address != Address(1))
|
||||||
|
return
|
||||||
|
|
||||||
|
val packet = NetworkPacket(address, Address(0))
|
||||||
|
|
||||||
|
send(packet)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private inner class TestCommunicatorFactory(threadGroup: ThreadGroup) :
|
||||||
|
CommunicatorFactory(threadGroup) {
|
||||||
|
override fun createReceiver(): Receiver {
|
||||||
|
return TestReceiver(threadGroup)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun createSender(): Sender {
|
||||||
|
return TestSender(threadGroup)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,5 @@
|
|||||||
|
package util.network.simulator.packet
|
||||||
|
|
||||||
|
interface FeedbackPacket : Packet {
|
||||||
|
val isFeedback: Boolean
|
||||||
|
}
|
@ -0,0 +1,8 @@
|
|||||||
|
package util.network.simulator.packet
|
||||||
|
|
||||||
|
import util.network.simulator.Address
|
||||||
|
|
||||||
|
data class NetworkPacket(
|
||||||
|
override val senderAddress: Address,
|
||||||
|
override val receiverAddress: Address
|
||||||
|
) : Packet
|
8
src/main/kotlin/util/network/simulator/packet/Packet.kt
Normal file
8
src/main/kotlin/util/network/simulator/packet/Packet.kt
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
package util.network.simulator.packet
|
||||||
|
|
||||||
|
import util.network.simulator.Address
|
||||||
|
|
||||||
|
interface Packet {
|
||||||
|
val senderAddress: Address
|
||||||
|
val receiverAddress: Address
|
||||||
|
}
|
@ -0,0 +1,5 @@
|
|||||||
|
package util.network.simulator.packet
|
||||||
|
|
||||||
|
interface PayloadPacket<T> : Packet {
|
||||||
|
val payload: T
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user