Kafka - Ksql

> Data Integration Tool (ETL/ELT) > Kafka (Event Hub)

1 - About

KSQL is the streaming SQL engine for Kafka.

You put your KSQL statements in a file and execute them as an application through KSQL.

With KSQL, you can read data as either:

  • a stream, where every update is independent of all others,
  • or a table, where each new update typically replaces a previous update with the same key.

Once you have streaming tables, you can join them together, aggregate them, or, in upcoming releases, query these tables directly. As streams come in, the queries either produce more events or update the tables. Relational databases have a similar concept called materialized views.
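The stream/table duality above can be sketched in plain Python (not KSQL): a stream keeps every event, while a table is the same data compacted down to the latest value per key. The user/page names are illustrative.

```python
# Sketch: stream vs. table over the same sequence of events.
events = [
    ("User_1", "Page_10"),
    ("User_2", "Page_20"),
    ("User_1", "Page_30"),  # a later update for the same key, User_1
]

stream = list(events)      # the stream keeps every update
table = {}
for key, value in events:  # the table keeps only the latest value per key
    table[key] = value

print(len(stream))  # 3
print(table)        # {'User_1': 'Page_30', 'User_2': 'Page_20'}
```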

KSQL stores its tabular state in local RocksDB instances, backed by Kafka changelog topics.


3 - Concept

  • A Stream is the raw data
  • A table is a structure where each result will often get updated later by another message with the same key. If you aggregated raw data into session windows, the results would be emitted as a table.

Joining together pure events (what's happening in the world) and tables (the current state of the world) is at the heart of stream processing.

4 - Getting Started

4.1 - Prerequisites

  • Docker Compose above 1.9 to avoid ERROR: Interactive mode is not yet supported on Windows.
$ docker-compose version
docker-compose version 1.16.1, build 6d1ac219
docker-py version: 2.5.1
CPython version: 2.7.13
OpenSSL version: OpenSSL 1.0.2j  26 Sep 2016

4.2 - Clone and start

git clone https://github.com/confluentinc/ksql.git
cd ksql/docs/quickstart/
docker-machine ls
NAME      ACTIVE   DRIVER       STATE     URL   SWARM   DOCKER   ERRORS
default            virtualbox   Timeout
  • If the name is not the same as above, you need to edit the docker-compose.yml file to add your name or replace moby with it. Below is an example with default:
    extra_hosts:
      - "moby:127.0.0.1"
      - "default:127.0.0.1"
  • Start the services (docker machines).
docker-compose up -d # d to start it as a daemon
  • Listing the services
docker-compose ps
               Name                              Command               State                           Ports
-------------------------------------------------------------------------------------------------------------------------------------
quickstart_kafka_1                    /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
quickstart_ksql-cli_1                 perl -e while(1){ sleep 99 ...   Up
quickstart_ksql-datagen-pageviews_1   bash -c echo Waiting for K ...   Up
quickstart_ksql-datagen-users_1       bash -c echo Waiting for K ...   Up
quickstart_schema-registry_1          /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp
quickstart_zookeeper_1                /etc/confluent/docker/run        Up      2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp
  • All the Confluent commands except ksql are run inside the kafka service with bash:
docker-compose exec kafka //bin//bash
[email protected]:/#

4.3 - Data Model and Data Generation

The Compose file automatically runs a data generator that continuously produces data to two Kafka topics:

  • pageviews
  • and users
kafka-topics --zookeeper zookeeper:32181 --list | grep -iE 'users|pageviews'
pageviews
users

The topic

  • pageviews has a key that is a mock timestamp and a value that is in DELIMITED format.
  • users has a key that is the user ID and a value that is in JSON format.

Pageviews with the standard consumer:

kafka-console-consumer \
    --topic pageviews \
    --bootstrap-server kafka:29092 \
    --from-beginning \
    --max-messages 3 \
    --property print.key=true
1       1508932966835,User_6,Page_17
11      1508932967385,User_9,Page_15
21      1508932967416,User_3,Page_30
Processed a total of 3 messages
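A DELIMITED value is just a comma-separated record. A minimal Python sketch (not KSQL) parsing one of the pageviews values above into its three fields, using the field names from the stream definition later in this page:

```python
# Sketch: split one DELIMITED pageviews value into
# viewtime (BIGINT), userid (VARCHAR), pageid (VARCHAR).
record = "1508932966835,User_6,Page_17"

viewtime, userid, pageid = record.split(",")
viewtime = int(viewtime)  # the mock timestamp is a plain integer

print(viewtime, userid, pageid)  # 1508932966835 User_6 Page_17
```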

4.4 - Starting the Ksql console

  • In another shell:
docker-compose exec ksql-cli ksql-cli local --bootstrap-server kafka:29092

where:

  • the first ksql-cli is the host name of the ksql-cli service as seen in the docker-compose.yml file
  • the second ksql-cli is the console
                       ======================================
                       =      _  __ _____  ____  _          =
                       =     | |/ // ____|/ __ \| |         =
                       =     | ' /| (___ | |  | | |         =
                       =     |  <  \___ \| |  | | |         =
                       =     | . \ ____) | |__| | |____     =
                       =     |_|\_\_____/ \___\_\______|    =
                       =                                    =
                       =   Streaming SQL Engine for Kafka   =
Copyright 2017 Confluent Inc.

CLI v0.1, Server v0.1 located at http://localhost:9098

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

5 - KSQL

5.1 - Create a stream

  • Create a stream
CREATE STREAM pageviews_original (
    viewtime BIGINT, 
    userid VARCHAR, 
    pageid VARCHAR) 
WITH (
    kafka_topic='pageviews', 
    value_format='DELIMITED');
 Message
----------------
 Stream created
  • describe it
DESCRIBE pageviews_original;
 Field    | Type           
---------------------------
 ROWTIME  | BIGINT         
 ROWKEY   | VARCHAR(STRING)
 VIEWTIME | BIGINT         
 USERID   | VARCHAR(STRING)
 PAGEID   | VARCHAR(STRING)
  • Show all streams
 SHOW STREAMS;
 Stream Name        | Kafka Topic | Format
----------------------------------------------
 PAGEVIEWS_ORIGINAL | pageviews   | DELIMITED

5.2 - Create a table

  • Create a table
CREATE TABLE users_original (
    registertime BIGINT, 
    gender VARCHAR, 
    regionid VARCHAR, 
    userid VARCHAR) 
WITH (
    kafka_topic='users', 
    value_format='JSON');
 Message
---------------
 Table created
  • Describe it
DESCRIBE users_original;
 Field        | Type
--------------------------------
 ROWTIME      | BIGINT
 ROWKEY       | VARCHAR(STRING)
 REGISTERTIME | BIGINT
 GENDER       | VARCHAR(STRING)
 REGIONID     | VARCHAR(STRING)
 USERID       | VARCHAR(STRING)
  • Show all tables
SHOW TABLES;
 Table Name     | Kafka Topic | Format | Windowed
--------------------------------------------------
 USERS_ORIGINAL | users       | JSON   | false

5.3 - Query

By default KSQL reads the topics for streams and tables from the latest offset.

ksql> SELECT pageid FROM pageviews_original LIMIT 3;
Page_49
Page_93
Page_20
  • Without the LIMIT keyword, the SELECT query would run indefinitely until you stop it by pressing <ctrl-c>
SELECT pageid FROM pageviews_original;
Page_59           
Page_92           
Page_37           
Page_30           
Page_69           
Page_27           
^CQuery terminated

5.4 - Persistent Query

Unlike the non-persistent query above,

  • Results from this query are written to the Kafka topic PAGEVIEWS_FEMALE.
  • Queries will continuously run as KSQL applications until they are manually terminated. Exiting KSQL does not terminate persistent queries.

The query below enriches the pageviews STREAM by doing a LEFT JOIN with the users_original TABLE on the user ID, keeping only rows where gender = 'FEMALE'.

CREATE STREAM pageviews_female
AS
    SELECT
            users_original.userid AS userid
          , pageid
          , regionid
          , gender
        FROM
            pageviews_original
        LEFT JOIN users_original
        ON
            pageviews_original.userid = users_original.userid
        WHERE
            gender = 'FEMALE';
 Message
----------------------------
 Stream created and running
SHOW STREAMS;
 Stream Name        | Kafka Topic      | Format
---------------------------------------------------
 PAGEVIEWS_ORIGINAL | pageviews        | DELIMITED
 PAGEVIEWS_FEMALE   | PAGEVIEWS_FEMALE | DELIMITED
  • See the result. Ctrl+c will terminate the output to the console but not the query.
SELECT * FROM pageviews_female;
1508919557785 | User_9 | User_9 | Page_12 | Region_2 | FEMALE
1508919560087 | User_7 | User_7 | Page_12 | Region_4 | FEMALE
^CQuery terminated
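The stream-table join above can be sketched in plain Python (not KSQL): each event in the stream is enriched with the current table state for its key, then filtered. The user data is illustrative.

```python
# Sketch: stream-table LEFT JOIN plus WHERE filter,
# mirroring the pageviews_female query.
users = {  # table: current state per userid
    "User_7": {"regionid": "Region_4", "gender": "FEMALE"},
    "User_9": {"regionid": "Region_2", "gender": "MALE"},
}
pageviews = [  # stream of (userid, pageid) events
    ("User_7", "Page_12"),
    ("User_9", "Page_15"),
    ("User_0", "Page_99"),  # no matching user in the table
]

pageviews_female = []
for userid, pageid in pageviews:
    user = users.get(userid, {})        # LEFT JOIN: a missing user yields no fields
    if user.get("gender") == "FEMALE":  # WHERE gender = 'FEMALE'
        pageviews_female.append((userid, pageid, user["regionid"], user["gender"]))

print(pageviews_female)  # [('User_7', 'Page_12', 'Region_4', 'FEMALE')]
```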


  • Show
SHOW QUERIES;
 Query ID | Kafka Topic      | Query String
------------------------------------------------------------------------------------------------------------------------------------------------------------
 1        | PAGEVIEWS_FEMALE | CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM
  • Terminate the query. If you want to see the data flowing in the next section with a standard consumer, don't do that now but at the end.
TERMINATE 1;
 Message
-------------------
 Query terminated.

5.5 - Create a persistent query with a window

CREATE TABLE user_timeview AS
SELECT
        userid
      , SUM( viewtime ) AS viewtime_sum
    FROM
        pageviews_original
    WINDOW TUMBLING( SIZE 30 SECOND )
    GROUP BY
        userid;
  • See the result (Ctrl+C stops the output):
SELECT * FROM user_timeview;
1508933520000 | User_9 : Window{start=1508933520000 end=-} | User_9 | 10562534743056
1508933520000 | User_2 : Window{start=1508933520000 end=-} | User_2 | 9053601210034
1508933520000 | User_5 : Window{start=1508933520000 end=-} | User_5 | 12071468299790
1508933520000 | User_6 : Window{start=1508933520000 end=-} | User_6 | 7544667667015
1508933520000 | User_7 : Window{start=1508933520000 end=-} | User_7 | 9053601208679
1508933520000 | User_6 : Window{start=1508933520000 end=-} | User_6 | 9053601212460
1508933520000 | User_3 : Window{start=1508933520000 end=-} | User_3 | 6035734154953
^CQuery terminated
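A tumbling window simply buckets each event by its timestamp into fixed, non-overlapping intervals and aggregates per (window, key). A Python sketch (not KSQL) of the 30-second window above, with illustrative timestamps and values:

```python
# Sketch: 30-second tumbling window summing viewtime per userid.
WINDOW_MS = 30_000

views = [  # (event_time_ms, userid, viewtime)
    (1_000, "User_1", 10),
    (15_000, "User_1", 20),
    (31_000, "User_1", 5),   # falls into the next 30s window
    (2_000, "User_2", 7),
]

windows = {}  # (window_start, userid) -> running sum
for ts, userid, viewtime in views:
    window_start = (ts // WINDOW_MS) * WINDOW_MS  # align to window boundary
    key = (window_start, userid)
    windows[key] = windows.get(key, 0) + viewtime

print(windows)
# {(0, 'User_1'): 30, (30000, 'User_1'): 5, (0, 'User_2'): 7}
```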

6 - Seeing the topics created

  • Query 1 created several topics:
    • PAGEVIEWS_FEMALE
    • and all ksql_query_* topics.
docker-compose exec kafka //bin//bash
 kafka-topics --list --zookeeper zookeeper:32181
PAGEVIEWS_FEMALE                                                                       
__consumer_offsets                                                                     
_confluent-metrics                                                                     
_schemas                                                                               
ksql__commands                                                                         
ksql_query_1-KSTREAM-MAP-0000000011-repartition                                        
ksql_query_1-USERS_ORIGINAL_statestore-changelog                                       
ksql_query_1-USERS_ORIGINAL_statestore-repartition                                     
ksql_transient_6823967496794798700_1508920727934-USERS_ORIGINAL_statestore-changelog   
ksql_transient_6823967496794798700_1508920727934-USERS_ORIGINAL_statestore-repartition 
pageviews                                                                              
users
  • The PAGEVIEWS_FEMALE topic
kafka-topics --topic PAGEVIEWS_FEMALE --zookeeper zookeeper:32181 --describe
        Topic: PAGEVIEWS_FEMALE Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: PAGEVIEWS_FEMALE Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: PAGEVIEWS_FEMALE Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: PAGEVIEWS_FEMALE Partition: 3    Leader: 1       Replicas: 1     Isr: 1
docker-compose exec kafka //bin//bash
kafka-console-consumer --bootstrap-server localhost:29092 --topic PAGEVIEWS_FEMALE --max-messages 20
User_8,Page_15,Region_5,FEMALE  
User_5,Page_74,Region_3,FEMALE  
User_5,Page_39,Region_3,FEMALE  
User_5,Page_56,Region_3,FEMALE  
User_5,Page_60,Region_3,FEMALE  
User_5,Page_12,Region_3,FEMALE  
User_5,Page_73,Region_3,FEMALE  
User_8,Page_91,Region_2,FEMALE  
User_2,Page_87,Region_8,FEMALE  
User_7,Page_21,Region_3,FEMALE  
User_2,Page_59,Region_8,FEMALE  
User_7,Page_54,Region_9,FEMALE  
User_8,Page_62,Region_6,FEMALE  
User_7,Page_45,Region_9,FEMALE  
User_8,Page_69,Region_6,FEMALE  
User_8,Page_19,Region_6,FEMALE  
User_8,Page_32,Region_6,FEMALE  
User_2,Page_71,Region_8,FEMALE  
User_2,Page_83,Region_8,FEMALE  
User_8,Page_40,Region_1,FEMALE  
Processed a total of 20 messages

7 - Client Server mode

From https://github.com/confluentinc/ksql/issues/272

Start one or more servers using the following command:

$ ./bin/ksql-server-start /path/to/ksql_server1.properties
A sample properties file is as follows:
ksql.cluster.id=ksql_app_tests
application.id=ksql_app_tests
bootstrap.servers=localhost:9092
listeners=http://localhost:8080

Note that each server needs its own properties file, and the listeners value must be unique per server.
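For a second server in the same cluster, a hedged sketch of its properties file (the file name and port are illustrative, reusing the sample values above):

```ini
# ksql_server2.properties -- same cluster, unique listener
ksql.cluster.id=ksql_app_tests
application.id=ksql_app_tests
bootstrap.servers=localhost:9092
listeners=http://localhost:8081
```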

In a new terminal window, start a CLI and connect to one of the servers by passing its listener address:

$ ./bin/ksql-cli remote http://localhost:8080

8 - Embedded

9 - Documentation / Reference