forked from confluentinc/examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathksql.commands
47 lines (38 loc) · 4.08 KB
/
ksql.commands
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
--STREAM and TABLE names are prefixed with `ksql_` so that this demo can run concurrently
--with the Kafka Streams Music Demo Java application without name conflicts
--Source STREAM over the `play-events` Kafka topic, the feed of song plays generated by KafkaMusicExampleDriver
--The value columns are declared explicitly, even though the data is Avro, because this file is run as a script
--(https://github.com/confluentinc/ksql/issues/1031)
CREATE STREAM ksql_playevents (
    SONG_ID BIGINT,
    DURATION BIGINT
) WITH (
    KAFKA_TOPIC = 'play-events',
    VALUE_FORMAT = 'AVRO'
);
--Keep only the play events whose duration is at least 30 seconds
--DURATION is compared against 30000, i.e. it is treated as milliseconds
--The comparison is inclusive (>=) so that a play of exactly 30 seconds is accepted, as stated above
CREATE STREAM ksql_playevents_min_duration AS
    SELECT *
    FROM ksql_playevents
    WHERE DURATION >= 30000;
--The `song-feed` Kafka topic lists every song available in the streaming service; it is generated by KafkaMusicExampleDriver
--Its messages have no keys, whereas a KSQL TABLE requires String keys
--This command and the two that follow it together produce a key'd TABLE
--Step one: register the raw topic as a STREAM
CREATE STREAM ksql_songfeed (
    ID BIGINT,
    ALBUM VARCHAR,
    ARTIST VARCHAR,
    NAME VARCHAR,
    GENRE VARCHAR
) WITH (
    KAFKA_TOPIC = 'song-feed',
    VALUE_FORMAT = 'AVRO'
);
--Re-key the song feed: ID is cast to a STRING and the stream is partitioned by ID,
--with the result written to the KSQL_SONGFEEDWITHKEY topic
CREATE STREAM ksql_songfeedwithkey
    WITH (
        KAFKA_TOPIC = 'KSQL_SONGFEEDWITHKEY',
        VALUE_FORMAT = 'AVRO'
    ) AS
    SELECT
        CAST(ID AS STRING) AS ID,
        ALBUM,
        ARTIST,
        NAME,
        GENRE
    FROM ksql_songfeed
    PARTITION BY ID;
--Register the KSQL_SONGFEEDWITHKEY topic as a TABLE, naming the ID column as its key
CREATE TABLE ksql_songtable (
    ID VARCHAR,
    ALBUM VARCHAR,
    ARTIST VARCHAR,
    NAME VARCHAR,
    GENRE VARCHAR
) WITH (
    KAFKA_TOPIC = 'KSQL_SONGFEEDWITHKEY',
    VALUE_FORMAT = 'Avro',
    KEY = 'ID'
);
--Join each qualifying play event with its song details; the joined stream is used later for charting
--A constant `KEYCOL` column is also added as a fixed key, for a global view across multiple partitions
--(https://github.com/confluentinc/ksql/issues/1053)
--NOTE(review): plays.SONG_ID is declared BIGINT while songtable.ID is declared VARCHAR - confirm the KSQL version in use accepts this join
CREATE STREAM ksql_songplays AS
    SELECT
        plays.SONG_ID AS ID,
        ALBUM,
        ARTIST,
        NAME,
        GENRE,
        DURATION,
        1 AS KEYCOL
    FROM ksql_playevents_min_duration plays
    LEFT JOIN ksql_songtable songtable
        ON plays.SONG_ID = songtable.ID;
--Count song plays in tumbling 30-second windows, grouped by ID, NAME, GENRE and KEYCOL
CREATE TABLE ksql_songplaycounts30 AS
    SELECT
        ID,
        NAME,
        GENRE,
        KEYCOL,
        COUNT(*) AS COUNT
    FROM ksql_songplays
    WINDOW TUMBLING (SIZE 30 SECOND)
    GROUP BY ID, NAME, GENRE, KEYCOL;
--Convert the windowed-count TABLE to a STREAM by declaring a STREAM over the KSQL_SONGPLAYCOUNTS30 topic
--(presumably the topic backing the ksql_songplaycounts30 TABLE - verify the topic name)
CREATE STREAM ksql_songplaycounts30stream (
    ID BIGINT,
    NAME VARCHAR,
    GENRE VARCHAR,
    KEYCOL BIGINT,
    COUNT BIGINT
) WITH (
    KAFKA_TOPIC = 'KSQL_SONGPLAYCOUNTS30',
    VALUE_FORMAT = 'AVRO'
);
--Bring all rows into a STREAM with a single partition by partitioning on the fixed `KEYCOL` field
--NOTE(review): the purpose of the ROWTIME IS NOT NULL predicate is not documented here - confirm it is still required
CREATE STREAM ksql_songplaycounts30streampart AS
    SELECT *
    FROM ksql_songplaycounts30stream
    WHERE ROWTIME IS NOT NULL
    PARTITION BY KEYCOL;
--Count song plays over all time (no WINDOW clause), grouped by ID, NAME, GENRE and KEYCOL
CREATE TABLE ksql_songplaycounts AS
    SELECT
        ID,
        NAME,
        GENRE,
        KEYCOL,
        COUNT(*) AS COUNT
    FROM ksql_songplays
    GROUP BY ID, NAME, GENRE, KEYCOL;
--Convert the all-time count TABLE to a STREAM by declaring a STREAM over the KSQL_SONGPLAYCOUNTS topic
--(presumably the topic backing the ksql_songplaycounts TABLE - verify the topic name)
CREATE STREAM ksql_songplaycountsstream (
    ID BIGINT,
    NAME VARCHAR,
    GENRE VARCHAR,
    KEYCOL BIGINT,
    COUNT BIGINT
) WITH (
    KAFKA_TOPIC = 'KSQL_SONGPLAYCOUNTS',
    VALUE_FORMAT = 'AVRO'
);
--Bring all rows into a STREAM with a single partition by partitioning on the fixed `KEYCOL` field
--NOTE(review): the purpose of the ROWTIME IS NOT NULL predicate is not documented here - confirm it is still required
CREATE STREAM ksql_songplaycountsstreampart AS
    SELECT *
    FROM ksql_songplaycountsstream
    WHERE ROWTIME IS NOT NULL
    PARTITION BY KEYCOL;
--Top five all-time song play counts, derived from ksql_songplaycountsstreampart
--For now, `TOPK` cannot sort by one column while returning the value of another (https://github.com/confluentinc/ksql/issues/403),
--so the result holds only the counts, not the names of the songs they belong to
CREATE TABLE ksql_top5 AS
    SELECT
        KEYCOL,
        TOPK(COUNT, 5)
    FROM ksql_songplaycountsstreampart
    GROUP BY KEYCOL;
--Top five song play counts for each genre, derived from ksql_songplaycountsstreampart
--No WINDOW clause is applied here, so the ranking is over the all-time counts in that stream
CREATE TABLE ksql_top5bygenre AS
    SELECT
        GENRE,
        TOPK(COUNT, 5)
    FROM ksql_songplaycountsstreampart
    GROUP BY GENRE;
--There is no automatic way to micro-batch to get one "final" count per window (https://github.com/confluentinc/ksql/issues/1030)
--However, for windows that have closed, assuming no late-arriving data, the results will be final
--e.g. if you know that window time 1521807660000 has closed, you can get the final counts
--SELECT * FROM KSQL_SONGPLAYCOUNTS30 WHERE ROWTIME = 1521807660000;