counter


Username:Password:
///////////////////////////////////

February 25, 2008

Using ActiveMQ JMS from C#

Filed under: Apache — admin @ 4:25 pm

Wishing to have a publish/subscribe message queue available in .NET, I have looking for ways to allow .NET to talk to a JMS implementation. After reading the article An Introduction to IKVM by Avik Sengupta, I took the latest release of ActiveMQ and IKVM, complied the jar files (as described in the article), and modified one of the examples from the ActiveMQ distribution and created a C# version (see below). Everything seems to work. The example submits items to the queue, and them retrieves them.

using System;
using System.Threading;
using ikvm.lang;
using org.codehaus.activemq;
using org.codehaus.activemq.util;
using org.codehaus.activemq.message;
using javax.jms;
using java.util;

namespace DefaultNamespace
{
    class MainClass : MessageListener
    {
        protected int messageCount = 100;
        protected String[] data;
        protected ActiveMQConnectionFactory connectionFactory;
        protected Session session;
        protected MessageConsumer consumer;
        protected MessageProducer producer;
        protected Destination destination;
        protected Connection connection;

        protected Connection receiveConnection;
        protected Session receiveSession;

        protected List messages = Collections.synchronizedList(new ArrayList());
        protected bool topic = true;

        protected ActiveMQMessage createMessage()
        {
            return new ActiveMQMessage();
        }

        protected Destination createDestination(String subject)
        {
            return new ActiveMQTopic(subject);
        }

        public void testSendReceive()
        {
            messages.clear();

            for (int i = 0; i < data.Length; i++)
            {
                Message message = session.createTextMessage(data[i]);

                Console.WriteLine(“About to send a message: “ + message + ” with text: “ + data[i]);

                producer.send(destination, message);
            }

            // lets wait a little while
            Thread.Sleep(4000);

            Console.WriteLine(“should have received a message: {0} {1} {2}”, messages, data.Length, messages.size());

            for (int i = 0; i < data.Length; i++)
            {
                TextMessage received = (TextMessage) messages.get(i);
                String text = received.getText();

                Console.WriteLine(“Received Text: “ + text);

            }
        }

        public MainClass()
        {
            topic = true;

            data = new String[messageCount];
            for (int i = 0; i < messageCount; i++)
            {
                data[i] = “Text for message: “ + i + ” at “ + DateTime.Now.ToLongTimeString();
            }

            connectionFactory = new ActiveMQConnectionFactory();
            connectionFactory.setBrokerURL(“vm://localhost”);
            connectionFactory.setUseEmbeddedBroker(true);

            connection = connectionFactory.createConnection();
            receiveConnection = connectionFactory.createConnection();

            Console.WriteLine(“Created connection: “ + connection);

            session = connection.createSession(false, 1);
            receiveSession = receiveConnection.createSession(false, 1);

            Console.WriteLine(“Created session: “ + session);
            producer = session.createProducer(null);

            Console.WriteLine(“Created producer: “ + producer);

            if (topic)
            {
                destination = receiveSession.createTopic(“FOO.BAR”);
            }
            else
            {
                destination = session.createQueue(“FOO.QUEUE”);
            }

            consumer = receiveSession.createConsumer(destination);
            consumer.setMessageListener(this);
            receiveConnection.start();
                connection.start();

            Console.WriteLine(“Created connection: “ + connection);

            testSendReceive();

            Console.WriteLine(“Dumping stats…”);
            connectionFactory.getFactoryStats().dump(new IndentPrinter());

            Console.WriteLine(“Closing down connection”);

            session.close();
            receiveSession.close();

            connection.close();
            receiveConnection.close();

            connectionFactory.stop();
            connection.close();

        }

        public void onMessage(Message message)
        {
            Console.WriteLine(“Received message: “ + message);

            messages.add(message);
        }

        public static void Main(string[] args)
        {
            MainClass mc = new MainClass();
        }
    }
}

Magic with JMS, MDBs, and ActiveMQ in Geronimo

Filed under: Apache — admin @ 4:23 pm
Apache Geronimo is the workhorse open source Java™ 2 Platform, Enterprise Edition (J2EE) 1.4 server container with an open framework that’s ready to host a variety of existing servers and services. ActiveMQ is a proven best-of-breed, open source Java Messaging Service (JMS) engine with a Swiss-army-knife arsenal of features and connectivity options. When you marry the two, magic happens! Sing Li takes you on a grand tour of this symbiotic relationship, providing example code to help you get started writing JMS applications and creating message-driven beans (MDBs) with Geronimo immediately.

The JMS API is an integral part of the J2EE platform, enabling message-based communications between loosely coupled components. Messages can be sent and received throughout the J2EE stack between clients, Web-tier components, business-tier Enterprise JavaBeans (EJBs), and Enterprise Information System (EIS)-tier services. These messages are sent asynchronously, where the sender continues with other application logic once the message is sent, and a message broker is responsible for delivering the message on behalf of the sender. Messages can be sent and received between specific endpoints (sender and recipients), or passed anonymously between producer(s) and consumer(s) using a publish/subscribe interaction model. Within the J2EE framework, components communicating through JMS can leverage the security and transaction capabilities provided by the container. Geronimo supports this vital API by integrating an open source project called ActiveMQ.

This article explores the ActiveMQ integration within Geronimo. You’ll see how Geronimo benefits from the integration and how ActiveMQ’s capabilities are enhanced when hosted inside Geronimo. A hands-on example, which you can download from this article, reveals how to code clients and Geronimo components that communicate through JMS. Another example shows you how to create versatile MDBs within Geronimo.

ActiveMQ: a best-of-breed, open source JMS implementation

ActiveMQ is a mature and feature-rich JMS server, or message broker in JMS terminology. Housed at Codehaus (see Resources), ActiveMQ supports a large variety of transports (such as TCP, SSL, UDP, multicast, intra-JVM, and NIO) and client interactions (such as push, pull, and publish/subscribe). With a sizable existing user base, the ActiveMQ server can work in a completely stand-alone state, independent of any container (J2EE or otherwise), or in conjunction with a J2EE server host, such as Geronimo.

When operated within Geronimo, ActiveMQ provides support for MDBs, which are EJBs that consume JMS messages. The asynchronous nature of JMS enables MDBs to be activated on demand by the container to perform work within the J2EE server on behalf of clients. Geronimo benefits greatly from ActiveMQ’s rich client support. Unlike session or entity EJBs, MDBs are not called through rigid EJB interfaces. Instead, a client can invoke MDB services by simply sending a JMS message to a destination. This significantly simplifies the coding of clients that consume EJB-based services. In fact, Geronimo inherits the ability to provide services to non-J2EE clients — those supported by the stand-alone ActiveMQ server.

Back to top

Wrapping the ActiveMQ message broker in a GBean

The ActiveMQ message broker is designed to be embeddable. Geronimo takes advantage of this capability. By providing a GBean container that wraps an embedded message broker, the ActiveMQContainerGBean is created. Other components of the ActiveMQ message broker are also wrapped as GBeans, as you will see shortly. Figure 1 illustrates how the ActiveMQContainerGBean embeds the ActiveMQ message broker.
Figure 1. The ActiveMQContainerGBean
The ActiveMQContainerGBean
By wrapping the ActiveMQ message broker in a GBean, the broker’s life cycle can now be managed through Geronimo. Specifically, you can deploy, start, and manage instances of ActiveMQ message brokers using Geronimo deployer and management tools. The configurable properties of the message broker are exposed as GBean properties. In Figure 1, the ActiveMQContainerGBean wraps the ActiveMQ message broker and provides life cycle management and configuration services. In addition to life cycle and configuration support, Geronimo provides the persistence support for durable messages. The current ActiveMQ server configuration uses the default Derby RDBMS instance to handle persistence.

The default J2EE server assembly supplied by Geronimo has an instance of the ActiveMQ message broker integrated. This assembly starts the message broker when the server is started, typically using the command line:

java -jar binserver.jar

To see how this instance is configured, examine the system-activemq-plan.xml deployment plan in the plan directory of the Geronimo source code distribution. You’ll see how the ActiveMQContainerGBean and other related ActiveMQ components are configured. Listing 1 shows the relevant segment in this deployment plan.
Listing 1. Configuring the ActiveMQContainerGBean in the system-activemq-plan.xml deployment plan

<?xml version="1.0" encoding="UTF-8"?>
<configuration
    xmlns="http://geronimo.apache.org/xml/ns/deployment"
    configId="org/apache/geronimo/ActiveMQServer"
    parentId="org/apache/geronimo/SystemDatabase">
...
<gbean name="ActiveMQ"
  class="org.activemq.gbean.ActiveMQContainerGBean">
  <attribute name="brokerName">possibly-unique-broker</attribute>
  <reference name="persistenceAdapter">
  <gbean-name>geronimo.server:j2eeType=JMSPersistence,name=ActiveMQ.cache,*</gbean-name>
  </reference>
 </gbean>

<gbean name="ActiveMQ.cache"
class="org.activemq.store.cache.SimpleCachePersistenceAdapterGBean">
<attribute name="cacheSize">10000</attribute>
<reference name="longTermPersistence">
<gbean-name>geronimo.server:j2eeType=JMSPersistence,name=ActiveMQ.journal,*</gbean-name>
</reference>
</gbean>

<gbean name="ActiveMQ.jdbc"
class="org.activemq.store.jdbc.JDBCPersistenceAdapterGBean">
  <reference name="dataSource">
<gbean-name>geronimo.server:J2EEApplication=null,J2EEServer=geronimo,
JCAResource=org/apache/geronimo/SystemDatabase,j2eeType=JCAManagedConnectionFactory,
name=SystemDatasource</gbean-name>
</reference>
</gbean>

<gbean name="ActiveMQ.tcp.${PlanServerHostname}.${PlanActiveMQPort}"
  class="org.activemq.gbean.ActiveMQConnectorGBean">
  <attribute name="url">tcp://${PlanServerHostname}:${PlanActiveMQPort}</attribute>
   <reference
    name="activeMQContainer">
<gbean-name>geronimo.server:j2eeType=JMSServer,name=ActiveMQ,*</gbean-name>
</reference>
  </gbean>

<gbean name="ActiveMQ.vm.localhost" class="org.activemq.gbean.ActiveMQConnectorGBean">
  <attribute name="url">vm://localhost</attribute>
  <reference name="activeMQContainer">
  <gbean-name>geronimo.server:j2eeType=JMSServer,name=ActiveMQ,*</gbean-name>
  </reference>
</gbean>

In Listing 1, the first GBean sets up the ActiveMQ message broker. The GBean has an attribute with a reference to a persistence adapter, and points to the second GBean. The second GBean wraps an instance of the ActiveMQ in-memory cache. This second GBean references ActiveMQ’s persistence component for long-term persistence. The third GBean wires a JDBC datasource for the persistence, pointing to the SystemDatasource — a Derby RDBMS instance. The last two GBeans set up the following two transports for accessing the message broker:

