Using ActiveMQ JMS from C#

Wishing to have a publish/subscribe message queue available in .NET, I have been looking for ways to allow .NET to talk to a JMS implementation. After reading the article An Introduction to IKVM by Avik Sengupta, I took the latest releases of ActiveMQ and IKVM, compiled the jar files (as described in the article), and modified one of the examples from the ActiveMQ distribution to create a C# version (see below). Everything seems to work. The example submits items to the queue, and then retrieves them.

using System;
using System.Threading;
using ikvm.lang;
using org.codehaus.activemq;
using org.codehaus.activemq.util;
using org.codehaus.activemq.message;
using javax.jms;
using java.util;

namespace DefaultNamespace
{
    /// <summary>
    /// Sends a batch of text messages through an embedded ActiveMQ broker and
    /// reads them back via a JMS <c>MessageListener</c>, using the Java client
    /// classes compiled for .NET.
    /// </summary>
    class MainClass : MessageListener
    {
        /// <summary>
        /// Acknowledge mode passed to <c>createSession</c>; this is the value of
        /// <c>javax.jms.Session.AUTO_ACKNOWLEDGE</c>.
        /// </summary>
        private const int AutoAcknowledge = 1;

        /// <summary>Number of messages generated and sent by the example.</summary>
        protected int messageCount = 100;

        /// <summary>Text payloads, one per message to send.</summary>
        protected String[] data;

        protected ActiveMQConnectionFactory connectionFactory;

        // Sending side.
        protected Session session;
        protected MessageConsumer consumer;
        protected MessageProducer producer;
        protected Destination destination;
        protected Connection connection;

        // Receiving side uses its own connection and session.
        protected Connection receiveConnection;
        protected Session receiveSession;

        /// <summary>
        /// Messages delivered to <see cref="onMessage"/>. Wrapped with
        /// <c>Collections.synchronizedList</c> because it is written by the
        /// listener callback and read by <see cref="testSendReceive"/>.
        /// </summary>
        protected List messages = Collections.synchronizedList(new ArrayList());

        /// <summary>When true the example uses a topic, otherwise a queue.</summary>
        protected bool topic = true;

        /// <summary>Creates a new, empty ActiveMQ message.</summary>
        /// <returns>A freshly constructed <c>ActiveMQMessage</c>.</returns>
        protected ActiveMQMessage createMessage()
        {
            return new ActiveMQMessage();
        }

        /// <summary>
        /// Creates a topic destination for the given subject. Note that this
        /// always returns a topic, regardless of the <see cref="topic"/> flag.
        /// </summary>
        /// <param name="subject">Name of the topic.</param>
        /// <returns>An <c>ActiveMQTopic</c> named <paramref name="subject"/>.</returns>
        protected Destination createDestination(String subject)
        {
            return new ActiveMQTopic(subject);
        }

        /// <summary>
        /// Sends every entry of <see cref="data"/> as a text message, waits four
        /// seconds for delivery, then prints the text of each message that was
        /// received.
        /// </summary>
        public void testSendReceive()
        {
            messages.clear();

            for (int i = 0; i < data.Length; i++)
            {
                Message message = session.createTextMessage(data[i]);

                Console.WriteLine("About to send a message: " + message + " with text: " + data[i]);

                producer.send(destination, message);
            }

            // lets wait a little while
            Thread.Sleep(4000);

            Console.WriteLine("should have received a message: {0} {1} {2}", messages, data.Length, messages.size());

            // Only read back what actually arrived: indexing up to data.Length
            // would throw if delivery is incomplete after the wait.
            int receivedCount = Math.Min(data.Length, messages.size());
            for (int i = 0; i < receivedCount; i++)
            {
                TextMessage received = (TextMessage) messages.get(i);
                String text = received.getText();

                Console.WriteLine("Received Text: " + text);
            }
        }

        /// <summary>
        /// Builds the message payloads, connects to an embedded broker at
        /// <c>vm://localhost</c>, runs <see cref="testSendReceive"/>, dumps the
        /// factory statistics and shuts everything down again.
        /// </summary>
        public MainClass()
        {
            topic = true;

            data = new String[messageCount];
            for (int i = 0; i < messageCount; i++)
            {
                data[i] = "Text for message: " + i + " at " + DateTime.Now.ToLongTimeString();
            }

            connectionFactory = new ActiveMQConnectionFactory();
            connectionFactory.setBrokerURL("vm://localhost");
            connectionFactory.setUseEmbeddedBroker(true);

            try
            {
                connection = connectionFactory.createConnection();
                receiveConnection = connectionFactory.createConnection();

                Console.WriteLine("Created connection: " + connection);

                session = connection.createSession(false, AutoAcknowledge);
                receiveSession = receiveConnection.createSession(false, AutoAcknowledge);

                Console.WriteLine("Created session: " + session);

                // No default destination: it is supplied on every send() call.
                producer = session.createProducer(null);

                Console.WriteLine("Created producer: " + producer);

                if (topic)
                {
                    destination = receiveSession.createTopic("FOO.BAR");
                }
                else
                {
                    destination = session.createQueue("FOO.QUEUE");
                }

                consumer = receiveSession.createConsumer(destination);
                consumer.setMessageListener(this);
                receiveConnection.start();
                connection.start();

                Console.WriteLine("Created connection: " + connection);

                testSendReceive();

                Console.WriteLine("Dumping stats…");
                connectionFactory.getFactoryStats().dump(new IndentPrinter());
            }
            finally
            {
                // Release JMS resources even when setup or the test run throws.
                Console.WriteLine("Closing down connection");
                closeResources();
            }
        }

        /// <summary>
        /// Closes the sessions and connections that were successfully created,
        /// then stops the connection factory.
        /// </summary>
        private void closeResources()
        {
            if (session != null)
            {
                session.close();
            }
            if (receiveSession != null)
            {
                receiveSession.close();
            }

            if (connection != null)
            {
                connection.close();
            }
            if (receiveConnection != null)
            {
                receiveConnection.close();
            }

            connectionFactory.stop();
        }

        /// <summary>
        /// <c>MessageListener</c> callback: logs the delivered message and stores
        /// it in <see cref="messages"/>.
        /// </summary>
        /// <param name="message">The message delivered by the consumer.</param>
        public void onMessage(Message message)
        {
            Console.WriteLine("Received message: " + message);

            messages.add(message);
        }

        /// <summary>
        /// Program entry point; the whole example runs inside the
        /// <see cref="MainClass"/> constructor.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        public static void Main(string[] args)
        {
            new MainClass();
        }
    }
}

Comments are closed.