Compare commits
6 Commits
Author | SHA1 | Date | |
---|---|---|---|
b50890b44f | |||
711da44f53 | |||
581d9365ee | |||
32ca1dd9a0 | |||
5688bd653a | |||
9690167176 |
107
src/main/kotlin/drills/drill09/average/AverageNode.kt
Normal file
107
src/main/kotlin/drills/drill09/average/AverageNode.kt
Normal file
@ -0,0 +1,107 @@
|
||||
package drills.drill09.average
|
||||
|
||||
import util.network.epidemic.EpidemicNode
|
||||
import util.network.epidemic.packet.EpidemicPacket
|
||||
import util.network.epidemic.packet.FeedbackEpidemicPacket
|
||||
import util.network.simulator.Address
|
||||
import util.network.simulator.Network
|
||||
|
||||
class AverageNode(
|
||||
address: Address,
|
||||
network: Network,
|
||||
initialValue: Double,
|
||||
val infectStrategy: InfectStrategy = InfectStrategy.PUSH,
|
||||
bufferSize: Int = 1
|
||||
) : EpidemicNode<Double, FeedbackEpidemicPacket<Double>>(
|
||||
address, network, initialValue, bufferSize
|
||||
) {
|
||||
override fun receivePushOrReply(packet: FeedbackEpidemicPacket<Double>) {
|
||||
val payload = packet.payload
|
||||
|
||||
if (packet.isFeedback)
|
||||
changeValue(payload)
|
||||
else {
|
||||
computeAverage(payload)
|
||||
sendFeedback(packet.senderAddress)
|
||||
spreadNewValue()
|
||||
}
|
||||
}
|
||||
|
||||
private fun computeAverage(payload: Double) {
|
||||
val newValue = (nodeValue + payload) / 2
|
||||
changeValue(newValue)
|
||||
}
|
||||
|
||||
private fun sendFeedback(receiverAddress: Address) {
|
||||
send(
|
||||
FeedbackEpidemicPacket(
|
||||
address,
|
||||
receiverAddress,
|
||||
nodeValue,
|
||||
if (infectStrategy == InfectStrategy.PUSH)
|
||||
EpidemicPacket.Type.PUSH else EpidemicPacket.Type.REPLY,
|
||||
true
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
private fun spreadNewValue() {
|
||||
when (infectStrategy) {
|
||||
InfectStrategy.PUSH -> sendPushToRandom()
|
||||
InfectStrategy.PULL -> sendPullToRandom()
|
||||
InfectStrategy.PUSHPULL -> sendPushPullToRandom()
|
||||
}
|
||||
}
|
||||
|
||||
override fun receivePull(packet: FeedbackEpidemicPacket<Double>) {
|
||||
if (packet.type == EpidemicPacket.Type.PUSHPULL)
|
||||
return
|
||||
|
||||
send(
|
||||
makeInfectionPacket(
|
||||
packet.senderAddress,
|
||||
EpidemicPacket.Type.REPLY
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
override fun infect() {
|
||||
Thread.interrupted()
|
||||
}
|
||||
|
||||
override fun makeInfectionPacket(
|
||||
receiverAddress: Address,
|
||||
type: EpidemicPacket.Type
|
||||
): EpidemicPacket<Double> {
|
||||
return FeedbackEpidemicPacket(
|
||||
address,
|
||||
receiverAddress,
|
||||
nodeValue,
|
||||
type,
|
||||
false
|
||||
)
|
||||
}
|
||||
|
||||
companion object {
|
||||
fun createPushNode(
|
||||
address: Address,
|
||||
network: Network,
|
||||
initialValue: Double,
|
||||
bufferSize: Int = 1
|
||||
) = AverageNode(address, network, initialValue, InfectStrategy.PUSH, bufferSize)
|
||||
|
||||
fun createPullNode(
|
||||
address: Address,
|
||||
network: Network,
|
||||
initialValue: Double,
|
||||
bufferSize: Int = 1
|
||||
) = AverageNode(address, network, initialValue, InfectStrategy.PULL, bufferSize)
|
||||
|
||||
fun createPushPullNode(
|
||||
address: Address,
|
||||
network: Network,
|
||||
initialValue: Double,
|
||||
bufferSize: Int = 1
|
||||
) = AverageNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize)
|
||||
}
|
||||
}
|
35
src/main/kotlin/drills/drill09/average/TestPull.kt
Normal file
35
src/main/kotlin/drills/drill09/average/TestPull.kt
Normal file
@ -0,0 +1,35 @@
|
||||
package drills.drill09.average
|
||||
|
||||
import util.network.epidemic.packet.EpidemicPacket
|
||||
import util.network.epidemic.packet.FeedbackEpidemicPacket
|
||||
import util.network.simulator.Address
|
||||
import util.network.simulator.Network
|
||||
|
||||
fun main() {
|
||||
val network = Network()
|
||||
|
||||
repeat(17) {
|
||||
AverageNode.createPullNode(Address(it), network, 0.0)
|
||||
}
|
||||
|
||||
val alpha = network.getRandomNode() as AverageNode
|
||||
val beta = network.getRandomNode() as AverageNode
|
||||
|
||||
val packet = FeedbackEpidemicPacket(
|
||||
alpha.address,
|
||||
beta.address,
|
||||
1,
|
||||
EpidemicPacket.Type.REPLY,
|
||||
false
|
||||
)
|
||||
|
||||
alpha.send(packet)
|
||||
|
||||
network.start()
|
||||
|
||||
Thread.sleep(2000)
|
||||
|
||||
network.forEach {
|
||||
println(1.0 / (it as AverageNode).nodeValue)
|
||||
}
|
||||
}
|
35
src/main/kotlin/drills/drill09/average/TestPush.kt
Normal file
35
src/main/kotlin/drills/drill09/average/TestPush.kt
Normal file
@ -0,0 +1,35 @@
|
||||
package drills.drill09.average
|
||||
|
||||
import util.network.epidemic.packet.EpidemicPacket
|
||||
import util.network.epidemic.packet.FeedbackEpidemicPacket
|
||||
import util.network.simulator.Address
|
||||
import util.network.simulator.Network
|
||||
|
||||
fun main() {
|
||||
val network = Network()
|
||||
|
||||
repeat(17) {
|
||||
AverageNode.createPushNode(Address(it), network, 0.0)
|
||||
}
|
||||
|
||||
val alpha = network.getRandomNode() as AverageNode
|
||||
val beta = network.getRandomNode() as AverageNode
|
||||
|
||||
val packet = FeedbackEpidemicPacket(
|
||||
alpha.address,
|
||||
beta.address,
|
||||
1,
|
||||
EpidemicPacket.Type.PUSH,
|
||||
false
|
||||
)
|
||||
|
||||
alpha.send(packet)
|
||||
|
||||
network.start()
|
||||
|
||||
Thread.sleep(2000)
|
||||
|
||||
network.forEach {
|
||||
println(1.0 / (it as AverageNode).nodeValue)
|
||||
}
|
||||
}
|
35
src/main/kotlin/drills/drill09/average/TestPushPull.kt
Normal file
35
src/main/kotlin/drills/drill09/average/TestPushPull.kt
Normal file
@ -0,0 +1,35 @@
|
||||
package drills.drill09.average
|
||||
|
||||
import util.network.epidemic.packet.EpidemicPacket
|
||||
import util.network.epidemic.packet.FeedbackEpidemicPacket
|
||||
import util.network.simulator.Address
|
||||
import util.network.simulator.Network
|
||||
|
||||
fun main() {
|
||||
val network = Network()
|
||||
|
||||
repeat(17) {
|
||||
AverageNode.createPushPullNode(Address(it), network, 0.0)
|
||||
}
|
||||
|
||||
val alpha = network.getRandomNode() as AverageNode
|
||||
val beta = network.getRandomNode() as AverageNode
|
||||
|
||||
val packet = FeedbackEpidemicPacket(
|
||||
alpha.address,
|
||||
beta.address,
|
||||
1,
|
||||
EpidemicPacket.Type.REPLY,
|
||||
false
|
||||
)
|
||||
|
||||
alpha.send(packet)
|
||||
|
||||
network.start()
|
||||
|
||||
Thread.sleep(2000)
|
||||
|
||||
network.forEach {
|
||||
println(1.0 / (it as AverageNode).nodeValue)
|
||||
}
|
||||
}
|
84
src/main/kotlin/drills/drill09/dbreplica/DbNode.kt
Normal file
84
src/main/kotlin/drills/drill09/dbreplica/DbNode.kt
Normal file
@ -0,0 +1,84 @@
|
||||
package drills.drill09.dbreplica
|
||||
|
||||
import util.network.epidemic.TimedPayload
|
||||
import util.network.epidemic.EpidemicNode
|
||||
import util.network.epidemic.packet.EpidemicPacket
|
||||
import util.network.epidemic.packet.TimedEpidemicPacket
|
||||
import util.network.simulator.Address
|
||||
import util.network.simulator.Network
|
||||
|
||||
class DbNode<T>(
|
||||
address: Address,
|
||||
network: Network,
|
||||
initialValue: TimedPayload<T>,
|
||||
val infectStrategy: InfectStrategy = InfectStrategy.PUSHPULL,
|
||||
bufferSize: Int = 1
|
||||
) : EpidemicNode<TimedPayload<T>, TimedEpidemicPacket<T>>(
|
||||
address, network, initialValue, bufferSize
|
||||
) {
|
||||
override fun receivePushOrReply(packet: TimedEpidemicPacket<T>) {
|
||||
val receivedValue = packet.payload
|
||||
|
||||
if (nodeValue.timestamp < receivedValue.timestamp)
|
||||
changeValue(receivedValue)
|
||||
}
|
||||
|
||||
override fun receivePull(packet: TimedEpidemicPacket<T>) {
|
||||
val receivedValue = packet.payload
|
||||
|
||||
if (nodeValue.timestamp > receivedValue.timestamp)
|
||||
send(
|
||||
makeInfectionPacket(
|
||||
packet.senderAddress,
|
||||
EpidemicPacket.Type.REPLY
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
override fun infect() {
|
||||
when (infectStrategy) {
|
||||
InfectStrategy.PUSH -> sendPushToRandom()
|
||||
InfectStrategy.PULL -> sendPullToRandom()
|
||||
InfectStrategy.PUSHPULL -> sendPushPullToRandom()
|
||||
}
|
||||
|
||||
Thread.sleep(DELTA)
|
||||
}
|
||||
|
||||
override fun makeInfectionPacket(
|
||||
receiverAddress: Address,
|
||||
type: EpidemicPacket.Type
|
||||
): EpidemicPacket<TimedPayload<T>> {
|
||||
return TimedEpidemicPacket(
|
||||
address,
|
||||
receiverAddress,
|
||||
nodeValue,
|
||||
type
|
||||
)
|
||||
}
|
||||
|
||||
companion object {
|
||||
const val DELTA = 100L
|
||||
|
||||
fun <T> createPushNode(
|
||||
address: Address,
|
||||
network: Network,
|
||||
initialValue: TimedPayload<T>,
|
||||
bufferSize: Int = 1
|
||||
) = DbNode(address, network, initialValue, InfectStrategy.PUSH, bufferSize)
|
||||
|
||||
fun <T> createPullNode(
|
||||
address: Address,
|
||||
network: Network,
|
||||
initialValue: TimedPayload<T>,
|
||||
bufferSize: Int = 1
|
||||
) = DbNode(address, network, initialValue, InfectStrategy.PULL, bufferSize)
|
||||
|
||||
fun <T> createPushPullNode(
|
||||
address: Address,
|
||||
network: Network,
|
||||
initialValue: TimedPayload<T>,
|
||||
bufferSize: Int = 1
|
||||
) = DbNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize)
|
||||
}
|
||||
}
|
30
src/main/kotlin/drills/drill09/dbreplica/TestPull.kt
Normal file
30
src/main/kotlin/drills/drill09/dbreplica/TestPull.kt
Normal file
@ -0,0 +1,30 @@
|
||||
package drills.drill09.dbreplica
|
||||
|
||||
import util.network.epidemic.TimedPayload
|
||||
import util.network.simulator.Address
|
||||
import util.network.simulator.Network
|
||||
|
||||
fun main() {
|
||||
val network = Network()
|
||||
|
||||
repeat(10) {
|
||||
DbNode.createPullNode(Address(it), network, TimedPayload(0, 0))
|
||||
}
|
||||
|
||||
val alpha = network.getRandomNode() as DbNode<Int>
|
||||
|
||||
network.start()
|
||||
|
||||
Thread.sleep(1000)
|
||||
|
||||
network.forEach {
|
||||
println((it as DbNode<Int>).nodeValue)
|
||||
}
|
||||
|
||||
alpha.changeValue(TimedPayload.createContent(42))
|
||||
Thread.sleep(2000)
|
||||
|
||||
network.forEach {
|
||||
println((it as DbNode<Int>).nodeValue)
|
||||
}
|
||||
}
|
30
src/main/kotlin/drills/drill09/dbreplica/TestPush.kt
Normal file
30
src/main/kotlin/drills/drill09/dbreplica/TestPush.kt
Normal file
@ -0,0 +1,30 @@
|
||||
package drills.drill09.dbreplica
|
||||
|
||||
import util.network.epidemic.TimedPayload
|
||||
import util.network.simulator.Address
|
||||
import util.network.simulator.Network
|
||||
|
||||
fun main() {
|
||||
val network = Network()
|
||||
|
||||
repeat(10) {
|
||||
DbNode.createPushNode(Address(it), network, TimedPayload(0, 0))
|
||||
}
|
||||
|
||||
val alpha = network.getRandomNode() as DbNode<Int>
|
||||
|
||||
network.start()
|
||||
|
||||
Thread.sleep(1000)
|
||||
|
||||
network.forEach {
|
||||
println((it as DbNode<Int>).nodeValue)
|
||||
}
|
||||
|
||||
alpha.changeValue(TimedPayload.createContent(42))
|
||||
Thread.sleep(2000)
|
||||
|
||||
network.forEach {
|
||||
println((it as DbNode<Int>).nodeValue)
|
||||
}
|
||||
}
|
30
src/main/kotlin/drills/drill09/dbreplica/TestPushPull.kt
Normal file
30
src/main/kotlin/drills/drill09/dbreplica/TestPushPull.kt
Normal file
@ -0,0 +1,30 @@
|
||||
package drills.drill09.dbreplica
|
||||
|
||||
import util.network.epidemic.TimedPayload
|
||||
import util.network.simulator.Address
|
||||
import util.network.simulator.Network
|
||||
|
||||
fun main() {
|
||||
val network = Network()
|
||||
|
||||
repeat(10) {
|
||||
DbNode.createPushPullNode(Address(it), network, TimedPayload(0, 0))
|
||||
}
|
||||
|
||||
val alpha = network.getRandomNode() as DbNode<Int>
|
||||
|
||||
network.start()
|
||||
|
||||
Thread.sleep(1000)
|
||||
|
||||
network.forEach {
|
||||
println((it as DbNode<Int>).nodeValue)
|
||||
}
|
||||
|
||||
alpha.changeValue(TimedPayload.createContent(42))
|
||||
Thread.sleep(2000)
|
||||
|
||||
network.forEach {
|
||||
println((it as DbNode<Int>).nodeValue)
|
||||
}
|
||||
}
|
76
src/main/kotlin/drills/drill09/gossip/QueryNode.kt
Normal file
76
src/main/kotlin/drills/drill09/gossip/QueryNode.kt
Normal file
@ -0,0 +1,76 @@
|
||||
package drills.drill09.gossip
|
||||
|
||||
import util.network.epidemic.EpidemicNode
|
||||
import util.network.epidemic.packet.EpidemicPacket
|
||||
import util.network.epidemic.packet.FeedbackEpidemicPacket
|
||||
import util.network.simulator.Address
|
||||
import util.network.simulator.Network
|
||||
import kotlin.random.Random
|
||||
|
||||
class QueryNode<T>(
|
||||
address: Address,
|
||||
network: Network,
|
||||
initialValue: T,
|
||||
private val demotivatingFactor: Int = 10,
|
||||
bufferSize: Int = 1
|
||||
) : EpidemicNode<T, FeedbackEpidemicPacket<T>>(
|
||||
address, network, initialValue, bufferSize
|
||||
) {
|
||||
private var queryValue: T = nodeValue
|
||||
private var random = Random(System.currentTimeMillis())
|
||||
|
||||
override fun makeInfectionPacket(
|
||||
receiverAddress: Address,
|
||||
type: EpidemicPacket.Type
|
||||
): EpidemicPacket<T> {
|
||||
return FeedbackEpidemicPacket(
|
||||
address,
|
||||
receiverAddress,
|
||||
queryValue,
|
||||
type,
|
||||
false
|
||||
)
|
||||
}
|
||||
|
||||
override fun receivePushOrReply(packet: FeedbackEpidemicPacket<T>) {
|
||||
when (nodeState) {
|
||||
State.REMOVED -> return
|
||||
State.INFECTED -> whenInfected(packet)
|
||||
State.SUSCEPTIBLE -> whenSusceptible(packet)
|
||||
}
|
||||
}
|
||||
|
||||
private fun whenSusceptible(packet: FeedbackEpidemicPacket<T>) {
|
||||
nodeState = State.INFECTED
|
||||
queryValue = packet.payload
|
||||
|
||||
sendPushToRandom(2)
|
||||
|
||||
if (packet.payload == nodeValue)
|
||||
println("find ${packet.payload} on node $address")
|
||||
}
|
||||
|
||||
private fun whenInfected(packet: FeedbackEpidemicPacket<T>) {
|
||||
if (!packet.isFeedback)
|
||||
send(
|
||||
FeedbackEpidemicPacket(
|
||||
address,
|
||||
packet.senderAddress,
|
||||
queryValue,
|
||||
EpidemicPacket.Type.PUSH,
|
||||
true
|
||||
)
|
||||
)
|
||||
|
||||
if (packet.isFeedback && random.nextDouble(1.0) < 1 / demotivatingFactor)
|
||||
nodeState = State.REMOVED
|
||||
}
|
||||
|
||||
override fun receivePull(packet: FeedbackEpidemicPacket<T>) {
|
||||
TODO("Not yet implemented")
|
||||
}
|
||||
|
||||
override fun infect() {
|
||||
Thread.interrupted()
|
||||
}
|
||||
}
|
29
src/main/kotlin/drills/drill09/gossip/TestPush.kt
Normal file
29
src/main/kotlin/drills/drill09/gossip/TestPush.kt
Normal file
@ -0,0 +1,29 @@
|
||||
package drills.drill09.gossip
|
||||
|
||||
import util.network.epidemic.packet.EpidemicPacket
|
||||
import util.network.epidemic.packet.FeedbackEpidemicPacket
|
||||
import util.network.simulator.Address
|
||||
import util.network.simulator.Network
|
||||
|
||||
fun main() {
|
||||
val network = Network()
|
||||
|
||||
repeat(20) {
|
||||
QueryNode(Address(it), network, it * 2)
|
||||
}
|
||||
|
||||
val alpha = network.getRandomNode() as QueryNode<Int>
|
||||
val beta = network.getRandomNode() as QueryNode<Int>
|
||||
|
||||
val packet = FeedbackEpidemicPacket(
|
||||
alpha.address,
|
||||
beta.address,
|
||||
24,
|
||||
EpidemicPacket.Type.PUSH,
|
||||
false
|
||||
)
|
||||
|
||||
alpha.send(packet)
|
||||
|
||||
network.start()
|
||||
}
|
126
src/main/kotlin/util/network/epidemic/EpidemicNode.kt
Normal file
126
src/main/kotlin/util/network/epidemic/EpidemicNode.kt
Normal file
@ -0,0 +1,126 @@
|
||||
package util.network.epidemic
|
||||
|
||||
import util.network.epidemic.packet.EpidemicPacket
|
||||
import util.network.simulator.Address
|
||||
import util.network.simulator.Network
|
||||
import util.network.simulator.Node
|
||||
|
||||
abstract class EpidemicNode<T, P : EpidemicPacket<T>>(
|
||||
address: Address,
|
||||
network: Network,
|
||||
initialValue: T,
|
||||
bufferSize: Int = 1
|
||||
) : Node(address, network, bufferSize) {
|
||||
@Volatile
|
||||
var nodeValue = initialValue
|
||||
private set
|
||||
protected var nodeState = State.SUSCEPTIBLE
|
||||
|
||||
@Synchronized
|
||||
fun changeValue(value: T) {
|
||||
nodeValue = value
|
||||
}
|
||||
|
||||
private fun sendToRandom(
|
||||
type: EpidemicPacket.Type,
|
||||
numberOfReceiver: Int = 1
|
||||
) {
|
||||
val toExclude = mutableListOf(address)
|
||||
repeat(numberOfReceiver) {
|
||||
var receiverAddress = network.getRandomAddress()
|
||||
while (toExclude.contains(receiverAddress))
|
||||
receiverAddress = network.getRandomAddress()
|
||||
toExclude.add(receiverAddress)
|
||||
|
||||
send(makeInfectionPacket(receiverAddress, type))
|
||||
}
|
||||
}
|
||||
|
||||
abstract fun makeInfectionPacket(
|
||||
receiverAddress: Address,
|
||||
type: EpidemicPacket.Type
|
||||
): EpidemicPacket<T>
|
||||
|
||||
protected fun sendPushToRandom(numberOfReceiver: Int = 1) {
|
||||
sendToRandom(
|
||||
EpidemicPacket.Type.PUSH,
|
||||
numberOfReceiver
|
||||
)
|
||||
}
|
||||
|
||||
protected fun sendPullToRandom(numberOfReceiver: Int = 1) {
|
||||
sendToRandom(
|
||||
EpidemicPacket.Type.PULL,
|
||||
numberOfReceiver
|
||||
)
|
||||
}
|
||||
|
||||
protected fun sendPushPullToRandom(numberOfReceiver: Int = 1) {
|
||||
sendToRandom(
|
||||
EpidemicPacket.Type.PUSHPULL,
|
||||
numberOfReceiver
|
||||
)
|
||||
}
|
||||
|
||||
protected abstract fun receivePushOrReply(packet: P)
|
||||
|
||||
protected abstract fun receivePull(packet: P)
|
||||
|
||||
protected abstract fun infect()
|
||||
|
||||
protected fun onReceive(packet: P) {
|
||||
when (packet.type) {
|
||||
EpidemicPacket.Type.PUSH -> receivePushOrReply(packet)
|
||||
EpidemicPacket.Type.PULL -> receivePull(packet)
|
||||
EpidemicPacket.Type.REPLY -> receivePushOrReply(packet)
|
||||
EpidemicPacket.Type.PUSHPULL -> {
|
||||
receivePull(packet)
|
||||
receivePushOrReply(packet)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory {
|
||||
return EpidemicCommunicatorFactory(threadGroup)
|
||||
}
|
||||
|
||||
protected enum class State {
|
||||
SUSCEPTIBLE, INFECTED, REMOVED
|
||||
}
|
||||
|
||||
enum class InfectStrategy {
|
||||
PUSH, PULL, PUSHPULL
|
||||
}
|
||||
|
||||
protected inner class EpidemicReceiver(
|
||||
threadGroup: ThreadGroup
|
||||
) : Node.Receiver(threadGroup) {
|
||||
override fun run() {
|
||||
while (!isInterrupted) {
|
||||
val packet = receive() as P
|
||||
onReceive(packet)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected inner class EpidemicSender(
|
||||
threadGroup: ThreadGroup
|
||||
) : Node.Sender(threadGroup) {
|
||||
override fun run() {
|
||||
while (!isInterrupted)
|
||||
infect()
|
||||
}
|
||||
}
|
||||
|
||||
protected inner class EpidemicCommunicatorFactory(
|
||||
threadGroup: ThreadGroup
|
||||
) : Node.CommunicatorFactory(threadGroup) {
|
||||
override fun createReceiver(): Receiver {
|
||||
return EpidemicReceiver(threadGroup)
|
||||
}
|
||||
|
||||
override fun createSender(): Sender {
|
||||
return EpidemicSender(threadGroup)
|
||||
}
|
||||
}
|
||||
}
|
9
src/main/kotlin/util/network/epidemic/TimedPayload.kt
Normal file
9
src/main/kotlin/util/network/epidemic/TimedPayload.kt
Normal file
@ -0,0 +1,9 @@
|
||||
package util.network.epidemic
|
||||
|
||||
data class TimedPayload<T>(val value: T, val timestamp: Long) {
|
||||
companion object {
|
||||
fun <T> createContent(value: T): TimedPayload<T> {
|
||||
return TimedPayload(value, System.nanoTime())
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,11 @@
|
||||
package util.network.epidemic.packet
|
||||
|
||||
import util.network.simulator.packet.PayloadPacket
|
||||
|
||||
interface EpidemicPacket<T> : PayloadPacket<T> {
|
||||
val type: Type
|
||||
|
||||
enum class Type {
|
||||
PUSH, PULL, PUSHPULL, REPLY
|
||||
}
|
||||
}
|
@ -0,0 +1,12 @@
|
||||
package util.network.epidemic.packet
|
||||
|
||||
import util.network.simulator.Address
|
||||
import util.network.simulator.packet.FeedbackPacket
|
||||
|
||||
data class FeedbackEpidemicPacket<T>(
|
||||
override val senderAddress: Address,
|
||||
override val receiverAddress: Address,
|
||||
override val payload: T,
|
||||
override val type: EpidemicPacket.Type,
|
||||
override val isFeedback: Boolean
|
||||
) : EpidemicPacket<T>, FeedbackPacket
|
@ -0,0 +1,11 @@
|
||||
package util.network.epidemic.packet
|
||||
|
||||
import util.network.epidemic.TimedPayload
|
||||
import util.network.simulator.Address
|
||||
|
||||
data class TimedEpidemicPacket<T>(
|
||||
override val senderAddress: Address,
|
||||
override val receiverAddress: Address,
|
||||
override val payload: TimedPayload<T>,
|
||||
override val type: EpidemicPacket.Type
|
||||
) : EpidemicPacket<TimedPayload<T>>
|
3
src/main/kotlin/util/network/simulator/Address.kt
Normal file
3
src/main/kotlin/util/network/simulator/Address.kt
Normal file
@ -0,0 +1,3 @@
|
||||
package util.network.simulator
|
||||
|
||||
inline class Address(val value: Int)
|
41
src/main/kotlin/util/network/simulator/Network.kt
Normal file
41
src/main/kotlin/util/network/simulator/Network.kt
Normal file
@ -0,0 +1,41 @@
|
||||
package util.network.simulator
|
||||
|
||||
import kotlin.random.Random
|
||||
|
||||
class Network(threadGrupName: String = "network") {
|
||||
private val nodes = mutableMapOf<Address, Node>()
|
||||
private val addresses = mutableListOf<Address>()
|
||||
val threadGroup = ThreadGroup(threadGrupName)
|
||||
private val random by lazy { Random(System.currentTimeMillis()) }
|
||||
|
||||
operator fun get(address: Address): Node? {
|
||||
return nodes[address]
|
||||
}
|
||||
|
||||
fun addNode(node: Node) {
|
||||
nodes[node.address] = node
|
||||
addresses.add(node.address)
|
||||
}
|
||||
|
||||
fun getRandomAddress(): Address {
|
||||
val index = random.nextInt(addresses.size)
|
||||
|
||||
return addresses[index]
|
||||
}
|
||||
|
||||
fun getRandomNode(): Node {
|
||||
return nodes[getRandomAddress()]!!
|
||||
}
|
||||
|
||||
fun start() {
|
||||
nodes.forEach { (_, node) ->
|
||||
node.start()
|
||||
}
|
||||
}
|
||||
|
||||
fun forEach(action: (Node) -> Unit) {
|
||||
nodes.forEach { (_, node) ->
|
||||
action(node)
|
||||
}
|
||||
}
|
||||
}
|
61
src/main/kotlin/util/network/simulator/Node.kt
Normal file
61
src/main/kotlin/util/network/simulator/Node.kt
Normal file
@ -0,0 +1,61 @@
|
||||
package util.network.simulator
|
||||
|
||||
import util.network.simulator.packet.Packet
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
|
||||
abstract class Node(
|
||||
val address: Address,
|
||||
protected val network: Network,
|
||||
bufferSize: Int = 1
|
||||
) {
|
||||
private val buffer = LinkedBlockingQueue<Packet>(bufferSize)
|
||||
private val receiver: Receiver
|
||||
private val sender: Sender
|
||||
|
||||
init {
|
||||
network.addNode(this)
|
||||
val factory = communicatorFactory(network.threadGroup)
|
||||
receiver = factory.createReceiver()
|
||||
sender = factory.createSender()
|
||||
}
|
||||
|
||||
fun start() {
|
||||
receiver.start()
|
||||
sender.start()
|
||||
}
|
||||
|
||||
private fun setPacket(packet: Packet) {
|
||||
buffer.put(packet)
|
||||
}
|
||||
|
||||
private fun getPacket(): Packet {
|
||||
return buffer.take()
|
||||
}
|
||||
|
||||
fun send(packet: Packet) {
|
||||
network[packet.receiverAddress]?.setPacket(packet)
|
||||
}
|
||||
|
||||
protected fun receive(): Packet = getPacket()
|
||||
|
||||
protected abstract fun communicatorFactory(
|
||||
threadGroup: ThreadGroup
|
||||
): CommunicatorFactory
|
||||
|
||||
protected abstract inner class CommunicatorFactory(
|
||||
protected val threadGroup: ThreadGroup
|
||||
) {
|
||||
abstract fun createReceiver(): Receiver
|
||||
abstract fun createSender(): Sender
|
||||
}
|
||||
|
||||
protected abstract inner class Receiver(threadGroup: ThreadGroup) : Thread(
|
||||
threadGroup,
|
||||
"${threadGroup.name}-receiver"
|
||||
)
|
||||
|
||||
protected abstract inner class Sender(threadGroup: ThreadGroup) : Thread(
|
||||
threadGroup,
|
||||
"${threadGroup.name}-sender"
|
||||
)
|
||||
}
|
52
src/main/kotlin/util/network/simulator/Test.kt
Normal file
52
src/main/kotlin/util/network/simulator/Test.kt
Normal file
@ -0,0 +1,52 @@
|
||||
package util.network.simulator
|
||||
|
||||
import util.network.simulator.packet.NetworkPacket
|
||||
|
||||
fun main() {
|
||||
val network = Network()
|
||||
|
||||
repeat(2) {
|
||||
TestNode(Address(it), network)
|
||||
}
|
||||
|
||||
network.start()
|
||||
}
|
||||
|
||||
class TestNode(address: Address, network: Network) : Node(address, network) {
|
||||
override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory {
|
||||
return TestCommunicatorFactory(threadGroup)
|
||||
}
|
||||
|
||||
private inner class TestReceiver(threadGroup: ThreadGroup) : Receiver(threadGroup) {
|
||||
override fun run() {
|
||||
if (address != Address(0))
|
||||
return
|
||||
|
||||
val packet = receive()
|
||||
|
||||
println(packet)
|
||||
}
|
||||
}
|
||||
|
||||
private inner class TestSender(threadGroup: ThreadGroup) : Sender(threadGroup) {
|
||||
override fun run() {
|
||||
if (address != Address(1))
|
||||
return
|
||||
|
||||
val packet = NetworkPacket(address, Address(0))
|
||||
|
||||
send(packet)
|
||||
}
|
||||
}
|
||||
|
||||
private inner class TestCommunicatorFactory(threadGroup: ThreadGroup) :
|
||||
CommunicatorFactory(threadGroup) {
|
||||
override fun createReceiver(): Receiver {
|
||||
return TestReceiver(threadGroup)
|
||||
}
|
||||
|
||||
override fun createSender(): Sender {
|
||||
return TestSender(threadGroup)
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,5 @@
|
||||
package util.network.simulator.packet
|
||||
|
||||
interface FeedbackPacket : Packet {
|
||||
val isFeedback: Boolean
|
||||
}
|
@ -0,0 +1,8 @@
|
||||
package util.network.simulator.packet
|
||||
|
||||
import util.network.simulator.Address
|
||||
|
||||
data class NetworkPacket(
|
||||
override val senderAddress: Address,
|
||||
override val receiverAddress: Address
|
||||
) : Packet
|
8
src/main/kotlin/util/network/simulator/packet/Packet.kt
Normal file
8
src/main/kotlin/util/network/simulator/packet/Packet.kt
Normal file
@ -0,0 +1,8 @@
|
||||
package util.network.simulator.packet
|
||||
|
||||
import util.network.simulator.Address
|
||||
|
||||
interface Packet {
|
||||
val senderAddress: Address
|
||||
val receiverAddress: Address
|
||||
}
|
@ -0,0 +1,5 @@
|
||||
package util.network.simulator.packet
|
||||
|
||||
interface PayloadPacket<T> : Packet {
|
||||
val payload: T
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user