Implementing a Message Bus Using RabbitMQ and Spring Integration

By Willie Wheeler, John Wheeler, and Joshua White, authors of Spring in Practice

The point-to-point integration approach breaks down because it scales as O (n2) in the number of applications to be integrated. If you have more than a handful of applications to integrate, you'll need to manage a lot of development, configuration, and operational linkages. In this technique, based on chapter 13 of Spring in Practice, the authors show how you can use a centralized messaging infrastructure to alleviate much of the pain.

Prerequisites

Familiarity with the integration domain and Enterprise Integration Patterns in particular; see www.eaipatterns.com/ for background.

Key technologies

SI, RabbitMQ, Spring Rabbit, Advanced Message Queuing Protocol (AMQP), Spring AMQP

Background

You use RESTful web service APIs as a way to decouple applications from one another. Web service APIs provide a layer of abstraction over underlying capabilities and data, which makes it easier to change the implementations without impacting clients. In addition, the RESTful approach supports decoupling by reducing the knowledge that clients must have of the services they consume.

Improvements are possible in two major areas. First, the apps still have to know quite a bit about each other from development, configuration, and operational perspectives. Second, the point-to-point integration strategy scales poorly as you incorporate different apps. This recipe presents a broker-based approach that addresses both of these issues.

Problem

Further decouple your apps, and address scalability issues associated with the point-to-point integration strategy.

Solution

The solution is to use a centralized message broker to serve as the basis for your application integrations. Message brokers are specifically designed to address application integration, and they solve the previous issues as follows:

There are lots of options for message brokers, but you'll use RabbitMQ, which implements the AMQP messaging protocol. The advantage over the Java Message Service (JMS) API is that using a protocol decouples messaging clients from the broker. With JMS, the clients are Java clients (although any given broker has APIs for other platforms as well). With AMQP, any platform with an AMQP client can communicate with the broker, in much the same way that any web browser can communicate with any web server, regardless of the client and server platforms. Because most platforms have AMQP clients, [1] AMQP has outstanding interoperability.

We won't go into the details of RabbitMQ; fortunately it's fast and easy to set up a development instance.[2] You can also consult RabbitMQ in Action by Alvaro Videla and Jason J.W. Williams (Manning, 2012) for further information.

You can use gateways to hide the messaging system from the help desk and portal apps. In this recipe, you'll realize the advantage of that approach: you'll throw away the point-to-point REST implementations entirely and replace them with SI–generated proxies that use AMQP to speak to RabbitMQ. [3]

Figure 1 Integrating applications via a centralized RabbitMQ message broker. This improves scalability and enhances decoupling.

Figure 1 shows the goal for this recipe. Let's get started by looking at message buses and canonical data models.

Message buses and canonical data models

You're going to use RabbitMQ to implement the message bus integration pattern. The idea behind this pattern is to provide a central medium through which applications can communicate with one another. Conceptually it's based on the hardware bus concept: plug in, and you're good to go. Hohpe and Woolf define a message bus as follows:

A Message Bus is a combination of a Canonical Data Model, a common command set, and a messaging infrastructure to allow different systems to communicate through a shared set of interfaces.
—Enterprise Integration Patterns, p. 139

You'll use message queues as the shared set of interfaces. But what's missing so far is the so-called canonical data model (CDM), which is the lingua franca that allows you to get away with O(n)—or even O(1)—message translations instead of O(n2). Now you'll standardize data representations by creating a separate Maven module for the CDM.

In real life there are sometimes significant business, technical, and organizational challenges surrounding the creation of a CDM, but you can ignore those because you're lucky enough to have a simple data model. You'll use XML for your format because it's widely supported, although JSON would be another plausible option. Ideally you'd create XML schemas for the model, but you won't mess around with that here. Instead you'll create new DTOs (the Spring HATEOAS DTOs are more oriented around RESTful web services), define XML bindings, and treat the implied schema as constituting your CDM.

You have a handful of message types, but it will suffice to look at one. The following listing shows the DTO for tickets.

Listing 1 Ticket.java: DTO for tickets

package coAZm.springinpractice.ch13.cdm;

import java.util.Date;
import javax.xml.bind.annotation.XmlAccessType; 
import javax.xml.bind.annotation.XmlAccessorType; 
import javax.xml.bind.annotation.XmlRootElement;

