Compare commits

...

15 Commits

Author SHA1 Message Date
Raffaele Mignone b50890b44f
refactoring
continuous-integration/drone/tag Build is passing Details
2020-05-03 19:38:55 +02:00
Raffaele Mignone 711da44f53
add gossip 2020-05-03 17:30:17 +02:00
Raffaele Mignone 581d9365ee
add average and db push, pull and pushpull 2020-05-03 14:57:03 +02:00
Raffaele Mignone 32ca1dd9a0
remove packet prototype 2020-05-03 11:04:28 +02:00
Raffaele Mignone 5688bd653a
add average 2020-05-02 22:27:43 +02:00
Raffaele Mignone 9690167176
basic network 2020-05-02 13:12:44 +02:00
Raffaele Mignone 41bc91cb98
add documentation
continuous-integration/drone/tag Build is passing Details
2020-04-25 18:54:20 +02:00
Raffaele Mignone bcc7d4de68
remove clone 2020-04-25 18:51:02 +02:00
Raffaele Mignone bca1b43fa4
add drill08 2020-04-25 18:49:06 +02:00
Raffaele Mignone 2cd3dd6549
add trust all package 2020-04-24 18:58:13 +02:00
Raffaele Mignone 1381d706ed
add share tmpQueue
continuous-integration/drone/tag Build is passing Details
2020-04-22 18:01:59 +02:00
Raffaele Mignone 10bf3e3588
add drill 07 2020-04-22 11:59:55 +02:00
Raffaele Mignone d8f4e8bedf
add drill06
continuous-integration/drone/tag Build is passing Details
2020-04-21 16:33:19 +02:00
Raffaele Mignone 97f6fcbf82
fix style 2020-03-29 18:12:33 +02:00
Raffaele Mignone 4782ec428f
add future implementation 2020-03-29 18:12:08 +02:00
91 changed files with 2419 additions and 5 deletions

View File

@ -4,7 +4,7 @@
<component name="JavaScriptSettings">
<option name="languageLevel" value="ES6" />
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_11" project-jdk-name="11" project-jdk-type="JavaSDK">
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>

View File

@ -20,6 +20,9 @@ group = "it.norangeb.unisannio"
dependencies {
implementation(kotlin("stdlib-jdk8"))
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.5")
implementation("org.apache.activemq:activemq-client:5.15.12")
implementation("org.glassfish.jersey.containers:jersey-container-grizzly2-http:2.25")
implementation("org.glassfish.jersey.media:jersey-media-json-jackson:2.25")
testCompile("junit", "junit", "4.12")
detektPlugins("io.gitlab.arturbosch.detekt:detekt-formatting:1.7.0-beta2")
}

View File

@ -263,7 +263,7 @@ formatting:
active: true
autoCorrect: true
NoSemicolons:
active: true
active: false
autoCorrect: true
NoTrailingSpaces:
active: true
@ -501,9 +501,9 @@ style:
active: true
maxJumpCount: 1
MagicNumber:
active: true
active: false
excludes: "**/test/**,**/androidTest/**,**/*.Test.kt,**/*.Spec.kt,**/*.Spek.kt"
ignoreNumbers: '-1,0,1,2,100,500,1000'
ignoreNumbers: '-1,0,1,2,3,42,100,500,1000,4242'
ignoreHashCodeFunction: true
ignorePropertyDeclaration: false
ignoreLocalVariableDeclaration: false

3
doc/drill08.md Normal file
View File

@ -0,0 +1,3 @@
exercise 03 use `Client` from exercise 02
exercise 04 use `ReplicatedFinder` from exercise 03

View File

