Writing Flow Specs

A flow is a message processing pipeline that acts as the integration layer between systems. Refer to the Introduction to Flows for a conceptual overview of flows and their configuration options.

Flow configurations are created with the flowConfig builder in the following manner:

flowConfig {
    id = "my-flow"
    description = "My First Flow"
    ownerId = "myCompany"
    exchangePattern = RequestResponse

    restApi {
        id = "echo-api"
        apiSpecId = "OpenAPIv3:echo-api:latest"
    }

    ...
}

The first processor added to a flowConfig builder is the "source," or entry point, for that flow. The source processor acquires or receives messages. For example, an initial restApi processor signifies that the flow will receive messages on a REST endpoint. Additional processors added to the flowConfig make up the flow pipeline, where the message processing occurs. The pipeline runs processors in the order in which they are added to the flowConfig.

See the Getting Started section for tutorials on writing (and testing) example flow configurations.

Properties

The following properties can be applied to a flow configuration:

Property Description

id

The unique ID of the flow. Required.

description

Description of the flow. Optional.

ownerId

ID of the customer organization that owns this flow. ID is provided by Greenbird. Required.

exchangePattern

The processing behavior of the flow. One of RequestResponse, OneWay, or Headless. Required.

maxNumberOfInFlightMessages

The maximum number of messages that can be inflight (i.e., have not received a response yet) before the flow source stops dispatching messages to the flow pipeline for processing. The flow source will still accept and persist incoming messages, then process them when the number of inflight messages drops below the configured limit.

maxBufferSize

The maximum number of persisted messages that will be held in memory until a slot becomes available in the flow pipeline, according to the maxNumberOfInFlightMessages limit.

maxStashSize

The maximum number of messages that will be persisted in a stash before rejecting new messages. Messages in the stash will move to the buffer as soon as memory is freed up, according to the maxBufferSize limit. Note that the message remains persisted even as it moves from stash to buffer to inflight.

correlationIdExpr

A SpEL expression that extracts a correlation ID from the flow's incoming requests. The correlation ID is stored in the messageCorrelationId exchange property but won’t overwrite an existing value of that property.
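As a rough sketch, the three back-pressure properties could be set next to the required ones as shown here. The property names come from the table above. The numeric values are illustrative assumptions rather than recommended settings, and the assumption that each property takes a plain integer should be checked against the builder.

```kotlin
flowConfig {
    id = "my-flow"
    description = "My First Flow"
    ownerId = "myCompany"
    exchangePattern = RequestResponse

    // Illustrative values only (assumed, not defaults or recommendations).
    maxNumberOfInFlightMessages = 100   // stop dispatching once 100 messages await a response
    maxBufferSize = 500                 // hold up to 500 persisted messages in memory
    maxStashSize = 10000                // reject new messages once 10,000 are stashed

    ...
}
```

With these values a message moves from stash to buffer to inflight as capacity frees up at each stage, and it stays persisted throughout.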

Exchange Patterns

The exchangePattern property determines the overall behavior of the flow and its relationship to other flows or clients. This property is set by using the FlowExchangePattern enum (e.g., FlowExchangePattern.RequestResponse). The following exchange patterns are available:

Pattern Description

RequestResponse

The traditional synchronous request, where the result of the processing is returned to the client in a response at the end of the processing.

OneWay

The pattern for asynchronous processing where the client is only concerned with delivering a message and does not wish to wait for the result of the processing. The client will receive a response/receipt immediately after the message has been reliably persisted, regardless of when the actual pipeline processing takes place. Also note that OneWay messages are still subject to delivery guarantees (i.e., redelivery and dead letter strategies).

Headless

The flow behaves more like an include statement. The processor pipeline of a Headless flow is included in the upstream pipeline at the point where it is referenced. The message processing in the Headless flow pipeline becomes subject to the exchange pattern and message reliability configurations of the upstream flow.
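For illustration, a flow intended for asynchronous delivery might declare the OneWay pattern using the fully qualified enum form mentioned above. The flow ID and description are hypothetical, and the import for FlowExchangePattern is omitted, as in the other examples on this page.

```kotlin
flowConfig {
    id = "my-async-flow"   // hypothetical flow ID
    description = "Accepts messages for later processing"
    ownerId = "myCompany"
    exchangePattern = FlowExchangePattern.OneWay

    ...
}
```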

Groups

Flows can optionally be assigned one or more group identifiers. This is useful for grouping flows in UIs, such as Utilihive Heartbeat. Flows are assigned groups via the group() function. For example:

flowConfig {
    id = "my-flow"
    description = "My First Flow"
    ownerId = "myCompany"
    exchangePattern = RequestResponse

    group("Test Group 1")
    group("Test Group 2")

    ...
}

Strategies

Strategy builders can be added before or after the list of processors in the flowConfig to configure specifics on how a flow is deployed, archives messages, or handles failures.

Redelivery

The redeliveryStrategy configures the at-least-once delivery contract for the flow. In other words, you can specify how many times, and at what interval, the flow source tries to redeliver a failed message before giving up and delegating the message to error handling. The strategy can be configured with the following optional properties:

Property Description

redeliveryMillis

The approximate time the source will wait before sending the next redelivery. Please note that when setting redeliveryMaxNo to 0, redeliveryMillis still has significance: the source will wait around this amount of time for confirmation on the initial message before delegating it to error handling.

redeliveryMaxNo

The number of times the message will be redelivered before the source gives up and delegates the message to error handling. Setting this property to 0 will turn off redelivery attempts, but the original message is still sent for the first time.

Not using redeliveries is discouraged. Redeliveries are an integral part of ensuring reliable message processing for flows.
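A minimal sketch of a redelivery configuration follows. It assumes that redeliveryStrategy is written as a builder block in the same style as the other strategies on this page and that both properties take plain numbers. The values are illustrative, not recommendations.

```kotlin
flowConfig {
    id = "my-flow"
    description = "My First Flow"
    ownerId = "myCompany"
    exchangePattern = RequestResponse

    restApi {
        id = "echo-api"
        apiSpecId = "OpenAPIv3:echo-api:latest"
    }

    ...

    // Strategy builders can come before or after the processors.
    redeliveryStrategy {
        redeliveryMillis = 5000   // wait roughly five seconds between attempts (assumed value)
        redeliveryMaxNo = 3       // three redeliveries after the initial send (assumed value)
    }
}
```

Under these settings a failing message is sent up to four times in total (the initial send plus three redeliveries) before it is delegated to error handling.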

Dead Letter

The deadLetterStrategy defines what happens when a flow’s configured redeliveryStrategy is exhausted, and further delivery attempts are cancelled. The strategy can be configured with the following optional properties:

Property Description

deadLetterFlowId

Flow to which the dead letter payload is forwarded and where the errors can be handled.

forwardNonMessageRelatedErrors

Whether errors that are not directly related to messages are sent to the deadLetterFlowId. An example error would be a poller issue like losing connection to the external queue.

forwardMessagePreprocessingErrors

Whether errors encountered by the source endpoint before the message is processed by the flow are sent to the deadLetterFlowId. An example error would be malformed or invalid JSON in a REST source.
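The properties above could be combined as in the following sketch. The flow ID is hypothetical, and the two forwarding properties are assumed to be Boolean flags because they are described as "whether" settings.

```kotlin
deadLetterStrategy {
    // Hypothetical ID of a flow that receives and handles dead letter payloads.
    deadLetterFlowId = "my-dead-letter-flow"

    // Assumed Boolean flags: also forward errors that are not tied to a specific message
    // (e.g., a poller losing its connection) and errors raised before pipeline processing
    // (e.g., malformed JSON in a REST source).
    forwardNonMessageRelatedErrors = true
    forwardMessagePreprocessingErrors = true
}
```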

Dead letter payloads include the following fields:

Field Description

originalMessagePayload

Payload of the original message.

originalMessageId

ID of the message forwarded to the dead letter flow.

originalDeliveryId

Delivery ID of the message forwarded to the dead letter flow.

failureTransience

Information on whether the error was caused by a transient or non-transient failure.

messageDeliveryAttempts

Number of attempts to deliver the message.

payloadOnFailure

Payload that was being processed on failure.

failureDetails

Object containing info about the failure (e.g., a restRequest failure would include the URL, status code, error message, etc.).

Catch and Return

The catchAndReturnStrategy is used to catch failures and produce an alternative result. This is only relevant for RequestResponse flows and can still be combined with a deadLetterStrategy. The strategy is configured by providing a processor that will receive the failure message (in the same format as a dead letter payload). The results of the processor become the response for the flow. For example:

catchAndReturnStrategy {
    processor<MapConfig.Builder> {
        id = "return-error"
        mapSpec = """
            {
                "message" : #input.payload.failureDetails.errorDetails.errorMessage
            }
        """.trimIndent()
    }
}

Note that the processor is implemented by referencing its underlying config class (e.g., MapConfig.Builder) and not the builder shorthand (e.g., map). Most processors are supported except for those that are dependent on some form of persistence (e.g., the suspendMessage processor) or multi-flow handling (e.g., distribute, split).

Flow Pipeline Deployment

The flowPipelineDeploymentStrategy determines how a flow pipeline is deployed in the flow-server cluster. The flow pipeline is a separate deployment from the flow source and is either deployed with one instance on each cluster member or as a cluster singleton. The following table highlights the differences:

Strategy Description

CLUSTERED

There is a source and a pipeline on each cluster member. The message will not be remoted to another node in the cluster for pipeline processing but processed locally on the node where the source resides. This is the default strategy if the pipeline does not contain a persistent processor.

SINGLETON

Only one copy of the pipeline is deployed, and all clustered sources will send messages to the same, single pipeline. This will involve remoting for the sources that are on different nodes than the pipeline. This is the default strategy if the pipeline contains one or more persistent processors.

The default strategy cannot be overridden from SINGLETON to CLUSTERED, only CLUSTERED to SINGLETON. Thus, you would only need to implement a flowPipelineDeploymentStrategy in the case where you want a normally clustered flow to be singleton instead (like wanting messages from clustered sources to be processed in a single pipeline instance). To do so, use the scope property in the following manner:

flowPipelineDeploymentStrategy {
    scope = FlowPipelineDeploymentScope.SINGLETON
}

A singleton flow source doesn’t necessarily mean a singleton deployment. By default, the pipeline of a singleton source (such as a file-poller) will be deployed alongside the source on the same node, meaning it works similarly to the clustered variant but with only one local deployment.

Response Payload Archiving

The responsePayloadArchivingStrategy is used to archive a flow’s response by sending the final payload to another flow. The receiving flow must follow handoff conventions. Each archived payload must have a unique path that can later be used to retrieve it from the archive. The path is generated using the flow and message IDs unless otherwise specified.

The strategy is configured with the following properties:

Property Description

flowId

Flow to which the payload will be forwarded. Required.

objectPathExpr

Optional expression that determines the unique path of the payload in the archive.
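A minimal sketch of the strategy, assuming the same builder-block style as the other strategies on this page. The flow ID is hypothetical, and objectPathExpr is deliberately left out because its expression syntax is not described here.

```kotlin
responsePayloadArchivingStrategy {
    // Hypothetical ID of a receiving flow that follows handoff conventions.
    flowId = "my-archive-flow"

    // objectPathExpr is not set, so the archive path is generated
    // from the flow and message IDs.
}
```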

Individual processors, including the source processor, can be configured to archive their incoming payload with a payloadArchivingStrategy.