Archiving and Resubmission

Message payloads can be archived at various stages in a flow. For instance, you can archive the input to a processor (including source processors) using a payloadArchivingStrategy. The final response payload in a RequestResponse flow can also be archived using a flow-level responsePayloadArchivingStrategy.

Payload archiving is an expensive operation and should only be used when absolutely needed.

Archiving Payloads

Archiving works by sending a copy of the payload to a dedicated handoff flow. This "archiver" flow is responsible for actually storing the payload somewhere, such as in a Google Cloud Storage bucket. An archiver flow might look like the following example:

flowConfig {
    id = "my-archiver-flow-handoff"
    ownerId = OWNER_ID
    exchangePattern = OneWay

    handoff { id = "archiver-handoff" }

    setExchangeProperty {
        id = "set-path-as-filename"
        propertyName = "fileName"
        source = "envelope.payload.archivePath"
    }

    setPayload {
        id = "set-original-as-payload"
        source = "envelope.payload.originalMessagePayload"
    }

    writeToGoogleStorage {
        ...
    }
}

Archived payloads need a unique path that can later be used to retrieve them from the storage solution. By default, the path is generated from the flow and message IDs and is available in the archiver flow as the payload property archivePath. If a custom path is needed, the default path can be overridden by setting the objectPathExpr property on the archiving strategy. For example:

payloadArchivingStrategy {
    flowId = "my-archiver-flow-handoff"
    objectPathExpr = "'my-own-id/' + envelope.originalMessageId"
}

Retrieving Payloads

To retrieve payloads from the archive, you must create a separate flow with the ID archive and an archiveLookup source processor. The archiveLookup processor creates a REST endpoint that receives the archive path in the payload property payloadArchivePath, which is then used to look up the archived payload in the storage solution. For example:

flowConfig {
    id = "archive"
    ownerId = OWNER_ID
    exchangePattern = RequestResponse

    archiveLookup { id = "archive-retriever-api" }

    setPayload {
        id = "set-path-as-payload"
        source = "envelope.payload.payloadArchivePath"
    }

    lookupFromGoogleStorage {
        ...
    }
}

The flow is triggered by making a GET request to /{ownerId}/rs/archive/payloads/<archive path>. Heartbeat also does this automatically when you view payloads in the Flow Traces page, provided that the predefined insights-archive-user service account has been given access to the archive flow and all downstream flows.
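As a rough illustration of the manual GET request, the following Kotlin sketch builds the request with the JDK's java.net.http client. The host in BASE_URL and the function name archiveLookupRequest are hypothetical, and the sketch assumes the archive path has no leading slash and needs no URL encoding. Authentication is left out because its mechanism is not described here.

```kotlin
import java.net.URI
import java.net.http.HttpRequest

// Hypothetical host; substitute the base URL of your own environment.
const val BASE_URL = "https://flows.example.invalid"

// Builds the GET request for the archive flow's lookup endpoint.
// Assumption: archivePath has no leading slash and needs no URL encoding.
// Authentication headers are deliberately omitted from this sketch.
fun archiveLookupRequest(ownerId: String, archivePath: String): HttpRequest {
    val uri = URI.create("$BASE_URL/$ownerId/rs/archive/payloads/$archivePath")
    return HttpRequest.newBuilder(uri).GET().build()
}
```

The resulting request could then be sent with java.net.http.HttpClient once the appropriate credentials have been added.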

Resubmitting Payloads

Sometimes it is desirable to resend a message that has already been processed by a flow. A typical example is a backend that rejected messages for longer than the flow’s redelivery cycle, causing the messages to fail and be discarded by the flow source. Once the backend has been fixed, the messages can be fetched from the archive and sent again. This is called "resubmission".

It is possible to resubmit some types of archived messages from the Flow Traces page in Heartbeat, but this requires the following initial setup:

  • Resubmissions must be enabled for your organization.

  • A separate flow with the ID resubmits must be set up. This flow must have a resubmit source. See below for implementation details.

  • The predefined insights-archive-user service account must be given access to the resubmits flow and all downstream flows.

The flow you are resubmitting to is the "destination flow". The following constraints must be fulfilled for a message to be eligible for resubmission to a destination flow:

  • The destination flow has a handoff source. It is not possible to resubmit messages to any other source type (restApi, readFiles, etc.).

  • The message must have been archived from the source of the destination flow (i.e., the message must be identical to the one originally received by the flow and not a version archived from a processor further down the flow pipeline).
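The two eligibility constraints can be read as a single boolean check. The Kotlin sketch below is only a model of that rule: SourceType, ArchivedMessage, and isEligibleForResubmission are hypothetical names introduced for illustration and are not part of the platform.

```kotlin
// Hypothetical model of the source types mentioned in the constraints.
enum class SourceType { HANDOFF, REST_API, READ_FILES }

// Hypothetical record of an archived message; archivedAtSource is true only
// when the payload was archived from the source of the destination flow.
data class ArchivedMessage(val archivePath: String, val archivedAtSource: Boolean)

// A message is eligible only if the destination flow has a handoff source
// and the message was archived at that source.
fun isEligibleForResubmission(destinationSource: SourceType, message: ArchivedMessage): Boolean =
    destinationSource == SourceType.HANDOFF && message.archivedAtSource
```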

Setting up the Flows

The resubmits flow is the endpoint where Heartbeat will post resubmission requests. It is also possible to post requests there manually, as long as the request is authenticated with the same insights-archive-user credentials.

The flow exposes a POST endpoint that receives a list of archive paths to be resubmitted to a destination flow. For example:

{
  "payloadArchivePaths": [
    "/path/to/payload1",
    "/path/to/payload2"
  ],
  "destinationFlowId": "destination-flow-a-handoff"
}
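When posting requests manually, the body can be assembled in code. The following Kotlin sketch renders the same JSON structure by hand. ResubmitRequest and toJson are hypothetical helpers, and the sketch assumes that paths and flow IDs contain no characters that require JSON escaping. A JSON library would be the safer choice in real code.

```kotlin
// Hypothetical helper type mirroring the request body of the resubmits endpoint.
data class ResubmitRequest(
    val payloadArchivePaths: List<String>,
    val destinationFlowId: String,
)

// Renders the request as compact JSON by hand.
// Assumption: the values contain no characters that require JSON escaping.
fun ResubmitRequest.toJson(): String {
    val paths = payloadArchivePaths.joinToString(",") { "\"$it\"" }
    return "{\"payloadArchivePaths\":[$paths],\"destinationFlowId\":\"$destinationFlowId\"}"
}
```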

The flow should be able to fetch and forward messages to any destination flow your organization wants to be able to resubmit messages to. A typical resubmits flow would look like the following:

flowConfig {
    id = "resubmits"
    ownerId = OWNER_ID
    exchangePattern = OneWay

    resubmit { id = "resubmit-api" }

    // (1)
    map {
        id = "build-list"
        mapSpec = """
            let #archivePaths := for #path in #input.payload.payloadArchivePaths[]
            return {
                "payloadArchivePath" : #path,
                "destinationFlowId" : #input.payload.destinationFlowId
            }

            return #archivePaths
        """.trimIndent()
    }

    // (2)
    split {
        id = "split-list"
        flowId = "retrieve-and-resubmit-handoff"
    }
}
(1) Convert the resubmit object into a list of path/destination objects using map.
(2) Use split to break the list into individual messages and forward each one to a handoff flow so that they can be processed individually.
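To make the effect of the map step concrete, here is a plain Kotlin sketch of the same transformation outside the flow DSL. ResubmitItem and toResubmitItems are hypothetical names; the sketch only models the shape of the data and does not use the platform's mapping language.

```kotlin
// Hypothetical type for one element of the list produced by the map step.
data class ResubmitItem(val payloadArchivePath: String, val destinationFlowId: String)

// Pairs every archive path with the single destination flow ID from the
// request, yielding one item per path for the split step to forward.
fun toResubmitItems(payloadArchivePaths: List<String>, destinationFlowId: String): List<ResubmitItem> =
    payloadArchivePaths.map { path -> ResubmitItem(path, destinationFlowId) }
```

Each resulting item carries its own copy of the destination flow ID, which is what allows the items to be processed independently after the split.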

The handoff flow would then look like the following example:

flowConfig {
    id = "retrieve-and-resubmit-handoff"
    ownerId = OWNER_ID
    exchangePattern = OneWay

    handoff { id = "retrieval-handoff" }

    // (1)
    saveToStash {
        id = "stash-destination"
        key = "destinationFlowId"
        source = "envelope.payload.destinationFlowId"
    }

    // (2)
    lookupFromGoogleStorage {
        id = "fetch-payload"
        ...
    }

    // (3)
    distribute {
        id = "distribute-for-resubmission"
        flowId("destination-flow-a-handoff")
        flowId("destination-flow-b-handoff")
        flowId("destination-flow-c-handoff")

        targetFilterExpression("targetFlowId == envelope.stash.destinationFlowId")
    }
}
(1) In the handoff flow, stash the destination ID for later use in the distribution filter.
(2) Fetch the payload to resubmit.
(3) Use distribute to send the message to the destination flow, with targetFilterExpression routing the message to the flow that matches the stashed destinationFlowId.
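The routing done by the filter expression can be modeled as a simple equality filter over the configured targets. The Kotlin sketch below is an illustration only: selectTargets is a hypothetical function, and it does not reflect how the platform evaluates targetFilterExpression internally.

```kotlin
// Models the filter "targetFlowId == envelope.stash.destinationFlowId":
// keeps only the configured targets whose ID equals the stashed destination.
fun selectTargets(targetFlowIds: List<String>, stashedDestinationFlowId: String): List<String> =
    targetFlowIds.filter { targetFlowId -> targetFlowId == stashedDestinationFlowId }
```

In this model, a destination that is not listed among the distribute targets matches nothing, which suggests that every flow you want to resubmit to needs its own flowId entry.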