# Streaming ETL demo - Enriching event stream data with CDC data from Oracle, streaming into Elasticsearch
This is designed to be run as a step-by-step demo. The statements in `ksql-statements.sql` should match those run in this doc end-to-end, and in theory you can just run the file, but I have not tested it. PRs welcome for a one-click script that just demonstrates the end-to-end running demo :)
The slides that accompany this demo can be found here: https://speakerdeck.com/rmoff/apache-kafka-and-ksql-in-action-lets-build-a-streaming-data-pipeline
## Start the environment

```bash
cd docker-compose
./scripts/setup.sh
```
Optionally, use something like `screen` or `tmux` to have both of the following sessions easily to hand. Or multiple Terminal tabs. Whatever works for you :)
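For example, a minimal tmux layout (just a sketch; the session name is arbitrary):

```bash
# Sketch: one tmux session with two side-by-side panes,
# one for the KSQL CLI and one for SQL*Plus (commands below)
tmux new-session -s ksql-demo \; split-window -h
```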
- KSQL CLI:

  ```bash
  docker exec -it ksql-cli bash -c 'echo -e "\n\n⏳ Waiting for KSQL to be available before launching CLI\n"; while [ $(curl -s -o /dev/null -w %{http_code} http://ksql-server:8088/) -eq 000 ] ; do echo -e $(date) "KSQL Server HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://ksql-server:8088/) " (waiting for 200)" ; sleep 1 ; done; ksql http://ksql-server:8088'
  ```
- SQL*Plus:

  ```bash
  docker exec -it oracle bash -c 'sleep 1;rlwrap sqlplus Debezium/dbz@localhost:1521/ORCLPDB1'
  ```

  (The `sleep` is necessary to avoid `rlwrap: error: My terminal reports width=0 (is it emacs?) I can’t handle this, sorry!` (ref).)

  This uses rlwrap, which isn’t in the vanilla Oracle image. To include it in your own image, add this to `OracleDatabase/SingleInstance/dockerfiles/12.2.0.1/Dockerfile` (or use this fork):

  ```dockerfile
  RUN rpm -Uvh https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
  RUN yum install -y rlwrap
  ```
Demo checklist:

- Launch Reflector 3, mirror phone to Mac
- Disable other phone notifications (WhatsApp/Messages/other Slack etc)
- Disable Mac notifications
- Disable Flux
In SQL*Plus, set up column formatting and inspect the source CUSTOMERS table:

```sql
-- SQL*Plus display formatting
COL FIRST_NAME FOR A15
COL LAST_NAME FOR A15
COL ID FOR 999
COL CLUB_STATUS FOR A12
COL EMAIL FOR A30
SET LINESIZE 200

SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, CLUB_STATUS, UPDATE_TS FROM CUSTOMERS;
```

```
  ID FIRST_NAME      LAST_NAME       EMAIL                          CLUB_STATUS  UPDATE_TS
---- --------------- --------------- ------------------------------ ------------ ----------------------------
   1 Rica            Blaisdell       rblaisdell0@rambler.ru         bronze       04-DEC-18 10.56.37.000000 AM
   2 Ruthie          Brockherst      rbrockherst1@ow.ly             platinum     04-DEC-18 10.56.37.000000 AM
   3 Mariejeanne     Cocci           mcocci2@techcrunch.com         bronze       04-DEC-18 10.56.37.000000 AM
   4 Hashim          Rumke           hrumke3@sohu.com               platinum     04-DEC-18 10.56.37.000000 AM
   5 Hansiain        Coda            hcoda4@senate.gov              platinum     04-DEC-18 10.56.37.000000 AM
```
Check the status of the Kafka Connect connectors:

```bash
curl -s "http://localhost:18083/connectors" |
  jq '.[]' |
  xargs -I{connector_name} curl -s "http://localhost:18083/connectors/"{connector_name}"/status" |
  jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")' |
  column -s : -t |
  sed 's/\"//g' |
  sort
```

```
es_sink_ratings-with-customer-data-tsrouter  |  RUNNING  |  RUNNING
es_sink_unhappy_platinum_customers           |  RUNNING  |  RUNNING
ora-source-jdbc                              |  RUNNING  |  RUNNING
```
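If a connector or task shows `FAILED` rather than `RUNNING`, you can restart it through the Connect REST API (a sketch, using one of the connector names above):

```bash
# Restart a failed connector via the Kafka Connect REST API
curl -s -X POST "http://localhost:18083/connectors/es_sink_unhappy_platinum_customers/restart"
```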
In KSQL:

```sql
ksql> list topics;

 Kafka Topic              | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
------------------------------------------------------------------------------------------------------
 _confluent-metrics       | false      | 12         | 1                  | 0         | 0
 _schemas                 | false      | 1          | 1                  | 0         | 0
 ora-CUSTOMERS-jdbc       | true       | 1          | 1                  | 1         | 1
 connect-status           | false      | 5          | 1                  | 0         | 0
 my_connect_configs       | false      | 1          | 1                  | 0         | 0
 my_connect_offsets       | false      | 25         | 1                  | 0         | 0
 schema-changes.inventory | false      | 1          | 1                  | 0         | 0
------------------------------------------------------------------------------------------------------
```
Show the topic contents:

```sql
ksql> PRINT 'ora-CUSTOMERS-jdbc' FROM BEGINNING;
Format:AVRO
11/30/18 10:44:27 AM UTC, , {"before": null, "after": {"ID": {"bytes": "\u0001"}, "FIRST_NAME": "Rica", "LAST_NAME": "Blaisdell", "EMAIL": "rblaisdell0@rambler.ru", "GENDER": "Female", "CLUB_STATUS": "bronze", "COMMENTS": "Universal optimal hierarchy", "CREATE_TS": 1543515952219218, "UPDATE_TS": 1543515952219218}, "source": {"version": "0.9.0.Alpha2", "connector": "oracle", "name": "asgard", "ts_ms": 1543574662454, "txId": null, "scn": 1755382, "snapshot": true}, "op": "r", "ts_ms": 1543574662472}
[...]
```
In SQL*Plus, insert a new customer and watch the change propagate through to the Kafka topic:

```sql
SET AUTOCOMMIT ON;

INSERT INTO CUSTOMERS (FIRST_NAME,LAST_NAME,EMAIL,CLUB_STATUS) VALUES ('Rick','Astley','nevergonna@giveyou.up','Bronze');
```
In KSQL, register the ratings topic as a stream and derive a filtered stream of poor ratings:

```sql
CREATE STREAM RATINGS WITH (KAFKA_TOPIC='ratings',VALUE_FORMAT='AVRO');

-- Filter for ratings below three stars
CREATE STREAM POOR_RATINGS AS
  SELECT STARS, CHANNEL, MESSAGE FROM RATINGS WHERE STARS<3;

SELECT * FROM POOR_RATINGS LIMIT 5;

DESCRIBE EXTENDED POOR_RATINGS;
```
```sql
-- Inspect raw topic data if you want
-- PRINT 'ora-CUSTOMERS-jdbc' FROM BEGINNING;

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM CUSTOMERS_STREAM_SRC WITH (KAFKA_TOPIC='ora-CUSTOMERS-jdbc', VALUE_FORMAT='AVRO');

-- Re-key the stream on the customer ID so that it can back a table
CREATE STREAM CUSTOMERS_STREAM WITH (PARTITIONS=1) AS SELECT * FROM CUSTOMERS_STREAM_SRC PARTITION BY ID;

SELECT ID, FIRST_NAME, LAST_NAME, CLUB_STATUS FROM CUSTOMERS_STREAM WHERE ID=42 LIMIT 1;
```
Wait for a moment here; if you run the CTAS immediately after the CSAS it may fail with the error `Could not fetch the AVRO schema from schema registry. Subject not found.; error code: 40401`. You may also get this error if you have not set `'auto.offset.reset'='earliest'` and there is no data flowing into the source CUSTOMERS topic, since no messages will have triggered the target stream's schema to be registered. See confluentinc/ksql#713.
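If you do hit that error, you can check which subjects the Schema Registry knows about (a sketch; this assumes the Schema Registry is exposed on localhost:8081, the Confluent default - adjust the port to match your docker-compose):

```bash
# List all subjects registered in the Schema Registry;
# the target stream's subject should appear once messages have flowed
curl -s "http://localhost:8081/subjects" | jq '.'
```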
```sql
CREATE TABLE CUSTOMERS WITH (KAFKA_TOPIC='CUSTOMERS_STREAM', VALUE_FORMAT='AVRO', KEY='ID');

SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, CLUB_STATUS FROM CUSTOMERS WHERE ID=42;
```
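To show the difference between the stream and the table, change the customer's record back in Oracle. The exact statement isn't captured in this doc, so this is a sketch, with the ID and new status inferred from the output below:

```bash
# Sketch: update Rick's status in Oracle to emit a CDC change event
docker exec -i oracle bash -c 'sqlplus -s Debezium/dbz@localhost:1521/ORCLPDB1' <<'EOF'
UPDATE CUSTOMERS SET CLUB_STATUS = 'Platinum' WHERE ID = 42;
COMMIT;
EXIT;
EOF
```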
Here’s the stream - every event, which in this context is every change event on the source database:

```sql
ksql> SELECT ID, FIRST_NAME, LAST_NAME, CLUB_STATUS FROM CUSTOMERS_STREAM WHERE ID=42;
42 | Rick | Astley | Bronze
42 | Rick | Astley | Platinum
```
Here’s the table - the latest value for a given key:

```sql
ksql> SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, CLUB_STATUS FROM CUSTOMERS WHERE ID=42;
42 | Rick | Astley | nevergonna@giveyou.up | Platinum
```
Join each rating event to the customer table to enrich it with customer data:

```sql
SELECT R.RATING_ID, R.MESSAGE,
       C.ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME,
       C.CLUB_STATUS
FROM RATINGS R
     INNER JOIN CUSTOMERS C
       ON R.USER_ID = C.ID;
```

```
524 | Surprisingly good, maybe you are getting your mojo back at long last! | Patti Rosten | silver
525 | meh | Fred Blaisdell | bronze
526 | more peanuts please | Hashim Rumke | platinum
527 | more peanuts please | Laney Toopin | platinum
529 | Exceeded all my expectations. Thank you ! | Ruthie Brockherst | platinum
530 | (expletive deleted) | Brianna Paradise | bronze
…
```
## Persist this stream of data & create stream of unhappy VIPs

```sql
CREATE STREAM RATINGS_WITH_CUSTOMER_DATA
  WITH (PARTITIONS=1,
        KAFKA_TOPIC='ratings-enriched') AS
SELECT R.RATING_ID, R.MESSAGE, R.STARS, R.CHANNEL,
       C.ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME,
       C.CLUB_STATUS, C.EMAIL
FROM RATINGS R
     INNER JOIN CUSTOMERS C
       ON R.USER_ID = C.ID
WHERE C.FIRST_NAME IS NOT NULL;

CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS
  WITH (VALUE_FORMAT='JSON', PARTITIONS=1) AS
SELECT FULL_NAME, CLUB_STATUS, EMAIL, STARS, MESSAGE
FROM RATINGS_WITH_CUSTOMER_DATA
WHERE STARS < 3
  AND CLUB_STATUS = 'platinum';
```
The `WITH (PARTITIONS=1)` is only necessary if the Elasticsearch connector has already been defined, since the connector will have created the topic before KSQL does, with a single partition (rather than the four that KSQL would use by default).
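To confirm the partition count that actually got used, you can describe the topic (a sketch; this assumes the broker container is named `kafka` with ZooKeeper at `zookeeper:2181`, as is typical for Confluent images of this vintage - on newer versions use `--bootstrap-server` instead):

```bash
# Describe the enriched topic to check its partition count
docker exec kafka kafka-topics --zookeeper zookeeper:2181 --describe --topic ratings-enriched
```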
## Simple aggregations

```sql
-- Count of ratings per customer, per 5-minute tumbling window
SELECT FULL_NAME, COUNT(*) FROM RATINGS_WITH_CUSTOMER_DATA WINDOW TUMBLING (SIZE 5 MINUTE) GROUP BY FULL_NAME;

-- Count of ratings per club status, per 5-minute tumbling window
SELECT CLUB_STATUS, COUNT(*) FROM RATINGS_WITH_CUSTOMER_DATA WINDOW TUMBLING (SIZE 5 MINUTE) GROUP BY CLUB_STATUS;
```
Persist this and show the timestamp:

```sql
CREATE TABLE RATINGS_PER_CUSTOMER_PER_MINUTE AS
  SELECT FULL_NAME, COUNT(*) AS RATINGS_COUNT
  FROM RATINGS_WITH_CUSTOMER_DATA
  WINDOW TUMBLING (SIZE 1 MINUTE)
  GROUP BY FULL_NAME;

SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), FULL_NAME, RATINGS_COUNT
FROM RATINGS_PER_CUSTOMER_PER_MINUTE;
```
You’ll need a Slack API key. Create the following file as `docker-compose/docker-compose.override.yml`; by default Docker Compose will load this in addition to `docker-compose.yml`. The docker image [source is here](https://github.com/rmoff/kafka-slack-notify-unhappy-users).
```yaml
---
version: '2'
services:
  kafka-slack-notify-unhappy-users:
    image: rmoff/kafka-slack-notify-unhappy-users:latest
    environment:
      - BOOTSTRAP_SERVERS=kafka:29092
      - SLACK_API_TOKEN=xxx-token-xxx
```
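With the override file in place, bring up the new service (a sketch; run from the `docker-compose` directory):

```bash
# Docker Compose merges docker-compose.override.yml automatically
cd docker-compose
docker-compose up -d kafka-slack-notify-unhappy-users
```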