Compare commits

...

1 Commits

Author SHA1 Message Date
b50890b44f
refactoring
All checks were successful
continuous-integration/drone/tag Build is passing
2020-05-03 19:38:55 +02:00
20 changed files with 74 additions and 77 deletions

View File

@ -21,26 +21,35 @@ class AverageNode(
if (packet.isFeedback) if (packet.isFeedback)
changeValue(payload) changeValue(payload)
else { else {
val newValue = (nodeValue + payload) / 2 computeAverage(payload)
sendFeedback(packet.senderAddress)
spreadNewValue()
}
}
changeValue(newValue) private fun computeAverage(payload: Double) {
val newValue = (nodeValue + payload) / 2
changeValue(newValue)
}
send( private fun sendFeedback(receiverAddress: Address) {
FeedbackEpidemicPacket( send(
address, FeedbackEpidemicPacket(
packet.senderAddress, address,
newValue, receiverAddress,
if (infectStrategy == InfectStrategy.PUSH) nodeValue,
EpidemicPacket.Type.PUSH else EpidemicPacket.Type.REPLY, if (infectStrategy == InfectStrategy.PUSH)
true EpidemicPacket.Type.PUSH else EpidemicPacket.Type.REPLY,
) true
) )
)
}
when (infectStrategy) { private fun spreadNewValue() {
InfectStrategy.PUSH -> sendPush() when (infectStrategy) {
InfectStrategy.PULL -> sendPull() InfectStrategy.PUSH -> sendPushToRandom()
InfectStrategy.PUSHPULL -> sendPushPull() InfectStrategy.PULL -> sendPullToRandom()
} InfectStrategy.PUSHPULL -> sendPushPullToRandom()
} }
} }
@ -95,4 +104,4 @@ class AverageNode(
bufferSize: Int = 1 bufferSize: Int = 1
) = AverageNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize) ) = AverageNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize)
} }
} }

View File

@ -32,4 +32,4 @@ fun main() {
network.forEach { network.forEach {
println(1.0 / (it as AverageNode).nodeValue) println(1.0 / (it as AverageNode).nodeValue)
} }
} }

View File

@ -32,4 +32,4 @@ fun main() {
network.forEach { network.forEach {
println(1.0 / (it as AverageNode).nodeValue) println(1.0 / (it as AverageNode).nodeValue)
} }
} }

View File

@ -32,4 +32,4 @@ fun main() {
network.forEach { network.forEach {
println(1.0 / (it as AverageNode).nodeValue) println(1.0 / (it as AverageNode).nodeValue)
} }
} }

View File

@ -37,9 +37,9 @@ class DbNode<T>(
override fun infect() { override fun infect() {
when (infectStrategy) { when (infectStrategy) {
InfectStrategy.PUSH -> sendPush() InfectStrategy.PUSH -> sendPushToRandom()
InfectStrategy.PULL -> sendPull() InfectStrategy.PULL -> sendPullToRandom()
InfectStrategy.PUSHPULL -> sendPushPull() InfectStrategy.PUSHPULL -> sendPushPullToRandom()
} }
Thread.sleep(DELTA) Thread.sleep(DELTA)
@ -81,4 +81,4 @@ class DbNode<T>(
bufferSize: Int = 1 bufferSize: Int = 1
) = DbNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize) ) = DbNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize)
} }
} }

View File

@ -4,7 +4,6 @@ import util.network.epidemic.TimedPayload
import util.network.simulator.Address import util.network.simulator.Address
import util.network.simulator.Network import util.network.simulator.Network
fun main() { fun main() {
val network = Network() val network = Network()
@ -29,4 +28,3 @@ fun main() {
println((it as DbNode<Int>).nodeValue) println((it as DbNode<Int>).nodeValue)
} }
} }

View File

@ -4,7 +4,6 @@ import util.network.epidemic.TimedPayload
import util.network.simulator.Address import util.network.simulator.Address
import util.network.simulator.Network import util.network.simulator.Network
fun main() { fun main() {
val network = Network() val network = Network()
@ -29,4 +28,3 @@ fun main() {
println((it as DbNode<Int>).nodeValue) println((it as DbNode<Int>).nodeValue)
} }
} }

View File

@ -4,7 +4,6 @@ import util.network.epidemic.TimedPayload
import util.network.simulator.Address import util.network.simulator.Address
import util.network.simulator.Network import util.network.simulator.Network
fun main() { fun main() {
val network = Network() val network = Network()
@ -29,4 +28,3 @@ fun main() {
println((it as DbNode<Int>).nodeValue) println((it as DbNode<Int>).nodeValue)
} }
} }

View File

