Kafka - Producer (Write / Input)


About

A Kafka producer is an object that consists of:

  • a pool of buffer space that holds records that haven't yet been transmitted to the server
  • a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster.

Kafka Connect uses it to write data into Kafka.

Properties

A producer:

  • has no group for coordination.
  • maps each message to a topic partition and sends a produce request to the leader of that partition.
  • writes asynchronously by default. To make a write synchronous, wait on the future returned by send() (see the sketch after this list).
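
For example, a minimal sketch (not from the original page) of an asynchronous versus a synchronous send. It assumes a producer and a topic named my-topic, as configured in the next section:

ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");

// Asynchronous (the default): send() returns a Future immediately and the callback
// is invoked once the broker has responded (metadata) or the send has failed (exception).
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.printf("partition=%d offset=%d%n", metadata.partition(), metadata.offset());
    }
});

// Synchronous: block on the returned Future until the write is acknowledged.
// (Future.get() throws the checked InterruptedException/ExecutionException.)
RecordMetadata result = producer.send(record).get();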

Management

Configuration

Full list of configuration settings

The configs can be overridden by prefixing them with producer. (for example, in a Kafka Connect worker configuration):

producer.retries=1

Minimal configuration:

Configuration Desc
bootstrap.servers A list of host/port pairs used to discover the full cluster
client.id An id string identifying this client on the broker side (used in logs, metrics and quotas)
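
A minimal sketch in Java (the serializer classes and the client id value are illustrative assumptions, not part of the page):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // entry points used to discover the rest of the cluster
props.put("client.id", "my-producer");            // identifies this client in broker logs, metrics and quotas
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);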

Partitioner

The built-in partitioner selects a partition using a hash function. All messages with:

  • the same non-null key will be sent to the same partition
  • a null key will be distributed in a round-robin fashion to ensure an even spread across the topic partitions.

A custom partitioner may be created to implement a semantic partitioning function (for instance, partitioning by month based on a field in the record).
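
A sketch of such a semantic partitioner. The class, the key format and the month-based rule are hypothetical; the Partitioner interface and the partitioner.class setting are part of the standard producer API:

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical example: route every record of the same month to the same partition,
// assuming the record key is a date string such as "2018-06-21".
public class MonthPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Use the "yyyy-MM" prefix of the key as the semantic partitioning value.
        String month = (key instanceof String && ((String) key).length() >= 7)
                ? ((String) key).substring(0, 7)
                : "";
        return (month.hashCode() & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() { }
}

It is registered with the producer through the partitioner.class property, e.g. props.put("partitioner.class", "com.example.MonthPartitioner") (package name hypothetical).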

Batch

Configuration Desc
batch.size The maximum size (in bytes) of a batch of records per partition. Larger values mean more batching but require more memory.
linger.ms How long the producer delays sending, waiting up to that number of milliseconds for more records to accumulate before sending a request (analogous to Nagle's algorithm in TCP).
compression.type The compression codec applied to whole batches; larger batches generally mean a higher compression ratio.
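
For example (illustrative values, added to the props object from the minimal configuration sketch above):

props.put("batch.size", "65536");     // up to 64 KB of records batched per partition
props.put("linger.ms", "5");          // wait up to 5 ms for more records before sending a request
props.put("compression.type", "lz4"); // compress whole batches (none, gzip, snappy, lz4, ...)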

Producer Queue (unsent messages)

Configuration Desc
buffer.memory Limits the total memory available for buffering unsent records.
max.block.ms How long send() blocks waiting for free space in the queue before raising an exception.
request.timeout.ms The maximum time to wait for a request to complete; messages older than this timeout are failed and an exception is raised.
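
For example (illustrative values):

props.put("buffer.memory", "33554432");   // 32 MB in total for buffering unsent records
props.put("max.block.ms", "5000");        // send() blocks at most 5 seconds waiting for free buffer space
props.put("request.timeout.ms", "30000"); // fail requests that stay unanswered for more than 30 seconds

When the buffer is full, send() blocks for up to max.block.ms and then fails with an exception (a TimeoutException in recent clients) rather than blocking forever.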

Acknowledgement / Message Durability

Each produce request to the partition leader can be held until the replicas have successfully acknowledged the write (a trade-off between message durability and overall throughput).

Messages which were acknowledged by the leader only (i.e. acks=1) can be lost if the partition leader fails before the replicas have copied the message.

Value Requirement
0 No guarantee that the message was successfully written to the broker's log; the broker does not send a response. Maximum throughput.
1 An explicit acknowledgement from the partition leader that the write succeeded.
all Not only did the partition leader accept the write, it was also successfully replicated to all of the in-sync replicas.
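
For example, the strongest setting (the weaker values are summarised in the comment):

props.put("acks", "all"); // "0" = no broker response, "1" = leader only, "all" = all in-sync replicas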

Request retry

If a request fails, the producer can automatically retry. Enabling retries also opens up the possibility of duplicates. See message delivery semantics.

This behavior is controlled by the retries configuration property.

With retries=0, the producer won't retry at all.

In idempotent mode, retries will not introduce duplicates.

Message Ordering

Without retries enabled, the broker will preserve the order of writes it receives, but there could be gaps due to individual send failures.

If retries > 0, message reordering can occur: a failed request may be retried and succeed after a later request has already been written. To enable retries without reordering, set max.in.flight.requests.per.connection to 1 to ensure that only one request can be sent to the broker at a time, as shown below.
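
For example (illustrative retry count):

props.put("retries", "3");                               // retry transient send failures
props.put("max.in.flight.requests.per.connection", "1"); // one request at a time per broker, so a retried
                                                         // batch can never overtake a later one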

Idempotent and transactional Mode

Since Kafka 0.11, the producer supports two additional modes:

  • the idempotent producer
  • and the transactional producer.

Idempotent

The idempotent producer strengthens Kafka's delivery semantics from at least once to exactly once delivery. The producer can only guarantee idempotence for messages sent within a single session.

retries will no longer introduce duplicates.

To enable idempotence, set enable.idempotence=true. When it is set, the following properties are set automatically (see the sketch after this list):

  • retries=Integer.MAX_VALUE
  • acks=all.
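
For example:

props.put("enable.idempotence", "true"); // implies retries=Integer.MAX_VALUE and acks=all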

Transactional

The transactional producer allows an application to send messages to multiple partitions (and topics) atomically.

To enable the transactional mode, the following properties must be set:

  • transactional.id (on the producer)
  • replication.factor=3 (on the topics included in transactions, for durability)
  • min.insync.replicas=2 (on those topics)
  • downstream consumers must read only committed messages (isolation.level=read_committed)

The purpose of the transactional.id is to enable transaction recovery across multiple sessions of a single producer instance.

Example:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

// Registers the transactional.id with the broker and fences off any previous
// producer instance that used the same id.
producer.initTransactions();

try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    // Publishes the commit marker: consumers in read_committed mode can now see the records.
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    producer.abortTransaction();
}
producer.close();

API and Example

http://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

Utility

kafka-console-producer and kafka-avro-console-producer are command line tools that read data from standard input and write it to a Kafka topic.
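
For example (broker address and topic name are placeholders; newer versions accept --bootstrap-server instead of --broker-list, and the script may be named kafka-console-producer.sh depending on the distribution):

kafka-console-producer --broker-list localhost:9092 --topic my-topic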




