Kafka - Message Timestamp

1 - About

The message timestamp controls the progress of streams. Depending on the application, it can be defined in different ways (see Source).

3 - Source

The timestamp of a message can be retrieved from different sources.

3.1 - Event-time

Event-time processing, also known as “producer time”, is the default. It represents the time when the Kafka producer sent the original message.

3.2 - Ingestion-time

Ingestion-time processing, also known as “broker time”, uses the time when the Kafka broker received the original message.

3.3 - Processing-time

The timestamp is the current time in milliseconds from the system clock at the moment the record is processed. Kafka Streams then operates on the so-called processing-time of events.

3.4 - Payload-time

Payload-time uses a timestamp embedded in the payload of the message.
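
The four sources can be contrasted in a minimal sketch. The types below (TimestampSources, Message, Source) are hypothetical and are not part of the Kafka API; they only illustrate which value each source would yield for the same message.

```java
// Hypothetical illustration only: none of these types belong to the Kafka API.
final class TimestampSources {

    /** The four timestamp sources described in this section. */
    enum Source { EVENT_TIME, INGESTION_TIME, PROCESSING_TIME, PAYLOAD_TIME }

    /** A simplified message carrying every timestamp a real record could expose. */
    static final class Message {
        final long producerSentAtMs;   // set by the producer when the message is sent
        final long brokerReceivedAtMs; // set by the broker when the message is received
        final long payloadTimeMs;      // embedded in the message value by the application

        Message(long producerSentAtMs, long brokerReceivedAtMs, long payloadTimeMs) {
            this.producerSentAtMs = producerSentAtMs;
            this.brokerReceivedAtMs = brokerReceivedAtMs;
            this.payloadTimeMs = payloadTimeMs;
        }
    }

    /** Returns the timestamp (in milliseconds) that the chosen source yields. */
    static long timestampFor(Message message, Source source, long wallClockMs) {
        switch (source) {
            case EVENT_TIME:
                return message.producerSentAtMs;
            case INGESTION_TIME:
                return message.brokerReceivedAtMs;
            case PROCESSING_TIME:
                // Ignores the message entirely: only the local clock matters.
                return wallClockMs;
            case PAYLOAD_TIME:
                return message.payloadTimeMs;
            default:
                throw new IllegalArgumentException("Unknown source: " + source);
        }
    }
}
```

In a real application the wall-clock value would typically come from System.currentTimeMillis(); it is passed in here so that the sketch stays deterministic.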

4 - Built-in

Built-in timestamps are automatically embedded into Kafka messages by the Kafka producer client (since Kafka 0.10.0.0).

5 - Timestamp Extractor API

A timestamp extractor, configured with the timestamp.extractor property, extracts a timestamp from an instance of ConsumerRecord.

5.1 - ConsumerRecordTimestampExtractor

The default extractor is ConsumerRecordTimestampExtractor, which extracts the built-in timestamp.

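The value of the configuration log.message.timestamp.type controls which time the built-in timestamp represents: CreateTime (the default) gives event-time semantics, and LogAppendTime gives ingestion-time semantics.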
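
As a rough sketch, the same choice can be expressed per topic. The example assumes the topic-level counterpart of the broker setting, message.timestamp.type, with the values CreateTime and LogAppendTime. The class and method names are hypothetical, and how the resulting map is handed to Kafka (for instance at topic creation) is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds topic-level settings as plain strings.
final class TimestampTypeSettings {

    /**
     * Returns topic settings selecting which time the built-in timestamp holds.
     * "CreateTime" keeps the producer's timestamp (event-time);
     * "LogAppendTime" replaces it with the broker's time (ingestion-time).
     */
    static Map<String, String> topicSettings(boolean useBrokerTime) {
        Map<String, String> settings = new HashMap<>();
        // Assumption: topic-level key; the broker-wide default is log.message.timestamp.type.
        settings.put("message.timestamp.type", useBrokerTime ? "LogAppendTime" : "CreateTime");
        return settings;
    }
}
```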

5.2 - WallclockTimestampExtractor

The WallclockTimestampExtractor returns the processing-time of events.
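
A minimal sketch of selecting this extractor, assuming the timestamp.extractor property named above and the fully qualified class name org.apache.kafka.streams.processor.WallclockTimestampExtractor. The property is written as a plain string so that the snippet compiles without the Kafka libraries; the helper class itself is hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: builds Streams settings that switch to processing-time.
final class WallclockSettings {

    static Properties processingTimeSettings() {
        Properties settings = new Properties();
        // Assumption: "timestamp.extractor" is the property name used in this article;
        // later Kafka releases may expect a different key, so check the StreamsConfig
        // documentation of the version in use.
        settings.put("timestamp.extractor",
                "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
        return settings;
    }
}
```

With this setting the timestamps stored in the records are ignored, so results depend on when the application happens to process each record.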

5.3 - CustomTimestampExtractor

A custom TimestampExtractor retrieves the payload-time timestamp (i.e. the timestamp embedded in the payload of messages).

Example of a custom TimestampExtractor implementation:

<code java>
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Extracts the embedded timestamp of a record (giving you "event time" semantics).
// Signature as in Kafka 0.10.0; later releases add a second `long` argument to extract().
public class MyEventTimeExtractor implements TimestampExtractor {

  @Override
  public long extract(ConsumerRecord<Object, Object> record) {
    // `Foo` is your own custom class, which we assume has a method that returns
    // the embedded timestamp (in milliseconds).
    Foo myPojo = (Foo) record.value();
    if (myPojo != null) {
      return myPojo.getTimestampInMillis();
    }
    else {
      // Kafka allows `null` as message value.  How to handle such message values
      // depends on your use case.  In this example, we decide to fall back to
      // wall-clock time (= processing-time).
      return System.currentTimeMillis();
    }
  }

}
</code>

You would then define the custom timestamp extractor in your Streams configuration as follows:

<code java>
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class.getName());
</code>
