aggregate
Processor that aggregates fragments based on an identifier (correlation ID) and a given number of expected fragments.
Incomplete aggregates will be persisted until they are completed or one of the thresholds defined by maxNoOfMessagesToKeep
and maxAgeInSecondsForMessagesToKeep
is passed.
The processor produces the following output messages:
-
All non-fragment messages will be forwarded unchanged.
-
All fragment messages up to but not including the last one will be filtered.
-
When the last fragment is received, the whole aggregate is forwarded as the result.
The processor supports the following four modes of aggregation:
-
Automatic aggregation of fragments produced by the
split
processorNo config necessary for extracting correlation ID and fragment count.
-
Aggregation of fragments that provides both correlation ID and fragment count
Each incoming fragment message contains both the total number of fragments and the correlation ID.
Required configuration properties:
-
fragmentCorrelationIdExpr
-
fragmentTotalNoExpr
-
-
Aggregation started by an initiator message
If the total number of fragments can not be provided on each fragment, the aggregation can be started by an initiator message containing the correlation ID and fragment count. Fragments are then only required to provide the correlation ID.
Required configuration properties:
-
initiatorCorrelationIdExpr
-
initiatorTotalNoExpr
-
fragmentCorrelationIdExpr
The
fragmentCorrelationIdExpr
orinitiatorCorrelationIdExpr
should not be applicable to both the initiator message and fragment at the same time. -
-
Hardcoded number of fragments
When the total number of fragments is static, it can be hardcoded in the processor configuration. Fragments are then only required to provide the correlation ID.
Required configuration properties:
-
fragmentCorrelationIdExpr
-
totalNumberOfFragments
-
Properties
Name | Summary |
---|---|
|
The expression used to select the correlation ID for aggregating fragments from the message that initiated aggregation. |
|
The expression used to select the total number of expected fragments from the message that initiated aggregation. |
|
The expression used to select the correlation ID from the fragment message. |
|
The expression used to select the total number of expected fragments from the fragment message. |
|
Hardcoded number of expected fragments. |
|
The aggregates to store for the processor before evicting the oldest aggregate. |
|
The maximum number of seconds aggregates can be stored before being evicted. |
|
Optional, descriptive name for the processor. |
|
Required identifier of the processor, unique across all processors within the flow. Must be between 3 and 30 characters long; contain only lower and uppercase alphabetical characters (a-z and A-Z), numbers, dashes ("-"), and underscores ("_"); and start with an alphabetical character. In other words, it adheres to the regex pattern |
|
Optional set of custom properties in a simple jdk-format, that are added to the message exchange properties before processing the incoming payload. Any existing properties with the same name will be replaced by properties defined here. |
|
Whether the incoming payload is available for error processing on failure. Defaults to |
Sub-builders
Name | Summary |
---|---|
Strategy for describing how a processor’s message is logged on the server. |
|
Strategy for archiving payloads. |
|
Strategy that customizes the conversion of an incoming payload by a processor (e.g., string to object). Should be used when the processor’s default conversion logic cannot be used. |
Details
Fragment Handling
The aggregate
processor persists all fragments except for the last one.
Duplications are checked (and excluded) by comparing the incoming fragment to those already persisted.
However, because the last fragment is not persisted, duplication is not checked against this fragment.
If the last fragment has already triggered an aggregation and successfully forwarded the message, a redelivered last fragment will cause the aggregation to execute and forward again.
After each fragment is persisted, a filtered message response is returned. In a REST-based flow, the response would include the following property:
"reason": "New fragment received and filtered, aggregation for correlation id '123' is not completed, received 1 of 3 fragments."
For the last fragment, after aggregation, a composite object with the total number of fragments and a list of unique fragments is forwarded. For example:
{
"numberOfFragments": 3,
"fragments": [...]
}
Payload Conversion
Configuring a payload conversion with an inboundTransformationStrategy
requires extra caution, because the aggregate
processor is designed to handle different types of messages (non-fragment, fragment, aggregation initiator) to process the aggregation.
Conversion to an object requires that the incoming payload is always of the same type and/or can always be unmarshalled to a given type.
Conversion to a string requires that the properties necessary for aggregation are available not as part of the payload but as message exchange properties.
Consider the following use cases:
-
If fragments are produced by a
split
processor, there are no special requirements. -
If fragments provide the correlation ID and fragment count, conversion to a string requires that the
correlationId
andtotalNumberOfFragments
are set as exchange properties. -
If there’s a hardcoded number of fragments, conversion to a string requires that the
correlationId
is set as an exchange property. -
If aggregation is started by an initiator message, conversion to a string or object requires the following:
-
The initiator message and fragment messages are of the same type (when conversion to an object is configured).
-
The
initiatorTotalNo
,initiatorCorrelationId
, andfragmentCorrelationId
are set as exchange properties.
-
If payload conversion is configured, it applies to all messages (fragment and non-fragment). Thus, the outgoing payload can be of three types:
|