Kafka - Ksql
1 - About
KSQL is the streaming SQL engine for Kafka.
You can put your KSQL statements in a file and execute them as an application through KSQL.
With KSQL, you can read data as either:
- a stream, where every update is independent of all others,
- or a table, where every update is likely a revision of a previous update in the stream.
Once you have streaming tables, you can join them together, aggregate them, or, at some point in the near future, query these tables directly. As streams come in, the queries either produce more events or update the tables. Relational databases have a similar concept called materialized views.
KSQL stores its tabular overlay in local RocksDB state stores, backed by Kafka changelog topics.
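As a sketch of this materialized-view idea, a continuous aggregation over a stream yields a table that KSQL keeps up to date as events arrive. This is a hedged example, not from the quickstart: the table name pageviews_per_user is hypothetical, and it assumes the pageviews_original stream defined in the quickstart below.

```sql
-- Continuous aggregation: the result is a TABLE (a materialized view)
-- that is updated whenever a new pageview event arrives.
CREATE TABLE pageviews_per_user AS
  SELECT userid, COUNT(*) AS pageview_count
  FROM pageviews_original
  GROUP BY userid;
```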
2 - Articles Related
3 - Concept
- A Stream is the raw data
- A table is a structure where each result will often be updated later by another message with the same key. If you aggregated raw data into session windows, the results would be emitted as a table.
Joining together pure events (what's happening in the world) and tables (the current state of the world) is at the heart of stream processing.
4 - Getting Started
4.1 - Prerequisites
- Docker Compose above 1.9, to avoid the following error:
ERROR: Interactive mode is not yet supported on Windows
$ docker-compose version
docker-compose version 1.16.1, build 6d1ac219
docker-py version: 2.5.1
CPython version: 2.7.13
OpenSSL version: OpenSSL 1.0.2j 26 Sep 2016
4.2 - Clone and start
git clone https://github.com/confluentinc/ksql.git
cd ksql/docs/quickstart/
- The Docker Compose file starts several machines and uses moby as the name of the Docker host machine in its configuration.
docker-machine ls
NAME      ACTIVE   DRIVER       STATE     URL   SWARM   DOCKER   ERRORS
default   -        virtualbox   Timeout
- If the name is not the same as above, you need to edit the docker-compose.yml file to add or replace moby with your machine name. Below is an example with default:
extra_hosts:
  - "moby:127.0.0.1"
  - "default:127.0.0.1"
- Start the services (docker machines).
docker-compose up -d # -d starts it as a daemon
- Listing the services
docker-compose ps
Name                                  Command                          State   Ports
-------------------------------------------------------------------------------------------------------------------------------------
quickstart_kafka_1                    /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
quickstart_ksql-cli_1                 perl -e while(1){ sleep 99 ...   Up
quickstart_ksql-datagen-pageviews_1   bash -c echo Waiting for K ...   Up
quickstart_ksql-datagen-users_1       bash -c echo Waiting for K ...   Up
quickstart_schema-registry_1          /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp
quickstart_zookeeper_1                /etc/confluent/docker/run        Up      2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp
- All the Confluent commands, except for ksql, are run inside the kafka service with bash:
docker-compose exec kafka //bin//bash
[email protected]:/#
4.3 - Data Model and Data Generation
The Compose file automatically runs a data generator that continuously produces data to two Kafka topics:
- pageviews
- users
kafka-topics --zookeeper zookeeper:32181 --list | grep -iE 'users|pageviews'
pageviews
users
The topic pageviews has a key that is a mock timestamp and a value in DELIMITED format. The topic users has a key that is the user ID and a value in JSON format.
Pageviews with the standard consumer:
kafka-console-consumer \
  --topic pageviews \
  --bootstrap-server kafka:29092 \
  --from-beginning \
  --max-messages 3 \
  --property print.key=true
1    1508932966835,User_6,Page_17
11   1508932967385,User_9,Page_15
21   1508932967416,User_3,Page_30
Processed a total of 3 messages
4.4 - Starting the Ksql console
- In another shell:
docker-compose exec ksql-cli ksql-cli local --bootstrap-server kafka:29092
where the second ksql-cli is the console command.
======================================
=      _  __ _____  ____  _          =
=     | |/ // ____|/ __ \| |         =
=     | ' /| (___ | |  | | |         =
=     | <  \___ \| |  | | |          =
=     | . \ ____) | |__| | |____     =
=     |_|\_\_____/ \___\_\______|    =
=                                    =
=   Streaming SQL Engine for Kafka   =
Copyright 2017 Confluent Inc.

CLI v0.1, Server v0.1 located at http://localhost:9098
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
5 - KSQL
5.1 - Create a stream
- Create a stream
CREATE STREAM pageviews_original (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR)
WITH (
  kafka_topic='pageviews',
  value_format='DELIMITED');
 Message
----------------
 Stream created
- Describe it
DESCRIBE pageviews_original;
 Field    | Type
---------------------------
 ROWTIME  | BIGINT
 ROWKEY   | VARCHAR(STRING)
 VIEWTIME | BIGINT
 USERID   | VARCHAR(STRING)
 PAGEID   | VARCHAR(STRING)
- Show all streams
SHOW STREAMS;
 Stream Name        | Kafka Topic | Format
----------------------------------------------
 PAGEVIEWS_ORIGINAL | pageviews   | DELIMITED
5.2 - Create a table
- Create a table
CREATE TABLE users_original (
  registertime BIGINT,
  gender VARCHAR,
  regionid VARCHAR,
  userid VARCHAR)
WITH (
  kafka_topic='users',
  value_format='JSON');
 Message
---------------
 Table created
- Describe it
DESCRIBE users_original;
 Field        | Type
--------------------------------
 ROWTIME      | BIGINT
 ROWKEY       | VARCHAR(STRING)
 REGISTERTIME | BIGINT
 GENDER       | VARCHAR(STRING)
 REGIONID     | VARCHAR(STRING)
 USERID       | VARCHAR(STRING)
- Show all tables
SHOW TABLES;
 Table Name     | Kafka Topic | Format | Windowed
--------------------------------------------------
 USERS_ORIGINAL | users       | JSON   | false
5.3 - Query
By default KSQL reads the topics for streams and tables from the latest offset.
ksql> SELECT pageid FROM pageviews_original LIMIT 3;
Page_49
Page_93
Page_20
- Without the LIMIT keyword, the SELECT query would run indefinitely until you stop it by pressing <ctrl-c>:
SELECT pageid FROM pageviews_original;
Page_59
Page_92
Page_37
Page_30
Page_69
Page_27
^CQuery terminated
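To make a query read a topic from the beginning instead of the latest offset, the consumer offset can be changed in the CLI. A minimal sketch, hedged because the property syntax may differ between KSQL versions:

```sql
-- Subsequent queries will read from the earliest available offset
SET 'auto.offset.reset' = 'earliest';
SELECT pageid FROM pageviews_original LIMIT 3;
```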
5.4 - Persistent Query
Unlike the non-persistent query above:
- results from this query are written to the Kafka topic PAGEVIEWS_FEMALE,
- queries will continuously run as KSQL applications until they are manually terminated. Exiting KSQL does not terminate persistent queries.
The query below enriches the pageviews_original STREAM by doing a LEFT JOIN with the users_original TABLE on the user ID, where a gender condition is met.
CREATE STREAM pageviews_female AS
  SELECT users_original.userid AS userid, pageid, regionid, gender
  FROM pageviews_original
  LEFT JOIN users_original
    ON pageviews_original.userid = users_original.userid
  WHERE gender = 'FEMALE';
 Message
----------------------------
 Stream created and running
SHOW STREAMS;
 Stream Name        | Kafka Topic      | Format
---------------------------------------------------
 PAGEVIEWS_ORIGINAL | pageviews        | DELIMITED
 PAGEVIEWS_FEMALE   | PAGEVIEWS_FEMALE | DELIMITED
- See the result. Ctrl+c will terminate the output to the console but not the query.
SELECT * FROM pageviews_female;
1508919557785 | User_9 | User_9 | Page_12 | Region_2 | FEMALE
1508919560087 | User_7 | User_7 | Page_12 | Region_4 | FEMALE
^CQuery terminated
Exiting KSQL does not terminate persistent queries. They will continuously run as KSQL applications until they are manually terminated.
- Show the running queries
SHOW QUERIES;
 Query ID | Kafka Topic      | Query String
------------------------------------------------------------------------------------------------------------------------------------------------------------
 1        | PAGEVIEWS_FEMALE | CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM
- Terminate the query. If you want to see the data flowing in the next section with a standard consumer, don't do it now but at the end.
TERMINATE 1;
 Message
-------------------
 Query terminated.
5.5 - Create a persistent query with a window
CREATE TABLE user_timeview AS
  SELECT userid, SUM(viewtime) AS numusers
  FROM pageviews_original
  WINDOW TUMBLING (SIZE 30 SECOND)
  GROUP BY userid;
- Select from the table to see the windowed results:
SELECT * FROM user_timeview;
1508933520000 | User_9 : Window{start=1508933520000 end=-} | User_9 | 10562534743056
1508933520000 | User_2 : Window{start=1508933520000 end=-} | User_2 | 9053601210034
1508933520000 | User_5 : Window{start=1508933520000 end=-} | User_5 | 12071468299790
1508933520000 | User_6 : Window{start=1508933520000 end=-} | User_6 | 7544667667015
1508933520000 | User_7 : Window{start=1508933520000 end=-} | User_7 | 9053601208679
1508933520000 | User_6 : Window{start=1508933520000 end=-} | User_6 | 9053601212460
1508933520000 | User_3 : Window{start=1508933520000 end=-} | User_3 | 6035734154953
^CQuery terminated
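The session windows mentioned in the concept section can be expressed the same way. A minimal sketch, assuming KSQL's SESSION window syntax; the table name pageviews_per_user_session is hypothetical:

```sql
-- Group pageviews into per-user sessions separated by 60 seconds
-- of inactivity; each session's count is emitted as a table row.
CREATE TABLE pageviews_per_user_session AS
  SELECT userid, COUNT(*) AS pageview_count
  FROM pageviews_original
  WINDOW SESSION (60 SECONDS)
  GROUP BY userid;
```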
6 - Seeing the topics created
- The query 1 has created several topics:
  - PAGEVIEWS_FEMALE
  - and all ksql_query_* topics.
docker-compose exec kafka //bin//bash
kafka-topics --list --zookeeper zookeeper:32181
PAGEVIEWS_FEMALE
__consumer_offsets
_confluent-metrics
_schemas
ksql__commands
ksql_query_1-KSTREAM-MAP-0000000011-repartition
ksql_query_1-USERS_ORIGINAL_statestore-changelog
ksql_query_1-USERS_ORIGINAL_statestore-repartition
ksql_transient_6823967496794798700_1508920727934-USERS_ORIGINAL_statestore-changelog
ksql_transient_6823967496794798700_1508920727934-USERS_ORIGINAL_statestore-repartition
pageviews
users
- The PAGEVIEWS_FEMALE topic
kafka-topics --topic PAGEVIEWS_FEMALE --zookeeper zookeeper:32181 --describe
Topic: PAGEVIEWS_FEMALE   Partition: 0   Leader: 1   Replicas: 1   Isr: 1
Topic: PAGEVIEWS_FEMALE   Partition: 1   Leader: 1   Replicas: 1   Isr: 1
Topic: PAGEVIEWS_FEMALE   Partition: 2   Leader: 1   Replicas: 1   Isr: 1
Topic: PAGEVIEWS_FEMALE   Partition: 3   Leader: 1   Replicas: 1   Isr: 1
- Get the data with a standard Kafka consumer
docker-compose exec kafka //bin//bash
kafka-console-consumer --bootstrap-server localhost:29092 --topic PAGEVIEWS_FEMALE --max-messages 20
User_8,Page_15,Region_5,FEMALE
User_5,Page_74,Region_3,FEMALE
User_5,Page_39,Region_3,FEMALE
User_5,Page_56,Region_3,FEMALE
User_5,Page_60,Region_3,FEMALE
User_5,Page_12,Region_3,FEMALE
User_5,Page_73,Region_3,FEMALE
User_8,Page_91,Region_2,FEMALE
User_2,Page_87,Region_8,FEMALE
User_7,Page_21,Region_3,FEMALE
User_2,Page_59,Region_8,FEMALE
User_7,Page_54,Region_9,FEMALE
User_8,Page_62,Region_6,FEMALE
User_7,Page_45,Region_9,FEMALE
User_8,Page_69,Region_6,FEMALE
User_8,Page_19,Region_6,FEMALE
User_8,Page_32,Region_6,FEMALE
User_2,Page_71,Region_8,FEMALE
User_2,Page_83,Region_8,FEMALE
User_8,Page_40,Region_1,FEMALE
Processed a total of 20 messages
7 - Client Server mode
From https://github.com/confluentinc/ksql/issues/272
Start one or more servers using the following command:
$ ./bin/ksql-server-start /path/to/ksql_server1.properties

A sample properties file is as follows:

ksql.cluster.id=ksql_app_tests
application.id=ksql_app_tests
bootstrap.servers=localhost:9092
listeners=http://localhost:8080
Note that for each server you need a properties file and the listeners value should be unique.
In a new terminal window, start a CLI and connect to one of the servers by passing its listener address:
$ ./bin/ksql-cli remote http://localhost:8080
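Since each server needs its own properties file with a unique listeners value, a second server could use a file like the sketch below; the file name and port are assumptions for illustration:

```ini
# /path/to/ksql_server2.properties (hypothetical second server)
ksql.cluster.id=ksql_app_tests
application.id=ksql_app_tests
bootstrap.servers=localhost:9092
# Must differ from the first server's listener address
listeners=http://localhost:8082
```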
8 - Embedded
KSQL can also be embedded in an application through the Kafka Streams API.
9 - Documentation / Reference
- See also a precursor: http://www.landoop.com/kafka/kcql/