receiveFromKafka

Source processor that consumes messages published to the specified Kafka cluster and topic. The processor produces payloads by reading the record’s value as binary data. To convert the binary data to another format such as JSON, use an inboundTransformationStrategy in a downstream processor.

Consumer groups enable multiple consumers to collaborate on processing messages from multiple partitions concurrently. They are the main tool for scaling on the consumer side. Furthermore, consumer groups enable multiple consuming applications to each have a separate view of the event stream and to read the stream independently at their own pace and with their own offsets.

Delivery modes

This source supports two delivery modes, selected by the flow’s deliveryGuarantee:

Flow-Server Managed Delivery (default — deliveryGuarantee = ByFlowServer)

Every message is persisted by the flow-server before processing. The Kafka offset is committed once the message is safely stored. This provides a server-side at-least-once guarantee independent of Kafka — even if the server restarts, persisted messages are replayed from the journal.

Source-System Managed Delivery (deliveryGuarantee = BySourceSystem)

Messages are forwarded directly to the flow pipeline without being persisted by the flow-server. Instead, the Kafka consumer manages its own delivery guarantee: offsets are only committed after the flow-server confirms successful processing. If the server restarts, Kafka automatically redelivers any uncommitted messages.

This mode is designed for high-throughput workloads where the persistence overhead of Flow-Server Managed Delivery is a bottleneck. The trade-off is that the delivery guarantee now depends on Kafka’s offset management rather than the server’s journal — which is appropriate when consuming from Kafka, since Kafka already provides this guarantee natively.

When a message fails processing in Source-System Managed Delivery, the flow-server first attempts redelivery to the pipeline (controlled by the flow’s redeliveryStrategy). If redelivery is also unsuccessful, any configured dead letter flow receives the failed message. After all flow-server-level recovery is exhausted, the Kafka endpoint applies the configured nackStrategy: either skip the message and advance the partition, or pause the partition until the problem resolves.
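The order of recovery stages described above can be sketched as a small function. This is only an illustrative model, not the flow-server's implementation: the function name, the stage labels, and the boolean inputs are all hypothetical, and the seek-back retries governed by nackMaxRetries are left out for brevity.

```python
from enum import Enum


class NackStrategy(Enum):
    SKIP = "SKIP"
    PAUSE = "PAUSE"


def failure_path(redelivery_succeeds: bool,
                 has_dead_letter_flow: bool,
                 nack_strategy: NackStrategy) -> list[str]:
    """List the recovery stages a failed message passes through, in order.

    Hypothetical model: stage names are labels chosen for this sketch.
    """
    stages = ["redelivery"]  # controlled by the flow's redeliveryStrategy
    if redelivery_succeeds:
        return stages
    if has_dead_letter_flow:
        stages.append("dead-letter-flow")
    # Only after flow-server-level recovery is exhausted does nackStrategy apply.
    if nack_strategy is NackStrategy.SKIP:
        stages.append("skip-and-advance")
    else:
        stages.append("pause-partition")
    return stages


print(failure_path(False, True, NackStrategy.SKIP))
```

The point of the sketch is the ordering: nackStrategy is the last resort and is reached only when redelivery has failed, regardless of whether a dead letter flow is configured.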

Behavioral characteristics of Source-System Managed Delivery:

  • Downstream must be idempotent. Offsets are committed sequentially — if message 6 is still processing while messages 7 and 8 have already succeeded, the committed offset stays at 5. A crash at that point means messages 6, 7, and 8 are all redelivered on restart, even though 7 and 8 were already processed successfully. This is inherent to at-least-once delivery; design downstream processors to handle duplicate messages safely.

  • A failed message does not stall the partition. When a message fails (NACK), it blocks the offset commit from advancing, but messages after it continue to be delivered and processed normally. This avoids penalizing an entire partition for a single problematic message. The trade-off is a wider redelivery window if a crash occurs while the failure is unresolved.

  • Backpressure is graceful. When the pipeline is slower than the consumption rate, Kafka partitions are paused transparently. The consumer continues sending heartbeats and maintains its group membership — there is no risk of being kicked from the consumer group due to slow processing.

  • Consumer group changes cause redelivery. Scaling, restarts, or rebalances transfer partition ownership to another consumer. Any in-flight messages on the old consumer are abandoned after a best-effort offset commit, and the new consumer resumes from the last committed offset. This reinforces the idempotency requirement above.
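The sequential commit behavior in the first bullet can be illustrated with a minimal sketch. It assumes a simplified model in which the endpoint tracks the last committed offset and a set of offsets whose processing has completed; the function name and data structures are hypothetical, and the offsets follow this document's convention (the committed offset is the last fully processed message) rather than Kafka's wire-level "next offset to read".

```python
def committable_offset(last_committed: int, completed: set[int]) -> int:
    """Return the highest offset that can be committed without skipping a gap.

    The commit position only advances while the next offset in sequence
    has completed; a single unfinished message holds it back.
    """
    offset = last_committed
    while offset + 1 in completed:
        offset += 1
    return offset


# Message 6 is still processing, 7 and 8 are done: the commit stays at 5.
print(committable_offset(5, {7, 8}))
# Once 6 completes, the commit jumps straight to 8.
print(committable_offset(5, {6, 7, 8}))
```

A crash between the two calls would redeliver 6, 7, and 8, which is why downstream processing needs to tolerate duplicates.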

Memory considerations: In Source-System Managed Delivery, messages are held in memory (not persisted to a journal). The endpoint maintains an internal buffer that can hold up to twice maxPollRecords worth of messages, since a new poll may complete while the buffer is still nearly full. Combined with maxNumberOfInFlightMessages messages currently being processed by the pipeline, the worst-case memory footprint is approximately (2 × maxPollRecords + maxNumberOfInFlightMessages) × average message size.
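As a worked example of the estimate above, the following sketch evaluates the formula for one set of inputs. The in-flight limit of 100 and the 10 KiB average message size are assumed values chosen purely for illustration; only the maxPollRecords default of 500 comes from this page.

```python
def worst_case_buffer_bytes(max_poll_records: int,
                            max_in_flight_messages: int,
                            average_message_bytes: int) -> int:
    """Approximate worst-case memory held by buffered and in-flight messages."""
    return (2 * max_poll_records + max_in_flight_messages) * average_message_bytes


# Assumed inputs: maxPollRecords = 500, maxNumberOfInFlightMessages = 100,
# average message size = 10 KiB (10_240 bytes).
estimate = worst_case_buffer_bytes(500, 100, 10_240)
print(f"{estimate} bytes, about {estimate / (1024 * 1024):.1f} MiB")
```

The estimate covers message payloads only; it does not account for per-message object overhead or for memory used by other flows in the same process.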

The goal is not to maximize throughput right up to the memory ceiling — it is to achieve fast, well-tuned processing while leaving a comfortable margin. The flow-server process is shared with other integration flows that also consume memory, and spikes in message size or concurrent load can push usage beyond steady-state estimates. Configure conservatively: favour smaller poll batches and moderate in-flight limits that keep memory well within safe bounds under all expected conditions.

Configuration properties exclusive to Source-System Managed Delivery:

  • nackStrategy (required): what to do when a message permanently fails, either SKIP (drop and advance) or PAUSE (block the partition until resolved). Must be set explicitly; there is no default.

  • commitIntervalMillis: how often offsets are committed to Kafka.

  • nackMaxRetries: how many seek-back attempts before the NACK strategy triggers.

  • nackRetryDelayMillis: delay between seek-back retry attempts.

These properties (except nackStrategy, which is required) are optional and fall back to server-level defaults. All are rejected by validation unless the flow’s deliveryGuarantee is BySourceSystem.
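The validation rules stated above can be summarized in a short sketch. This is not the server's validator: the function, the dictionary-based property representation, and the error messages are hypothetical and exist only to make the two rules concrete (nackStrategy is mandatory for BySourceSystem, and all four properties are rejected otherwise).

```python
SOURCE_SYSTEM_ONLY = {
    "nackStrategy",
    "commitIntervalMillis",
    "nackMaxRetries",
    "nackRetryDelayMillis",
}


def validate_delivery_properties(delivery_guarantee: str, props: dict) -> list[str]:
    """Return a list of validation errors (empty when the combination is valid)."""
    errors = []
    if delivery_guarantee == "BySourceSystem":
        strategy = props.get("nackStrategy")
        if strategy is None:
            errors.append("nackStrategy is required for BySourceSystem")
        elif strategy not in ("SKIP", "PAUSE"):
            errors.append(f"unknown nackStrategy: {strategy}")
    else:
        # Any of the exclusive properties is rejected outside BySourceSystem.
        for name in sorted(SOURCE_SYSTEM_ONLY & props.keys()):
            errors.append(f"{name} requires deliveryGuarantee = BySourceSystem")
    return errors


print(validate_delivery_properties("BySourceSystem", {"commitIntervalMillis": 5000}))
print(validate_delivery_properties("ByFlowServer", {"nackStrategy": "SKIP"}))
```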

Properties

Name Summary

clientId

An ID string to pass to the server when making requests. Used to track the source of requests beyond just the IP/port by allowing a logical application name to be included in server-side request logging. Optional and defaults to blank.

brokerList

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This string must be in the form of host1:port1,host2:port2, etc. Required.
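To check a brokerList value before deploying a flow, a small parser along the following lines may help. It is a sketch under the assumption that each entry is a plain host:port pair; the function name is hypothetical and the server may apply different or stricter checks.

```python
def parse_broker_list(broker_list: str) -> list[tuple[str, int]]:
    """Split 'host1:port1,host2:port2' into (host, port) tuples.

    Raises ValueError for entries that are not of the form host:port.
    """
    pairs = []
    for entry in broker_list.split(","):
        # rpartition splits on the last colon, so the port is always the tail.
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        pairs.append((host, int(port)))
    return pairs


print(parse_broker_list("host1:9092, host2:9093"))
```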

topics

Comma-separated list of topic names to which the consumer should subscribe. Required.

groupId

The consumer group ID. Enables multiple consumer instances to process messages from different partitions in parallel and supports independent offset tracking for multiple consuming applications. Required.

groupInstanceId

A unique identifier of the consumer instance provided by the end user. Only non-empty strings are permitted. If set, the consumer is treated as a static member, which means that only one instance with this ID is allowed in the consumer group at any time. This can be used in combination with a larger session timeout to avoid group re-balances caused by transient unavailability (e.g., process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.

maxPollRecords

Maximum number of records returned in a single poll. This value is only treated as a hint and may be limited by server-wide or tenant-wide configuration. Optional and defaults to 500.

maxPollIntervalMs

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed, and the group will re-balance in order to reassign the partitions to another member.

For consumers using a non-null groupInstanceId that reach this timeout, partitions will not be immediately reassigned. Instead, the consumer will stop sending heartbeats, and partitions will be reassigned after expiration of sessionTimeoutMs. This mirrors the behavior of a static consumer which has shut down.

Optional and defaults to 300_000.

pollTimeoutMs

The maximum time to block in the call to poll(). However, the poll() function might block beyond this timeout in order to execute custom ConsumerRebalanceListener callbacks. Optional and defaults to 10_000.

sessionTimeoutMs

The timeout used to detect client failures when using Kafka’s group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a re-balance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms. Optional and defaults to 10_000.

heartbeatIntervalMs

The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate re-balancing when new consumers join or leave the group. The value must be set lower than sessionTimeoutMs and typically should be no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal re-balances. Optional and defaults to 3_000.
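The relationship between the heartbeat interval and the session timeout can be checked with a few lines of code. The sketch below is illustrative only; the function name and the returned labels are hypothetical, and the defaults are the ones listed on this page.

```python
def check_heartbeat_settings(session_timeout_ms: int = 10_000,
                             heartbeat_interval_ms: int = 3_000) -> str:
    """Classify a heartbeat/session timeout pair against the guidance above."""
    if heartbeat_interval_ms >= session_timeout_ms:
        return "invalid"  # must be lower than the session timeout
    if heartbeat_interval_ms * 3 > session_timeout_ms:
        return "above-recommended"  # higher than 1/3 of the session timeout
    return "ok"


print(check_heartbeat_settings())  # documented defaults
print(check_heartbeat_settings(10_000, 4_000))
print(check_heartbeat_settings(10_000, 10_000))
```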

partitionAssignmentStrategy

Optional list of class names or class types, ordered by preference, of supported partition assignment strategies. The client will use these to distribute partition ownership amongst consumer instances when group management is used. Provided implementations:

  • org.apache.kafka.clients.consumer.RangeAssignor (default)

  • org.apache.kafka.clients.consumer.RoundRobinAssignor

autoOffsetReset

The KafkaAutoOffsetReset option to use when there is no initial offset in Kafka, or if the current offset no longer exists on the server (e.g., because that data has been deleted). Can be one of the following:

  • EARLIEST: Automatically reset the offset to the earliest offset (but can potentially result in consuming a very large number of messages after starting up).

  • LATEST: Automatically reset the offset to the latest offset (default).

fetchMinBytes

Optional minimum amount of data the server should return for a fetch request. If insufficient data is available, the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available, or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate, which can improve server throughput a bit at the cost of some additional latency.

maxPartitionFetchBytes

The maximum amount of data per partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config).

This value is only treated as a hint and may be limited by server-wide or tenant-wide configuration. Optional and defaults to 1_048_576.

requestTimeoutMs

Maximum amount of time (in milliseconds) the client will wait for the response of a request. Optional and defaults to 30_000.

authenticationConfigKey

A secret key the server uses to look up credentials needed for connecting to the Kafka broker. Optional; no authentication is used by default.

commitIntervalMillis

Periodic offset commit interval in milliseconds for Source-System Managed Delivery. Only applicable when the flow’s deliveryGuarantee is BySourceSystem. Optional and defaults to the server-level configuration.

nackStrategy

Strategy to apply when a message permanently fails processing in Source-System Managed Delivery after all seek-back retry attempts are exhausted. Can be one of the following:

  • SKIP: Skip the failed message and advance the offset watermark. At this point the message has already been through the configured redelivery attempts and any configured error handling (such as dead letter flow forwarding) has been executed. The message is dropped as a final fallback after all other recovery mechanisms are exhausted.

  • PAUSE: Pause the affected partition and continue retrying indefinitely. The partition resumes automatically once the message processes successfully. If the root cause is never resolved (e.g., a poison message that always fails validation, or a permanently unavailable downstream system), the partition remains blocked indefinitely.

Only applicable when the flow’s deliveryGuarantee is BySourceSystem. Required — must be explicitly set on all BySourceSystem Kafka flows. There is no default; flow designers must make a deliberate choice between data loss (SKIP) and partition stalling (PAUSE) for their use case.

nackMaxRetries

Maximum number of seek-back retry attempts before the NACK strategy triggers in Source-System Managed Delivery. Only applicable when the flow’s deliveryGuarantee is BySourceSystem. Optional and defaults to the server-level configuration.

nackRetryDelayMillis

Delay in milliseconds between NACK retry attempts in Source-System Managed Delivery. Only applicable when the flow’s deliveryGuarantee is BySourceSystem. Optional and defaults to the server-level configuration.

spreadKey

A cluster placement hint. Sources that share the same spreadKey will be distributed evenly throughout cluster nodes, improving resilience and distributing load. Note that unlike groupId, which governs message consumption within Kafka, this value is used exclusively by the server for cluster node placement. If not configured, default placement logic will be applied.

Optional.

name

Optional descriptive name for the processor.

id

Required identifier of the processor, unique across all processors within the flow. Must be between 3 and 30 characters long; contain only lower and uppercase alphabetical characters (a-z and A-Z), numbers, dashes ("-"), and underscores ("_"); and start with an alphabetical character. In other words, it adheres to the regex pattern [a-zA-Z][a-zA-Z0-9_-]{2,29}.
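The pattern above can be applied directly to check candidate identifiers. The sketch below uses the regex exactly as given; the helper name is hypothetical, and uniqueness within the flow is not checked here.

```python
import re

# Pattern copied from the description above: one letter, then 2 to 29 more
# characters from letters, digits, dashes, and underscores (3 to 30 in total).
ID_PATTERN = re.compile(r"[a-zA-Z][a-zA-Z0-9_-]{2,29}")


def is_valid_processor_id(candidate: str) -> bool:
    """Return True when the whole string matches the processor id pattern."""
    return ID_PATTERN.fullmatch(candidate) is not None


for candidate in ("kafka-source_1", "ab", "1-source", "a" * 31):
    print(candidate, is_valid_processor_id(candidate))
```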

exchangeProperties

Optional set of custom properties, in a simple jdk-format, that are added to the message exchange properties before processing the incoming payload. Any existing properties with the same name will be replaced by properties defined here.

Sub-builders

Name Summary

externalSystemDetails

Strategy for describing the external system integration. Optional.

messageLoggingStrategy

Strategy for describing how a processor’s message is logged on the server.

payloadArchivingStrategy

Strategy for archiving payloads.

Details

Authentication

The authenticationConfigKey property supports secrets of type UserNameAndPassword and Tls. You can specify multiple secrets by providing them as a comma-separated list. Comma-separated list support is designed to allow users to combine a Tls secret with a UserNameAndPassword secret. This is useful when you are using SASL-SSL/SCRAM (SHA-256) username/password authentication and also need to add a server trust store or other TLS configuration. For example:

authenticationConfigKey: "tlsSecret,usernamePasswordSecret"

The behavior is undefined if multiple secrets of the same type are provided.
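Because providing two secrets of the same type is undefined, it can be worth checking a comma-separated authenticationConfigKey before use. The sketch below assumes the caller already knows the type of each secret and passes it in as a plain dictionary; that lookup table, the function name, and the message wording are hypothetical and not part of the server's API.

```python
def find_duplicate_secret_types(config_key: str,
                                secret_types: dict[str, str]) -> list[str]:
    """Report secrets in a comma-separated key list that share a type.

    secret_types maps secret name to its type, e.g. "Tls" or
    "UserNameAndPassword"; names missing from the map are reported too.
    """
    problems = []
    first_by_type = {}
    for name in (part.strip() for part in config_key.split(",")):
        if not name:
            continue
        secret_type = secret_types.get(name)
        if secret_type is None:
            problems.append(f"unknown secret: {name}")
        elif secret_type in first_by_type:
            problems.append(
                f"{name} and {first_by_type[secret_type]} are both {secret_type}")
        else:
            first_by_type[secret_type] = name
    return problems


types = {"tlsSecret": "Tls", "usernamePasswordSecret": "UserNameAndPassword"}
print(find_duplicate_secret_types("tlsSecret,usernamePasswordSecret", types))
```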

We currently support only SASL-SSL/SCRAM (SHA-256) and mTLS authentication methods.

See the Secret Types documentation for formatting details.

Record Key

The receiveFromKafka processor does not include the record key when producing payloads.