add share tmpQueue
continuous-integration/drone/tag Build is passing Details

This commit is contained in:
Raffaele Mignone 2020-04-22 18:01:59 +02:00
parent 10bf3e3588
commit 1381d706ed
Signed by: norangebit
GPG Key ID: F5255658CB220573
8 changed files with 57 additions and 25 deletions

View File

@ -5,7 +5,7 @@ import drills.drill07.exercise3.replier.Replier
import javax.jms.TextMessage import javax.jms.TextMessage
fun main() { fun main() {
val replier: Replier = ActiveMQReplier("hello") val replier: Replier = ActiveMQReplier("hello", shared = true)
replier.start() replier.start()

View File

@ -5,7 +5,7 @@ import drills.drill07.exercise3.requestor.Requestor
import javax.jms.TextMessage import javax.jms.TextMessage
fun main() { fun main() {
val requestor: Requestor = ActiveMQRequestor("hello") val requestor: Requestor = ActiveMQRequestor("hello", shared = true)
val msg = requestor.request(requestor.createTextMessage("Hello Home!")) as TextMessage val msg = requestor.request(requestor.createTextMessage("Hello Home!")) as TextMessage

View File

@ -7,21 +7,33 @@ import javax.jms.Session
import javax.jms.ObjectMessage import javax.jms.ObjectMessage
import javax.jms.TextMessage import javax.jms.TextMessage
abstract class AbstarctReplier(val queueName: String) : Replier { abstract class AbstarctReplier(
queueName: String,
private val shared: Boolean = false
) : Replier {
private val factory: QueueConnectionFactory = this.getFactory() private val factory: QueueConnectionFactory = this.getFactory()
private val connection = factory.createQueueConnection() private val connection = factory.createQueueConnection()
private val session = connection.createQueueSession( private val session = connection.createQueueSession(
false, false,
Session.AUTO_ACKNOWLEDGE Session.AUTO_ACKNOWLEDGE
) )
private val queue = session.createQueue(queueName) private val mainQueue = session.createQueue(queueName)
private val receiver = session.createReceiver(queue) private val receiver = session.createReceiver(mainQueue)
private val secondarySession = connection.createQueueSession(
false,
Session.AUTO_ACKNOWLEDGE
)
override fun onRequest(buildResponse: (Message) -> Message) { override fun onRequest(buildResponse: (Message) -> Message) {
receiver.setMessageListener { receiver.setMessageListener {
val replyQueue = it.jmsReplyTo as Queue val replyQueue = it.jmsReplyTo as Queue
val sender = session.createSender(replyQueue) val sender = secondarySession.createSender(replyQueue)
sender.send(buildResponse(it)) val replyMessage = buildResponse(it)
if (shared)
replyMessage.jmsCorrelationID = it.jmsMessageID
sender.send(replyMessage)
} }
} }

View File

@ -3,6 +3,9 @@ package drills.drill07.exercise3.replier
import org.apache.activemq.ActiveMQConnectionFactory import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.QueueConnectionFactory import javax.jms.QueueConnectionFactory
class ActiveMQReplier(queueName: String) : AbstarctReplier(queueName) { class ActiveMQReplier(
queueName: String,
shared: Boolean = false
) : AbstarctReplier(queueName, shared) {
override fun getFactory(uri: String): QueueConnectionFactory = ActiveMQConnectionFactory(uri) override fun getFactory(uri: String): QueueConnectionFactory = ActiveMQConnectionFactory(uri)
} }

View File

@ -6,35 +6,49 @@ import javax.jms.Session
import javax.jms.ObjectMessage import javax.jms.ObjectMessage
import javax.jms.TextMessage import javax.jms.TextMessage
abstract class AbstractRequestor(val queueName: String) : Requestor { abstract class AbstractRequestor(
queueName: String,
private val shared: Boolean = false
) : Requestor {
private val connectionFactory: QueueConnectionFactory = this.getFactory() private val connectionFactory: QueueConnectionFactory = this.getFactory()
private val connection = connectionFactory.createQueueConnection() private val connection = connectionFactory.createQueueConnection()
.also { it.start() } .also { it.start() }
private val session = connection.createQueueSession( private val mainSession = connection.createQueueSession(
false, false,
Session.AUTO_ACKNOWLEDGE Session.AUTO_ACKNOWLEDGE
) )
private val queue = session.createQueue(queueName) private val queue = mainSession.createQueue(queueName)
private val sender = session.createSender(queue) private val sender = mainSession.createSender(queue)
private val secondarySession = connection.createQueueSession(
false,
Session.AUTO_ACKNOWLEDGE
)
private val replyQueue = secondarySession.createTemporaryQueue()
private var receiver = if (!shared) secondarySession.createReceiver(replyQueue) else null
override fun request(message: Message): Message { override fun request(message: Message): Message {
val tmpQueue = session.createTemporaryQueue() message.jmsReplyTo = replyQueue
message.jmsReplyTo = tmpQueue
sender.send(message) sender.send(message)
val tmpReceiver = session.createReceiver(tmpQueue) if (shared)
return tmpReceiver.receive() receiver = secondarySession.createReceiver(
replyQueue,
"JMSCorrelationID = '${message.jmsMessageID}'"
)
return receiver!!.receive()
} }
override fun createTextMessage(text: String): TextMessage = session.createTextMessage(text) override fun createTextMessage(text: String): TextMessage = mainSession.createTextMessage(text)
override fun createObjectMessage(): ObjectMessage = session.createObjectMessage() override fun createObjectMessage(): ObjectMessage = mainSession.createObjectMessage()
override fun createMessage(): Message = session.createMessage() override fun createMessage(): Message = mainSession.createMessage()
override fun close() { override fun close() {
session.close() mainSession.close()
secondarySession.close()
connection.close() connection.close()
} }

View File

@ -3,6 +3,9 @@ package drills.drill07.exercise3.requestor
import org.apache.activemq.ActiveMQConnectionFactory import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.QueueConnectionFactory import javax.jms.QueueConnectionFactory
class ActiveMQRequestor(queueName: String) : AbstractRequestor(queueName) { class ActiveMQRequestor(
queueName: String,
shared: Boolean = false
) : AbstractRequestor(queueName, shared) {
override fun getFactory(uri: String): QueueConnectionFactory = ActiveMQConnectionFactory(uri) override fun getFactory(uri: String): QueueConnectionFactory = ActiveMQConnectionFactory(uri)
} }

View File

@ -4,12 +4,12 @@ import drills.drill07.exercise3.replier.ActiveMQReplier
import drills.drill07.exercise3.replier.Replier import drills.drill07.exercise3.replier.Replier
fun main() { fun main() {
val replier: Replier = ActiveMQReplier("hello") val replier: Replier = ActiveMQReplier("factorial", shared = true)
replier.start() replier.start()
replier.onRequest { replier.onRequest {
val n = it.getIntProperty("fact") val n = it.getIntProperty("value")
val resultMessage = replier.createMessage() val resultMessage = replier.createMessage()
resultMessage.setLongProperty("result", fact(n)) resultMessage.setLongProperty("result", fact(n))
resultMessage resultMessage

View File

@ -4,13 +4,13 @@ import drills.drill07.exercise3.requestor.ActiveMQRequestor
import drills.drill07.exercise3.requestor.Requestor import drills.drill07.exercise3.requestor.Requestor
fun main() { fun main() {
val requestor: Requestor = ActiveMQRequestor("hello") val requestor: Requestor = ActiveMQRequestor("factorial", shared = true)
println("Enter the number of which you want to calculate the factorial:") println("Enter the number of which you want to calculate the factorial:")
val n = readLine()?.toInt() val n = readLine()?.toInt()
val sendMessage = requestor.createMessage() val sendMessage = requestor.createMessage()
.also { it.setIntProperty("fact", n!!) } .also { it.setIntProperty("value", n!!) }
val replyMessage = requestor.request(sendMessage) val replyMessage = requestor.request(sendMessage)