  • A TCP transport, by default at the localhost and port 61616
  • An in-VM transport, by default mapped to vm://localhost

If you ever need to change the configuration of this instance, you can modify this system-activemq-plan.xml file and then redeploy the org/apache/geronimo/ActiveMQServer configuration.

Back to top

Accessing the message broker through JCA 1.5 resource adapter

An application component (such as a servlet, JSP, or EJB) utilizing JMS must access the message broker through a library that implements the JMS API. This API is provided by Geronimo. To implement this API in a way that will work with any JMS provider (such as a message broker), Geronimo supports the J2EE Connector (JCA) 1.5 specification. The JCA 1.5 specification details the contracts required between the application server (Geronimo) and the resource adapter (RA) — a driver supplied by ActiveMQ. (See Resources for an article on JCA 1.5.) Managed application components hosted in Geronimo access the ActiveMQ message broker only through this RA. This is another major benefit that ActiveMQ brings to the table — it has a ready-for-integration JCA 1.5-compliant RA implementation. Figure 2 shows the ActiveMQ JCA 1.5-compliant RA.
Figure 2. The ActiveMQ JCA 1.5-compliant RA
The ActiveMQ JCA 1.5 compliant RA
In Figure 2, you can see that the ActiveMQ RA interacts with Geronimo’s security and transaction system through well-defined JCA 1.5 system contracts. The ActiveMQ RA supports both outbound connections (JMS calls out to the message broker), and inbound connections (ActiveMQ-originated calls call into MDBs). For outbound connections, the message-sending application can enroll the JMS provider (ActiveMQ in this case) as a part of a distributed transaction, perhaps involving another resource manager (such as an RDBMS). For inbound connections, typically activating MDBs, the transaction is started by Geronimo. In the current JCA 1.5 specification, and in the Geronimo implementation, inbound connections do not interact with the container’s security subsystem.

The WorkManager contract is part of JCA 1.5. It allows an RA to submit work to the application server for execution. This allows the server — Geronimo — to have control over the thread management and work distribution for a compliant RA. In the case of ActiveMQ, this feature is used to manage threads for the inbound connections. For more information on this feature, read “JCA 1.5, Part 3: Message inflow” (developerWorks, June 2005).

Listing 2 shows a fragment of the system-jms-plan.xml deployment plan. This plan configures the components for an RA instance, as detailed for a deployment descriptor in the JCA 1.5 specification. In the ActiveMQ RA case, these components include an RA implementation class, an outbound connection factory, topics, and queues (administered objects).

The deployment plan in Listing 2 configures the following:

