JMS Reliablity Mechanisms-Creating Durable Subscriptions in JMS

We already discussed the basic concepts of JMS earlier. We have seen the various mechanisms in JMS API to ensure the reliable message delivery.The reliability mechanisms we discussed so far are :

1)Setting message persistence

2)Setting Message Priority

3)Configuring message expiry time

In this chapter we are discussing how JMS API ensures reliable messaging by Creating Durable Subscriptions.

Creating Durable Subscriptions in JMS

As we discussed earlier  , a publish/Subscribe messaging domain is  useless , if the subscriber is not active while publisher is publishing the message to destination.There we discussed an example by using a  non-durable subscriber. If we create a durable subscriber instead of the non-durable subscriber , it is possible to ensure  reliable messaging.

A  durable subscriber can be created  by

TopicSubscriber subscriber = session.createDurableSubscriber(topic,
                    “sampleSubscription”);

topic is the Topic  destination and sampleSubscription is the name of the durable subscriber.

To delete a durable  subscription , first close the subscription and then unsubscribe as follows:

subscriber.close();

session.unsubscribe(“sampleSubscription”);

A durable subscriber can have only one active subscriber at a time.A  durable subscriber registers a durable subscription with a unique identity.Subsequent subscribers with the same identity will resume the subscription  in the state where the previous subscriber was left.If there is no active subscriber , then the JMS provider keeps the messages till they are received by any subscriber or till the message expires.

The idea will become clear once we discuss an example. Our example application has two clients. The first client sends a  a string message to destination . The second client creates a durable subscription. In the case of non-durable subscription as we discussed earlier , second client was not able to receive the messages , those sent while it is not active.In case of durable subscriptions, the second client should be able to receive  the messages  published to the destination , while it is not active. Let us verify it.

FirstClient.java

import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class FirstClient {
private Context context = null;
private TopicConnectionFactory factory = null;
private TopicConnection connection = null;
private TopicSession session = null;
private Topic topic = null;
private TopicPublisher publisher = null;

public FirstClient() {

}

public void sendMessage() {
Properties initialProperties = new Properties();
initialProperties.put(InitialContext.INITIAL_CONTEXT_FACTORY,
"org.exolab.jms.jndi.InitialContextFactory");
initialProperties.put(InitialContext.PROVIDER_URL,
"tcp://localhost:3035");
try {
context = new InitialContext(initialProperties);
factory = (TopicConnectionFactory) context
.lookup("ConnectionFactory");
topic = (Topic) context.lookup("topic1");
connection = factory.createTopicConnection();
session = connection.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
publisher = session.createPublisher(topic);
TextMessage message = session.createTextMessage();
message.setText("Hello...This is a message");
connection.start();
publisher.publish(message);
System.out.println(this.getClass().getName()
+ " has sent a message : " + message);

} catch (NamingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
if (context != null) {
try {
context.close();
} catch (NamingException ex) {
ex.printStackTrace();
}
}

if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}

public static void main(String[] args) {
FirstClient firstClient = new FirstClient();
firstClient.sendMessage();

}

}

SecondClient.java

import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class SecondClient {
private Context context = null;
private TopicConnectionFactory factory = null;
private TopicConnection connection = null;
private TopicSession session = null;
private Topic topic = null;
private TopicSubscriber subscriber = null;

public SecondClient() {

}

public void receiveMessage() {

Properties initialProperties = new Properties();
initialProperties.put(InitialContext.INITIAL_CONTEXT_FACTORY,

"org.exolab.jms.jndi.InitialContextFactory");
initialProperties.put(InitialContext.PROVIDER_URL,

"tcp://localhost:3035");
try {

context = new InitialContext(initialProperties);
factory = (TopicConnectionFactory) context

.lookup("ConnectionFactory");
topic = (Topic) context.lookup("topic1");
connection = factory.createTopicConnection();
session = connection.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
subscriber = session.createDurableSubscriber(topic,
"sampleSubscription");
// subscriber = session.createSubscriber(topic);
connection.start();
Message message = subscriber.receive();
if (message instanceof TextMessage) {
TextMessage text = (TextMessage) message;
System.out.println("Message Received is : " + text.getText());
}

} catch (NamingException e) {

e.printStackTrace();

} catch (JMSException e) {

e.printStackTrace();

}

if (context != null) {
try {
context.close();
} catch (NamingException ex) {

ex.printStackTrace();
}
}

if (connection != null) {

try {
connection.close();
} catch (JMSException ex) {
ex.printStackTrace();

}
}
}

public static void main(String[] args) {

SecondClient secondClient = new SecondClient();
secondClient.receiveMessage();

}

}

Here , we are using OpenJMS   as JMS Provider. For running the clients , the OpenJMS should be configured and started. Also the necessary libraries should be there in the project space. These configurations were  explained earlier.

Output

In the example we discussed in earlier section, if we start the second client  after the first client exits , the second client was not able to receive the message published by the first client. Because there the second client created only a non-durable subscriptions. Here , the second client creates a durable subscription. So the second client receives the messages those published before it is active.let us verify the output here.

Start the first client. When it exits , start the second client.

Output of FirstClient.java

FirstClient has sent a message : Hello…This is a message

Output of SecondClient.java

Message Received is : Hello…This is a message

We can verify the difference between a durable and non-durable subscriptions , by changing the subscription to non-durable.

To ensure reliable delivery in publish/subscribe messaging domain  , use persistent delivery mode at publishing end . Also at subscribing end use durable subscription.These two mechanisms ensures reliable message delivery in case of publish/subscribe messaging domain.