add gossip

This commit is contained in:
Raffaele Mignone 2020-05-03 17:30:17 +02:00
parent 581d9365ee
commit 711da44f53
Signed by: norangebit
GPG Key ID: F5255658CB220573
6 changed files with 128 additions and 22 deletions

View File

@ -22,7 +22,7 @@ fun main() {
println((it as DbNode<Int>).nodeValue) println((it as DbNode<Int>).nodeValue)
} }
alpha.changeValue(TimedPayload(42, System.nanoTime())) alpha.changeValue(TimedPayload.createContent(42))
Thread.sleep(2000) Thread.sleep(2000)
network.forEach { network.forEach {

View File

@ -22,7 +22,7 @@ fun main() {
println((it as DbNode<Int>).nodeValue) println((it as DbNode<Int>).nodeValue)
} }
alpha.changeValue(TimedPayload(42, System.nanoTime())) alpha.changeValue(TimedPayload.createContent(42))
Thread.sleep(2000) Thread.sleep(2000)
network.forEach { network.forEach {

View File

@ -22,7 +22,7 @@ fun main() {
println((it as DbNode<Int>).nodeValue) println((it as DbNode<Int>).nodeValue)
} }
alpha.changeValue(TimedPayload(42, System.nanoTime())) alpha.changeValue(TimedPayload.createContent(42))
Thread.sleep(2000) Thread.sleep(2000)
network.forEach { network.forEach {

View File

@ -0,0 +1,74 @@
package drills.drill09.gossip
import util.network.epidemic.EpidemicNode
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
import kotlin.random.Random
class QueryNode<T>(
address: Address,
network: Network,
initialValue: T,
bufferSize: Int = 1
) : EpidemicNode<T, FeedbackEpidemicPacket<T>>(
address, network, initialValue, bufferSize
) {
private var queryValue: T = nodeValue
private var random = Random(System.currentTimeMillis())
override fun makeInfectionPacket(
receiverAddress: Address,
type: EpidemicPacket.Type
): EpidemicPacket<T> {
return FeedbackEpidemicPacket(
address,
receiverAddress,
queryValue,
type,
false
)
}
override fun receivePushOrReply(packet: FeedbackEpidemicPacket<T>) {
//println(packet)
if (nodeState == State.REMOVED)
return
if (nodeState == State.SUSCEPTIBLE) {
// println("susceptible")
nodeState = State.INFECTED
queryValue = packet.payload
sendPush(1)
if (packet.payload == nodeValue)
println("find on node $address")
} else {
if (packet.isFeedback && random.nextBoolean()) {
nodeState = State.REMOVED
println("removed")
}
send(
FeedbackEpidemicPacket(
address,
packet.senderAddress,
queryValue,
EpidemicPacket.Type.PUSH,
true
)
)
}
}
override fun receivePull(packet: FeedbackEpidemicPacket<T>) {
TODO("Not yet implemented")
}
override fun infect() {
Thread.interrupted()
}
}

View File

@ -0,0 +1,31 @@
package drills.drill09.gossip
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(10) {
QueryNode(Address(it), network, it*2)
}
val alpha = network.getRandomNode() as QueryNode<Int>
val beta = network.getRandomNode() as QueryNode<Int>
val packet = FeedbackEpidemicPacket(
alpha.address,
beta.address,
14,
EpidemicPacket.Type.PUSH,
false
)
alpha.send(packet)
network.start()
}

View File

@ -14,7 +14,7 @@ abstract class EpidemicNode<T, P: EpidemicPacket<T>>(
@Volatile @Volatile
var nodeValue = initialValue var nodeValue = initialValue
private set private set
private var nodeState = State.SUSCETIBLE protected var nodeState = State.SUSCEPTIBLE
@Synchronized @Synchronized
fun changeValue(value: T) { fun changeValue(value: T) {
@ -68,28 +68,7 @@ abstract class EpidemicNode<T, P: EpidemicPacket<T>>(
protected abstract fun infect() protected abstract fun infect()
override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory { protected fun onReceive(packet: P) {
return EpidemicCommunicatorFactory(threadGroup)
}
private enum class State {
SUSCETIBLE, INFECTED, REMOVED
}
enum class InfectStrategy {
PUSH, PULL, PUSHPULL
}
protected inner class EpidemicReceiver(
threadGroup: ThreadGroup
) : Node.Receiver(threadGroup) {
override fun run() {
while (!isInterrupted)
onReceive()
}
private fun onReceive() {
val packet = receive() as P
//println(packet.type) //println(packet.type)
when (packet.type) { when (packet.type) {
@ -102,6 +81,28 @@ abstract class EpidemicNode<T, P: EpidemicPacket<T>>(
} }
} }
} }
override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory {
return EpidemicCommunicatorFactory(threadGroup)
}
protected enum class State {
SUSCEPTIBLE, INFECTED, REMOVED
}
enum class InfectStrategy {
PUSH, PULL, PUSHPULL
}
protected inner class EpidemicReceiver(
threadGroup: ThreadGroup
) : Node.Receiver(threadGroup) {
override fun run() {
while (!isInterrupted) {
val packet = receive() as P
onReceive(packet)
}
}
} }
protected inner class EpidemicSender( protected inner class EpidemicSender(