receiveFromKafka
Source processor that consumes messages published to the specified Kafka cluster and topic. The processor produces payloads by reading each record’s value as binary data. To convert the binary data to, e.g., JSON, use an inboundTransformationStrategy in a downstream processor.
Consumer groups enable multiple consumers to collaborate on processing messages from multiple partitions concurrently. They are the main tool for scaling on the consumer side. Furthermore, consumer groups enable multiple consuming applications to each have a separate view of the event stream and to read the stream independently, at their own pace and with their own offsets.
The message will not be acknowledged to the Kafka cluster until it has been successfully persisted in the source processor.
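This at-least-once behavior is handled by the processor itself and requires no configuration. Purely for orientation, the following is a minimal sketch of the analogous pattern with a plain Apache Kafka consumer: auto-commit is disabled and offsets are committed only after the records have been stored. The `persist` method is a hypothetical stand-in for the processor's durable storage step, not part of this product's API.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class AckAfterPersistSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092,broker-2:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // Disable auto-commit so offsets are only acknowledged after persistence succeeds.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic"));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    persist(record.value()); // hypothetical durable storage of the raw payload
                }
                // Acknowledge (commit offsets) only after all polled records are safely persisted.
                consumer.commitSync();
            }
        }
    }

    private static void persist(byte[] payload) {
        // Placeholder: store the binary payload somewhere durable.
    }
}
```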
Properties
| Name | Summary |
|---|---|
| | An ID string to pass to the server when making requests. Used to track the source of requests beyond just the IP/port by allowing a logical application name to be included in server-side request logging. Optional and defaults to |
| | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This string must be in the form of `host1:port1,host2:port2,...`. |
| | Comma-separated list of topic names to which the consumer should subscribe. Required. |
| | The consumer group ID. Enables multiple consumer instances to process messages from different partitions in parallel and supports individual message offset tracking for multiple consuming applications. Required. |
| | A unique identifier of the consumer instance provided by the end user. Only non-empty strings are permitted. If set, the consumer is treated as a static member, which means that only one instance with this ID is allowed in the consumer group at any time. This can be used in combination with a larger session timeout to avoid group re-balances caused by transient unavailability (e.g., process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior. |
| | Maximum number of records returned in a single poll. This value is only treated as a hint and may be limited by server- or tenant-wide configuration. Optional and defaults to |
| | The maximum delay between invocations of `poll()`. Optional and defaults to |
| | The maximum time to block in the call to `poll()`. |
| | The timeout used to detect client failures when using Kafka’s group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a re-balance. Note that the value must be in the allowable range as configured in the broker configuration by `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. |
| | The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate re-balancing when new consumers join or leave the group. The value must be set lower than the session timeout. |
| | Optional list of class names or class types, ordered by preference, of supported partition assignment strategies. The client will use these to distribute partition ownership amongst consumer instances when group management is used. Provided implementations: |
| | Optional, minimum amount of data the server should return for a fetch request. If insufficient data is available, the request will wait for that much data to accumulate before answering the request. The default setting of |
| | The maximum amount of data per partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via `message.max.bytes` (broker config) or `max.message.bytes` (topic config). This value is only treated as a hint and may be limited by server- or tenant-wide configuration. Optional and defaults to |
| | Maximum amount of time (in milliseconds) the client will wait for the response of a request. Optional and defaults to |
| authenticationConfigKey | Key used to look up the credentials needed to connect to the Kafka broker. Optional and uses no authentication by default. |
| | Optional, descriptive name for the processor. |
| | Required identifier of the processor, unique across all processors within the flow. Must be between 3 and 30 characters long; contain only lower and uppercase alphabetical characters (a-z and A-Z), numbers, dashes ("-"), and underscores ("_"); and start with an alphabetical character. In other words, it adheres to the regex pattern |
| | Optional set of custom properties in a simple JDK format that are added to the message exchange properties before processing the incoming payload. Any existing properties with the same name will be replaced by the properties defined here. |
| | Whether the incoming payload is available for error processing on failure. Defaults to |
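Most of the properties above mirror standard Apache Kafka consumer configuration. For orientation only, the sketch below restates them using Kafka's own consumer property names; the values shown are placeholders for illustration, not this processor's defaults.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerSettingsSketch {
    // Standard Kafka consumer settings roughly corresponding to the table above.
    // Values are placeholders, not the processor's defaults.
    static Properties settings() {
        Properties props = new Properties();
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "billing-ingest");              // ID string for server-side request logging
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092,broker-2:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "billing-consumers");             // consumer group ID
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "billing-consumer-1");   // static membership (optional)
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);                     // max records per poll
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);             // max delay between poll() invocations
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45_000);                // broker-side failure detection
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3_000);              // must be lower than the session timeout
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);                        // min data per fetch request
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1_048_576);      // max data per partition per fetch
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);                // client request timeout
        return props;
    }
}
```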
Sub-builders
| Name | Summary |
|---|---|
| | Strategy for describing the external system integration. Optional. |
| | Strategy for describing how a processor’s message is logged on the server. |
| | Strategy for archiving payloads. |
| inboundTransformationStrategy | Strategy that customizes the conversion of an incoming payload by a processor (e.g., string to object). Should be used when the processor’s default conversion logic cannot be used. |
Details
Authentication
The authenticationConfigKey property supports secrets of type UserNameAndPassword and Tls. You can specify multiple secrets by providing them as a comma-separated list. Comma-separated list support is designed to allow users to combine a Tls secret with a UserNameAndPassword secret. This is useful when you are using SASL-SSL/SCRAM (SHA-256) username/password authentication and also need to add a server trust store or other TLS configuration. For example:
authenticationConfigKey: "tlsSecret,usernamePasswordSecret"
The behavior is undefined if multiple secrets of the same type are provided.
We currently support only SASL-SSL/SCRAM (SHA-256) and mTLS authentication methods.
See the Secret Types documentation for formatting details.
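For orientation only, the sketch below shows the standard Kafka client security settings that a combined Tls + UserNameAndPassword setup corresponds to: SASL_SSL with the SCRAM-SHA-256 mechanism plus a server trust store. The processor derives equivalent settings from the referenced secrets, so you never set these directly; the credentials and file paths below are placeholders.

```java
import java.util.Properties;

public class ScramOverTlsSketch {
    // Illustrative only: plain Kafka client settings corresponding to a
    // UserNameAndPassword secret (SCRAM credentials) combined with a Tls
    // secret (server trust store). The processor configures the equivalent
    // of this from the secrets referenced by authenticationConfigKey.
    static Properties scramOverTls() {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"alice\" password=\"alice-secret\";"); // placeholder credentials
        props.put("ssl.truststore.location", "/path/to/truststore.jks");  // placeholder trust store path
        props.put("ssl.truststore.password", "changeit");                 // placeholder trust store password
        return props;
    }
}
```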