Compare commits

...

10 Commits

Author SHA1 Message Date
Raffaele Mignone b50890b44f
refactoring
continuous-integration/drone/tag Build is passing Details
2020-05-03 19:38:55 +02:00
Raffaele Mignone 711da44f53
add gossip 2020-05-03 17:30:17 +02:00
Raffaele Mignone 581d9365ee
add average and db push, pull and pushpull 2020-05-03 14:57:03 +02:00
Raffaele Mignone 32ca1dd9a0
remove packet prototype 2020-05-03 11:04:28 +02:00
Raffaele Mignone 5688bd653a
add average 2020-05-02 22:27:43 +02:00
Raffaele Mignone 9690167176
basic network 2020-05-02 13:12:44 +02:00
Raffaele Mignone 41bc91cb98
add documentation
continuous-integration/drone/tag Build is passing Details
2020-04-25 18:54:20 +02:00
Raffaele Mignone bcc7d4de68
remove clone 2020-04-25 18:51:02 +02:00
Raffaele Mignone bca1b43fa4
add drill08 2020-04-25 18:49:06 +02:00
Raffaele Mignone 2cd3dd6549
add trust all package 2020-04-24 18:58:13 +02:00
51 changed files with 1140 additions and 27 deletions

View File

@ -21,6 +21,8 @@ dependencies {
implementation(kotlin("stdlib-jdk8"))
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.5")
implementation("org.apache.activemq:activemq-client:5.15.12")
implementation("org.glassfish.jersey.containers:jersey-container-grizzly2-http:2.25")
implementation("org.glassfish.jersey.media:jersey-media-json-jackson:2.25")
testCompile("junit", "junit", "4.12")
detektPlugins("io.gitlab.arturbosch.detekt:detekt-formatting:1.7.0-beta2")
}

View File

@ -263,7 +263,7 @@ formatting:
active: true
autoCorrect: true
NoSemicolons:
active: true
active: false
autoCorrect: true
NoTrailingSpaces:
active: true

3
doc/drill08.md Normal file
View File

@ -0,0 +1,3 @@
exercise 03 use `Client` from exercise 02
exercise 04 use `ReplicatedFinder` from exercise 03

View File

