sendToKafka

Processor that acts as a Kafka producer to publish key-value pairs to a given Kafka server.

Properties

Name Summary

brokerList

Required list of host/port pairs to use for establishing the initial connection to the Kafka cluster. This string must be in the form of host1:port1,host2:port2, etc.
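As an illustration of the expected brokerList format, the following Python sketch splits such a string into host/port pairs and rejects malformed entries. The function name parse_broker_list is hypothetical and not part of the processor; the processor's own validation may differ.

```python
def parse_broker_list(broker_list: str) -> list[tuple[str, int]]:
    """Split 'host1:port1,host2:port2' into (host, port) pairs.

    Hypothetical helper for illustration only; not part of the processor.
    """
    pairs = []
    for entry in broker_list.split(","):
        entry = entry.strip()
        # Split on the last colon so the port is always the final component.
        host, separator, port = entry.rpartition(":")
        if not separator or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        port_number = int(port)
        if not 0 < port_number <= 65535:
            raise ValueError(f"port out of range in {entry!r}")
        pairs.append((host, port_number))
    return pairs
```

For example, parse_broker_list("kafka1:9092,kafka2:9093") returns [("kafka1", 9092), ("kafka2", 9093)], while an entry without a port raises ValueError.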

topic

Name of the topic to which the message should be published. Required.

acks

Number of acknowledgments the producer requires the leader to have received before considering a request complete. Available options are defined on the KafkaAcks enum. Optional and defaults to the value defined in Kafka (PARTIAL).

deliveryTimeoutMs

The upper bound on the time to report success or failure after a call to send() returns. According to the Kafka docs, this value should be greater than or equal to the sum of lingerPeriodMs and requestTimeoutMs. In other words: deliveryTimeoutMs >= lingerPeriodMs + requestTimeoutMs. Optional and defaults to the value from the flow server config.
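The constraint can be checked before deploying a flow. A minimal Python sketch follows; the function name and the sample values are illustrative assumptions, not defaults of the flow server.

```python
def delivery_timeout_is_valid(delivery_timeout_ms: int,
                              linger_period_ms: int,
                              request_timeout_ms: int) -> bool:
    """Return True when deliveryTimeoutMs >= lingerPeriodMs + requestTimeoutMs."""
    return delivery_timeout_ms >= linger_period_ms + request_timeout_ms


# Illustrative values only (assumed, not taken from any server config).
print(delivery_timeout_is_valid(120_000, 5, 30_000))  # True
print(delivery_timeout_is_valid(20_000, 5, 30_000))   # False
```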

maxBlockMs

The upper bound on the time to get metadata from the broker. The metadata request is used to verify the connection to the server and to obtain information about the cluster. Keep this value low, since it controls how quickly a connection attempt fails when the backend has problems. Optional and defaults to the value from the flow server config.

requestTimeoutMs

The maximum amount of time (in milliseconds) the client will wait for the response of a request. Optional and defaults to the value from the flow server config.

lingerPeriodMs

Specifies how long the producer will wait for messages before sending them to the broker. It is used to batch several messages together in a single request, reducing traffic to the broker. Messages will be sent once either the lingerPeriodMs time has passed or the batchSize limit is reached. Optional and defaults to the value from the flow server config.

batchSize

Upper bound on the batch size in bytes. If there is a linger period, the producer will try to put several messages in a single batch until a batchSize limit is reached. Once either the lingerPeriodMs time has passed or the batchSize limit is reached, the request will be sent to the broker. Optional and defaults to the value from the flow server config.
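The interplay of lingerPeriodMs and batchSize can be pictured with the simplified Python model below. It only illustrates the "whichever limit is reached first" rule described here; the function name is hypothetical, and the real Kafka client's internal batching logic is more involved.

```python
def should_send_batch(batch_bytes: int, elapsed_ms: int,
                      batch_size: int, linger_period_ms: int) -> bool:
    """Simplified model: send once either limit is reached, whichever comes first."""
    return batch_bytes >= batch_size or elapsed_ms >= linger_period_ms


# Batch is full before the linger period ends: send immediately.
print(should_send_batch(16_000, 0, batch_size=16_000, linger_period_ms=50))   # True
# Linger period has passed with a partly filled batch: send what is there.
print(should_send_batch(1_200, 50, batch_size=16_000, linger_period_ms=50))   # True
# Neither limit reached yet: keep collecting messages.
print(should_send_batch(1_200, 10, batch_size=16_000, linger_period_ms=50))   # False
```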

compressionMode

The KafkaCompressionMode for all data generated by the producer. Recommended value is ZSTD. Optional and defaults to the value from the flow server config.

maxInFlightRequests

Optional, maximum number of unacknowledged requests the client will send on a single connection before blocking. The client usually creates one connection per broker in the cluster.

The recommended strategy for this parameter is to use no less than half of the potential requests per broker in the cluster. This can be determined with the following equations, rounding each result up to the next whole number:

  • Number of messages in a batch: (batchSize) / (average message size)

  • Number of potential requests to the cluster: (number of in-flight messages coming to the Kafka producer) / (number of messages in a batch)

  • Number of potential requests per broker: (number of potential requests to the cluster) / (number of brokers in the cluster)

For example, when the Kafka producer receives 2000 messages of about 600 bytes each, has a batch size of 16000 bytes, and must deliver them to a cluster of 12 brokers, the following math applies:

  • Number of messages in a batch: 16000 / 600 ≈ 26.7, rounded up to 27

  • Number of potential requests to the cluster: 2000 / 27 ≈ 74.1, rounded up to 75

  • Number of potential requests per broker: 75 / 12 = 6.25, rounded up to 7

  • Target value of maxInFlightRequests: 7 / 2 = 3.5, rounded up to 4. This is below the default value of 5, so use the default.

When the Kafka producer receives 2 messages of about 8000000 bytes each, has a batch size of 16000 bytes, and must deliver them to a cluster of 3 brokers, the following math applies:

  • Number of messages in a batch: 16000 / 8000000 = 0.002, rounded up to 1

  • Number of potential requests to the cluster: 2 / 1 = 2

  • Number of potential requests per broker: 2 / 3 ≈ 0.67, rounded up to 1

  • Target value of maxInFlightRequests: 1 / 2 = 0.5, rounded up to 1. This is below the default value of 5, so use the default.

When the Kafka producer receives 2000 messages of about 600 bytes each, has a batch size of 16000 bytes, and must deliver them to a cluster of 1 broker, the following math applies:

  • Number of messages in a batch: 16000 / 600 ≈ 26.7, rounded up to 27

  • Number of potential requests to the cluster: 2000 / 27 ≈ 74.1, rounded up to 75

  • Number of potential requests per broker: 75 / 1 = 75

  • Target value of maxInFlightRequests: 75 / 2 = 37.5, rounded up to 38. So use 40.
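The three worked examples above can be reproduced with a short Python sketch. The function and parameter names are hypothetical, and every division is rounded up, as in the examples. The function returns only the computed target; keeping the default of 5 for lower targets, or rounding 38 up to 40, remains a manual decision.

```python
def ceil_div(numerator: int, denominator: int) -> int:
    """Integer division rounded up, avoiding floating-point error."""
    return -(-numerator // denominator)


def target_max_in_flight(incoming_messages: int, average_message_bytes: int,
                         batch_size_bytes: int, broker_count: int) -> int:
    """Half of the potential requests per broker, rounded up at every step."""
    messages_per_batch = ceil_div(batch_size_bytes, average_message_bytes)
    cluster_requests = ceil_div(incoming_messages, messages_per_batch)
    requests_per_broker = ceil_div(cluster_requests, broker_count)
    return ceil_div(requests_per_broker, 2)


print(target_max_in_flight(2000, 600, 16000, 12))     # 4  (below the default of 5)
print(target_max_in_flight(2, 8_000_000, 16000, 3))   # 1  (below the default of 5)
print(target_max_in_flight(2000, 600, 16000, 1))      # 38 (the text rounds this to 40)
```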

authenticationConfigKey

Key used to look up the credentials needed to connect to the Kafka broker. Optional and uses no authentication by default.

name

Optional, descriptive name for the processor.

id

Required identifier of the processor, unique across all processors within the flow. Must be between 3 and 30 characters long; contain only lower and uppercase alphabetical characters (a-z and A-Z), numbers, dashes ("-"), and underscores ("_"); and start with an alphabetical character. In other words, it adheres to the regex pattern [a-zA-Z][a-zA-Z0-9_-]{2,29}.
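The id rule can be checked ahead of time with the regex pattern given above. A minimal Python sketch follows; the helper name is hypothetical.

```python
import re

# Pattern copied from the id description: a letter, then 2 to 29 more allowed characters.
PROCESSOR_ID_PATTERN = re.compile(r"[a-zA-Z][a-zA-Z0-9_-]{2,29}")


def is_valid_processor_id(processor_id: str) -> bool:
    """Return True when the whole string matches the documented id pattern."""
    return PROCESSOR_ID_PATTERN.fullmatch(processor_id) is not None


print(is_valid_processor_id("send-to-kafka_1"))  # True
print(is_valid_processor_id("ab"))               # False: shorter than 3 characters
print(is_valid_processor_id("1kafka"))           # False: must start with a letter
```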

exchangeProperties

Optional set of custom properties, in a simple jdk-format, that are added to the message exchange properties before the incoming payload is processed. Any existing property with the same name is replaced by the property defined here.

retainPayloadOnFailure

Whether the incoming payload is available for error processing on failure. Defaults to false.

Sub-builders

Name Summary

processingStrategy

Strategy for providing message processing hints to the server. Optional.

externalSystemDetails

Strategy for describing the external system integration. Optional.

circuitBreakerStrategy

Strategy for configuring the processor’s circuit breaker. Optional.

messageLoggingStrategy

Strategy for describing how a processor’s message is logged on the server.

payloadArchivingStrategy

Strategy for archiving payloads.

inboundTransformationStrategy

Strategy that customizes the conversion of an incoming payload by a processor (e.g., string to object). Should be used when the processor’s default conversion logic cannot be used.

Details

Authentication

The authenticationConfigKey property supports secrets of type UserNameAndPassword and Tls. You can specify multiple secrets as a comma-separated list. The list exists so that a Tls secret can be combined with a UserNameAndPassword secret, which is useful when you use SASL-SSL/SCRAM (SHA-256) username/password authentication and also need to add a server trust store or other TLS configuration. For example:

authenticationConfigKey: "tlsSecret,usernamePasswordSecret"

The behavior is undefined if multiple secrets of the same type are provided.
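Because duplicates of one secret type lead to undefined behavior, it can help to check a key list before using it. The Python sketch below is an assumption-laden illustration: the secret_types lookup table is hypothetical and stands in for however secret types are resolved in your environment, and neither function is part of the processor.

```python
from collections import Counter


def split_secret_keys(authentication_config_key: str) -> list[str]:
    """Split the comma-separated authenticationConfigKey value into secret keys."""
    return [key.strip() for key in authentication_config_key.split(",") if key.strip()]


def duplicated_secret_types(keys: list[str], secret_types: dict[str, str]) -> list[str]:
    """Return the secret types that occur more than once among the given keys.

    secret_types is a hypothetical mapping from secret key to secret type.
    """
    counts = Counter(secret_types[key] for key in keys)
    return sorted(secret_type for secret_type, count in counts.items() if count > 1)


# Hypothetical lookup table, for illustration only.
secret_types = {
    "tlsSecret": "Tls",
    "otherTlsSecret": "Tls",
    "usernamePasswordSecret": "UserNameAndPassword",
}

keys = split_secret_keys("tlsSecret,usernamePasswordSecret")
print(duplicated_secret_types(keys, secret_types))  # [] means one secret per type
```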

We currently support only SASL-SSL/SCRAM (SHA-256) and mTLS authentication methods.

See the Secret Types documentation for formatting details.