@ -1,6 +1,6 @@
package drills.drill03.exercise2
class PrintAgent(private val nodes: Array<Node>): AbstractAgent(){
class PrintAgent(private val nodes: Array<Node>) : AbstractAgent() {
private var index = 0
private val names = mutableListOf<String>()

View File

@ -0,0 +1,28 @@
package drills.drill06.exercise1.point1
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.QueueConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage
fun main() {
val url = "tcp://localhost:61616"
val factory: QueueConnectionFactory = ActiveMQConnectionFactory(url)
val connection = factory.createQueueConnection()
connection.start()
val session = connection.createQueueSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val queue = session.createQueue("hello")
val receiver = session.createReceiver(queue)
val msg = receiver.receive()
println((msg as TextMessage).text)
session.close()
connection.close()
}

View File

@ -0,0 +1,25 @@
package drills.drill06.exercise1.point1
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.QueueConnectionFactory
import javax.jms.Session
fun main() {
val url = "tcp://localhost:61616"
val factory: QueueConnectionFactory = ActiveMQConnectionFactory(url)
val connection = factory.createQueueConnection()
val session = connection.createQueueSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val queue = session.createQueue("hello")
val sender = session.createSender(queue)
val msg = session.createTextMessage("Hello Home!")
sender.send(msg)
session.close()
connection.close()
}

View File

@ -0,0 +1,28 @@
package drills.drill06.exercise1.point2
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.QueueConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage
fun main() {
val url = "tcp://localhost:61616"
val factory: QueueConnectionFactory = ActiveMQConnectionFactory(url)
val connection = factory.createQueueConnection()
connection.start()
val session = connection.createQueueSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val queue = session.createQueue("hello")
val receiver = session.createReceiver(queue)
receiver.setMessageListener {
println((it as TextMessage).text)
session.close()
connection.close()
}
}

View File

@ -0,0 +1,25 @@
package drills.drill06.exercise1.point2
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.QueueConnectionFactory
import javax.jms.Session
fun main() {
val url = "tcp://localhost:61616"
val factory: QueueConnectionFactory = ActiveMQConnectionFactory(url)
val connection = factory.createQueueConnection()
val session = connection.createQueueSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val queue = session.createQueue("hello")
val sender = session.createSender(queue)
val msg = session.createTextMessage("Hello Home!")
sender.send(msg)
session.close()
connection.close()
}

View File

@ -0,0 +1,25 @@
package drills.drill06.exercise1.point3
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TopicConnectionFactory
fun main() {
val url = "tcp://localhost:61616"
val factory: TopicConnectionFactory = ActiveMQConnectionFactory(url)
val connection = factory.createTopicConnection()
val session = connection.createTopicSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val topic = session.createTopic("hello")
val publisher = session.createPublisher(topic)
val msg = session.createTextMessage("Hello Home!")
publisher.publish(msg)
session.close()
connection.close()
}

View File

@ -0,0 +1,28 @@
package drills.drill06.exercise1.point3
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage
import javax.jms.TopicConnectionFactory
fun main() {
val url = "tcp://localhost:61616"
val factory: TopicConnectionFactory = ActiveMQConnectionFactory(url)
val connection = factory.createTopicConnection()
connection.start()
val session = connection.createTopicSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val topic = session.createTopic("hello")
val subscriber = session.createSubscriber(topic)
val msg = subscriber.receive()
println((msg as TextMessage).text)
session.close()
connection.close()
}

View File

@ -0,0 +1,25 @@
package drills.drill06.exercise1.point4
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TopicConnectionFactory
fun main() {
val url = "tcp://localhost:61616"
val factory: TopicConnectionFactory = ActiveMQConnectionFactory(url)
val connection = factory.createTopicConnection()
val session = connection.createTopicSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val topic = session.createTopic("hello")
val publisher = session.createPublisher(topic)
val msg = session.createTextMessage("Hello Home!")
publisher.publish(msg)
session.close()
connection.close()
}

View File

@ -0,0 +1,28 @@
package drills.drill06.exercise1.point4
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage
import javax.jms.TopicConnectionFactory
fun main() {
val url = "tcp://localhost:61616"
val factory: TopicConnectionFactory = ActiveMQConnectionFactory(url)
val connection = factory.createTopicConnection()
connection.start()
val session = connection.createTopicSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val topic = session.createTopic("hello")
val subscriber = session.createSubscriber(topic)
subscriber.setMessageListener {
println((it as TextMessage).text)
session.close()
connection.close()
}
}

View File

@ -0,0 +1,25 @@
package drills.drill06.exercise1.point5
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TopicConnectionFactory
fun main() {
val url = "tcp://localhost:61616"
val factory: TopicConnectionFactory = ActiveMQConnectionFactory(url)
val connection = factory.createTopicConnection()
val session = connection.createTopicSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val topic = session.createTopic("hello")
val publisher = session.createPublisher(topic)
val msg = session.createTextMessage("Hello Home!")
publisher.publish(msg)
session.close()
connection.close()
}

View File

@ -0,0 +1,29 @@
package drills.drill06.exercise1.point5
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage
import javax.jms.TopicConnectionFactory
fun main() {
val url = "tcp://localhost:61616"
val factory: TopicConnectionFactory = ActiveMQConnectionFactory(url)
val connection = factory.createTopicConnection()
connection.clientID = "client1"
connection.start()
val session = connection.createTopicSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val topic = session.createTopic("hello")
val subscriber = session.createDurableSubscriber(topic, "s1")
subscriber.setMessageListener {
println((it as TextMessage).text)
session.close()
connection.close()
}
}

View File

@ -0,0 +1,38 @@
package drills.drill06.exercise2
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage
import javax.jms.TopicConnectionFactory
fun main() {
val factory: TopicConnectionFactory = ActiveMQConnectionFactory()
val connection = factory.createTopicConnection()
connection.start()
val session = connection.createTopicSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val room = session.createTopic("movies")
val publisher = session.createPublisher(room)
val subscriber = session.createSubscriber(room)
subscriber.setMessageListener {
println("peer: ${(it as TextMessage).text}")
}
var line = readLine()
val msg = session.createTextMessage()
while (line != ".") {
msg.text = line
publisher.publish(msg)
line = readLine()
}
session.close()
connection.close()
}

View File

@ -0,0 +1,41 @@
package drills.drill06.exercise3
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage
import javax.jms.TopicConnectionFactory
fun main() {
val factory: TopicConnectionFactory = ActiveMQConnectionFactory()
val connection = factory.createTopicConnection()
connection.start()
val session = connection.createTopicSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val room = session.createTopic("movies")
val publisher = session.createPublisher(room)
val selector = "Subtopic = 'Dark Comedy'"
val subscriber = session.createSubscriber(room, selector, true)
subscriber.setMessageListener {
println("peer: ${(it as TextMessage).text}")
}
var line = readLine()
val msg = session.createTextMessage()
while (line != ".") {
msg.text = line
msg.setStringProperty("Subtopic", "Dark Comedy")
publisher.publish(msg)
line = readLine()
}
session.close()
connection.close()
}

View File

@ -0,0 +1,7 @@
package drills.drill06.exercise4
object SensorNet {
const val TOPIC = "sensorNet"
const val URI = "tcp://localhost:61616"
const val PROPERTY = "temperature"
}

View File

@ -0,0 +1,28 @@
package drills.drill06.exercise4.actuator
interface Action {
enum class State {
ON, OFF
}
fun getActionName(): String
fun getState(): State
fun on()
fun off()
}
abstract class VirtualAction : Action {
private var state = Action.State.OFF
override fun getState(): Action.State = state
override fun off() {
println("Switching off: ${getActionName()}")
state = Action.State.OFF
}
override fun on() {
println("Switching on: ${getActionName()}")
state = Action.State.ON
}
}

View File

@ -0,0 +1,33 @@
package drills.drill06.exercise4.actuator
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
class Actuator(private val action: Action) : Thread() {
var latch = CountDownLatch(1)
override fun run() {
while (true) {
latch.await()
synchronized(this) {
latch = CountDownLatch(1)
}
if (action.getState() == Action.State.OFF)
action.on()
latch.await(2050, TimeUnit.MILLISECONDS)
synchronized(this) {
if (latch.count == 1L)
action.off()
}
}
}
@Synchronized
fun actuate() {
latch.countDown()
}
}

View File

@ -0,0 +1,12 @@
package drills.drill06.exercise4.actuator
import drills.drill06.exercise4.SensorNet
fun main() {
TemperatureActuatorNode(
SensorNet.URI,
"anti-frost",
SensorNet.TOPIC,
"<= 4"
).start()
}

View File

@ -0,0 +1,12 @@
package drills.drill06.exercise4.actuator
import drills.drill06.exercise4.SensorNet
fun main() {
TemperatureActuatorNode(
SensorNet.URI,
"irrigation",
SensorNet.TOPIC,
">= 20"
).start()
}

View File

@ -0,0 +1,44 @@
package drills.drill06.exercise4.actuator
import drills.drill06.exercise4.SensorNet
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TopicConnectionFactory
class TemperatureActuatorNode(
private val uri: String,
private val name: String,
private val topicName: String,
private val selector: String
) {
private val actuator = Actuator(object : VirtualAction() {
override fun getActionName(): String = name
})
init {
actuator.start()
}
fun start() {
val factory: TopicConnectionFactory = ActiveMQConnectionFactory(uri)
val connection = factory.createTopicConnection()
connection.start()
val session = connection.createTopicSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val topic = session.createTopic(topicName)
val subscriber = session.createSubscriber(
topic,
"${SensorNet.PROPERTY} $selector",
false
)
subscriber.setMessageListener {
println("$name: ${it.getDoubleProperty("temperature")}")
actuator.actuate()
}
}
}

View File

@ -0,0 +1,21 @@
package drills.drill06.exercise4.sensor
import kotlin.random.Random
interface Sampler<T> {
fun getSample(): T
}
class SampleGenerator(seed: Int = 0) : Sampler<Double> {
private val samples = listOf(
6, 4, 3, 2, 4, 10, 18, 20, 22, 24, 20, 15, 10, 7
)
private val random = Random(seed)
private var i = 0
override fun getSample(): Double {
return samples[i++ % samples.size].toDouble() + random.nextDouble(-1.5, 1.5)
}
}

View File

@ -0,0 +1,44 @@
package drills.drill06.exercise4.sensor
import drills.drill06.exercise4.SensorNet
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TopicConnectionFactory
class TemperatureSensorNode(val id: Int, val uri: String, val topicName: String) {
private val sampler: Sampler<Double> =
SampleGenerator(id)
fun start() {
val factory: TopicConnectionFactory = ActiveMQConnectionFactory(uri)
val connection = factory.createTopicConnection()
val session = connection.createTopicSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val topic = session.createTopic(topicName)
val publisher = session.createPublisher(topic)
val msg = session.createMessage()
while (true) {
val sample = sampler.getSample()
msg.setDoubleProperty(SensorNet.PROPERTY, sample)
publisher.publish(msg)
println("Sensor-$id: $sample")
Thread.sleep(2000)
}
}
}
fun main() {
TemperatureSensorNode(
0,
SensorNet.URI,
SensorNet.TOPIC
).start()
}

View File

@ -0,0 +1,8 @@
package drills.drill06.exercise5
object SensorNet {
const val TOPIC_SENSOR = "sensor"
const val TOPIC_ACTUATOR = "actuator"
const val URI = "tcp://localhost:61616"
const val PROPERTY = "temperature"
}

View File

@ -0,0 +1,13 @@
package drills.drill06.exercise5.actuator
import drills.drill06.exercise4.actuator.TemperatureActuatorNode
import drills.drill06.exercise5.SensorNet
fun main() {
TemperatureActuatorNode(
SensorNet.URI,
"anti-frost",
SensorNet.TOPIC_ACTUATOR,
"<= 4"
).start()
}

View File

@ -0,0 +1,13 @@
package drills.drill06.exercise5.actuator
import drills.drill06.exercise4.actuator.TemperatureActuatorNode
import drills.drill06.exercise5.SensorNet
fun main() {
TemperatureActuatorNode(
SensorNet.URI,
"irrigation",
SensorNet.TOPIC_ACTUATOR,
">= 20"
).start()
}

View File

@ -0,0 +1,62 @@
package drills.drill06.exercise5.middleware
import drills.drill06.exercise5.SensorNet
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TopicConnectionFactory
class MeanNode(
private val uri: String,
private val topicNameSensor: String,
private val topicNameActuator: String
) {
fun start() {
val factory: TopicConnectionFactory = ActiveMQConnectionFactory(uri)
val connection = factory.createTopicConnection()
connection.start()
val session = connection.createTopicSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val topicSensor = session.createTopic(topicNameSensor)
val topicActuator = session.createTopic(topicNameActuator)
val subscriber = session.createSubscriber(topicSensor)
val publisher = session.createPublisher(topicActuator)
val samples = mutableListOf<Double>()
val msg = session.createMessage()
while (true) {
val receiveMsg = subscriber.receiveNoWait()
if (receiveMsg != null) {
val sample = receiveMsg.getDoubleProperty("temperature")
samples.add(sample)
println("sample: $sample")
continue
}
if (samples.isNotEmpty()) {
val mean = samples.sum() / samples.size
samples.clear()
msg.setDoubleProperty("temperature", mean)
publisher.publish(msg)
println("MEAN: $mean")
println("-------------------------")
}
Thread.sleep(2000)
}
}
}
fun main() {
MeanNode(
SensorNet.URI,
SensorNet.TOPIC_SENSOR,
SensorNet.TOPIC_ACTUATOR
).start()
}

View File

@ -0,0 +1,12 @@
package drills.drill06.exercise5.sensor
import drills.drill06.exercise4.sensor.TemperatureSensorNode
import drills.drill06.exercise5.SensorNet
fun main() {
TemperatureSensorNode(
0,
SensorNet.URI,
SensorNet.TOPIC_SENSOR
).start()
}

View File

@ -0,0 +1,12 @@
package drills.drill06.exercise5.sensor
import drills.drill06.exercise4.sensor.TemperatureSensorNode
import drills.drill06.exercise5.SensorNet
fun main() {
TemperatureSensorNode(
1,
SensorNet.URI,
SensorNet.TOPIC_SENSOR
).start()
}

View File

@ -0,0 +1,12 @@
package drills.drill06.exercise5.sensor
import drills.drill06.exercise4.sensor.TemperatureSensorNode
import drills.drill06.exercise5.SensorNet
fun main() {
TemperatureSensorNode(
2,
SensorNet.URI,
SensorNet.TOPIC_SENSOR
).start()
}

View File

@ -0,0 +1,31 @@
package drills.drill07.exercise1.point1
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.QueueConnectionFactory
import javax.jms.Session
fun main() {
val url = "tcp://localhost:61616"
val factory: QueueConnectionFactory = ActiveMQConnectionFactory(url)
val connection = factory.createQueueConnection()
connection.start()
val session = connection.createQueueSession(
false,
Session.CLIENT_ACKNOWLEDGE
)
val queue = session.createQueue("ack")
val receiver = session.createReceiver(queue)
repeat(100) {
val msg = receiver.receive()
val value = msg.getIntProperty("value")
println(value)
msg.acknowledge()
}
session.close()
connection.close()
}

View File

@ -0,0 +1,29 @@
package drills.drill07.exercise1.point1
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.QueueConnectionFactory
import javax.jms.Session
fun main() {
val url = "tcp://localhost:61616"
val factory: QueueConnectionFactory = ActiveMQConnectionFactory(url)
val connection = factory.createQueueConnection()
val session = connection.createQueueSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val queue = session.createQueue("ack")
val sender = session.createSender(queue)
val msg = session.createMessage()
repeat(100) {
msg.setIntProperty("value", it)
sender.send(msg)
}
session.close()
connection.close()
}

View File

@ -0,0 +1,41 @@
package drills.drill07.exercise1.point2
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.QueueConnectionFactory
import javax.jms.Session
fun main() {
val url = "tcp://localhost:61616"
val factory: QueueConnectionFactory = ActiveMQConnectionFactory(url)
val connection = factory.createQueueConnection()
connection.start()
val session = connection.createQueueSession(
true,
Session.SESSION_TRANSACTED
)
val queue = session.createQueue("transacted")
val receiver = session.createReceiver(queue)
repeat(10) {
val msg = receiver.receive()
val value = msg.getIntProperty("value")
println(value)
}
println("retry")
session.rollback()
repeat(10) {
val msg = receiver.receive()
val value = msg.getIntProperty("value")
println(value)
}
session.commit()
session.close()
connection.close()
}

View File

@ -0,0 +1,31 @@
package drills.drill07.exercise1.point2
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.QueueConnectionFactory
import javax.jms.Session
fun main() {
val url = "tcp://localhost:61616"
val factory: QueueConnectionFactory = ActiveMQConnectionFactory(url)
val connection = factory.createQueueConnection()
val session = connection.createQueueSession(
true,
Session.SESSION_TRANSACTED
)
val queue = session.createQueue("transacted")
val sender = session.createSender(queue)
val msg = session.createMessage()
repeat(10) {
msg.setIntProperty("value", it)
sender.send(msg)
}
session.commit()
session.close()
connection.close()
}

View File

@ -0,0 +1,35 @@
package drills.drill07.exercise2
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Queue
import javax.jms.QueueConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage
fun main() {
val url = "tcp://localhost:61616"
val factory: QueueConnectionFactory = ActiveMQConnectionFactory(url)
val connection = factory.createQueueConnection()
connection.start()
val session = connection.createQueueSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val queue = session.createQueue("requestor")
val receiver = session.createReceiver(queue)
val msg = receiver.receive() as TextMessage
println(msg.text)
val replyQueue = msg.jmsReplyTo as Queue
val sender = session.createSender(replyQueue)
val replyMsg = session.createTextMessage("Echo ${msg.text}")
sender.send(replyMsg)
session.close()
connection.close()
}

View File

@ -0,0 +1,35 @@
package drills.drill07.exercise2
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.QueueConnectionFactory
import javax.jms.Session
import javax.jms.TextMessage
fun main() {
val url = "tcp://localhost:61616"
val factory: QueueConnectionFactory = ActiveMQConnectionFactory(url)
val connection = factory.createQueueConnection()
connection.start()
val session = connection.createQueueSession(
false,
Session.AUTO_ACKNOWLEDGE
)
val queue = session.createQueue("requestor")
val sender = session.createSender(queue)
val tmpQueue = session.createTemporaryQueue()
val receiver = session.createReceiver(tmpQueue)
val msg = session.createTextMessage("Hello Home!")
msg.jmsReplyTo = tmpQueue
sender.send(msg)
val replyMsg = receiver.receive() as TextMessage
println(replyMsg.text)
session.close()
connection.close()
}

View File

@ -0,0 +1,17 @@
package drills.drill07.exercise3.hello
import util.jms.replier.ActiveMQReplier
import util.jms.replier.Replier
import javax.jms.TextMessage
fun main() {
val replier: Replier = ActiveMQReplier("hello", shared = true)
replier.start()
replier.onRequest {
val msg = it as TextMessage
println(msg.text)
replier.createTextMessage("Echo ${msg.text}")
}
}

View File

@ -0,0 +1,16 @@
package drills.drill07.exercise3.hello
import util.jms.requestor.ActiveMQRequestor
import util.jms.requestor.Requestor
import javax.jms.TextMessage
fun main() {
val requestor: Requestor =
ActiveMQRequestor("hello", shared = true)
val msg = requestor.request(requestor.createTextMessage("Hello Home!")) as TextMessage
println(msg.text)
requestor.close()
}

View File

@ -0,0 +1,21 @@
package drills.drill07.exercise4
import util.jms.replier.ActiveMQReplier
import util.jms.replier.Replier
fun main() {
val replier: Replier = ActiveMQReplier("factorial", shared = true)
replier.start()
replier.onRequest {
val n = it.getIntProperty("value")
val resultMessage = replier.createMessage()
resultMessage.setLongProperty("result", fact(n))
resultMessage
}
}
private fun fact(n: Int): Long {
return if (n <= 1) 1 else n * fact(n - 1)
}

View File

@ -0,0 +1,21 @@
package drills.drill07.exercise4
import util.jms.requestor.ActiveMQRequestor
import util.jms.requestor.Requestor
fun main() {
val requestor: Requestor =
ActiveMQRequestor("factorial", shared = true)
println("Enter the number of which you want to calculate the factorial:")
val n = readLine()?.toInt()
val sendMessage = requestor.createMessage()
.also { it.setIntProperty("value", n!!) }
val replyMessage = requestor.request(sendMessage)
println("The factorial of $n is ${replyMessage.getLongProperty("result")}")
requestor.close()
}

View File

@ -0,0 +1,11 @@
package drills.drill08.exercise1
import util.jms.replicatedobject.ReplicatedObjectFactory
fun main() {
val factory = ReplicatedObjectFactory(Finder::class.java)
val finder = factory.create("finder")
println("ciao: ${finder.find("ciao")}")
println("ciao!: ${finder.find("ciao!")}")
}

View File

@ -0,0 +1,11 @@
package drills.drill08.exercise1
interface Finder {
fun find(str: String): Boolean
}
class FakeFinder : Finder {
override fun find(str: String): Boolean {
return str.length % 2 == 0
}
}

View File

@ -0,0 +1,10 @@
package drills.drill08.exercise1
import util.jms.replicatedobject.ReplicatedObject
fun main() {
val finder: Finder = FakeFinder()
ReplicatedObject(finder, "finder")
.start()
}

View File

@ -0,0 +1,27 @@
package drills.drill08.exercise2
import util.rmi.Client
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
@ExperimentalTime
fun main() {
Client(clientHandler).start()
}
@ExperimentalTime
val clientHandler = {
val finder = Client.lookup("finder") as Finder
val workerNumber = 4
val workers = Array(workerNumber) { Worker(finder) }
val elapsed = measureTime {
workers.forEach { it.start() }
workers.forEach { it.join() }
}
println("Total elapsed time: $elapsed")
println("Throughput: ${(workerNumber * Worker.REQUEST_NUMBER) / elapsed.inSeconds} req/s")
}

View File

@ -0,0 +1,18 @@
package drills.drill08.exercise2
import java.rmi.Remote
import java.rmi.RemoteException
import java.rmi.server.UnicastRemoteObject
interface Finder : Remote {
@Throws(RemoteException::class)
fun find(str: String): Boolean
}
class FakeFinder : UnicastRemoteObject(), Finder {
override fun find(str: String): Boolean {
println(str)
for (i in 0 until 100000000L);
return str.length % 2 == 0
}
}

View File

@ -0,0 +1,10 @@
package drills.drill08.exercise2
import util.rmi.Server
fun main() {
Server {
val finder: Finder = FakeFinder()
Server.bind(finder, "finder")
}.start()
}

View File

@ -0,0 +1,22 @@
package drills.drill08.exercise2
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
class Worker(private val finder: Finder) : Thread() {
@ExperimentalTime
override fun run() {
val elapsed = measureTime {
repeat(REQUEST_NUMBER) {
finder.find("test-$it")
}
}
println("Thread $this elapsed: $elapsed")
println("Thread $this mean time for request: ${elapsed / REQUEST_NUMBER}")
}
companion object {
const val REQUEST_NUMBER = 60
}
}

View File

@ -0,0 +1,9 @@
package drills.drill08.exercise3
import drills.drill08.exercise2.FakeFinder
import util.jms.replicatedobject.ReplicatedObject
fun main() {
ReplicatedObject(FakeFinder(), "finder")
.start()
}

View File

@ -0,0 +1,21 @@
package drills.drill08.exercise3
import drills.drill08.exercise2.Finder
import util.jms.replicatedobject.ReplicatedObjectFactory
import util.rmi.Server
import java.rmi.server.UnicastRemoteObject
fun main() {
Server {
val finder = FinderProxy()
Server.bind(finder, "finder")
}.start()
}
class FinderProxy : Finder, UnicastRemoteObject() {
private val finder = ReplicatedObjectFactory(Finder::class.java)
.create("finder")
override fun find(str: String): Boolean {
return finder.find(str)
}
}

View File

@ -0,0 +1,23 @@
package drills.drill08.exercise4
import drills.drill08.exercise2.Finder
import util.jms.replicatedobject.ReplicatedObjectFactory
import javax.ws.rs.Consumes
import javax.ws.rs.GET
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.QueryParam
import javax.ws.rs.core.MediaType
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@Path("/search")
class SearchService {
private val finder = ReplicatedObjectFactory(Finder::class.java)
.create("finder")
@GET
fun search(@QueryParam("str") str: String): Boolean {
return finder.find(str)
}
}

View File

@ -0,0 +1,18 @@
package drills.drill08.exercise4
import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory
import org.glassfish.jersey.server.ResourceConfig
import javax.ws.rs.core.UriBuilder
fun main() {
val uri = UriBuilder
.fromUri("http://localhost/rest")
.port(8484)
.build()
val resConfig = ResourceConfig().register(SearchService())
val server = GrizzlyHttpServerFactory.createHttpServer(uri, resConfig)
server.start()
while (server.isStarted);
}

View File

@ -0,0 +1,107 @@
package drills.drill09.average
import util.network.epidemic.EpidemicNode
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
class AverageNode(
address: Address,
network: Network,
initialValue: Double,
val infectStrategy: InfectStrategy = InfectStrategy.PUSH,
bufferSize: Int = 1
) : EpidemicNode<Double, FeedbackEpidemicPacket<Double>>(
address, network, initialValue, bufferSize
) {
override fun receivePushOrReply(packet: FeedbackEpidemicPacket<Double>) {
val payload = packet.payload
if (packet.isFeedback)
changeValue(payload)
else {
computeAverage(payload)
sendFeedback(packet.senderAddress)
spreadNewValue()
}
}
private fun computeAverage(payload: Double) {
val newValue = (nodeValue + payload) / 2
changeValue(newValue)
}
private fun sendFeedback(receiverAddress: Address) {
send(
FeedbackEpidemicPacket(
address,
receiverAddress,
nodeValue,
if (infectStrategy == InfectStrategy.PUSH)
EpidemicPacket.Type.PUSH else EpidemicPacket.Type.REPLY,
true
)
)
}
private fun spreadNewValue() {
when (infectStrategy) {
InfectStrategy.PUSH -> sendPushToRandom()
InfectStrategy.PULL -> sendPullToRandom()
InfectStrategy.PUSHPULL -> sendPushPullToRandom()
}
}
override fun receivePull(packet: FeedbackEpidemicPacket<Double>) {
if (packet.type == EpidemicPacket.Type.PUSHPULL)
return
send(
makeInfectionPacket(
packet.senderAddress,
EpidemicPacket.Type.REPLY
)
)
}
override fun infect() {
Thread.interrupted()
}
override fun makeInfectionPacket(
receiverAddress: Address,
type: EpidemicPacket.Type
): EpidemicPacket<Double> {
return FeedbackEpidemicPacket(
address,
receiverAddress,
nodeValue,
type,
false
)
}
companion object {
fun createPushNode(
address: Address,
network: Network,
initialValue: Double,
bufferSize: Int = 1
) = AverageNode(address, network, initialValue, InfectStrategy.PUSH, bufferSize)
fun createPullNode(
address: Address,
network: Network,
initialValue: Double,
bufferSize: Int = 1
) = AverageNode(address, network, initialValue, InfectStrategy.PULL, bufferSize)
fun createPushPullNode(
address: Address,
network: Network,
initialValue: Double,
bufferSize: Int = 1
) = AverageNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize)
}
}

View File

@ -0,0 +1,35 @@
package drills.drill09.average
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(17) {
AverageNode.createPullNode(Address(it), network, 0.0)
}
val alpha = network.getRandomNode() as AverageNode
val beta = network.getRandomNode() as AverageNode
val packet = FeedbackEpidemicPacket(
alpha.address,
beta.address,
1,
EpidemicPacket.Type.REPLY,
false
)
alpha.send(packet)
network.start()
Thread.sleep(2000)
network.forEach {
println(1.0 / (it as AverageNode).nodeValue)
}
}