  • An instance of the ActiveMQ RA, to access the ActiveMQ message broker configured in Listing 2, using the TCP protocol for transport
  • A JCA 1.5 WorkManager implementation, supplied by Geronimo, for the ActiveMQ RA instance
  • A JMS connection factory that can be used to create queue connections (through the QueueConnectionFactory interface), or topic connections (through the TopicConnectionFactory interface)
  • An admin object that contains a JMS queue named MDBTransferBeanOutQueue
  • An admin object that contains a JMS queue named SendReceiveQueue

If you need to modify this default configuration of the JMS RA (for example, to add another queue), you’ll need to make the appropriate changes in the system-jms-plan.xml deployment plan. To make any change effective, first undeploy the org/apache/geronimo/SystemJMS configuration, and then redeploy it. For other deployment plans where you can configure instances of ActiveMQ RAs, see the sidebar Deploying ActiveMQ RAs.

Back to top

Application client JMS access

A J2EE application client, supported by Geronimo’s Client Application Container, can have access to the ActiveMQ message broker. Figure 3 shows some client access configurations.

Deploying ActiveMQ RAs
Other than server-start deployment in the system-jms-plan.xml, you can also deploy ActiveMQ RA instances in one of the following ways:

  • Stand-alone, using an ra.xml deployment plan
  • With your Enterprise Application aRchive (EAR), inside a <module> element within the geronimo-application.xml deployment plan
  • With a Web Application aRchive (WAR), inside a <resource> element within the geronimo-web.xml deployment plan

Figure 3. J2EE and non-J2EE client access to message broker
J2EE and non-J2EE client access to message broker
The top configuration in Figure 3 shows a J2EE client application accessing an RA deployed in the client scope. This RA instance is configured to access the message broker located at the server. The second configuration in Figure 3 shows another J2EE client with an RA deployed in the client scope, but accessing a message broker instance deployed also at the client. (This can be useful in disconnected operations — notebook computers that occasionally connect to the enterprise network.) In the third configuration, a non-J2EE client’s access to the ActiveMQ message broker can be direct through any configured transport. The hands-on example demonstrates how to create such a stand-alone, non-J2EE ActiveMQ application client.

Back to top

Creating a JMS application: a servlet producer with native ActiveMQ consumer

This first example shows how a servlet can communicate with an external non-J2EE application through JMS. The example uses the global server-scoped queue called SendReceiveQueue (see Listing 2). It consists of a JMS message sender (producer), and a JMS message receiver (consumer). The producer is a servlet, called SenderServlet, that runs inside Geronimo. The consumer is a stand-alone ActiveMQ client that does not use Geronimo for JMS support. Figure 4 shows the interactions in this example.
Figure 4. SenderServlet with ActiveMQ receiver application
SenderServlet with ActiveMQ receiver application
In Figure 4, the application flow is as follows:

  1. The user accesses the SenderServlet using a Web browser.
  2. The SenderServlet, hosted by Geronimo, presents a data-entry form to the user.
  3. The user enters a text message and clicks Send.
  4. The SenderServlet processes the form submission and uses JMS to send the text message to the SendReceiveQueue.
  5. The stand-alone, non-J2EE ActiveMQ client reads the SendReceiveQueue and displays the received message.

The Web application source code for this example is in the war_only directory of the code download (see the Download section below Resources). The client code is in the mqclient subdirectory.

If you just want to try out the example, you can find the sender.war archive in the war_only/dist subdirectory. Just deploy the following application archive on your Geronimo server:

java -jar bin/deployer.jar sender.war

Use system for the user name and manager for the password when prompted. Follow the instructions in the readme.txt file to build the client. Next, go into the mqclient directory and run the client application using the run.bat file. This will start the client waiting for incoming messages.

Access the data entry servlet through a browser using the URL http://localhost:8080/sender/sendform.cgi.

Enter a text message into the field, and click Send. Notice that the ActiveMQ client receives and prints out the message immediately.

View Listing 3 to see some of the code for SendServlet.java. You can find the complete source in the war_only/src directory.

In Listing 3, the servlet generates the input form on an HTTP GET request and handles form submission on an HTTP POST request. The actual doPost() method extracts the entered message, creates a queue connection, starts a session, and sends the message to the queue.

The servlet init() method looks up the connection factory and queue using Java Naming and Directory Interface (JNDI). Geronimo provides the JNDI mapping service. You can see that the queue is looked up using java:comp/env/dwSendReceiveQueue. This name is mapped in the web.xml deployment descriptor for the Web application. The relevant segment of web.xml is reproduced in Listing 4.
Listing 4. JNDI mapping and resource reference in web.xml

<resource-ref>
  <res-ref-name>DefaultActiveMQConnectionFactory</res-ref-name>
  <res-type>javax.jms.QueueConnectionFactory</res-type>
  <res-auth>Container</res-auth>
</resource-ref>

<message-destination-ref>
  <message-destination-ref-name>dwSendReceiveQueue</message-destination-ref-name>
  <message-destination-type>javax.jms.Queue</message-destination-type>
  <message-destination-usage>Produces</message-destination-usage>
  <message-destination-link>SendReceiveQueue</message-destination-link>
</message-destination-ref>

In Listing 4, the <message-destination-ref> maps the name dwSendReceiveQueue to the system scoped queue through a <message-destination-link> subelement. Unfortunately, the J2EE 1.4 specification does not support a <resource-link> subelement inside a <resource-ref> element in web.xml. As a result, you must use the actual resource name for the connection factory (DefaultActiveMQConnectionFactory), or you will have to create a custom deployment plan (such as geronimo-web.xml) to map the reference.

Coding the ActiveMQ message receiver client

The receiver is a native ActiveMQ client; it does not use any Geronimo client support. It accesses the ActiveMQ message broker hosted in Geronimo using the TCP transport and the URL tcp://localhost:61616. Partial source code to this client, JMSReceiver, is shown in Listing 5. You can see the complete source code in the war_only/clientsrc directory.
Listing 5. JMSReceiver.java — a non-J2EE JMS messages receiver client

package com.ibm.dw.geronimo.jms;

import javax.jms.*;
import org.activemq.ActiveMQConnectionFactory;

public class JMSReceiver {
	protected Queue queue;
	protected String queueName = "SendReceiveQueue";
	protected String url = "tcp://localhost:61616";
	protected int ackMode = Session.AUTO_ACKNOWLEDGE;

	public static void main(String[] args) {
		JMSReceiver msgReceiver = new JMSReceiver();
		msgReceiver.run();
	}

