Asynchronous Processing with Camel 2.5 + JMS

Camel 2.5 isn’t quite out yet, but once it is released you’ll be able to use its new Asynchronous Processing feature.

I’ve been passing request-reply messages around with JMS in Camel for quite some time, and it has always bothered me that a thread stays tied up for the whole request-reply round trip. Asynchronous Processing fixes that, if you choose to use it.

I don’t know what it is about Camel, but I always have a hard time figuring out how to bootstrap my routes and what that looks like in actual Java code. So since I got JMS + Asynchronous Processing to work, I thought I’d capture it here to help out. Here goes:

Route and Inline Consumer

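// goes inside RouteBuilder.configure(); needs AsyncCallback, AsyncProcessor
// and Exchange from org.apache.camel
// whenever a message is sent to a.test, the AsyncProcessor defined here will fire
from("jms:queue:a.test").process(new AsyncProcessor() {
    @Override public boolean process(final Exchange ex, final AsyncCallback cb) {
        String body = ex.getIn().getBody(String.class);
        ex.getOut().setBody("Response: " + body);
        // the work finished on the calling thread, so report synchronous completion;
        // a processor that hands off to another thread would instead return false
        // and call cb.done(false) from that thread once it is finished
        cb.done(true);
        return true;
    }
    // synchronous variant required by the Processor interface; left empty because
    // the route is expected to call the asynchronous variant above
    @Override public void process(final Exchange exchange) throws Exception {}
});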
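The inline processor above does all of its work on the calling thread. To make the callback contract a bit more concrete, here is a rough JDK-only sketch of the two ways a processor can complete. Nothing in it is Camel code: `Callback`, `processInline` and `processLater` are hypothetical stand-ins for `AsyncCallback` and the two styles of `AsyncProcessor.process()`, and the thread pool is just an assumption about where the deferred work might run.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncContractSketch {

    // Hypothetical stand-in for Camel's AsyncCallback.
    interface Callback {
        void done(boolean doneSync);
    }

    // Finishes on the caller's thread: call done(true), then return true.
    static boolean processInline(String body, List<String> log, Callback cb) {
        log.add("Response: " + body);
        cb.done(true);
        return true;
    }

    // Hands the work to a pool: return false now, call done(false) later.
    static boolean processLater(final String body, final List<String> log,
                                ExecutorService pool, final Callback cb) {
        pool.execute(new Runnable() {
            @Override public void run() {
                log.add("Response: " + body);
                cb.done(false);
            }
        });
        return false; // the caller's thread is free; completion arrives via cb
    }

    // Runs both variants and returns the observed events.
    static List<String> demo() {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        Callback cb = new Callback() {
            @Override public void done(boolean doneSync) {
                log.add("done(" + doneSync + ")");
            }
        };
        ExecutorService pool = Executors.newSingleThreadExecutor();
        boolean inline = processInline("a", log, cb);
        boolean later = processLater("b", log, pool, cb);
        pool.shutdown(); // accept no new work, let the queued task finish
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.add("inline returned " + inline);
        log.add("later returned " + later);
        return log;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

The point of the second variant is that the return value and the `done()` argument agree: `false` in both places means "not finished yet, the callback will tell you when".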

Sending a Message as a Client

// needs AsyncCallback, AsyncProcessor, Endpoint, Exchange, ExchangePattern and
// Producer from org.apache.camel, plus org.apache.camel.impl.DefaultMessage

// get endpoint for JMS queue defined in route above
Endpoint endpoint = camelContext.getEndpoint("jms:queue:a.test");
// get producer so that we can fire messages
Producer producer = endpoint.createProducer();
producer.start();

// create test message; InOut makes this a request-reply exchange
final Exchange exchange = producer.createExchange();
exchange.setPattern(ExchangePattern.InOut);
DefaultMessage message = new DefaultMessage();
message.setExchange(exchange);
message.setBody("test");
exchange.setIn(message);

// this cast is safe because this must be a JmsProducer
// .process() returns immediately; the AsyncCallback passed in has done() called
// once the consumer above has sent its response.
((AsyncProcessor) producer).process(exchange, new AsyncCallback() {
    @Override public void done(final boolean doneSync) {
        // our async callback. always check for failure before reading the response
        if (exchange.isFailed()) {
            System.out.println("Request failed: " + exchange.getException());
            return;
        }
        // access the original exchange for the response body
        System.out.println("Got response: " + exchange.getOut().getBody());
    }
});
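Because `process()` returns before the reply arrives, a short-lived client such as a unit test or a `main` method can exit before the callback ever runs. One possible way around that is to wait on a `CountDownLatch` with a timeout. The sketch below uses only the JDK: `sendAsync` is a hypothetical stand-in for the producer call above, `Callback` stands in for `AsyncCallback`, and the `AtomicReference` plays the role of the exchange's response body.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AwaitReplySketch {

    // Hypothetical stand-in for Camel's AsyncCallback.
    interface Callback {
        void done(boolean doneSync);
    }

    // Hypothetical stand-in for the producer call: replies from another thread.
    static void sendAsync(final String body, final AtomicReference<String> reply,
                          final Callback cb) {
        new Thread(new Runnable() {
            @Override public void run() {
                reply.set("Response: " + body);
                cb.done(false);
            }
        }).start();
    }

    // Sends one message and blocks up to timeoutMillis for the reply.
    // Returns null if no reply arrived in time or the wait was interrupted.
    static String sendAndWait(String body, long timeoutMillis) {
        final AtomicReference<String> reply = new AtomicReference<String>();
        final CountDownLatch latch = new CountDownLatch(1);
        sendAsync(body, reply, new Callback() {
            @Override public void done(boolean doneSync) {
                latch.countDown(); // wake the waiting caller
            }
        });
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS) ? reply.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("Got response: " + sendAndWait("test", 5000));
    }
}
```

Blocking the caller like this brings back the very thread tie-up that asynchronous processing is meant to avoid, so it is probably best kept to tests and demos rather than production clients.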

That’s it! It’s pretty straightforward once you know what to do.