Source processor that consumes messages published to the specified Kafka cluster and topic.

Consumer groups enable multiple consumers to collaborate on processing messages from multiple partitions concurrently; they are the main tool for scaling on the consumer side. Furthermore, consumer groups enable multiple consuming applications to each have a separate view of the event stream and to read the stream independently, at their own pace and with their own offsets.
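
As an illustration of this behavior, the sketch below uses the plain Apache Kafka consumer API; the broker addresses, topic, and group names are placeholders. Two instances configured with the same group ID divide the partitions between them, while an application using a different group ID reads the same topic independently with its own offsets.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerGroupSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "host1:9092,host2:9092"); // host1:port1,host2:port2,...
        props.put("group.id", "billing-app"); // consumers sharing this ID split the partitions between them
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // A second application would use a different group.id (e.g. "audit-app") and
        // read the same topic independently, with its own offsets and at its own pace.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}
```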

The message will not be acknowledged to the Kafka cluster until it has been successfully persisted in the source processor.


Name Summary


An ID string to pass to the server when making requests. Used to track the source of requests beyond just the IP/port by allowing a logical application name to be included in server-side request logging. Optional and defaults to blank.


A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This string must be in the form of host1:port1,host2:port2, etc. Required.


Comma-separated list of topic names to which the consumer should subscribe. Required.


The consumer group ID. Enables multiple consumer instances to process messages from different partitions in parallel and supports individual message offset tracking for multiple consuming applications. Required.


A unique identifier of the consumer instance provided by the end user. Only non-empty strings are permitted. If set, the consumer is treated as a static member, which means that only one instance with this ID is allowed in the consumer group at any time. This can be used in combination with a larger session timeout to avoid group re-balances caused by transient unavailability (e.g., process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.
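
A minimal sketch of the equivalent settings on the underlying Kafka consumer, assuming this option maps onto the standard group.instance.id client property (and the session timeout onto session.timeout.ms); the values are illustrative only.

```java
import java.util.Properties;

public class StaticMembershipSketch {
    static Properties staticMemberConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "host1:9092");
        props.put("group.id", "billing-app");
        props.put("group.instance.id", "billing-app-node-1"); // static member: only one instance with this ID may be in the group
        props.put("session.timeout.ms", "45000");             // larger timeout so a brief restart does not trigger a re-balance
        return props;
    }
}
```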


Maximum number of records returned in a single poll. This value is only treated as a hint and may be limited by server- or tenant-wide configuration. Optional and defaults to 500.


The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed, and the group will re-balance in order to reassign the partitions to another member.

For consumers using a non-null groupInstanceId that reach this timeout, partitions will not be immediately reassigned. Instead, the consumer will stop sending heartbeats, and partitions will be reassigned after expiration of sessionTimeoutMs. This mirrors the behavior of a static consumer which has shutdown.

Optional and defaults to 300_000.


The maximum time to block in the call to poll(). However, the poll() function might block beyond this timeout in order to execute custom ConsumerRebalanceListener callbacks. Optional and defaults to 10_000.


The timeout used to detect client failures when using Kafka’s group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before this session timeout expires, the broker will remove the client from the group and initiate a re-balance. Note that the value must be within the allowable range configured on the broker by group.min.session.timeout.ms and group.max.session.timeout.ms. Optional and defaults to 10_000.


The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate re-balancing when new consumers join or leave the group. The value must be set lower than sessionTimeoutMs, but typically should be no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal re-balances. Optional and defaults to 3_000.
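
The relationship between these timeouts can be summarized with a sketch using the underlying Kafka client property names, shown here with the documented defaults:

```java
import java.util.Properties;

public class TimeoutSketch {
    static Properties timeouts() {
        Properties props = new Properties();
        props.put("session.timeout.ms", "10000");    // sessionTimeoutMs default
        props.put("heartbeat.interval.ms", "3000");  // heartbeatIntervalMs default, roughly 1/3 of the session timeout
        props.put("max.poll.interval.ms", "300000"); // maxPollIntervalMs default, bounds the time between poll() calls
        return props;
    }
}
```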


Optional list of class names or class types, ordered by preference, of supported partition assignment strategies. The client will use these to distribute partition ownership amongst consumer instances when group management is used. Provided implementations:

  • org.apache.kafka.clients.consumer.RangeAssignor (default)

  • org.apache.kafka.clients.consumer.RoundRobinAssignor
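
On the underlying Kafka consumer this corresponds to the partition.assignment.strategy setting, which accepts a comma-separated, preference-ordered list of class names; a brief sketch:

```java
import java.util.Properties;

public class AssignorSketch {
    static Properties assignors() {
        Properties props = new Properties();
        // Prefer round-robin assignment, falling back to range assignment.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor,"
              + "org.apache.kafka.clients.consumer.RangeAssignor");
        return props;
    }
}
```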


The KafkaAutoOffsetReset option to use when there is no initial offset in Kafka, or if the current offset no longer exists on the server (e.g., because that data has been deleted). Can be one of the following:

  • EARLIEST: Automatically reset the offset to the earliest offset (but note that this can result in consuming a very large number of messages after starting up).

  • LATEST: Automatically reset the offset to the latest offset (default).
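
For reference, the underlying Kafka consumer expresses the same choice through the auto.offset.reset setting; a brief sketch with both values:

```java
import java.util.Properties;

public class OffsetResetSketch {
    static Properties earliest() {
        Properties props = new Properties();
        props.put("auto.offset.reset", "earliest"); // replay from the beginning when no valid committed offset exists
        return props;
    }

    static Properties latest() {
        Properties props = new Properties();
        props.put("auto.offset.reset", "latest");   // default: start from newly arriving messages only
        return props;
    }
}
```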


The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request will wait for that much data to accumulate before answering. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available, or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate, which can improve server throughput a bit at the cost of some additional latency. Optional and defaults to 1.


The maximum amount of data per partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config).

This value is only treated as a hint and may be limited by server- or tenant-wide configuration. Optional and defaults to 1_048_576.
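
A sketch of the corresponding underlying Kafka client settings, fetch.min.bytes and max.partition.fetch.bytes, with illustrative values:

```java
import java.util.Properties;

public class FetchTuningSketch {
    static Properties fetchTuning() {
        Properties props = new Properties();
        props.put("fetch.min.bytes", "1");                 // default: answer the fetch as soon as any data is available
        props.put("max.partition.fetch.bytes", "1048576"); // 1 MiB per partition per fetch (the 1_048_576 default)
        return props;
    }
}
```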


Deserializer class for key that implements the org.apache.kafka.common.serialization.Deserializer interface. Optional and defaults to org.apache.kafka.common.serialization.StringDeserializer.


Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface. Optional and defaults to org.apache.kafka.common.serialization.StringDeserializer.
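
Any custom class configured here must implement the org.apache.kafka.common.serialization.Deserializer interface. The sketch below shows a hypothetical value deserializer; the class name and behavior are made up for illustration, and it assumes a recent Kafka client where configure and close have default no-op implementations.

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical example: turns each message value into an upper-case string.
public class UpperCaseStringDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8).toUpperCase();
    }
}
```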


Maximum amount of time (in milliseconds) the client will wait for the response of a request. Optional and defaults to 30_000.


Key from the server configuration used to look up SSL credentials for server and client authentication. Optional.


Optional, descriptive name for the processor.


Required identifier of the processor, unique across all processors within the flow. Must be between 3 and 30 characters long; contain only lower and uppercase alphabetical characters (a-z and A-Z), numbers, dashes ("-"), and underscores ("_"); and start with an alphabetical character. In other words, it adheres to the regex pattern [a-zA-Z][a-zA-Z0-9_-]{2,29}.
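
For example, the pattern can be checked with a standard regex matcher; the example IDs below are made up:

```java
import java.util.regex.Pattern;

public class IdPatternCheck {
    private static final Pattern ID = Pattern.compile("[a-zA-Z][a-zA-Z0-9_-]{2,29}");

    public static void main(String[] args) {
        System.out.println(ID.matcher("kafka-source_1").matches()); // true
        System.out.println(ID.matcher("1st-processor").matches());  // false: must start with a letter
        System.out.println(ID.matcher("ab").matches());             // false: shorter than 3 characters
    }
}
```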


Optional set of custom properties in simple JDK properties format that are added to the message exchange properties before processing the incoming payload. Any existing properties with the same name will be replaced by properties defined here.
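
The format is the standard java.util.Properties syntax (one key=value pair per line); a small sketch with made-up property names:

```java
import java.io.StringReader;
import java.util.Properties;

public class CustomPropertiesSketch {
    public static void main(String[] args) throws Exception {
        String jdkFormat = "region=eu-west\npriority=high\n"; // one key=value pair per line
        Properties custom = new Properties();
        custom.load(new StringReader(jdkFormat));
        System.out.println(custom); // prints both key=value pairs
    }
}
```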


Whether the incoming payload is available for error processing on failure. Defaults to false.


Name Summary


Strategy for describing the external system integration. Optional.


Strategy for describing how a processor’s message is logged on the server.


Strategy for archiving payloads.


Strategy that customizes the conversion of an incoming payload by a processor (e.g., string to object). Should be used when the processor’s default conversion logic cannot be used.



The authenticationConfigKey property supports secrets of type UserNameAndPassword and Tls. See the Secret Types documentation for formatting details.

TLS (SSL) Configuration

The Kafka consumer can be configured to use TLS/SSL on the transport level.

If the target service is using a valid SSL certificate, signed by a trusted CA, there is no need for additional configuration. However, configuration must be provided if you require one of the following features:

  • Providing a client certificate (if requested by the server).

  • Providing a truststore for server certificate validation. This is useful, for example, when the root certificate has not been signed by a trusted CA.

  • Limiting the server certificate to a specified public key.

  • Accepting a self-signed server certificate.

  • Accepting a server certificate issued for a host other than the one being requested.

To enable these features, the authenticationConfigKey property must be set and point to a secret of type Tls. See the Secret Types documentation for formatting details.
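
The processor resolves the actual TLS material from the referenced secret, but for orientation, the underlying Kafka client configuration for such a setup typically amounts to settings like the following (paths and passwords are placeholders):

```java
import java.util.Properties;

public class TlsSketch {
    static Properties tls() {
        Properties props = new Properties();
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/path/to/truststore.jks"); // trust anchor for the broker certificate
        props.put("ssl.truststore.password", "changeit");
        props.put("ssl.keystore.location", "/path/to/keystore.jks");     // client certificate, if the broker requests one
        props.put("ssl.keystore.password", "changeit");
        props.put("ssl.key.password", "changeit");
        return props;
    }
}
```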