@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public class Ticket {
    private TicketCategory category; private TicketStatus status; 
    private String description;  
    private String createdBy;
    private Date dateCreated;

    public TicketCategory getCategory() { return category; }

    public void setCategory(TicketCategory category) {
        this.category = category;
    }

    public TicketStatus getStatus() { return status; }

    public void setStatus(TicketStatus status) { this.status = status; }

    public String getDescription() { return description; }

    public void setDescription(String description) {
        this.description = description;
    }

    public String getCreatedBy() { return createdBy; }

    public void setCreatedBy(String createdBy) {
        this.createdBy = createdBy;
    }

    public Date getDateCreated() { return dateCreated; }

    public void setDateCreated(Date dateCreated) {
        this.dateCreated = dateCreated;
    }
}

There isn't much to say here. It's a bare-bones DTO with some JAXB annotations to bind it to the CDM's XML representation.

You have DTOs for other message types as well, such as ticket categories, ticket statuses, customers, and so forth. Consult the sample code for details.

Now that you have a CDM in place along with a central set of DTOs, you need to make an interesting design decision. One possibility is for the existing apps to continue using their existing data models, and perform translations as messages enter and exit the bus. The other is for apps to adopt the central DTOs as their own data model, at least in cases where you have control over that (for example, internally developed apps).

In this case the choice is fairly clear because the Spring HATEOAS DTOs are more oriented to support RESTful web services. [4] The benefit is that you can avoid message translation between apps. You do need to modify the gateways to use the new DTOs, so let's do that now.

The gateway interfaces

You'll work on the portal side with the TicketGateway. It happens that the gateway interfaces are slightly leaky: through the DTOs they use, they expose the fact that you're designing for Spring Data REST-based implementations with URIs instead of database IDs. [5]

Let's replace the Spring HATEOAS DTOs with the ones you created for the CDM. The next listing presents the new TicketGateway.

Listing 2 TicketGateway.java: using CDM DTOs

package com.springinpractice.ch13.portal.integration.gateway;

import com.springinpractice.ch13.cdm.Ticket;
import com.springinpractice.ch13.cdm.TicketCategory;
import com.springinpractice.ch13.cdm.TicketCategory.TicketCategoryList;
import com.springinpractice.ch13.cdm.TicketStatus;

public interface TicketGateway {

    void createTicket(Ticket ticket); 

    TicketStatus findOpenTicketStatus();

    TicketCategoryList findTicketCategories();

    TicketCategory findTicketCategory(Long id);
}

See the sample code for a similar treatment of the PortalGateway. Let's turn now to the gateway implementations.

Reimplementing the portal's TicketGateway using Spring Integration

SI allows you to implement gateways dynamically. SI allows you to build integration logic that allows the portal app to send requests to other systems, and also to respond to requests from other systems. You can of course do the same thing for the help desk app. In effect, you can use SI to create app-specific adapters to the RabbitMQ messaging infrastructure. Review figure 1 for a visual.

This section focuses on implementing the portal app's outbound messages; that is, you'll implement the TicketGateway interface. To complete the circuit, you also need to handle inbound messages into the help desk, so you'll do that as well.

We won't cover requests originating from the help desk app because the logic involved is more of the same. Refer to the sample code if you want to see it.

Let's start by implementing the integration logic for the portal's self-service ticket creation feature.

Implementing self-service ticket creation: portal's outbound messaging

TicketGateway has a createTicket(Ticket) method that serves as a nice starting point because it's fairly straightforward. The idea is that the customer creates a ticket using the portal's web interface, and the portal passes it along to TicketGateway. Behind the scenes, the gateway does an asynchronous fire-and-forget at the messaging infrastructure, meaning that the call returns immediately. Later we'll look at the message-handling code on the help desk side, but to keep things simple let's focus on the portal's fire-and-forget code.

Figure 2 shows what this looks like using the EIP graphical language. Note that the Spring Tool Suite generates these diagrams automatically from the SI configuration files; click the IntegrationGraph tab in the configuration file editor.

Figure 2 A portal-integration pipeline supporting fire-and-forget ticket creation. The channel adapter pushes ticket-creation messages onto the bus.

The pipeline is straightforward. At the front end is a TicketGateway proxy that accepts requests from the application through the TicketGateway interface. It passes ticket creation requests to the AMQP channel adapter by way of a channel, and the channel adapter in turn pushes the message to a RabbitMQ exchange.[6] In the case of ticket creation, all of this is completely asynchronous, so control returns to the portal immediately after invoking the TicketGateway. The following listing shows how to implement the pipeline using SI, Spring Rabbit, and Spring AMQP.