@ -11,6 +11,7 @@ class QueryNode<T>(
address: Address, address: Address,
network: Network, network: Network,
initialValue: T, initialValue: T,
private val demotivatingFactor: Int = 10,
bufferSize: Int = 1 bufferSize: Int = 1
) : EpidemicNode<T, FeedbackEpidemicPacket<T>>( ) : EpidemicNode<T, FeedbackEpidemicPacket<T>>(
address, network, initialValue, bufferSize address, network, initialValue, bufferSize
@ -32,26 +33,25 @@ class QueryNode<T>(
} }
override fun receivePushOrReply(packet: FeedbackEpidemicPacket<T>) { override fun receivePushOrReply(packet: FeedbackEpidemicPacket<T>) {
//println(packet) when (nodeState) {
if (nodeState == State.REMOVED) State.REMOVED -> return
return State.INFECTED -> whenInfected(packet)
State.SUSCEPTIBLE -> whenSusceptible(packet)
}
}
if (nodeState == State.SUSCEPTIBLE) { private fun whenSusceptible(packet: FeedbackEpidemicPacket<T>) {
// println("susceptible") nodeState = State.INFECTED
nodeState = State.INFECTED queryValue = packet.payload
queryValue = packet.payload sendPushToRandom(2)
sendPush(1)
if (packet.payload == nodeValue) if (packet.payload == nodeValue)
println("find on node $address") println("find ${packet.payload} on node $address")
} else { }
if (packet.isFeedback && random.nextBoolean()) {
nodeState = State.REMOVED
println("removed")
}
private fun whenInfected(packet: FeedbackEpidemicPacket<T>) {
if (!packet.isFeedback)
send( send(
FeedbackEpidemicPacket( FeedbackEpidemicPacket(
address, address,
@ -61,7 +61,9 @@ class QueryNode<T>(
true true
) )
) )
}
if (packet.isFeedback && random.nextDouble(1.0) < 1 / demotivatingFactor)
nodeState = State.REMOVED
} }
override fun receivePull(packet: FeedbackEpidemicPacket<T>) { override fun receivePull(packet: FeedbackEpidemicPacket<T>) {
@ -71,4 +73,4 @@ class QueryNode<T>(
override fun infect() { override fun infect() {
Thread.interrupted() Thread.interrupted()
} }
} }

View File

@ -5,12 +5,11 @@ import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address import util.network.simulator.Address
import util.network.simulator.Network import util.network.simulator.Network
fun main() { fun main() {
val network = Network() val network = Network()
repeat(10) { repeat(20) {
QueryNode(Address(it), network, it*2) QueryNode(Address(it), network, it * 2)
} }
val alpha = network.getRandomNode() as QueryNode<Int> val alpha = network.getRandomNode() as QueryNode<Int>
@ -19,7 +18,7 @@ fun main() {
val packet = FeedbackEpidemicPacket( val packet = FeedbackEpidemicPacket(
alpha.address, alpha.address,
beta.address, beta.address,
14, 24,
EpidemicPacket.Type.PUSH, EpidemicPacket.Type.PUSH,
false false
) )
@ -28,4 +27,3 @@ fun main() {
network.start() network.start()
} }

View File

@ -5,7 +5,7 @@ import util.network.simulator.Address
import util.network.simulator.Network import util.network.simulator.Network
import util.network.simulator.Node import util.network.simulator.Node
abstract class EpidemicNode<T, P: EpidemicPacket<T>>( abstract class EpidemicNode<T, P : EpidemicPacket<T>>(
address: Address, address: Address,
network: Network, network: Network,
initialValue: T, initialValue: T,
@ -41,21 +41,21 @@ abstract class EpidemicNode<T, P: EpidemicPacket<T>>(
type: EpidemicPacket.Type type: EpidemicPacket.Type
): EpidemicPacket<T> ): EpidemicPacket<T>
protected fun sendPush(numberOfReceiver: Int = 1) { protected fun sendPushToRandom(numberOfReceiver: Int = 1) {
sendToRandom( sendToRandom(
EpidemicPacket.Type.PUSH, EpidemicPacket.Type.PUSH,
numberOfReceiver numberOfReceiver
) )
} }
protected fun sendPull(numberOfReceiver: Int = 1) { protected fun sendPullToRandom(numberOfReceiver: Int = 1) {
sendToRandom( sendToRandom(
EpidemicPacket.Type.PULL, EpidemicPacket.Type.PULL,
numberOfReceiver numberOfReceiver
) )
} }
protected fun sendPushPull(numberOfReceiver: Int = 1) { protected fun sendPushPullToRandom(numberOfReceiver: Int = 1) {
sendToRandom( sendToRandom(
EpidemicPacket.Type.PUSHPULL, EpidemicPacket.Type.PUSHPULL,
numberOfReceiver numberOfReceiver
@ -69,8 +69,6 @@ abstract class EpidemicNode<T, P: EpidemicPacket<T>>(
protected abstract fun infect() protected abstract fun infect()
protected fun onReceive(packet: P) { protected fun onReceive(packet: P) {
//println(packet.type)
when (packet.type) { when (packet.type) {
EpidemicPacket.Type.PUSH -> receivePushOrReply(packet) EpidemicPacket.Type.PUSH -> receivePushOrReply(packet)
EpidemicPacket.Type.PULL -> receivePull(packet) EpidemicPacket.Type.PULL -> receivePull(packet)
@ -125,5 +123,4 @@ abstract class EpidemicNode<T, P: EpidemicPacket<T>>(
return EpidemicSender(threadGroup) return EpidemicSender(threadGroup)
} }
} }
}
}

View File

@ -6,4 +6,4 @@ data class TimedPayload<T>(val value: T, val timestamp: Long) {
return TimedPayload(value, System.nanoTime()) return TimedPayload(value, System.nanoTime())
} }
} }
} }

View File

@ -2,12 +2,10 @@ package util.network.epidemic.packet
import util.network.simulator.packet.PayloadPacket import util.network.simulator.packet.PayloadPacket
interface EpidemicPacket<T>: PayloadPacket<T> { interface EpidemicPacket<T> : PayloadPacket<T> {
val type: Type val type: Type
enum class Type { enum class Type {
PUSH, PULL, PUSHPULL, REPLY PUSH, PULL, PUSHPULL, REPLY
} }
} }

View File

@ -10,4 +10,3 @@ data class FeedbackEpidemicPacket<T>(
override val type: EpidemicPacket.Type, override val type: EpidemicPacket.Type,
override val isFeedback: Boolean override val isFeedback: Boolean
) : EpidemicPacket<T>, FeedbackPacket ) : EpidemicPacket<T>, FeedbackPacket

View File

@ -8,4 +8,4 @@ data class TimedEpidemicPacket<T>(
override val receiverAddress: Address, override val receiverAddress: Address,
override val payload: TimedPayload<T>, override val payload: TimedPayload<T>,
override val type: EpidemicPacket.Type override val type: EpidemicPacket.Type
): EpidemicPacket<TimedPayload<T>> ) : EpidemicPacket<TimedPayload<T>>

View File

@ -17,7 +17,7 @@ class Network(threadGrupName: String = "network") {
addresses.add(node.address) addresses.add(node.address)
} }
fun getRandomAddress() : Address { fun getRandomAddress(): Address {
val index = random.nextInt(addresses.size) val index = random.nextInt(addresses.size)
return addresses[index] return addresses[index]

View File

@ -12,12 +12,12 @@ fun main() {
network.start() network.start()
} }
class TestNode(address: Address, network: Network): Node(address, network) { class TestNode(address: Address, network: Network) : Node(address, network) {
override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory { override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory {
return TestCommunicatorFactory(threadGroup) return TestCommunicatorFactory(threadGroup)
} }
private inner class TestReceiver(threadGroup: ThreadGroup): Receiver(threadGroup) { private inner class TestReceiver(threadGroup: ThreadGroup) : Receiver(threadGroup) {
override fun run() { override fun run() {
if (address != Address(0)) if (address != Address(0))
return return
@ -28,7 +28,7 @@ class TestNode(address: Address, network: Network): Node(address, network) {
} }
} }
private inner class TestSender(threadGroup: ThreadGroup): Sender(threadGroup) { private inner class TestSender(threadGroup: ThreadGroup) : Sender(threadGroup) {
override fun run() { override fun run() {
if (address != Address(1)) if (address != Address(1))
return return
@ -39,7 +39,8 @@ class TestNode(address: Address, network: Network): Node(address, network) {
} }
} }
private inner class TestCommunicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory(threadGroup) { private inner class TestCommunicatorFactory(threadGroup: ThreadGroup) :
CommunicatorFactory(threadGroup) {
override fun createReceiver(): Receiver { override fun createReceiver(): Receiver {
return TestReceiver(threadGroup) return TestReceiver(threadGroup)
} }
@ -48,4 +49,4 @@ class TestNode(address: Address, network: Network): Node(address, network) {
return TestSender(threadGroup) return TestSender(threadGroup)
} }
} }
} }

View File

@ -1,5 +1,5 @@
package util.network.simulator.packet package util.network.simulator.packet
interface FeedbackPacket: Packet { interface FeedbackPacket : Packet {
val isFeedback: Boolean val isFeedback: Boolean
} }

View File

@ -6,4 +6,3 @@ data class NetworkPacket(
override val senderAddress: Address, override val senderAddress: Address,
override val receiverAddress: Address override val receiverAddress: Address
) : Packet ) : Packet

View File

@ -1,5 +1,5 @@
package util.network.simulator.packet package util.network.simulator.packet
interface PayloadPacket<T>: Packet { interface PayloadPacket<T> : Packet {
val payload: T val payload: T
} }