Setup Aggregator in Spring Integration 2

Aggregator is an useful component in Spring Integration. It allows you to group a number of related messages before processing them and generates a single message as output. It is also one of the more complicated to configure. This article will demonstrate the steps required to setup a basic aggregator for use in Spring Integration.

Spring Integration Xml setup

Below is an example xml config I use.

    <int:aggregator id="basicAggregator"
        send-timeout="86420000"  >
        <bean id="basicAggregatorBean"/>

There are 3 fundamental components of Aggregator:

  1. The Aggregator bean (basicAggregatorBean) for processing the collected messages. The method for processing the message is defined in the attribute “method” (collect)
  2. Coorelation strategy bean (defaultHeaderAttributeCorrelationStrategy) and method (getCorrelationKey) to define how messages are grouped together.
  3. Release strategy bean (defaultSequenceSizerReleaseStrategy) and method (canRelease) to define the logic used to decide whether messages are ready to be processed.

Aggregator Bean

The aggregator bean is the simplest of the setup. Its a simple POJO. The method should take a list as argument according to the following rules as defined in the Spring Integration document:

  • if the argument is a java.util.List<T>, and the parameter type T is assignable to Message, then the whole list of messages accumulated for aggregation will be sent to the aggregator
  • if the argument is a non-parameterized java.util.List or the parameter type is not assignable to Message, then the method will receive the payloads of the accumulated messages
  • if the return type is not assignable to Message, then it will be treated as the payload for a Message that will be created automatically by the framework.

As an example, below is the skeleton implementation of the bean basicAggregatorBean defined:

public class basicAggregator {

    public List<MyPayload> collect(List<MyPayload> payloads) {
        // Implementation here

Correlation Strategy

The correlation strategy bean implements the interface CorrelationStrategy to decide how messages are grouped together. The simplest way to do this is via a header attribute. By default the message header attribute CORRELATION_ID is used. You may use the header enricher to achieve this:

    <int:header-enricher input-channel="inChannel" output-channel="aggregatorInChannel">
        <int:correlation-id value="123"/>
        <!-- use expression instead of value to avoid exception! -->
        <int:header name="sequenceSize" expression="1"/>

Note the use of expression attribute instead of value, or it will throw an exception java.lang.IllegalArgumentException: The ‘sequenceSize’ header value must be an Integer. Don’t know why.

There are a few concrete implementation of the interface CorrelationStrategy provided in Spring Integration 2.1:

  • HeaderAttributeCorrelationStrategy – determine message correlation based on header attribute value.
  • ExpressionEvaluatingCorrelationStrategy – determine message correlation based on given Spel expression.

Release Strategy

One of the basic release strategy class you could use is the class SequenceSizeReleaseStrategy which keeps a count of messages received and release them when the preset number is reached. Other concrete implemenation of interface ReleaseStrategy in Spring Integration includes:

  • ExpressionEvaluatingReleaseStrategy
  • MessageCountReleaseStrategy – only release first n messages
  • TimeoutCountSequenceSizeReleaseStrategy – releases if message count threshold reached or time lapsed threshold is reached

Note if you are implementing your own release strategy as a POJO bean, your canRelease method needs to have a single list argument.


That’s it. The Spring Integration 2.1 ( has more details. You can also use annotations instead of XML to configure aggregators.