receiveFromKafka
Source processor that consumes messages published to the specified Kafka cluster and topic.
Consumer groups enable multiple consumers to collaborate on processing messages from multiple partitions concurrently. They are the main tool for scaling on the consumer side. Furthermore, consumer groups enable multiple consuming applications to each have a separate view of the event stream and to read the stream independently at their own pace and with their own offsets.
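As a rough illustration of how a consumer group spreads partitions across its members, the following sketch distributes partitions in round-robin order. It is a simplified model and not Kafka's actual assignor implementation; the class name ConsumerGroupSketch, the method assign, and the member names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of partition sharing within one consumer group.
class ConsumerGroupSketch {

    // Assigns partitions 0..partitionCount-1 to the group members in round-robin order.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("a consumer group needs at least one member");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = members.get(partition % members.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two members share four partitions; each one processes two of them.
        System.out.println(assign(List.of("consumer-a", "consumer-b"), 4));
    }
}
```

Adding a third member to the same group would spread the four partitions across three consumers. A second group with its own group ID would read the same stream independently, with its own offsets.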
The message will not be acknowledged to the Kafka cluster until it has been successfully persisted in the source processor.
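The acknowledgement rule above can be modeled as follows. This is a minimal sketch under stated assumptions, not the processor's real code: the Store interface, the consume method, and the message values are hypothetical, and offsets are represented as plain list indexes.

```java
import java.util.List;

// Hypothetical model of "acknowledge only after persisting".
class AckAfterPersistSketch {

    // Stand-in for the processor's internal persistence step.
    interface Store {
        void persist(String message) throws Exception;
    }

    // Persists messages starting at startOffset and returns the next offset that
    // may be acknowledged. Processing stops at the first message that fails to persist.
    static int consume(List<String> messages, int startOffset, Store store) {
        int acknowledged = startOffset;
        for (int offset = startOffset; offset < messages.size(); offset++) {
            try {
                store.persist(messages.get(offset));
            } catch (Exception e) {
                break; // not persisted, so not acknowledged; it is read again later
            }
            acknowledged = offset + 1;
        }
        return acknowledged;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m0", "m1", "m2");
        Store failsOnM1 = message -> {
            if (message.equals("m1")) throw new Exception("storage unavailable");
        };
        int next = consume(messages, 0, failsOnM1);
        System.out.println(next); // prints 1
        System.out.println(consume(messages, next, message -> { })); // prints 3
    }
}
```

Because the acknowledged position only advances after a successful persist, a failure leaves the message unacknowledged and it is picked up again on the next attempt.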
Properties
Name | Summary |
---|---|
|
An ID string to pass to the server when making requests. Used to track the source of requests beyond just the IP/port by allowing a logical application name to be included in server-side request logging. Optional and defaults to |
|
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This string must be in the form of |
|
Comma-separated list of topic names to which the consumer should subscribe. Required. |
|
The consumer group ID. Enables multiple consumer instances to process messages from different partitions in parallel and support individual message offset tracking for multiple consuming applications. Required. |
|
A unique identifier of the consumer instance provided by the end user. Only non-empty strings are permitted. If set, the consumer is treated as a static member, which means that only one instance with this ID is allowed in the consumer group at any time. This can be used in combination with a larger session timeout to avoid group re-balances caused by transient unavailability (e.g., process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior. |
|
Maximum number of records returned in a single poll. This value is only treated as a hint and may be limited by server/tenant wide configuration. Optional and defaults to |
|
The maximum delay between invocations of For consumers using a non-null Optional and defaults to |
|
The maximum time to block in the call to |
|
The timeout used to detect client failures when using Kafka’s group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If the broker receives no heartbeats before this session timeout expires, it removes the client from the group and initiates a re-balance. Note that the value must be in the allowable range as configured in the broker configuration by |
|
The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate re-balancing when new consumers join or leave the group. The value must be set lower than |
|
Optional list of class names or class types, ordered by preference, of supported partition assignment strategies. The client will use these to distribute partition ownership amongst consumer instances when group management is used. Provided implementations:
|
|
The
|
|
Optional, minimum amount of data the server should return for a fetch request. If insufficient data is available, the request will wait for that much data to accumulate before answering the request. The default setting of |
|
The maximum amount of data per partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via This value is only treated as a hint and may be limited by server/tenant wide configuration. Optional and defaults to |
|
Deserializer class for key that implements the |
|
Deserializer class for value that implements the |
|
Maximum amount of time (in milliseconds) the client will wait for the response of a request. Optional and defaults to |
|
Key from the server configuration used to look up SSL credentials for server and client authentication. Optional. |
|
Optional, descriptive name for the processor. |
|
Required identifier of the processor, unique across all processors within the flow. Must be between 3 and 30 characters long; contain only lowercase and uppercase letters (a-z and A-Z), digits, dashes ("-"), and underscores ("_"); and start with a letter. In other words, it adheres to the regex pattern |
|
Optional set of custom properties in a simple JDK format that are added to the message exchange properties before the incoming payload is processed. Any existing properties with the same name will be replaced by properties defined here. |
|
Whether the incoming payload is available for error processing on failure. Defaults to |
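The processor identifier rules described in the table above (3 to 30 characters; letters, digits, dashes, and underscores; starting with a letter) can be checked before deployment. The sketch below is an assumption: the regular expression is reconstructed from the prose description and is not necessarily the exact pattern used by the product, and the class name ProcessorIdSketch is hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical helper that checks a processor ID against the documented rules.
class ProcessorIdSketch {

    // Assumed pattern: one leading letter followed by 2 to 29 letters, digits,
    // dashes, or underscores, which gives a total length of 3 to 30 characters.
    static final Pattern ID_PATTERN = Pattern.compile("^[a-zA-Z][a-zA-Z0-9_-]{2,29}$");

    static boolean isValid(String id) {
        return id != null && ID_PATTERN.matcher(id).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("kafka-orders_1")); // prints true
        System.out.println(isValid("1-orders"));       // prints false (starts with a digit)
        System.out.println(isValid("ab"));             // prints false (too short)
    }
}
```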
Sub-builders
Name | Summary |
---|---|
Strategy for describing the external system integration. Optional. |
|
Strategy for describing how a processor’s message is logged on the server. |
|
Strategy for archiving payloads. |
|
Strategy that customizes the conversion of an incoming payload by a processor (e.g., string to object). Should be used when the processor’s default conversion logic cannot be used. |
Details
Authentication
The authenticationConfigKey property supports secrets of type UserNameAndPassword and Tls.
See the Secret Types documentation for formatting details.
TLS (SSL) Configuration
The Kafka consumer can be configured to use TLS/SSL on the transport level.
If the target service is using a valid SSL certificate, signed by a trusted CA, there is no need for additional configuration. However, configuration must be provided if you require one of the following features:
- Providing a client certificate (if requested by the server).
- Providing a truststore for server certificate validation. This is useful, for example, when the root certificate has not been signed by a trusted CA.
- Limiting the server certificate to a specified public key.
- Accepting a self-signed server certificate.
- Accepting a server certificate issued for a host other than the one being requested.
To enable these features, the authenticationConfigKey property must be set and point to a secret of type Tls.
See the Secret Types documentation for formatting details.