JMS Message Consumption-Asynchronous Message Consumption in JMS

We already discussed the fundamentals of JMS API .  In the previous chapter we discussed the synchronous message consumption in JMS .We have done a simple example too. In this chapter we are looking into the asynchronous message consumption in JMS with suitable example.

Asynchronous Message Consumption in JMS

In asynchronous message consumption there is no timing dependency between the production and consumption of messages. Whereas in synchronous , there exists a timing dependency. The example we are discussing here has two clients. The second client is implementing MessageListener.The onMessage() method will be invoked when a message reaches in the subscribed destination. In the synchronous message consumption as discussed earlier , the client operation stays on the receive() method.In this case whenever a message reaches , then only the onMessage() method is invoked. So no timing dependency is there.

Asynchronous Message Consumption in JMS Example

We are using OpenJMS as service provider . Before running the sample codes we should configure the OpenJMS and also we need to configure the workspace with all the dependencies as explained earlier.We have two clients. First client is simply sending a message to destination.Second client implements the MessageListener interface. The second client is starting before the first client(publish/subscribe domain is using in this example). But the operation is not waiting at at any point , because  we are not using the receive() method.So there is a chance to exit our program before the message reaches (orin other words , before the onReceive() call).To avoid this we just using a thread to continue the working of the second client.(In normal web application this is not required . As we know ,the thread  is just to keep the second client alive). We are discussing a publish/subscribe model which uses asynchronous consumption.

FirstClient.java

import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class FirstClient {
private Context context = null;
private TopicConnectionFactory factory = null;
private TopicConnection connection = null;
private TopicSession session = null;
private Topic topic = null;
private TopicPublisher publisher = null;

public FirstClient() {

}

public void sendMessage() {
Properties initialProperties = new Properties();
initialProperties.put(InitialContext.INITIAL_CONTEXT_FACTORY,
"org.exolab.jms.jndi.InitialContextFactory");
initialProperties.put(InitialContext.PROVIDER_URL,
"tcp://localhost:3035");
try {
context = new InitialContext(initialProperties);
factory = (TopicConnectionFactory) context
.lookup("ConnectionFactory");
topic = (Topic) context.lookup("topic1");
connection = factory.createTopicConnection();
session = connection.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
publisher = session.createPublisher(topic);
TextMessage message = session.createTextMessage();
message.setText("Hello ...This is a sample message");
connection.start();
publisher.publish(message);
System.out.println("Message has successfully sent...");

} catch (NamingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
if (context != null) {
try {
context.close();
} catch (NamingException ex) {
ex.printStackTrace();
}
}

}

public void closeContext() {

if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
if (context != null) {
try {
context.close();
} catch (NamingException ex) {
ex.printStackTrace();
}
}
}

public static void main(String[] args) {
FirstClient firstClient = new FirstClient();
firstClient.sendMessage();
}

}

 

SecondClient.java

import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class SecondClient implements MessageListener {
private Context context = null;
private TopicConnectionFactory factory = null;
private TopicConnection connection = null;
private TopicSession session = null;
private Topic topic = null;
private TopicSubscriber subscriber = null;
Thread idleThread = null;

public SecondClient() {

}

public void subscribeMessage() {
Properties initialProperties = new Properties();
initialProperties.put(InitialContext.INITIAL_CONTEXT_FACTORY,
"org.exolab.jms.jndi.InitialContextFactory");
initialProperties.put(InitialContext.PROVIDER_URL,
"tcp://localhost:3035");
try {
context = new InitialContext(initialProperties);
factory = (TopicConnectionFactory) context
.lookup("ConnectionFactory");
topic = (Topic) context.lookup("topic1");
connection = factory.createTopicConnection();
session = connection.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
subscriber = session.createSubscriber(topic);
subscriber.setMessageListener(this);
connection.start();
Runnable idleRunnable = new Runnable() {
@Override
public void run() {
while (true) {
// stay here idle..the program should not exit till the
// response receives..Thats why...
}
}

};
idleThread = new Thread(idleRunnable, "idleThread");
idleThread.start();
} catch (JMSException e) {
e.printStackTrace();
} catch (NamingException e) {
e.printStackTrace();
}
}

@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage text = (TextMessage) message;
try {
System.out.println("Message received is : " + text.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
System.out.println("Going to exit...");
System.exit(-1);
}

public static void main(String[] args) {
SecondClient client = new SecondClient();
client.subscribeMessage();
}

}

 

Output

Before running the two clients , the openJMS should be started and the necessary libraries should be imported in the workspace as mentioned in the configuration section.First compile and run the SecondClient.java . Then the FirstClient.java.

Output of FirstClient.java

Message has successfully sent…

Output of SecondClient.java

Message received is : Hello …This is a sample message
Going to exit…

Conclusion

The second client is receiving the message  without waiting at a single point. In synchronous consumption we discussed earlier , the operation stays there at the receive() method. Here , there is no waiting . Once a new message is there , the onMessage() method will be invoked and message will be received.

 

See related discussions:

JMS overview

JMS example

Configuraing OpenJMS

Sending ObjectMessage in JMS

Point-to-Point messaging domain in JMS

Publish/Subscribe messaging domain in JMS

Synchronous message consumption in JMS