distribute

Processor that distributes messages to a set of flows. The distribution will not be considered complete until all destinations have confirmed reception of the message.

The distributor maintains some state for the delivery of messages and avoids (re-)delivering to destinations that have already confirmed reception of a given message. This state is not persistent, meaning that the processor does not provide any special delivery guarantee. How long the distributor maintains state for x number of simultaneous messages can be configured.

This has the following connotations:

  • Message processing is only considered successful if all the target flows have reported success.

  • Any delivery guarantee has to be implemented upstream in the flow when needed.

  • Because state is not persistent, downstream target flows/processors that are not idempotent must ensure any guarantees they require themselves.

There is one exception: If a flow ID contains the suffix -handoff, messages will be forwarded using handoff semantics. See the handoff processor for more details.

Properties

Name Summary

flowId()

Adds a flow ID to a list of flows that the messages will be distributed to.

targetFilterExpression()

Adds a boolean expression to a list of filters used to select which flows to distribute individual messages to. If no filters are defined, all messages will be distributed to all target flows. If at least one filter is defined, each flow ID will be evaluated against the filter expression. If at least one expression returns true, the message will be distributed to that flow ID. Otherwise, it will not be distributed to that flow ID.

distributionExpiryMillis

The amount of time after distribution is initiated that the distributor maintains state of delivery.

distributionExpiryCheckMillis

The frequency of which the distributor checks for and evicts its internal state for abandoned/failed distributions.

distributionMaxMessages

The maximum number of active messages that the distribution processor can handle simultaneously. If this threshold is reached, the distributor will replace an existing entry by using an undefined strategy.

name

Optional, descriptive name for the processor.

id

Required identifier of the processor, unique across all processors within the flow. Must be between 3 and 30 characters long; contain only lower and uppercase alphabetical characters (a-z and A-Z), numbers, dashes ("-"), and underscores ("_"); and start with an alphabetical character. In other words, it adheres to the regex pattern [a-zA-Z][a-zA-Z0-9_-]{2,29}.

exchangeProperties

Optional set of custom properties in a simple jdk-format, that are added to the message exchange properties before processing the incoming payload. Any existing properties with the same name will be replaced by properties defined here.

retainPayloadOnFailure

Whether the incoming payload is available for error processing on failure. Defaults to false.

Sub-builders

Name Summary

messageLoggingStrategy

Strategy for describing how a processor’s message is logged on the server.

payloadArchivingStrategy

Strategy for archiving payloads.

inboundTransformationStrategy

Strategy that customizes the conversion of an incoming payload by a processor (e.g., string to object). Should be used when the processor’s default conversion logic cannot be used.

Details

Filtering

Expressions defined in the targetFilterExpression() function are executed in a context that expands the standard expression context with the targetFlowId property. targetFlowId contains the current flow ID being evaluated.

Multiple expressions can be added to the same processor to create advanced filtering techniques. For example, given the following expressions:

targetFilterExpression("exchangePropertiesMap['targetSystem'] == null")
targetFilterExpression("exchangePropertiesMap['targetSystem'] == 'systemAbc' and targetFlowId.contains('-abc-')")
targetFilterExpression("exchangePropertiesMap['targetSystem'] == 'systemXyz' and targetFlowId.contains('-xyz-')")

Messages that do not contain a targetSystem exchange property (i.e., targetSystem == null) would be distributed to all targets. If targetSystem is set, then messages would only be distributed to that specific target.

Messages for targets that are filtered out are treated in the same way as messages filtered by the filter processor. If all destinations are filtered out, either by a filter processor on the target flow or by the targetFilterExpression(), the response message on the main flow will indicate that the message was filtered.

Results

The distribute processor will only produce a result when a response has been received from all target flows. The processor results are not an aggregate of the responses returned. Instead, the results are built from only one of the target flows' responses. The target flow response is chosen by the following order of precedence:

  1. Message failure from a non-transient error

  2. Message failure from a transient error

  3. Successful message

  4. Filtered message

Handoff Syntax

If the target flows are handoff flows, then the target flow IDs must include the suffix -handoff. This naming pattern signals to the distribute processor that a handoff is taking place, and so the processor will no longer wait for a processed response.

See the handoff processor for more details.