From 1381d706ed5bf04595e85c9ea5ac40865bf971f6 Mon Sep 17 00:00:00 2001 From: norangebit Date: Wed, 22 Apr 2020 18:01:59 +0200 Subject: [PATCH] add share tmpQueue --- .../drills/drill07/exercise3/hello/Replier.kt | 2 +- .../drill07/exercise3/hello/Requestor.kt | 2 +- .../exercise3/replier/AbstarctReplier.kt | 22 ++++++++--- .../exercise3/replier/ActiveMQReplier.kt | 5 ++- .../exercise3/requestor/AbstractRequestor.kt | 38 +++++++++++++------ .../exercise3/requestor/ActiveMQRequestor.kt | 5 ++- .../drills/drill07/exercise4/Replier.kt | 4 +- .../drills/drill07/exercise4/Requestor.kt | 4 +- 8 files changed, 57 insertions(+), 25 deletions(-) diff --git a/src/main/kotlin/drills/drill07/exercise3/hello/Replier.kt b/src/main/kotlin/drills/drill07/exercise3/hello/Replier.kt index bbbd9c0..99a4e35 100644 --- a/src/main/kotlin/drills/drill07/exercise3/hello/Replier.kt +++ b/src/main/kotlin/drills/drill07/exercise3/hello/Replier.kt @@ -5,7 +5,7 @@ import drills.drill07.exercise3.replier.Replier import javax.jms.TextMessage fun main() { - val replier: Replier = ActiveMQReplier("hello") + val replier: Replier = ActiveMQReplier("hello", shared = true) replier.start() diff --git a/src/main/kotlin/drills/drill07/exercise3/hello/Requestor.kt b/src/main/kotlin/drills/drill07/exercise3/hello/Requestor.kt index ee5dab5..83ce803 100644 --- a/src/main/kotlin/drills/drill07/exercise3/hello/Requestor.kt +++ b/src/main/kotlin/drills/drill07/exercise3/hello/Requestor.kt @@ -5,7 +5,7 @@ import drills.drill07.exercise3.requestor.Requestor import javax.jms.TextMessage fun main() { - val requestor: Requestor = ActiveMQRequestor("hello") + val requestor: Requestor = ActiveMQRequestor("hello", shared = true) val msg = requestor.request(requestor.createTextMessage("Hello Home!")) as TextMessage diff --git a/src/main/kotlin/drills/drill07/exercise3/replier/AbstarctReplier.kt b/src/main/kotlin/drills/drill07/exercise3/replier/AbstarctReplier.kt index a2e55d9..b78fc6f 100644 --- a/src/main/kotlin/drills/drill07/exercise3/replier/AbstarctReplier.kt +++ b/src/main/kotlin/drills/drill07/exercise3/replier/AbstarctReplier.kt @@ -7,21 +7,33 @@ import javax.jms.Session import javax.jms.ObjectMessage import javax.jms.TextMessage -abstract class AbstarctReplier(val queueName: String) : Replier { +abstract class AbstarctReplier( + queueName: String, + private val shared: Boolean = false +) : Replier { private val factory: QueueConnectionFactory = this.getFactory() private val connection = factory.createQueueConnection() private val session = connection.createQueueSession( false, Session.AUTO_ACKNOWLEDGE ) - private val queue = session.createQueue(queueName) - private val receiver = session.createReceiver(queue) + private val mainQueue = session.createQueue(queueName) + private val receiver = session.createReceiver(mainQueue) + private val secondarySession = connection.createQueueSession( + false, + Session.AUTO_ACKNOWLEDGE + ) override fun onRequest(buildResponse: (Message) -> Message) { receiver.setMessageListener { val replyQueue = it.jmsReplyTo as Queue - val sender = session.createSender(replyQueue) - sender.send(buildResponse(it)) + val sender = secondarySession.createSender(replyQueue) + val replyMessage = buildResponse(it) + + if (shared) + replyMessage.jmsCorrelationID = it.jmsMessageID + + sender.send(replyMessage) } } diff --git a/src/main/kotlin/drills/drill07/exercise3/replier/ActiveMQReplier.kt b/src/main/kotlin/drills/drill07/exercise3/replier/ActiveMQReplier.kt index 4658be8..06e5cec 100644 --- a/src/main/kotlin/drills/drill07/exercise3/replier/ActiveMQReplier.kt +++ b/src/main/kotlin/drills/drill07/exercise3/replier/ActiveMQReplier.kt @@ -3,6 +3,9 @@ package drills.drill07.exercise3.replier import org.apache.activemq.ActiveMQConnectionFactory import javax.jms.QueueConnectionFactory -class ActiveMQReplier(queueName: String) : AbstarctReplier(queueName) { +class ActiveMQReplier( + queueName: String, + shared: Boolean = false +) : AbstarctReplier(queueName, shared) { override fun getFactory(uri: String): QueueConnectionFactory = ActiveMQConnectionFactory(uri) } diff --git a/src/main/kotlin/drills/drill07/exercise3/requestor/AbstractRequestor.kt b/src/main/kotlin/drills/drill07/exercise3/requestor/AbstractRequestor.kt index 70a3430..bb9de61 100644 --- a/src/main/kotlin/drills/drill07/exercise3/requestor/AbstractRequestor.kt +++ b/src/main/kotlin/drills/drill07/exercise3/requestor/AbstractRequestor.kt @@ -6,35 +6,49 @@ import javax.jms.Session import javax.jms.ObjectMessage import javax.jms.TextMessage -abstract class AbstractRequestor(val queueName: String) : Requestor { +abstract class AbstractRequestor( + queueName: String, + private val shared: Boolean = false +) : Requestor { private val connectionFactory: QueueConnectionFactory = this.getFactory() private val connection = connectionFactory.createQueueConnection() .also { it.start() } - private val session = connection.createQueueSession( + private val mainSession = connection.createQueueSession( false, Session.AUTO_ACKNOWLEDGE ) - private val queue = session.createQueue(queueName) - private val sender = session.createSender(queue) + private val queue = mainSession.createQueue(queueName) + private val sender = mainSession.createSender(queue) + private val secondarySession = connection.createQueueSession( + false, + Session.AUTO_ACKNOWLEDGE + ) + private val replyQueue = secondarySession.createTemporaryQueue() + private var receiver = if (!shared) secondarySession.createReceiver(replyQueue) else null override fun request(message: Message): Message { - val tmpQueue = session.createTemporaryQueue() - message.jmsReplyTo = tmpQueue + message.jmsReplyTo = replyQueue sender.send(message) - val tmpReceiver = session.createReceiver(tmpQueue) - return tmpReceiver.receive() + if (shared) + receiver = secondarySession.createReceiver( + replyQueue, + "JMSCorrelationID = '${message.jmsMessageID}'" + ) + + return receiver!!.receive() } - override fun createTextMessage(text: String): TextMessage = session.createTextMessage(text) + override fun createTextMessage(text: String): TextMessage = mainSession.createTextMessage(text) - override fun createObjectMessage(): ObjectMessage = session.createObjectMessage() + override fun createObjectMessage(): ObjectMessage = mainSession.createObjectMessage() - override fun createMessage(): Message = session.createMessage() + override fun createMessage(): Message = mainSession.createMessage() override fun close() { - session.close() + mainSession.close() + secondarySession.close() connection.close() } diff --git a/src/main/kotlin/drills/drill07/exercise3/requestor/ActiveMQRequestor.kt b/src/main/kotlin/drills/drill07/exercise3/requestor/ActiveMQRequestor.kt index a4113d3..d397f59 100644 --- a/src/main/kotlin/drills/drill07/exercise3/requestor/ActiveMQRequestor.kt +++ b/src/main/kotlin/drills/drill07/exercise3/requestor/ActiveMQRequestor.kt @@ -3,6 +3,9 @@ package drills.drill07.exercise3.requestor import org.apache.activemq.ActiveMQConnectionFactory import javax.jms.QueueConnectionFactory -class ActiveMQRequestor(queueName: String) : AbstractRequestor(queueName) { +class ActiveMQRequestor( + queueName: String, + shared: Boolean = false +) : AbstractRequestor(queueName, shared) { override fun getFactory(uri: String): QueueConnectionFactory = ActiveMQConnectionFactory(uri) } diff --git a/src/main/kotlin/drills/drill07/exercise4/Replier.kt b/src/main/kotlin/drills/drill07/exercise4/Replier.kt index 767e364..80090ec 100644 --- a/src/main/kotlin/drills/drill07/exercise4/Replier.kt +++ b/src/main/kotlin/drills/drill07/exercise4/Replier.kt @@ -4,12 +4,12 @@ import drills.drill07.exercise3.replier.ActiveMQReplier import drills.drill07.exercise3.replier.Replier fun main() { - val replier: Replier = ActiveMQReplier("hello") + val replier: Replier = ActiveMQReplier("factorial", shared = true) replier.start() replier.onRequest { - val n = it.getIntProperty("fact") + val n = it.getIntProperty("value") val resultMessage = replier.createMessage() resultMessage.setLongProperty("result", fact(n)) resultMessage diff --git a/src/main/kotlin/drills/drill07/exercise4/Requestor.kt b/src/main/kotlin/drills/drill07/exercise4/Requestor.kt index d65ffb3..6003f54 100644 --- a/src/main/kotlin/drills/drill07/exercise4/Requestor.kt +++ b/src/main/kotlin/drills/drill07/exercise4/Requestor.kt @@ -4,13 +4,13 @@ import drills.drill07.exercise3.requestor.ActiveMQRequestor import drills.drill07.exercise3.requestor.Requestor fun main() { - val requestor: Requestor = ActiveMQRequestor("hello") + val requestor: Requestor = ActiveMQRequestor("factorial", shared = true) println("Enter the number of which you want to calculate the factorial:") val n = readLine()?.toInt() val sendMessage = requestor.createMessage() - .also { it.setIntProperty("fact", n!!) } + .also { it.setIntProperty("value", n!!) } val replyMessage = requestor.request(sendMessage)