diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index b0fa446e76..589e9329bb 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -493,3 +493,6 @@ blocks: - name: Flink SQL test for joins commands: - make -C _includes/tutorials/joining-stream-stream/flinksql/code tutorial + - name: Flink SQL test for splitting + commands: + - make -C _includes/tutorials/splitting/flinksql/code tutorial diff --git a/_data/harnesses/splitting/flinksql.yml b/_data/harnesses/splitting/flinksql.yml new file mode 100644 index 0000000000..d61e302aa8 --- /dev/null +++ b/_data/harnesses/splitting/flinksql.yml @@ -0,0 +1,158 @@ +dev: + steps: + - title: Prerequisites + content: + - action: skip + render: + file: shared/markup/dev/docker-prerequisite.adoc + + - title: Initialize the project + content: + - action: execute + file: tutorial-steps/dev/init.sh + render: + file: tutorials/splitting/flinksql/markup/dev/init.adoc + + - title: Get Confluent Platform + content: + - action: make_file + file: docker-compose.yml + render: + file: tutorials/splitting/flinksql/markup/dev/make-docker-compose.adoc + + - action: execute_async + file: tutorial-steps/dev/docker-compose-up.sh + render: + file: tutorials/splitting/flinksql/markup/dev/start-compose.adoc + + - action: execute + file: tutorial-steps/dev/wait-for-containers.sh + render: + skip: true + + - title: Write the program interactively using the CLI + content: + - action: docker_flinksql_cli_session + container: flink-sql-client + docker_bootup_file: tutorial-steps/dev/start-cli.sh + column_width: 20 + render: + file: tutorials/splitting/flinksql/markup/dev/start-cli.adoc + stdin: + - file: tutorial-steps/dev/create-acting-events.sql + render: + file: tutorials/splitting/flinksql/markup/dev/create-acting-events.adoc + + - file: tutorial-steps/dev/populate-acting-events.sql + render: + file: tutorials/splitting/flinksql/markup/dev/populate-acting-events.adoc + + - file: tutorial-steps/dev/transient-query-drama.sql + render: + file: tutorials/splitting/flinksql/markup/dev/transient-query-drama.adoc + + - file: tutorial-steps/dev/transient-query-other.sql + render: + file: tutorials/splitting/flinksql/markup/dev/transient-query-other.adoc + + - file: tutorial-steps/dev/create-acting-events-drama.sql + render: + file: tutorials/splitting/flinksql/markup/dev/create-acting-events-drama.adoc + + - file: tutorial-steps/dev/create-acting-events-fantasy.sql + render: + file: tutorials/splitting/flinksql/markup/dev/create-acting-events-fantasy.adoc + + - file: tutorial-steps/dev/create-acting-events-other.sql + render: + file: tutorials/splitting/flinksql/markup/dev/create-acting-events-other.adoc + + stdout: + directory: tutorial-steps/dev/outputs + + - title: Validate output + content: + - action: execute + file: tutorial-steps/dev/validate-acting-events-fantasy.sh + stdout: tutorial-steps/dev/outputs/validate-acting-events-fantasy.log + render: + file: tutorials/splitting/flinksql/markup/dev/validate-acting-events-per-genre.adoc + +test: + steps: + - title: Decide what testing tools to use + content: + - action: skip + render: + file: tutorials/splitting/flinksql/markup/test/test-architecture.adoc + + - title: Create the test skeleton + content: + - action: execute + file: tutorial-steps/test/make-test-dirs.sh + render: + file: tutorials/splitting/flinksql/markup/test/make-test-dirs.adoc + + - action: make_file + file: build.gradle + render: + file: tutorials/splitting/flinksql/markup/test/make-build-gradle.adoc + + - action: execute + file: tutorial-steps/test/gradle-wrapper.sh + render: + file: tutorials/splitting/flinksql/markup/test/make-gradle-wrapper.adoc + + - title: Create SQL resources + content: + - action: make_file + file: src/test/resources/create-acting-events.sql.template + render: + file: tutorials/splitting/flinksql/markup/test/create-resource-create-acting-events.sql.template.adoc + + - action: make_file + file: src/test/resources/populate-acting-events.sql + render: + file: tutorials/splitting/flinksql/markup/test/create-resource-populate-acting-events.sql.adoc + + - action: make_file + file: src/test/resources/create-acting-events-drama.sql.template + render: + file: tutorials/splitting/flinksql/markup/test/create-resource-create-acting-events-drama.sql.template.adoc + + - action: make_file + file: src/test/resources/query-acting-events-drama.sql + render: + file: tutorials/splitting/flinksql/markup/test/create-resource-query-acting-events-drama.sql.adoc + + - action: make_file + file: src/test/resources/expected-acting-events-drama.txt + render: + file: tutorials/splitting/flinksql/markup/test/create-resource-expected-acting-events-drama.txt.adoc + + - title: Write a test + content: + - action: make_file + file: src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java + render: + file: tutorials/splitting/flinksql/markup/test/make-test-base.adoc + + - action: make_file + file: src/test/java/io/confluent/developer/FlinkSqlSplitStreamTest.java + render: + file: tutorials/splitting/flinksql/markup/test/make-test.adoc + + - title: Invoke the test + content: + - action: execute + file: tutorial-steps/test/invoke-test.sh + render: + file: tutorials/splitting/flinksql/markup/test/invoke-test.adoc + +ccloud: + steps: + - title: Run your app to Confluent Cloud + content: + - action: skip + render: + file: shared/markup/ccloud/try-ccloud.adoc diff --git a/_data/tutorials.yaml b/_data/tutorials.yaml index 5b390c5abf..3bb712bdb2 100644 --- a/_data/tutorials.yaml +++ b/_data/tutorials.yaml @@ -46,6 +46,7 @@ splitting: kstreams: enabled kafka: enabled confluent: enabled + flinksql: enabled merging: title: How to merge many streams into one stream meta-description: merge many streams into one stream diff --git a/_includes/tutorials/aggregating-count/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java b/_includes/tutorials/aggregating-count/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java index 878611a46f..7b7c718004 100644 --- a/_includes/tutorials/aggregating-count/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java +++ b/_includes/tutorials/aggregating-count/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java @@ -141,7 +141,7 @@ protected static String tableauResults(TableResult tableResult) { // The given table result may come from a table backed by the Kafka or Upsert Kafka connector, // both of which perform unbounded (neverending) scans. So, in order to prevent tests from blocking - // on called to this method, we kick off a thread to kill the underlying job once output has + // on calls to this method, we kick off a thread to kill the underlying job once output has // been printed. // // Note: as of Flink 1.17.0, the Kafka connector will support bounded scanning, which would obviate diff --git a/_includes/tutorials/aggregating-count/flinksql/markup/dev/create-movie-sales-by-title.adoc b/_includes/tutorials/aggregating-count/flinksql/markup/dev/create-movie-sales-by-title.adoc index d232edf30f..9b934228b9 100644 --- a/_includes/tutorials/aggregating-count/flinksql/markup/dev/create-movie-sales-by-title.adoc +++ b/_includes/tutorials/aggregating-count/flinksql/markup/dev/create-movie-sales-by-title.adoc @@ -1,6 +1,6 @@ Since the output of our transient query looks right, the next step is to make the query persistent. This looks exactly like the transient query, except we first create a new table with the https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/upsert-kafka/[Upsert Kafka] connector and then `INSERT INTO` the table. We use the Upsert Kafka connector because we only care about the most recent aggregate for a given title (the key column). The `INSERT INTO` statement returns to the CLI prompt right away, having created a persistent stream processing program running in the Flink cluster, continuously processing input records and updating the resulting `movie_ticket_sales_by_title` table. -Now go ahead and tun the following two commands in your Flink SQL session: +Now go ahead and run the following two commands in your Flink SQL session: +++++
{% include_raw tutorials/aggregating-count/flinksql/code/tutorial-steps/dev/create-movie-sales-by-title.sql %}
+++++ diff --git a/_includes/tutorials/aggregating-count/flinksql/markup/dev/create-movie-ticket-sales.adoc b/_includes/tutorials/aggregating-count/flinksql/markup/dev/create-movie-ticket-sales.adoc index 8c7c5d45c1..b4feb96ce0 100644 --- a/_includes/tutorials/aggregating-count/flinksql/markup/dev/create-movie-ticket-sales.adoc +++ b/_includes/tutorials/aggregating-count/flinksql/markup/dev/create-movie-ticket-sales.adoc @@ -1,7 +1,7 @@ This tutorial takes a stream of individual movie ticket sales events and counts the total number of tickets sold per movie. Not all ticket prices are the same (apparently some of these theaters are fancier than others), but the task of the Flink SQL query is just to group and count regardless of ticket price. This line of Flink SQL DDL creates a table and its underlying Kafka topic to represent the annual sales totals. -Note that we are defining the schema for the table, which includes three fields: `title`, the name of the movie; `sale_ts`, the time at which the ticket was sold; and `ticket_total_value`, the price paid for the ticket. The statement also the underlying Kafka topic as `movie-ticket-sales`, that it should have a single partition (the default `num.partitions` configured in the broker), and defines Avro as its data format. +Note that we are defining the schema for the table, which includes three fields: `title`, the name of the movie; `sale_ts`, the time at which the ticket was sold; and `ticket_total_value`, the price paid for the ticket. The statement also specifies the underlying Kafka topic as `movie-ticket-sales`, that it should have a single partition (the default `num.partitions` configured in the broker), and defines Avro as its data format. +++++
{% include_raw tutorials/aggregating-count/flinksql/code/tutorial-steps/dev/create-movie-ticket-sales.sql %}
diff --git a/_includes/tutorials/aggregating-count/flinksql/markup/test/make-test.adoc b/_includes/tutorials/aggregating-count/flinksql/markup/test/make-test.adoc index 52ddbf73ea..83ef7cf687 100644 --- a/_includes/tutorials/aggregating-count/flinksql/markup/test/make-test.adoc +++ b/_includes/tutorials/aggregating-count/flinksql/markup/test/make-test.adoc @@ -4,4 +4,4 @@ Next, create the test implementation at `src/test/java/io/confluent/developer/Fl
{% include_raw tutorials/aggregating-count/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlAggregatingCountTest.java %}
+++++ -The test itself it straightforward to follow. It executes the SQL from our resource files, then runs a select statement against the final output `TABLE` of our application and compares the results to what's expected. +The test itself is straightforward to follow. It executes the SQL from our resource files, then runs a select statement against the final output `TABLE` of our application and compares the results to what's expected. diff --git a/_includes/tutorials/aggregating-count/ksql/markup/dev/create-movie-ticket-sales.adoc b/_includes/tutorials/aggregating-count/ksql/markup/dev/create-movie-ticket-sales.adoc index 17274213d2..667e27bc06 100644 --- a/_includes/tutorials/aggregating-count/ksql/markup/dev/create-movie-ticket-sales.adoc +++ b/_includes/tutorials/aggregating-count/ksql/markup/dev/create-movie-ticket-sales.adoc @@ -1,6 +1,6 @@ This tutorial takes a stream of individual movie ticket sales events and counts the total number of tickets sold per movie. Not all ticket prices are the same (apparently some of these theaters are fancier than others), but the task of the ksqlDB query is just to group and count regardless of ticket price. -This line of ksqlDB DDL creates a stream and its underlying Kafka topic to represent the annual sales totals. If the topic already exists, then ksqlDB simply registers is as the source of data underlying the new stream. The stream has three fields: `title`, the name of the movie; `sale_ts`, the time at which the ticket was sold; and `ticket_total_value`, the price paid for the ticket. The statement also the underlying Kafka topic as `movie-ticket-sales`, that it should have a single partition, and defines Avro as its data format. +This line of ksqlDB DDL creates a stream and its underlying Kafka topic to represent the annual sales totals. If the topic already exists, then ksqlDB simply registers is as the source of data underlying the new stream. The stream has three fields: `title`, the name of the movie; `sale_ts`, the time at which the ticket was sold; and `ticket_total_value`, the price paid for the ticket. The statement also specifies the underlying Kafka topic as `movie-ticket-sales`, that it should have a single partition, and defines Avro as its data format. +++++
{% include_raw tutorials/aggregating-count/ksql/code/tutorial-steps/dev/create-movie-ticket-sales.sql %}
diff --git a/_includes/tutorials/aggregating-minmax/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java b/_includes/tutorials/aggregating-minmax/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java index 878611a46f..7b7c718004 100644 --- a/_includes/tutorials/aggregating-minmax/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java +++ b/_includes/tutorials/aggregating-minmax/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java @@ -141,7 +141,7 @@ protected static String tableauResults(TableResult tableResult) { // The given table result may come from a table backed by the Kafka or Upsert Kafka connector, // both of which perform unbounded (neverending) scans. So, in order to prevent tests from blocking - // on called to this method, we kick off a thread to kill the underlying job once output has + // on calls to this method, we kick off a thread to kill the underlying job once output has // been printed. // // Note: as of Flink 1.17.0, the Kafka connector will support bounded scanning, which would obviate diff --git a/_includes/tutorials/aggregating-minmax/flinksql/markup/dev/create-movie-sales-by-year.adoc b/_includes/tutorials/aggregating-minmax/flinksql/markup/dev/create-movie-sales-by-year.adoc index 747dd87dcb..0edb7f9d5d 100644 --- a/_includes/tutorials/aggregating-minmax/flinksql/markup/dev/create-movie-sales-by-year.adoc +++ b/_includes/tutorials/aggregating-minmax/flinksql/markup/dev/create-movie-sales-by-year.adoc @@ -1,6 +1,6 @@ Since the output of our transient query looks right, the next step is to make the query persistent. This looks exactly like the transient query, except we first create a new table with the https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/upsert-kafka/[Upsert Kafka] connector and then `INSERT INTO` the table. We use the Upsert Kafka connector because we only care about the most recent aggregates for a given release year (the key column). The `INSERT INTO` statement returns to the CLI prompt right away, having created a persistent stream processing program running in the Flink cluster, continuously processing input records and updating the resulting `movie_sales_by_year` table. -Now go ahead and tun the following two commands in your Flink SQL session: +Now go ahead and run the following two commands in your Flink SQL session: +++++
{% include_raw tutorials/aggregating-minmax/flinksql/code/tutorial-steps/dev/create-movie-sales-by-year.sql %}
+++++ diff --git a/_includes/tutorials/aggregating-minmax/flinksql/markup/test/make-test.adoc b/_includes/tutorials/aggregating-minmax/flinksql/markup/test/make-test.adoc index cab90f0f46..9b1ebfa441 100644 --- a/_includes/tutorials/aggregating-minmax/flinksql/markup/test/make-test.adoc +++ b/_includes/tutorials/aggregating-minmax/flinksql/markup/test/make-test.adoc @@ -4,4 +4,4 @@ Next, create the test implementation at `src/test/java/io/confluent/developer/Fl
{% include_raw tutorials/aggregating-minmax/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlAggregatingMinMaxTest.java %}
+++++ -The test itself it straightforward to follow. It executes the SQL from our resource files, then runs a select statement against the final output `TABLE` of our application and compares the results to what's expected. \ No newline at end of file +The test itself is straightforward to follow. It executes the SQL from our resource files, then runs a select statement against the final output `TABLE` of our application and compares the results to what's expected. \ No newline at end of file diff --git a/_includes/tutorials/joining-stream-stream/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java b/_includes/tutorials/joining-stream-stream/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java index fd43a7fd2d..2e42bbbb5a 100644 --- a/_includes/tutorials/joining-stream-stream/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java +++ b/_includes/tutorials/joining-stream-stream/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java @@ -147,7 +147,7 @@ protected static String tableauResults(TableResult tableResult) { // The given table result may come from a table backed by the Kafka or Upsert Kafka connector, // both of which perform unbounded (neverending) scans. So, in order to prevent tests from blocking - // on called to this method, we kick off a thread to kill the underlying job once output has + // on calls to this method, we kick off a thread to kill the underlying job once output has // been printed. // // Note: as of Flink 1.17.0, the Kafka connector will support bounded scanning, which would obviate diff --git a/_includes/tutorials/joining-stream-stream/flinksql/markup/dev/populate-shipped-orders-table.adoc b/_includes/tutorials/joining-stream-stream/flinksql/markup/dev/populate-shipped-orders-table.adoc index d11c490722..2f98571344 100644 --- a/_includes/tutorials/joining-stream-stream/flinksql/markup/dev/populate-shipped-orders-table.adoc +++ b/_includes/tutorials/joining-stream-stream/flinksql/markup/dev/populate-shipped-orders-table.adoc @@ -1,6 +1,6 @@ Since the output of our transient query looks right, the next step is to make the query persistent. This looks exactly like the transient query, except we first create a new table and then execute an `INSERT INTO` statement to populate the table. The `INSERT INTO` statement returns to the CLI prompt right away, having created a persistent stream processing program running in the Flink cluster, continuously processing input records and updating the resulting `shipped_orders` table. -Now go ahead and tun the following two commands in your Flink SQL session: +Now go ahead and run the following two commands in your Flink SQL session: +++++
{% include_raw tutorials/joining-stream-stream/flinksql/code/tutorial-steps/dev/create-join-results-table.sql %}
diff --git a/_includes/tutorials/joining-stream-stream/flinksql/markup/test/make-test.adoc b/_includes/tutorials/joining-stream-stream/flinksql/markup/test/make-test.adoc index b864f175c6..6b3432dce8 100644 --- a/_includes/tutorials/joining-stream-stream/flinksql/markup/test/make-test.adoc +++ b/_includes/tutorials/joining-stream-stream/flinksql/markup/test/make-test.adoc @@ -4,4 +4,4 @@ Next, create the test implementation at `src/test/java/io/confluent/developer/Fl
{% include_raw tutorials/joining-stream-stream/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlIntervalJoinTest.java %}
+++++ -The test itself it straightforward to follow. It executes the SQL from our resource files, then runs a select statement against the final output `TABLE` of our application and compares the results to what's expected. +The test itself is straightforward to follow. It executes the SQL from our resource files, then runs a select statement against the final output `TABLE` of our application and compares the results to what's expected. diff --git a/_includes/tutorials/splitting/flinksql/code/.gitignore b/_includes/tutorials/splitting/flinksql/code/.gitignore new file mode 100644 index 0000000000..c1a4a4ee90 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/.gitignore @@ -0,0 +1,7 @@ +tutorial-steps/dev/outputs/ + +# Ignore Gradle project-specific cache directory +.gradle + +# Ignore Gradle build output directory +build diff --git a/_includes/tutorials/splitting/flinksql/code/Makefile b/_includes/tutorials/splitting/flinksql/code/Makefile new file mode 100644 index 0000000000..b15a645a54 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/Makefile @@ -0,0 +1,11 @@ +STEPS_DIR := tutorial-steps +DEV_OUTPUTS_DIR := $(STEPS_DIR)/dev/outputs +TEMP_DIR := $(shell mktemp -d) +SEQUENCE := "dev, test, ccloud" + +tutorial: + rm -r $(DEV_OUTPUTS_DIR) || true + mkdir $(DEV_OUTPUTS_DIR) + harness-runner ../../../../../_data/harnesses/splitting/flinksql.yml $(TEMP_DIR) $(SEQUENCE) + diff --strip-trailing-cr $(STEPS_DIR)/dev/expected-acting-events-fantasy.log $(DEV_OUTPUTS_DIR)/validate-acting-events-fantasy.log + reset diff --git a/_includes/tutorials/splitting/flinksql/code/build.gradle b/_includes/tutorials/splitting/flinksql/code/build.gradle new file mode 100644 index 0000000000..8db1441685 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/build.gradle @@ -0,0 +1,32 @@ +buildscript { + repositories { + mavenCentral() + } +} + +plugins { + id "java" +} + +sourceCompatibility = JavaVersion.VERSION_11 +targetCompatibility = JavaVersion.VERSION_11 +version = "0.0.1" + +repositories { + mavenCentral() +} + +dependencies { + testImplementation "com.google.guava:guava:31.1-jre" + testImplementation "junit:junit:4.13.2" + testImplementation 'org.testcontainers:testcontainers:1.17.6' + testImplementation 'org.testcontainers:kafka:1.17.6' + testImplementation "org.apache.flink:flink-sql-connector-kafka:1.16.1" + testImplementation "org.apache.flink:flink-sql-avro-confluent-registry:1.16.1" + testImplementation "org.apache.flink:flink-test-utils:1.16.1" + testImplementation "org.apache.flink:flink-test-utils-junit:1.16.1" + testImplementation "org.apache.flink:flink-table-api-java-bridge:1.16.1" + testImplementation "org.apache.flink:flink-table-planner_2.12:1.16.1" + testImplementation "org.apache.flink:flink-table-planner_2.12:1.16.1:tests" + testImplementation "org.apache.flink:flink-statebackend-rocksdb:1.16.1" +} diff --git a/_includes/tutorials/splitting/flinksql/code/docker-compose.yml b/_includes/tutorials/splitting/flinksql/code/docker-compose.yml new file mode 100644 index 0000000000..1d50e55115 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/docker-compose.yml @@ -0,0 +1,70 @@ +version: '2' +services: + broker: + image: confluentinc/cp-kafka:7.4.1 + hostname: broker + container_name: broker + ports: + - 29092:29092 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_NODE_ID: 1 + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093 + KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092 + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LOG_DIRS: /tmp/kraft-combined-logs + CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk + schema-registry: + image: confluentinc/cp-schema-registry:7.3.0 + hostname: schema-registry + container_name: schema-registry + depends_on: + - broker + ports: + - 8081:8081 + environment: + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:9092 + flink-sql-client: + image: cnfldemos/flink-sql-client-kafka:1.16.0-scala_2.12-java11 + hostname: flink-sql-client + container_name: flink-sql-client + depends_on: + - flink-jobmanager + environment: + FLINK_JOBMANAGER_HOST: flink-jobmanager + volumes: + - ./settings/:/settings + flink-jobmanager: + image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11 + hostname: flink-jobmanager + container_name: flink-jobmanager + ports: + - 9081:9081 + command: jobmanager + environment: + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: flink-jobmanager + rest.bind-port: 9081 + flink-taskmanager: + image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11 + hostname: flink-taskmanager + container_name: flink-taskmanager + depends_on: + - flink-jobmanager + command: taskmanager + scale: 1 + environment: + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: flink-jobmanager + taskmanager.numberOfTaskSlots: 10 diff --git a/_includes/tutorials/splitting/flinksql/code/gradle/wrapper/gradle-wrapper.jar b/_includes/tutorials/splitting/flinksql/code/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000000..249e5832f0 Binary files /dev/null and b/_includes/tutorials/splitting/flinksql/code/gradle/wrapper/gradle-wrapper.jar differ diff --git a/_includes/tutorials/splitting/flinksql/code/gradle/wrapper/gradle-wrapper.properties b/_includes/tutorials/splitting/flinksql/code/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000000..ae04661ee7 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,5 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/_includes/tutorials/splitting/flinksql/code/gradlew b/_includes/tutorials/splitting/flinksql/code/gradlew new file mode 100755 index 0000000000..a69d9cb6c2 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/gradlew @@ -0,0 +1,240 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +APP_NAME="Gradle" +APP_BASE_NAME=${0##*/} + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/_includes/tutorials/splitting/flinksql/code/gradlew.bat b/_includes/tutorials/splitting/flinksql/code/gradlew.bat new file mode 100644 index 0000000000..f127cfd49d --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/gradlew.bat @@ -0,0 +1,91 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if %ERRORLEVEL% equ 0 goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/_includes/tutorials/splitting/flinksql/code/settings.gradle b/_includes/tutorials/splitting/flinksql/code/settings.gradle new file mode 100644 index 0000000000..7cc1ead975 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/settings.gradle @@ -0,0 +1,10 @@ +/* + * This file was generated by the Gradle 'init' task. + * + * The settings file is used to specify which projects to include in your build. + * + * Detailed information about configuring a multi-project build in Gradle can be found + * in the user manual at https://docs.gradle.org/6.7.1/userguide/multi_project_builds.html + */ + +rootProject.name = 'aggregating-count-flinksql' diff --git a/_includes/tutorials/splitting/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java b/_includes/tutorials/splitting/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java new file mode 100644 index 0000000000..d6cc249f95 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java @@ -0,0 +1,175 @@ +package io.confluent.developer; + + +import com.google.common.io.Resources; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.runtime.client.JobCancellationException; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.assertj.core.util.Sets; +import org.junit.BeforeClass; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Optional; +import java.util.Set; + +import static org.testcontainers.containers.KafkaContainer.KAFKA_PORT; + +/** + * Base class for Flink SQL integration tests that use Flink's Kafka connectors. Encapsulates + * Kafka broker and Schema Registry Testcontainer management and includes utility methods for + * dynamically configuring Flink SQL Kafka connectors and processing Table API results. + */ +public class AbstractFlinkKafkaTest { + + protected static StreamTableEnvironment streamTableEnv; + protected static Integer schemaRegistryPort, kafkaPort; + + @BeforeClass + public static void setup() { + // create Flink table environment that test subclasses will use to execute SQL statements + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env.setStateBackend(new EmbeddedRocksDBStateBackend()); + streamTableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().inStreamingMode().build()); + + + // Start Kafka and Schema Registry Testcontainers. Set the exposed ports that test subclasses + // can use to dynamically configure Kafka connectors. Schema Registry enables connectors to + // be configured with 'value.format' = 'avro-confluent' + Network network = Network.newNetwork(); + + KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.1")) + .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") + .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") + .withEnv("KAFKA_TRANSACTION_STATE_LOG_NUM_PARTITIONS", "1") + .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "500") + .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true") + .withReuse(true) + .withNetwork(network); + kafka.start(); + kafkaPort = kafka.getMappedPort(KAFKA_PORT); + + GenericContainer schemaRegistry = new GenericContainer(DockerImageName.parse("confluentinc/cp-schema-registry:7.4.1")) + .withExposedPorts(8081) + .withNetwork(kafka.getNetwork()) + .withEnv("SCHEMA_REGISTRY_HOST_NAME", "localhost") + .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081") + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + kafka.getNetworkAliases().get(0) + ":9092"); + schemaRegistry.start(); + schemaRegistryPort = schemaRegistry.getMappedPort(8081); + } + + /** + * Given a resource filename and optional Kafka / Schema Registry ports, return the resource + * file contents as a String with ports substituted for KAFKA_PORT and SCHEMA_REGISTRY_PORT + * placeholders. + * + * @param resourceFileName the resource file name + * @param kafkaPort the port that Kafka broker exposes + * @param schemaRegistryPort the port that Schema Registry exposes + * @return resource file contents with port values substituted for placeholders + * @throws IOException if resource file can't be read + */ + protected static String getResourceFileContents( + String resourceFileName, + Optional kafkaPort, + Optional schemaRegistryPort + ) throws IOException { + URL url = Resources.getResource(resourceFileName); + String contents = Resources.toString(url, StandardCharsets.UTF_8); + if (kafkaPort.isPresent()) { + contents = contents.replaceAll("KAFKA_PORT", kafkaPort.get().toString()); + } + if (schemaRegistryPort.isPresent()) { + contents = contents.replaceAll("SCHEMA_REGISTRY_PORT", schemaRegistryPort.get().toString()); + } + return contents; + } + + /** + * Given a resource filename, return the resource file contents as a String. + * + * @param resourceFileName the resource file name + * @return resource file contents + * @throws IOException if resource file can't be read + */ + protected static String getResourceFileContents( + String resourceFileName + ) throws IOException { + // no Kafka / Schema Registry ports + return getResourceFileContents(resourceFileName, Optional.empty(), Optional.empty()); + } + + /** + * Utility method to convert a String containing multiple lines into a set of String's where + * each String is one line. This is useful for creating Flink SQL integration tests based on + * the tableau results printed via the Table API where the order of results is nondeterministic. + * + * @param s multiline String + * @return set of String's where each member is one line + */ + protected static Set stringToLineSet(String s) { + return Sets.newHashSet(Arrays.asList(s.split("\\r?\\n"))); + } + + /** + * Given a Flink Table API `TableResult` respresenting a SELECT statement result, + * capture and return the statement's tableau results. + * + * @param tableResult Flink Table API `TableResult` respresenting a SELECT statement result + * @return the SELECT statement's tableau results + */ + protected static String tableauResults(TableResult tableResult) { + // capture tableau results printed to stdout in a String + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(baos)); + + // The given table result may come from a table backed by the Kafka or Upsert Kafka connector, + // both of which perform unbounded (neverending) scans. So, in order to prevent tests from blocking + // on calls to this method, we kick off a thread to kill the underlying job once output has + // been printed. + // + // Note: as of Flink 1.17.0, the Kafka connector will support bounded scanning, which would obviate + // the need to do this. However, the Upsert Kafka connector will still be unbounded. + new Thread(() -> { + while (0 == baos.size()) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + // do nothing; keep waiting + } + } + tableResult.getJobClient().get().cancel(); + }).start(); + + try { + tableResult.print(); + } catch (RuntimeException rte) { + if (ExceptionUtils.indexOfThrowable(rte, JobCancellationException.class) != -1) { + // a JobCancellationException in the exception stack is expected due to delayed + // job cancellation in separate thread; do nothing + } else { + rte.printStackTrace(); + System.exit(1); + } + } + System.setOut(System.out); + return baos.toString(); + } + +} diff --git a/_includes/tutorials/splitting/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlSplitStreamTest.java b/_includes/tutorials/splitting/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlSplitStreamTest.java new file mode 100644 index 0000000000..069147b45d --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlSplitStreamTest.java @@ -0,0 +1,32 @@ +package io.confluent.developer; + + +import org.apache.flink.table.api.TableResult; +import org.junit.Test; + +import java.util.Optional; + +import static org.junit.Assert.assertEquals; + +public class FlinkSqlSplitStreamTest extends AbstractFlinkKafkaTest { + + @Test + public void simpleSelect() throws Exception { + // create base acting events table and aggregation table, and populate with test data + streamTableEnv.executeSql(getResourceFileContents("create-acting-events.sql.template", + Optional.of(kafkaPort), Optional.of(schemaRegistryPort))).await(); + streamTableEnv.executeSql(getResourceFileContents("populate-acting-events.sql")); + streamTableEnv.executeSql(getResourceFileContents("create-acting-events-drama.sql.template", + Optional.of(kafkaPort), Optional.of(schemaRegistryPort))); + + // execute query on result table that should have drama acting events + TableResult tableResult = streamTableEnv.executeSql(getResourceFileContents("query-acting-events-drama.sql")); + + // Compare actual and expected results. Convert result output to line sets to compare so that order + // doesn't matter. + String actualTableauResults = tableauResults(tableResult); + String expectedTableauResults = getResourceFileContents("expected-acting-events-drama.txt"); + assertEquals(stringToLineSet(actualTableauResults), stringToLineSet(expectedTableauResults)); + } + +} diff --git a/_includes/tutorials/splitting/flinksql/code/src/test/resources/create-acting-events-drama.sql.template b/_includes/tutorials/splitting/flinksql/code/src/test/resources/create-acting-events-drama.sql.template new file mode 100644 index 0000000000..3f484fc575 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/src/test/resources/create-acting-events-drama.sql.template @@ -0,0 +1,16 @@ +CREATE TABLE acting_events_drama +WITH ( + 'connector' = 'kafka', + 'topic' = 'acting-events-drama', + 'properties.bootstrap.servers' = 'localhost:KAFKA_PORT', + 'scan.startup.mode' = 'earliest-offset', + 'key.format' = 'avro-confluent', + 'key.avro-confluent.url' = 'http://localhost:SCHEMA_REGISTRY_PORT', + 'key.fields' = 'name;title', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://localhost:SCHEMA_REGISTRY_PORT', + 'value.fields-include' = 'ALL' +) AS + SELECT name, title + FROM acting_events + WHERE genre = 'drama'; diff --git a/_includes/tutorials/splitting/flinksql/code/src/test/resources/create-acting-events.sql.template b/_includes/tutorials/splitting/flinksql/code/src/test/resources/create-acting-events.sql.template new file mode 100644 index 0000000000..d7682a8a50 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/src/test/resources/create-acting-events.sql.template @@ -0,0 +1,16 @@ +CREATE TABLE acting_events ( + name STRING, + title STRING, + genre STRING +) WITH ( + 'connector' = 'kafka', + 'topic' = 'acting-events', + 'properties.bootstrap.servers' = 'localhost:KAFKA_PORT', + 'scan.startup.mode' = 'earliest-offset', + 'key.format' = 'avro-confluent', + 'key.avro-confluent.url' = 'http://localhost:SCHEMA_REGISTRY_PORT', + 'key.fields' = 'name;title', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://localhost:SCHEMA_REGISTRY_PORT', + 'value.fields-include' = 'ALL' +); diff --git a/_includes/tutorials/splitting/flinksql/code/src/test/resources/expected-acting-events-drama.txt b/_includes/tutorials/splitting/flinksql/code/src/test/resources/expected-acting-events-drama.txt new file mode 100644 index 0000000000..d487e79a64 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/src/test/resources/expected-acting-events-drama.txt @@ -0,0 +1,6 @@ ++----+--------------------------------+--------------------------------+ +| op | name | title | ++----+--------------------------------+--------------------------------+ +| +I | Matt Damon | The Martian | +| +I | Meryl Streep | The Iron Lady | +| +I | Russell Crowe | Gladiator | diff --git a/_includes/tutorials/splitting/flinksql/code/src/test/resources/populate-acting-events.sql b/_includes/tutorials/splitting/flinksql/code/src/test/resources/populate-acting-events.sql new file mode 100644 index 0000000000..65cf9c8f7d --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/src/test/resources/populate-acting-events.sql @@ -0,0 +1,12 @@ +INSERT INTO acting_events VALUES + ('Bill Murray', 'Ghostbusters', 'fantasy'), + ('Christian Bale', 'The Dark Knight', 'crime'), + ('Diane Keaton', 'The Godfather: Part II', 'crime'), + ('Jennifer Aniston', 'Office Space', 'comedy'), + ('Judy Garland', 'The Wizard of Oz', 'fantasy'), + ('Keanu Reeves', 'The Matrix', 'fantasy'), + ('Laura Dern', 'Jurassic Park', 'fantasy'), + ('Matt Damon', 'The Martian', 'drama'), + ('Meryl Streep', 'The Iron Lady', 'drama'), + ('Russell Crowe', 'Gladiator', 'drama'), + ('Will Smith', 'Men in Black', 'comedy'); diff --git a/_includes/tutorials/splitting/flinksql/code/src/test/resources/query-acting-events-drama.sql b/_includes/tutorials/splitting/flinksql/code/src/test/resources/query-acting-events-drama.sql new file mode 100644 index 0000000000..980de85e6c --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/src/test/resources/query-acting-events-drama.sql @@ -0,0 +1 @@ +SELECT * FROM acting_events_drama; diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/clean-up.sh b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/clean-up.sh new file mode 100644 index 0000000000..36f5aa9872 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/clean-up.sh @@ -0,0 +1 @@ +docker compose down diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/create-acting-events-drama.sql b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/create-acting-events-drama.sql new file mode 100644 index 0000000000..1c3dbf07c0 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/create-acting-events-drama.sql @@ -0,0 +1,16 @@ +CREATE TABLE acting_events_drama +WITH ( + 'connector' = 'kafka', + 'topic' = 'acting-events-drama', + 'properties.bootstrap.servers' = 'broker:9092', + 'scan.startup.mode' = 'earliest-offset', + 'key.format' = 'avro-confluent', + 'key.avro-confluent.url' = 'http://schema-registry:8081', + 'key.fields' = 'name;title', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://schema-registry:8081', + 'value.fields-include' = 'ALL' +) AS + SELECT name, title + FROM acting_events + WHERE genre = 'drama'; diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/create-acting-events-fantasy.sql b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/create-acting-events-fantasy.sql new file mode 100644 index 0000000000..8f0e4d30a0 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/create-acting-events-fantasy.sql @@ -0,0 +1,16 @@ +CREATE TABLE acting_events_fantasy +WITH ( + 'connector' = 'kafka', + 'topic' = 'acting-events-fantasy', + 'properties.bootstrap.servers' = 'broker:9092', + 'scan.startup.mode' = 'earliest-offset', + 'key.format' = 'avro-confluent', + 'key.avro-confluent.url' = 'http://schema-registry:8081', + 'key.fields' = 'name;title', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://schema-registry:8081', + 'value.fields-include' = 'ALL' +) AS + SELECT name, title + FROM acting_events + WHERE genre = 'fantasy'; diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/create-acting-events-other.sql b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/create-acting-events-other.sql new file mode 100644 index 0000000000..05ba201615 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/create-acting-events-other.sql @@ -0,0 +1,16 @@ +CREATE TABLE acting_events_other +WITH ( + 'connector' = 'kafka', + 'topic' = 'acting-events-other', + 'properties.bootstrap.servers' = 'broker:9092', + 'scan.startup.mode' = 'earliest-offset', + 'key.format' = 'avro-confluent', + 'key.avro-confluent.url' = 'http://schema-registry:8081', + 'key.fields' = 'name;title', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://schema-registry:8081', + 'value.fields-include' = 'ALL' +) AS + SELECT name, title + FROM acting_events + WHERE genre <> 'drama' AND genre <> 'fantasy'; diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/create-acting-events.sql b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/create-acting-events.sql new file mode 100644 index 0000000000..790d6bc846 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/create-acting-events.sql @@ -0,0 +1,16 @@ +CREATE TABLE acting_events ( + name STRING, + title STRING, + genre STRING +) WITH ( + 'connector' = 'kafka', + 'topic' = 'acting-events', + 'properties.bootstrap.servers' = 'broker:9092', + 'scan.startup.mode' = 'earliest-offset', + 'key.format' = 'avro-confluent', + 'key.avro-confluent.url' = 'http://schema-registry:8081', + 'key.fields' = 'name;title', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://schema-registry:8081', + 'value.fields-include' = 'ALL' +); diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/docker-compose-up.sh b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/docker-compose-up.sh new file mode 100644 index 0000000000..e92332ec8e --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/docker-compose-up.sh @@ -0,0 +1 @@ +docker compose up -d \ No newline at end of file diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-acting-events-fantasy.log b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-acting-events-fantasy.log new file mode 100644 index 0000000000..42a0d55147 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-acting-events-fantasy.log @@ -0,0 +1,5 @@ +{"name":{"string":"Bill Murray"},"title":{"string":"Ghostbusters"}} +{"name":{"string":"Judy Garland"},"title":{"string":"The Wizard of Oz"}} +{"name":{"string":"Keanu Reeves"},"title":{"string":"The Matrix"}} +{"name":{"string":"Laura Dern"},"title":{"string":"Jurassic Park"}} +Processed a total of 4 messages diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-print-acting-events-fantasy.log b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-print-acting-events-fantasy.log new file mode 100644 index 0000000000..34130e600c --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-print-acting-events-fantasy.log @@ -0,0 +1,5 @@ + name title + Bill Murray Ghostbusters + Judy Garland The Wizard of Oz + Keanu Reeves The Matrix + Laura Dern Jurassic Park diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-transient-query-changelog-drama.log b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-transient-query-changelog-drama.log new file mode 100644 index 0000000000..f15188b2f8 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-transient-query-changelog-drama.log @@ -0,0 +1,4 @@ + op name title + +I Matt Damon The Martian + +I Meryl Streep The Iron Lady + +I Russell Crowe Gladiator diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-transient-query-changelog-other.log b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-transient-query-changelog-other.log new file mode 100644 index 0000000000..74b62d1c0b --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-transient-query-changelog-other.log @@ -0,0 +1,7 @@ ++----+--------------------------------+--------------------------------+ +| op | name | title | ++----+--------------------------------+--------------------------------+ +| +I | Christian Bale | The Dark Knight | +| +I | Diane Keaton | The Godfather: Part II | +| +I | Jennifer Aniston | Office Space | +| +I | Will Smith | Men in Black | diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-transient-query-drama.log b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-transient-query-drama.log new file mode 100644 index 0000000000..20330c87d2 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-transient-query-drama.log @@ -0,0 +1,4 @@ + name title + Matt Damon The Martian + Meryl Streep The Iron Lady + Russell Crowe Gladiator diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/init.sh b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/init.sh new file mode 100644 index 0000000000..c779335726 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/init.sh @@ -0,0 +1 @@ +mkdir split-stream && cd split-stream diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/populate-acting-events.sql b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/populate-acting-events.sql new file mode 100644 index 0000000000..65cf9c8f7d --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/populate-acting-events.sql @@ -0,0 +1,12 @@ +INSERT INTO acting_events VALUES + ('Bill Murray', 'Ghostbusters', 'fantasy'), + ('Christian Bale', 'The Dark Knight', 'crime'), + ('Diane Keaton', 'The Godfather: Part II', 'crime'), + ('Jennifer Aniston', 'Office Space', 'comedy'), + ('Judy Garland', 'The Wizard of Oz', 'fantasy'), + ('Keanu Reeves', 'The Matrix', 'fantasy'), + ('Laura Dern', 'Jurassic Park', 'fantasy'), + ('Matt Damon', 'The Martian', 'drama'), + ('Meryl Streep', 'The Iron Lady', 'drama'), + ('Russell Crowe', 'Gladiator', 'drama'), + ('Will Smith', 'Men in Black', 'comedy'); diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/print_acting_events_fantasy.sql b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/print_acting_events_fantasy.sql new file mode 100644 index 0000000000..86df1143aa --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/print_acting_events_fantasy.sql @@ -0,0 +1,4 @@ +SELECT + name, + title +FROM acting_events_fantasy; diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/set-result-mode-table.sql b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/set-result-mode-table.sql new file mode 100644 index 0000000000..34e9467e44 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/set-result-mode-table.sql @@ -0,0 +1 @@ +SET 'sql-client.execution.result-mode' = 'table'; \ No newline at end of file diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/start-cli.sh b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/start-cli.sh new file mode 100644 index 0000000000..0dfa90dca3 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/start-cli.sh @@ -0,0 +1 @@ +docker exec -it flink-sql-client sql-client.sh \ No newline at end of file diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/transient-query-drama.sql b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/transient-query-drama.sql new file mode 100644 index 0000000000..c71b2aed74 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/transient-query-drama.sql @@ -0,0 +1,3 @@ +SELECT name, title +FROM acting_events +WHERE genre = 'drama'; diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/transient-query-other.sql b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/transient-query-other.sql new file mode 100644 index 0000000000..ab12b45971 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/transient-query-other.sql @@ -0,0 +1,3 @@ +SELECT name, title +FROM acting_events +WHERE genre <> 'drama' AND genre <> 'fantasy'; diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/validate-acting-events-fantasy.sh b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/validate-acting-events-fantasy.sh new file mode 100644 index 0000000000..33fe7be22c --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/validate-acting-events-fantasy.sh @@ -0,0 +1,7 @@ +docker exec -e SCHEMA_REGISTRY_LOG4J_OPTS=" " -it schema-registry /usr/bin/kafka-avro-console-consumer \ + --topic acting-events-fantasy \ + --from-beginning \ + --max-messages 4 \ + --timeout-ms 10000 \ + --bootstrap-server broker:9092 \ + diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/wait-for-containers.sh b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/wait-for-containers.sh new file mode 100644 index 0000000000..a48d3b7ec6 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/dev/wait-for-containers.sh @@ -0,0 +1,3 @@ +while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:9081/) -eq 000 ] ; do sleep 5 ; done; +# Back off for Flink SQL client container to start. +sleep 5 diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/test/gradle-wrapper.sh b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/test/gradle-wrapper.sh new file mode 100644 index 0000000000..da5bba080f --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/test/gradle-wrapper.sh @@ -0,0 +1 @@ +gradle wrapper \ No newline at end of file diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/test/invoke-test.sh b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/test/invoke-test.sh new file mode 100644 index 0000000000..19e0855bcd --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/test/invoke-test.sh @@ -0,0 +1 @@ +./gradlew test \ No newline at end of file diff --git a/_includes/tutorials/splitting/flinksql/code/tutorial-steps/test/make-test-dirs.sh b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/test/make-test-dirs.sh new file mode 100644 index 0000000000..b3f1266d49 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/code/tutorial-steps/test/make-test-dirs.sh @@ -0,0 +1,2 @@ +mkdir -p src/test/java/io/confluent/developer +mkdir src/test/resources \ No newline at end of file diff --git a/_includes/tutorials/splitting/flinksql/markup/dev/create-acting-events-drama.adoc b/_includes/tutorials/splitting/flinksql/markup/dev/create-acting-events-drama.adoc new file mode 100644 index 0000000000..e52acc6273 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/dev/create-acting-events-drama.adoc @@ -0,0 +1,5 @@ +Since the output of our transient queries looks right, the next step is to make the queries persistent with the following `CREATE TABLE AS SELECT` statements. Go ahead and run the following three commands in your Flink SQL session: + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/create-acting-events-drama.sql %}
++++++ diff --git a/_includes/tutorials/splitting/flinksql/markup/dev/create-acting-events-fantasy.adoc b/_includes/tutorials/splitting/flinksql/markup/dev/create-acting-events-fantasy.adoc new file mode 100644 index 0000000000..1c5e9bb7b7 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/dev/create-acting-events-fantasy.adoc @@ -0,0 +1,3 @@ ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/create-acting-events-fantasy.sql %}
++++++ diff --git a/_includes/tutorials/splitting/flinksql/markup/dev/create-acting-events-other.adoc b/_includes/tutorials/splitting/flinksql/markup/dev/create-acting-events-other.adoc new file mode 100644 index 0000000000..5eae73f0dc --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/dev/create-acting-events-other.adoc @@ -0,0 +1,3 @@ ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/create-acting-events-other.sql %}
++++++ diff --git a/_includes/tutorials/splitting/flinksql/markup/dev/create-acting-events.adoc b/_includes/tutorials/splitting/flinksql/markup/dev/create-acting-events.adoc new file mode 100644 index 0000000000..0eb5b180b9 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/dev/create-acting-events.adoc @@ -0,0 +1,6 @@ +First, you'll need to create a stream to represent the actor / movie combinations. This line of Flink SQL DDL creates a table and its underlying Kafka topic to do so. +Note that we are defining the schema for the table, which includes three fields: `name`, actor's name; `title`, the title of the movie the actor is in; and `genre`, the genre of the movie. The statement also specifies the underlying Kafka topic as `acting-events`, that it should have a single partition (the default `num.partitions` configured in the broker), and defines Avro as its data format. + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/create-acting-events.sql %}
++++++ diff --git a/_includes/tutorials/splitting/flinksql/markup/dev/init.adoc b/_includes/tutorials/splitting/flinksql/markup/dev/init.adoc new file mode 100644 index 0000000000..496fca4877 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/dev/init.adoc @@ -0,0 +1,5 @@ +To get started, make a new directory anywhere you'd like for this project: + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/init.sh %}
++++++ diff --git a/_includes/tutorials/splitting/flinksql/markup/dev/make-docker-compose.adoc b/_includes/tutorials/splitting/flinksql/markup/dev/make-docker-compose.adoc new file mode 100644 index 0000000000..b1bb19e741 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/dev/make-docker-compose.adoc @@ -0,0 +1,4 @@ +Next, create the following `docker-compose.yml` file to obtain Confluent Platform (for Kafka in the cloud, see https://www.confluent.io/confluent-cloud/tryfree/[Confluent Cloud]). The Docker Compose file will start three Apache Flink® containers that have Kafka connector dependencies preinstalled: an interactive Flink SQL client (`flink-sql-client`) that sends streaming SQL jobs to the Flink Job Manager (`flink-job-manager`), which in turn assigns tasks to the Flink Task Manager (`flink-task-manager`) in the Flink cluster. ++++++ +
{% include_raw tutorials/splitting/flinksql/code/docker-compose.yml %}
++++++ diff --git a/_includes/tutorials/splitting/flinksql/markup/dev/populate-acting-events.adoc b/_includes/tutorials/splitting/flinksql/markup/dev/populate-acting-events.adoc new file mode 100644 index 0000000000..d0de5a1eb4 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/dev/populate-acting-events.adoc @@ -0,0 +1,5 @@ +Let's add a small amount of data to our table, so we can see subsequent event routing queries work. Go ahead and paste this statement into the Flink SQL CLI now. + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/populate-acting-events.sql %}
++++++ diff --git a/_includes/tutorials/splitting/flinksql/markup/dev/start-cli.adoc b/_includes/tutorials/splitting/flinksql/markup/dev/start-cli.adoc new file mode 100644 index 0000000000..2e6e4573ce --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/dev/start-cli.adoc @@ -0,0 +1,5 @@ +The best way to interact with Flink SQL when you're learning how things work is with the Flink SQL CLI. Fire it up as follows: + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/start-cli.sh %}
++++++ diff --git a/_includes/tutorials/splitting/flinksql/markup/dev/start-compose.adoc b/_includes/tutorials/splitting/flinksql/markup/dev/start-compose.adoc new file mode 100644 index 0000000000..2a974a57b2 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/dev/start-compose.adoc @@ -0,0 +1,5 @@ +Launch it by running: + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/docker-compose-up.sh %}
++++++ \ No newline at end of file diff --git a/_includes/tutorials/splitting/flinksql/markup/dev/transient-query-drama.adoc b/_includes/tutorials/splitting/flinksql/markup/dev/transient-query-drama.adoc new file mode 100644 index 0000000000..2515ec048e --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/dev/transient-query-drama.adoc @@ -0,0 +1,24 @@ +With our test data in place, let's execute a query to find all of the actor / movie combinations for drama films. + +This query will keep running, continuing to return results until you quit by entering `Q`. + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/transient-query-drama.sql %}
++++++ + +This should yield the following output: + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-transient-query-drama.log %}
++++++ + +Enter `Q` to return to the Flink SQL prompt. + +Note that these results were materialized in memory and printed in a human readable table representation because the default `sql-client.execution.result-mode` configuration value is `table`. You can view non-materialized streaming results as a changelog by running `SET 'sql-client.execution.result-mode' = 'changelog';` +and rerunning the same query. The results will look like this: + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-transient-query-changelog-drama.log %}
++++++ + +Or, as a third option, you can see streaming results non-materialized and inline in the SQL client by running ``SET 'sql-client.execution.result-mode' = 'tableau';`` and rerunning the query once more. In this case, the results will look very similar to `changelog` mode results. This is because tables sourced by the Kafka connector are _unbounded_ and can thus only be queried in `streaming` mode. For further reading on these Flink SQL concepts, consult the documentation on https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sqlclient/#sql-client-result-modes[SQL client result modes] and https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution_mode/[streaming vs. batch execution] diff --git a/_includes/tutorials/splitting/flinksql/markup/dev/transient-query-other.adoc b/_includes/tutorials/splitting/flinksql/markup/dev/transient-query-other.adoc new file mode 100644 index 0000000000..98e2325784 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/dev/transient-query-other.adoc @@ -0,0 +1,13 @@ +You can also use negative matches, that is, messages that don’t match the condition. Run this query to get a list of all actor / movie combinations for films that aren’t drama or fantasy. + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/transient-query-other.sql %}
++++++ + +This should yield the following output (assuming the result mode is still `tableau`): + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-transient-query-changelog-other.log %}
++++++ + +Enter `Ctrl-C` to return to the Flink SQL prompt. diff --git a/_includes/tutorials/splitting/flinksql/markup/dev/validate-acting-events-per-genre.adoc b/_includes/tutorials/splitting/flinksql/markup/dev/validate-acting-events-per-genre.adoc new file mode 100644 index 0000000000..6ba7ce8e3f --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/dev/validate-acting-events-per-genre.adoc @@ -0,0 +1,31 @@ +Seeing is believing, so let's query the persistent `acting_events_fantasy` table. First set the result mode back to `table`: + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/set-result-mode-table.sql %}
++++++ + +Then query the `acting_events_fantasy` table: + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/print_acting_events_fantasy.sql %}
++++++ + +This will yield the same output that the transient query did: + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-print-acting-events-fantasy.log %}
++++++ + +We could also query the underlying topic directly using `kafka-avro-console-consumer`. Open a new terminal window and run the following command: + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/validate-acting-events-fantasy.sh %}
++++++ + +This will yield the following results: + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/dev/expected-acting-events-fantasy.log %}
++++++ + +Now go ahead and query the `acting_events_drama` and `acting_events_other` tables similarly. diff --git a/_includes/tutorials/splitting/flinksql/markup/test/create-resource-create-acting-events-drama.sql.template.adoc b/_includes/tutorials/splitting/flinksql/markup/test/create-resource-create-acting-events-drama.sql.template.adoc new file mode 100644 index 0000000000..b021ddb91a --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/test/create-resource-create-acting-events-drama.sql.template.adoc @@ -0,0 +1,4 @@ +Create the following file at `src/test/resources/create-acting-events-drama.sql.template`. Again, note the `KAFKA_PORT` and `SCHEMA_REGISTRY_PORT` placeholders since this table uses the Upsert Kafka connector and hence must be able to communicate with Kafka and Schema Registry. ++++++ +
{% include_raw tutorials/splitting/flinksql/code/src/test/resources/create-acting-events-drama.sql.template %}
++++++ diff --git a/_includes/tutorials/splitting/flinksql/markup/test/create-resource-create-acting-events.sql.template.adoc b/_includes/tutorials/splitting/flinksql/markup/test/create-resource-create-acting-events.sql.template.adoc new file mode 100644 index 0000000000..d619a31c84 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/test/create-resource-create-acting-events.sql.template.adoc @@ -0,0 +1,9 @@ +We could always inline the SQL statements in our Java test code, but creating separate resource files makes our test more readable and easier to maintain. Further, we can imagine parametrizing URLs as well so that we can have a single set of source controlled queries to use in tests as well as staging or production environments. + +There are a handful of resources to create for our test. These mirror the queries that we developed earlier. + +Create the following file at `src/test/resources/create-acting-events.sql.template`. Note the `KAFKA_PORT` and `SCHEMA_REGISTRY_PORT` placeholders in this file. Out test will dynamically assign these to the ports that Testcontainers assigns. + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/src/test/resources/create-acting-events.sql.template %}
++++++ diff --git a/_includes/tutorials/splitting/flinksql/markup/test/create-resource-expected-acting-events-drama.txt.adoc b/_includes/tutorials/splitting/flinksql/markup/test/create-resource-expected-acting-events-drama.txt.adoc new file mode 100644 index 0000000000..80a7964847 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/test/create-resource-expected-acting-events-drama.txt.adoc @@ -0,0 +1,4 @@ +Finally, create the following file at `src/test/resources/expected-acting-events-drama.txt` that contains our test's expected output: ++++++ +
{% include_raw tutorials/splitting/flinksql/code/src/test/resources/expected-acting-events-drama.txt %}
++++++ diff --git a/_includes/tutorials/splitting/flinksql/markup/test/create-resource-populate-acting-events.sql.adoc b/_includes/tutorials/splitting/flinksql/markup/test/create-resource-populate-acting-events.sql.adoc new file mode 100644 index 0000000000..4c39935604 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/test/create-resource-populate-acting-events.sql.adoc @@ -0,0 +1,4 @@ +Create the following file at `src/test/resources/populate-acting-events.sql`. ++++++ +
{% include_raw tutorials/splitting/flinksql/code/src/test/resources/populate-acting-events.sql %}
++++++ diff --git a/_includes/tutorials/splitting/flinksql/markup/test/create-resource-query-acting-events-drama.sql.adoc b/_includes/tutorials/splitting/flinksql/markup/test/create-resource-query-acting-events-drama.sql.adoc new file mode 100644 index 0000000000..f6e2081b46 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/test/create-resource-query-acting-events-drama.sql.adoc @@ -0,0 +1,4 @@ +Next, create the following file at `src/test/resources/query-acting-events-drama.sql`: ++++++ +
{% include_raw tutorials/splitting/flinksql/code/src/test/resources/query-acting-events-drama.sql %}
++++++ diff --git a/_includes/tutorials/splitting/flinksql/markup/test/invoke-test.adoc b/_includes/tutorials/splitting/flinksql/markup/test/invoke-test.adoc new file mode 100644 index 0000000000..588c659d22 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/test/invoke-test.adoc @@ -0,0 +1,5 @@ +Now run the test, which is as simple as: + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/test/invoke-test.sh %}
++++++ diff --git a/_includes/tutorials/splitting/flinksql/markup/test/make-build-gradle.adoc b/_includes/tutorials/splitting/flinksql/markup/test/make-build-gradle.adoc new file mode 100644 index 0000000000..cf3a90ca1a --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/test/make-build-gradle.adoc @@ -0,0 +1,10 @@ +Create the following Gradle build file, named `build.gradle`, in the `split-stream` directory. + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/build.gradle %}
++++++ + +There are a couple of important points to note in the Gradle build file: + +. Java `sourceCompatibility` and `targetCompatibility` are set to Java 11. Flink supports Java 8 (deprecated) and 11 as of the writing of this tutorial +. The `dependencies` section declares test dependencies for Testcontainers and Flink. Among the handful of Flink dependencies are ones providing local execution (e.g., `flink-statebackend-rocksdb`), the Table API (`flink-table-api-java-bridge`), and Kafka connectors that can use Schema Registry (`flink-sql-connector-kafka` and `flink-sql-avro-confluent-registry`). diff --git a/_includes/tutorials/splitting/flinksql/markup/test/make-gradle-wrapper.adoc b/_includes/tutorials/splitting/flinksql/markup/test/make-gradle-wrapper.adoc new file mode 100644 index 0000000000..c3b5ef9eac --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/test/make-gradle-wrapper.adoc @@ -0,0 +1,5 @@ +And be sure to run the following command to obtain the Gradle wrapper: + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/test/gradle-wrapper.sh %}
++++++ diff --git a/_includes/tutorials/splitting/flinksql/markup/test/make-test-base.adoc b/_includes/tutorials/splitting/flinksql/markup/test/make-test-base.adoc new file mode 100644 index 0000000000..f91a8be35f --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/test/make-test-base.adoc @@ -0,0 +1,8 @@ +Create the following abstract test class at `src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java`: + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java %}
++++++ + +Take a look at this class. It contains the functionality and utility methods that any Flink SQL test would use. Namely, it +encapsulates Kafka broker and Schema Registry Testcontainer management and includes utility methods for dynamically configuring Flink SQL Kafka connectors and processing Table API results. diff --git a/_includes/tutorials/splitting/flinksql/markup/test/make-test-dirs.adoc b/_includes/tutorials/splitting/flinksql/markup/test/make-test-dirs.adoc new file mode 100644 index 0000000000..289440d981 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/test/make-test-dirs.adoc @@ -0,0 +1,7 @@ +The primary choices for programming language in which to write our tests are Java and Python given the need for Flink's Table API. We'll write ours in Java. + +To start our test project, make new directories for test source code and resources within the same `split-stream` folder that you created earlier: + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/tutorial-steps/test/make-test-dirs.sh %}
++++++ diff --git a/_includes/tutorials/splitting/flinksql/markup/test/make-test.adoc b/_includes/tutorials/splitting/flinksql/markup/test/make-test.adoc new file mode 100644 index 0000000000..e75320a504 --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/test/make-test.adoc @@ -0,0 +1,7 @@ +Next, create the test implementation at `src/test/java/io/confluent/developer/FlinkSqlSplitStreamTest.java`: + ++++++ +
{% include_raw tutorials/splitting/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlSplitStreamTest.java %}
++++++ + +The test itself is straightforward to follow. It executes the SQL from our resource files, then runs a select statement against the final output `TABLE` of our application and compares the results to what's expected. \ No newline at end of file diff --git a/_includes/tutorials/splitting/flinksql/markup/test/test-architecture.adoc b/_includes/tutorials/splitting/flinksql/markup/test/test-architecture.adoc new file mode 100644 index 0000000000..9d4416b39e --- /dev/null +++ b/_includes/tutorials/splitting/flinksql/markup/test/test-architecture.adoc @@ -0,0 +1,15 @@ +Now that you have manually developed and tested your Flink SQL application, how might you create an automated test for +it so that it's easier to maintain and upgrade over time? Imagine how painful it would be to have to manually test every change or +software dependency upgrade of your application, and then imagine having to do this for many applications. The benefits of +automated testing are clear, but how do we get there? + +First, what do we want in an automated integration test? For starters: + +. *Real running services* that our application depends on +. *Small resource footprint* so that developers can run the test locally +. *Low enough latency* so that development iterations aren't hampered -- not as low latency as is required for a unit test, but test duration should be on the order of seconds +. *Isolation* so that many tests can run concurrently on the same machine when this test gets run on a build automation server, e.g., no hardcoded ports + +Luckily, tools are at our disposal to solve these problems. We'll use https://www.testcontainers.org/[Testcontainers] to run +containerized Kafka and Schema Registry servers on dynamic ports, Flink's support for local execution environments so that we don't need to spin up a Flink cluster, and Flink's Table API +in order to execute the Flink SQL statements that comprise our application. diff --git a/tutorials/splitting/flinksql.html b/tutorials/splitting/flinksql.html new file mode 100644 index 0000000000..0c7e42adcf --- /dev/null +++ b/tutorials/splitting/flinksql.html @@ -0,0 +1,6 @@ +--- +layout: tutorial +permalink: /split-a-stream-of-events-into-substreams/flinksql +stack: flinksql +static_data: splitting +---