receiveFromKafka

Source processor that consumes messages published to the specified Kafka cluster and topic.

Consumer groups enable multiple consumers to collaborate on processing messages from multiple partitions concurrently. It is the main tool for scaling on the consumer side. Furthermore, consumer groups enable multiple consuming applications to each have a separate view of the event stream and to read the stream independently at their own pace and with their own offsets.

The message will not be acknowledged to the Kafka cluster until it has been successfully persisted in the source processor.

Properties

Name Summary

clientId

An ID string to pass to the server when making requests. Used to track the source of requests beyond just the IP/port by allowing a logical application name to be included in server-side request logging. Optional and defaults to blank.

brokerList

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This string must be in the form of host1:port1,host2:port2, etc. Required.

topics

Comma-separated list of topic names to which the consumer should subscribe. Required.

groupId

The consumer group ID. Enables multiple consumer instances to process messages from different partitions in parallel and support individual message offset tracking for multiple consuming applications. Required.

groupInstanceId

A unique identifier of the consumer instance provided by the end user. Only non-empty strings are permitted. If set, the consumer is treated as a static member, which means that only one instance with this ID is allowed in the consumer group at any time. This can be used in combination with a larger session timeout to avoid group re-balances caused by transient unavailability (e.g., process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.

maxPollRecords

Maximum number of records returned in a single poll. This value is only treated as a hint and may be limited by server/tenant wide configuration. Optional and defaults to 500.

maxPollIntervalMs

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed, and the group will re-balance in order to reassign the partitions to another member.

For consumers using a non-null groupInstanceId that reach this timeout, partitions will not be immediately reassigned. Instead, the consumer will stop sending heartbeats, and partitions will be reassigned after expiration of sessionTimeoutMs. This mirrors the behavior of a static consumer which has shutdown.

Optional and defaults to 300_000.

pollTimeoutMs

The maximum time to block in the call to poll(). However, the poll() function might block beyond this timeout in order to execute custom ConsumerRebalanceListener callbacks. Optional and defaults to 10_000.

sessionTimeoutMs

The timeout used to detect client failures when using Kafka’s group management facility. The client sends periodic heartbeats to indicate its live-ness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a re-balance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms. Optional and defaults to 10_000.

heartbeatIntervalMs

The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate re-balancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms but typically should be no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal re-balances. Optional and defaults to 3_000.

partitionAssignmentStrategy

Optional list of class names or class types, ordered by preference, of supported partition assignment strategies. The client will use these to distribute partition ownership amongst consumer instances when group management is used. Provided implementations:

  • org.apache.kafka.clients.consumer.RangeAssignor (default)

  • org.apache.kafka.clients.consumer.RoundRobinAssignor

autoOffsetReset

The KafkaAutoOffsetReset option to use when there is no initial offset in Kafka, or if the current offset does not exist anymore on the server (i.e., because that data has been deleted). Can be one of the following:

  • EARLIEST: Automatically reset the offset to the earliest offset (but can potentially result in consuming a very large amount of messages after starting up).

  • LATEST: Automatically reset the offset to the latest offset (default).

fetchMinBytes

Optional, minimum amount of data the server should return for a fetch request. If insufficient data is available, the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available, or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate, which can improve server throughput a bit at the cost of some additional latency.

maxPartitionFetchBytes

The maximum amount of data per partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config).

This value is only treated as a hint and may be limited by server/tenant wide configuration. Optional and defaults to 1_048_576.

keyDeserializerClass

Deserializer class for key that implements the org.apache.kafka.common.serialization.Deserializer interface. Optional and defaults to org.apache.kafka.common.serialization.StringDeserializer.

valueDeserializerClass

Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface. Optional and defaults to org.apache.kafka.common.serialization.StringDeserializer.

requestTimeoutMs

Maximum amount of time (in milliseconds) the client will wait for the response of a request. Optional and defaults to 30_000.

authenticationConfigKey

Key from the server configuration used to look up SSL credentials for server and client authentication. Optional.

name

Optional, descriptive name for the processor.

id

Required identifier of the processor, unique across all processors within the flow. Must be between 3 and 30 characters long; contain only lower and uppercase alphabetical characters (a-z and A-Z), numbers, dashes ("-"), and underscores ("_"); and start with an alphabetical character. In other words, it adheres to the regex pattern [a-zA-Z][a-zA-Z0-9_-]{2,29}.

exchangeProperties

Optional set of custom properties in a simple jdk-format, that are added to the message exchange properties before processing the incoming payload. Any existing properties with the same name will be replaced by properties defined here.

retainPayloadOnFailure

Whether the incoming payload is available for error processing on failure. Defaults to false.

Sub-builders

Name Summary

externalSystemDetails

Strategy for describing the external system integration. Optional.

messageLoggingStrategy

Strategy for describing how a processor’s message is logged on the server.

payloadArchivingStrategy

Strategy for archiving payloads.

inboundTransformationStrategy

Strategy that customizes the conversion of an incoming payload by a processor (e.g., string to object). Should be used when the processor’s default conversion logic cannot be used.

Details

Authentication

The authenticationConfigKey property supports secrets of type UserNameAndPassword and Tls. See the Secret Types documentation for formatting details.

TLS (SSL) Configuration

The Kafka consumer can be configured to use TLS/SSL on the transport level. :imageprefix:

If the target service is using a valid SSL certificate, signed by a trusted CA, there is no need for additional configuration. However, configuration must be provided if you require one of the following features:

  • Providing a client certificate (if requested by the server).

  • Providing a truststore for server certificate validation. This is useful, for example, when the root certificate has not been signed by a trusted CA.

  • Limiting the server certificate to a specified public key.

  • Accepting a self-signed server certificate.

  • Accepting a server certificate issued for a host other than the one being requested.

To enable these features, the authenticationConfigKey property must be set and point to a secret of type Tls. See the Secret Types documentation for formatting details.