Listing 3 beans-integration.xml: portal application

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
    xmlns:oxm="http://www.springframework.org/schema/oxm" 
    xmlns:p="http://www.springframework.org/schema/p" 
    xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd  
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/
	             [CA]spring-context-3.1.xsd
        http://www.springframework.org/schema/integration   
        http://www.springframework.org/schema/integration/
	             [CA]spring-integration-2.2.xsd
        http://www.springframework.org/schema/integration/amqp       
        http://www.springframework.org/schema/integration/amqp/
	             [CA]spring-integration-amqp-2.2.xsd
        http://www.springframework.org/schema/oxm 
        http://www.springframework.org/schema/oxm/spring-oxm-3.1.xsd   
        http://www.springframework.org/schema/rabbit   
        http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsd 
        ">

    <context:property-placeholder 
        location="classpath:/spring/environment.properties" />

    <rabbit:connection-factory id="rabbitConnectionFactory" 
username="${rabbitMq.username}" password="${rabbitMq.password}" /> <rabbit:admin connection-factory="rabbitConnectionFactory" />
<rabbit:queue name="createTicketRequest.queue" />
<rabbit:template id="amqpTemplate"
connection-factory="rabbitConnectionFactory" message-converter="marshallingMessageConverter" /> <bean id="marshallingMessageConverter"
class="org.springframework.amqp.support.converter. [CA]MarshallingMessageConverter" p:contentType="application/xml"> <constructor-arg ref="marshaller" /> </bean> <oxm:jaxb2-marshaller id="marshaller">
<oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.Ticket" /> <oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.TicketCategory" /> <oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.TicketStatus" /> </oxm:jaxb2-marshaller> <int:gateway
service-interface="com.springinpractice.ch13.portal.integration. [CA]gateway.TicketGateway" default-request-timeout="2000"> <int:method name="createTicket"
request-channel="createTicketRequestChannel" /> </int:gateway> <int:channel id="createTicketRequestChannel" />
<int-amqp:outbound-channel-adapter
amqp-template="amqpTemplate" channel="createTicketRequestChannel" routing-key="createTicketRequest.queue" /> </beans>

Quite a bit is happening in this listing, but you can break the configuration into three sections: RabbitMQ, Object/XML mapping (OXM), and SI.