View File

@ -0,0 +1,35 @@
package drills.drill09.average
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(17) {
AverageNode.createPushNode(Address(it), network, 0.0)
}
val alpha = network.getRandomNode() as AverageNode
val beta = network.getRandomNode() as AverageNode
val packet = FeedbackEpidemicPacket(
alpha.address,
beta.address,
1,
EpidemicPacket.Type.PUSH,
false
)
alpha.send(packet)
network.start()
Thread.sleep(2000)
network.forEach {
println(1.0 / (it as AverageNode).nodeValue)
}
}

View File

@ -0,0 +1,35 @@
package drills.drill09.average
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(17) {
AverageNode.createPushPullNode(Address(it), network, 0.0)
}
val alpha = network.getRandomNode() as AverageNode
val beta = network.getRandomNode() as AverageNode
val packet = FeedbackEpidemicPacket(
alpha.address,
beta.address,
1,
EpidemicPacket.Type.REPLY,
false
)
alpha.send(packet)
network.start()
Thread.sleep(2000)
network.forEach {
println(1.0 / (it as AverageNode).nodeValue)
}
}

View File

@ -0,0 +1,84 @@
package drills.drill09.dbreplica
import util.network.epidemic.TimedPayload
import util.network.epidemic.EpidemicNode
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.TimedEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
class DbNode<T>(
address: Address,
network: Network,
initialValue: TimedPayload<T>,
val infectStrategy: InfectStrategy = InfectStrategy.PUSHPULL,
bufferSize: Int = 1
) : EpidemicNode<TimedPayload<T>, TimedEpidemicPacket<T>>(
address, network, initialValue, bufferSize
) {
override fun receivePushOrReply(packet: TimedEpidemicPacket<T>) {
val receivedValue = packet.payload
if (nodeValue.timestamp < receivedValue.timestamp)
changeValue(receivedValue)
}
override fun receivePull(packet: TimedEpidemicPacket<T>) {
val receivedValue = packet.payload
if (nodeValue.timestamp > receivedValue.timestamp)
send(
makeInfectionPacket(
packet.senderAddress,
EpidemicPacket.Type.REPLY
)
)
}
override fun infect() {
when (infectStrategy) {
InfectStrategy.PUSH -> sendPushToRandom()
InfectStrategy.PULL -> sendPullToRandom()
InfectStrategy.PUSHPULL -> sendPushPullToRandom()
}
Thread.sleep(DELTA)
}
override fun makeInfectionPacket(
receiverAddress: Address,
type: EpidemicPacket.Type
): EpidemicPacket<TimedPayload<T>> {
return TimedEpidemicPacket(
address,
receiverAddress,
nodeValue,
type
)
}
companion object {
const val DELTA = 100L
fun <T> createPushNode(
address: Address,
network: Network,
initialValue: TimedPayload<T>,
bufferSize: Int = 1
) = DbNode(address, network, initialValue, InfectStrategy.PUSH, bufferSize)
fun <T> createPullNode(
address: Address,
network: Network,
initialValue: TimedPayload<T>,
bufferSize: Int = 1
) = DbNode(address, network, initialValue, InfectStrategy.PULL, bufferSize)
fun <T> createPushPullNode(
address: Address,
network: Network,
initialValue: TimedPayload<T>,
bufferSize: Int = 1
) = DbNode(address, network, initialValue, InfectStrategy.PUSHPULL, bufferSize)
}
}

