Compare commits

..

No commits in common. "master" and "network" have entirely different histories.

20 changed files with 77 additions and 74 deletions

View File

@ -21,35 +21,26 @@ class AverageNode(
if (packet.isFeedback)
changeValue(payload)
else {
computeAverage(payload)
sendFeedback(packet.senderAddress)
spreadNewValue()
}
}
val newValue = (nodeValue + payload) / 2
private fun computeAverage(payload: Double) {
val newValue = (nodeValue + payload) / 2
changeValue(newValue)
}
changeValue(newValue)
private fun sendFeedback(receiverAddress: Address) {
send(
FeedbackEpidemicPacket(
address,
receiverAddress,
nodeValue,
if (infectStrategy == InfectStrategy.PUSH)
EpidemicPacket.Type.PUSH else EpidemicPacket.Type.REPLY,
true
send(
FeedbackEpidemicPacket(
address,
packet.senderAddress,
newValue,
if (infectStrategy == InfectStrategy.PUSH)
EpidemicPacket.Type.PUSH else EpidemicPacket.Type.REPLY,
true
)
)
)
}
private fun spreadNewValue() {
when (infectStrategy) {
InfectStrategy.PUSH -> sendPushToRandom()
InfectStrategy.PULL -> sendPullToRandom()
InfectStrategy.PUSHPULL -> sendPushPullToRandom()
when (infectStrategy) {
InfectStrategy.PUSH -> sendPush()
InfectStrategy.PULL -> sendPull()
InfectStrategy.PUSHPULL -> sendPushPull()
}
}
}

View File

@ -37,9 +37,9 @@ class DbNode<T>(
override fun infect() {
when (infectStrategy) {
InfectStrategy.PUSH -> sendPushToRandom()
InfectStrategy.PULL -> sendPullToRandom()
InfectStrategy.PUSHPULL -> sendPushPullToRandom()
InfectStrategy.PUSH -> sendPush()
InfectStrategy.PULL -> sendPull()
InfectStrategy.PUSHPULL -> sendPushPull()
}
Thread.sleep(DELTA)

View File

@ -4,6 +4,7 @@ import util.network.epidemic.TimedPayload
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
@ -28,3 +29,4 @@ fun main() {
println((it as DbNode<Int>).nodeValue)
}
}

View File

@ -4,6 +4,7 @@ import util.network.epidemic.TimedPayload
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
@ -28,3 +29,4 @@ fun main() {
println((it as DbNode<Int>).nodeValue)
}
}

View File

@ -4,6 +4,7 @@ import util.network.epidemic.TimedPayload
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
@ -28,3 +29,4 @@ fun main() {
println((it as DbNode<Int>).nodeValue)
}
}

View File

