add average and db push, pull and pushpull

This commit is contained in:
Raffaele Mignone 2020-05-03 14:57:03 +02:00
parent 32ca1dd9a0
commit 581d9365ee
Signed by: norangebit
GPG Key ID: F5255658CB220573
18 changed files with 290 additions and 77 deletions

View File

@ -1,8 +1,8 @@
package drills.drill09.average package drills.drill09.average
import util.network.epidemic.EpidemicNode import util.network.epidemic.EpidemicNode
import util.network.epidemic.EpidemicPacket import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.replay.ReplayPacket import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address import util.network.simulator.Address
import util.network.simulator.Network import util.network.simulator.Network
@ -10,12 +10,15 @@ class AverageNode(
address: Address, address: Address,
network: Network, network: Network,
initialValue: Double, initialValue: Double,
val infectStrategy: InfectStrategy = InfectStrategy.PUSH,
bufferSize: Int = 1 bufferSize: Int = 1
) : EpidemicNode<Double, ReplayPacket<Double>>(address, network, initialValue, bufferSize) { ) : EpidemicNode<Double, FeedbackEpidemicPacket<Double>>(
override fun receivePushOrReply(packet: ReplayPacket<Double>) { address, network, initialValue, bufferSize
) {
override fun receivePushOrReply(packet: FeedbackEpidemicPacket<Double>) {
val payload = packet.payload val payload = packet.payload
if (packet.isReplay) if (packet.isFeedback)
changeValue(payload) changeValue(payload)
else { else {
val newValue = (nodeValue + payload) / 2 val newValue = (nodeValue + payload) / 2
@ -23,21 +26,34 @@ class AverageNode(
changeValue(newValue) changeValue(newValue)
send( send(
ReplayPacket( FeedbackEpidemicPacket(
address, address,
packet.senderAddress, packet.senderAddress,
newValue, newValue,
EpidemicPacket.Type.PUSH, if (infectStrategy == InfectStrategy.PUSH)
EpidemicPacket.Type.PUSH else EpidemicPacket.Type.REPLY,
true true
) )
) )
sendPush() when (infectStrategy) {
InfectStrategy.PUSH -> sendPush()
InfectStrategy.PULL -> sendPull()
InfectStrategy.PUSHPULL -> sendPushPull()
}
} }
} }
override fun receivePull(packet: ReplayPacket<Double>) { override fun receivePull(packet: FeedbackEpidemicPacket<Double>) {
TODO("Not yet implemented") if (packet.type == EpidemicPacket.Type.PUSHPULL)
return
send(
makeInfectionPacket(
packet.senderAddress,
EpidemicPacket.Type.REPLY
)
)
} }
override fun infect() { override fun infect() {
@ -48,7 +64,7 @@ class AverageNode(
receiverAddress: Address, receiverAddress: Address,
type: EpidemicPacket.Type type: EpidemicPacket.Type
): EpidemicPacket<Double> { ): EpidemicPacket<Double> {
return ReplayPacket( return FeedbackEpidemicPacket(
address, address,
receiverAddress, receiverAddress,
nodeValue, nodeValue,
@ -56,4 +72,27 @@ class AverageNode(
false false
) )
} }
companion object {
fun createPushNode(
address: Address,
network: Network,
initialValue: Double,
bufferSize: Int = 1
) = AverageNode(address, network, initialValue, InfectStrategy.PUSH, bufferSize)
fun createPullNode(
address: Address,
network: Network,
initialValue: Double,
bufferSize: Int = 1
) = AverageNode(address, network, initialValue, InfectStrategy.PULL, bufferSize)
fun createPushPullNode(
address: Address,
network: Network,
initialValue: Double,
bufferSize: Int = 1
) = AverageNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize)
}
} }

View File

@ -0,0 +1,35 @@
package drills.drill09.average
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(17) {
AverageNode.createPullNode(Address(it), network, 0.0)
}
val alpha = network.getRandomNode() as AverageNode
val beta = network.getRandomNode() as AverageNode
val packet = FeedbackEpidemicPacket(
alpha.address,
beta.address,
1,
EpidemicPacket.Type.REPLY,
false
)
alpha.send(packet)
network.start()
Thread.sleep(2000)
network.forEach {
println(1.0 / (it as AverageNode).nodeValue)
}
}

View File

