We already discussed the fundamentals of JMS. There are two messaging domains in Java Messaging API.
1)Point-to-Point Messaging Domain
2)Publish/Subscribe
In this chapter Publish/Subscribe JMS Messaging Domain with a suitable example.
Publish Subscribe JMS Messaging Domain
In a Publish/Subscribe messaging domain the components are publishing clients , subscribing clients and the topic.Topics retains a message only as long as they delivered to currently active subscribers.(In Point-to-Point messaging domain , it was different.There the queue retains a message till it is delivered to a client or till the timeout expires).
In the example we are discussing here , we are using OpenJMS as service provider.before doing the coding , we should configure the OpenJMS and configure the workspace.
Publish Subscribe JMS Messaging Domain example
Now let us discuss an example to demonstrate the publish/subscribe messaging domain. Here there are three clients. First client sends a message to the topic. The message will be subscribed by the second and third clients . The second and third clients should be active while first client is sending the message.Instead of sending simple text message we are sending serializable objects. So let us see the EventMessage.java first. Instance of this class is sending as message.
EventMessage.java
import java.io.Serializable;
public class EventMessage implements Serializable {
private static final long serialVersionUID = 1L;
private int messageId;
private String messageText = "";
public EventMessage(int id, String messageText) {
this.messageId = id;
this.messageText = messageText;
}
public int getMessageId() {
return messageId;
}
public void setMessageId(int messageId) {
this.messageId = messageId;
}
public String getMessageText() {
return messageText;
}
public void setMessageText(String messageText) {
this.messageText = messageText;
}
public String toString(){
return "Message Id = "+getMessageId()+" ; Message Text = "+getMessageText();
}
}
Now let us examine the FirstClient.java
FirstClient.java
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class FirstClient {
private Context context = null;
private TopicConnectionFactory factory = null;
private TopicConnection connection = null;
private TopicSession session = null;
private Topic topic = null;
private TopicPublisher publisher = null;
public FirstClient() {
Properties initialProperties = new Properties();
initialProperties.put(InitialContext.INITIAL_CONTEXT_FACTORY,
"org.exolab.jms.jndi.InitialContextFactory");
initialProperties.put(InitialContext.PROVIDER_URL,
"tcp://localhost:3035");
try {
context = new InitialContext(initialProperties);
factory = (TopicConnectionFactory) context
.lookup("ConnectionFactory");
topic = (Topic) context.lookup("topic1");
connection = factory.createTopicConnection();
session = connection.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
publisher = session.createPublisher(topic);
EventMessage eventMessage = new EventMessage(1,
"Message from FirstClient");
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(eventMessage);
connection.start();
publisher.publish(objectMessage);
System.out.println(this.getClass().getName()
+ " has sent a message : " + eventMessage);
} catch (NamingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
if (context != null) {
try {
context.close();
} catch (NamingException ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
public void sendMessage() {
}
public static void main(String[] args) {
FirstClient firstClient = new FirstClient();
firstClient.sendMessage();
}
}
The operation can be summarized as follows.
1)JNDI look up to get the administered objects(TopicConnectionFactory & Topic)
2)Create TopicConnection object
3)Create TopicSession object
4)Create TopicPublisher object
5)Create object message and send it
7)Close context and connection.
Subscribing Clients
There are two subscribing clients. Their operation can be summarized as follows.
1)JNDI look up to get the administered objects(TopicConnectionFactory & Topic)
2)Create TopicConnection object
3)Create TopicSession object
4)Create TopicSubscriber object
5)Start connection and receive message
6)Close connection & context.
SecondClient.java
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class SecondClient {
private Context context = null;
private TopicConnectionFactory factory = null;
private TopicConnection connection = null;
private TopicSession session = null;
private Topic topic = null;
private TopicSubscriber subscriber = null;
public SecondClient() {
}
public void receiveMessage() {
Properties initialProperties = new Properties();
initialProperties.put(InitialContext.INITIAL_CONTEXT_FACTORY,
"org.exolab.jms.jndi.InitialContextFactory");
initialProperties.put(InitialContext.PROVIDER_URL,
"tcp://localhost:3035");
try {
context = new InitialContext(initialProperties);
factory = (TopicConnectionFactory) context
.lookup("ConnectionFactory");
topic = (Topic) context.lookup("topic1");
connection = factory.createTopicConnection();
session = connection.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
subscriber = session.createSubscriber(topic);
connection.start();
Message message = subscriber.receive();
if (message instanceof ObjectMessage) {
Object object = ((ObjectMessage) message).getObject();
System.out.println(this.getClass().getName()
+ " has received a message : " + (EventMessage) object);
}
} catch (NamingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
if (context != null) {
try {
context.close();
} catch (NamingException ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) {
SecondClient secondClient = new SecondClient();
secondClient.receiveMessage();
}
}
ThirdClient.java
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class ThirdClient {
private Context context = null;
private TopicConnectionFactory factory = null;
private TopicConnection connection = null;
private TopicSession session = null;
private Topic topic = null;
private TopicSubscriber subscriber = null;
public ThirdClient() {
}
public void receiveMessage() {
Properties initialProperties = new Properties();
initialProperties.put(InitialContext.INITIAL_CONTEXT_FACTORY,
"org.exolab.jms.jndi.InitialContextFactory");
initialProperties.put(InitialContext.PROVIDER_URL,
"tcp://localhost:3035");
try {
context = new InitialContext(initialProperties);
factory = (TopicConnectionFactory) context
.lookup("ConnectionFactory");
topic = (Topic) context.lookup("topic1");
connection = factory.createTopicConnection();
session = connection.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
subscriber = session.createSubscriber(topic);
connection.start();
Message message = subscriber.receive();
if (message instanceof ObjectMessage) {
Object object = ((ObjectMessage) message).getObject();
System.out.println(this.getClass().getName()
+ " has received a message : " + (EventMessage) object);
}
} catch (NamingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
if (context != null) {
try {
context.close();
} catch (NamingException ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) {
ThirdClient thirdClient = new ThirdClient();
thirdClient.receiveMessage();
}
}
Now let us see the output. For running this application we should start and configure the OpenJMS server. The necessary libraries should be there in class path too. These things were explained in configuration page.
Output
Run the secondClient.java and ThirdClient.java. Then run the FirstClient.java.
Output of FirstClient.java
FirstClient has sent a message : Message Id = 1 ; Message Text = Message from FirstClient
Output of SecondClient.java
SecondClient has received a message : Message Id = 1 ; Message Text = Message from FirstClient
Output of ThirdClient.java
ThirdClient has received a message : Message Id = 1 ; Message Text = Message from FirstClient
session.close()
should be in finally block. The existing program will cause connection leak.