JMS Domains-Publish Subscribe JMS Messaging Domain

We already discussed the fundamentals of JMS. There are two messaging domains in Java Messaging  API.

1)Point-to-Point Messaging Domain

2)Publish/Subscribe

In this chapter Publish/Subscribe JMS Messaging Domain with a suitable example.

Publish Subscribe JMS Messaging Domain

In a Publish/Subscribe messaging domain  the components are publishing clients , subscribing clients and the topic.Topics retains a message  only as long as they delivered to currently active subscribers.(In Point-to-Point messaging domain , it was different.There the queue retains a message  till it is delivered to a client or till the timeout expires).

publish-subscribe

In the example we are discussing here , we are using OpenJMS as service provider.before doing the coding , we should configure the OpenJMS and configure the workspace.

Publish Subscribe JMS Messaging Domain example

Now let us discuss an example to demonstrate the publish/subscribe messaging  domain. Here there are three clients. First client sends a message to the topic. The message will be  subscribed by the second and third clients . The second and third clients  should be active while first client is sending the message.Instead of sending simple text message we are sending  serializable objects. So let us see the EventMessage.java first. Instance of this class  is sending as message.

EventMessage.java

import java.io.Serializable;

public class EventMessage implements Serializable {

private static final long serialVersionUID = 1L;
private int messageId;
private String messageText = "";

public EventMessage(int id, String messageText) {
this.messageId = id;
this.messageText = messageText;
}

public int getMessageId() {
return messageId;
}

public void setMessageId(int messageId) {
this.messageId = messageId;
}

public String getMessageText() {
return messageText;
}

public void setMessageText(String messageText) {
this.messageText = messageText;
}
public String toString(){
return "Message Id = "+getMessageId()+" ; Message Text = "+getMessageText();
}

}

Now let us examine the FirstClient.java

FirstClient.java

import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class FirstClient {
private Context context = null;
private TopicConnectionFactory factory = null;
private TopicConnection connection = null;
private TopicSession session = null;
private Topic topic = null;
private TopicPublisher publisher = null;

public FirstClient() {
Properties initialProperties = new Properties();
initialProperties.put(InitialContext.INITIAL_CONTEXT_FACTORY,
"org.exolab.jms.jndi.InitialContextFactory");
initialProperties.put(InitialContext.PROVIDER_URL,
"tcp://localhost:3035");
try {
context = new InitialContext(initialProperties);
factory = (TopicConnectionFactory) context
.lookup("ConnectionFactory");
topic = (Topic) context.lookup("topic1");
connection = factory.createTopicConnection();
session = connection.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
publisher = session.createPublisher(topic);
EventMessage eventMessage = new EventMessage(1,
"Message from FirstClient");
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(eventMessage);
connection.start();
publisher.publish(objectMessage);
System.out.println(this.getClass().getName()
+ " has sent a message : " + eventMessage);

} catch (NamingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
if (context != null) {
try {
context.close();
} catch (NamingException ex) {
ex.printStackTrace();
}
}

if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}

public void sendMessage() {

}

public static void main(String[] args) {
FirstClient firstClient = new FirstClient();
firstClient.sendMessage();
}

}

The operation can be summarized as follows.

1)JNDI look up to get the administered objects(TopicConnectionFactory & Topic)

2)Create TopicConnection object

3)Create TopicSession object

4)Create TopicPublisher object

5)Create object message and send it

7)Close context and connection.

Subscribing Clients

There are two subscribing clients. Their operation can be summarized as follows.

1)JNDI look up to get the administered objects(TopicConnectionFactory & Topic)

2)Create TopicConnection object

3)Create TopicSession object

4)Create TopicSubscriber object

5)Start connection and receive message

6)Close connection & context.

SecondClient.java

import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class SecondClient {
private Context context = null;
private TopicConnectionFactory factory = null;
private TopicConnection connection = null;
private TopicSession session = null;
private Topic topic = null;
private TopicSubscriber subscriber = null;

public SecondClient() {

}

public void receiveMessage() {
Properties initialProperties = new Properties();
initialProperties.put(InitialContext.INITIAL_CONTEXT_FACTORY,
"org.exolab.jms.jndi.InitialContextFactory");
initialProperties.put(InitialContext.PROVIDER_URL,
"tcp://localhost:3035");
try {
context = new InitialContext(initialProperties);
factory = (TopicConnectionFactory) context
.lookup("ConnectionFactory");
topic = (Topic) context.lookup("topic1");
connection = factory.createTopicConnection();
session = connection.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
subscriber = session.createSubscriber(topic);
connection.start();
Message message = subscriber.receive();
if (message instanceof ObjectMessage) {
Object object = ((ObjectMessage) message).getObject();
System.out.println(this.getClass().getName()
+ " has received a message : " + (EventMessage) object);
}

} catch (NamingException e) {

e.printStackTrace();
} catch (JMSException e) {

e.printStackTrace();
}
if (context != null) {
try {
context.close();
} catch (NamingException ex) {
ex.printStackTrace();
}
}

if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}

}

public static void main(String[] args) {
SecondClient secondClient = new SecondClient();
secondClient.receiveMessage();

}

}

ThirdClient.java

import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class ThirdClient {
private Context context = null;
private TopicConnectionFactory factory = null;
private TopicConnection connection = null;
private TopicSession session = null;
private Topic topic = null;
private TopicSubscriber subscriber = null;

public ThirdClient() {

}

public void receiveMessage() {
Properties initialProperties = new Properties();
initialProperties.put(InitialContext.INITIAL_CONTEXT_FACTORY,
"org.exolab.jms.jndi.InitialContextFactory");
initialProperties.put(InitialContext.PROVIDER_URL,
"tcp://localhost:3035");
try {
context = new InitialContext(initialProperties);
factory = (TopicConnectionFactory) context
.lookup("ConnectionFactory");
topic = (Topic) context.lookup("topic1");
connection = factory.createTopicConnection();
session = connection.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
subscriber = session.createSubscriber(topic);
connection.start();
Message message = subscriber.receive();
if (message instanceof ObjectMessage) {
Object object = ((ObjectMessage) message).getObject();
System.out.println(this.getClass().getName()
+ " has received a message : " + (EventMessage) object);
}

} catch (NamingException e) {

e.printStackTrace();
} catch (JMSException e) {

e.printStackTrace();
}
if (context != null) {
try {
context.close();
} catch (NamingException ex) {
ex.printStackTrace();
}
}

if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}

}

public static void main(String[] args) {
ThirdClient thirdClient = new ThirdClient();
thirdClient.receiveMessage();
}

}

Now let us see the output. For running this application we should start and configure the OpenJMS server. The necessary libraries should be there in class path too. These things were explained in configuration page.

Output

Run the secondClient.java and ThirdClient.java. Then run the FirstClient.java.

Output of FirstClient.java

FirstClient has sent a message : Message Id = 1 ; Message Text = Message from FirstClient

Output of SecondClient.java

SecondClient has received a message : Message Id = 1 ; Message Text = Message from FirstClient

Output of ThirdClient.java

ThirdClient has received a message : Message Id = 1 ; Message Text = Message from FirstClient

See related topics:

JMS overview

Configuring OpenJMS

JMS example

Object message in JMS

Point to point domain in JMS

Synchronous message consumption in JMS

Asynchronous message consumption in JMS

One thought on “JMS Domains-Publish Subscribe JMS Messaging Domain

  1. sanjeev Reply

    session.close()
    should be in finally block. The existing program will cause connection leak.

Leave a Reply

Your email address will not be published. Required fields are marked *