Flink SQL splitting tutorial (#1618)
* feat: Flink SQL splitting tutorial

* chore: fix typos in Flink SQL tutorials

* fix failing Flink SQL splitting test
davetroiano authored Sep 14, 2023
1 parent 01be037 commit 72d71ec
Showing 78 changed files with 1,224 additions and 11 deletions.
3 changes: 3 additions & 0 deletions .semaphore/semaphore.yml
@@ -493,3 +493,6 @@ blocks:
- name: Flink SQL test for joins
commands:
- make -C _includes/tutorials/joining-stream-stream/flinksql/code tutorial
- name: Flink SQL test for splitting
commands:
- make -C _includes/tutorials/splitting/flinksql/code tutorial
158 changes: 158 additions & 0 deletions _data/harnesses/splitting/flinksql.yml
@@ -0,0 +1,158 @@
dev:
  steps:
    - title: Prerequisites
      content:
        - action: skip
          render:
            file: shared/markup/dev/docker-prerequisite.adoc

    - title: Initialize the project
      content:
        - action: execute
          file: tutorial-steps/dev/init.sh
          render:
            file: tutorials/splitting/flinksql/markup/dev/init.adoc

    - title: Get Confluent Platform
      content:
        - action: make_file
          file: docker-compose.yml
          render:
            file: tutorials/splitting/flinksql/markup/dev/make-docker-compose.adoc

        - action: execute_async
          file: tutorial-steps/dev/docker-compose-up.sh
          render:
            file: tutorials/splitting/flinksql/markup/dev/start-compose.adoc

        - action: execute
          file: tutorial-steps/dev/wait-for-containers.sh
          render:
            skip: true

    - title: Write the program interactively using the CLI
      content:
        - action: docker_flinksql_cli_session
          container: flink-sql-client
          docker_bootup_file: tutorial-steps/dev/start-cli.sh
          column_width: 20
          render:
            file: tutorials/splitting/flinksql/markup/dev/start-cli.adoc
          stdin:
            - file: tutorial-steps/dev/create-acting-events.sql
              render:
                file: tutorials/splitting/flinksql/markup/dev/create-acting-events.adoc

            - file: tutorial-steps/dev/populate-acting-events.sql
              render:
                file: tutorials/splitting/flinksql/markup/dev/populate-acting-events.adoc

            - file: tutorial-steps/dev/transient-query-drama.sql
              render:
                file: tutorials/splitting/flinksql/markup/dev/transient-query-drama.adoc

            - file: tutorial-steps/dev/transient-query-other.sql
              render:
                file: tutorials/splitting/flinksql/markup/dev/transient-query-other.adoc

            - file: tutorial-steps/dev/create-acting-events-drama.sql
              render:
                file: tutorials/splitting/flinksql/markup/dev/create-acting-events-drama.adoc

            - file: tutorial-steps/dev/create-acting-events-fantasy.sql
              render:
                file: tutorials/splitting/flinksql/markup/dev/create-acting-events-fantasy.adoc

            - file: tutorial-steps/dev/create-acting-events-other.sql
              render:
                file: tutorials/splitting/flinksql/markup/dev/create-acting-events-other.adoc

          stdout:
            directory: tutorial-steps/dev/outputs

    - title: Validate output
      content:
        - action: execute
          file: tutorial-steps/dev/validate-acting-events-fantasy.sh
          stdout: tutorial-steps/dev/outputs/validate-acting-events-fantasy.log
          render:
            file: tutorials/splitting/flinksql/markup/dev/validate-acting-events-per-genre.adoc

test:
  steps:
    - title: Decide what testing tools to use
      content:
        - action: skip
          render:
            file: tutorials/splitting/flinksql/markup/test/test-architecture.adoc

    - title: Create the test skeleton
      content:
        - action: execute
          file: tutorial-steps/test/make-test-dirs.sh
          render:
            file: tutorials/splitting/flinksql/markup/test/make-test-dirs.adoc

        - action: make_file
          file: build.gradle
          render:
            file: tutorials/splitting/flinksql/markup/test/make-build-gradle.adoc

        - action: execute
          file: tutorial-steps/test/gradle-wrapper.sh
          render:
            file: tutorials/splitting/flinksql/markup/test/make-gradle-wrapper.adoc

    - title: Create SQL resources
      content:
        - action: make_file
          file: src/test/resources/create-acting-events.sql.template
          render:
            file: tutorials/splitting/flinksql/markup/test/create-resource-create-acting-events.sql.template.adoc

        - action: make_file
          file: src/test/resources/populate-acting-events.sql
          render:
            file: tutorials/splitting/flinksql/markup/test/create-resource-populate-acting-events.sql.adoc

        - action: make_file
          file: src/test/resources/create-acting-events-drama.sql.template
          render:
            file: tutorials/splitting/flinksql/markup/test/create-resource-create-acting-events-drama.sql.template.adoc

        - action: make_file
          file: src/test/resources/query-acting-events-drama.sql
          render:
            file: tutorials/splitting/flinksql/markup/test/create-resource-query-acting-events-drama.sql.adoc

        - action: make_file
          file: src/test/resources/expected-acting-events-drama.txt
          render:
            file: tutorials/splitting/flinksql/markup/test/create-resource-expected-acting-events-drama.txt.adoc

    - title: Write a test
      content:
        - action: make_file
          file: src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java
          render:
            file: tutorials/splitting/flinksql/markup/test/make-test-base.adoc

        - action: make_file
          file: src/test/java/io/confluent/developer/FlinkSqlSplitStreamTest.java
          render:
            file: tutorials/splitting/flinksql/markup/test/make-test.adoc

    - title: Invoke the test
      content:
        - action: execute
          file: tutorial-steps/test/invoke-test.sh
          render:
            file: tutorials/splitting/flinksql/markup/test/invoke-test.adoc

ccloud:
  steps:
    - title: Run your app to Confluent Cloud
      content:
        - action: skip
          render:
            file: shared/markup/ccloud/try-ccloud.adoc
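
The per-genre SQL files this harness drives (create-acting-events.sql, transient-query-drama.sql, create-acting-events-drama.sql, and so on) are not expanded in this diff view. As a rough, hypothetical sketch of the splitting pattern they implement (the acting_events columns name, title, and genre, and the broker and Schema Registry endpoints below are assumptions, not the committed files), the split looks something like this in Flink SQL:

+++++
<pre class="snippet"><code class="sql">-- Hypothetical source table; the committed create-acting-events.sql is authoritative.
CREATE TABLE acting_events (
    name  STRING,
    title STRING,
    genre STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'acting-events',
    'properties.bootstrap.servers' = 'broker:9092',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:8081'
);

-- One sink table per genre with the same shape (drama shown; fantasy and "other" follow suit)...
CREATE TABLE acting_events_drama (
    name  STRING,
    title STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'acting-events-drama',
    'properties.bootstrap.servers' = 'broker:9092',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:8081'
);

-- ...and a filtered INSERT per genre performs the split.
INSERT INTO acting_events_drama
    SELECT name, title
    FROM acting_events
    WHERE genre = 'drama';</code></pre>
+++++

The transient-query-drama.sql and transient-query-other.sql steps run the same WHERE predicates as plain SELECTs before the persistent per-genre tables are created.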
1 change: 1 addition & 0 deletions _data/tutorials.yaml
@@ -46,6 +46,7 @@ splitting:
kstreams: enabled
kafka: enabled
confluent: enabled
flinksql: enabled
merging:
title: How to merge many streams into one stream
meta-description: merge many streams into one stream
@@ -141,7 +141,7 @@ protected static String tableauResults(TableResult tableResult) {

// The given table result may come from a table backed by the Kafka or Upsert Kafka connector,
// both of which perform unbounded (neverending) scans. So, in order to prevent tests from blocking
-// on called to this method, we kick off a thread to kill the underlying job once output has
+// on calls to this method, we kick off a thread to kill the underlying job once output has
// been printed.
//
// Note: as of Flink 1.17.0, the Kafka connector will support bounded scanning, which would obviate
@@ -1,6 +1,6 @@
Since the output of our transient query looks right, the next step is to make the query persistent. This looks exactly like the transient query, except we first create a new table with the https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/upsert-kafka/[Upsert Kafka] connector and then `INSERT INTO` the table. We use the Upsert Kafka connector because we only care about the most recent aggregate for a given title (the key column). The `INSERT INTO` statement returns to the CLI prompt right away, having created a persistent stream processing program running in the Flink cluster, continuously processing input records and updating the resulting `movie_ticket_sales_by_title` table.

-Now go ahead and tun the following two commands in your Flink SQL session:
+Now go ahead and run the following two commands in your Flink SQL session:
+++++
<pre class="snippet"><code class="sql">{% include_raw tutorials/aggregating-count/flinksql/code/tutorial-steps/dev/create-movie-sales-by-title.sql %}</code></pre>
+++++
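
For orientation, here is a minimal sketch of that pattern. It is not the committed create-movie-sales-by-title.sql; the aggregate column name, output topic, and connector endpoints are assumptions.

+++++
<pre class="snippet"><code class="sql">-- Sketch only; see create-movie-sales-by-title.sql for the real statements.
CREATE TABLE movie_ticket_sales_by_title (
    title        STRING,
    tickets_sold BIGINT,
    PRIMARY KEY (title) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'movie-ticket-sales-by-title',
    'properties.bootstrap.servers' = 'broker:9092',
    'key.format' = 'raw',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:8081'
);

-- The persistent counterpart of the transient query: it runs continuously
-- in the Flink cluster and upserts one row per title.
INSERT INTO movie_ticket_sales_by_title
    SELECT title, COUNT(*)
    FROM movie_ticket_sales
    GROUP BY title;</code></pre>
+++++

The PRIMARY KEY ... NOT ENFORCED clause is what lets the Upsert Kafka connector collapse updates down to the latest count per title.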
@@ -1,7 +1,7 @@
This tutorial takes a stream of individual movie ticket sales events and counts the total number of tickets sold per movie. Not all ticket prices are the same (apparently some of these theaters are fancier than others), but the task of the Flink SQL query is just to group and count regardless of ticket price.

This line of Flink SQL DDL creates a table and its underlying Kafka topic to represent the annual sales totals.
-Note that we are defining the schema for the table, which includes three fields: `title`, the name of the movie; `sale_ts`, the time at which the ticket was sold; and `ticket_total_value`, the price paid for the ticket. The statement also the underlying Kafka topic as `movie-ticket-sales`, that it should have a single partition (the default `num.partitions` configured in the broker), and defines Avro as its data format.
+Note that we are defining the schema for the table, which includes three fields: `title`, the name of the movie; `sale_ts`, the time at which the ticket was sold; and `ticket_total_value`, the price paid for the ticket. The statement also specifies the underlying Kafka topic as `movie-ticket-sales`, that it should have a single partition (the default `num.partitions` configured in the broker), and defines Avro as its data format.

+++++
<pre class="snippet"><code class="sql">{% include_raw tutorials/aggregating-count/flinksql/code/tutorial-steps/dev/create-movie-ticket-sales.sql %}</code></pre>
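
For reference, a statement of that shape might look roughly like the following. This is a sketch only, with field types and connector options as assumptions; the committed create-movie-ticket-sales.sql is authoritative.

+++++
<pre class="snippet"><code class="sql">-- Illustrative sketch; not the committed DDL.
CREATE TABLE movie_ticket_sales (
    title              STRING,
    sale_ts            STRING,
    ticket_total_value INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'movie-ticket-sales',
    'properties.bootstrap.servers' = 'broker:9092',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:8081'
);</code></pre>
+++++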
@@ -4,4 +4,4 @@ Next, create the test implementation at `src/test/java/io/confluent/developer/Fl
<pre class="snippet"><code class="java">{% include_raw tutorials/aggregating-count/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlAggregatingCountTest.java %}</code></pre>
+++++

-The test itself it straightforward to follow. It executes the SQL from our resource files, then runs a select statement against the final output `TABLE` of our application and compares the results to what's expected.
+The test itself is straightforward to follow. It executes the SQL from our resource files, then runs a select statement against the final output `TABLE` of our application and compares the results to what's expected.
@@ -1,6 +1,6 @@
This tutorial takes a stream of individual movie ticket sales events and counts the total number of tickets sold per movie. Not all ticket prices are the same (apparently some of these theaters are fancier than others), but the task of the ksqlDB query is just to group and count regardless of ticket price.

-This line of ksqlDB DDL creates a stream and its underlying Kafka topic to represent the annual sales totals. If the topic already exists, then ksqlDB simply registers is as the source of data underlying the new stream. The stream has three fields: `title`, the name of the movie; `sale_ts`, the time at which the ticket was sold; and `ticket_total_value`, the price paid for the ticket. The statement also the underlying Kafka topic as `movie-ticket-sales`, that it should have a single partition, and defines Avro as its data format.
+This line of ksqlDB DDL creates a stream and its underlying Kafka topic to represent the annual sales totals. If the topic already exists, then ksqlDB simply registers is as the source of data underlying the new stream. The stream has three fields: `title`, the name of the movie; `sale_ts`, the time at which the ticket was sold; and `ticket_total_value`, the price paid for the ticket. The statement also specifies the underlying Kafka topic as `movie-ticket-sales`, that it should have a single partition, and defines Avro as its data format.

+++++
<pre class="snippet"><code class="sql">{% include_raw tutorials/aggregating-count/ksql/code/tutorial-steps/dev/create-movie-ticket-sales.sql %}</code></pre>
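
A stream of that shape, sketched in ksqlDB syntax (field types are assumptions; the committed create-movie-ticket-sales.sql is authoritative), looks roughly like:

+++++
<pre class="snippet"><code class="sql">-- Illustrative ksqlDB sketch; not the committed DDL.
CREATE STREAM movie_ticket_sales (title VARCHAR, sale_ts VARCHAR, ticket_total_value INT)
    WITH (KAFKA_TOPIC='movie-ticket-sales',
          PARTITIONS=1,
          VALUE_FORMAT='avro');</code></pre>
+++++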
@@ -141,7 +141,7 @@ protected static String tableauResults(TableResult tableResult) {

// The given table result may come from a table backed by the Kafka or Upsert Kafka connector,
// both of which perform unbounded (neverending) scans. So, in order to prevent tests from blocking
-// on called to this method, we kick off a thread to kill the underlying job once output has
+// on calls to this method, we kick off a thread to kill the underlying job once output has
// been printed.
//
// Note: as of Flink 1.17.0, the Kafka connector will support bounded scanning, which would obviate
@@ -1,6 +1,6 @@
Since the output of our transient query looks right, the next step is to make the query persistent. This looks exactly like the transient query, except we first create a new table with the https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/upsert-kafka/[Upsert Kafka] connector and then `INSERT INTO` the table. We use the Upsert Kafka connector because we only care about the most recent aggregates for a given release year (the key column). The `INSERT INTO` statement returns to the CLI prompt right away, having created a persistent stream processing program running in the Flink cluster, continuously processing input records and updating the resulting `movie_sales_by_year` table.

-Now go ahead and tun the following two commands in your Flink SQL session:
+Now go ahead and run the following two commands in your Flink SQL session:
+++++
<pre class="snippet"><code class="sql">{% include_raw tutorials/aggregating-minmax/flinksql/code/tutorial-steps/dev/create-movie-sales-by-year.sql %}</code></pre>
+++++
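
Sketched out (the committed create-movie-sales-by-year.sql is authoritative; the source table and column names here are assumptions), the persistent version pairs an Upsert Kafka table keyed on release year with a grouped INSERT:

+++++
<pre class="snippet"><code class="sql">-- Sketch only; see create-movie-sales-by-year.sql for the real statements.
CREATE TABLE movie_sales_by_year (
    release_year    INT,
    min_total_sales INT,
    max_total_sales INT,
    PRIMARY KEY (release_year) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'movie-sales-by-year',
    'properties.bootstrap.servers' = 'broker:9092',
    'key.format' = 'raw',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:8081'
);

-- Continuously maintain the min and max per release year.
INSERT INTO movie_sales_by_year
    SELECT release_year, MIN(total_sales), MAX(total_sales)
    FROM movie_sales
    GROUP BY release_year;</code></pre>
+++++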
@@ -4,4 +4,4 @@ Next, create the test implementation at `src/test/java/io/confluent/developer/Fl
<pre class="snippet"><code class="java">{% include_raw tutorials/aggregating-minmax/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlAggregatingMinMaxTest.java %}</code></pre>
+++++

-The test itself it straightforward to follow. It executes the SQL from our resource files, then runs a select statement against the final output `TABLE` of our application and compares the results to what's expected.
+The test itself is straightforward to follow. It executes the SQL from our resource files, then runs a select statement against the final output `TABLE` of our application and compares the results to what's expected.
@@ -147,7 +147,7 @@ protected static String tableauResults(TableResult tableResult) {

// The given table result may come from a table backed by the Kafka or Upsert Kafka connector,
// both of which perform unbounded (neverending) scans. So, in order to prevent tests from blocking
-// on called to this method, we kick off a thread to kill the underlying job once output has
+// on calls to this method, we kick off a thread to kill the underlying job once output has
// been printed.
//
// Note: as of Flink 1.17.0, the Kafka connector will support bounded scanning, which would obviate
@@ -1,6 +1,6 @@
Since the output of our transient query looks right, the next step is to make the query persistent. This looks exactly like the transient query, except we first create a new table and then execute an `INSERT INTO` statement to populate the table. The `INSERT INTO` statement returns to the CLI prompt right away, having created a persistent stream processing program running in the Flink cluster, continuously processing input records and updating the resulting `shipped_orders` table.

-Now go ahead and tun the following two commands in your Flink SQL session:
+Now go ahead and run the following two commands in your Flink SQL session:

+++++
<pre class="snippet"><code class="sql">{% include_raw tutorials/joining-stream-stream/flinksql/code/tutorial-steps/dev/create-join-results-table.sql %}</code></pre>
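
Roughly speaking, the persistent interval join has the shape below. This is a sketch only: the committed create-join-results-table.sql and its companion INSERT are authoritative, and the orders/shipments schemas and the seven-day window here are assumptions.

+++++
<pre class="snippet"><code class="sql">-- Hypothetical column names; the tutorial's SQL files define the real schemas.
INSERT INTO shipped_orders
    SELECT
        o.id AS order_id,
        o.order_ts,
        s.id AS shipment_id,
        s.ship_ts
    FROM orders o
    INNER JOIN shipments s
        ON o.id = s.order_id
        AND s.ship_ts BETWEEN o.order_ts AND o.order_ts + INTERVAL '7' DAY;</code></pre>
+++++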
@@ -4,4 +4,4 @@ Next, create the test implementation at `src/test/java/io/confluent/developer/Fl
<pre class="snippet"><code class="java">{% include_raw tutorials/joining-stream-stream/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlIntervalJoinTest.java %}</code></pre>
+++++

-The test itself it straightforward to follow. It executes the SQL from our resource files, then runs a select statement against the final output `TABLE` of our application and compares the results to what's expected.
+The test itself is straightforward to follow. It executes the SQL from our resource files, then runs a select statement against the final output `TABLE` of our application and compares the results to what's expected.
7 changes: 7 additions & 0 deletions _includes/tutorials/splitting/flinksql/code/.gitignore
@@ -0,0 +1,7 @@
tutorial-steps/dev/outputs/

# Ignore Gradle project-specific cache directory
.gradle

# Ignore Gradle build output directory
build
11 changes: 11 additions & 0 deletions _includes/tutorials/splitting/flinksql/code/Makefile
@@ -0,0 +1,11 @@
STEPS_DIR := tutorial-steps
DEV_OUTPUTS_DIR := $(STEPS_DIR)/dev/outputs
TEMP_DIR := $(shell mktemp -d)
SEQUENCE := "dev, test, ccloud"

tutorial:
	rm -r $(DEV_OUTPUTS_DIR) || true
	mkdir $(DEV_OUTPUTS_DIR)
	harness-runner ../../../../../_data/harnesses/splitting/flinksql.yml $(TEMP_DIR) $(SEQUENCE)
	diff --strip-trailing-cr $(STEPS_DIR)/dev/expected-acting-events-fantasy.log $(DEV_OUTPUTS_DIR)/validate-acting-events-fantasy.log
	reset
32 changes: 32 additions & 0 deletions _includes/tutorials/splitting/flinksql/code/build.gradle
@@ -0,0 +1,32 @@
buildscript {
repositories {
mavenCentral()
}
}

plugins {
id "java"
}

sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
version = "0.0.1"

repositories {
mavenCentral()
}

dependencies {
testImplementation "com.google.guava:guava:31.1-jre"
testImplementation "junit:junit:4.13.2"
testImplementation 'org.testcontainers:testcontainers:1.17.6'
testImplementation 'org.testcontainers:kafka:1.17.6'
testImplementation "org.apache.flink:flink-sql-connector-kafka:1.16.1"
testImplementation "org.apache.flink:flink-sql-avro-confluent-registry:1.16.1"
testImplementation "org.apache.flink:flink-test-utils:1.16.1"
testImplementation "org.apache.flink:flink-test-utils-junit:1.16.1"
testImplementation "org.apache.flink:flink-table-api-java-bridge:1.16.1"
testImplementation "org.apache.flink:flink-table-planner_2.12:1.16.1"
testImplementation "org.apache.flink:flink-table-planner_2.12:1.16.1:tests"
testImplementation "org.apache.flink:flink-statebackend-rocksdb:1.16.1"
}