	public void run() {
		try {
			ActiveMQConnectionFactory connectionFactory =
				new ActiveMQConnectionFactory(url);
			QueueConnection connection =
				(QueueConnection)
				connectionFactory.createConnection();
			connection.start();
			MessageConsumer consumer = null;
			Session session = connection.createQueueSession(
					false,
					Session.AUTO_ACKNOWLEDGE);
			queue = session.createQueue(queueName);
			consumer = session.createConsumer(queue);
			System.out.println("Waiting for message (max 5)");
			for (int i = 0; i < 5; i++) {
				Message message = consumer.receive();
				processMessage(message);
			}
			System.out.println("Closing connection");
			consumer.close();
			session.close();
			connection.close();

		} catch (Exception e) {
        ...
		}
	}

	public void processMessage(Message message) {
		try {
			TextMessage txtMsg = (TextMessage) message;
			System.out.println("Received a message: " + txtMsg.getText());
		} catch (Exception e) {
        ...
        }
	}

}

In Listing 5, the JMSReceiver is completely free of J2EE code. There is no need to perform JNDI lookup or destination mapping, because the ActiveMQ native library performs the equivalent actions automatically.

Coding this non-J2EE message consumer is straightforward, using a mix of JMS API and ActiveMQ support library. Coding of a J2EE message consumer, with Geronimo’s support, is also simple. The next example shows the construction of a useful J2EE message consumer: an MDB that adds product categories to the Really Big Pet Store example.

Back to top

Creating an MDB for data update

The second hands-on example shows the construction of an MDB. This example uses the same SendServlet code as in the first example, but the message consumer is now an MDB inside the same enterprise application (bundled in the same EAR). Figure 5 illustrates the operation of this example.
Figure 5. Operation of the MDB example
Operation of the MDB example
The example uses code from the article, “Geronimo! Part 2: Tame this J2EE 1.4 bronco” (developerWorks, May 2005). The article includes more information on the operation of the Really Big Pet Store.

In Figure 5, the operation of the original Web application is left intact. Shoppers use a browser to access the store’s JSP-based user interface. The StoreController servlet fetches the product categories information using the getCats() method of a stateless session EJB called CategoriesBean.

CategoriesBean has been modified to use a new helper class, called CategoryData, to obtain the list of categories.

The MDB is called CategoriesMDB. This EJB is activated when a message is placed into the SendReceiveQueue. CategoriesMDB extracts the message content and uses it to add a product category to CategoryData. Because the session EJB, CategoriesBean, uses CategoryData for every screen update, the new category is visible to the shopper immediately.

In a production environment, the CategoryData helper class can be replaced by an entity bean representing the categories. The use of an entity bean is purposely avoided here to simplify the configuration and setup for this example.

Coding CategoriesMDB

The code for SenderServlet is almost identical to the version in the first example. See ear_ejb/src/SenderServlet.java for complete source.

The code for CategoriesMDB is shown in Listing 6. You can find complete source at ejb/CategoriesMDB.java.
Listing 6. CategoriesMDB — consuming JMS messages and add category

import javax.ejb.*;
import javax.jms.*;
import com.ibm.dw.reallybigpet.ejb.CategoryData;

public class CategoriesMDB
  implements MessageDrivenBean,MessageListener {

private transient MessageDrivenContext mdc = null;
public CategoriesMDB() {
}

public void setMessageDrivenContext(MessageDrivenContext mdc) {
   this.mdc = mdc;
}

public void ejbCreate() {
}

public void onMessage(Message inMessage) {
    TextMessage msg = null;

    try {

        if (inMessage instanceof TextMessage) {
            msg = (TextMessage) inMessage;
         CategoryData.getInstance().addCat(msg.getText());

        }
     } catch (Exception e) {
        e.printStackTrace();
    }
} 

public void ejbRemove() {
}

}
Parent configuration and class loading
Make sure to set the parentId attribute of the application element in geronimo-application.xml to org/apache/geronimo/SystemJMS. This will ensure that you are using the class loader of the JMS RA. If you set your parentId to org/apache/geronimo/System instead, you will encounter class-loading problem similar to java.lang.ClassNotFoundException: org.activemq.ra.ActiveMQActivationSpec. This is the most common mistake you are likely to make when deploying MDB with Geronimo.

The MDB code is simple. The highlighted lines in Listing 7 are where the work is performed. When a JMS message is received at the queue, Geronimo will:

  1. Pick up the message.
  2. Activate an instance of this MDB.
  3. Call the MDB’s onMessage() method, passing the received message as an argument of the method.

You need to configure the MDB with the associated JMS destination (or queue) in the deployment descriptor, ejb-jar.xml. See an example of this in Listing 7.

In Listing 8, the SendReceiveQueue is associated with MDB through an <activation-config> subelement in the <message-driven> element. To select the JMS RA that will be used, create a custom openejb=jar.xml similar to Listing 8.
Listing 8. Custom deployment plan, openejb-jar.xml, to select JMS RA

<?xml version="1.0"?>
<openejb-jar xmlns="http://www.openejb.org/xml/ns/openejb-jar"
configId="catmdb"
parentId="org/apache/geronimo/SystemJMS"
>
<enterprise-beans>
  <message-driven>
        <ejb-name>CatMDB</ejb-name>
        <resource-adapter>
           <resource-link>ActiveMQ RA</resource-link>
         </resource-adapter>
   </message-driven>
</enterprise-beans>
</openejb-jar>

In Listing 8, the default system-scoped RA is selected by name using the <resource-link> element. See the sidebar Parent configuration and class loading for information on the need for a geronimo-application.xml deployment plan.

To try out this example, deploy the reallybigpet.ear application, located in the ear_ejb/dist directory. To access the store, point your browser to http://localhost:8080/ReallyBigPetStore/store.cgi. To access the category add form, point your browser to http://localhost:8080/ReallyBigPetStore/sendform.cgi.

Back to top

Conclusion

ActiveMQ provides quintessential JMS service for Geronimo, while Geronimo provides ActiveMQ with JDBC-based persistence, life cycle management (through JCA 1.5 compliance), security, transaction, and work management. This symbiotic relationship allows Geronimo users to enjoy the best of both worlds: access to standardized J2EE 1.4 JMS and MDB facilities enhanced by ActiveMQ’s rich transports and client support.


Asynchronous calls and remote callbacks using Lingo Spring Remoting

Filed under: Apache — admin @ 4:22 pm

Lingo is the only Spring Remoting implementation that supports asynchronous calls and remote callbacks. Today I’ll cover all the nitty gritty details of the async/callback related functionality along with the limitations and gotchas.Asynchronous method invocation and callback support by Lingo is an awesome feature and there are several usecases where these are an absolute must. Lets consider a simple and rather common use case : You have a server side application (say an optimizer) for which you want you write a remote client API. The API has methods like solve() which are long running and methods like cancel() which stops the optimizer solve.

A synchronous API under such circumstances is not really suitable since the solve() method could take a really long time to complete. It could be implemented by having the client code spawn their own thread and do its own exception management but this becomes really kludgy. Plus you have to worry out network timeout issues. You might be thinking “I’ll just use JMS if I need an asynchronous programming model”. You could use JMS but think about the API you’re exposing. Its going to be a generic JMS API where the client is registering JMS listeners, and sending messages to JMS destinations using the JMS API. Compare this to a remote API where the client is actually working with the Service interface itself.

Lingo combines the elegance of Spring Remoting with the ability to make asynchronous calls. Lets continue with our Optimizer example and implement a solution using Lingo and Spring. OptimizerService interface

public interface OptimizerService {
    void registerCallback(OptimizerCallback callback) throws OptimizerException;

