Kafka - Message Timestamp


About

The message timestamp is used to control the progress of streams. Depending on the application, it can be defined differently (see Source below).
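
To make "progress" concrete, the following sketch models it as the largest timestamp observed so far. This is a simplified view, roughly how Kafka Streams treats its per-partition "partition time". `StreamTimeTracker` and its methods are hypothetical names, not part of the Kafka API.

```java
// Hypothetical, simplified model (not the Kafka Streams API): progress is the
// largest record timestamp observed so far, so it never moves backwards.
final class StreamTimeTracker {
    // Long.MIN_VALUE means "no record observed yet".
    private long streamTime = Long.MIN_VALUE;

    // Feeds one record timestamp (epoch milliseconds) and returns the updated stream time.
    long observe(long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
        return streamTime;
    }

    // True when the record is older than the current stream time (out of order).
    boolean isOutOfOrder(long recordTimestamp) {
        return recordTimestamp < streamTime;
    }
}
```

Suppose the tracker is fed the timestamps 1000, 3000 and then 2000. The stream time ends at 3000, because the third record is out of order and does not move time backwards.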

Source

The timestamp of a message can be retrieved from different sources.

Event-time

Event-time processing, aka “producer time”, is the default. This represents the time when the Kafka producer sent the original message.

Ingestion-time

Ingestion-time processing, aka “broker time”, uses the time when the Kafka broker received the original message (appended it to the log).

Processing-time

The timestamp is the current time in milliseconds from the system clock of the processing application. Kafka Streams then operates on the basis of the so-called processing-time of events.

Payload-time

The timestamp is embedded in the payload of the message (for example, as a field of the message value).
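
The four sources can be compared in a small model. The `TimestampSources` class, its `Message` type and the fallback from payload-time to processing-time are assumptions made for illustration. None of them is part of any Kafka API.

```java
// Hypothetical model (not a Kafka API) of the candidate timestamps of one message,
// all in milliseconds since the Unix epoch.
final class TimestampSources {

    enum Source { EVENT_TIME, INGESTION_TIME, PROCESSING_TIME, PAYLOAD_TIME }

    static final class Message {
        final long producerTime; // set by the producer (event-time)
        final long brokerTime;   // set when the broker received the message (ingestion-time)
        final Long payloadTime;  // embedded in the message value, null when absent (payload-time)

        Message(long producerTime, long brokerTime, Long payloadTime) {
            this.producerTime = producerTime;
            this.brokerTime = brokerTime;
            this.payloadTime = payloadTime;
        }
    }

    // Returns the timestamp of `m` for the given source; `nowMillis` is the
    // system clock of the processing application (processing-time).
    static long timestampOf(Message m, Source source, long nowMillis) {
        switch (source) {
            case EVENT_TIME:
                return m.producerTime;
            case INGESTION_TIME:
                return m.brokerTime;
            case PROCESSING_TIME:
                return nowMillis;
            case PAYLOAD_TIME:
                // Assumption: fall back to processing-time when the payload has no timestamp.
                return m.payloadTime != null ? m.payloadTime : nowMillis;
            default:
                throw new IllegalArgumentException("unknown source: " + source);
        }
    }
}
```

Consider a message produced at 1000, received by the broker at 1200 and carrying 900 in its payload, and processed when the clock reads 5000. The four sources then yield 1000, 1200, 5000 and 900 respectively.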

Management

Set

Built-in

Built-in timestamps are automatically embedded into Kafka messages by the Kafka producer client (since Kafka 0.10.0.0).

Get

kafka-console-consumer

Use kafka-console-consumer with the property print.timestamp=true to print the built-in timestamp (and its type) of each message:

kafka-console-consumer.sh \
   ....
   --property print.timestamp=true \
   .....

API

A timestamp extractor, configured with the default.timestamp.extractor property (named timestamp.extractor before Kafka 1.0), extracts a timestamp from an instance of ConsumerRecord.

ConsumerRecordTimestampExtractor

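The default extractor is ConsumerRecordTimestampExtractor, which extracts the built-in timestamp. Since Kafka 0.10.2 it is deprecated and the default is FailOnInvalidTimestamp. That extractor also returns the built-in timestamp, but it throws an exception when the timestamp is invalid (negative).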

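The broker configuration log.message.timestamp.type (overridable per topic with message.timestamp.type) controls the type of the built-in timestamp. With CreateTime (the default), the broker keeps the timestamp set by the producer (event-time). With LogAppendTime, it replaces that timestamp with the broker's time when the message is appended to the log (ingestion-time).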
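
The assignment of the built-in timestamp can be sketched as two steps. First, the producer sets the timestamp: it uses an explicit timestamp if the application passed one (for example through the timestamp argument of the ProducerRecord constructor), and otherwise its current time. Second, the broker keeps or overwrites that timestamp according to the timestamp type. `BuiltInTimestamp` and its methods are a hypothetical model, not Kafka code.

```java
// Hypothetical model (not Kafka code) of how the built-in timestamp is assigned.
final class BuiltInTimestamp {

    // Mirrors the two values of log.message.timestamp.type.
    enum Type { CREATE_TIME, LOG_APPEND_TIME }

    // Producer side: an explicit record timestamp is kept, otherwise the
    // producer's current time is used.
    static long producerTimestamp(Long explicitTimestamp, long producerNowMillis) {
        return explicitTimestamp != null ? explicitTimestamp : producerNowMillis;
    }

    // Broker side: CreateTime keeps the producer timestamp, LogAppendTime
    // overwrites it with the broker time at append.
    static long storedTimestamp(Type type, long producerTimestamp, long brokerAppendMillis) {
        return type == Type.LOG_APPEND_TIME ? brokerAppendMillis : producerTimestamp;
    }
}
```

A consumer therefore sees the producer's timestamp on a CreateTime topic. On a LogAppendTime topic it sees the broker's append time, whatever the producer set.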

WallclockTimestampExtractor

The WallclockTimestampExtractor returns the current system time in milliseconds, i.e. the processing-time of events.

CustomTimestampExtractor

A custom TimestampExtractor retrieves the payload-time timestamp (i.e. the timestamp embedded in the payload of the message).

Example of a custom TimestampExtractor implementation:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Extracts the embedded timestamp of a record (giving you "event time" semantics).
public class MyEventTimeExtractor implements TimestampExtractor {

  // Since Kafka 0.10.2, extract() takes a second argument (the current partition time);
  // Kafka 0.10.0 and 0.10.1 used a one-argument version.
  @Override
  public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
    // `Foo` is your own custom class, which we assume has a method that returns
    // the embedded timestamp (in milliseconds).
    Foo myPojo = (Foo) record.value();
    if (myPojo != null) {
      return myPojo.getTimestampInMillis();
    }
    else {
      // Kafka allows `null` as message value. How to handle such message values
      // depends on your use case. In this example, we decide to fall back to
      // wall-clock time (= processing-time).
      return System.currentTimeMillis();
    }
  }

}
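
The extractor above assumes that `Foo` already exposes the timestamp in epoch milliseconds. When the payload carries it as an ISO-8601 string instead, the conversion can be done with `java.time.Instant`. The sketch below uses a hypothetical `IsoTimestampPayload` class in place of `Foo`. Unlike `Foo`, it lets the caller choose the fallback value, for example `System.currentTimeMillis()` as in the extractor.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

// Hypothetical payload class standing in for `Foo`: it carries the event time
// as an ISO-8601 string (e.g. "2017-06-01T12:00:00Z") instead of epoch milliseconds.
final class IsoTimestampPayload {
    private final String eventTime;

    IsoTimestampPayload(String eventTime) {
        this.eventTime = eventTime;
    }

    // Converts the embedded ISO-8601 instant to epoch milliseconds, returning
    // `fallbackMillis` when the field is missing or cannot be parsed.
    long getTimestampInMillis(long fallbackMillis) {
        if (eventTime == null) {
            return fallbackMillis;
        }
        try {
            return Instant.parse(eventTime).toEpochMilli();
        } catch (DateTimeParseException e) {
            return fallbackMillis;
        }
    }
}
```

For example, "2017-06-01T12:00:00Z" converts to 1496318400000. A malformed value such as "not a date" yields the fallback.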

You would then define the custom timestamp extractor in your Streams configuration as follows:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Before Kafka 1.0, the constant was StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
settings.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class.getName());
