Kafka - kafka-console-consumer


1 - About

kafka-console-consumer is a command-line consumer that reads messages from a Kafka topic and prints them to standard output.


3 - Example

3.1 - Command line

# 1 New Consumer
kafka-console-consumer --bootstrap-server localhost:29092 --topic foo --new-consumer --from-beginning --max-messages 42
 
# 2 Old consumer
/usr/bin/kafka-console-consumer --zookeeper zk01.example.com:2181 --topic t1
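
A further sketch using the addressing options documented below; the broker address, topic name, partition and offset values are all assumptions:

```shell
# Read partition 0 of topic t1 starting at offset 5,
# stopping after 10 messages (broker address is hypothetical)
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic t1 \
  --partition 0 \
  --offset 5 \
  --max-messages 10
```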

3.2 - Docker

Example with docker

docker-compose exec kafka  \
  kafka-console-consumer --bootstrap-server localhost:29092 --topic foo --new-consumer --from-beginning --max-messages 42

4 - Options

New Consumer

--bootstrap-server    The Kafka brokers to connect to. Example: broker1:9092,broker2:9092
--new-consumer        Use the new consumer implementation (the default). Takes no value.

Old Consumer

--zookeeper           The connection string for the Zookeeper connection. Example: host:port,host:port

Addressing: Topic

--topic               The topic id to consume from.
--whitelist           Whitelist of topics to include for consumption.
--blacklist           Blacklist of topics to exclude from consumption.

Addressing: Offset

--delete-consumer-offsets   If specified, the consumer path in Zookeeper is deleted on startup.
--from-beginning            Start with the earliest message present in the log rather than the latest.
--offset                    The offset to consume from: a non-negative number, earliest (from the beginning), or latest (from the end). Default: latest.

Addressing: Partition

--partition           The partition to consume from (an integer).

Config

--consumer-property   Pass consumer config properties in the form key=value.
--consumer.config     A consumer config properties file.

Serialization

--key-deserializer    Deserializer for keys. Example: org.apache.kafka.common.serialization.StringDeserializer
--value-deserializer  Deserializer for values. Example: org.apache.kafka.common.serialization.StringDeserializer

Processing parameters

--skip-message-on-error   Skip messages on error instead of halting.
--timeout-ms              Exit if no message is available for consumption in the specified interval.
--isolation-level         Set to read_committed to filter out transactional messages which are not committed, or read_uncommitted to read all messages. Default: read_uncommitted.

Reporter

--csv-reporter-enabled    If set, the CSV metrics reporter is enabled.
--metrics-dir             Output directory for the CSV metrics.
--enable-systest-events   Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific to system tests.)
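
A sketch combining several of the options above; the broker address, topic and group id are assumptions:

```shell
# Consume only committed transactional messages from the beginning,
# under a hypothetical consumer group id passed via --consumer-property
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic payments \
  --isolation-level read_committed \
  --consumer-property group.id=audit-reader \
  --from-beginning
```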

4.1 - Console output

--max-messages   The maximum number of messages to consume before exiting. If not set, consumption is continual. Example: 10
--formatter      The name of a class to use for formatting Kafka messages for display. Default: kafka.tools.DefaultMessageFormatter.
--property       The properties to initialize the message formatter. Example: to print the key, --property print.key=true

A formatter extends the MessageFormatter class.

Property options are:

  • print.timestamp - print the timestamp
  • print.key - print the key
  • print.value - print the value
  • key.separator and line.separator - the separators printed between key and value and between messages
  • key.deserializer and value.deserializer - the deserializer (Serdes) classes to apply before formatting
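
A sketch putting the formatter properties together; the broker address and topic name are assumptions:

```shell
# Print timestamp, key and value for each message,
# separated by " | " (hypothetical broker and topic)
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic foo \
  --from-beginning \
  --property print.timestamp=true \
  --property print.key=true \
  --property key.separator=" | "
```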

5 - The full Java command

java -Xmx512M
 -server
 -XX:+UseG1GC
 -XX:MaxGCPauseMillis=20
 -XX:InitiatingHeapOccupancyPercent=35
 -XX:+DisableExplicitGC
 -Djava.awt.headless=true
 -Dcom.sun.management.jmxremote
 -Dcom.sun.management.jmxremote.authenticate=false
 -Dcom.sun.management.jmxremote.ssl=false
 -Dschema-registry.log.dir=/usr/bin/../logs
 -Dlog4j.configuration=file:/etc/schema-registry/log4j.properties
 -cp :/usr/bin/../share/java/kafka-serde-tools/*:/usr/bin/../package-kafka-serde-tools/target/kafka-serde-tools-package-*-development/share/java/kafka-serde-tools/*:/usr/bin/../package-schema-registry/target/kafka-schema-registry-package-*-development/share/java/schema-registry/*:/usr/bin/../share/java/confluent-common/*:/usr/bin/../share/java/rest-utils/*:/usr/bin/../share/java/schema-registry/*
 kafka.tools.ConsoleConsumer
 --formatter  io.confluent.kafka.formatter.AvroMessageFormatter
 --property  schema.registry.url=http://localhost:8081
 --bootstrap-server broker:9092
 --topic test-sqlite-jdbc-accounts
 --from-beginning
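
The console consumer script is a thin wrapper around the kafka.tools.ConsoleConsumer class shown above. A sketch of an equivalent direct invocation through the kafka-run-class helper, assuming a Kafka installation on the PATH:

```shell
# Run the consumer class directly via kafka-run-class,
# reusing the broker and topic from the command above
kafka-run-class kafka.tools.ConsoleConsumer \
  --bootstrap-server broker:9092 \
  --topic test-sqlite-jdbc-accounts \
  --from-beginning
```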