Testing with ActiveMQ embedded broker

Introduction

This blog demonstrates what you may do to setup and run tests for a JMS message system using ActiveMQ’s embeeded broker. In general, you should focus on testing on your message producers and consumers. However you may also find it useful in some circumstances to write some integration tests to make sure that the various components of the message system are implemented correctly.

Setup ActiveMQ embedded broker

This section shows how to setup a embedded broker in ActiveMQ using an ActiveMQConnectionFactory. First, you need to create a BrokerService instance:

    protected BrokerService createBroker() throws Exception {
        BrokerService service = new BrokerService();
        service.setPersistent(false);
        service.setUseJmx(false);

        // Setup policy
     ...

        connector = service.addConnector(""tcp://localhost:61616");
        return service;
    }

It may be useful to setup the broker policy to match your system setup. For example, to configure broker to wait for consumers before dispatching starts, insert the following codes after “// Setup policy” comment above:

        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policy = new PolicyEntry();
        policy.setConsumersBeforeDispatchStarts(2);
        policy.setTimeBeforeDispatchStarts(1000);
        policyMap.setDefaultEntry(policy);
        service.setDestinationPolicy(policyMap);

Now to create embedded broker using connection factroy:

        ActiveMQConnectionFactory connFactory = 
                  new ActiveMQConnectionFactory(connector.getConnectUri() 
                                           + "?jms.prefetchPolicy.all=1");
        connection = connFactory.createConnection();
        connection.start();

Now we are ready to start the broker. I put the codes altogether into an abstract class for JUnit:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "/spring/app-root-test.xml" })
public abstract class AbstractActiveMQJMSTest {

    protected Connection connection;
    protected BrokerService broker;
    protected TransportConnector connector;
    protected int consumersBeforeDispatchStarts;
    protected int timeBeforeDispatchStarts;

    @Value("${jms.broker.url}")
    private String brokerUrl;

    @Before
    public void setUp() throws Exception {
        broker = createBroker();
        broker.start();
        ActiveMQConnectionFactory connFactory = 
              new ActiveMQConnectionFactory(connector.getConnectUri() 
                                       + "?jms.prefetchPolicy.all=1");
        connection = connFactory.createConnection();
        connection.start();
    }

    @After
    public void tearDown() throws Exception {
        connector.stop();
        connection.close();
        broker.stop();
    }

    protected BrokerService createBroker() throws Exception {
       ... // omitted to save space. See codes above
    }

Writing Tests

Writing tests using the above abstract class is straight forward. For example, the codes below demonstrate how to test that all messages from a producer are dispatched to the same consumer (i.e. exclusive consumer). Note this is a make-up test case for the purpose of demonstrating how a unit test can be written and run using the embedded broker in ActiveMQ. In practice you would only need to write unit tests to check message headers are setup properly in the producer to ensure exclusive customer feature is setup correctly.

public class ExclusiveCustomerMessagingTest extends AbstractActiveMQJMSTest {

    @Autowired
    private ISomeProvider provider;

    @Test
    public void testStickyConsumer() throws Exception {
        for (int i = 0; i < 5; i++) {
            provider.send(someMessage);
        }

        // now create some consumers
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(1);

        Destination dest = new ActiveMQQueue("foo-queue");
        MessageConsumerGroupThread worker = 
             new MessageConsumerGroupThread(connection, dest, "foo", 
                                        startSignal, doneSignal, 5);

        MessageConsumerGroupThread worker2 = 
             new MessageConsumerGroupThread(connection, dest, "foo2",
                                         startSignal, doneSignal, 5);

        Thread workerThread = new Thread(worker);
        workerThread.start();
        Thread workerThread2 = new Thread(worker2);
        workerThread2.start();

        startSignal.countDown();
        doneSignal.await(); // this blocks until workers are done

        List<Message> msg = worker.getMessagesReceived();
        List<Message> msg2 = worker2.getMessagesReceived();

        // Test sticky customer
        assertEquals(5, msg.size() + msg2.size());
        assertTrue(msg2.size() == 5 || msg.size() == 5);

        worker.setStop(true);
        worker2.setStop(true);
    }
}

Note MessageConsumerGroupThread class implements Runnable and loops until a prescribed number of messages (5) from the destination have been handled by the internal message consumer or being stopped by the calling test method.

Resources

  1. ActiveMQ documentation on the differnt approaches one can use to create an embedded broker.
  2. ActiveMQ javadoc API.
About these ads

About Raymond Lee
Professional Java/EE Developer, software development technology enthusiast.

One Response to Testing with ActiveMQ embedded broker

  1. James Elsey says:

    Great tutorial, any possibility that you could provide some information on the workings of the MessageConsumerGroupThread’s run() implementation? It’s not obvious how to handle the receiving of the messages from either the dest or the connection.