@ -1,7 +1,7 @@
package drills.drill07.exercise3.hello
import drills.drill07.exercise3.replier.ActiveMQReplier
import drills.drill07.exercise3.replier.Replier
import util.jms.replier.ActiveMQReplier
import util.jms.replier.Replier
import javax.jms.TextMessage
fun main() {

View File

@ -1,11 +1,12 @@
package drills.drill07.exercise3.hello
import drills.drill07.exercise3.requestor.ActiveMQRequestor
import drills.drill07.exercise3.requestor.Requestor
import util.jms.requestor.ActiveMQRequestor
import util.jms.requestor.Requestor
import javax.jms.TextMessage
fun main() {
val requestor: Requestor = ActiveMQRequestor("hello", shared = true)
val requestor: Requestor =
ActiveMQRequestor("hello", shared = true)
val msg = requestor.request(requestor.createTextMessage("Hello Home!")) as TextMessage

View File

@ -1,7 +1,7 @@
package drills.drill07.exercise4
import drills.drill07.exercise3.replier.ActiveMQReplier
import drills.drill07.exercise3.replier.Replier
import util.jms.replier.ActiveMQReplier
import util.jms.replier.Replier
fun main() {
val replier: Replier = ActiveMQReplier("factorial", shared = true)

View File

@ -1,10 +1,11 @@
package drills.drill07.exercise4
import drills.drill07.exercise3.requestor.ActiveMQRequestor
import drills.drill07.exercise3.requestor.Requestor
import util.jms.requestor.ActiveMQRequestor
import util.jms.requestor.Requestor
fun main() {
val requestor: Requestor = ActiveMQRequestor("factorial", shared = true)
val requestor: Requestor =
ActiveMQRequestor("factorial", shared = true)
println("Enter the number of which you want to calculate the factorial:")
val n = readLine()?.toInt()

View File

@ -0,0 +1,11 @@
package drills.drill08.exercise1
import util.jms.replicatedobject.ReplicatedObjectFactory
fun main() {
val factory = ReplicatedObjectFactory(Finder::class.java)
val finder = factory.create("finder")
println("ciao: ${finder.find("ciao")}")
println("ciao!: ${finder.find("ciao!")}")
}

View File

@ -0,0 +1,11 @@
package drills.drill08.exercise1
interface Finder {
fun find(str: String): Boolean
}
class FakeFinder : Finder {
override fun find(str: String): Boolean {
return str.length % 2 == 0
}
}

View File

@ -0,0 +1,10 @@
package drills.drill08.exercise1
import util.jms.replicatedobject.ReplicatedObject
fun main() {
val finder: Finder = FakeFinder()
ReplicatedObject(finder, "finder")
.start()
}

View File

@ -0,0 +1,27 @@
package drills.drill08.exercise2
import util.rmi.Client
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
@ExperimentalTime
fun main() {
Client(clientHandler).start()
}
@ExperimentalTime
val clientHandler = {
val finder = Client.lookup("finder") as Finder
val workerNumber = 4
val workers = Array(workerNumber) { Worker(finder) }
val elapsed = measureTime {
workers.forEach { it.start() }
workers.forEach { it.join() }
}
println("Total elapsed time: $elapsed")
println("Throughput: ${(workerNumber * Worker.REQUEST_NUMBER) / elapsed.inSeconds} req/s")
}

View File

@ -0,0 +1,18 @@
package drills.drill08.exercise2
import java.rmi.Remote
import java.rmi.RemoteException
import java.rmi.server.UnicastRemoteObject
interface Finder : Remote {
@Throws(RemoteException::class)
fun find(str: String): Boolean
}
class FakeFinder : UnicastRemoteObject(), Finder {
override fun find(str: String): Boolean {
println(str)
for (i in 0 until 100000000L);
return str.length % 2 == 0
}
}

View File

@ -0,0 +1,10 @@
package drills.drill08.exercise2
import util.rmi.Server
fun main() {
Server {
val finder: Finder = FakeFinder()
Server.bind(finder, "finder")
}.start()
}

View File

@ -0,0 +1,22 @@
package drills.drill08.exercise2
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
class Worker(private val finder: Finder) : Thread() {
@ExperimentalTime
override fun run() {
val elapsed = measureTime {
repeat(REQUEST_NUMBER) {
finder.find("test-$it")
}
}
println("Thread $this elapsed: $elapsed")
println("Thread $this mean time for request: ${elapsed / REQUEST_NUMBER}")
}
companion object {
const val REQUEST_NUMBER = 60
}
}

View File

@ -0,0 +1,9 @@
package drills.drill08.exercise3
import drills.drill08.exercise2.FakeFinder
import util.jms.replicatedobject.ReplicatedObject
fun main() {
ReplicatedObject(FakeFinder(), "finder")
.start()
}

View File

@ -0,0 +1,21 @@
package drills.drill08.exercise3
import drills.drill08.exercise2.Finder
import util.jms.replicatedobject.ReplicatedObjectFactory
import util.rmi.Server
import java.rmi.server.UnicastRemoteObject
fun main() {
Server {
val finder = FinderProxy()
Server.bind(finder, "finder")
}.start()
}
class FinderProxy : Finder, UnicastRemoteObject() {
private val finder = ReplicatedObjectFactory(Finder::class.java)
.create("finder")
override fun find(str: String): Boolean {
return finder.find(str)
}
}

View File

@ -0,0 +1,23 @@
package drills.drill08.exercise4
import drills.drill08.exercise2.Finder
import util.jms.replicatedobject.ReplicatedObjectFactory
import javax.ws.rs.Consumes
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.QueryParam
import javax.ws.rs.core.MediaType
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("/search")
class SearchService {
private val finder = ReplicatedObjectFactory(Finder::class.java)
.create("finder")
@GET
fun search(@QueryParam("str") str: String): Boolean {
return finder.find(str)
}
}

View File

@ -0,0 +1,18 @@
package drills.drill08.exercise4
import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory
import org.glassfish.jersey.server.ResourceConfig
import javax.ws.rs.core.UriBuilder
fun main() {
val uri = UriBuilder
.fromUri("http://localhost/rest")
.port(8484)
.build()
val resConfig = ResourceConfig().register(SearchService())
val server = GrizzlyHttpServerFactory.createHttpServer(uri, resConfig)
server.start()
while (server.isStarted);
}

View File

@ -0,0 +1,107 @@
package drills.drill09.average
import util.network.epidemic.EpidemicNode
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
class AverageNode(
address: Address,
network: Network,
initialValue: Double,
val infectStrategy: InfectStrategy = InfectStrategy.PUSH,
bufferSize: Int = 1
) : EpidemicNode<Double, FeedbackEpidemicPacket<Double>>(
address, network, initialValue, bufferSize
) {
override fun receivePushOrReply(packet: FeedbackEpidemicPacket<Double>) {
val payload = packet.payload
if (packet.isFeedback)
changeValue(payload)
else {
computeAverage(payload)
sendFeedback(packet.senderAddress)
spreadNewValue()
}
}
private fun computeAverage(payload: Double) {
val newValue = (nodeValue + payload) / 2
changeValue(newValue)
}
private fun sendFeedback(receiverAddress: Address) {
send(
FeedbackEpidemicPacket(
address,
receiverAddress,
nodeValue,
if (infectStrategy == InfectStrategy.PUSH)
EpidemicPacket.Type.PUSH else EpidemicPacket.Type.REPLY,
true
)
)
}
private fun spreadNewValue() {
when (infectStrategy) {
InfectStrategy.PUSH -> sendPushToRandom()
InfectStrategy.PULL -> sendPullToRandom()
InfectStrategy.PUSHPULL -> sendPushPullToRandom()
}
}
override fun receivePull(packet: FeedbackEpidemicPacket<Double>) {
if (packet.type == EpidemicPacket.Type.PUSHPULL)
return
send(
makeInfectionPacket(
packet.senderAddress,
EpidemicPacket.Type.REPLY
)
)
}
override fun infect() {
Thread.interrupted()
}
override fun makeInfectionPacket(
receiverAddress: Address,
type: EpidemicPacket.Type
): EpidemicPacket<Double> {
return FeedbackEpidemicPacket(
address,
receiverAddress,
nodeValue,
type,
false
)
}
companion object {
fun createPushNode(
address: Address,
network: Network,
initialValue: Double,
bufferSize: Int = 1
) = AverageNode(address, network, initialValue, InfectStrategy.PUSH, bufferSize)
fun createPullNode(
address: Address,
network: Network,
initialValue: Double,
bufferSize: Int = 1
) = AverageNode(address, network, initialValue, InfectStrategy.PULL, bufferSize)
fun createPushPullNode(
address: Address,
network: Network,
initialValue: Double,
bufferSize: Int = 1
) = AverageNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize)
}
}

View File

@ -0,0 +1,35 @@
package drills.drill09.average
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(17) {
AverageNode.createPullNode(Address(it), network, 0.0)
}
val alpha = network.getRandomNode() as AverageNode
val beta = network.getRandomNode() as AverageNode
val packet = FeedbackEpidemicPacket(
alpha.address,
beta.address,
1,
EpidemicPacket.Type.REPLY,
false
)
alpha.send(packet)
network.start()
Thread.sleep(2000)
network.forEach {
println(1.0 / (it as AverageNode).nodeValue)
}
}

View File

@ -0,0 +1,35 @@
package drills.drill09.average
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(17) {
AverageNode.createPushNode(Address(it), network, 0.0)
}
val alpha = network.getRandomNode() as AverageNode
val beta = network.getRandomNode() as AverageNode
val packet = FeedbackEpidemicPacket(
alpha.address,
beta.address,
1,
EpidemicPacket.Type.PUSH,
false
)
alpha.send(packet)
network.start()
Thread.sleep(2000)
network.forEach {
println(1.0 / (it as AverageNode).nodeValue)
}
}

View File

@ -0,0 +1,35 @@
package drills.drill09.average
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(17) {
AverageNode.createPushPullNode(Address(it), network, 0.0)
}
val alpha = network.getRandomNode() as AverageNode
val beta = network.getRandomNode() as AverageNode
val packet = FeedbackEpidemicPacket(
alpha.address,
beta.address,
1,
EpidemicPacket.Type.REPLY,
false
)
alpha.send(packet)
network.start()
Thread.sleep(2000)
network.forEach {
println(1.0 / (it as AverageNode).nodeValue)
}
}

