Distributed Flows

The following tutorial uses the example project codebase. Make sure you have the repository cloned so you can follow along and run the tests.

This example will create a REST endpoint on the test server that distributes the request data to two other flows. This is useful in cases where the data has multiple destinations that need to be transformed differently.

Creating the Flows

Open the Kotlin file located at main/kotlin/flowexamples/e03/E03DistributionFlow.kt. The setup here is similar to the first tutorial on simple REST endpoints, except that we are working with three flows this time. The following private constants hold the unique IDs of those flows:

object E03DistributionFlow {
    private const val FLOW_ID_API = "distribution-api"
    private const val FLOW_ID_TARGET_1 = "distribution-target-1-handoff"
    private const val FLOW_ID_TARGET_2 = "distribution-target-2-handoff"

    ...
}

The Entry Point Flow

The first flow, distribution-api, will act as the entry point and hand off its payload from the REST request to the other flows. The following code establishes this flow as having an inbound REST API:

val distributionApiSpec = flowConfig {
    id = FLOW_ID_API
    description = "Distribution Flow"
    ownerId = OWNER_ID
    exchangePattern = RequestResponse

    restApi {
        id = "distribution-api"
        apiSpecId = distributionRestResourceKey.toResourceIdentifier()
    }

    ...
}

The handoff occurs with the help of a distribute processor. Multiple flows can be added to the distribute processor by calling the flowId() function, as the following code demonstrates:

distribute {
    id = "distribute-message"
    flowId(FLOW_ID_TARGET_1)
    flowId(FLOW_ID_TARGET_2)
}
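To make the fan-out behavior concrete, here is a minimal plain-Kotlin model of the idea. It is not the SDK's implementation: TargetFlow, Distributor, and the registry are hypothetical names invented for illustration. Only the flow IDs come from this example. The sketch assumes each registered flow ID receives the same, unmodified payload.

```kotlin
// Hypothetical stand-in for a flow: an ID plus a handler that receives the payload.
class TargetFlow(val id: String, val handler: (String) -> Unit)

// Hypothetical model of a distribute step: delivers the same payload to every registered flow ID.
class Distributor(private val registry: Map<String, TargetFlow>) {
    private val targetIds = mutableListOf<String>()

    // Registers one more target, mirroring the repeated flowId() calls in the DSL.
    fun flowId(id: String) {
        targetIds += id
    }

    // Hands the payload to each target in registration order.
    fun distribute(payload: String) {
        for (id in targetIds) {
            val flow = registry[id] ?: error("Unknown flow ID: $id")
            flow.handler(payload)
        }
    }
}

fun main() {
    val received = mutableListOf<String>()
    val registry = listOf(
        TargetFlow("distribution-target-1-handoff") { received += "target-1 got $it" },
        TargetFlow("distribution-target-2-handoff") { received += "target-2 got $it" },
    ).associateBy { it.id }

    val distributor = Distributor(registry)
    distributor.flowId("distribution-target-1-handoff")
    distributor.flowId("distribution-target-2-handoff")
    distributor.distribute("""{"value":"testValue"}""")

    received.forEach(::println)
}
```

Each target sees the original payload, which is why the handoff flows can transform it differently without affecting one another.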

This will hand off the payload to two other flows that will further process it on their own. We aren’t done with the REST flow, however. Because we set the exchangePattern property to RequestResponse, we need to ensure a response is sent back to fulfill the original request. The following processor will accomplish this for us:

map {
    id = "create-response"
    mapSpec = """
        {
            "message" : "Distributed"
        }
    """.trimIndent()
}

The Handoff Flows

The next step is to create the flows that receive the data from the distribute processor. For this example, these flows are very similar to each other and look like the following:

val distributionTarget1Spec = flowConfig {
    id = FLOW_ID_TARGET_1
    description = "Distribution Target Flow"
    ownerId = OWNER_ID
    exchangePattern = OneWay

    handoff { id = "handoff-source-1" }

    restRequest {
        id = "request-handoff-1"
        defaultMethod = POST
        address = URL("https://OVERRIDE_ME/target1")
    }
}

The handoff processor signifies that this flow is part of a distributed pipeline and does not create an outward-facing endpoint (like a REST API). The second processor, restRequest, then makes a follow-up POST request to an outbound endpoint using the payload that was handed off to this flow.

The other thing to note is how the exchangePattern has been set to OneWay. A handoff flow will rarely, if ever, need to return a response. The parent flow has already sent a response back to the client, so the handoffs can operate under a "fire and forget" strategy.
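The difference between the two exchange patterns can be sketched in plain Kotlin. This is only an illustration of "fire and forget", not how the SDK schedules flows: handleRequest and its parameters are hypothetical, and a thread pool stands in for whatever mechanism the server actually uses.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical entry point: queues the handoffs, then responds without waiting for them.
fun handleRequest(
    payload: String,
    handoffs: List<(String) -> Unit>,
    submit: (Runnable) -> Unit,
): String {
    for (handoff in handoffs) {
        submit(Runnable { handoff(payload) }) // one-way: no result is collected
    }
    return """{"message":"Distributed"}""" // request-response: the caller always gets a reply
}

fun main() {
    val executor = Executors.newFixedThreadPool(2)
    val done = CountDownLatch(2)
    val handoffs = listOf<(String) -> Unit>(
        { payload -> println("target 1 received $payload"); done.countDown() },
        { payload -> println("target 2 received $payload"); done.countDown() },
    )

    val response = handleRequest("""{"value":"testValue"}""", handoffs) { executor.execute(it) }
    println("response: $response")

    // Wait here only so the demo does not exit before the background work finishes.
    done.await(5, TimeUnit.SECONDS)
    executor.shutdown()
}
```

The response does not depend on the handoffs completing, which is also why the test later in this tutorial has to wait for the follow-up requests before verifying them.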

Testing the Flows

Open the Kotlin class located at test/kotlin/flowexamples/e03/E03DistributionFlow.kt. This test deploys all three flows, makes a POST request to the first flow, and then verifies that the handoff flows made their follow-up requests.

The Test Context

Because our flows depend on outbound requests, we should mock those requests with WireMock. WireMock also lets us verify that the requests were actually made. As in the previous example, we’ll start our test by creating a WireMockServer instance and stubs for the handoff endpoints using the following code:

withWireMock { wireMockServer ->
    wireMockServer.stubFor(post(urlPathEqualTo("/target1")))
    wireMockServer.stubFor(post(urlPathEqualTo("/target2")))

    ...
}

Next, we update the handoff flow specs to use the mocked URLs instead of the hardcoded https://OVERRIDE_ME/ values, as the following code demonstrates:

val wireMockBaseUrl = "http://localhost:${wireMockServer.port()}"

val mockedTarget1Flow = distributionTarget1Spec.map<RestRequestConfig> { restRequestConfig ->
    restRequestConfig.copy(address = URL("$wireMockBaseUrl/target1"))
}
val mockedTarget2Flow = distributionTarget2Spec.map<RestRequestConfig> { restRequestConfig ->
    restRequestConfig.copy(address = URL("$wireMockBaseUrl/target2"))
}
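The override above relies on replacing one field of an immutable configuration and getting a new spec back. The following plain-Kotlin sketch shows that pattern in isolation. RequestConfig, FlowSpec, and mapRequest are simplified, hypothetical stand-ins rather than the SDK's types, and port 8080 is an arbitrary placeholder for the WireMock port.

```kotlin
// Hypothetical, simplified stand-ins for the SDK's configuration types.
data class RequestConfig(val id: String, val method: String, val address: String)
data class FlowSpec(val id: String, val request: RequestConfig)

// Returns a new spec with the request config transformed; the original spec is left unchanged.
fun FlowSpec.mapRequest(transform: (RequestConfig) -> RequestConfig): FlowSpec =
    copy(request = transform(request))

fun main() {
    val original = FlowSpec(
        id = "distribution-target-1-handoff",
        request = RequestConfig("request-handoff-1", "POST", "https://OVERRIDE_ME/target1"),
    )
    val mocked = original.mapRequest { it.copy(address = "http://localhost:8080/target1") }

    println(original.request.address) // https://OVERRIDE_ME/target1
    println(mocked.request.address)   // http://localhost:8080/target1
}
```

Because the original spec is never mutated, the production definition keeps its placeholder address and only the copy used in the test points at the mock server.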

We then add the two mocked handoff flows, the untouched parent flow, and the OpenAPI resource to the context object with the following code:

ctx.addFlowTestConfig {
    resource(openApiResource)
    flow(distributionApiSpec)
    flow(mockedTarget1Flow)
    flow(mockedTarget2Flow)
}

The Test Assertions

Finally, we deploy the context on the test server and assert the parent flow’s response as follows:

flowTest(ctx) {
    val responseMessage = restApiEndpoint(distributionApiSpec)
        .path("echo")
        .request()
        .basicAuth()
        .post(json(SimpleValue("testValue")), SimpleMessage::class.java)

    assertThat(responseMessage.message).isEqualTo("Distributed")

    ...
}

This first part of the test passes because the map processor in the entry point flow always returns the following hardcoded JSON:

{
  "message": "Distributed"
}

The test also includes two WireMock verifications that look like the following:

assertOrTimeout(TIMEOUT_DURATION) {
    wireMockServer.verify(
        postRequestedFor(urlEqualTo("/target1"))
            .withRequestBody(equalToJson(""" { "value": "$inputValue" } """))
    )

    wireMockServer.verify(
        postRequestedFor(urlEqualTo("/target2"))
            .withRequestBody(equalToJson(""" { "value": "$inputValue" } """))
    )
}

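The WireMock verifications are wrapped in the SDK’s assertOrTimeout() function because the handoff flows run independently of the parent flow’s response. The follow-up requests may not have arrived by the time the response is asserted, so the verifications are given up to TIMEOUT_DURATION to succeed instead of failing immediately. TIMEOUT_DURATION is set to a Java Duration of five seconds.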
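The SDK's implementation of assertOrTimeout() is not shown in this tutorial. As a rough sketch of how a helper of this kind could work, the following plain-Kotlin function reruns an assertion until it passes or a deadline is reached. The name retryUntil, the polling interval, and the choice to catch AssertionError are all assumptions made for illustration.

```kotlin
import java.time.Duration

// Hypothetical helper: reruns the assertion until it passes or the timeout elapses.
fun retryUntil(timeout: Duration, pollEveryMillis: Long = 50, assertion: () -> Unit) {
    val deadline = System.nanoTime() + timeout.toNanos()
    while (true) {
        try {
            assertion()
            return // the assertion passed, so stop retrying
        } catch (e: AssertionError) {
            if (System.nanoTime() >= deadline) throw e // give up and surface the last failure
            Thread.sleep(pollEveryMillis)
        }
    }
}

fun main() {
    val start = System.currentTimeMillis()
    var attempts = 0
    retryUntil(Duration.ofSeconds(5)) {
        attempts++
        // Simulates a handoff request that only arrives after about 200 ms.
        if (System.currentTimeMillis() - start < 200) throw AssertionError("not received yet")
    }
    println("passed after $attempts attempts")
}
```

Polling like this keeps the test fast when the requests arrive quickly, while still failing with a useful error if they never arrive.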

The WireMock verifications check that a POST request was made to the given URL with the given request body. Note that the expected request body is identical to the payload of the parent flow, thus confirming that the distribute processor successfully handed off the message!

Before moving on to the next tutorial on data mappings, practice with this example by adding a third handoff flow and verifying its execution.