add average

This commit is contained in:
Raffaele Mignone 2020-05-02 22:27:43 +02:00
parent 9690167176
commit 5688bd653a
Signed by: norangebit
GPG Key ID: F5255658CB220573
14 changed files with 271 additions and 68 deletions

View File

@ -1,9 +0,0 @@
package util.network.epidemic
data class Content<T>(val value: T, val timestamp: Long) {
companion object {
fun <T> createContent(value: T): Content<T> {
return Content(value, System.nanoTime())
}
}
}

View File

@ -4,23 +4,26 @@ import util.network.simulator.Address
import util.network.simulator.Network import util.network.simulator.Network
import util.network.simulator.Node import util.network.simulator.Node
class EpidemicNode<T>( abstract class EpidemicNode<T, P: EpidemicPacket<T>>(
address: Address, address: Address,
network: Network, network: Network,
initialValue: Content<T>, initialValue: T,
bufferSize: Int = 1 bufferSize: Int = 1
) : Node(address, network, bufferSize) { ) : Node(address, network, bufferSize) {
@Volatile @Volatile
var nodeValue = initialValue var nodeValue = initialValue
private set private set
private var nodeState = State.SUSSCETIBLE private var nodeState = State.SUSCETIBLE
@Synchronized @Synchronized
fun changeValue(value: Content<T>) { fun changeValue(value: T) {
nodeValue = value nodeValue = value
} }
private fun send(type: EpidemicPacket.Type, numberOfReceiver: Int = 1) { private fun sendToRandom(
packetPrototype: EpidemicPacketPrototype<T>,
numberOfReceiver: Int = 1
) {
val toExclude = mutableListOf(address) val toExclude = mutableListOf(address)
repeat(numberOfReceiver) { repeat(numberOfReceiver) {
var receiverAddress = network.getRandomAddress() var receiverAddress = network.getRandomAddress()
@ -28,56 +31,47 @@ class EpidemicNode<T>(
receiverAddress = network.getRandomAddress() receiverAddress = network.getRandomAddress()
toExclude.add(receiverAddress) toExclude.add(receiverAddress)
val packet = EpidemicPacket( send(packetPrototype.makePacket(receiverAddress))
address,
receiverAddress,
nodeValue,
type
)
send(packet)
} }
} }
private fun sendPush(numberOfReceiver: Int = 1) { abstract fun makeInfectionPrototypePacket(
send(EpidemicPacket.Type.PUSH, numberOfReceiver) type: EpidemicPacket.Type
): EpidemicPacketPrototype<T>
protected fun sendPush(numberOfReceiver: Int = 1) {
sendToRandom(
makeInfectionPrototypePacket(EpidemicPacket.Type.PUSH),
numberOfReceiver
)
} }
private fun sendPull(numberOfReceiver: Int = 1) { protected fun sendPull(numberOfReceiver: Int = 1) {
send(EpidemicPacket.Type.PULL, numberOfReceiver) sendToRandom(
makeInfectionPrototypePacket(EpidemicPacket.Type.PULL),
numberOfReceiver
)
} }
private fun sendPushPull(numberOfReceiver: Int = 1) { protected fun sendPushPull(numberOfReceiver: Int = 1) {
send(EpidemicPacket.Type.PUSHPULL, numberOfReceiver) sendToRandom(
makeInfectionPrototypePacket(EpidemicPacket.Type.PUSHPULL),
numberOfReceiver
)
} }
private fun receivePushOrReply(packet: EpidemicPacket<Content<T>>) { protected abstract fun receivePushOrReply(packet: P)
val receivedValue = packet.payload
if (receivedValue.timestamp > nodeValue.timestamp) protected abstract fun receivePull(packet: P)
nodeValue = receivedValue
}
private fun receivePull(packet: EpidemicPacket<Content<T>>) { protected abstract fun infect()
val receivedValue = packet.payload
if (nodeValue.timestamp > receivedValue.timestamp)
send(
EpidemicPacket(
address,
packet.senderAddress,
nodeValue,
EpidemicPacket.Type.REPLY
)
)
}
override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory { override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory {
return EpidemicCommunicatorFactory(threadGroup) return EpidemicCommunicatorFactory(threadGroup)
} }
private enum class State { private enum class State {
SUSSCETIBLE, INFECTED, REMOVED SUSCETIBLE, INFECTED, REMOVED
} }
protected inner class EpidemicReceiver( protected inner class EpidemicReceiver(
@ -89,7 +83,8 @@ class EpidemicNode<T>(
} }
private fun onReceive() { private fun onReceive() {
val packet = receive() as EpidemicPacket<Content<T>> val packet = receive() as P
//println(packet.type)
when (packet.type) { when (packet.type) {
EpidemicPacket.Type.PUSH -> receivePushOrReply(packet) EpidemicPacket.Type.PUSH -> receivePushOrReply(packet)
@ -110,11 +105,6 @@ class EpidemicNode<T>(
while (!isInterrupted) while (!isInterrupted)
infect() infect()
} }
private fun infect() {
sendPushPull()
sleep(100)
}
} }
protected inner class EpidemicCommunicatorFactory( protected inner class EpidemicCommunicatorFactory(

View File

@ -3,8 +3,9 @@ package util.network.epidemic
import util.network.simulator.Address import util.network.simulator.Address
import util.network.simulator.packet.Packet import util.network.simulator.packet.Packet
import util.network.simulator.packet.PayloadPacket import util.network.simulator.packet.PayloadPacket
import util.network.simulator.packet.PayloadPacketPrototype
data class EpidemicPacket<T : Content<*>>( open class EpidemicPacket<T>(
override val senderAddress: Address, override val senderAddress: Address,
override val receiverAddress: Address, override val receiverAddress: Address,
override val payload: T, override val payload: T,
@ -26,9 +27,9 @@ data class EpidemicPacket<T : Content<*>>(
companion object { companion object {
fun <T> createFromNetworkPacket( fun <T> createFromNetworkPacket(
packet: Packet, packet: Packet,
payload: Content<T>, payload: TimePayload<T>,
type: Type type: Type
): EpidemicPacket<Content<T>> = EpidemicPacket( ): EpidemicPacket<TimePayload<T>> = EpidemicPacket(
packet.senderAddress, packet.senderAddress,
packet.receiverAddress, packet.receiverAddress,
payload, payload,
@ -36,9 +37,9 @@ data class EpidemicPacket<T : Content<*>>(
) )
fun <T> createFromPayloadPacket( fun <T> createFromPayloadPacket(
packet: PayloadPacket<Content<T>>, packet: PayloadPacket<TimePayload<T>>,
type: Type type: Type
): EpidemicPacket<Content<T>> = EpidemicPacket( ): EpidemicPacket<TimePayload<T>> = EpidemicPacket(
packet.senderAddress, packet.senderAddress,
packet.receiverAddress, packet.receiverAddress,
packet.payload, packet.payload,
@ -46,3 +47,13 @@ data class EpidemicPacket<T : Content<*>>(
) )
} }
} }
open class EpidemicPacketPrototype<T>(
override val senderAddress: Address,
override val payload: T,
val type: EpidemicPacket.Type
) : PayloadPacketPrototype<T> {
open fun makePacket(receiverAddress: Address): EpidemicPacket<T> {
return EpidemicPacket(senderAddress, receiverAddress, payload, type)
}
}

View File

@ -0,0 +1,6 @@
package util.network.epidemic
data class ReplayPayload<T>(
val value: T,
val isReplay: Boolean = true
)

View File

@ -1,5 +1,6 @@
package util.network.epidemic package util.network.epidemic
import util.network.epidemic.consinstence.DbNode
import util.network.simulator.Address import util.network.simulator.Address
import util.network.simulator.Network import util.network.simulator.Network
@ -8,24 +9,24 @@ fun main() {
val network = Network() val network = Network()
repeat(10) { repeat(10) {
EpidemicNode(Address(it), network, Content(0, 0)) DbNode(Address(it), network, TimePayload(0, 0))
} }
val alpha = network.getRandomNode() as EpidemicNode<Int> val alpha = network.getRandomNode() as DbNode<Int>
network.start() network.start()
Thread.sleep(1000) Thread.sleep(1000)
network.forEach { network.forEach {
println((it as EpidemicNode<Int>).nodeValue) println((it as DbNode<Int>).nodeValue)
} }
alpha.changeValue(Content(42, System.nanoTime())) alpha.changeValue(TimePayload(42, System.nanoTime()))
Thread.sleep(2000) Thread.sleep(2000)
network.forEach { network.forEach {
println((it as EpidemicNode<Int>).nodeValue) println((it as DbNode<Int>).nodeValue)
} }
} }

View File

@ -0,0 +1,9 @@
package util.network.epidemic
data class TimePayload<T>(val value: T, val timestamp: Long) {
companion object {
fun <T> createContent(value: T): TimePayload<T> {
return TimePayload(value, System.nanoTime())
}
}
}

View File

@ -0,0 +1,59 @@
package util.network.epidemic.consinstence
import util.network.epidemic.EpidemicNode
import util.network.epidemic.EpidemicPacket
import util.network.epidemic.EpidemicPacketPrototype
import util.network.epidemic.replay.ReplayPacket
import util.network.epidemic.replay.ReplayPacketPrototype
import util.network.simulator.Address
import util.network.simulator.Network
class AverageNode(
address: Address,
network: Network,
initialValue: Double,
bufferSize: Int = 1
) : EpidemicNode<Double, ReplayPacket<Double>>(address, network, initialValue, bufferSize) {
override fun receivePushOrReply(packet: ReplayPacket<Double>) {
val payload = packet.payload
if (packet.isReplay)
changeValue(payload)
else {
val newValue = (nodeValue + payload) / 2
changeValue(newValue)
send(
ReplayPacket(
address,
packet.senderAddress,
newValue,
EpidemicPacket.Type.PUSH,
true
)
)
sendPush()
}
}
override fun receivePull(packet: ReplayPacket<Double>) {
TODO("Not yet implemented")
}
override fun infect() {
Thread.interrupted()
}
override fun makeInfectionPrototypePacket(
type: EpidemicPacket.Type
): EpidemicPacketPrototype<Double> {
return ReplayPacketPrototype(
address,
nodeValue,
type,
false
)
}
}

View File

@ -0,0 +1,52 @@
package util.network.epidemic.consinstence
import util.network.epidemic.TimePayload
import util.network.epidemic.EpidemicNode
import util.network.epidemic.EpidemicPacket
import util.network.epidemic.EpidemicPacketPrototype
import util.network.epidemic.replay.ReplayPacketPrototype
import util.network.simulator.Address
import util.network.simulator.Network
class DbNode<T>(
address: Address,
network: Network,
initialValue: TimePayload<T>,
bufferSize: Int = 1
) : EpidemicNode<TimePayload<T>, EpidemicPacket<TimePayload<T>>>(
address, network, initialValue, bufferSize
) {
override fun receivePushOrReply(packet: EpidemicPacket<TimePayload<T>>) {
val receivedValue = packet.payload
if (nodeValue.timestamp < receivedValue.timestamp)
changeValue(receivedValue)
}
override fun receivePull(packet: EpidemicPacket<TimePayload<T>>) {
val receivedValue = packet.payload
if (nodeValue.timestamp > receivedValue.timestamp)
send(
EpidemicPacket(
address,
packet.senderAddress,
nodeValue,
EpidemicPacket.Type.REPLY
)
)
}
override fun infect() {
sendPushPull()
Thread.sleep(100)
}
override fun makeInfectionPrototypePacket(type: EpidemicPacket.Type): EpidemicPacketPrototype<TimePayload<T>> {
return EpidemicPacketPrototype(
address,
nodeValue,
type
)
}
}

View File

@ -0,0 +1,35 @@
package util.network.epidemic.consinstence
import util.network.epidemic.EpidemicPacket
import util.network.epidemic.replay.ReplayPacket
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(17) {
AverageNode(Address(it), network, 0.0)
}
val alpha = network.getRandomNode() as AverageNode
val beta = network.getRandomNode() as AverageNode
val packet = ReplayPacket(
alpha.address,
beta.address,
1,
EpidemicPacket.Type.PUSH,
false
)
alpha.send(packet)
network.start()
Thread.sleep(2000)
network.forEach {
println(1.0 / (it as AverageNode).nodeValue)
}
}

View File

@ -0,0 +1,30 @@
package util.network.epidemic.replay
import util.network.epidemic.EpidemicPacket
import util.network.epidemic.EpidemicPacketPrototype
import util.network.simulator.Address
class ReplayPacket<T>(
senderAddress: Address,
receiverAddress: Address,
payload: T,
type: Type,
val isReplay: Boolean
) : EpidemicPacket<T>(senderAddress, receiverAddress, payload, type)
class ReplayPacketPrototype<T> (
senderAddress: Address,
payload: T,
type: EpidemicPacket.Type,
val isReplay: Boolean
) : EpidemicPacketPrototype<T>(senderAddress, payload, type) {
override fun makePacket(receiverAddress: Address): EpidemicPacket<T> {
return ReplayPacket(
senderAddress,
receiverAddress,
payload,
type,
isReplay
)
}
}

View File

@ -32,7 +32,7 @@ abstract class Node(
return buffer.take() return buffer.take()
} }
protected fun send(packet: Packet) { fun send(packet: Packet) {
network[packet.receiverAddress]?.setPacket(packet) network[packet.receiverAddress]?.setPacket(packet)
} }

View File

@ -0,0 +1,16 @@
package util.network.simulator.packet
import util.network.simulator.Address
data class NetworkPacket(
override val senderAddress: Address,
override val receiverAddress: Address
) : Packet
data class NetworkPacketPrototype(
override val senderAddress: Address
): PacketPrototype {
fun makePacket(receiverAddress: Address): NetworkPacket {
return NetworkPacket(senderAddress, receiverAddress)
}
}

View File

@ -7,7 +7,6 @@ interface Packet {
val receiverAddress: Address val receiverAddress: Address
} }
data class NetworkPacket( interface PacketPrototype {
override val senderAddress: Address, val senderAddress: Address
override val receiverAddress: Address }
) : Packet

View File

@ -3,3 +3,7 @@ package util.network.simulator.packet
interface PayloadPacket<T>: Packet { interface PayloadPacket<T>: Packet {
val payload: T val payload: T
} }
interface PayloadPacketPrototype<T> : PacketPrototype {
val payload: T
}