View File

@ -0,0 +1,84 @@
package drills.drill09.dbreplica
import util.network.epidemic.TimedPayload
import util.network.epidemic.EpidemicNode
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.TimedEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
class DbNode<T>(
address: Address,
network: Network,
initialValue: TimedPayload<T>,
val infectStrategy: InfectStrategy = InfectStrategy.PUSHPULL,
bufferSize: Int = 1
) : EpidemicNode<TimedPayload<T>, TimedEpidemicPacket<T>>(
address, network, initialValue, bufferSize
) {
override fun receivePushOrReply(packet: TimedEpidemicPacket<T>) {
val receivedValue = packet.payload
if (nodeValue.timestamp < receivedValue.timestamp)
changeValue(receivedValue)
}
override fun receivePull(packet: TimedEpidemicPacket<T>) {
val receivedValue = packet.payload
if (nodeValue.timestamp > receivedValue.timestamp)
send(
makeInfectionPacket(
packet.senderAddress,
EpidemicPacket.Type.REPLY
)
)
}
override fun infect() {
when (infectStrategy) {
InfectStrategy.PUSH -> sendPushToRandom()
InfectStrategy.PULL -> sendPullToRandom()
InfectStrategy.PUSHPULL -> sendPushPullToRandom()
}
Thread.sleep(DELTA)
}
override fun makeInfectionPacket(
receiverAddress: Address,
type: EpidemicPacket.Type
): EpidemicPacket<TimedPayload<T>> {
return TimedEpidemicPacket(
address,
receiverAddress,
nodeValue,
type
)
}
companion object {
const val DELTA = 100L
fun <T> createPushNode(
address: Address,
network: Network,
initialValue: TimedPayload<T>,
bufferSize: Int = 1
) = DbNode(address, network, initialValue, InfectStrategy.PUSH, bufferSize)
fun <T> createPullNode(
address: Address,
network: Network,
initialValue: TimedPayload<T>,
bufferSize: Int = 1
) = DbNode(address, network, initialValue, InfectStrategy.PULL, bufferSize)
fun <T> createPushPullNode(
address: Address,
network: Network,
initialValue: TimedPayload<T>,
bufferSize: Int = 1
) = DbNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize)
}
}