First, the RabbitMQ configuration begins with a connection factory (#1). (Note that the default credentials for a fresh RabbitMQ installation are guest/guest.) You use <rabbit:admin/> at #2 to create queues dynamically if they don't already exist. At #3, you declare a single queue for ticket-creation requests.

At #4, you create a template for sending messages to Rabbit. This follows Spring's general practice of template-based communication with external systems and resources. The template uses a MarshallingMessageConverter (part of Spring AMQP) at #5 to perform OXM on message payloads. By default, the Rabbit template uses a SimpleMessageConverter, which handles strings, Serializable instances, and byte arrays. Because you want an XML-based CDM, you need a converter that performs OXM.

You configure a JAXB marshaller at #6, declaring the Ticket, TicketCategory, and TicketStatus DTOs for OXM binding. The MarshallingMessageConverter uses this marshaller.

The rest of the configuration is for SI. At #7, you define a dynamic proxy for the TicketGateway interface. The configuration at #8 routes tickets coming in through the createTicket() method to the createTicketRequestChannel (#9), where the AMQP outbound channel adapter (#10) receives it and pushes it to Rabbit's default exchange, because you haven't specified an exchange explicitly. This channel adapter, like all channel adapters, is unidirectional. (Gateways support bidirectional, request/reply messaging, but you don't require that here.) The channel adapter's routing key is set to createTicketRequest.queue, so the default exchange routes it to that queue. The message payload is ticket XML in the canonical format because the adapter uses the Rabbit template, which in turn uses the MarshallingMessageConverter.

That takes care of the fire-and-forget implementation of ticket creation on the portal side. Now there's a message with an XML ticket payload sitting in a request queue on your bus. The next step is to implement integration logic on the help desk side to receive and service the request.

Implementing self-service ticket creation: help desk's inbound messaging

This section shows how to process inbound ticket-creation requests. See figure 3 for a diagram showing how this works.

Figure 3 A help desk integration pipeline that receives ticket-creation messages from the bus and creates tickets in the help desk database

Here's what's happening. An inbound channel adapter receives the request from Rabbit, maps the ticket XML to a ticket DTO, and passes it to a processing chain. The chain is a wrapper around a linear sequence of endpoints, obviating the need to define explicit channels connecting the chain's members. The chain's first endpoint is a transformer (SI's terminology for EIP's message translator) that maps the DTO to a ticket entity. Then a service activator invokes the TicketRepository.save(TicketEntity) method to save the ticket to the database. The repository's save() method returns the saved instance, but the chain discards that message by dropping it onto the global nullChannel, which is essentially a black hole like /dev/null in Unix. Here's the configuration for the integration logic just described.

Listing 4 beans-integration.xml: help desk application

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"     
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
    xmlns:int-xml="http://www.springframework.org/schema/integration/xml"    
    xmlns:oxm="http://www.springframework.org/schema/oxm"   
    xmlns:p="http://www.springframework.org/schema/p" 
    xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
    xmlns:util="http://www.springframework.org/schema/util" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd   
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/
	             [CA]spring-context-3.1.xsd
        http://www.springframework.org/schema/integration   
        http://www.springframework.org/schema/integration/
	             [CA]spring-integration-2.2.xsd

        http://www.springframework.org/schema/integration/amqp       
        http://www.springframework.org/schema/integration/amqp/
	             [CA]spring-integration-amqp-2.2.xsd
        http://www.springframework.org/schema/integration/xml     
        http://www.springframework.org/schema/integration/xml/
	             [CA]spring-integration-xml-2.2.xsd
        http://www.springframework.org/schema/oxm 
           http://www.springframework.org/schema/oxm/spring-oxm-3.1.xsd      
           http://www.springframework.org/schema/rabbit 
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsd    
           http://www.springframework.org/schema/util 
        http://www.springframework.org/schema/util/spring-util-3.1.xsd
        ">

    <context:property-placeholder
        location="classpath:/spring/environment.properties" />

    <rabbit:connection-factory id="rabbitConnectionFactory" 
username="${rabbitMq.username}" password="${rabbitMq.password}" /> <rabbit:admin connection-factory="rabbitConnectionFactory" /> <rabbit:queue name="createTicketRequest.queue" /> <bean id="marshallingMessageConverter class="org.springframework.amqp.support.converter. [CA]MarshallingMessageConverter" p:contentType="application/xml"> <oxm:jaxb2-marshaller id="marshaller">
<oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.Ticket" /> <oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.TicketCategory" /> <oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.TicketStatus" /> </oxm:jaxb2-marshaller> <context:component-scan base-package="com.springinpractice.ch13.
[CA]helpdesk.integration.transformer" /> <int-amqp:inbound-channel-adapter
queue-names="createTicketRequest.queue" channel="createTicketRequestChannel" message-converter="marshallingMessageConverter" /> <int:channel id="createTicketRequestChannel" />
<int:chain input-channel="createTicketRequestChannel"
output-channel="nullChannel"> <int:transformer ref="ticketTransformer" method="toEntity" />
<int:service-activator
[CA]expression="@ticketRepository.save(payload)" /> </int:chain> </beans>

As with the portal application, you have an initial RabbitMQ configuration (#1), although this time you don't need a template. You also have the OXM configuration (#2). This time around you have some transformers (more on that in a minute), so you scan for them at (#3).

In listing 3 you had an AMQP outbound channel adapter to send messages to the bus, so here you have the inbound counterpart (#4). The inbound channel adapter receives ticket-creation requests from createTicketRequest.queue and passes them via a channel (#5) to a chain (#6).

A chain is a linear sequence of endpoints connected by implicit channels. The first endpoint is a transformer (#7) that transforms the ticket DTO into a ticket entity, as you'll see. The second endpoint is a service activator I that saves the ticket entity to the Spring Data JPA ticket repository using a Spring Expression Language (SpEL) expression. The variables headers and payload are available for use, although you're using only payload here. The payload is the ticket entity that the transformer generated. The call to save() returns the saved entity, but you don't want to return that to the original caller; you send it to the global nullChannel (#8), which sends the message to a black hole.

Next is the transformer that converts ticket DTOs into ticket entities.

Listing 5 TicketTransformer.java

package com.springinpractice.ch13.helpdesk.integration.transformer;

import javax.inject.Inject;
import org.springframework.stereotype.Component;
import com.springinpractice.ch13.cdm.Ticket;
import com.springinpractice.ch13.helpdesk.model.TicketEntity;

@Component
public class TicketTransformer {
    @Inject private TicketCategoryTransformer ticketCategoryTransformer;
    @Inject private TicketStatusTransformer ticketStatusTransformer;

    public TicketEntity toEntity(Ticket ticketDto) {  
TicketEntity ticketEntity = new TicketEntity(); ticketEntity.setCategory(
ticketCategoryTransformer.toEntity(ticketDto.getCategory())); Customer customerDto = ticketDto.getCreatedBy(); String username = customerDto.getUsername(); if (username != null) { ticketEntity.setCustomerUsername(username); } else { ticketEntity.setCustomerEmail(customerDto.getEmail()); ticketEntity.setCustomerFullName(getFullName(customerDto)); } ticketEntity.setDateCreated(ticketDto.getDateCreated()); ticketEntity.setDescription(ticketDto.getDescription()); ticketEntity.setStatus(
ticketStatusTransformer.toEntity(ticketDto.getStatus())); return ticketEntity; } private String getFullName(Customer customerDto) { String firstName = customerDto.getFirstName(); String lastName = customerDto.getLastName(); if (firstName == null) { return (lastName == null ? "[Unknown]" : lastName).trim(); } else { return (lastName == null ? firstName : firstName + " " + lastName).trim(); } } }

The transformer is a POJO. Although it's possible to use annotations to configure SI components, you're using XML because I (Willie) find it easier to understand when the SI configuration is in one place.

At #1, you have a transformer method. This is the toEntity() method specified in listing 4. When there's a single public method, you don't have to specify the transformer method explicitly in the XML, but you do it anyway. Because the ticket DTO has references to category and status DTOs, you delegate the transformation to corresponding transformers #2 and #3.

With that, you have a full asynchronous flow from the portal application through the message bus and ending with the help desk. To be sure, there are some details we've neglected, such as error handling. But the basic integration is in place. The next section looks at a more complex case: implementing synchronous finder methods.

Implementing the finders: portal's outbound messaging

Finder methods involve a request/reply communication style, which takes more effort to implement in a messaging environment than the fire-and-forget style does. In this case you'll implement synchronous request/replies, meaning the caller will block until the reply arrives; but note that SI also supports asynchronous request/replies, which are based on a callback mechanism. We won't cover that here, though.

Integration and services: an architectural perspective

You might fairly ask why you would implement synchronous request/reply on top of a fundamentally asynchronous messaging infrastructure. Wouldn't it be simpler to have the caller invoke a web service on the target system?

In many cases it's indeed simpler to make a web service call. You can avoid implementing a bunch of integration patterns on the bus, as well as avoid forcing the request and reply messages to pass through the message bus.

But the arguments for using a bus for asynchronous communications mostly apply even for synchronous communications: (1) client systems can decouple themselves from service-specific locations, message formats, authentication schemes, and so on; and (2) you avoid the aforementioned O(n2) problem associated with point-to-point messaging.

One pro-bus argument that doesn't apply in the case of synchronous messaging is the runtime decoupling argument. In the asynchronous case, it doesn't matter if a message receiver is offline when the sender sends the message, because the messaging system queues the message until the receiver is available. With synchronous communications, the receiver must be available when the sender sends it a request.

We won't settle the issue here, but suffice it to say there's a design decision to consider. The rest of the recipe shows how to implement synchronous messaging without necessarily claiming that it's the right approach for all cases.

You'll add support for three finder methods. Figure 4 augments the portal-side pipeline you established in figure 2. Originally you had a single path to an AMQP outbound channel adapter. This time you add a couple of new paths to an AMQP outbound gateway.

Figure 4 The portal's outbound pipeline with support for the TicketGateway's finder methods

Channel adapters and gateways are alike in that they're both interfaces to external systems, but not alike in that channel adapters are unidirectional (fire-and-forget) while gateways support request/reply communications. In this case, the external system is the message bus. The following listing shows how to implement the pipeline in figure 4.

Listing 6 beans-integration.xml: portal application

<?xml version="1.0" encoding="UTF-8"?>
<beans ...>

    ... RabbitMQ configuration from listing 3, plus the following ...

    <rabbit:queue name="findTicketStatusRequest.queue" /> 
<rabbit:queue name="findTicketCategoriesRequest.queue" /> <rabbit:queue name="findTicketCategoryRequest.queue" /> ... OXM configuration from listing 3, plus the following ... <oxm:jaxb2-marshaller id="marshaller">
<oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.DummyPayload" /> <oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.Ticket" /> <oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.TicketCategory" /> <oxm:class-to-be-bound name="com.springinpractice.ch13.cdm. [CA]TicketCategory$TicketCategoryList" /> <oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.TicketCategoryRequest" /> <oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.TicketStatus" /> <oxm:class-to-be-bound [CA]name="com.springinpractice.ch13.cdm.TicketStatusRequest" /> </oxm:jaxb2-marshaller> <int:gateway
service-interface="com.springinpractice.ch13.portal.integration. [CA]gateway.TicketGateway" default-request-channel="helpDeskRequestChannel" default-request-timeout="2000" default-reply-timeout="2000"> <int:method name="createTicket" request-channel="createTicketRequestChannel" /> <int:method name="findOpenTicketStatus"
payload-expression="new com.springinpractice.ch13.cdm. [CA]TicketStatusRequest('open')"> <int:header name="requestType" value="findTicketStatusRequest" /> </int:method> <int:method name="findTicketCategories"
payload-expression="new com.springinpractice.ch13.cdm. [CA]DummyPayload()"> <int:header name="requestType" [CA]value="findTicketCategoriesRequest" /> </int:method> <int:method name="findTicketCategory"
[CA]request-channel="findTicketCategoryRequestChannel"> <int:header name="requestType" [CA]value="findTicketCategoryRequest" /> </int:method> </int:gateway> <int:channel id="createTicketRequestChannel" /> <int-amqp:outbound-channel-adapter amqp-template="amqpTemplate" channel="createTicketRequestChannel" routing-key="createTicketRequest.queue" /> <int:channel id="findTicketCategoryRequestChannel" />
<int:transformer
input-channel="findTicketCategoryRequestChannel" output-channel="helpDeskRequestChannel" expression="new com.springinpractice.ch13.cdm. [CA]TicketCategoryRequest(payload)" /> <int:channel id="helpDeskRequestChannel" />
<int-amqp:outbound-gateway
amqp-template="amqpTemplate" request-channel="helpDeskRequestChannel" routing-key-expression="headers['requestType'] + '.queue'" mapped-request-headers="requestType" /> </beans>

You add three new queues at #1 to support your new finder methods. At #2, you add more classes to be bound to the OXM configuration. You'll see why you're adding the dummy payload and special request objects in a moment.

You modify the gateway definition at #3. You specify that by default all requests coming into the gateway will land on the helpDeskRequestChannel. You also set a default reply timeout, expressed in milliseconds, because now you're expecting replies.

On replies: unless you specify an explicit default-reply-channel (which you're not doing here), the gateway creates for any given request a temporary, anonymous reply channel, and adds the channel to the request message as a header called replyChannel. That way, reply-generating downstream endpoints know where to place the reply.

The first finder method is findOpenTicketStatus() (#4). You use a SpEL payload expression to create a TicketStatusRequest DTO for the open status. The reason you create a special request DTO is that you need the request to be XML. This is because the AMQP gateway expects an XML reply from the bus (recall your CDM), which it maps to an object via the AMQP template, which in turn uses the MarshallingMessageConverter. The template applies the converter to both the request and the reply, so the request needs to be a mappable DTO as opposed to a simple string.

In addition to the SpEL payload, the finder method definition includes a custom requestType header (custom in the sense that you invented it). Both SI and RabbitMQ support message headers, but here, the header is an SI header. You'll use this header to route finder requests to the right queue, as you'll see.

At #5 is the second finder method. This time you get a list containing all ticket categories, which is useful for populating the category drop-down in the new ticket form.

There is a small problem, though. By default, SI treats no-arg gateway methods as connecting to pollable (receive-only) channels, as opposed to no-arg request/reply (send-then-receive) channels. To implement a request/reply communication, you need to provide a dummy payload using payload-expression. Normally you can pass in a dummy string or a Date:

    payload-expression="new java.util.Date()"

But here that doesn't work because you're using MarshallingMessageConverter, which expects payloads to be mappable XML. That's why you have the DummyPayload class, and you use payload-expression to create an instance here. Once again you enrich your message with a requestType header for routing purposes.

The third finder retrieves a specific ticket category by ID (#6). Once again you need to represent the payload ID using XML rather than a Long. You'll need a transformer for this. You override the gateway's default request channel with a new channel called findTicketCategoryRequestChannel and then pass the message over that channel (#7) to the transformer at (#8). Here you take advantage of the transformer's expression attribute to wrap the Long ID in a mappable request DTO. Finally the message goes to the helpDeskRequestChannel (#9) like the other help desk requests.

The next stop is an AMQP outbound gateway (#10). You use the AMQP template to do the actual request and reply. Then there are a couple of header-related attributes.

First you use routing-key-expression to specify a dynamic, message-driven routing key that allows Rabbit's default exchange to route messages to queues. In this case, the expression is a SpEL expression that appends .queue to the value of the requestType SI header you've been using.

You use mapped-request-headers to indicate that you want the SI requestType header to appear as an AQMP header as well once the message hits the bus. This is because you'll have further use for this header for routing on the help desk side.

As with the initial gateway, the AMQP outbound gateway generates a reply. Here's how this works behind the scenes. For any given request, the outbound gateway creates a temporary reply queue and sets the AMQP message's reply_to property to the queue's name. This tells downstream endpoints where to place the reply when it materializes. Once the reply appears in that queue, the AMQP outbound gateway grabs it and places it on the request message's reply channel. You'll recall from our discussion that the request message maintains a reference to the reply channel as the value of its replyChannel header.

Implementing the finders: help desk's inbound messaging

Now the portal sends finder requests to the bus, so the help desk needs to pick those up and service them. The supporting help desk pipeline appears in figure 5.

Figure 5 The help desk's inbound pipeline to support the TicketGateway's finder methods. Although it's not shown here, each chain contains a service activator followed by a transformer.

This help desk pipeline receives finder requests at an AMQP inbound gateway and forwards them to a router, which uses the requestType header to pass the request to one of three chains. Each chain invokes a finder method on the TicketRepository and uses a transformer to convert the result into a DTO before returning it to the caller. Here's the configuration you use to implement the help desk pipeline.

Listing 7 beans-integration.xml: help desk application

<?xml version="1.0" encoding="UTF-8"?>


    ... RabbitMQ configuration from listing 4, plus the following ...

    <rabbit:queue name="findTicketStatusRequest.queue" /> 
<rabbit:queue name="findTicketCategoriesRequest.queue" /> <rabbit:queue name="findTicketCategoryRequest.queue" /> ... OXM configuration from listing 4, plus the following ... <oxm:jaxb2-marshaller id="marshaller">
<oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.DummyPayload" /> <oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.Ticket" /> <oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.TicketCategory" /> <oxm:class-to-be-bound name="com.springinpractice.ch13.cdm. [CA]TicketCategory$TicketCategoryList" /> <oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.TicketCategoryRequest" /> <oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.TicketStatus" /> <oxm:class-to-be-bound name="com.springinpractice.ch13.cdm.TicketStatusRequest" /> </oxm:jaxb2-marshaller> <context:component-scan base-package="com.springinpractice.ch13.
[CA]helpdesk.integration.transformer" /> ... inbound create ticket pipeline from listing 4 ... <int-amqp:inbound-gateway
queue-names="findTicketStatusRequest.queue, [CA]findTicketCategoriesRequest.queue, [CA]findTicketCategoryRequest.queue" request-channel="helpDeskRequestChannel"
mapped-request-headers="requestType" message-converter="marshallingMessageConverter" /> <int:channel id="helpDeskRequestChannel" /> <int:header-value-router input-channel="helpDeskRequestChannel"
header-name="requestType"> <int:mapping value="findTicketStatusRequest" channel="findTicketStatusRequestChannel" /> <int:mapping value="findTicketCategoriesRequest" channel="findTicketCategoriesRequestChannel" /> <int:mapping value="findTicketCategoryRequest" channel="findTicketCategoryRequestChannel" /> </int:header-value-router> <int:channel id="findTicketStatusRequestChannel" /> <int:chain input-channel="findTicketStatusRequestChannel">
<int:service-activator expression="@ticketStatusRepository.findByKey(payload.key)" /> <int:transformer ref="ticketStatusTransformer" method="toDto" /> </int:chain> <int:channel id="findTicketCategoriesRequestChannel" /> <int:chain input-channel="findTicketCategoriesRequestChannel"> <int:service-activator expression="@ticketCategoryRepository.findAll()" /> <int:transformer ref="ticketCategoryListTransformer" method="toDto" /> </int:chain> <int:channel id="findTicketCategoryRequestChannel" /> <int:chain input-channel="findTicketCategoryRequestChannel">
<int:service-activator expression="@ticketCategoryRepository.findOne(payload.id)" /> <int:transformer ref="ticketCategoryTransformer" method="toDto" /> </int:chain> </beans>

As was true with the portal SI configuration, you declare the three queues for finder requests at #1 to ensure that they exist. You also declare the same set of DTOs for OXM at #2 because you'll need to convert back and forth between the bus CDM and Java.

You need a few transformers to convert the entities you find into DTOs, so you scan for them at #3. The entry point for synchronous messages into the pipeline is the AMQP inbound gateway at #4. You specify its three feeder queues using the queue-names attribute. Just as you used mapped-request-headers in listing 6 to convert the custom SI requestType header into an AMQP header, you use it here to convert the AMQP header back into a custom SI requestType header.

The AMQP inbound gateway supports replies. When the gateway receives a message from a queue, it creates an anonymous reply channel and attaches it to the message using the replyChannel message header. Eventually some downstream component responsible for producing the reply will place the reply in that channel.

The gateway passes requests to a router that uses header values to drive routing (#5).

As you've guessed, you're using the requestType header for that. The <mapping> elements provide the routing definitions.

Once the request leaves the router, it goes to one of three chains you've defined, corresponding to the three finder requests. First is a chain for the ticket status requests (#6). The chain has an expression-driven service activator that unpacks the key from the request object (recall that you wrapped the key with a TicketStatusRequest in listing 6) and calls the findByKey() method on the TicketStatusRepository. The result is an entity, so you use a transformer to convert the entity back to a DTO for subsequent mapping to the XML-based CDM on the return trip. See the sample code for the transformers, which are similar to the one from listing 5.

Because you haven't specified an explicit output channel for the chain, the chain sends the transformer's output to the channel you're storing under the replyChannel header. The circuit is now complete: the help desk AMQP inbound gateway receives the reply from the channel and sends it to the specified exchange and queue (as specified by the routing key). The portal AMQP outbound gateway receives the reply from the queue and places it on the replyChannel. Finally the initial portal gateway receives the reply and returns it to the caller. The chains at #7 and #8 are essentially similar to the one at #6.

With that, you're done. You now have the plumbing on both the portal and the help desk sides to support both asynchronous and synchronous communications over Rabbit. Although we didn't cover it here, note that the help desk also requests customer information from the portal, using largely the same set of patterns, but in the opposite direction.

Discussion

In this recipe you used RabbitMQ as the bus-implementation technology and SI as a way to implement app-specific bus adapters. But this isn't the only way to use SI. You can use SI itself to implement buses. In the recipes that follow, you'll reposition the help desk's SI pipeline as an application bus in its own right and then add both inbound and outbound email by attaching them to the application bus.


Here are some other Manning titles you might be interested in:


Here are some other Manning titles you might be interested in:

Spring in Action, Third Edition

Spring in Action, Third Edition
Craig Walls

Spring Batch in Action

Spring Batch in Action
Thierry Templier, Arnaud Cogoluegnes, Gary Gregory, and Olivier Bazoud

Spring Integration in Action

Spring Integration in Action
Mark Fisher, Jonas Partner, Marius Bogoevici, and Iwein Fuld



[1] Java, Ruby, Python, .NET, Perl, PHP, C/C++, Erlang, Lisp, Haskell...

[2] See www.rabbitmq.com/download.html for download and installation instructions.

[3] Although it would be possible to use Spring Integration to bridge web services to RabbitMQ, here there's no point. There are fewer moving parts if you remove the web services and connect the apps directly to the broker. In other situations it might be desirable to keep the web services around.

[4] HATEOAS is a general architectural principle and might make sense outside the context of RESTful web services. But you don't have any use for links here, so you'll go with plain-vanilla DTOs. Spring HATEOAS may be useful for implementing message-bus CDMs in addition to REST APIs.

[5] See Joel Spolsky, "The Law of Leaky Abstractions," Nov. 11, 2002, http://www.joelonsoftware.com/articles/ LeakyAbstractions.html.

[6] If the exchange concept is new to you, you might want to take a few minutes to read up on it. See www.rab- bitmq.com/tutorials/tutorial-three-java.html for a quick overview.