lm-sistemi-software-distrib.../src/main/kotlin/drills/drill06/exercise5/middleware/MeanNode.kt

63 lines
1.8 KiB
Kotlin

package drills.drill06.exercise5.middleware
import drills.drill06.exercise5.SensorNet
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Session
import javax.jms.TopicConnectionFactory
/**
 * Middleware node that subscribes to a sensor topic, averages the `temperature`
 * property of the messages it receives, and publishes that mean on an actuator topic.
 *
 * @param uri broker URI handed to [ActiveMQConnectionFactory].
 * @param topicNameSensor name of the topic the samples are read from.
 * @param topicNameActuator name of the topic the computed mean is published to.
 * @param flushIntervalMillis pause, in milliseconds, after the subscriber has been
 *   drained and before it is polled again; defaults to 2000. Must not be negative.
 */
class MeanNode(
    private val uri: String,
    private val topicNameSensor: String,
    private val topicNameActuator: String,
    private val flushIntervalMillis: Long = 2000
) {
    init {
        // Thread.sleep rejects negative values; fail at construction instead of mid-loop.
        require(flushIntervalMillis >= 0) {
            "flushIntervalMillis must not be negative: $flushIntervalMillis"
        }
    }

    /**
     * Connects to the broker and runs the collect/average/publish loop.
     *
     * The loop never terminates on its own, so this call only leaves by throwing
     * (for example a JMS failure or an interrupted sleep). The connection is closed
     * on the way out in that case.
     */
    fun start() {
        val factory: TopicConnectionFactory = ActiveMQConnectionFactory(uri)
        val connection = factory.createTopicConnection()
        try {
            connection.start()
            val session = connection.createTopicSession(
                false,
                Session.AUTO_ACKNOWLEDGE
            )
            val subscriber = session.createSubscriber(session.createTopic(topicNameSensor))
            val publisher = session.createPublisher(session.createTopic(topicNameActuator))

            val samples = mutableListOf<Double>()
            // One message object is reused; its property is overwritten before each publish.
            val outgoing = session.createMessage()

            while (true) {
                val incoming = subscriber.receiveNoWait()
                if (incoming != null) {
                    if (incoming.propertyExists(TEMPERATURE_PROPERTY)) {
                        val sample = incoming.getDoubleProperty(TEMPERATURE_PROPERTY)
                        samples.add(sample)
                        println("sample: $sample")
                    } else {
                        // A message without the property would make getDoubleProperty throw
                        // and take the whole node down; skip it instead.
                        System.err.println("Ignoring message without '$TEMPERATURE_PROPERTY' property")
                    }
                    // Keep draining without sleeping while messages are available.
                    continue
                }

                // Nothing pending: publish the mean of what was collected, if anything.
                if (samples.isNotEmpty()) {
                    val mean = samples.average()
                    samples.clear()
                    outgoing.setDoubleProperty(TEMPERATURE_PROPERTY, mean)
                    publisher.publish(outgoing)
                    println("MEAN: $mean")
                    println("-------------------------")
                }
                Thread.sleep(flushIntervalMillis)
            }
        } finally {
            // Release the broker connection when the loop is left via an exception.
            connection.close()
        }
    }

    private companion object {
        /** Name of the message property that carries the temperature value. */
        const val TEMPERATURE_PROPERTY = "temperature"
    }
}
/**
 * Entry point: builds a [MeanNode] from the [SensorNet] settings and starts it.
 */
fun main() {
    val node = MeanNode(
        uri = SensorNet.URI,
        topicNameSensor = SensorNet.TOPIC_SENSOR,
        topicNameActuator = SensorNet.TOPIC_ACTUATOR
    )
    node.start()
}