windowedAggregationHandoff

Source processor that aggregates fragment messages based on a period of time or a number of messages, whichever comes first.

Windowed aggregation handoff is only meant for OneWay scenarios and to immediately ACK all incoming messages.

Properties

Name Summary

numberOfMessages

Required along with periodInSeconds to configure the aggregation of fragments for a number of messages or a period of time in seconds, whichever comes first, depending on the rate of messages.

periodInSeconds

Required along with numberOfMessages to configure the aggregation of fragments for a number of messages or a period of time in seconds, whichever comes first, depending on the rate of messages.

timeWindowStartOnFirstMessage

Whether the time window should start on the first reception of a message after a window has been closed or immediately open a new window when the previous one is closed. Optional and defaults to false (i.e., a new window is immediately opened).

name

Optional, descriptive name for the processor.

id

Required identifier of the processor, unique across all processors within the flow. Must be between 3 and 30 characters long; contain only lower and uppercase alphabetical characters (a-z and A-Z), numbers, dashes ("-"), and underscores ("_"); and start with an alphabetical character. In other words, it adheres to the regex pattern [a-zA-Z][a-zA-Z0-9_-]{2,29}.

exchangeProperties

Optional set of custom properties in a simple jdk-format, that are added to the message exchange properties before processing the incoming payload. Any existing properties with the same name will be replaced by properties defined here.

retainPayloadOnFailure

Whether the incoming payload is available for error processing on failure. Defaults to false.

Sub-builders

Name Summary

messageLoggingStrategy

Strategy for describing how a processor’s message is logged on the server.

payloadArchivingStrategy

Strategy for archiving payloads.

inboundTransformationStrategy

Strategy that customizes the conversion of an incoming payload by a processor (e.g., string to object). Should be used when the processor’s default conversion logic cannot be used.

Details

Usage

Both the distribute and split processors support windowed aggregation handoffs to flows when the flow ID contains the suffix -handoff. This naming pattern is what signals to the main flow that a handoff is taking place. The handoff flow would then be configured in a manner similar to the following:

flowConfig {
    id = "message-aggregation-handoff"
    description = "Message Aggregation Handoff Flow"
    ownerId = "my-company"
    exchangePattern = FlowExchangePattern.OneWay

    windowedAggregationHandoff {
        id = "windowed-aggregation-handoff-source"
        numberOfMessages = 5L
        periodInSeconds = 2L
    }

    test {
        id = "log-handoff"
        messageLoggingStrategy {
            logDescription = true
            logFullMessage = true
        }
    }
}

In the main flow, the initiating processor then references the same -handoff flow ID. For example:

distribute {
    id = "distribute-message"
    flowId("message-aggregation-handoff")
}

Caveats

It is important to note that this aggregating source, like all other parts of Utilihive message processing, is subject to at-least-once delivery and eventual consistency semantics. You cannot, for instance, set up a window for 24 hours and expect exactly 1 aggregate per 24 hours containing exactly every fragment received in that period. The aggregation is done on a best-effort basis. Fragments will not be lost, but they might be redelivered and/or arrive out of order. Aggregates might be produced at unexpected times, and they might contain message duplicates. Also, the aggregating source is not a singleton, so you will get one aggregate per server node.

In short: it is not intended as a tool to create large atomic transactions consisting of a predictable set of messages but as a tool for grouping messages into sensibly sized chunks on a best-effort basis.

Expecting consistent atomic aggregates with in-order messages is, in general, not a good design approach for integration flows. Upstream systems might produce out of order or heavily delayed messages, and downstream systems might temporarily reject some aggregates while accepting others. In such scenarios, the at-least-once delivery and eventual consistency guarantees of Utilihive are a good approach to ensure that all messages are delivered eventually. The only external requirement is that the downstream systems must handle duplicate messages.

It is also important to note that for some flow-server deployments, there is a size restriction on the fragments and the threshold is determined by the flow-server configuration. If the size of a fragment exceeds the configured threshold, the fragment will be rejected and an error will be reported. Please make sure that the size of the fragments to be aggregated is within the configured threshold of the flow-server deployment. This size threshold can be obtained from Utilihive support for the multi-tenant deployment or your operations team for the on-premise deployment.