View File

@ -0,0 +1,30 @@
package drills.drill09.dbreplica
import util.network.epidemic.TimedPayload
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(10) {
DbNode.createPullNode(Address(it), network, TimedPayload(0, 0))
}
val alpha = network.getRandomNode() as DbNode<Int>
network.start()
Thread.sleep(1000)
network.forEach {
println((it as DbNode<Int>).nodeValue)
}
alpha.changeValue(TimedPayload.createContent(42))
Thread.sleep(2000)
network.forEach {
println((it as DbNode<Int>).nodeValue)
}
}

View File

@ -0,0 +1,30 @@
package drills.drill09.dbreplica
import util.network.epidemic.TimedPayload
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(10) {
DbNode.createPushNode(Address(it), network, TimedPayload(0, 0))
}
val alpha = network.getRandomNode() as DbNode<Int>
network.start()
Thread.sleep(1000)
network.forEach {
println((it as DbNode<Int>).nodeValue)
}
alpha.changeValue(TimedPayload.createContent(42))
Thread.sleep(2000)
network.forEach {
println((it as DbNode<Int>).nodeValue)
}
}

View File

@ -0,0 +1,30 @@
package drills.drill09.dbreplica
import util.network.epidemic.TimedPayload
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(10) {
DbNode.createPushPullNode(Address(it), network, TimedPayload(0, 0))
}
val alpha = network.getRandomNode() as DbNode<Int>
network.start()
Thread.sleep(1000)
network.forEach {
println((it as DbNode<Int>).nodeValue)
}
alpha.changeValue(TimedPayload.createContent(42))
Thread.sleep(2000)
network.forEach {
println((it as DbNode<Int>).nodeValue)
}
}

