# JMS-based request-replyTo demonstration - ActiveMQ Classic
This is a simple example of two applications communicating asynchronously over two queues:
The code uses client acknowledgement and a replyTo queue. It does not use CDI for bean injection; instead, the ConnectionFactory is instantiated in code. It also supports reconnecting when the connection to the broker fails.

The JMS API used is 2.0, as the broker is ActiveMQ Classic (version 5.13).
## Requirements
- Expose GET, POST, PUT on a `/orders` API (a sketch of such a resource follows this list)
- Mock up a repository in memory
- On POST or PUT operations, send order messages to another service (the participant) to act on them via an `orders` queue, and get the response on the `orders-reply` queue
- Support once-and-only-once semantics
- Expose a POST `/orders/simulation` endpoint to run n order creations with random data, to support a failover demonstration
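
As a quick illustration of the first requirement, here is a minimal sketch of what such a resource could look like in Quarkus (using the javax-based JAX-RS APIs, consistent with the `javax.jms` dependency used later); the `OrderService` methods shown are illustrative assumptions, not necessarily the ones used in this repository:

```java
import java.util.List;
import javax.inject.Inject;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

// Hypothetical sketch of the orders resource; OrderService method names are assumptions.
@Path("/orders")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class OrderResource {

    @Inject
    OrderService orderService;   // business logic in front of the in-memory repository

    @GET
    public List<Order> getAll() {
        return orderService.getAllOrders();
    }

    @POST
    public Response create(Order order) {
        // persists the order and triggers the request message sent to the participant
        Order created = orderService.createOrder(order);
        return Response.status(Response.Status.CREATED).entity(created).build();
    }

    @PUT
    @Path("/{id}")
    public Order update(@PathParam("id") String id, Order order) {
        return orderService.updateOrder(id, order);
    }
}
```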
## Running Locally
While in development mode, under the `activeMQ/classic/request-replyto` folder:

- Start ActiveMQ:

    ```sh
    docker compose up -d
    ```

- Start each application with `quarkus dev` (in two separate terminals):

    ```sh
    cd jms-orchestrator
    quarkus dev

    cd jms-participant
    quarkus dev
    ```

- Orchestrator application URL: http://localhost:8081/ and its swagger-ui
- See the ActiveMQ console: http://localhost:8161/, admin/adminpassw0rd
## Demonstration scripts
- Post a new order using the exposed API, from the `request-replyto/e2e` folder:

    ```sh
    cd request-replyto/e2e
    ./postOrder.sh
    ```
- In the orchestrator trace we should see the following messages: first from the resource API, then the response from the participant processing the order, with the order status changing from `pending` to `assigned`:

    ```
    processing new order: { "sku": P05,"price": 100.0,"quantity": 2,"status": pending }
    17:34:32 INFO [or.ac.or.in.re.OrderRepositoryMem] (executor-thread-1) Save in repository 7cfef528
    17:34:32 INFO [or.ac.or.in.ms.OrderMessageProcessing] (Thread-2 (ActiveMQ-client-global-threads)) Received message: b1c0191f-4533-44f9-b56d-18ec67baa1fc,7cfef528,P05,100.0,2,assigned
    ```
- On the participant side the log looks like:

    ```
    Received message: b1c0191f-4533-44f9-b56d-18ec67baa1fc,7cfef528,P05,100.0,2,pending
    17:34:32 INFO [or.ac.pa.in.ms.OrderMessageConsumer] (Thread-1 (ActiveMQ-client-global-threads)) Reponse sent to replyTo queue {"messageID":"b1c0191f-4533-44f9-b56d-18ec67baa1fc","orderID":"7cfef528","sku":"P05","price":100.0,"quantity":2,"status":"assigned"}
    ```
- We can see the state of the queues in the ActiveMQ console.
- To demonstrate connection failure and reconnection, start a long-running simulation, stop the broker, and relaunch it. The messages should continue to flow between the two apps. Be sure to have built the producer and consumer images first. Here are the commands:

    ```sh
    # build OCI image for participant and orchestrator app
    request-replyto $ ./buildAll.sh
    # We should have 2 new OCI images.
    request-replyto $ docker images
    request-replyto $ docker compose -f e2e-docker-compose.yml up -d
    request-replyto $ docker ps
    # 3cd51215160f  apache/activemq-classic:latest  0.0.0.0:5672->5672/tcp, 8080/tcp, 0.0.0.0:8161->8161/tcp, 0.0.0.0:61616->61616/tcp, 8443/tcp  active
    # 2ce293bc26bd  apache/activemq-classic:latest  0.0.0.0:5682->5672/tcp, 0.0.0.0:8171->8161/tcp, 0.0.0.0:61626->61616/tcp  standby
    ...
    ```
    Start some messages, under `classic/request-replyto/e2e`:

    ```sh
    e2e $ ./startNorders.sh 50
    ```

    Once some messages are exchanged, stop the active broker.
## Code Explanation
The code is under `jms-orchestrator` and `jms-participant`, and implements a request-response pattern over queues using JMS.

The orchestrator is a classical microservice with the order entity as its resource. The interesting part is the `OrderMessageProcessing` class: it is plain JMS implementation code, using one connection to the broker and two JMS sessions, one for the producer and one for the consumer.

The API is JMS 2.0, based on the `javax.jms` API, with the ActiveMQ client library. Here are the important Maven declarations:
```xml
<!-- https://mvnrepository.com/artifact/javax.jms/javax.jms-api -->
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
    <version>2.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-client -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.18.2</version>
</dependency>
```
The classical JMS implementation uses the ConnectionFactory, creates a single connection to the broker, and then one session to send messages and one to receive messages from the different queues:
```java
connectionFactory = new ActiveMQConnectionFactory(connectionURLs);
connection = connectionFactory.createConnection(user, password);
connection.setClientID("p-" + System.currentTimeMillis());
initProducer();
initConsumer();
connection.start();
```
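
The bodies of `initProducer()` and `initConsumer()` are not shown above; as a minimal sketch, assuming the field names below and the queue names from the configuration (`orders` and `ordersResp`), they could look like:

```java
// Illustrative sketch only: field names and acknowledge modes are assumptions.
private void initProducer() throws JMSException {
    // one session dedicated to sending order requests
    producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue ordersQueue = producerSession.createQueue("orders");
    producer = producerSession.createProducer(ordersQueue);
}

private void initConsumer() throws JMSException {
    // a second session, in CLIENT_ACKNOWLEDGE mode, dedicated to the replyTo queue
    consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    Queue replyQueue = consumerSession.createQueue("ordersResp");
    consumer = consumerSession.createConsumer(replyQueue);
    consumer.setMessageListener(this); // this class implements javax.jms.MessageListener
}
```

Keeping the producer and the consumer on separate sessions lets the listener thread deliver replies independently of the request-sending path.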
As the class implements MessageListener, the JMS consumer thread is bound to it, and the `OrderMessageProcessor` class processes the replyTo queue messages in its `onMessage` method. The excerpt below also shows how the outgoing order message is built and sent with a correlation ID:
```java
public void onMessage(Message msg) {
    //...
    OrderMessage oe = OrderMessage.fromOrder(order);
    String orderJson = mapper.writeValueAsString(oe);
    // build the JMS text message and tag it with a short correlation ID
    TextMessage outMsg = producerSession.createTextMessage(orderJson);
    outMsg.setJMSCorrelationID(UUID.randomUUID().toString().substring(0, 8));
    producer.send(outMsg);
    //...
```
We should follow some implementation best practices:

- use a different data model for the message payload than for the persisted business entity (`Order` vs `OrderMessage`; a sketch follows the folder layout below),
- use a service class to implement the business logic managing the business entity,
- use a separate resource class to support the RESTful APIs.
```
orders
├── domain
│   ├── Order.java
│   └── OrderService.java
└── infra
    ├── api
    │   ├── OrderResource.java
    │   └── SimulControl.java
    ├── msg
    │   ├── OrderMessage.java
    │   └── OrderMessageProcessor.java
    └── repo
        ├── OrderRepository.java
        └── OrderRepositoryMem.java
```
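
For illustration, the message payload only carries what the participant needs plus a message identifier. A minimal sketch of `OrderMessage`, with field names taken from the JSON traced above (the field types and the body of the factory method are assumptions):

```java
// Sketch of the message payload model; field names follow the JSON seen in the participant log.
public class OrderMessage {
    public String messageID;
    public String orderID;
    public String sku;
    public double price;
    public int quantity;
    public String status;

    // Maps the persisted business entity to the message payload (Order field names are assumptions).
    public static OrderMessage fromOrder(Order order) {
        OrderMessage m = new OrderMessage();
        m.messageID = java.util.UUID.randomUUID().toString();
        m.orderID = order.id;
        m.sku = order.sku;
        m.price = order.price;
        m.quantity = order.quantity;
        m.status = order.status;
        return m;
    }
}
```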
The configuration of the application follows the Quarkus way of declaring config (in `application.properties`):
```properties
main.queue.name=orders
replyTo.queue.name=ordersResp
reconnect.delay.ins=5
# Configures the Artemis properties.
quarkus.artemis.url=tcp://localhost:61616
```
These values can be overridden by environment variables, as illustrated in the docker compose file:
```yaml
environment:
  QUARKUS_ARTEMIS_URL: failover:(tcp://active:61616,tcp://standby:61617)?randomize=false
```
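
These properties can be injected with MicroProfile Config and used when creating (and re-creating) the connection. The sketch below shows one possible wiring; the exception-listener-based retry loop and the `restartConnection()` helper are assumptions about how the reconnect delay is used:

```java
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import org.eclipse.microprofile.config.inject.ConfigProperty;

// Illustrative wiring of the configuration values into the JMS code
// (excerpt: the real class also implements MessageListener and holds the connection/sessions).
public class OrderMessageProcessing implements ExceptionListener {

    @ConfigProperty(name = "main.queue.name")
    String mainQueueName;

    @ConfigProperty(name = "replyTo.queue.name")
    String replyToQueueName;

    @ConfigProperty(name = "reconnect.delay.ins", defaultValue = "5")
    long reconnectDelaySeconds;

    @ConfigProperty(name = "quarkus.artemis.url")
    String connectionURLs;

    // Registered with connection.setExceptionListener(this): called when the broker connection drops.
    @Override
    public void onException(JMSException exception) {
        boolean reconnected = false;
        while (!reconnected) {
            try {
                Thread.sleep(reconnectDelaySeconds * 1000);
                restartConnection();
                reconnected = true;
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return;
            } catch (JMSException e) {
                // broker still down: retry after the configured delay
            }
        }
    }

    // Hypothetical helper: re-runs the ConnectionFactory/connection/session setup shown earlier.
    private void restartConnection() throws JMSException {
        // new ActiveMQConnectionFactory(connectionURLs), createConnection, initProducer, initConsumer, start
    }
}
```

Note that the `failover:` transport URL above also makes the ActiveMQ client retry the connection at the transport level.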
On the consumer (participant) side, the class is also a MessageListener, and it owns a producer to the replyTo queue.

Each consumer performs acknowledgement in code. Acknowledging a consumed message automatically acknowledges the receipt of all messages that have been consumed by the current session.
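
A minimal sketch of that participant listener, assuming the field names below and the `assigned` status transition seen in the logs (the actual order processing and repository calls are omitted):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

// Illustrative participant listener; the flow (receive, reply with the same correlation ID, acknowledge)
// follows the description above, the names are assumptions.
public class OrderMessageConsumer implements MessageListener {

    Session producerSession;        // session owning the replyTo queue producer
    MessageProducer replyProducer;  // producer bound to the replyTo queue
    ObjectMapper mapper = new ObjectMapper();

    @Override
    public void onMessage(Message msg) {
        try {
            // 1. read the incoming order request
            OrderMessage order = mapper.readValue(((TextMessage) msg).getText(), OrderMessage.class);

            // 2. act on the order, e.g. assign it
            order.status = "assigned";

            // 3. send the response to the replyTo queue, keeping the correlation ID
            TextMessage reply = producerSession.createTextMessage(mapper.writeValueAsString(order));
            reply.setJMSCorrelationID(msg.getJMSCorrelationID());
            replyProducer.send(reply);

            // 4. client acknowledgement, done explicitly in code
            msg.acknowledge();
        } catch (Exception e) {
            // without acknowledge() the message stays un-acknowledged and can be redelivered
        }
    }
}
```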
## Some ActiveMQ configuration explanation
The `config` folder includes the `activemq.xml` files for the active and standby brokers, and also `jetty.xml` so that the console is accessible from the host machine.

- The Jetty configuration update sets the host to 0.0.0.0:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8161"/>
</bean>
- For the active and standby brokers to communicate and share storage, the active broker configuration uses KahaDB on a mounted directory and defines a network connector to the partner broker:
    ```xml
    <persistenceAdapter>
        <kahaDB directory="/tmp/mq/kahadb"/>
    </persistenceAdapter>

    <networkConnectors>
        <networkConnector uri="static:(tcp://standby:61626)" />
    </networkConnectors>
    ```
## Deploy on AWS
- First build the docker images for each service using the `buildAll.sh` command. Change the name of the image to match your ECR repository:

    ```sh
    cd jms-orchestrator
    buildAll.sh
    docker push toECR_repository
    cd jms-participant
    buildAll.sh
    docker push toECR_repository
    ```

- If not already done, use CDK to deploy the VPC, brokers, and Cloud9.

- Use CDK to deploy the two apps on ECS Fargate:

    ```sh
    cd infra
    cdk deploy
    ```