View File

@ -0,0 +1,30 @@
package drills.drill09.dbreplica
import util.network.epidemic.TimedPayload
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(10) {
DbNode.createPullNode(Address(it), network, TimedPayload(0, 0))
}
val alpha = network.getRandomNode() as DbNode<Int>
network.start()
Thread.sleep(1000)
network.forEach {
println((it as DbNode<Int>).nodeValue)
}
alpha.changeValue(TimedPayload.createContent(42))
Thread.sleep(2000)
network.forEach {
println((it as DbNode<Int>).nodeValue)
}
}

View File

@ -0,0 +1,30 @@
package drills.drill09.dbreplica
import util.network.epidemic.TimedPayload
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(10) {
DbNode.createPushNode(Address(it), network, TimedPayload(0, 0))
}
val alpha = network.getRandomNode() as DbNode<Int>
network.start()
Thread.sleep(1000)
network.forEach {
println((it as DbNode<Int>).nodeValue)
}
alpha.changeValue(TimedPayload.createContent(42))
Thread.sleep(2000)
network.forEach {
println((it as DbNode<Int>).nodeValue)
}
}

View File

@ -0,0 +1,30 @@
package drills.drill09.dbreplica
import util.network.epidemic.TimedPayload
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(10) {
DbNode.createPushPullNode(Address(it), network, TimedPayload(0, 0))
}
val alpha = network.getRandomNode() as DbNode<Int>
network.start()
Thread.sleep(1000)
network.forEach {
println((it as DbNode<Int>).nodeValue)
}
alpha.changeValue(TimedPayload.createContent(42))
Thread.sleep(2000)
network.forEach {
println((it as DbNode<Int>).nodeValue)
}
}

View File

