Kafka Connect - Distributed Worker

> Data Integration Tool (ETL/ELT) > Kafka (Event Hub) > Kafka - Connect

1 - About

Distributed mode is the Connect runtime mode in which one or more workers run as a cluster and share the connector and task workload.


3 - Management

3.1 - Metadata (Internal topics)

3.2 - Start

3.2.1 - Command line

bin/connect-distributed worker.properties
# Example: a configuration that works with Kafka and Schema Registry services
# running locally on a single-broker cluster
./bin/connect-distributed ./etc/schema-registry/connect-avro-distributed.properties


  • worker.properties is the configuration file

New workers will either start a new group or join an existing one based on the worker properties provided.

The connector configuration is not given at the command line because it must be coordinated between workers. See the connector section below.
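A minimal distributed worker.properties might look like the following sketch (the broker address, group id, and topic names are placeholders to adapt to your cluster):

```properties
# Kafka broker(s) to connect to
bootstrap.servers=localhost:9092
# Workers with the same group.id form one Connect cluster
group.id=connect-cluster
# Converters for keys and values
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Internal topics shared by all workers of the cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
# Replication factor 1 is only suitable for a single-broker development setup
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
```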

3.2.2 - Docker

docker run -d \
  --name=kafka-connect \
  --net=host \
  -e CONNECT_PRODUCER_INTERCEPTOR_CLASSES=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor \
  -e CONNECT_CONSUMER_INTERCEPTOR_CLASSES=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor \
  -e CONNECT_BOOTSTRAP_SERVERS=localhost:29092 \
  -e CONNECT_REST_PORT=28082 \
  -e CONNECT_GROUP_ID="quickstart" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-config" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="quickstart-status" \
  -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_LOG4J_LOGGERS=org.reflections=ERROR \
  -e CONNECT_PLUGIN_PATH=/usr/share/java \
  -v /tmp/quickstart/file:/tmp/quickstart \
  confluentinc/cp-kafka-connect   # image name assumed: Confluent's Kafka Connect image

3.3 - Failure

When a worker fails, its tasks are rebalanced across the remaining active workers. When a task fails, no rebalance is triggered, as a task failure is considered an exceptional case: the task stays in the FAILED state until it is restarted (for instance through the REST API).

If you add a worker, shut down a worker, or a worker fails unexpectedly, the rest of the workers detect this and automatically coordinate to redistribute connectors and tasks across the updated set of available workers.
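Since failed tasks are not restarted automatically, a sketch like the following can pick the failed task ids out of a status document (the connector name, worker ids, and sample payload below are made up for illustration), which you could then feed to the REST restart endpoint:

```shell
# Sample status payload, as returned by GET /connectors/<name>/status
# (connector name and worker ids are hypothetical)
status='{"name":"file-source","connector":{"state":"RUNNING","worker_id":"10.0.0.1:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"10.0.0.1:8083"},{"id":1,"state":"RUNNING","worker_id":"10.0.0.2:8083"}]}'

# Print the id of every FAILED task
echo "$status" | python3 -c '
import json, sys
for t in json.load(sys.stdin)["tasks"]:
    if t["state"] == "FAILED":
        print(t["id"])
'
# Each failed task can then be restarted through the REST API, e.g.:
#   curl -X POST http://localhost:8083/connectors/file-source/tasks/<id>/restart
```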


4 - Configuration Management

4.1 - Worker

4.1.1 - Cluster

Distributed workers that are configured with matching group.id values in their configuration file automatically discover each other and form a cluster.

All worker configurations must have matching:

  • config.storage.topic,
  • offset.storage.topic,
  • and status.storage.topic properties.

4.1.2 - Locally

In distributed mode, if you run more than one worker per host (for example, if you are testing distributed mode locally during development), the following settings must have different values for each instance:

  • rest.port - the port the REST interface listens on for HTTP requests
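For instance, two workers on the same host could keep every other property identical and differ only in the REST port (file names and port numbers below are arbitrary):

```properties
# worker-1.properties
rest.port=8083

# worker-2.properties
rest.port=8084
```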

4.2 - Connector

4.2.1 - Creation - Rest

To create a connector, you start the workers and then make a REST request to create/manage a connector

Example: FileSinkConnector where the worker has been started on localhost

curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors

  • or inline:

curl -X POST -H "Content-Type: application/json" --data '{"name": "local-file-sink", "config": {"connector.class":"FileStreamSinkConnector", "tasks.max":"1", "file":"test.sink.txt", "topics":"connect-test"}}' http://localhost:8083/connectors
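A small sketch of the same request, splitting it into a config file plus a sanity check before submitting (the file path is arbitrary; the connector values mirror the inline example above):

```shell
# Write the connector configuration to a file
cat > /tmp/local-file-sink.json <<'EOF'
{
  "name": "local-file-sink",
  "config": {
    "connector.class": "FileStreamSinkConnector",
    "tasks.max": "1",
    "file": "test.sink.txt",
    "topics": "connect-test"
  }
}
EOF

# Validate the JSON before submitting (python3 exits non-zero on malformed JSON)
python3 -m json.tool /tmp/local-file-sink.json > /dev/null && echo "config OK"

# Submit to a worker's REST endpoint (assumes a worker listening on localhost:8083):
# curl -X POST -H "Content-Type: application/json" \
#   --data @/tmp/local-file-sink.json http://localhost:8083/connectors
```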

Use the REST API to deploy and manage connectors when running in distributed mode (see connect-managing-distributed-mode). The Confluent CLI wraps these REST calls.

With the CLI, load a bundled connector by its predefined name, or a custom connector with a given configuration file.

Upon success it will print the connector’s configuration.

confluent load [<connector-name> [-d <connector-config-file>]]


  • connector-name is the identifier of the connector (its name). If the configuration file also defines a name, it is overridden by the command-line value.
  • connector-config-file can be a properties or JSON file. If not specified, it defaults to /etc/kafka/connect-<connector-name>.properties.


confluent load file-source
# same as
confluent load file-source -d /etc/kafka/connect-file-source.properties
{
  "name": "file-source",
  "config": {
    "connector.class": "FileStreamSource",
    "tasks.max": "1",
    "file": "test.txt",
    "topic": "connect-test",
    "name": "file-source"
  },
  "tasks": []
}

4.2.2 - Status

Connectors and their tasks publish status updates to a shared topic (configured with status.storage.topic) which all workers in the cluster monitor.

Each connector and task reports a state such as RUNNING, PAUSED, or FAILED.

# All connectors
confluent status connectors
# One connector
confluent status connectorName
{
  "name": "connectorName",
  "connector": {
    "state": "RUNNING",
    "worker_id": ""
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": ""
    }
  ]
}

Because of the rebalancing that happens between worker tasks every time a connector is loaded, a call to confluent status <connector-name> might not succeed immediately after a new connector is loaded. Once rebalancing completes, such calls will be able to return the actual status of a connector.

4.2.3 - List - Predefined Connectors

Predefined connectors are just properties files in /etc/kafka.

With the cli:

confluent list connectors
Bundled Predefined Connectors (edit configuration under etc/):
  s3-sink
  ...

Running connectors can then be inspected with confluent status connectors.

4.2.4 - Unload / Stop

confluent unload <connector-name>

5 - Documentation / Reference