@ -11,7 +11,6 @@ class QueryNode<T>(
address: Address,
network: Network,
initialValue: T,
private val demotivatingFactor: Int = 10,
bufferSize: Int = 1
) : EpidemicNode<T, FeedbackEpidemicPacket<T>>(
address, network, initialValue, bufferSize
@ -33,25 +32,26 @@ class QueryNode<T>(
}
override fun receivePushOrReply(packet: FeedbackEpidemicPacket<T>) {
when (nodeState) {
State.REMOVED -> return
State.INFECTED -> whenInfected(packet)
State.SUSCEPTIBLE -> whenSusceptible(packet)
}
}
//println(packet)
if (nodeState == State.REMOVED)
return
private fun whenSusceptible(packet: FeedbackEpidemicPacket<T>) {
nodeState = State.INFECTED
queryValue = packet.payload
if (nodeState == State.SUSCEPTIBLE) {
// println("susceptible")
nodeState = State.INFECTED
sendPushToRandom(2)
queryValue = packet.payload
sendPush(1)
if (packet.payload == nodeValue)
println("find ${packet.payload} on node $address")
}
if (packet.payload == nodeValue)
println("find on node $address")
} else {
if (packet.isFeedback && random.nextBoolean()) {
nodeState = State.REMOVED
println("removed")
}
private fun whenInfected(packet: FeedbackEpidemicPacket<T>) {
if (!packet.isFeedback)
send(
FeedbackEpidemicPacket(
address,
@ -61,9 +61,7 @@ class QueryNode<T>(
true
)
)
if (packet.isFeedback && random.nextDouble(1.0) < 1 / demotivatingFactor)
nodeState = State.REMOVED
}
}
override fun receivePull(packet: FeedbackEpidemicPacket<T>) {

View File

@ -5,11 +5,12 @@ import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(20) {
QueryNode(Address(it), network, it * 2)
repeat(10) {
QueryNode(Address(it), network, it*2)
}
val alpha = network.getRandomNode() as QueryNode<Int>
@ -18,7 +19,7 @@ fun main() {
val packet = FeedbackEpidemicPacket(
alpha.address,
beta.address,
24,
14,
EpidemicPacket.Type.PUSH,
false
)
@ -27,3 +28,4 @@ fun main() {
network.start()
}

View File

@ -5,7 +5,7 @@ import util.network.simulator.Address
import util.network.simulator.Network
import util.network.simulator.Node
abstract class EpidemicNode<T, P : EpidemicPacket<T>>(
abstract class EpidemicNode<T, P: EpidemicPacket<T>>(
address: Address,
network: Network,
initialValue: T,
@ -41,21 +41,21 @@ abstract class EpidemicNode<T, P : EpidemicPacket<T>>(
type: EpidemicPacket.Type
): EpidemicPacket<T>
protected fun sendPushToRandom(numberOfReceiver: Int = 1) {
protected fun sendPush(numberOfReceiver: Int = 1) {
sendToRandom(
EpidemicPacket.Type.PUSH,
numberOfReceiver
)
}
protected fun sendPullToRandom(numberOfReceiver: Int = 1) {
protected fun sendPull(numberOfReceiver: Int = 1) {
sendToRandom(
EpidemicPacket.Type.PULL,
numberOfReceiver
)
}
protected fun sendPushPullToRandom(numberOfReceiver: Int = 1) {
protected fun sendPushPull(numberOfReceiver: Int = 1) {
sendToRandom(
EpidemicPacket.Type.PUSHPULL,
numberOfReceiver
@ -69,6 +69,8 @@ abstract class EpidemicNode<T, P : EpidemicPacket<T>>(
protected abstract fun infect()
protected fun onReceive(packet: P) {
//println(packet.type)
when (packet.type) {
EpidemicPacket.Type.PUSH -> receivePushOrReply(packet)
EpidemicPacket.Type.PULL -> receivePull(packet)
@ -123,4 +125,5 @@ abstract class EpidemicNode<T, P : EpidemicPacket<T>>(
return EpidemicSender(threadGroup)
}
}
}

View File

@ -2,10 +2,12 @@ package util.network.epidemic.packet
import util.network.simulator.packet.PayloadPacket
interface EpidemicPacket<T> : PayloadPacket<T> {
interface EpidemicPacket<T>: PayloadPacket<T> {
val type: Type
enum class Type {
PUSH, PULL, PUSHPULL, REPLY
}
}

View File

@ -10,3 +10,4 @@ data class FeedbackEpidemicPacket<T>(
override val type: EpidemicPacket.Type,
override val isFeedback: Boolean
) : EpidemicPacket<T>, FeedbackPacket

View File

@ -8,4 +8,4 @@ data class TimedEpidemicPacket<T>(
override val receiverAddress: Address,
override val payload: TimedPayload<T>,
override val type: EpidemicPacket.Type
) : EpidemicPacket<TimedPayload<T>>
): EpidemicPacket<TimedPayload<T>>

View File

@ -17,7 +17,7 @@ class Network(threadGrupName: String = "network") {
addresses.add(node.address)
}
fun getRandomAddress(): Address {
fun getRandomAddress() : Address {
val index = random.nextInt(addresses.size)
return addresses[index]

View File

@ -12,12 +12,12 @@ fun main() {
network.start()
}
class TestNode(address: Address, network: Network) : Node(address, network) {
class TestNode(address: Address, network: Network): Node(address, network) {
override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory {
return TestCommunicatorFactory(threadGroup)
}
private inner class TestReceiver(threadGroup: ThreadGroup) : Receiver(threadGroup) {
private inner class TestReceiver(threadGroup: ThreadGroup): Receiver(threadGroup) {
override fun run() {
if (address != Address(0))
return
@ -28,7 +28,7 @@ class TestNode(address: Address, network: Network) : Node(address, network) {
}
}
private inner class TestSender(threadGroup: ThreadGroup) : Sender(threadGroup) {
private inner class TestSender(threadGroup: ThreadGroup): Sender(threadGroup) {
override fun run() {
if (address != Address(1))
return
@ -39,8 +39,7 @@ class TestNode(address: Address, network: Network) : Node(address, network) {
}
}
private inner class TestCommunicatorFactory(threadGroup: ThreadGroup) :
CommunicatorFactory(threadGroup) {
private inner class TestCommunicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory(threadGroup) {
override fun createReceiver(): Receiver {
return TestReceiver(threadGroup)
}

View File

@ -1,5 +1,5 @@
package util.network.simulator.packet
interface FeedbackPacket : Packet {
interface FeedbackPacket: Packet {
val isFeedback: Boolean
}

View File

@ -6,3 +6,4 @@ data class NetworkPacket(
override val senderAddress: Address,
override val receiverAddress: Address
) : Packet

View File

@ -1,5 +1,5 @@
package util.network.simulator.packet
interface PayloadPacket<T> : Packet {
interface PayloadPacket<T>: Packet {
val payload: T
}