View File

@ -0,0 +1,76 @@
package drills.drill09.gossip
import util.network.epidemic.EpidemicNode
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
import kotlin.random.Random
class QueryNode<T>(
address: Address,
network: Network,
initialValue: T,
private val demotivatingFactor: Int = 10,
bufferSize: Int = 1
) : EpidemicNode<T, FeedbackEpidemicPacket<T>>(
address, network, initialValue, bufferSize
) {
private var queryValue: T = nodeValue
private var random = Random(System.currentTimeMillis())
override fun makeInfectionPacket(
receiverAddress: Address,
type: EpidemicPacket.Type
): EpidemicPacket<T> {
return FeedbackEpidemicPacket(
address,
receiverAddress,
queryValue,
type,
false
)
}
override fun receivePushOrReply(packet: FeedbackEpidemicPacket<T>) {
when (nodeState) {
State.REMOVED -> return
State.INFECTED -> whenInfected(packet)
State.SUSCEPTIBLE -> whenSusceptible(packet)
}
}
private fun whenSusceptible(packet: FeedbackEpidemicPacket<T>) {
nodeState = State.INFECTED
queryValue = packet.payload
sendPushToRandom(2)
if (packet.payload == nodeValue)
println("find ${packet.payload} on node $address")
}
private fun whenInfected(packet: FeedbackEpidemicPacket<T>) {
if (!packet.isFeedback)
send(
FeedbackEpidemicPacket(
address,
packet.senderAddress,
queryValue,
EpidemicPacket.Type.PUSH,
true
)
)
if (packet.isFeedback && random.nextDouble(1.0) < 1 / demotivatingFactor)
nodeState = State.REMOVED
}
override fun receivePull(packet: FeedbackEpidemicPacket<T>) {
TODO("Not yet implemented")
}
override fun infect() {
Thread.interrupted()
}
}