    void solve();

    void cancel() throws OptimizerException;
}

The solve() method is asynchronous while the cancel() and registerCallback(..) methods are not. Asynchronous methods by convention must not have a return value and also must not throw exceptions. The registerCallback(..) method registers a client callback with the Optimizer. In order to make an argument be a remote callback, the argument must implement java.util.EventListener or java.rmi.Remote. In this example the OptimizerCallback interface extends java.util.EventListener. If the argument does not implement either of these interfaces, it must implement java.io.Serializable and it will then be passed by value.

OptimizerCallback interface

public interface OptimizerCallback extends EventListener {

    void setPercentageComplete(int pct);

    void error(OptimizerException ex);

    void solveComplete(float solution);
}

The callback API has a method for the Optimizer to set the percentage complete, report an error during the solve() process (remember that the solve() method is asynchronous so it cannot throw an exception directly) and finally the solveComplete(..) callback to inform the client that the solve is complete along with the solution.

OptimizerService implementation

public class OptimizerServiceImpl implements OptimizerService {

    private OptimizerCallback callback;
    private volatile boolean cancelled = false;

    private static Log LOG = LogFactory.getLog(OptimizerServiceImpl.class);

    public void registerCallback(OptimizerCallback callback) {
        LOG.info(“registerCallback() called …”);
        this.callback = callback;
    }

    public void solve() {
        LOG.info(“solve() called …”);
        float currentSolution = 0;

        //simulate long running solve process
        for (int i = 1; i <= 100; i++) {
            try {
                currentSolution += i;
                Thread.sleep(1000);
                if (callback != null) {
                    callback.setPercentageComplete(i);
                }
                if (cancelled) {
                    break;
                }
            } catch (InterruptedException e) {
                System.err.println(e.getMessage());
            }
        }
        callback.solveComplete(currentSolution);

    }

    public void cancel() throws OptimizerException {
        LOG.info(“cancel() called …”);
        cancelled = true;
    }
}

The solve() method sleeps for a while and makes the call setPercentageComplete(..) on the callback registered by the client. The code is pretty self explanatory here.

Optimizer Application context - optimizerContext.xmlWe now need to export this service using Lingo Spring Remoting. The typical Lingo Spring configuration as described in the Lingo docs and samples is :

<?xml version=“1.0″ encoding=“UTF-8″?>
<!DOCTYPE beans PUBLIC “-//SPRING//DTD BEAN//EN” “http://www.springframework.org/dtd/spring-beans.dtd”>

<beans>
    <bean id=“optimizerServiceImpl” class=“org.sanjiv.lingo.server.OptimizerServiceImpl” singleton=“true”/>

    <bean id=“optimizerServer” class=“org.logicblaze.lingo.jms.JmsServiceExporter” singleton=“true”>
        <property name=“destination” ref=“optimizerDestination”/>
        <property name=“service” ref=“optimizerServiceImpl”/>
        <property name=“serviceInterface” value=“org.sanjiv.lingo.common.OptimizerService”/>
        <property name=“connectionFactory” ref=“jmsFactory”/>
    </bean>

    <!– JMS ConnectionFactory to use –>
    <bean id=“jmsFactory” class=“org.activemq.ActiveMQConnectionFactory”>
        <property name=“brokerURL” value=“tcp://localhost:61616″/>
        <property name=“useEmbeddedBroker”>
            <value>true</value>
        </property>
    </bean>

    <bean id=“optimizerDestination” class=“org.activemq.message.ActiveMQQueue”>
        <constructor-arg index=“0″ value=“optimizerDestinationQ”/>
    </bean>
</beans>

In this example, I’m embedding a JMS broker in the Optimizer process. However you are free to use an external JMS broker and change the JMS Connection Factory configuration appropriately.

Note : The above optimizerContext.xml it the typical configuration in the Lingo docs/examples
but is not the ideal configuration. It has some serious limitations which I'll cover in a bit
along with the preferred  "server" configuration.

OptimizerServer The “main” class that exports the OptimizerService simply needs to instantiate the “optimizerServer” bean in the optimizerContent.xml file.

public class OptimizerServer {

    public static void main(String[] args) {
        if (args.length == 0) {
            System.err.println(“Usage : java org.sanjiv.lingo.server.OptimizerServer <config file>”);
            System.exit(-1);
        }
        String applicationContext = args[0];

        System.out.println(“Starting Optimizer …”);
        FileSystemXmlApplicationContext ctx = new FileSystemXmlApplicationContext(applicationContext);

        ctx.getBean(“optimizerServer”);

        System.out.println(“Optimizer Started.”);

        ctx.registerShutdownHook();
    }
}

The ClientIn order for the client to lookup the remote OptimizerService, we need to configure the client side Spring application context as follows : Client Application Context - clientContext.xml

<?xml version=“1.0″ encoding=“UTF-8″?>
<!DOCTYPE beans PUBLIC “-//SPRING//DTD BEAN//EN” “http://www.springframework.org/dtd/spring-beans.dtd”>

<beans>
    <bean id=“optimizerService” class=“org.logicblaze.lingo.jms.JmsProxyFactoryBean”>
        <property name=“serviceInterface” value=“org.sanjiv.lingo.common.OptimizerService”/>
        <property name=“connectionFactory” ref=“jmsFactory”/>
        <property name=“destination” ref=“optimizerDestination”/>

        <!– enable async one ways on the client –>
        <property name=“remoteInvocationFactory” ref=“invocationFactory”/>
    </bean>

    <!– JMS ConnectionFactory to use –>
    <bean id=“jmsFactory” class=“org.activemq.ActiveMQConnectionFactory”>
        <property name=“brokerURL” value=“tcp://localhost:61616″/>
    </bean>

    <bean id=“optimizerDestination” class=“org.activemq.message.ActiveMQQueue”>
        <constructor-arg index=“0″ value=“optimizerDestinationQ”/>
    </bean>

