Functional Tests

Flows can be tested locally with the Utilihive SDK’s testing library. The library is based on JUnit 5 with support for running concurrent tests. When testing, flows are deployed to a local, standalone server and then removed once the test is complete. Requests can be made to these temporarily deployed flows, and their responses asserted, as part of the test.

Download the official example project to see a variety of fully working test classes.

Basic Usage

The basic outline for a test class looks like the following:

class MyFlowTest : ConcurrentTestBase() {
    @Test
    fun `GIVEN x WHEN y THEN z`(ctx: ConcurrentTestContext) {
        ctx.addFlowTestConfig {
            // resources to deploy
            ...
        }

        flowTest(ctx) {
            // requests and assertions
            ...
        }
    }
}

Note that test classes inherit the ConcurrentTestBase class, and @Test functions are given a ConcurrentTestContext parameter. The ConcurrentTestContext object is used to specify which flows, resources, etc. to deploy to the test server.

The flowTest() function handles the actual deployment and will start the test server if it is not already running. After the flowTest() function runs its course, the deployed flows and resources are removed from the test server. Thus, any requests and assertions must be made within the flowTest() callback function block.

If your test depends on a message that already exists (e.g., a queued message in a message broker or a file on a Secure File Transfer Protocol (SFTP) server), create that message inside the flowTest() callback as well. This ensures that all flows are deployed before the message is consumed.

Test Context

The addFlowTestConfig builder is used to set up the test context (i.e., the resources to deploy). The following functions are available in the builder:

Function | Description
resource() | Adds a resource (such as an OpenAPI spec). Test resources are created as a Resource instance. See below for an example.
flow() | Adds a flow configuration, as created with the flowConfig builder. Flows are deployed concurrently with no guaranteed order of fulfillment.
tableData() | Adds (and optionally populates) a dynamic table. See the Dynamic Tables documentation for more details.
authConfig() | Adds a secret (i.e., authorization credentials). See below for an example.

Resources

OpenAPI specs, WSDL documents, and similar resources must be created as an instance of the Resource class before they are added to the test context. Creating a resource object requires a ResourceRevisionKey and the contents of the file. For example:

Resource(key = restResourceKey, content = openApiDefinition)

Revision keys are created with the newResourceRevisionKey builder in the following manner:

val restResourceKey = newResourceRevisionKey {
    ownerId = OWNER_ID
    type = "OpenAPIv3"
    id = "my-rest-api"
    revision = "latest"
}

Secrets

The authConfig() function takes two arguments: the key that the server uses to look up the credentials, and a map of the actual credentials. For example:

authConfig(
    "myKey", mapOf(
        "userName" to "myUsername",
        "password" to "myPassword"
    )
)

Assertions

It is recommended to write assertions with the AssertJ library. The example project already includes AssertJ as a Maven dependency.

For asynchronous tests, such as when testing SFTP file consumption, wrap your assertions in the SDK’s assertOrTimeout() callback function to prevent the test from failing prematurely. For example:

assertOrTimeout(TIMEOUT_DURATION) {
    assertThat(...)
}
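
The internals of assertOrTimeout() are not covered here. The general idea behind this kind of helper is to re-run the assertion block until it passes or a deadline expires. The following is a minimal sketch of that pattern using only the Kotlin and Java standard libraries. The name retryUntilTimeout and its parameters are hypothetical and not part of the SDK.

```kotlin
import java.time.Duration

// Hypothetical helper, NOT the SDK's assertOrTimeout(): re-runs the block
// until it stops throwing AssertionError or the timeout has elapsed.
fun <T> retryUntilTimeout(
    timeout: Duration,
    pollInterval: Duration = Duration.ofMillis(50),
    block: () -> T
): T {
    val start = System.nanoTime()
    while (true) {
        try {
            return block()
        } catch (e: AssertionError) {
            // Once the timeout has elapsed, surface the most recent failure.
            if (System.nanoTime() - start >= timeout.toNanos()) throw e
            Thread.sleep(pollInterval.toMillis())
        }
    }
}
```

AssertJ failures are thrown as AssertionError, so an assertThat(...) call inside such a block would be retried in the same way.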

Requests

REST

REST-based flows can be tested by making an HTTP request to the flow endpoint and asserting the response. The SDK provides a restApiEndpoint() function, which returns a WebTarget object. WebTarget methods can then be chained to specify the path, type, etc. For example:

val responseMessage = restApiEndpoint(myFlowSpec)
    .path("api")
    .request()
    .basicAuth()
    .post(json(SimpleValue("input")), SimpleMessage::class.java)

assertThat(responseMessage.message).isEqualTo("output")

basicAuth() is an extension function unique to the SDK that adds a basic username/password authentication header to the request. If a username is not provided as an argument, it will default to "testUser".

The WebTarget is designed to convert a response body into a Java type, which is why SimpleMessage::class.java is passed as the second argument to post(). The SDK provides the following built-in data classes to help with these response conversions:

Class | Description
SimpleValue | Objects with a value property.
SimpleMessage | Objects with a message property.
MessageAcquirementReceipt | Objects with messageId and receiptCreationDateTime properties.

You can also write your own data classes as the need arises.
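
As an illustration, a response body shaped like {"orderId": "...", "status": "..."} could be modeled with a small data class. The class name and properties below are hypothetical. The default values are included on the assumption that the JSON mapper in use may need a no-argument constructor, which Kotlin generates on the JVM when every constructor parameter has a default.

```kotlin
// Hypothetical response type for a flow that returns
// {"orderId": "...", "status": "..."}; not part of the SDK.
data class OrderStatus(
    val orderId: String = "",
    val status: String = ""
)
```

Such a class would be passed in place of SimpleMessage::class.java in the request, i.e., as OrderStatus::class.java.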

SOAP

SOAP-based flows can be tested by creating a web service client context and making a request within that context. The client is first configured with the newWebServiceClientConfig builder in the following manner:

val clientConfig = WebServiceClientConfig.newWebServiceClientConfig {
    flowSpec = soapFlowSpec
    wsdlDoc = soapDefinition
    serviceName = "MyService"
    portName = "MyService"
}

The following properties apply to the client configuration:

Property | Description
flowSpec | The flow spec to test against. Required.
wsdlDoc | The WSDL document, in string format, for the SOAP service. Required.
serviceName | The name of the service from the WSDL to use in the request. Required.
portName | The name of the port from the WSDL to use in the request. Required.
soapAction | The operation, identified by its soapAction attribute, from the WSDL to use in the request. Optional.
address | The service endpoint address as a URL object. Optional.
username | The username for the request’s authentication. Optional.
password | The password for the request’s authentication. Optional.

After the client configuration has been established, the context is created via the withWebServiceFlowClient() function, as the following demonstrates:

withWebServiceFlowClient(clientConfig) {
    val response = request(...)
    assertThat(response).isEqualTo(...)
}

Calling request() from within this context sends a request using the specified client configuration. The request() function accepts either an XML string or JSON-compliant maps and returns the response in the same format.

The following example uses an XML string (and would return an XML string):

val response = request(
    """
    <SayHi xmlns="http://www.bccs.uib.no/EchoService.wsdl">
        <Hi>hello</Hi>
    </SayHi>
    """.trimIndent()
)

The following example uses JSON-compliant maps (and would return a map):

val nsMap = mapOf("_default" to "http://www.bccs.uib.no/EchoService.wsdl")
val xmlJson = mapOf(
    "_xmlns" to nsMap,
    "SayHi" to mapOf("Hi" to mapOf("_text" to "hello"))
)

val response = request(xmlJson, nsMap)

See the Payload Types documentation for more help on formatting XML as JSON.
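
When several requests share the same shape, the nested maps can be built with small helper functions. The sketch below only relies on the conventions visible in the example above ("_xmlns" and "_default" for the namespace, "_text" for text content). The helper names textElement and xmlJsonRoot are hypothetical and not part of the SDK.

```kotlin
// Hypothetical helpers (not part of the SDK) that follow the map conventions
// from the example above: "_xmlns" for namespaces, "_text" for text content.
fun textElement(name: String, text: String): Pair<String, Any> =
    name to mapOf("_text" to text)

fun xmlJsonRoot(
    rootName: String,
    defaultNamespace: String,
    vararg children: Pair<String, Any>
): Map<String, Any> = mapOf(
    "_xmlns" to mapOf("_default" to defaultNamespace),
    rootName to mapOf(*children)
)
```

Calling xmlJsonRoot("SayHi", "http://www.bccs.uib.no/EchoService.wsdl", textElement("Hi", "hello")) produces a map equal to xmlJson in the example above.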

Backend Stubbing

To ensure more reliable testing, you can stub out backend systems that your flows interact with. The SDK provides built-in support for HTTP stubbing and includes an embedded Kafka broker for testing Kafka-based flows. Both are optional dependencies and must be added to your Maven pom.xml file if you would like to use them.

For testing with technologies other than Kafka and HTTP, you can import additional libraries to create stubs or test against isolated instances and resources on the actual backend system. For example, testing against a temporary database is a valid approach. The best solution will vary by project, so it’s important to consider what works best for your use case and whether stubbing is necessary.

HTTP

For outbound HTTP requests, such as those made by a restRequest processor, you can stub the responses with the WireMock library, which the Utilihive SDK integrates for this purpose. WireMock is already included as a Maven dependency in the example project.

WireMock is used by calling the SDK’s withWireMock() function in the following manner:

withWireMock(config) { wireMockServer ->
    // stubs, test logic, assertions, etc.
    ...
}

The withWireMock() function creates a WireMockServer instance and runs the provided callback function. Stubs can then be created using WireMock methods on the wireMockServer object passed to the callback.

The config argument is an optional server configuration. By default, the WireMock server is set up with a unique dynamic port. If a custom configuration is provided, you should still use dynamic ports to ensure isolation from other concurrent tests.

However, a dynamic port means you will also need to dynamically update the processor to use the correct URL. This can be accomplished by mapping the flow spec into a new one while simultaneously updating the RestRequestConfig object on the flow. For example:

val stubbedFlowSpec = originalFlowSpec.map<RestRequestConfig> { restRequestConfig ->
    val newAddress = URL("http://localhost:${wireMockServer.port()}/echo")
    restRequestConfig.copy(address = newAddress)
}

You would then add the mapped flow to the test context instead of the original flow, as the following demonstrates:

ctx.addFlowTestConfig {
    flow(stubbedFlowSpec)

    ...
}
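
To see why the address has to be computed at runtime, consider what a dynamic port means in practice: the server binds to port 0, the operating system picks any free port, and the real port number is only known once the server exists. The sketch below illustrates this with the JDK's built-in HTTP server. It does not use WireMock or the SDK, and withDynamicPortServer is a hypothetical name chosen for this illustration.

```kotlin
import com.sun.net.httpserver.HttpServer
import java.net.InetSocketAddress
import java.net.URI

// JDK-only illustration (no WireMock, no SDK): binding to port 0 asks the OS
// for any free port, so the base URI can only be built after server creation.
fun <T> withDynamicPortServer(block: (baseUri: URI) -> T): T {
    val server = HttpServer.create(InetSocketAddress("127.0.0.1", 0), 0)
    // Echo handler: responds with whatever request body it received.
    server.createContext("/echo") { exchange ->
        val body = exchange.requestBody.readBytes()
        exchange.sendResponseHeaders(200, body.size.toLong())
        exchange.responseBody.use { it.write(body) }
    }
    server.start()
    try {
        return block(URI("http://127.0.0.1:${server.address.port}"))
    } finally {
        server.stop(0)
    }
}
```

Because each server instance gets its own port, concurrent tests do not collide, which is the same reason the flow spec is remapped with wireMockServer.port() rather than a hard-coded address.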

SFTP

If your flows require testing SFTP connections, you can use Utilihive’s SFTP stubbing library. For more details, see the SFTP Server Stub project on GitHub.

Kafka

The SDK includes an embedded Kafka broker for testing flows that use Kafka as a source or target. To enable the embedded Kafka broker, wrap your test code in the withKafkaBroker function, as shown below:

// Within a test function with a ConcurrentTestContext parameter 'ctx'
withKafkaBroker {
    // setup topics, produce messages, etc.
    ...
    ctx.addFlowTestConfig {
        // resources to deploy
        ...
    }
    flowTest(ctx) {
        // usage of the kafkaConsumer, kafkaProducer and assertions
        ...
    }
}

To use the embedded Kafka broker, include spring-kafka-test as a test dependency in your Maven pom.xml.

The test context provides the following functions to help with Kafka testing:

Function | Description
addTopic() | Creates a new topic on the embedded Kafka broker.
startBroker() | Starts the broker. Automatically invoked when entering the withKafkaBroker block. Can also be used to restart the broker. Topics are recreated when the broker is restarted.
stopBroker() | Stops the broker and preserves topics, which are recreated when the broker is started again.
getBrokerList() | Returns the broker list in the format host:port for use in Kafka processor configurations.

To produce and consume messages, the context also includes the following variables:

Variable | Description
kafkaProducer | A Kafka producer for producing messages to topics on the embedded broker.
kafkaConsumer | A Kafka consumer for consuming messages from topics on the embedded broker.

The kafkaProducer and kafkaConsumer should be used within the flowTest block to ensure that the flows are deployed before producing or consuming messages.

Tracing

Messages can be traced directly in the SDK to help with debugging and gaining insight into processor payload types. This is accomplished by adding a trace sub-builder to the test context configuration. Traces are registered on a given flow spec and ID of an individual processor, as the following setup demonstrates:

ctx.addFlowTestConfig {
    resource(openApiResource)
    flow(simpleRestSpec)

    trace {
        flow = simpleRestSpec
        processorId = "echo-response-creator"
    }
}

In this example, the trace would capture the inbound message for the echo-response-creator processor, including the payload and all message exchange properties. The output would look like the following:

-------------- Trace: exampleOwner--simple-rest.echo-response-creator.in-23b254 -----------------------
| Type: FlowProcessorWiretap
| Timestamp: 2022-07-25T09:27:37.168945Z
|
| ownerId: exampleOwner
| flowId: simple-rest-23b254
| processorId: echo-response-creator
| location: INBOUND
| messageId: cbeb4ac1-79d4-4dd6-80d9-d02e7acaf694
| originalMessageId: cbeb4ac1-79d4-4dd6-80d9-d02e7acaf694
| principalId: testUser
| messageExchangeProperty.http.header.Content-Type: application/json
| messageExchangeProperty.http.method: POST
|
| ...
|
| Payload:
| {
|   "value" : "testValue"
| }
|
-------------- End trace: exampleOwner--simple-rest.echo-response-creator.in-23b254 -----------------------

If you want to trace the message after the processor has processed it, add the outbound() function to the trace builder. For example:

trace {
    flow = simpleRestSpec
    processorId = "echo-response-creator"
    outbound()
}

Trace processing is asynchronous and concurrent. If you have traces on multiple processors, the results won’t necessarily come through in the same order as the processors in the pipeline.

Bear in mind that you would typically use tracing in one test at a time and remove the trace after the issue has been resolved. It is not something you would want committed to your version control system.

Logs

The SDK also provides a logAsserter object to help with verifying logs on the flow-server. However, this should only be used in cases where you are unable to test the end results of a flow.

The logAsserter is used in the following manner:

logAsserter.awaitEvent {
    logger = LoggerNames.TEST_PROCESSOR
    flowId = myFlowSpec.id
    messagePhrase("Processing")
}

The logAsserter supports the following methods:

Method | Description
awaitEvent() | Attempts to find exactly one log event matching the given log query.
awaitEvents(n) | Attempts to find n log events matching the given log query.
awaitAtLeastEvents(n) | Attempts to find at least n log events matching the given log query.
assertNoEvents() | Asserts that no log events matching the given log query are found.

The following properties can be applied to a logAsserter to define the parameters of your log event query:

Property | Description
logger | The source of the log (e.g., a test processor). Use the LoggerNames enum for available options.
level | The level of the log, such as INFO or ERROR. Use the LogEvent.Level enum for available options.
timeoutMills | How long the log asserter waits before it stops waiting and fails.
flowId | The ID of the flow that the log pertains to.
filter | Extra filtering to perform after the log events have been queried with the other parameters. Filters are written as lambda expressions.
messagePhrase() | The phrase to look for in the log. messagePhrase() can be added multiple times to watch for several phrases.

Each property is optional, but it is helpful to narrow down the query as much as possible. For example, the following filter would ensure that the log message starts with the word "Processing" rather than merely containing it somewhere in the text:

filter = { events ->
    events.filter { it.message.startsWith("Processing") }
}
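
The effect of such a filter can be checked in isolation. The sketch below uses a hypothetical FakeLogEvent class as a stand-in for the SDK's log event type and assumes that the filter receives the queried events as a list. Only the message property used above is modeled.

```kotlin
// Hypothetical stand-in for the SDK's log event type; only the `message`
// property used by the filter above is modeled here.
data class FakeLogEvent(val message: String)

// Same shape as the filter above: takes the queried events and returns
// the subset whose message starts with "Processing" (case-sensitive).
val startsWithProcessing: (List<FakeLogEvent>) -> List<FakeLogEvent> = { events ->
    events.filter { it.message.startsWith("Processing") }
}
```

Given one event with the message "Processing message 1" and another with "Done Processing message 1", only the first is kept, whereas a messagePhrase("Processing") query alone would match both.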