View File

@ -0,0 +1,29 @@
package drills.drill09.gossip
import util.network.epidemic.packet.EpidemicPacket
import util.network.epidemic.packet.FeedbackEpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
fun main() {
val network = Network()
repeat(20) {
QueryNode(Address(it), network, it * 2)
}
val alpha = network.getRandomNode() as QueryNode<Int>
val beta = network.getRandomNode() as QueryNode<Int>
val packet = FeedbackEpidemicPacket(
alpha.address,
beta.address,
24,
EpidemicPacket.Type.PUSH,
false
)
alpha.send(packet)
network.start()
}

View File

@ -0,0 +1,66 @@
package util.future
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
interface Future<T> {
suspend fun get(): T
suspend fun get(timeout: Long): T?
fun isDone(): Boolean
fun cancel()
fun isCancelled(): Boolean
}
class CoFuture<T>(private val func: suspend () -> T) : Future<T> {
private var value: T? = null
private var job = GlobalScope.launch {
value = func()
}
override suspend fun get(): T {
job.join()
return value!!
}
override suspend fun get(timeout: Long): T? {
delay(timeout)
return value
}
override fun isDone(): Boolean = job.isCompleted
override fun cancel() {
job.cancel()
}
override fun isCancelled(): Boolean = job.isCancelled
}
fun <T> future(func: suspend () -> T): Future<T> = CoFuture(func)
fun <T> delayedEcho(value: T, delay: Long = 1000): Future<T> = future {
delay(delay)
value
}
class MathFuture {
fun add(a: Int, b: Int): Future<Int> = future { a + b }
}
fun main() = runBlocking {
val future = delayedEcho(42, delay = 2000)
println("do other work.")
println(future.get())
val math = MathFuture()
val sums = (0 until 100)
.map { math.add(it, it * 2) }
println("End all sums.")
sums.forEach {
println(it.get())
}
}

View File

@ -0,0 +1,45 @@
package util.future
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.lang.reflect.InvocationHandler
import java.lang.reflect.Method
import java.lang.reflect.Proxy
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
class FutureHandler(private val target: Any) : InvocationHandler {
private val queue = LinkedBlockingQueue<LatchFuture<*>>()
override fun invoke(proxy: Any?, method: Method?, args: Array<out Any>?): Any {
val future = method!!.invoke(target, *args!!) as LatchFuture<*>
queue.add(future)
return future
}
fun start(): Job {
return GlobalScope.launch {
while (isActive || queue.isNotEmpty()) {
queue.poll(100, TimeUnit.MILLISECONDS)
?.exec()
}
}
}
}
fun <T> Any.toFutureHandler(inter: Class<T>): Pair<T, Job> {
val futureHandler = FutureHandler(this)
val job = futureHandler.start()
return Pair(
Proxy.newProxyInstance(
this::class.java.classLoader,
arrayOf(inter),
futureHandler
) as T, job
)
}

View File

@ -0,0 +1,38 @@
package util.future
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
class LatchFuture<T>(private val func: () -> T) : Future<T> {
private val latch = CountDownLatch(1)
private var isCancelled = false
private var value: T? = null
override fun isDone(): Boolean = value != null
override fun get(): T {
latch.await()
return value!!
}
override fun get(p0: Long, p1: TimeUnit): T? {
p1.sleep(p0)
return value
}
override fun cancel(p0: Boolean): Boolean {
isCancelled = true
latch.countDown()
return isCancelled
}
override fun isCancelled(): Boolean = isCancelled
fun exec() {
if (!isCancelled) {
value = func()
latch.countDown()
}
}
}

View File

@ -0,0 +1,24 @@
package util.future
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Future
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
@ExperimentalTime
fun main() = runBlocking {
val (math, job) = FutureMathImpl().toFutureHandler(FutureMath::class.java)
lateinit var sums: List<Future<Int>>
val elapsed = measureTime {
sums = (0 until 100).map { math.add(it, it * 2) }
}
println("Elapsed time: $elapsed")
sums.forEach { println(it.get()) }
job.cancelAndJoin()
}

View File

@ -0,0 +1,13 @@
package util.future
import java.util.concurrent.Future
interface FutureMath {
fun add(a: Int, b: Int): Future<Int>
}
class FutureMathImpl : FutureMath {
override fun add(a: Int, b: Int): Future<Int> {
return LatchFuture { a + b }
}
}

View File

@ -0,0 +1,17 @@
package util.jms.replicatedobject
import java.io.Serializable
data class Call(
val methodName: String,
val args: Array<out Any>
) : Serializable {
fun execOn(target: Any): Any {
val method = target::class.java.getMethod(
methodName,
*args.map { it::class.java }.toTypedArray()
)
return method.invoke(target, *args)
}
}

View File

@ -0,0 +1,24 @@
package util.jms.replicatedobject
import util.jms.replier.ActiveMQReplier
import java.io.Serializable
import javax.jms.ObjectMessage
class ReplicatedObject<T : Any>(
private val target: T,
queueName: String,
shared: Boolean = false
) {
private val replier = ActiveMQReplier(queueName, shared)
init {
replier.onRequest {
val call = (it as ObjectMessage).`object` as Call
replier.createObjectMessage(call.execOn(target) as Serializable)
}
}
fun start() {
replier.start()
}
}

View File