    <bean id=“invocationFactory” class=“org.logicblaze.lingo.LingoRemoteInvocationFactory”>
        <constructor-arg>
            <bean class=“org.logicblaze.lingo.SimpleMetadataStrategy”>
                <!– enable async one ways –>
                <constructor-arg value=“true”/>
            </bean>
        </constructor-arg>
    </bean>
</beans>

Now all a client needs to do to is obtain a handle of the remote OptimizerService by looking up the bean “optimizerService” configured in clientContext.xml.

OptimizerCallback implementationBefore going over the sample Optimizer client code, lets first write a sample implementation of the OptimizerCallback interface - one which the client will register with the remote Optimizer by invoking the registerCallback(..) method.

public class OptimizerCallbackImpl implements OptimizerCallback {

    private boolean solveComplete = false;
    private OptimizerException callbackError;
    private Object mutex = new Object();

    public void setPercentageComplete(int pct) {
        System.out.println(“+++ OptimzierCallback :: “ + pct + “% complete..”);
    }

    public void error(OptimizerException ex) {
        System.out.println(“+++ OptimzierCallback :: Error occured during solve” + ex.getMessage());
        callbackError = ex;
        solveComplete = true;
        synchronized (mutex) {
            mutex.notifyAll();
        }
    }

    public void solveComplete(float soltion) {
        System.out.println(“+++ OptimzierCallback :: Solve Complete with answer : “ + soltion);
        solveComplete = true;
        synchronized (mutex) {
            mutex.notifyAll();
        }
    }