@ -1,7 +1,7 @@
package drills.drill09.average package drills.drill09.average
import util.network.epidemic.EpidemicPacket import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.replay.ReplayPacket import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address import util.network.simulator.Address
import util.network.simulator.Network import util.network.simulator.Network
@ -9,13 +9,13 @@ fun main() {
val network = Network() val network = Network()
repeat(17) { repeat(17) {
AverageNode(Address(it), network, 0.0) AverageNode.createPushNode(Address(it), network, 0.0)
} }
val alpha = network.getRandomNode() as AverageNode val alpha = network.getRandomNode() as AverageNode
val beta = network.getRandomNode() as AverageNode val beta = network.getRandomNode() as AverageNode
val packet = ReplayPacket( val packet = FeedbackEpidemicPacket(
alpha.address, alpha.address,
beta.address, beta.address,
1, 1,

View File

@ -0,0 +1,35 @@
package drills.drill09.average
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(17) {
AverageNode.createPushPullNode(Address(it), network, 0.0)
}
val alpha = network.getRandomNode() as AverageNode
val beta = network.getRandomNode() as AverageNode
val packet = FeedbackEpidemicPacket(
alpha.address,
beta.address,
1,
EpidemicPacket.Type.REPLY,
false
)
alpha.send(packet)
network.start()
Thread.sleep(2000)
network.forEach {
println(1.0 / (it as AverageNode).nodeValue)
}
}

View File

@ -1,54 +1,84 @@
package drills.drill09.dbreplica package drills.drill09.dbreplica
import util.network.epidemic.TimePayload import util.network.epidemic.TimedPayload
import util.network.epidemic.EpidemicNode import util.network.epidemic.EpidemicNode
import util.network.epidemic.EpidemicPacket import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.TimedEpidemicPacket
import util.network.simulator.Address import util.network.simulator.Address
import util.network.simulator.Network import util.network.simulator.Network
class DbNode<T>( class DbNode<T>(
address: Address, address: Address,
network: Network, network: Network,
initialValue: TimePayload<T>, initialValue: TimedPayload<T>,
val infectStrategy: InfectStrategy = InfectStrategy.PUSHPULL,
bufferSize: Int = 1 bufferSize: Int = 1
) : EpidemicNode<TimePayload<T>, EpidemicPacket<TimePayload<T>>>( ) : EpidemicNode<TimedPayload<T>, TimedEpidemicPacket<T>>(
address, network, initialValue, bufferSize address, network, initialValue, bufferSize
) { ) {
override fun receivePushOrReply(packet: EpidemicPacket<TimePayload<T>>) { override fun receivePushOrReply(packet: TimedEpidemicPacket<T>) {
val receivedValue = packet.payload val receivedValue = packet.payload
if (nodeValue.timestamp < receivedValue.timestamp) if (nodeValue.timestamp < receivedValue.timestamp)
changeValue(receivedValue) changeValue(receivedValue)
} }
override fun receivePull(packet: EpidemicPacket<TimePayload<T>>) { override fun receivePull(packet: TimedEpidemicPacket<T>) {
val receivedValue = packet.payload val receivedValue = packet.payload
if (nodeValue.timestamp > receivedValue.timestamp) if (nodeValue.timestamp > receivedValue.timestamp)
send( send(
EpidemicPacket( makeInfectionPacket(
address,
packet.senderAddress, packet.senderAddress,
nodeValue,
EpidemicPacket.Type.REPLY EpidemicPacket.Type.REPLY
) )
) )
} }
override fun infect() { override fun infect() {
sendPushPull() when (infectStrategy) {
Thread.sleep(100) InfectStrategy.PUSH -> sendPush()
InfectStrategy.PULL -> sendPull()
InfectStrategy.PUSHPULL -> sendPushPull()
}
Thread.sleep(DELTA)
} }
override fun makeInfectionPacket( override fun makeInfectionPacket(
receiverAddress: Address, receiverAddress: Address,
type: EpidemicPacket.Type type: EpidemicPacket.Type
): EpidemicPacket<TimePayload<T>> { ): EpidemicPacket<TimedPayload<T>> {
return EpidemicPacket( return TimedEpidemicPacket(
address, address,
receiverAddress, receiverAddress,
nodeValue, nodeValue,
type type
) )
} }
companion object {
const val DELTA = 100L
fun <T> createPushNode(
address: Address,
network: Network,
initialValue: TimedPayload<T>,
bufferSize: Int = 1
) = DbNode(address, network, initialValue, InfectStrategy.PUSH, bufferSize)
fun <T> createPullNode(
address: Address,
network: Network,
initialValue: TimedPayload<T>,
bufferSize: Int = 1
) = DbNode(address, network, initialValue, InfectStrategy.PULL, bufferSize)
fun <T> createPushPullNode(
address: Address,
network: Network,
initialValue: TimedPayload<T>,
bufferSize: Int = 1
) = DbNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize)
}
} }

View File

@ -1,6 +1,6 @@
package drills.drill09.dbreplica package drills.drill09.dbreplica
import util.network.epidemic.TimePayload import util.network.epidemic.TimedPayload
import util.network.simulator.Address import util.network.simulator.Address
import util.network.simulator.Network import util.network.simulator.Network
@ -9,7 +9,7 @@ fun main() {
val network = Network() val network = Network()
repeat(10) { repeat(10) {
DbNode(Address(it), network, TimePayload(0, 0)) DbNode.createPullNode(Address(it), network, TimedPayload(0, 0))
} }
val alpha = network.getRandomNode() as DbNode<Int> val alpha = network.getRandomNode() as DbNode<Int>
@ -22,7 +22,7 @@ fun main() {
println((it as DbNode<Int>).nodeValue) println((it as DbNode<Int>).nodeValue)
} }
alpha.changeValue(TimePayload(42, System.nanoTime())) alpha.changeValue(TimedPayload(42, System.nanoTime()))
Thread.sleep(2000) Thread.sleep(2000)
network.forEach { network.forEach {

View File

@ -0,0 +1,32 @@
package drills.drill09.dbreplica
import util.network.epidemic.TimedPayload
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(10) {
DbNode.createPushNode(Address(it), network, TimedPayload(0, 0))
}
val alpha = network.getRandomNode() as DbNode<Int>
network.start()
Thread.sleep(1000)
network.forEach {
println((it as DbNode<Int>).nodeValue)
}
alpha.changeValue(TimedPayload(42, System.nanoTime()))
Thread.sleep(2000)
network.forEach {
println((it as DbNode<Int>).nodeValue)
}
}

View File

@ -0,0 +1,32 @@
package drills.drill09.dbreplica
import util.network.epidemic.TimedPayload
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(10) {
DbNode.createPushPullNode(Address(it), network, TimedPayload(0, 0))
}
val alpha = network.getRandomNode() as DbNode<Int>
network.start()
Thread.sleep(1000)
network.forEach {
println((it as DbNode<Int>).nodeValue)
}
alpha.changeValue(TimedPayload(42, System.nanoTime()))
Thread.sleep(2000)
network.forEach {
println((it as DbNode<Int>).nodeValue)
}
}

View File

@ -1,5 +1,6 @@
package util.network.epidemic package util.network.epidemic
import util.network.epidemic.packet.EpidemicPacket
import util.network.simulator.Address import util.network.simulator.Address
import util.network.simulator.Network import util.network.simulator.Network
import util.network.simulator.Node import util.network.simulator.Node
@ -75,6 +76,10 @@ abstract class EpidemicNode<T, P: EpidemicPacket<T>>(
SUSCETIBLE, INFECTED, REMOVED SUSCETIBLE, INFECTED, REMOVED
} }
enum class InfectStrategy {
PUSH, PULL, PUSHPULL
}
protected inner class EpidemicReceiver( protected inner class EpidemicReceiver(
threadGroup: ThreadGroup threadGroup: ThreadGroup
) : Node.Receiver(threadGroup) { ) : Node.Receiver(threadGroup) {

View File

@ -1,18 +0,0 @@
package util.network.epidemic
import util.network.simulator.Address
import util.network.simulator.packet.PayloadPacket
open class EpidemicPacket<T>(
override val senderAddress: Address,
override val receiverAddress: Address,
override val payload: T,
val type: Type
) : PayloadPacket<T> {
enum class Type {
PUSH, PULL, PUSHPULL, REPLY
}
}

View File

@ -1,6 +0,0 @@
package util.network.epidemic
data class ReplayPayload<T>(
val value: T,
val isReplay: Boolean = true
)

View File

@ -1,9 +0,0 @@
package util.network.epidemic
data class TimePayload<T>(val value: T, val timestamp: Long) {
companion object {
fun <T> createContent(value: T): TimePayload<T> {
return TimePayload(value, System.nanoTime())
}
}
}

View File

@ -0,0 +1,9 @@
package util.network.epidemic
data class TimedPayload<T>(val value: T, val timestamp: Long) {
companion object {
fun <T> createContent(value: T): TimedPayload<T> {
return TimedPayload(value, System.nanoTime())
}
}
}

View File

@ -0,0 +1,13 @@
package util.network.epidemic.packet
import util.network.simulator.packet.PayloadPacket
interface EpidemicPacket<T>: PayloadPacket<T> {
val type: Type
enum class Type {
PUSH, PULL, PUSHPULL, REPLY
}
}

View File

@ -0,0 +1,13 @@
package util.network.epidemic.packet
import util.network.simulator.Address
import util.network.simulator.packet.FeedbackPacket
data class FeedbackEpidemicPacket<T>(
override val senderAddress: Address,
override val receiverAddress: Address,
override val payload: T,
override val type: EpidemicPacket.Type,
override val isFeedback: Boolean
) : EpidemicPacket<T>, FeedbackPacket

View File

@ -0,0 +1,11 @@
package util.network.epidemic.packet
import util.network.epidemic.TimedPayload
import util.network.simulator.Address
data class TimedEpidemicPacket<T>(
override val senderAddress: Address,
override val receiverAddress: Address,
override val payload: TimedPayload<T>,
override val type: EpidemicPacket.Type
): EpidemicPacket<TimedPayload<T>>

View File

@ -1,13 +0,0 @@
package util.network.epidemic.replay
import util.network.epidemic.EpidemicPacket
import util.network.simulator.Address
class ReplayPacket<T>(
senderAddress: Address,
receiverAddress: Address,
payload: T,
type: Type,
val isReplay: Boolean
) : EpidemicPacket<T>(senderAddress, receiverAddress, payload, type)

View File

@ -0,0 +1,5 @@
package util.network.simulator.packet
interface FeedbackPacket: Packet {
val isFeedback: Boolean
}