Writing Flow Specs
A flow is a message processing pipeline that acts as the integration layer between systems. Refer to the Introduction to Flows for a conceptual overview of flows and their configuration options.
Flow configurations are created with the flowConfig builder in the following manner:
flowConfig {
    id = "my-flow"
    description = "My First Flow"
    ownerId = "myCompany"
    exchangePattern = RequestResponse
    restApi {
        id = "echo-api"
        apiSpecId = "OpenAPIv3:echo-api:latest"
    }
    ...
}
The first processor added to a flowConfig builder is the "source," or entry point, for that flow.
The source processor will acquire or receive messages.
For example, an initial restApi processor signifies that the flow will receive messages on a REST endpoint.
Additional processors added to the flowConfig make up the flow pipeline, where the message processing occurs.
The pipeline runs processors in the order that they are added to the flowConfig.
See the Getting Started section for tutorials on writing (and testing) example flow configurations.
Properties
The following properties can be applied to a flow configuration:
Property | Description |
---|---|
|
The unique ID of the flow. Required. |
|
Description of the flow. Optional. |
|
ID of the customer organization that owns this flow. ID is provided by Greenbird. Required. |
|
The processing behavior of the flow. Can be either |
|
The maximum number of messages that can be inflight (i.e., have not received a response yet) before the flow source stops dispatching messages to the flow pipeline for processing. The flow source will still accept and persist incoming messages, then process them when the number of inflight messages drops below the configured limit. |
|
The maximum number of persisted messages that will be held in memory until a slot becomes available in the flow pipeline, according to the |
|
The maximum number of messages that will be persisted in a stash before rejecting new messages. Messages in the stash will move to the buffer as soon as memory is freed up, according to the |
|
A SpEL expression to extract a correlation ID from the incoming requests in the flow specification. The correlation ID will be stored in the |
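The interplay of the inflight, buffer, and stash limits described in the table above can be pictured with a small model. This is only an illustrative sketch in plain Kotlin, not Utilihive code: the names Limits, Counts, Admission, and admit are hypothetical, and the model ignores message ordering and the movement of messages from the stash to the buffer.

```kotlin
// Illustrative model only; not Utilihive code. All names here are hypothetical.
enum class Admission { DISPATCH, BUFFER, STASH, REJECT }

// The three configured limits: inflight messages, in-memory buffer, persisted stash.
data class Limits(val maxInflight: Int, val maxBuffered: Int, val maxStashed: Int)

// Current occupancy of each stage.
data class Counts(val inflight: Int, val buffered: Int, val stashed: Int)

// Decide where a newly received message goes, given the current occupancy.
fun admit(limits: Limits, counts: Counts): Admission = when {
    counts.inflight < limits.maxInflight -> Admission.DISPATCH // a pipeline slot is free
    counts.buffered < limits.maxBuffered -> Admission.BUFFER   // persisted and held in memory
    counts.stashed < limits.maxStashed -> Admission.STASH      // persisted only
    else -> Admission.REJECT                                   // every stage is full
}

fun main() {
    val limits = Limits(maxInflight = 2, maxBuffered = 1, maxStashed = 1)
    println(admit(limits, Counts(inflight = 1, buffered = 0, stashed = 0))) // DISPATCH
    println(admit(limits, Counts(inflight = 2, buffered = 0, stashed = 0))) // BUFFER
    println(admit(limits, Counts(inflight = 2, buffered = 1, stashed = 0))) // STASH
    println(admit(limits, Counts(inflight = 2, buffered = 1, stashed = 1))) // REJECT
}
```

The point of the model is that a full pipeline does not immediately cause rejections: new messages are only refused once the buffer and the stash are also full.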
Exchange Patterns
The exchangePattern property determines the overall behavior of the flow and its relationship to other flows or clients.
This property is set by using the FlowExchangePattern enum (e.g., FlowExchangePattern.RequestResponse).
The following exchange patterns are available:
Pattern | Description |
---|---|
RequestResponse | The traditional synchronous request, where the result of the processing is returned to the client in a response at the end of the processing. |
OneWay | The pattern for asynchronous processing where the client is only concerned with delivering a message and does not wish to wait for the result of the processing. The client will receive a response/receipt immediately after the message has been reliably persisted, regardless of when the actual pipeline processing takes place. Also note that |
Headless | The flow behaves more like an |
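As a sketch of selecting a pattern other than RequestResponse, the fragment below sets OneWay through the FlowExchangePattern enum. It reuses the restApi source from the first example; the flow ID and description are hypothetical, and imports are omitted because the package names are not given here.

```kotlin
// Sketch only: a configuration fragment, not a complete program.
flowConfig {
    id = "my-one-way-flow" // hypothetical ID
    description = "Accepts a message and returns a receipt immediately"
    ownerId = "myCompany"
    // The client receives a receipt once the message is persisted,
    // regardless of when pipeline processing takes place.
    exchangePattern = FlowExchangePattern.OneWay
    restApi {
        id = "echo-api"
        apiSpecId = "OpenAPIv3:echo-api:latest"
    }
    ...
}
```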
Groups
Flows can optionally be assigned one or more group identifiers.
This is useful for grouping flows in UIs, such as Utilihive Heartbeat.
Flows are assigned groups via the group() function.
For example:
flowConfig {
    id = "my-flow"
    description = "My First Flow"
    ownerId = "myCompany"
    exchangePattern = RequestResponse
    group("Test Group 1")
    group("Test Group 2")
    ...
}
Strategies
Strategy builders can be added before or after the list of processors in the flowConfig to configure specifics on how a flow is deployed, archives messages, or handles failures.
Redelivery
The redeliveryStrategy configures the at-least-once delivery contract for the flow.
In other words, you can specify how many times, and at what interval, the flow source tries to redeliver a failed message before giving up and delegating the message to error handling.
The strategy can be configured with the following optional properties:
Property | Description |
---|---|
|
The approximate time the source will wait before sending the next redelivery. Please note that when setting |
|
The number of times the message will be redelivered before the source gives up and delegates the message to error handling. Setting this property to |
Not using redeliveries is discouraged. Redeliveries are an integral part of ensuring reliable message processing for flows.
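The redelivery contract can be pictured as a bounded retry loop. The following is a plain Kotlin model, not Utilihive code: deliver, Delivered, DeadLettered, and maxRedeliveries are hypothetical names, and the wait between attempts is left out so that the sketch runs instantly.

```kotlin
// Illustrative model only; not Utilihive code. All names here are hypothetical.
sealed interface Outcome
data class Delivered(val attempts: Int) : Outcome
data class DeadLettered(val attempts: Int) : Outcome

// Try one initial delivery plus up to maxRedeliveries redeliveries.
// process returns true when the pipeline handled the message successfully.
fun deliver(maxRedeliveries: Int, process: (attempt: Int) -> Boolean): Outcome {
    var attempts = 0
    while (attempts <= maxRedeliveries) {
        attempts++
        if (process(attempts)) return Delivered(attempts)
        // A real source would wait for the configured delay before the next attempt.
    }
    // Redeliveries are exhausted; the message is delegated to error handling.
    return DeadLettered(attempts)
}

fun main() {
    println(deliver(maxRedeliveries = 3) { attempt -> attempt == 3 }) // Delivered(attempts=3)
    println(deliver(maxRedeliveries = 2) { false })                   // DeadLettered(attempts=3)
}
```

In this model, a limit of zero redeliveries means a single failed attempt goes straight to error handling, which is why disabling redeliveries weakens the delivery guarantee.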
Dead Letter
The deadLetterStrategy defines what happens when a flow’s configured redeliveryStrategy is exhausted, and further delivery attempts are cancelled.
The strategy can be configured with the following optional properties:
Property | Description |
---|---|
|
Flow to which the dead letter payload is forwarded and where the errors can be handled. |
|
Whether errors that are not directly related to messages are sent to the |
|
Whether errors encountered by the source endpoint before the message is processed by the flow are sent to the |
Dead letter payloads include the following fields:
Field | Description |
---|---|
|
Payload of the original message. |
|
ID of the message forwarded to the dead letter flow. |
|
Delivery ID of the message forwarded to the dead letter flow. |
|
Information on whether the error was caused by a transient or non-transient failure. |
|
Number of attempts to deliver the message. |
|
Payload that was being processed on failure. |
|
Object containing info about the failure (e.g., a |
Catch and Return
The catchAndReturnStrategy is used to catch failures and produce an alternative result.
This is only relevant for RequestResponse flows and can still be combined with a deadLetterStrategy.
The strategy is configured by providing a processor that will receive the failure message (in the same format as a dead letter payload).
The result of the processor becomes the response for the flow.
For example:
catchAndReturnStrategy {
    processor<MapConfig.Builder> {
        id = "return-error"
        mapSpec = """
            {
                "message" : #input.payload.failureDetails.errorDetails.errorMessage
            }
        """.trimIndent()
    }
}
Note that the processor is implemented by referencing its underlying config class (e.g., MapConfig.Builder) and not the builder shorthand (e.g., map).
Most processors are supported except for those that are dependent on some form of persistence (e.g., the suspendMessage processor) or multi-flow handling (e.g., distribute, split).
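To show where the strategy sits relative to the rest of a flow, here is a sketch of a RequestResponse flow with a catchAndReturnStrategy placed after its processors. It only combines pieces already shown on this page; the flow ID and description are hypothetical, and imports are omitted because the package names are not given here.

```kotlin
// Sketch only: a configuration fragment, not a complete program.
flowConfig {
    id = "my-guarded-flow" // hypothetical ID
    description = "Returns an error message instead of a raw failure"
    ownerId = "myCompany"
    exchangePattern = RequestResponse // the strategy is only relevant for this pattern

    // Source processor: the first processor added to the flow.
    restApi {
        id = "echo-api"
        apiSpecId = "OpenAPIv3:echo-api:latest"
    }
    ...

    // Strategy builders may be placed before or after the list of processors.
    catchAndReturnStrategy {
        processor<MapConfig.Builder> {
            id = "return-error"
            mapSpec = """
                {
                    "message" : #input.payload.failureDetails.errorDetails.errorMessage
                }
            """.trimIndent()
        }
    }
}
```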
Flow Pipeline Deployment
The flowPipelineDeploymentStrategy determines how a flow pipeline is deployed in the flow-server cluster.
The flow pipeline is a separate deployment from the flow source and is either deployed with one instance on each cluster member or as a cluster singleton.
The following table highlights the differences:
Strategy | Description |
---|---|
|
There is a source and a pipeline on each cluster member. The message will not be remoted to another node in the cluster for pipeline processing but processed locally on the node where the source resides. This is the default strategy if the pipeline does not contain a persistent processor. |
|
Only one copy of the pipeline is deployed, and all clustered sources will send messages to the same, single pipeline. This will involve remoting for the sources that are on different nodes than the pipeline. This is the default strategy if the pipeline contains one or more persistent processors. |
The default strategy cannot be overridden from SINGLETON to CLUSTERED, only from CLUSTERED to SINGLETON.
Thus, you only need to add a flowPipelineDeploymentStrategy when you want a normally clustered flow to be a singleton instead (for example, when messages from clustered sources should be processed in a single pipeline instance).
To do so, use the scope property in the following manner:
flowPipelineDeploymentStrategy {
    scope = FlowPipelineDeploymentScope.SINGLETON
}
A singleton flow source doesn’t necessarily mean a singleton deployment. By default, the pipeline of a singleton source (such as a file-poller) will be deployed alongside the source on the same node, meaning it works similarly to the clustered variant but with only one local deployment.
Response Payload Archiving
The responsePayloadArchivingStrategy is used to archive a flow’s response by sending the final payload to another flow.
The receiving flow must follow handoff conventions.
Each archived payload must have a unique path that can later be used to retrieve it from the archive.
The path is generated using the flow and message IDs unless otherwise specified.
The strategy is configured with the following properties:
Property | Description |
---|---|
|
Flow to which the payload will be forwarded. Required. |
|
Optional expression that determines the unique path of the payload in the archive. |
Individual processors, including the source processor, can be configured to archive their incoming payload with a payloadArchivingStrategy.