    public void waitForSolveComplete() throws OptimizerException {
        while (!solveComplete) {
            synchronized (mutex) {
                try {
                    mutex.wait();
                    if (callbackError != null) {
                        throw callbackError;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
        }
    }
}

OptimizerClient

public class OptimizerClient {

    public static void main(String[] args) throws InterruptedException {

        if (args.length == 0) {
            System.err.println(“Usage : java org.sanjiv.lingo.client.OptimizerClient <clientContext.xml>”);
            System.exit(-1);
        }

        String applicationContext = args[0];
        FileSystemXmlApplicationContext ctx = new FileSystemXmlApplicationContext(applicationContext);

        OptimizerService optimizerService = (OptimizerService) ctx.getBean(“optimizerService”);
        OptimizerCallbackImpl callback = new OptimizerCallbackImpl();

        try {
            optimizerService.registerCallback(callback);
            System.out.println(“Client :: Callback Registered.”);

            optimizerService.solve();
            System.out.println(“Client :: Solve invoked.”);

            Thread.sleep(8 * 1000);
            System.out.println(“Client :: Calling cancel after 8 seconds.”);

            optimizerService.cancel();
            System.out.println(“Client :: Cancel finished.”);
            //callback.waitForSolveComplete();
 
        } catch (OptimizerException e) {
            System.err.println(“An error was encountered : “ + e.getMessage());
        }
    }
}

The test client registers a callback and calls the asynchronous method solve(). Note that the solve method in our sample OptimizerService implementation takes ~100 seconds to complete. The client then prints out the message “Client :: Solve invoked.”. If the solve() call is indeed invoked asynchronously by Lingo under the hoods, this message should be printed to console immediately and not after 100 seconds. The client then calls cancel() after 8 seconds have elapsed.

Here’s the output when we run the Optimizer Server and Client

Notice that the solve method has been called asynchronously and after 8 seconds the client makes the cancel() call however the server does not seem to be receiving this call and continues with its setPercentageComplete(..) callback.

I asked this question on the Lingo mailing list but did not get a response. This misbehaviour was pretty serious because what this meant was that while an asynchronous call like solve() was executed asynchronously by the client, the client was not able to make another call like cancel() until the solve() method completed execution on the server… which defeats the purpose of a method like cancel().

Lingo and ActiveMQ are open source so I rolled up my sleeves and ran the whole thing through a debugger. Debugging multithreaded applications can get tricky but after spending several hours I was able to get the to bottom of this issue.

Recollect that we exported the OptimizerSericve using the class org.logicblaze.lingo.jms.JmsServiceExporter in optimizerContext.xml. On examining the source, I found that this class creates a single JMS Session which listens for messages on the configured destination (”optimizerDestinationQ” in our example) and when messages are received, it invokes a Lingo listener which does the translation of the inbound message into a method invocation on the exported OptimizerServiceImpl service object.

The JMS spec clearly states

A Session object is a single-threaded context for producing and consuming messages.
...
It serializes execution of message listeners registered with its message consumers.

Basically a single JMS Session is not suitable for receiving concurrent messages. I understood why the cancel() method wasn’t being invoked until the solve() method completed. But this behavior still didn’t make sense from an API usage perspective.

Fortunately Spring 2.0 added support classes for receiving concurrent messages which is exactly what we need (yep, Spring rocks!). There are a few different support classes like DefaultMessageListenerContainer, SimpleMessageListenerContainer, and ServerSessionMessageListener .

The ServerSessionMessageListenerContainer “dynamically manages JMS Sessions, potentially using a pool of Sessions that receive messages in parallel”. This class “builds on the JMS ServerSessionPool SPI, creating JMS ServerSessions through a pluggable ServerSessionFactory”.

I tried altering optimizerContext.xml to use this class optimizerContextPooledSS.xml

<?xml version=“1.0″ encoding=“UTF-8″?>
<!DOCTYPE beans PUBLIC “-//SPRING//DTD BEAN//EN” “http://www.springframework.org/dtd/spring-beans.dtd”>

<beans>
    <bean id=“optimizerServiceImpl” class=“org.sanjiv.lingo.server.OptimizerServiceImpl” singleton=“true”>
    </bean>

    <bean id=“optimizerServerListener” class=“org.logicblaze.lingo.jms.JmsServiceExporterMessageListener”>
        <property name=“service” ref=“optimizerServiceImpl”/>
        <property name=“serviceInterface” value=“org.sanjiv.lingo.common.OptimizerService”/>
        <property name=“connectionFactory” ref=“jmsFactory”/>
    </bean>

    <bean id=“optimizerServer” class=“org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer”>
        <property name=“destination” ref=“optimizerDestination”/>
        <property name=“messageListener” ref=“optimizerServerListener”/>
        <property name=“connectionFactory” ref=“jmsFactory”/>
    </bean>

    <!– JMS ConnectionFactory to use –>
    <bean id=“jmsFactory” class=“org.activemq.ActiveMQConnectionFactory”>
        <property name=“brokerURL” value=“tcp://localhost:61616″/>
        <property name=“useEmbeddedBroker”>
            <value>true</value>
        </property>
    </bean>

    <bean id=“optimizerDestination” class=“org.activemq.message.ActiveMQQueue”>
        <constructor-arg index=“0″ value=“optimizerDestinationQ”/>
    </bean>
</beans>

Unfortunately the behavior was still the same - cancel() was not executing on the server until solve() completed. I posted this question on the Spring User list but did not get a response. This class uses the ServerSessionPool SPI so I’m not sure if there is a problem with the Spring class, the ActiveMQ implementation of this SPI or something that I’ve done wrong.

Anyway I was able to successfully configure the DefaultMessageListenerContainer class and observed the desired behavior. In contrast to ServerSessionMessageListenerContainer, DefaultMessageListenerContainer “creates a fixed number of JMS Sessions to invoke the listener, not allowing for dynamic adaptation to runtime demands”. While ServerSessionMessageListenerContainer would have been ideal, DefaultMessageListenerContainer is good enough for most use cases as you’d typically want to have some sort of thread pooled execution on the server anyways.

optimizerContextPooled.xml

<?xml version=“1.0″ encoding=“UTF-8″?>
<!DOCTYPE beans PUBLIC “-//SPRING//DTD BEAN//EN” “http://www.springframework.org/dtd/spring-beans.dtd”>

<beans>

    <bean id=“optimizerServiceImpl” class=“org.sanjiv.lingo.server.OptimizerServiceImpl” singleton=“true”>
    </bean>

    <bean id=“optimizerServerListener” class=“org.logicblaze.lingo.jms.JmsServiceExporterMessageListener”>
        <property name=“service” ref=“optimizerServiceImpl”/>
        <property name=“serviceInterface” value=“org.sanjiv.lingo.common.OptimizerService”/>
        <property name=“connectionFactory” ref=“jmsFactory”/>
    </bean>

    <bean id=“optimizerServer” class=“org.springframework.jms.listener.DefaultMessageListenerContainer”>
        <property name=“concurrentConsumers” value=“20″/>
        <property name=“destination” ref=“optimizerDestination”/>
        <property name=“messageListener” ref=“optimizerServerListener”/>
        <property name=“connectionFactory” ref=“jmsFactory”/>
    </bean>

    <!– JMS ConnectionFactory to use –>
    <bean id=“jmsFactory” class=“org.activemq.ActiveMQConnectionFactory”>
        <property name=“brokerURL” value=“tcp://localhost:61616″/>
        <property name=“useEmbeddedBroker”>
            <value>true</value>
        </property>
    </bean>

    <bean id=“optimizerDestination” class=“org.activemq.message.ActiveMQQueue”>
        <constructor-arg index=“0″ value=“optimizerDestinationQ”/>
    </bean>

</beans>
Note : Although some Lingo examples have the destination created as a Topic(ActiveMQTopic)
with the org.logicblaze.lingo.jms.JmsServiceExporter class, you must use a Queue when
using multiple JMS sessions for concurrent message retreival as a Topic will be received
by all listeners which is not what we want.

Here’s the result when using applicationContextPooled.xml

You can download the complete source for this here and run the sample server and client. JRoller doesn’t allow uploading .zip files so I’ve uploaded the sample as a .jar file instead. The source distribution has a Maven 1.x project file. To build, simply run “maven”. To run the optimizer sever without pooled JMS listeners, run startOptimizer.bat under dist/bin/. To run with pooled JMS listeners, run startOptimizerPooled.bat and to run the test client, run startClient.bat

I am using this architecture to provide a remote API for our C++ optimizer. The C++ optimizer has a thin JNI layer which loads the Spring application context file and the OptimizerServiceImpl has a bunch of native methods which is tied to the underlying C++ optimizer functionality using the JNI function RegisterNatives(). Do you Lingo? I’d like to hear how others are using Lingo/Spring Remoting.

Posted by sjivan


GlassFish V2 and ActiveMQ 4.1

Filed under: Apache — admin @ 4:20 pm

The previous blog entries showed how JMS providers like Jboss Messaging and MantaRay could be used with GlassFish. ActiveMQ is also one such JMS provider http://activemq.apache.org/ , the following steps describe the configurations required to use ActiveMQ with GlassFish.

  1.  Install GlassFish V2 and ActiveMQ 4.1
    GlassFish V2 : https://glassfish.dev.java.net/downloads/v2-b33e.html
    ActiveMQ 4.1 : http://activemq.apache.org/activemq-410-release.html
  2. Modify the glassfish domain’s (default domain is domain1) classpath to add ActiveMQ4.1 jars located in ActiveMQ installation lib directory. The asadmin GUI could be used to modify a domain’s classpath. Open a browser and type the url of the application server admin GUI - http://hostname:adminport. Go to Application Server -> JVM Settings -> Path Settings . Add an entry for the jar files shown below [comma-separated as shown below] in the classpath suffix. Restart the application server domain for these changes to take effect.
      1. activemq-core.jar
      2. activeio.jar
      3. commons-logging.jar
      4. backport-util-concurrent.jar
  3.  Start the Active MQ - please refer to http://activemq.apache.org/run-broker.html
  4. Create the required destinations : http://activemq.apache.org/how-do-i-create-new-destinations.html shows how destinations can be created in ActiveMQ. 2 queue destinations are required, “Receive” from which we will receive the messages and “Send” to which we will respond back from our MDB.
  5. Create the jndi bindings : Create a File system JNDI object store to bind ActiveMQ JMS administered objects.  The following link shows a code snippet that creates a FS object store and binds the required ActiveMQ objects to the jndi tree.
    http://weblogs.java.net/blog/rampsarathy/archive/Main.java
  6. Create the resource adapter configuration :
    asadmin create-resource-adapter-config –user <adminname> –password <admin password> –property SupportsXA=true:ProviderIntegrationMode=jndi:RMPolicy=OnePerPhysicalConnection:
    JndiProperties=java.naming.factory.initial\\=com.sun.jndi.fscontext.RefFSContextFactory
    java.naming.provider.url\\=file://space/activemqobjects:LogLevel=FINEST genericra
  7. Deploy the resource adapter using the asadmin deploy command, as shown below. In the image above, see Generic JMS RA deployed in the application server.           $ asadmin deploy –user admin –password adminadmin <location of the generic resource adapter rar file>
    Generic JMS RA is present in ${GLASSFISH_HOME}/lib/addons/resourceadapters/genericjmsra/genericra.rar
  8. In order to configure a JMS Connection Factory, using the Generic Resource Adapter for JMS, a Connector connection pool and resources needs to be created in the application server, as shown below.
    #Creates a Connection Pool called inpool and points to XAQCF created in Active MQ
    asadmin create-connector-connection-pool –raname genericra connectiondefinition javax.jms.QueueConnectionFactory –transactionsupport  XATransaction –property ConnectionFactoryJndiName=activemqconnectionfactory inpool
    #Creates a Connection Pool called outpool and points to XATCF created in Active MQ
    asadmin create-connector-connection-pool –raname genericra connectiondefinition javax.jms.QueueConnectionFactory –transactionsupport  XATransaction –property ConnectionFactoryJndiName=activemqconnectionfactory outpool
    #Creates a connector resource named jms/inboundXAQCF and binds this resource to JNDI for applications to use.
    asadmin create-connector-resource –poolname inpool jms/inboundXAQCF
    Note: Though the inbound configuration of the RA happens through the activation specification, a pool has to be created to make sure that the transaction recovery happens when the application restarts. This is because the transaction manager does recovery only for connector resources that are registered in domain.xml.
    #Creates a connector resource named jms/outboundXAQCF and binds this resource to JNDI for applications to use.
    asadmin create-connector-resource –poolname outpool jms/outboundXAQCF
  9. For JMS Destination Resources, an administered object needs to be created. jms/inqueue [pointing to Generic JMS RA and Receive] created in the application server.
    #Creates a javax.jms.Queue Administered Object and binds it to application server’s JNDI tree at jms/inqueue and points to inqueue created in ActiveMQ.
    asadmin create-admin-object –raname genericra –restype javax.jms.Queue –property DestinationJndiName=Receive jms/inqueue
    #Creates a javax.jms.Topic Administered Object and binds it to application server’s JNDI tree at jms/outqueue and points to outqueue created in ActiveMQ.
    asadmin create-admin-object –raname genericra –restype javax.jms.Queue –property DestinationJndiName=Send  jms/outqueue
  10. Deployment descriptors:
    The deployment descriptors need to take into account the resource adapter and the connection resources that have been created. A sample sun-ejb-jar.xml for a Message Driven Bean that listens to a destination called inqueue  in ActiveMQ, and publishes back reply messages to a destination resource named jms/outqueue  is available here
    http://weblogs.java.net/blog/rampsarathy/archive/sun-ejb-jar.xml
  11. The business logic encoded in Message Driven Bean could then lookup the configured QueueConnectionFactory/Destination resource to create a connection and reply to the received message.

Integrating Rails and ActiveMQ with ActiveMessaging/REST

Filed under: Apache — admin @ 4:19 pm

ActiveMQ ships with a simple chat demo. The chat demo uses a Servlet to relay AJAX requests to and from a JMS Topic (the source for this application is also a good place to start if you are interested in learning about Jetty Continuations). Below are some instructions on how to build an alternate client with Rails for the chat demo. This is done via activemessaging(AKA a13g), a plugin which relays/dispatches the messages to the Servlet (or any other “broker”).

Assuming Java, Ruby and Rails …

  $rails -v
    Rails 1.2.3
  $ruby -v
    ruby 1.8.5
  $java -version
    java version "1.6.0_01"

Setting Up ActiveMQ

Download and unzip the binary. Start the broker and demo application by running /apache-activemq-5.0-SNAPSHOT/bin/activemq . This only works with a 20070623 (or later) build of ActiveMQ. For an earlier build, see this patch.

Create a Rails application

  $rails a13g
    (output excluded)

Generate a Controller and a View

  $cd a13g
  $script/generate controller talk index
     exists  app/controllers/
     exists  app/helpers/
     create  app/views/talk
     exists  test/functional/
     create  app/controllers/talk_controller.rb
     create  test/functional/talk_controller_test.rb
     create  app/helpers/talk_helper.rb
     create  app/views/talk/index.rhtml

Make a13g/app/controllers/talk_controller.rb look like this:

  class TalkController < ApplicationController

    include ActiveMessaging::MessageSender
    publishes_to :chat_client

    def new
      @message = params[:message]
      publish :chat_client, @message
      render :action => 'index'
    end

  end

Make a13g/app/views/talk/index.rhtml look like this:

  <%= start_form_tag :action => 'new' %>
     <%=text_field_tag :message %>
    <%= submit_tag “submit” %>
  <%= end_form_tag %>

Install the a13g plugin and it’s dependencies

  $script/plugin install http://activemessaging.googlecode.com/
svn/trunk/plugins/activemessaging
    (output excluded)
  $gem install daemons
    (output excluded)
  $gem install stomp
    (output excluded)

Create a Processor

  $script/generate processor ChatClient
    create  app/processors
    create  app/processors/chat_client_processor.rb
    create  test/functional/chat_client_processor_test.rb
    create  config/messaging.rb
    create  config/broker.yml
    create  app/processors/application.rb
    create  script/poller

An a13g Processor is similar to a MessageConsumer. Make the a13g/app/processors/chat_client_processor.rb look like this:

  class ChatClientProcessor < ApplicationProcessor

    subscribes_to :chat_client

    def on_message(message)
      logger.debug "ChatClientProcessor received: " + message.to_s
    end
  end

Configuring a13g

Destinations are mapped to symbols using messaging.rb (BTW, a13g committers - if you are reading this, please consider changing Gateway.queue to Gateway.destination, for reasons demonstrated in this post ). Make a13g/config/messaging.rb look like this:

  ActiveMessaging::Gateway.define do |s|
      s.queue :chat_client, 'topic://CHAT.DEMO'
  end

Copy this a13g adapter to a13g/vendor/plugins/activemessaging/
lib/activemessaging/adapters/rest.rb .
Tell a13g to use the adapter by making a13g/config/broker.yml look like this:

  development:
    adapter: rest
    port: 8161
    app: "/demo/amq"
    host: localhost

Start Rails and a13g

  $script/server
    (output excluded)
  $script/poller run
    (output excluded)
    Subscribing to topic://CHAT.DEMO (processed by ChatClientProcessor)
    Subscribed to topic://CHAT.DEMO, using session id of 1swb9sf6yi1ke

The rest adapter establishes an HTTP session. The session id will be used for all subsequent communication (publishing and consuming).

Let’s Chat

In your browser go to the chat demo and start a conversation.

Looking at the console for the activemessaging poller process, we can see that this message was received by the ChatClientProcessor.

  $script/poller run
    (output excluded)
    Subscribing to topic://CHAT.DEMO (processed by ChatClientProcessor)
    Subscribed to topic://CHAT.DEMO, using session id of 1swb9sf6yi1ke
    ChatClientProcessor received: I was thinking ... one day,
    they are going to standardize DSLs

By pointing a different browser window (different cookie/session) we can start an argument via the talk view.

Back in the original browser, we can see that chat messages are dispatched both ways.

The chat continues …

From the poller console …

  ChatClientProcessor received: I was thinking ... one day,
  they are going to standardize DSLs
  ChatClientProcessor received: No way.  Never.
  ChatClientProcessor received: Think about it.
  ChatClientProcessor received: Large conservative orgs will
  not be open to it.
  ChatClientProcessor received: True.
  ChatClientProcessor received: And the only way they will
  adopt DSLs is to go industry wide.
  ChatClientProcessor received: Think JCP, WS-*  ... only
  for *each* industry ....
  ChatClientProcessor received: Accounting, Finance,
  Logistics, etc.
  ChatClientProcessor received: Interesting