@ -0,0 +1,16 @@
package util.jms.replicatedobject
import util.jms.requestor.ActiveMQRequestor
import java.lang.reflect.Proxy
class ReplicatedObjectFactory<T>(private val classType: Class<T>) {
fun create(queueName: String, shared: Boolean = false): T {
val requestor = ActiveMQRequestor(queueName, shared)
return Proxy.newProxyInstance(
this::class.java.classLoader,
arrayOf(classType),
ReplicatedObjectHandler(requestor)
) as T
}
}

View File

@ -0,0 +1,18 @@
package util.jms.replicatedobject
import util.jms.requestor.Requestor
import java.lang.reflect.InvocationHandler
import java.lang.reflect.Method
import javax.jms.ObjectMessage
class ReplicatedObjectHandler(
private val requestor: Requestor
) : InvocationHandler {
override fun invoke(p0: Any, p1: Method, p2: Array<out Any>): Any {
val call = Call(p1.name, p2)
val msg = requestor.createObjectMessage(call)
return (requestor.request(msg) as ObjectMessage).`object`
}
}

View File

@ -0,0 +1,58 @@
package util.jms.replier
import java.io.Serializable
import javax.jms.Message
import javax.jms.Queue
import javax.jms.QueueConnectionFactory
import javax.jms.Session
import javax.jms.ObjectMessage
import javax.jms.TextMessage
abstract class AbstarctReplier(
queueName: String,
private val shared: Boolean = false
) : Replier {
private val factory: QueueConnectionFactory = this.getFactory()
private val connection = factory.createQueueConnection()
private val mainSession = connection.createQueueSession(
false,
Session.AUTO_ACKNOWLEDGE
)
private val mainQueue = mainSession.createQueue(queueName)
private val receiver = mainSession.createReceiver(mainQueue)
private val secondarySession = connection.createQueueSession(
false,
Session.AUTO_ACKNOWLEDGE
)
override fun onRequest(buildResponse: (Message) -> Message) {
receiver.setMessageListener {
val replyQueue = it.jmsReplyTo as Queue
val sender = secondarySession.createSender(replyQueue)
val replyMessage = buildResponse(it)
if (shared)
replyMessage.jmsCorrelationID = it.jmsMessageID
sender.send(replyMessage)
}
}
override fun createTextMessage(text: String): TextMessage = mainSession.createTextMessage(text)
override fun createObjectMessage(obj: Serializable?): ObjectMessage =
mainSession.createObjectMessage(obj)
override fun createMessage(): Message = mainSession.createMessage()
override fun start() {
connection.start()
}
override fun close() {
mainSession.close()
connection.close()
}
abstract fun getFactory(uri: String = "tcp://localhost:61616"): QueueConnectionFactory
}

View File

@ -0,0 +1,12 @@
package util.jms.replier
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.QueueConnectionFactory
class ActiveMQReplier(
queueName: String,
shared: Boolean = false
) : AbstarctReplier(queueName, shared) {
override fun getFactory(uri: String): QueueConnectionFactory = ActiveMQConnectionFactory(uri)
.also { it.isTrustAllPackages = true }
}

View File

@ -0,0 +1,15 @@
package util.jms.replier
import java.io.Serializable
import javax.jms.Message
import javax.jms.ObjectMessage
import javax.jms.TextMessage
interface Replier {
fun onRequest(buildResponse: (Message) -> Message)
fun createTextMessage(text: String = ""): TextMessage
fun createObjectMessage(obj: Serializable? = null): ObjectMessage
fun createMessage(): Message
fun start()
fun close()
}

View File

@ -0,0 +1,58 @@
package util.jms.requestor
import java.io.Serializable
import javax.jms.Message
import javax.jms.QueueConnectionFactory
import javax.jms.Session
import javax.jms.ObjectMessage
import javax.jms.TextMessage
abstract class AbstractRequestor(
queueName: String,
private val shared: Boolean = false
) : Requestor {
private val connectionFactory: QueueConnectionFactory = this.getFactory()
private val connection = connectionFactory.createQueueConnection()
.also { it.start() }
private val mainSession = connection.createQueueSession(
false,
Session.AUTO_ACKNOWLEDGE
)
private val queue = mainSession.createQueue(queueName)
private val sender = mainSession.createSender(queue)
private val secondarySession = connection.createQueueSession(
false,
Session.AUTO_ACKNOWLEDGE
)
private val replyQueue = secondarySession.createTemporaryQueue()
private var receiver = if (!shared) secondarySession.createReceiver(replyQueue) else null
override fun request(message: Message): Message {
message.jmsReplyTo = replyQueue
sender.send(message)
if (shared)
receiver = secondarySession.createReceiver(
replyQueue,
"JMSCorrelationID = '${message.jmsMessageID}'"
)
return receiver!!.receive()
}
override fun createTextMessage(text: String): TextMessage = mainSession.createTextMessage(text)
override fun createObjectMessage(obj: Serializable?): ObjectMessage =
mainSession.createObjectMessage(obj)
override fun createMessage(): Message = mainSession.createMessage()
override fun close() {
mainSession.close()
secondarySession.close()
connection.close()
}
abstract fun getFactory(uri: String = "tcp://localhost:61616"): QueueConnectionFactory
}

View File

@ -0,0 +1,12 @@
package util.jms.requestor
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.QueueConnectionFactory
class ActiveMQRequestor(
queueName: String,
shared: Boolean = false
) : AbstractRequestor(queueName, shared) {
override fun getFactory(uri: String): QueueConnectionFactory = ActiveMQConnectionFactory(uri)
.also { it.isTrustAllPackages = true }
}

View File

@ -0,0 +1,14 @@
package util.jms.requestor
import java.io.Serializable
import javax.jms.Message
import javax.jms.ObjectMessage
import javax.jms.TextMessage
interface Requestor {
fun request(message: Message): Message
fun createTextMessage(text: String = ""): TextMessage
fun createObjectMessage(obj: Serializable? = null): ObjectMessage
fun createMessage(): Message
fun close()
}

View File

