receiveFromKafka
Source processor that consumes messages published to the specified Kafka cluster and topics. The processor produces payloads by reading each record’s value as binary data. To convert the binary data to another format, such as JSON, use an inboundTransformationStrategy in a downstream processor.
Consumer groups enable multiple consumers to collaborate on processing messages from multiple partitions concurrently. It is the main tool for scaling on the consumer side. Furthermore, consumer groups enable multiple consuming applications to each have a separate view of the event stream and to read the stream independently at their own pace and with their own offsets.
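To make the partition-sharing idea concrete, the Python sketch below divides one topic's partitions among the members of a group, loosely following Kafka's range assignment: members sorted by name, each given a contiguous block, with any remainder going to the first members. `range_assign` is a hypothetical helper for illustration only. The actual distribution is performed by the Kafka client using the configured partition assignment strategy.

```python
from __future__ import annotations


def range_assign(partitions: list[int], consumers: list[str]) -> dict[str, list[int]]:
    """Hypothetical helper: split one topic's partitions into contiguous ranges."""
    members = sorted(consumers)   # members in lexicographic order
    ordered = sorted(partitions)  # partitions in numeric order
    base, extra = divmod(len(ordered), len(members))
    assignment: dict[str, list[int]] = {}
    start = 0
    for i, member in enumerate(members):
        # The first `extra` members each take one additional partition.
        count = base + (1 if i < extra else 0)
        assignment[member] = ordered[start:start + count]
        start += count
    return assignment


# Five partitions shared by two members of the same group.
print(range_assign([0, 1, 2, 3, 4], ["consumer-b", "consumer-a"]))
# {'consumer-a': [0, 1, 2], 'consumer-b': [3, 4]}

# More members than partitions: the extra member gets nothing.
print(range_assign([0, 1], ["a", "b", "c"]))
# {'a': [0], 'b': [1], 'c': []}
```

Because each partition goes to at most one member of a group, adding consumers beyond the partition count leaves the extra members idle. A second group subscribed to the same topic would receive its own full assignment and track its own offsets.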
Delivery modes
This source supports two delivery modes, selected by the flow’s deliveryGuarantee:
Flow-Server Managed Delivery (default — deliveryGuarantee = ByFlowServer)
Every message is persisted by the flow-server before processing. The Kafka offset is committed once the message is safely stored. This provides a server-side at-least-once guarantee independent of Kafka — even if the server restarts, persisted messages are replayed from the journal.
Source-System Managed Delivery (deliveryGuarantee = BySourceSystem)
Messages are forwarded directly to the flow pipeline without being persisted by the flow-server. Instead, the Kafka consumer manages the delivery guarantee itself: offsets are committed only after the flow-server confirms successful processing. If the server restarts, Kafka automatically redelivers any messages whose offsets were not committed.
This mode is designed for high-throughput workloads where the persistence overhead of Flow-Server Managed Delivery is a bottleneck. The trade-off is that the delivery guarantee now depends on Kafka’s offset management rather than the server’s journal — which is appropriate when consuming from Kafka, since Kafka already provides this guarantee natively.
When a message fails processing in Source-System Managed Delivery, the flow-server first attempts redelivery to the pipeline (controlled by the flow’s redeliveryStrategy). If redelivery is also unsuccessful, any configured dead letter flow receives the failed message. After all flow-server-level recovery is exhausted, the Kafka endpoint applies the configured nackStrategy: either skip the message and advance the partition, or pause the partition until the problem resolves.
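The escalation order described above can be modeled as a small loop. This is a sketch under stated assumptions, not the endpoint's implementation: `attempt` is a hypothetical callback that stands for one complete pass through the pipeline (including the flow's redeliveryStrategy and any dead letter flow), and the sketch assumes nackMaxRetries counts seek-back retries made after the first failed pass.

```python
import time
from enum import Enum
from typing import Callable


class NackStrategy(Enum):
    SKIP = "SKIP"    # drop the message and advance the partition
    PAUSE = "PAUSE"  # block the partition until the problem is resolved


def handle_message(
    attempt: Callable[[], bool],
    nack_max_retries: int,
    nack_retry_delay_millis: int,
    nack_strategy: NackStrategy,
) -> str:
    """Return 'acked', 'skipped' or 'paused' for one message (illustrative model)."""
    # One initial pass plus up to nack_max_retries seek-back retries.
    for retry in range(nack_max_retries + 1):
        if attempt():
            return "acked"
        if retry < nack_max_retries:
            # Seek back to the failed offset and wait before redelivering.
            time.sleep(nack_retry_delay_millis / 1000)
    # Every seek-back retry failed: apply the configured strategy.
    return "skipped" if nack_strategy is NackStrategy.SKIP else "paused"


outcomes = iter([False, False, True])  # fails twice, then succeeds
print(handle_message(lambda: next(outcomes), nack_max_retries=2,
                     nack_retry_delay_millis=10, nack_strategy=NackStrategy.PAUSE))  # acked
print(handle_message(lambda: False, nack_max_retries=1,
                     nack_retry_delay_millis=10, nack_strategy=NackStrategy.SKIP))  # skipped
```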
Behavioral characteristics of Source-System Managed Delivery:
- Downstream must be idempotent. Offsets are committed sequentially: if message 6 is still processing while messages 7 and 8 have already succeeded, the committed offset stays at 5. A crash at that point means messages 6, 7, and 8 are all redelivered on restart, even though 7 and 8 were already processed successfully. This is inherent to at-least-once delivery; design downstream processors to handle duplicate messages safely.
- A failed message does not stall the partition. When a message fails (NACK), it stops the committed offset from advancing, but the messages after it continue to be delivered and processed normally. This avoids penalizing an entire partition for a single problematic message. The trade-off is a wider redelivery window if a crash occurs while the failure is unresolved.
- Backpressure is graceful. When the pipeline is slower than the consumption rate, Kafka partitions are paused transparently. The consumer keeps sending heartbeats and maintains its group membership, so slow processing does not cause it to be removed from the consumer group.
- Consumer group changes cause redelivery. Scaling, restarts, and rebalances transfer partition ownership to another consumer. Any in-flight messages on the old consumer are abandoned (with a best-effort offset commit), and the new consumer resumes from the last committed offset. This reinforces the idempotency requirement above.
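The sequential commit rule and the resulting redelivery window can be reproduced with a toy tracker for a single partition. `OffsetTracker` is hypothetical and only mirrors the behavior described in the list. It uses the text's convention that the committed offset is the last offset up to which every message has succeeded; Kafka's commit API itself stores the next offset to read, which is that value plus one.

```python
from __future__ import annotations


class OffsetTracker:
    """Toy model of one partition's commit position under at-least-once delivery."""

    def __init__(self, committed: int) -> None:
        self.committed = committed
        self.succeeded: set[int] = set()  # successes not yet covered by `committed`

    def ack(self, offset: int) -> None:
        self.succeeded.add(offset)
        # Advance only across an unbroken run of successful offsets.
        while self.committed + 1 in self.succeeded:
            self.committed += 1
            self.succeeded.remove(self.committed)

    def redelivered_after_crash(self, highest_delivered: int) -> list[int]:
        # Everything after the committed offset is replayed, even if it already succeeded.
        return list(range(self.committed + 1, highest_delivered + 1))


tracker = OffsetTracker(committed=0)
for offset in (1, 2, 3, 4, 5, 7, 8):  # message 6 is still processing (or was NACKed)
    tracker.ack(offset)
print(tracker.committed)                   # 5
print(tracker.redelivered_after_crash(8))  # [6, 7, 8]
tracker.ack(6)
print(tracker.committed)                   # 8
```

A NACKed message behaves like message 6 here: it is never acknowledged, so the commit position stays behind it while later messages keep succeeding.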
Memory considerations: In Source-System Managed Delivery, messages are held in memory (not persisted to a journal). The endpoint maintains an internal buffer that can hold up to twice maxPollRecords worth of messages, since a new poll may complete while the buffer is still nearly full. Combined with maxNumberOfInFlightMessages messages currently being processed by the pipeline, the worst-case memory footprint is approximately (2 × maxPollRecords + maxNumberOfInFlightMessages) × average message size.
The goal is not to push throughput right up to the memory ceiling, but to achieve fast, well-tuned processing while leaving a comfortable margin. The flow-server process is shared with other integration flows that also consume memory, and spikes in message size or concurrent load can push usage beyond steady-state estimates. Configure conservatively: favor smaller poll batches and moderate in-flight limits that keep memory well within safe bounds under all expected conditions.
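The worst-case formula is simple enough to evaluate while tuning. The sketch below computes it and, conversely, the largest maxPollRecords that keeps the worst case inside a chosen fraction of a memory budget. The 0.5 safety factor, the 64 MiB budget, and the message sizes are illustrative assumptions rather than product defaults, and because the estimate uses the average message size, bursts of large messages can still exceed it.

```python
KIB = 1024
MIB = 1024 * KIB


def worst_case_bytes(max_poll_records: int, max_in_flight: int, avg_message_bytes: int) -> int:
    """(2 x maxPollRecords + maxNumberOfInFlightMessages) x average message size."""
    return (2 * max_poll_records + max_in_flight) * avg_message_bytes


def max_poll_records_for(budget_bytes: int, max_in_flight: int, avg_message_bytes: int,
                         safety_factor: float = 0.5) -> int:
    """Largest maxPollRecords whose worst case fits in safety_factor x budget.

    safety_factor is an illustrative margin, not a documented default.
    """
    usable_messages = int(budget_bytes * safety_factor) // avg_message_bytes
    return max(0, (usable_messages - max_in_flight) // 2)


# 500 records per poll, 100 in flight, 10 KiB average payload:
print(worst_case_bytes(500, 100, 10 * KIB) / MIB)     # 10.7421875
# Largest poll size that keeps the worst case within half of a 64 MiB budget:
print(max_poll_records_for(64 * MIB, 100, 10 * KIB))  # 1588
```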
Configuration properties exclusive to Source-System Managed Delivery:
- nackStrategy (required): the action taken when a message permanently fails, either SKIP (drop it and advance) or PAUSE (block the partition until resolved). Must be set explicitly; there is no default.
- commitIntervalMillis: how often offsets are committed to Kafka.
- nackMaxRetries: how many seek-back attempts are made before the NACK strategy triggers.
- nackRetryDelayMillis: the delay between seek-back retry attempts.
These properties (except nackStrategy, which is required) are optional and fall back to server-level defaults. All are rejected by validation unless the flow’s deliveryGuarantee is BySourceSystem.
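The validation rules above can be summarized as a small check. The property names come from this section; the `validate` function, its error messages, and the use of a plain dictionary for properties are hypothetical and do not reflect the server's actual validator or configuration format.

```python
from __future__ import annotations

# Property names taken from the text; the validator itself is a hypothetical sketch.
SOURCE_SYSTEM_ONLY = ("nackStrategy", "commitIntervalMillis", "nackMaxRetries", "nackRetryDelayMillis")
NACK_STRATEGIES = {"SKIP", "PAUSE"}


def validate(delivery_guarantee: str, props: dict[str, object]) -> list[str]:
    """Return a list of error messages (empty if the combination is valid)."""
    errors: list[str] = []
    if delivery_guarantee != "BySourceSystem":
        # All four properties are rejected outside Source-System Managed Delivery.
        for name in SOURCE_SYSTEM_ONLY:
            if name in props:
                errors.append(f"{name} requires deliveryGuarantee = BySourceSystem")
        return errors
    # In BySourceSystem mode nackStrategy has no default and must be set.
    strategy = props.get("nackStrategy")
    if strategy is None:
        errors.append("nackStrategy is required")
    elif strategy not in NACK_STRATEGIES:
        errors.append(f"nackStrategy must be SKIP or PAUSE, got {strategy!r}")
    # The other three properties are optional and fall back to server-level defaults.
    return errors


print(validate("BySourceSystem", {"nackStrategy": "PAUSE", "commitIntervalMillis": 1000}))  # []
print(validate("BySourceSystem", {}))  # ['nackStrategy is required']
print(validate("ByFlowServer", {"nackMaxRetries": 3}))
# ['nackMaxRetries requires deliveryGuarantee = BySourceSystem']
```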
Properties
| Name | Summary |
|---|---|
|
An ID string to pass to the server when making requests. Used to track the source of requests beyond just the IP/port by allowing a logical application name to be included in server-side request logging. Optional and defaults to |
|
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This string must be in the form of |
|
Comma-separated list of topic names to which the consumer should subscribe. Required. |
|
The consumer group ID. Enables multiple consumer instances to process messages from different partitions in parallel and supports independent offset tracking for multiple consuming applications. Required. |
|
A unique identifier of the consumer instance provided by the end user. Only non-empty strings are permitted. If set, the consumer is treated as a static member, which means that only one instance with this ID is allowed in the consumer group at any time. This can be used in combination with a larger session timeout to avoid group re-balances caused by transient unavailability (e.g., process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior. |
|
Maximum number of records returned in a single poll. This value is only treated as a hint and may be limited by server/tenant wide configuration. Optional and defaults to |
|
The maximum delay between invocations of For consumers using a non-null Optional and defaults to |
|
The maximum time to block in the call to |
|
The timeout used to detect client failures when using Kafka’s group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before this session timeout expires, the broker removes this client from the group and initiates a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by |
|
The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate re-balancing when new consumers join or leave the group. The value must be set lower than |
|
Optional list of class names or class types, ordered by preference, of supported partition assignment strategies. The client will use these to distribute partition ownership amongst consumer instances when group management is used. Provided implementations:
|
|
The
|
|
Optional, minimum amount of data the server should return for a fetch request. If insufficient data is available, the request will wait for that much data to accumulate before answering the request. The default setting of |
|
The maximum amount of data per partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via This value is only treated as a hint and may be limited by server/tenant wide configuration. Optional and defaults to |
|
Maximum amount of time (in milliseconds) the client will wait for the response of a request. Optional and defaults to |
|
A secret key the server uses to look up credentials needed for connecting to the Kafka broker. Optional and uses no authentication by default. |
|
Periodic offset commit interval in milliseconds for Source-System Managed Delivery. Only applicable when the flow’s |
|
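Strategy to apply when a message permanently fails processing in Source-System Managed Delivery after all seek-back retry attempts are exhausted. Can be one of the following: SKIP (drop the message and advance the partition) or PAUSE (block the partition until the problem resolves). Only applicable when the flow’s |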
|
Maximum number of seek-back retry attempts before the NACK strategy triggers in Source-System Managed Delivery. Only applicable when the flow’s |
|
Delay in milliseconds between NACK retry attempts in Source-System Managed Delivery. Only applicable when the flow’s |
|
A cluster placement hint. Sources that share the same spreadKey will be distributed evenly throughout cluster nodes, improving resilience and distributing load. Note that unlike Optional. |
|
Optional, descriptive name for the processor. |
|
Required identifier of the processor, unique across all processors within the flow. Must be between 3 and 30 characters long; contain only lower and uppercase alphabetical characters (a-z and A-Z), numbers, dashes ("-"), and underscores ("_"); and start with an alphabetical character. In other words, it adheres to the regex pattern |
|
Optional set of custom properties in a simple jdk-format, that are added to the message exchange properties before processing the incoming payload. Any existing properties with the same name will be replaced by properties defined here. |
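The processor identifier rules can be expressed as a regular expression. Because the table's own pattern is cut off, the pattern below is derived from the prose (a leading letter, then letters, digits, `-` or `_`, for a total of 3 to 30 characters) and should be treated as an approximation of the server's check, not a copy of it.

```python
import re

# Derived from the prose rules: starts with a letter, then letters, digits,
# "-" or "_", for a total length of 3 to 30 characters.
PROCESSOR_ID = re.compile(r"[A-Za-z][A-Za-z0-9_-]{2,29}")


def is_valid_processor_id(candidate: str) -> bool:
    # fullmatch anchors the pattern at both ends of the string.
    return PROCESSOR_ID.fullmatch(candidate) is not None


for candidate in ("kafkaIn", "orders-source_01", "ab", "1source", "my source", "a" * 31):
    print(candidate, is_valid_processor_id(candidate))
```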
Sub-builders
| Name | Summary |
|---|---|
Strategy for describing the external system integration. Optional. |
|
Strategy for describing how a processor’s message is logged on the server. |
|
Strategy for archiving payloads. |
Details
Authentication
The authenticationConfigKey property supports secrets of type UserNameAndPassword and Tls. Multiple secrets can be specified as a comma-separated list. This is intended for combining a Tls secret with a UserNameAndPassword secret, for example when you use SASL-SSL/SCRAM (SHA-256) username/password authentication and also need to add a server trust store or other TLS configuration:
```
authenticationConfigKey: "tlsSecret,usernamePasswordSecret"
```
The behavior is undefined if multiple secrets of the same type are provided.
We currently support only SASL-SSL/SCRAM (SHA-256) and mTLS authentication methods.
See the Secret Types documentation for formatting details.
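Before deploying, a comma-separated authenticationConfigKey value can be checked against the rules in this section: only UserNameAndPassword and Tls secrets, and at most one secret of each type. The sketch assumes a hypothetical `secret_types` mapping from secret name to type (the server resolves secrets itself) and tolerates spaces around the commas, which the server may not.

```python
from __future__ import annotations

from collections import Counter

# Secret types accepted by authenticationConfigKey, per this section.
SUPPORTED_TYPES = {"UserNameAndPassword", "Tls"}


def check_auth_config_key(value: str, secret_types: dict[str, str]) -> list[str]:
    """Return problems found in a comma-separated authenticationConfigKey value.

    `secret_types` is a hypothetical name-to-type lookup used for illustration.
    """
    # Assumption: spaces around commas are ignored here; the server may be stricter.
    names = [name.strip() for name in value.split(",") if name.strip()]
    problems: list[str] = []
    for name in names:
        kind = secret_types.get(name)
        if kind is None:
            problems.append(f"{name}: unknown secret")
        elif kind not in SUPPORTED_TYPES:
            problems.append(f"{name}: unsupported type {kind}")
    # Behavior is undefined if two secrets of the same type are combined.
    counts = Counter(secret_types[name] for name in names if name in secret_types)
    problems += [f"more than one {kind} secret" for kind, n in counts.items() if n > 1]
    return problems


secrets = {"tlsSecret": "Tls", "usernamePasswordSecret": "UserNameAndPassword", "otherTls": "Tls"}
print(check_auth_config_key("tlsSecret,usernamePasswordSecret", secrets))  # []
print(check_auth_config_key("tlsSecret,otherTls", secrets))  # ['more than one Tls secret']
```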