@ -0,0 +1,76 @@
package drills.drill09.gossip
import util.network.epidemic.EpidemicNode
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
import kotlin.random.Random
class QueryNode<T>(
address: Address,
network: Network,
initialValue: T,
private val demotivatingFactor: Int = 10,
bufferSize: Int = 1
) : EpidemicNode<T, FeedbackEpidemicPacket<T>>(
address, network, initialValue, bufferSize
) {
private var queryValue: T = nodeValue
private var random = Random(System.currentTimeMillis())
override fun makeInfectionPacket(
receiverAddress: Address,
type: EpidemicPacket.Type
): EpidemicPacket<T> {
return FeedbackEpidemicPacket(
address,
receiverAddress,
queryValue,
type,
false
)
}
override fun receivePushOrReply(packet: FeedbackEpidemicPacket<T>) {
when (nodeState) {
State.REMOVED -> return
State.INFECTED -> whenInfected(packet)
State.SUSCEPTIBLE -> whenSusceptible(packet)
}
}
private fun whenSusceptible(packet: FeedbackEpidemicPacket<T>) {
nodeState = State.INFECTED
queryValue = packet.payload
sendPushToRandom(2)
if (packet.payload == nodeValue)
println("find ${packet.payload} on node $address")
}
private fun whenInfected(packet: FeedbackEpidemicPacket<T>) {
if (!packet.isFeedback)
send(
FeedbackEpidemicPacket(
address,
packet.senderAddress,
queryValue,
EpidemicPacket.Type.PUSH,
true
)
)
if (packet.isFeedback && random.nextDouble(1.0) < 1 / demotivatingFactor)
nodeState = State.REMOVED
}
override fun receivePull(packet: FeedbackEpidemicPacket<T>) {
TODO("Not yet implemented")
}
override fun infect() {
Thread.interrupted()
}
}

View File

@ -0,0 +1,29 @@
package drills.drill09.gossip
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(20) {
QueryNode(Address(it), network, it * 2)
}
val alpha = network.getRandomNode() as QueryNode<Int>
val beta = network.getRandomNode() as QueryNode<Int>
val packet = FeedbackEpidemicPacket(
alpha.address,
beta.address,
24,
EpidemicPacket.Type.PUSH,
false
)
alpha.send(packet)
network.start()
}

View File

@ -0,0 +1,17 @@
package util.jms.replicatedobject
import java.io.Serializable
data class Call(
val methodName: String,
val args: Array<out Any>
) : Serializable {
fun execOn(target: Any): Any {
val method = target::class.java.getMethod(
methodName,
*args.map { it::class.java }.toTypedArray()
)
return method.invoke(target, *args)
}
}

View File

@ -0,0 +1,24 @@
package util.jms.replicatedobject
import util.jms.replier.ActiveMQReplier
import java.io.Serializable
import javax.jms.ObjectMessage
class ReplicatedObject<T : Any>(
private val target: T,
queueName: String,
shared: Boolean = false
) {
private val replier = ActiveMQReplier(queueName, shared)
init {
replier.onRequest {
val call = (it as ObjectMessage).`object` as Call
replier.createObjectMessage(call.execOn(target) as Serializable)
}
}
fun start() {
replier.start()
}
}

View File

@ -0,0 +1,16 @@
package util.jms.replicatedobject
import util.jms.requestor.ActiveMQRequestor
import java.lang.reflect.Proxy
class ReplicatedObjectFactory<T>(private val classType: Class<T>) {
fun create(queueName: String, shared: Boolean = false): T {
val requestor = ActiveMQRequestor(queueName, shared)
return Proxy.newProxyInstance(
this::class.java.classLoader,
arrayOf(classType),
ReplicatedObjectHandler(requestor)
) as T
}
}

View File

@ -0,0 +1,18 @@
package util.jms.replicatedobject
import util.jms.requestor.Requestor
import java.lang.reflect.InvocationHandler
import java.lang.reflect.Method
import javax.jms.ObjectMessage
class ReplicatedObjectHandler(
private val requestor: Requestor
) : InvocationHandler {
override fun invoke(p0: Any, p1: Method, p2: Array<out Any>): Any {
val call = Call(p1.name, p2)
val msg = requestor.createObjectMessage(call)
return (requestor.request(msg) as ObjectMessage).`object`
}
}

View File

