aggregate

Processor that aggregates fragments based on an identifier (correlation ID) and a given number of expected fragments.

Incomplete aggregates will be persisted until they are completed or one of the thresholds defined by maxNoOfMessagesToKeep and maxAgeInSecondsForMessagesToKeep is passed.

The processor produces the following output messages:

  • All non-fragment messages will be forwarded unchanged.

  • All fragment messages up to but not including the last one will be filtered.

  • When the last fragment is received, the whole aggregate is forwarded as the result.

The processor supports the following four modes of aggregation:

  1. Automatic aggregation of fragments produced by the split processor

    No config necessary for extracting correlation ID and fragment count.

  2. Aggregation of fragments that provides both correlation ID and fragment count

    Each incoming fragment message contains both the total number of fragments and the correlation ID.

    Required configuration properties:

    • fragmentCorrelationIdExpr

    • fragmentTotalNoExpr

  3. Aggregation started by an initiator message

    If the total number of fragments can not be provided on each fragment, the aggregation can be started by an initiator message containing the correlation ID and fragment count. Fragments are then only required to provide the correlation ID.

    Required configuration properties:

    • initiatorCorrelationIdExpr

    • initiatorTotalNoExpr

    • fragmentCorrelationIdExpr

    The fragmentCorrelationIdExpr or initiatorCorrelationIdExpr should not be applicable to both the initiator message and fragment at the same time.

  4. Hardcoded number of fragments

    When the total number of fragments is static, it can be hardcoded in the processor configuration. Fragments are then only required to provide the correlation ID.

    Required configuration properties:

    • fragmentCorrelationIdExpr

    • totalNumberOfFragments

Properties

Name Summary

initiatorCorrelationIdExpr

The expression used to select the correlation ID for aggregating fragments from the message that initiated aggregation.

initiatorTotalNoExpr

The expression used to select the total number of expected fragments from the message that initiated aggregation.

fragmentCorrelationIdExpr

The expression used to select the correlation ID from the fragment message.

fragmentTotalNoExpr

The expression used to select the total number of expected fragments from the fragment message.

totalNumberOfFragments

Hardcoded number of expected fragments.

maxNoOfMessagesToKeep

The aggregates to store for the processor before evicting the oldest aggregate.

maxAgeInSecondsForMessagesToKeep

The maximum number of seconds aggregates can be stored before being evicted.

name

Optional, descriptive name for the processor.

id

Required identifier of the processor, unique across all processors within the flow. Must be between 3 and 30 characters long; contain only lower and uppercase alphabetical characters (a-z and A-Z), numbers, dashes ("-"), and underscores ("_"); and start with an alphabetical character. In other words, it adheres to the regex pattern [a-zA-Z][a-zA-Z0-9_-]{2,29}.

exchangeProperties

Optional set of custom properties in a simple jdk-format, that are added to the message exchange properties before processing the incoming payload. Any existing properties with the same name will be replaced by properties defined here.

retainPayloadOnFailure

Whether the incoming payload is available for error processing on failure. Defaults to false.

Sub-builders

Name Summary

messageLoggingStrategy

Strategy for describing how a processor’s message is logged on the server.

payloadArchivingStrategy

Strategy for archiving payloads.

inboundTransformationStrategy

Strategy that customizes the conversion of an incoming payload by a processor (e.g., string to object). Should be used when the processor’s default conversion logic cannot be used.

Details

Fragment Handling

The aggregate processor persists all fragments except for the last one. Duplications are checked (and excluded) by comparing the incoming fragment to those already persisted. However, because the last fragment is not persisted, duplication is not checked against this fragment. If the last fragment has already triggered an aggregation and successfully forwarded the message, a redelivered last fragment will cause the aggregation to execute and forward again.

After each fragment is persisted, a filtered message response is returned. In a REST-based flow, the response would include the following property:

"reason": "New fragment received and filtered, aggregation for correlation id '123' is not completed, received 1 of 3 fragments."

For the last fragment, after aggregation, a composite object with the total number of fragments and a list of unique fragments is forwarded. For example:

{
  "numberOfFragments": 3,
  "fragments": [...]
}

Payload Conversion

Configuring a payload conversion with an inboundTransformationStrategy requires extra caution, because the aggregate processor is designed to handle different types of messages (non-fragment, fragment, aggregation initiator) to process the aggregation. Conversion to an object requires that the incoming payload is always of the same type and/or can always be unmarshalled to a given type. Conversion to a string requires that the properties necessary for aggregation are available not as part of the payload but as message exchange properties.

Consider the following use cases:

  • If fragments are produced by a split processor, there are no special requirements.

  • If fragments provide the correlation ID and fragment count, conversion to a string requires that the correlationId and totalNumberOfFragments are set as exchange properties.

  • If there’s a hardcoded number of fragments, conversion to a string requires that the correlationId is set as an exchange property.

  • If aggregation is started by an initiator message, conversion to a string or object requires the following:

    • The initiator message and fragment messages are of the same type (when conversion to an object is configured).

    • The initiatorTotalNo, initiatorCorrelationId, and fragmentCorrelationId are set as exchange properties.

If payload conversion is configured, it applies to all messages (fragment and non-fragment). Thus, the outgoing payload can be of three types:

  1. Converted non-fragment

  2. Converted initiator message (when aggregation with initiator is configured)

  3. Composite object with list of unique converted fragments