@ -0,0 +1,126 @@
package util.network.epidemic
import util.network.epidemic.packet.EpidemicPacket
import util.network.simulator.Address
import util.network.simulator.Network
import util.network.simulator.Node
abstract class EpidemicNode<T, P : EpidemicPacket<T>>(
address: Address,
network: Network,
initialValue: T,
bufferSize: Int = 1
) : Node(address, network, bufferSize) {
@Volatile
var nodeValue = initialValue
private set
protected var nodeState = State.SUSCEPTIBLE
@Synchronized
fun changeValue(value: T) {
nodeValue = value
}
private fun sendToRandom(
type: EpidemicPacket.Type,
numberOfReceiver: Int = 1
) {
val toExclude = mutableListOf(address)
repeat(numberOfReceiver) {
var receiverAddress = network.getRandomAddress()
while (toExclude.contains(receiverAddress))
receiverAddress = network.getRandomAddress()
toExclude.add(receiverAddress)
send(makeInfectionPacket(receiverAddress, type))
}
}
abstract fun makeInfectionPacket(
receiverAddress: Address,
type: EpidemicPacket.Type
): EpidemicPacket<T>
protected fun sendPushToRandom(numberOfReceiver: Int = 1) {
sendToRandom(
EpidemicPacket.Type.PUSH,
numberOfReceiver
)
}
protected fun sendPullToRandom(numberOfReceiver: Int = 1) {
sendToRandom(
EpidemicPacket.Type.PULL,
numberOfReceiver
)
}
protected fun sendPushPullToRandom(numberOfReceiver: Int = 1) {
sendToRandom(
EpidemicPacket.Type.PUSHPULL,
numberOfReceiver
)
}
protected abstract fun receivePushOrReply(packet: P)
protected abstract fun receivePull(packet: P)
protected abstract fun infect()
protected fun onReceive(packet: P) {
when (packet.type) {
EpidemicPacket.Type.PUSH -> receivePushOrReply(packet)
EpidemicPacket.Type.PULL -> receivePull(packet)
EpidemicPacket.Type.REPLY -> receivePushOrReply(packet)
EpidemicPacket.Type.PUSHPULL -> {
receivePull(packet)
receivePushOrReply(packet)
}
}
}
override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory {
return EpidemicCommunicatorFactory(threadGroup)
}
protected enum class State {
SUSCEPTIBLE, INFECTED, REMOVED
}
enum class InfectStrategy {
PUSH, PULL, PUSHPULL
}
protected inner class EpidemicReceiver(
threadGroup: ThreadGroup
) : Node.Receiver(threadGroup) {
override fun run() {
while (!isInterrupted) {
val packet = receive() as P
onReceive(packet)
}
}
}
protected inner class EpidemicSender(
threadGroup: ThreadGroup
) : Node.Sender(threadGroup) {
override fun run() {
while (!isInterrupted)
infect()
}
}
protected inner class EpidemicCommunicatorFactory(
threadGroup: ThreadGroup
) : Node.CommunicatorFactory(threadGroup) {
override fun createReceiver(): Receiver {
return EpidemicReceiver(threadGroup)
}
override fun createSender(): Sender {
return EpidemicSender(threadGroup)
}
}
}

View File

@ -0,0 +1,9 @@
package util.network.epidemic
data class TimedPayload<T>(val value: T, val timestamp: Long) {
companion object {
fun <T> createContent(value: T): TimedPayload<T> {
return TimedPayload(value, System.nanoTime())
}
}
}

View File

@ -0,0 +1,11 @@
package util.network.epidemic.packet
import util.network.simulator.packet.PayloadPacket
interface EpidemicPacket<T> : PayloadPacket<T> {
val type: Type
enum class Type {
PUSH, PULL, PUSHPULL, REPLY
}
}

View File

@ -0,0 +1,12 @@
package util.network.epidemic.packet
import util.network.simulator.Address
import util.network.simulator.packet.FeedbackPacket
data class FeedbackEpidemicPacket<T>(
override val senderAddress: Address,
override val receiverAddress: Address,
override val payload: T,
override val type: EpidemicPacket.Type,
override val isFeedback: Boolean
) : EpidemicPacket<T>, FeedbackPacket

View File

@ -0,0 +1,11 @@
package util.network.epidemic.packet
import util.network.epidemic.TimedPayload
import util.network.simulator.Address
data class TimedEpidemicPacket<T>(
override val senderAddress: Address,
override val receiverAddress: Address,
override val payload: TimedPayload<T>,
override val type: EpidemicPacket.Type
) : EpidemicPacket<TimedPayload<T>>

View File

@ -0,0 +1,3 @@
package util.network.simulator
inline class Address(val value: Int)

View File

@ -0,0 +1,41 @@
package util.network.simulator
import kotlin.random.Random
class Network(threadGrupName: String = "network") {
private val nodes = mutableMapOf<Address, Node>()
private val addresses = mutableListOf<Address>()
val threadGroup = ThreadGroup(threadGrupName)
private val random by lazy { Random(System.currentTimeMillis()) }
operator fun get(address: Address): Node? {
return nodes[address]
}
fun addNode(node: Node) {
nodes[node.address] = node
addresses.add(node.address)
}
fun getRandomAddress(): Address {
val index = random.nextInt(addresses.size)
return addresses[index]
}
fun getRandomNode(): Node {
return nodes[getRandomAddress()]!!
}
fun start() {
nodes.forEach { (_, node) ->
node.start()
}
}
fun forEach(action: (Node) -> Unit) {
nodes.forEach { (_, node) ->
action(node)
}
}
}

View File

@ -0,0 +1,61 @@
package util.network.simulator
import util.network.simulator.packet.Packet
import java.util.concurrent.LinkedBlockingQueue
abstract class Node(
val address: Address,
protected val network: Network,
bufferSize: Int = 1
) {
private val buffer = LinkedBlockingQueue<Packet>(bufferSize)
private val receiver: Receiver
private val sender: Sender
init {
network.addNode(this)
val factory = communicatorFactory(network.threadGroup)
receiver = factory.createReceiver()
sender = factory.createSender()
}
fun start() {
receiver.start()
sender.start()
}
private fun setPacket(packet: Packet) {
buffer.put(packet)
}
private fun getPacket(): Packet {
return buffer.take()
}
fun send(packet: Packet) {
network[packet.receiverAddress]?.setPacket(packet)
}
protected fun receive(): Packet = getPacket()
protected abstract fun communicatorFactory(
threadGroup: ThreadGroup
): CommunicatorFactory
protected abstract inner class CommunicatorFactory(
protected val threadGroup: ThreadGroup
) {
abstract fun createReceiver(): Receiver
abstract fun createSender(): Sender
}
protected abstract inner class Receiver(threadGroup: ThreadGroup) : Thread(
threadGroup,
"${threadGroup.name}-receiver"
)
protected abstract inner class Sender(threadGroup: ThreadGroup) : Thread(
threadGroup,
"${threadGroup.name}-sender"
)
}

View File

@ -0,0 +1,52 @@
package util.network.simulator
import util.network.simulator.packet.NetworkPacket
fun main() {
val network = Network()
repeat(2) {
TestNode(Address(it), network)
}
network.start()
}
class TestNode(address: Address, network: Network) : Node(address, network) {
override fun communicatorFactory(threadGroup: ThreadGroup): CommunicatorFactory {
return TestCommunicatorFactory(threadGroup)
}
private inner class TestReceiver(threadGroup: ThreadGroup) : Receiver(threadGroup) {
override fun run() {
if (address != Address(0))
return
val packet = receive()
println(packet)
}
}
private inner class TestSender(threadGroup: ThreadGroup) : Sender(threadGroup) {
override fun run() {
if (address != Address(1))
return
val packet = NetworkPacket(address, Address(0))
send(packet)
}
}
private inner class TestCommunicatorFactory(threadGroup: ThreadGroup) :
CommunicatorFactory(threadGroup) {
override fun createReceiver(): Receiver {
return TestReceiver(threadGroup)
}
override fun createSender(): Sender {
return TestSender(threadGroup)
}
}
}

View File

@ -0,0 +1,5 @@
package util.network.simulator.packet
interface FeedbackPacket : Packet {
val isFeedback: Boolean
}

View File

@ -0,0 +1,8 @@
package util.network.simulator.packet
import util.network.simulator.Address
data class NetworkPacket(
override val senderAddress: Address,
override val receiverAddress: Address
) : Packet

View File

@ -0,0 +1,8 @@
package util.network.simulator.packet
import util.network.simulator.Address
interface Packet {
val senderAddress: Address
val receiverAddress: Address
}

View File

@ -0,0 +1,5 @@
package util.network.simulator.packet
interface PayloadPacket<T> : Packet {
val payload: T
}