@ -1,5 +1,6 @@
package drills.drill07.exercise3.replier
package util.jms.replier
import java.io.Serializable
import javax.jms.Message
import javax.jms.Queue
import javax.jms.QueueConnectionFactory
@ -13,12 +14,12 @@ abstract class AbstarctReplier(
) : Replier {
private val factory: QueueConnectionFactory = this.getFactory()
private val connection = factory.createQueueConnection()
private val session = connection.createQueueSession(
private val mainSession = connection.createQueueSession(
false,
Session.AUTO_ACKNOWLEDGE
)
private val mainQueue = session.createQueue(queueName)
private val receiver = session.createReceiver(mainQueue)
private val mainQueue = mainSession.createQueue(queueName)
private val receiver = mainSession.createReceiver(mainQueue)
private val secondarySession = connection.createQueueSession(
false,
Session.AUTO_ACKNOWLEDGE
@ -37,18 +38,19 @@ abstract class AbstarctReplier(
}
}
override fun createTextMessage(text: String): TextMessage = session.createTextMessage(text)
override fun createTextMessage(text: String): TextMessage = mainSession.createTextMessage(text)
override fun createObjectMessage(): ObjectMessage = session.createObjectMessage()
override fun createObjectMessage(obj: Serializable?): ObjectMessage =
mainSession.createObjectMessage(obj)
override fun createMessage(): Message = session.createMessage()
override fun createMessage(): Message = mainSession.createMessage()
override fun start() {
connection.start()
}
override fun close() {
session.close()
mainSession.close()
connection.close()
}

View File

@ -1,4 +1,4 @@
package drills.drill07.exercise3.replier
package util.jms.replier
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.QueueConnectionFactory
@ -8,4 +8,5 @@ class ActiveMQReplier(
shared: Boolean = false
) : AbstarctReplier(queueName, shared) {
override fun getFactory(uri: String): QueueConnectionFactory = ActiveMQConnectionFactory(uri)
.also { it.isTrustAllPackages = true }
}

View File

@ -1,5 +1,6 @@
package drills.drill07.exercise3.replier
package util.jms.replier
import java.io.Serializable
import javax.jms.Message
import javax.jms.ObjectMessage
import javax.jms.TextMessage
@ -7,7 +8,7 @@ import javax.jms.TextMessage
interface Replier {
fun onRequest(buildResponse: (Message) -> Message)
fun createTextMessage(text: String = ""): TextMessage
fun createObjectMessage(): ObjectMessage
fun createObjectMessage(obj: Serializable? = null): ObjectMessage
fun createMessage(): Message
fun start()
fun close()

View File

@ -1,5 +1,6 @@
package drills.drill07.exercise3.requestor
package util.jms.requestor
import java.io.Serializable
import javax.jms.Message
import javax.jms.QueueConnectionFactory
import javax.jms.Session
@ -42,7 +43,8 @@ abstract class AbstractRequestor(
override fun createTextMessage(text: String): TextMessage = mainSession.createTextMessage(text)
override fun createObjectMessage(): ObjectMessage = mainSession.createObjectMessage()
override fun createObjectMessage(obj: Serializable?): ObjectMessage =
mainSession.createObjectMessage(obj)
override fun createMessage(): Message = mainSession.createMessage()

View File

@ -1,4 +1,4 @@
package drills.drill07.exercise3.requestor
package util.jms.requestor
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.QueueConnectionFactory
@ -8,4 +8,5 @@ class ActiveMQRequestor(
shared: Boolean = false
) : AbstractRequestor(queueName, shared) {
override fun getFactory(uri: String): QueueConnectionFactory = ActiveMQConnectionFactory(uri)
.also { it.isTrustAllPackages = true }
}

View File

@ -1,5 +1,6 @@
package drills.drill07.exercise3.requestor
package util.jms.requestor
import java.io.Serializable
import javax.jms.Message
import javax.jms.ObjectMessage
import javax.jms.TextMessage
@ -7,7 +8,7 @@ import javax.jms.TextMessage
interface Requestor {
fun request(message: Message): Message
fun createTextMessage(text: String = ""): TextMessage
fun createObjectMessage(): ObjectMessage
fun createObjectMessage(obj: Serializable? = null): ObjectMessage
fun createMessage(): Message
fun close()
}

View File

@ -0,0 +1,126 @@
package util.network.epidemic
import util.network.epidemic.packet.EpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
import util.network.simulator.Node
abstract class EpidemicNode<T, P : EpidemicPacket<T>>(
address: Address,
network: Network,
initialValue: T,
bufferSize: Int = 1
) : Node(address, network, bufferSize) {
@Volatile
var nodeValue = initialValue
private set
protected var nodeState = State.SUSCEPTIBLE
@Synchronized
fun changeValue(value: T) {
nodeValue = value
}
private fun sendToRandom(
type: EpidemicPacket.Type,
numberOfReceiver: Int = 1
) {
val toExclude = mutableListOf(address)
repeat(numberOfReceiver) {
var receiverAddress = network.getRandomAddress()
while (toExclude.contains(receiverAddress))
receiverAddress = network.getRandomAddress()
toExclude.add(receiverAddress)
send(makeInfectionPacket(receiverAddress, type))
}
}
abstract fun makeInfectionPacket(
receiverAddress: Address,
type: EpidemicPacket.Type
): EpidemicPacket<T>
protected fun sendPushToRandom(numberOfReceiver: Int = 1) {
sendToRandom(
EpidemicPacket.Type.PUSH,
numberOfReceiver
)
}
protected fun sendPullToRandom(numberOfReceiver: Int = 1) {
sendToRandom(
EpidemicPacket.Type.PULL,
numberOfReceiver
)
}
protected fun sendPushPullToRandom(numberOfReceiver: Int = 1) {
sendToRandom(
EpidemicPacket.Type.PUSHPULL,
numberOfReceiver
)
}
protected abstract fun receivePushOrReply(packet: P)
protected abstract fun receivePull(packet: P)
protected abstract fun infect()
protected fun onReceive(packet: P) {
when (packet.type) {
EpidemicPacket.Type.PUSH -> receivePushOrReply(packet)
EpidemicPacket.Type.PULL -> receivePull(packet)
EpidemicPacket.Type.REPLY -> receivePushOrReply(packet)
EpidemicPacket.Type.PUSHPULL -> {
receivePull(packet)
receivePushOrReply(packet)
}
}
}
override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory {
return EpidemicCommunicatorFactory(threadGroup)
}
protected enum class State {
SUSCEPTIBLE, INFECTED, REMOVED
}
enum class InfectStrategy {
PUSH, PULL, PUSHPULL
}
protected inner class EpidemicReceiver(
threadGroup: ThreadGroup
) : Node.Receiver(threadGroup) {
override fun run() {
while (!isInterrupted) {
val packet = receive() as P
onReceive(packet)
}
}
}
protected inner class EpidemicSender(
threadGroup: ThreadGroup
) : Node.Sender(threadGroup) {
override fun run() {
while (!isInterrupted)
infect()
}
}
protected inner class EpidemicCommunicatorFactory(
threadGroup: ThreadGroup
) : Node.CommunicatorFactory(threadGroup) {
override fun createReceiver(): Receiver {
return EpidemicReceiver(threadGroup)
}
override fun createSender(): Sender {
return EpidemicSender(threadGroup)
}
}
}

View File

@ -0,0 +1,9 @@
package util.network.epidemic
data class TimedPayload<T>(val value: T, val timestamp: Long) {
companion object {
fun <T> createContent(value: T): TimedPayload<T> {
return TimedPayload(value, System.nanoTime())
}
}
}

View File

@ -0,0 +1,11 @@
package util.network.epidemic.packet
import util.network.simulator.packet.PayloadPacket
interface EpidemicPacket<T> : PayloadPacket<T> {
val type: Type
enum class Type {
PUSH, PULL, PUSHPULL, REPLY
}
}

View File

@ -0,0 +1,12 @@
package util.network.epidemic.packet
import util.network.simulator.Address
import util.network.simulator.packet.FeedbackPacket
data class FeedbackEpidemicPacket<T>(
override val senderAddress: Address,
override val receiverAddress: Address,
override val payload: T,
override val type: EpidemicPacket.Type,
override val isFeedback: Boolean
) : EpidemicPacket<T>, FeedbackPacket

View File

@ -0,0 +1,11 @@
package util.network.epidemic.packet
import util.network.epidemic.TimedPayload
import util.network.simulator.Address
data class TimedEpidemicPacket<T>(
override val senderAddress: Address,
override val receiverAddress: Address,
override val payload: TimedPayload<T>,
override val type: EpidemicPacket.Type
) : EpidemicPacket<TimedPayload<T>>

View File

@ -0,0 +1,3 @@
package util.network.simulator
inline class Address(val value: Int)

View File

@ -0,0 +1,41 @@
package util.network.simulator
import kotlin.random.Random
class Network(threadGrupName: String = "network") {
private val nodes = mutableMapOf<Address, Node>()
private val addresses = mutableListOf<Address>()
val threadGroup = ThreadGroup(threadGrupName)
private val random by lazy { Random(System.currentTimeMillis()) }
operator fun get(address: Address): Node? {
return nodes[address]
}
fun addNode(node: Node) {
nodes[node.address] = node
addresses.add(node.address)
}
fun getRandomAddress(): Address {
val index = random.nextInt(addresses.size)
return addresses[index]
}
fun getRandomNode(): Node {
return nodes[getRandomAddress()]!!
}
fun start() {
nodes.forEach { (_, node) ->
node.start()
}
}
fun forEach(action: (Node) -> Unit) {
nodes.forEach { (_, node) ->
action(node)
}
}
}

View File

@ -0,0 +1,61 @@
package util.network.simulator
import util.network.simulator.packet.Packet
import java.util.concurrent.LinkedBlockingQueue
abstract class Node(
val address: Address,
protected val network: Network,
bufferSize: Int = 1
) {
private val buffer = LinkedBlockingQueue<Packet>(bufferSize)
private val receiver: Receiver
private val sender: Sender
init {
network.addNode(this)
val factory = communicatorFactory(network.threadGroup)
receiver = factory.createReceiver()
sender = factory.createSender()
}
fun start() {
receiver.start()
sender.start()
}
private fun setPacket(packet: Packet) {
buffer.put(packet)
}
private fun getPacket(): Packet {
return buffer.take()
}
fun send(packet: Packet) {
network[packet.receiverAddress]?.setPacket(packet)
}
protected fun receive(): Packet = getPacket()
protected abstract fun communicatorFactory(
threadGroup: ThreadGroup
): CommunicatorFactory
protected abstract inner class CommunicatorFactory(
protected val threadGroup: ThreadGroup
) {
abstract fun createReceiver(): Receiver
abstract fun createSender(): Sender
}
protected abstract inner class Receiver(threadGroup: ThreadGroup) : Thread(
threadGroup,
"${threadGroup.name}-receiver"
)
protected abstract inner class Sender(threadGroup: ThreadGroup) : Thread(
threadGroup,
"${threadGroup.name}-sender"
)
}

View File

@ -0,0 +1,52 @@
package util.network.simulator
import util.network.simulator.packet.NetworkPacket
fun main() {
val network = Network()
repeat(2) {
TestNode(Address(it), network)
}
network.start()
}
class TestNode(address: Address, network: Network) : Node(address, network) {
override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory {
return TestCommunicatorFactory(threadGroup)
}
private inner class TestReceiver(threadGroup: ThreadGroup) : Receiver(threadGroup) {
override fun run() {
if (address != Address(0))
return
val packet = receive()
println(packet)
}
}
private inner class TestSender(threadGroup: ThreadGroup) : Sender(threadGroup) {
override fun run() {
if (address != Address(1))
return
val packet = NetworkPacket(address, Address(0))
send(packet)
}
}
private inner class TestCommunicatorFactory(threadGroup: ThreadGroup) :
CommunicatorFactory(threadGroup) {
override fun createReceiver(): Receiver {
return TestReceiver(threadGroup)
}
override fun createSender(): Sender {
return TestSender(threadGroup)
}
}
}

View File

@ -0,0 +1,5 @@
package util.network.simulator.packet
interface FeedbackPacket : Packet {
val isFeedback: Boolean
}

View File

@ -0,0 +1,8 @@
package util.network.simulator.packet
import util.network.simulator.Address
data class NetworkPacket(
override val senderAddress: Address,
override val receiverAddress: Address
) : Packet

View File

@ -0,0 +1,8 @@
package util.network.simulator.packet
import util.network.simulator.Address
interface Packet {
val senderAddress: Address
val receiverAddress: Address
}

View File

@ -0,0 +1,5 @@
package util.network.simulator.packet
interface PayloadPacket<T> : Packet {
val payload: T
}