We already discussed the basic concepts of JMS earlier. We have seen the various mechanisms in JMS API to ensure the reliable message delivery.The reliability mechanisms we discussed so far are :
3)Configuring message expiry time
In this chapter we are discussing how JMS API ensures reliable messaging by Creating Durable Subscriptions.
Creating Durable Subscriptions in JMS
As we discussed earlier , a publish/Subscribe messaging domain is useless , if the subscriber is not active while publisher is publishing the message to destination.There we discussed an example by using a non-durable subscriber. If we create a durable subscriber instead of the non-durable subscriber , it is possible to ensure reliable messaging.
A durable subscriber can be created by
TopicSubscriber subscriber = session.createDurableSubscriber(topic,
“sampleSubscription”);
topic is the Topic destination and sampleSubscription is the name of the durable subscriber.
To delete a durable subscription , first close the subscription and then unsubscribe as follows:
subscriber.close();
session.unsubscribe(“sampleSubscription”);
A durable subscriber can have only one active subscriber at a time.A durable subscriber registers a durable subscription with a unique identity.Subsequent subscribers with the same identity will resume the subscription in the state where the previous subscriber was left.If there is no active subscriber , then the JMS provider keeps the messages till they are received by any subscriber or till the message expires.
The idea will become clear once we discuss an example. Our example application has two clients. The first client sends a a string message to destination . The second client creates a durable subscription. In the case of non-durable subscription as we discussed earlier , second client was not able to receive the messages , those sent while it is not active.In case of durable subscriptions, the second client should be able to receive the messages published to the destination , while it is not active. Let us verify it.
FirstClient.java
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class FirstClient {
private Context context = null;
private TopicConnectionFactory factory = null;
private TopicConnection connection = null;
private TopicSession session = null;
private Topic topic = null;
private TopicPublisher publisher = null;
public FirstClient() {
}
public void sendMessage() {
Properties initialProperties = new Properties();
initialProperties.put(InitialContext.INITIAL_CONTEXT_FACTORY,
"org.exolab.jms.jndi.InitialContextFactory");
initialProperties.put(InitialContext.PROVIDER_URL,
"tcp://localhost:3035");
try {
context = new InitialContext(initialProperties);
factory = (TopicConnectionFactory) context
.lookup("ConnectionFactory");
topic = (Topic) context.lookup("topic1");
connection = factory.createTopicConnection();
session = connection.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
publisher = session.createPublisher(topic);
TextMessage message = session.createTextMessage();
message.setText("Hello...This is a message");
connection.start();
publisher.publish(message);
System.out.println(this.getClass().getName()
+ " has sent a message : " + message);
} catch (NamingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
if (context != null) {
try {
context.close();
} catch (NamingException ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) {
FirstClient firstClient = new FirstClient();
firstClient.sendMessage();
}
}
SecondClient.java
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class SecondClient {
private Context context = null;
private TopicConnectionFactory factory = null;
private TopicConnection connection = null;
private TopicSession session = null;
private Topic topic = null;
private TopicSubscriber subscriber = null;
public SecondClient() {
}
public void receiveMessage() {
Properties initialProperties = new Properties();
initialProperties.put(InitialContext.INITIAL_CONTEXT_FACTORY,
"org.exolab.jms.jndi.InitialContextFactory");
initialProperties.put(InitialContext.PROVIDER_URL,
"tcp://localhost:3035");
try {
context = new InitialContext(initialProperties);
factory = (TopicConnectionFactory) context
.lookup("ConnectionFactory");
topic = (Topic) context.lookup("topic1");
connection = factory.createTopicConnection();
session = connection.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
subscriber = session.createDurableSubscriber(topic,
"sampleSubscription");
// subscriber = session.createSubscriber(topic);
connection.start();
Message message = subscriber.receive();
if (message instanceof TextMessage) {
TextMessage text = (TextMessage) message;
System.out.println("Message Received is : " + text.getText());
}
} catch (NamingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
if (context != null) {
try {
context.close();
} catch (NamingException ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) {
SecondClient secondClient = new SecondClient();
secondClient.receiveMessage();
}
}
Here , we are using OpenJMS as JMS Provider. For running the clients , the OpenJMS should be configured and started. Also the necessary libraries should be there in the project space. These configurations were explained earlier.
Output
In the example we discussed in earlier section, if we start the second client after the first client exits , the second client was not able to receive the message published by the first client. Because there the second client created only a non-durable subscriptions. Here , the second client creates a durable subscription. So the second client receives the messages those published before it is active.let us verify the output here.
Start the first client. When it exits , start the second client.
Output of FirstClient.java
FirstClient has sent a message : Hello…This is a message
Output of SecondClient.java
Message Received is : Hello…This is a message
We can verify the difference between a durable and non-durable subscriptions , by changing the subscription to non-durable.
To ensure reliable delivery in publish/subscribe messaging domain , use persistent delivery mode at publishing end . Also at subscribing end use durable subscription.These two mechanisms ensures reliable message delivery in case of publish/subscribe messaging domain.