Configuring Messaging

Sometimes you need to maintain a log file and would prefer that it be written asynchronously, so that logging does not hold up any transactions, while still guaranteeing that each log entry is made. Messaging systems solve this kind of problem. JBoss completely rewrote its messaging service for version 5; JBoss Messaging replaces the JBossMQ messaging server used in earlier versions.

Messaging systems have been around for some time. Many of the larger system vendors have provided Message-Oriented Middleware (MOM) for many years, and most people have heard of IBM MQSeries. The Java specification expert group led by Sun Microsystems defined a standard API for messaging called the JMS (Java Message Service) API. The JBoss Messaging system is written entirely in Java and supports the two different message system architectures.

Two basic architectures are used for a MOM:

centralized

uses a central server to which all messaging clients connect. The clients send messages to the centralized server, which is responsible for delivering each message to the proper recipients. The centralized server can be clustered to increase capacity.

This solution has the advantage that you can have a very powerful centralized server to handle the messages, but every message must make at least two network hops.

decentralized

delegates the responsibility of message handling to the clients: the client messaging components communicate with each other to pass messages. In some cases a router using multicast facilitates this communication; in other cases a client messaging component communicates directly with the other clients' messaging components.

This solution has the advantage that it takes only one network hop to deliver a message, but making every machine powerful enough to handle the messaging load comes at a high cost.

There are two messaging models:

point-to-point

a sender places a message in a queue, from which it is delivered to one receiver. Several senders can place messages on the same queue and multiple receivers can receive them, but an individual message is delivered to only one receiver.

This solution is ideal for a transaction log: every EJB that processes orders places transaction information on the audit queue, and an audit receiver reads those messages and logs the transactions. You can increase the number of receivers if the queue becomes large.

publish-and-subscribe

a publisher places messages in a topic, and every subscriber registered to receive messages on that topic receives a copy of each message.

This solution is ideal when a business (publisher) wants to make some information available to all its business partners that subscribe.
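
The difference between the two models can be sketched in a few lines of plain Java. This is only an in-memory illustration and does not use the JMS API; the class names SimpleQueue and SimpleTopic are made up for this sketch, and a real provider adds persistence, transactions and network transport.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// In-memory illustration only; all names here are hypothetical.
final class MessagingModels {

    /** Point-to-point: each message is handed to exactly one receiver. */
    static final class SimpleQueue {
        private final Queue<String> messages = new ArrayDeque<>();

        void send(String message) {
            messages.add(message);
        }

        /** Returns the next message, or null if the queue is empty. */
        String receive() {
            return messages.poll();
        }
    }

    /** Publish-and-subscribe: every subscriber gets its own copy. */
    static final class SimpleTopic {
        private final List<List<String>> inboxes = new ArrayList<>();

        /** Registers a subscriber and returns the inbox it will read from. */
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        void publish(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        SimpleQueue audit = new SimpleQueue();
        audit.send("order-1");
        audit.send("order-2");
        // Two competing receivers: each message goes to only one of them.
        System.out.println("receiver A got " + audit.receive());
        System.out.println("receiver B got " + audit.receive());

        SimpleTopic releases = new SimpleTopic();
        List<String> partner1 = releases.subscribe();
        List<String> partner2 = releases.subscribe();
        releases.publish("new video");
        // Both subscribers see the same message.
        System.out.println("partner1 inbox " + partner1);
        System.out.println("partner2 inbox " + partner2);
    }
}
```

Note that a subscriber in this sketch only sees messages published after it subscribed, which is also how a non-durable topic subscription behaves.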

JMS API

The JMS API consists of a number of interfaces in the javax.jms package. To receive messages you need to do the following:

  1. Locate a ConnectionFactory (typically JNDI)
  2. Use the ConnectionFactory to create a Connection
  3. Use the Connection to create a Session
  4. Locate a Destination (typically JNDI)
  5. Use the Session to create a MessageConsumer for that Destination.

Methods on the MessageConsumer enable you to either query the Destination for messages or to register for message notification.

A message contains three data areas:

header

contains a set of properties required by the JMS specification, such as the unique message ID (JMSMessageID) and the timestamp (JMSTimestamp). Getters and setters on the javax.jms.Message interface give access to these values.

properties

can contain several categories of properties; again the javax.jms.Message interface provides getters and setters to access these values. It is possible to send the entire message inside the properties, but you should use the payload for the message data. There are a number of message property categories:

  • JMS-defined optional properties described in the JMS specification. The names of these properties all begin with the string JMSX
  • Vendor-specific properties provided by the message service vendor. The names of these properties all begin with the string JMS_ followed by the vendor name. JBoss Messaging does not have any vendor-specific properties
  • Application properties provided by the client sending the message
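
The naming convention in the list above can be checked mechanically. The sketch below is plain Java with no JMS dependency; the class and method names are made up for illustration, and JMS_Acme_Priority is an invented vendor property, not a real one.

```java
// Hypothetical helper: classifies a property name by the prefixes listed above.
final class PropertyCategory {

    static String categorize(String name) {
        if (name.startsWith("JMSX")) {
            return "JMS-defined";
        }
        if (name.startsWith("JMS_")) {
            return "vendor-specific";
        }
        return "application";
    }

    public static void main(String[] args) {
        // JMSXDeliveryCount is a JMS-defined property.
        System.out.println(categorize("JMSXDeliveryCount"));
        // Invented vendor property, used only to show the JMS_ prefix.
        System.out.println(categorize("JMS_Acme_Priority"));
        // "Customer" is the application property used by the video store example.
        System.out.println(categorize("Customer"));
    }
}
```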

payload

The payload area contains the message data. There are a number of different payload types, each of which is handled by an interface.

  • Message used for messages without a payload. This interface defines the setters and getters for the header data and properties; the other interfaces in this list extend the Message interface
  • BytesMessage used to send an array of uninterpreted bytes
  • MapMessage used to send a message as a set of name/value pairs, the names are Strings, and the values can be any primitive type or string
  • ObjectMessage used to send a serializable object, the recipient must have the class for the object and all of its required supporting classes in its classpath
  • StreamMessage used to send a stream of Java primitives and strings
  • TextMessage used to send a text string

JBoss Messaging Architecture

The JBoss Messaging architecture consists of components that reside on the server and the client. These components work together to provide a reliable and scalable messaging system.

  • JBoss Messaging Core provides a generic messaging service that supports JMS and other types of messaging. The Core is a distributed messaging system based on JGroups, so there is no single point of failure when multiple servers are in use. The Core uses a pluggable persistence manager to persist messages. It comes with a JDBC persistence manager, but you can use other persistence managers that store messages in other data stores, such as file-based or memory-based stores. All messages are stored as Binary Large Objects (BLOBs). Each Core can have its own database, or multiple Cores can share a database
  • JBoss Transactions provides two-phase commit capabilities
  • JMS Facade makes the Core into a JMS provider
  • JBoss Messaging client library provides a single JAR file containing all the necessary libraries, including the JMS API, used by the client application to communicate with the messaging service
  • JBoss Remoting provides the communication mechanism between the client and the server. It is well suited to this because it uses pluggable transports and data marshallers; current transports include TCP/IP, HTTP and a bidirectional socket transport similar to the UIL2 transport provided by JBossMQ

JMS Application

The example below is a video store that notifies its customers when a new video arrives. The store places a message on a topic to which its customers subscribe, so each customer is notified of the new release. It is basically a simple publish-and-subscribe client that runs from the command line and uses JBoss to supply the messaging service.

The example has three classes: Store, Customer and Video

Video.java
package org.jbia.jms.videostore;

public class Video implements java.io.Serializable {

  private String name;
  private String genre;

  public final String getGenre() {
    return genre;
  }

  public final void setGenre(String genre) {
    this.genre = genre;
  }

  public final String getName() {
    return name;
  }

  public final void setName(String name) {
    this.name = name;
  }

  // Using an annotation to let the compiler know we are overriding
  @Override
  public String toString() {
    return name + " [" + genre + "]";
  }
}

Store.java
package org.jbia.jms.videostore;

// You could tidy this up but you can see what I am using here
import java.io.BufferedReader;
import java.io.InputStreamReader;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

public class Store implements MessageListener {

  private BufferedReader rdr = new BufferedReader(new InputStreamReader(System.in));

  private Session sessionProducer;
  private Session sessionConsumer;
  private MessageProducer producer;
  private MessageConsumer consumer;

  private void run() throws Exception {
    Connection connection = null;

    try {
          Context initial = new InitialContext();
          ConnectionFactory cf = (ConnectionFactory)initial.lookup("ConnectionFactory");
          Destination notify = (Destination)initial.lookup("topic/testTopic");
          Destination request = (Destination)initial.lookup("topic/testDurableTopic");

          connection = cf.createConnection();
          sessionProducer = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
          sessionConsumer = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

          producer = sessionProducer.createProducer(notify);
          consumer = sessionConsumer.createConsumer(request);

          consumer.setMessageListener(this);

          connection.start();

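          // Keep prompting for new videos, pausing 10 seconds between prompts,
          // until notifyOfVideos() returns false (empty name or genre)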
          while (notifyOfVideos()) {
            Thread.sleep(10 * 1000);
          }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Closing the connection also closes its sessions, so this stays safe
        // even if session creation failed and the session fields are still null
        if (connection != null) {
          connection.close();
        }
    }
  }

  private boolean notifyOfVideos() throws Exception {

    System.out.println("Supply info for new video:");
    System.out.print("Name: ");
    String name = rdr.readLine();
    System.out.print("Genre: ");
    String genre = rdr.readLine();

    if (name.length() == 0 || genre.length() == 0) {
      return false;
    }

    Video video = new Video();
    video.setName(name);
    video.setGenre(genre);
    ObjectMessage om = sessionProducer.createObjectMessage(video);
    producer.send(om);
    return true;
  }

  public void onMessage(Message msg) {
    try {
          ObjectMessage om = (ObjectMessage)msg;
          Video video = (Video)om.getObject();
          String customer = om.getStringProperty("Customer");
          System.out.println("Reservation request:");
          System.out.println("\tcustomer: " + customer);
          System.out.println("\tvideo : " + video);
    } catch (JMSException e) {
        e.printStackTrace();
    }
  }

  public static void main(String[] args) throws Exception {
    new Store().run();
  }
}

Customer.java
package org.jbia.jms.videostore;

// You could tidy this up but you can see what I am using here
import java.io.BufferedReader;
import java.io.InputStreamReader;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

public class Customer {

  private BufferedReader rdr = new BufferedReader(new InputStreamReader(System.in));

  private Session session;
  private MessageProducer producer;

  private MessageConsumer consumer;

  private void run(String customer) throws Exception {
    Connection connection = null;
    try {
         Context initial = new InitialContext();
         ConnectionFactory cf = (ConnectionFactory)initial.lookup("ConnectionFactory");
         Destination notify = (Destination)initial.lookup("topic/testTopic");
         Destination request = (Destination)initial.lookup("topic/testDurableTopic");

         connection = cf.createConnection();
         connection.setClientID(customer);
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

         producer = session.createProducer(request);
         consumer = session.createConsumer(notify);

         connection.start();

         System.out.println("Listening for video notifications...");
         while (listen(customer)) {/* do nothing */}
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
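        // Closing the connection also closes its session, so this stays safe
        // even if session creation failed and the session field is still null
        if (connection != null) {
          connection.close();
        }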
    }
  }

  public boolean listen(String customer) throws Exception {
    
    boolean result = true;
    ObjectMessage om = (ObjectMessage)consumer.receive();
    Video video = (Video)om.getObject();
    System.out.println("New video available: " + video);

    System.out.print("Reserve a copy? [y,n,x(to exit)] ");
    String input = rdr.readLine();
    if (input.equalsIgnoreCase("y")) {
      om = session.createObjectMessage(video);
      om.setStringProperty("Customer", customer);
      producer.send(om);
    } else if (input.equalsIgnoreCase("x")) {
        result = false;
    }

    return result;
  }

  public static void main(String[] args) throws Exception {
    if (args.length == 0) {
      System.out.println("usage: customer <name>");
    } else {
      new Customer().run(args[0]);
    }
  }
}

When compiling, make sure that jbossall-client.jar is on the classpath, then package the files as shown below (into a videostore.jar file). You also need to create the jndi.properties file.

compile java files
javac -cp d:/jboss-5.0.0.GA/client -d . src/Video.java
javac -cp d:/jboss-5.0.0.GA/client/jbossall-client.jar;D:. -d . src/Store.java
javac -cp d:/jboss-5.0.0.GA/client/jbossall-client.jar;D:. -d . src/Customer.java

jndi.properties file
java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
java.naming.provider.url=jnp://127.0.0.1:1099

create the videostore.jar file
jar cvf videostore.jar org jndi.properties

// make sure everything is OK
jar tvf videostore.jar

Before we can run the application we need to make sure that the topic services are running. Add the following to server/xxx/deploy/messaging/destinations-service.xml (I copied these from the docs\examples\jms\example-destinations-service.xml file).

destinations-service.xml
<mbean code="org.jboss.jms.server.destination.TopicService"
       name="jboss.messaging.destination:service=Topic,name=testTopic"
       xmbean-dd="xmdesc/Topic-xmbean.xml">
  <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
  <depends>jboss.messaging:service=PostOffice</depends>
  <attribute name="SecurityConfig">
    <security>
      <role name="guest" read="true" write="true"/>
      <role name="publisher" read="true" write="true" create="false"/>
      <role name="durpublisher" read="true" write="true" create="true"/>
    </security>
  </attribute>
</mbean>

<mbean code="org.jboss.jms.server.destination.TopicService"
       name="jboss.messaging.destination:service=Topic,name=testDurableTopic"
       xmbean-dd="xmdesc/Topic-xmbean.xml">
  <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
  <depends>jboss.messaging:service=PostOffice</depends>
  <attribute name="SecurityConfig">
    <security>
      <role name="guest" read="true" write="true"/>
      <role name="publisher" read="true" write="true" create="false"/>
      <role name="durpublisher" read="true" write="true" create="true"/>
    </security>
  </attribute>
</mbean>

Start JBoss
run -c default

Confirm that the Topic services are running; you should see this in the log file
11:58:51,590 INFO [TopicService] Topic[/topic/testTopic] started, fullSize=200000, pageSize=2000, downCacheSize=2000
11:58:51,606 INFO [TopicService] Topic[/topic/testDurableTopic] started, fullSize=200000, pageSize=2000, downCacheSize=2000

Now we can run (test) our application

Run the clients

java -cp videostore.jar;%jboss_home%\client\jbossall-client.jar org.jbia.jms.videostore.Customer Will
java -cp videostore.jar;%jboss_home%\client\jbossall-client.jar org.jbia.jms.videostore.Customer Hay

java -cp videostore.jar;%jboss_home%\client\jbossall-client.jar org.jbia.jms.videostore.Store

Message-Driven Bean (MDB)

Session EJBs can be message consumers, but this is unwise: there is only a limited number of session beans in the pool, so you want to use them and return them to the pool as quickly as possible. Other problems can also occur when session EJBs are used as message consumers. For these reasons the EJB 2.0 specification introduced a new type of EJB, the message-driven bean (MDB). The MDB is designed to be a message consumer: you tell it which queue or topic to subscribe to and it processes the messages.

To explain how they work we will create one and use it in the example above. The MDB will monitor the videos that "Will" requests: it wiretaps the request topic and records all of Will's video requests.

WireTap.java
package org.jbia.jms.videostore;

import javax.ejb.*;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

@MessageDriven(activationConfig = {
  @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
  @ActivationConfigProperty(propertyName = "destination", propertyValue = "topic/testDurableTopic"),
  @ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "Durable"),
  @ActivationConfigProperty(propertyName = "messageSelector", propertyValue = "Customer = 'Will'")
})

public class WireTap implements MessageListener {

  public void onMessage(Message msg) {
    try {
      ObjectMessage objmsg = (ObjectMessage)msg;
      Video video = (Video)objmsg.getObject();
      String perp = msg.getStringProperty("Customer");
      System.out.println("Perpetrator " + perp + " requested video " + video);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

Build the WireTap.jar file
javac -cp d:/jboss-5.0.0.GA/client/jbossall-client.jar;D:. -d . src/WireTap.java

jar cvf WireTap.jar org/jbia/jms/videostore/Video.class org/jbia/jms/videostore/WireTap.class

logfile output
2009-04-29 15:10:45,058 INFO [STDOUT] (WorkManager(2)-7) Perpetrator Will requested video ask a policeman [comedy]

use a deployment descriptor file instead of annotations (META-INF/ejb-jar.xml)

// Just remove the annotation and recompile

<ejb-jar
  xmlns="http://java.sun.com/xml/ns/javaee"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_0.xsd"
  version="3.0"
>
  <enterprise-beans>
    <message-driven>
      <ejb-name>WireTap</ejb-name>
      <ejb-class>org.jbia.jms.videostore.WireTap</ejb-class>
      <message-destination-type>javax.jms.Topic</message-destination-type>
      <activation-config>
        <activation-config-property>
          <activation-config-property-name>destinationType</activation-config-property-name>
          <activation-config-property-value>javax.jms.Topic</activation-config-property-value>
        </activation-config-property>
        <activation-config-property>
          <activation-config-property-name>destination</activation-config-property-name>
          <activation-config-property-value>topic/testDurableTopic</activation-config-property-value>
        </activation-config-property>
        <activation-config-property>
          <activation-config-property-name>subscriptionDurability</activation-config-property-name>
          <activation-config-property-value>Durable</activation-config-property-value>
        </activation-config-property>
        <activation-config-property>
          <activation-config-property-name>messageSelector</activation-config-property-name>
          <activation-config-property-value>Customer = 'Will'</activation-config-property-value>
        </activation-config-property>
      </activation-config>
    </message-driven>
  </enterprise-beans>
</ejb-jar>

The @MessageDriven annotation declares the class to be a message-driven bean. Its activationConfig takes a series of configuration properties that govern how messages are handled; in this example they select the request topic, make the subscription durable and restrict delivery to messages whose Customer property is 'Will'.

The WireTap class implements the MessageListener interface, which requires the onMessage method. When the MDB receives a message, it extracts the Video object from the message and prints the surveillance data to the console. There are a number of standard and extended MDB configuration properties; the extended properties override many of the defaults defined in server/xxx/conf/standardjboss.xml
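
To make the effect of the messageSelector property concrete, here is a minimal sketch in plain Java of what a selector such as Customer = 'Will' does. It is an assumption-laden simplification: real JMS selectors are SQL-like strings evaluated by the provider, whereas this sketch only models a single equality test, messages are represented as plain property maps, and the names SelectorSketch, propertyEquals and deliver are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Simplified model of selector filtering; not the JMS selector engine.
final class SelectorSketch {

    /** Builds a predicate equivalent to the selector "name = 'value'". */
    static Predicate<Map<String, String>> propertyEquals(String name, String value) {
        // A message without the property never matches.
        return properties -> value.equals(properties.get(name));
    }

    /** Returns only the messages whose properties satisfy the selector. */
    static List<Map<String, String>> deliver(List<Map<String, String>> messages,
                                             Predicate<Map<String, String>> selector) {
        List<Map<String, String>> delivered = new ArrayList<>();
        for (Map<String, String> message : messages) {
            if (selector.test(message)) {
                delivered.add(message);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Map<String, String>> requests = List.of(
                Map.of("Customer", "Will"),
                Map.of("Customer", "Hay"));
        // Only the request from Will passes the filter.
        System.out.println(deliver(requests, propertyEquals("Customer", "Will")).size());
    }
}
```

Filtering on the server side like this means the consumer never sees, and never has to discard, the messages it is not interested in.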

Standard MDB configuration properties

Property | Default | Description
destinationType | javax.jms.Queue | Identifies the destination as being javax.jms.Queue or javax.jms.Topic
subscriptionDurability | NonDurable | Identifies the subscription to be Durable or NonDurable
acknowledgeMode | Auto-acknowledge | Indicates whether the EJB container acknowledges the message as soon as the MDB receives it for processing (Auto-acknowledge) or can wait until later (Dups-ok-acknowledge). With the latter you must be able to handle duplicates, for example by checking the JMSXDeliveryCount message property
messageSelector | null | An expression that can be used to filter the messages that are sent to the MDB

Extended MDB configuration properties

Property | Default | Description
destination | null | JNDI name for the topic or queue
clientId | -generated- | Used to set clientId for the connection
subscriptionName | -generated- | The durable subscription name
user | null | The account name used to determine access rights to the destination
password | null | The account password used to determine access rights to the destination
maxMessages | 1 | The maximum number of messages that can be assigned to a server session at one time
minSession | 1 | The minimum number of MDBs to keep in the pool
maxSession | 15 | The maximum number of MDBs to keep in the pool
keepAlive | 60000 | The amount of time to wait (milliseconds) before removing an unused MDB from the pool
reconnectInterval | 10000 | The amount of time to wait (milliseconds) before attempting to reconnect to the messaging server if the messaging server goes away
providerAdaptorJNDI | java:/DefaultJMSProvider | JNDI name of the JMS provider adaptor that's used to create messaging resources

Message-Driven POJOs

Message-Driven POJOs are what their name implies: Plain Old Java Objects that can be registered as message consumers. They are specific to JBoss, so you should not use them if your application needs to be portable across application servers. Message-driven POJOs are also supported by the EJB container, meaning they cannot be used on a server configuration that does not support EJB, e.g. Tomcat.

We will continue to extend the example above to explain message-driven POJOs. To define a message-driven POJO you need both an interface and a class that implements that interface.
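
One way to picture why both an interface and an implementing class are needed: the sender calls the interface, the call is turned into a message, and the implementation is invoked later on the receiving side. The sketch below shows that idea with a JDK dynamic proxy and an in-memory queue. It is a conceptual illustration only and is not how JBoss implements message-driven POJOs; Billing, Invocation, producerFor and drain are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayDeque;
import java.util.Queue;

// Conceptual sketch only; all names here are hypothetical.
final class PojoMessagingSketch {

    /** Business interface shared by the sending and the receiving side. */
    interface Billing {
        void charge(String customer, double amount);
    }

    /** A recorded method call, standing in for a JMS message. */
    static final class Invocation {
        final Method method;
        final Object[] args;

        Invocation(Method method, Object[] args) {
            this.method = method;
            this.args = args;
        }
    }

    /** Sending side: a proxy that queues each interface call instead of running it. */
    static <T> T producerFor(Class<T> type, Queue<Invocation> queue) {
        InvocationHandler handler = (proxy, method, args) -> {
            queue.add(new Invocation(method, args));
            return null; // one-way call: the interface methods return void
        };
        Object instance = Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler);
        return type.cast(instance);
    }

    /** Receiving side: replays every queued call on the implementation. */
    static int drain(Queue<Invocation> queue, Object target) {
        int handled = 0;
        Invocation next;
        while ((next = queue.poll()) != null) {
            try {
                next.method.invoke(target, next.args);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("could not deliver call", e);
            }
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        Queue<Invocation> queue = new ArrayDeque<>();
        Billing sender = producerFor(Billing.class, queue);
        sender.charge("Will", 4.95); // queued; nothing is printed yet

        Billing receiver = (customer, amount) ->
                System.out.println("Credit charge to " + customer + " for " + amount);
        drain(queue, receiver); // now the call reaches the implementation
    }
}
```

Because the call is only queued, the sender does not wait for the receiver, which is why the methods on such an interface return void.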

ICredit.java (interface)
package org.jbia.jms.videostore;

import org.jboss.ejb3.annotation.Producer;

@Producer
public interface ICredit {
  void charge(String customer, double amount);
}

Credit.java
package org.jbia.jms.videostore;

import javax.ejb.*;
import org.jboss.ejb3.annotation.Consumer;

@Consumer(activationConfig = {
  @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
  @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue")
})

public class Credit implements ICredit {

  public void charge(String customer, double amount) {
    System.out.println("Credit charge to " + customer + " for " + amount);
  }
}

Note: the messaging Queue used here is added by the update to the destinations-service.xml file shown further down

Store.java (updated)
package org.jbia.jms.videostore;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

import org.jboss.ejb3.mdb.*;

public class Store implements MessageListener {

  private BufferedReader rdr = new BufferedReader(new InputStreamReader(System.in));

  private Session sessionProducer;
  private Session sessionConsumer;
  private MessageProducer producer;
  private MessageConsumer consumer;

  private void run() throws Exception {
    Connection connection = null;
    try {
      Context initial = new InitialContext();
      ConnectionFactory cf = (ConnectionFactory)initial.lookup("ConnectionFactory");
      Destination notify = (Destination)initial.lookup("topic/testTopic");
      Destination request = (Destination)initial.lookup("topic/testDurableTopic");

      connection = cf.createConnection();
      sessionProducer = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
      sessionConsumer = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

      producer = sessionProducer.createProducer(notify);
      consumer = sessionConsumer.createConsumer(request);

      consumer.setMessageListener(this);

      connection.start();

      while (notifyOfVideos()) {
        Thread.sleep(10 * 1000);
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      if (connection != null) {
        connection.close();
      }
    }
  }

  private boolean notifyOfVideos() throws Exception {

    System.out.println("Supply info for new video:");
    System.out.print("Name: ");
    String name = rdr.readLine();
    System.out.print("Genre: ");
    String genre = rdr.readLine();

    if (name.length() == 0 || genre.length() == 0) {
      return false;
    }

    Video video = new Video();
    video.setName(name);
    video.setGenre(genre);
    ObjectMessage om = sessionProducer.createObjectMessage(video);
    producer.send(om);
    return true;
  }

  public void onMessage(Message msg) {
    try {
      ObjectMessage om = (ObjectMessage)msg;
      Video video = (Video)om.getObject();
      String customer = om.getStringProperty("Customer");
      System.out.println("Reservation request:");
      System.out.println("\tcustomer: " + customer);
      System.out.println("\tvideo : " + video);
      charge(customer);
    } catch (JMSException e) {
        e.printStackTrace();
    }
  }

  private void charge(String customer) {
    ProducerManager mgr = null;
    try {
      InitialContext ctx = new InitialContext();
      ICredit card = (ICredit)ctx.lookup(ICredit.class.getName());
      ProducerObject prod = (ProducerObject)card;
      mgr = prod.getProducerManager();
      mgr.connect();
      card.charge(customer, 4.95);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
          if (mgr != null) {
            mgr.close();
          }
        } catch (javax.jms.JMSException je) {/* ignore errors on close */}
    }
  }


  public static void main(String[] args) throws Exception {
     new Store().run();
  }
}

Note: the updates are the org.jboss.ejb3.mdb import, the call to charge() in onMessage() and the new charge() method

Need to add a Queue message service to the destinations-service.xml
<mbean code="org.jboss.jms.server.destination.QueueService"
       name="jboss.messaging.destination:service=Queue,name=testQueue"
       xmbean-dd="xmdesc/Queue-xmbean.xml">
  <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
  <depends>jboss.messaging:service=PostOffice</depends>
  <attribute name="SecurityConfig">
    <security>
      <role name="guest" read="true" write="true"/>
      <role name="publisher" read="true" write="true" create="false"/>
      <role name="noacc" read="false" write="false" create="false"/>
    </security>
  </attribute>
</mbean>

Compile the Java files (including Store.java), create or update the jar archives, and restart JBoss so that the new messaging Queue starts.

compile the java files
javac -cp d:/jboss-5.0.0.GA/client/jbossall-client.jar;D:. -d . src/ICredit.java
javac -cp d:/jboss-5.0.0.GA/client/jbossall-client.jar;D:. -d . src/Credit.java
javac -cp d:/jboss-5.0.0.GA/client/jbossall-client.jar;D:. -d . src/Store.java

create the jar archives
jar cvf videostore.jar org jndi.properties
jar cvf credit.jar org/jbia/jms/videostore/ICredit.class org/jbia/jms/videostore/Credit.class

confirm that the messaging Queue started
17:16:51,606 INFO [org.jboss.jms.server.destination.QueueService] (main)
   Queue[/queue/testQueue] started, fullSize=200000, pageSize=2000,downCacheSize=2000

the resulting output
2009-04-29 17:17:19,169 INFO [STDOUT] (WorkManager(2)-5) Credit charge to Will for 4.95

JBoss Messaging

We are now going to enhance the video store example with its own data source, database persistence for messages, a security realm and its own destinations.

We have covered most of the above in previous topics, so I am going to give you a brief explanation of what needs to be done and leave you to investigate further.

We first start with configuring the data source; I will be using an Oracle database.

Oracle tables

create table VUser (
  vname varchar(30) not null primary key,
  vpassword varchar(30) not null
);

create table VRole (
  vname varchar(30) not null primary key,
  vrole varchar(30) not null
);

insert into VUser values ('Paul', 'Valle' );
insert into VUser values ('Will', 'Hay' );
insert into VUser values ('Norman', 'Wisdom' );

insert into VRole values ('Paul', 'vstore');
insert into VRole values ('Will', 'vcust');
insert into VRole values ('Norman', 'vcust');
  

video-ds.xml
<datasources>
  <local-tx-datasource>
    <jndi-name>jdbc/VideoDS</jndi-name>
    <connection-url>jdbc:oracle:thin:@localhost:1521:D01</connection-url>

    <driver-class>oracle.jdbc.driver.OracleDriver</driver-class>
    <user-name>jboss</user-name>
    <password>jboss</password>
  
    <!-- Uses the pingDatabase method to check a connection is still valid before handing it out from the pool -->
    <valid-connection-checker-class-name>
      org.jboss.resource.adapter.jdbc.vendor.OracleValidConnectionChecker
    </valid-connection-checker-class-name>
  
    <!-- Checks the Oracle error codes and messages for fatal errors -->
    <exception-sorter-class-name>
      org.jboss.resource.adapter.jdbc.vendor.OracleExceptionSorter
    </exception-sorter-class-name>
  
    <!-- sql to call when connection is created
    <new-connection-sql>some arbitrary sql</new-connection-sql>
    -->

    <!-- sql to call on an existing pooled connection when it is obtained from pool -
         the OracleValidConnectionChecker is preferred
    -->
    <check-valid-connection-sql>select * from dual</check-valid-connection-sql>

    <!-- corresponding type-mapping in the standardjbosscmp-jdbc.xml (optional) -->
    <metadata>
      <type-mapping>Oracle9i</type-mapping>
    </metadata>
  </local-tx-datasource>
</datasources>

oracle-persistence-service.xml
Copy the docs/examples/jms/oracle-persistence-service.xml file to the server/xxx/deploy/messaging directory, changing the data source in it from DefaultDS to VideoDS, and remove the hsqldb-persistence-service.xml file from that directory.

The messaging service uses the DatabaseServerLoginModule, which you will also use, but with a different security domain. We are going to add another security realm to the server/xxx/conf/login-config.xml file.

video-realm
<policy>
  ...
  <application-policy name="video-realm">
    <authentication>
      <login-module code="org.jboss.security.auth.spi.DatabaseServerLoginModule" flag="required">
      
        <module-option name="dsJndiName">
          java:/jdbc/VideoDS
        </module-option>
      
        <module-option name="principalsQuery">
          select vpassword from VUser where vname=?
        </module-option>

        <module-option name="rolesQuery">
          select vrole, 'Roles' from VRole where vname=?
        </module-option>

      </login-module>
    </authentication>
  </application-policy>
</policy>

The final step is to configure the messaging service to use the above login module, which is configured in server/xxx/deploy/messaging/messaging-jboss-beans.xml

messaging-jboss-beans.xml

<deployment ...>
  <bean name="SecurityStore" ...>
    <property name="defaultSecurityConfig">
      <![CDATA[<security>
        <role name="guest" read="true" write="true" create="true"/>
      </security>]]>
    </property>
    <property name="securityDomain">video-realm</property>
      ...
  </bean>
  ...
</deployment>

Note:
read - if true, then users in that role can receive messages from the destination
write - if true, then users in that role can send messages to that destination
create - if true, then users in that role can establish a durable topic subscription

We have mentioned durable subscriptions a number of times. A durable subscription means that the messaging server retains each message in the topic until all registered durable subscribers have received a copy of it. A subscriber can therefore exit and return later to receive any messages that appeared since the last time it ran. This applies only to the subscriber, not the publisher.
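The difference between a durable and a non-durable subscriber can be sketched with a small in-memory model. This is only a toy illustration, not JBoss Messaging code: the class DurableTopicSketch, its methods and the subscriber names are all hypothetical, and a real server would also persist the retained messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory topic (hypothetical), illustrating durable vs non-durable delivery. */
final class DurableTopicSketch {
    // Pending messages per durable subscription; the entry survives a disconnect.
    private final Map<String, List<String>> durable = new HashMap<>();
    // Pending messages per non-durable subscriber; the entry is dropped on disconnect.
    private final Map<String, List<String>> nonDurable = new HashMap<>();

    public void subscribeDurable(String name) {
        // Reconnecting under the same name keeps the existing backlog.
        durable.putIfAbsent(name, new ArrayList<>());
    }

    public void subscribe(String name) {
        // A non-durable subscriber always starts with nothing pending.
        nonDurable.put(name, new ArrayList<>());
    }

    public void disconnect(String name) {
        // Only the non-durable registration is forgotten.
        nonDurable.remove(name);
    }

    public void publish(String message) {
        durable.values().forEach(pending -> pending.add(message));
        nonDurable.values().forEach(pending -> pending.add(message));
    }

    /** Returns and clears the messages waiting for the given subscriber. */
    public List<String> receive(String name) {
        List<String> pending = durable.containsKey(name) ? durable.get(name) : nonDurable.get(name);
        if (pending == null) {
            return new ArrayList<>();
        }
        List<String> delivered = new ArrayList<>(pending);
        pending.clear();
        return delivered;
    }

    public static void main(String[] args) {
        DurableTopicSketch topic = new DurableTopicSketch();
        topic.subscribeDurable("alice");
        topic.subscribe("bob");
        topic.publish("Video A in stock");
        topic.receive("alice");
        topic.receive("bob");

        // Both subscribers go offline, then a message is published.
        topic.disconnect("alice");
        topic.disconnect("bob");
        topic.publish("Video B in stock");

        // Both come back; only the durable subscriber sees the missed message.
        topic.subscribeDurable("alice");
        topic.subscribe("bob");
        System.out.println("alice: " + topic.receive("alice")); // alice: [Video B in stock]
        System.out.println("bob: " + topic.receive("bob"));     // bob: []
    }
}
```

The key point of the model is that disconnecting removes only the non-durable registration, so the durable subscriber's backlog keeps growing while it is away.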

We have already discussed configuring destinations in the server/xxx/deploy/messaging/destinations-service.xml file, which defines a number of MBeans, each of which defines a destination. To create your own destinations it is better to create a separate *-service.xml file and populate it with the MBean definitions for your destinations.

The file below creates two topics, one for the video notification and one for the reservation request. On the Notification topic the vcust role is given the right to create durable subscriptions (note the create attribute), so that customers can find out about videos that come in even when they are offline.

video-service.xml

<server>
  <mbean code="org.jboss.jms.server.destination.TopicService"
         name="jbia.jms:service=Topic,name=Notification"
         xmbean-dd="xmdesc/Topic-xmbean.xml">

    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
    <depends>jboss.messaging:service=PostOffice</depends>
    <attribute name="SecurityConfig">
      <security>
        <role name="vstore" write="true" />
        <role name="vcust" read="true" create="true" />
      </security>
    </attribute>
  </mbean>

  <mbean code="org.jboss.jms.server.destination.TopicService"
         name="jbia.jms:service=Topic,name=Reservation"
         xmbean-dd="xmdesc/Topic-xmbean.xml">

    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
    <depends>jboss.messaging:service=PostOffice</depends>
    <attribute name="SecurityConfig">
      <security>
        <role name="vstore" read="true" />
        <role name="vcust" write="true" />
      </security>
    </attribute>
  </mbean>
</server>
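To make the effect of these SecurityConfig entries easier to see, here is a minimal sketch that models the read, write and create flags for the two topics and checks them per role. The class TopicSecuritySketch and its methods are hypothetical and exist only for illustration; the real checks are performed by the messaging server, not by application code.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of a destination's role permissions; not a JBoss API. */
final class TopicSecuritySketch {
    enum Permission { READ, WRITE, CREATE }

    private final Map<String, Set<Permission>> roles = new HashMap<>();

    /** Registers a role with the flags that are set to true in the config. */
    public TopicSecuritySketch role(String name, Permission... granted) {
        Set<Permission> set = EnumSet.noneOf(Permission.class);
        for (Permission p : granted) {
            set.add(p);
        }
        roles.put(name, set);
        return this;
    }

    /** A role that is not listed for the destination has no permissions in this model. */
    public boolean allows(String role, Permission permission) {
        Set<Permission> granted = roles.get(role);
        return granted != null && granted.contains(permission);
    }

    public static void main(String[] args) {
        // Mirrors the Notification topic: the store publishes, customers read
        // and may create durable subscriptions.
        TopicSecuritySketch notification = new TopicSecuritySketch()
                .role("vstore", Permission.WRITE)
                .role("vcust", Permission.READ, Permission.CREATE);

        // Mirrors the Reservation topic: customers publish, the store reads.
        TopicSecuritySketch reservation = new TopicSecuritySketch()
                .role("vstore", Permission.READ)
                .role("vcust", Permission.WRITE);

        System.out.println("vcust durable subscription on Notification: "
                + notification.allows("vcust", Permission.CREATE));
        System.out.println("vcust publish on Notification: "
                + notification.allows("vcust", Permission.WRITE));
        System.out.println("vcust publish on Reservation: "
                + reservation.allows("vcust", Permission.WRITE));
        System.out.println("vstore publish on Reservation: "
                + reservation.allows("vstore", Permission.WRITE));
    }
}
```

Read this way, each topic is one-directional: the store publishes notifications that customers consume, and customers publish reservation requests that the store consumes.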

Now you need to update the application so that it connects with a username and password, which the server maps to the roles above.

Store.java

...

  Context initial = new InitialContext();
  ConnectionFactory cf = (ConnectionFactory)initial.lookup("ConnectionFactory");
  Topic notify = (Topic)initial.lookup("topic/Notification");
  Topic request = (Topic)initial.lookup("topic/Reservation");

  connection = cf.createConnection("Paul", "Valle");

...

Customer.java

...

  Context initial = new InitialContext();
  ConnectionFactory cf = (ConnectionFactory)initial.lookup("ConnectionFactory");
  Topic notify = (Topic)initial.lookup("topic/Notification");
  Topic request = (Topic)initial.lookup("topic/Reservation");

  connection = cf.createConnection(customer, customer);

...

Now fire up the application and test it.

The last piece of the puzzle is to protect the message packets going across the wire. I have already covered encrypting web communications, so here I will only discuss what you need to do for this project.

The first thing to do is to generate the public/private key pair and certificate; see securing apps for details on how to do this.

The messaging service uses JBoss Remoting to pass messages between systems. Copy the example docs/examples/jms/remoting-sslbisocket-service.xml to the server/xxx/deploy/messaging directory. The only things you need to update in this file are the keystore and the passwords relating to the key pair and certificate.

You need to add an additional dependency in the server/xxx/deploy/messaging/messaging-service.xml file.

add sslbisocket dependency

<server>
  ...
  <mbean name="jboss.messaging:service=ServerPeer" ...>
    <depends>jboss.messaging:service=Connector,transport=bisocket</depends>
    <depends>jboss.messaging:service=Connector,transport=sslbisocket</depends>
  </mbean>
</server>

You can require some clients to use SSL while others do not. Whether a client must use SSL depends on the connection factory it uses, which is configured in server/xxx/deploy/messaging/connection-factories-service.xml.

setting the connection factory

<server>
  <mbean name="jboss.messaging.connectionFactory:service=ConnectionFactory" ...>
    <depends optional-attribute-name="Connector">
      jboss.messaging:service=Connector,transport=sslbisocket</depends>
    ...
  </mbean>
  ...
</server>

You can declare multiple connection factories, each with a different name and each using a different connector. The client can then choose the appropriate connection factory to use either an SSL or an unencrypted transport.

I have an EJB 3 topic on JMS which uses annotations that make implementing JMS easier.