diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 589e9329bb..552b1b0323 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -496,3 +496,7 @@ blocks: - name: Flink SQL test for splitting commands: - make -C _includes/tutorials/splitting/flinksql/code tutorial + - name: Flink SQL test for filtering + commands: + - make -C _includes/tutorials/filtering/flinksql/code tutorial + diff --git a/_data/harnesses/filtering/flinksql.yml b/_data/harnesses/filtering/flinksql.yml new file mode 100644 index 0000000000..fb73c7a475 --- /dev/null +++ b/_data/harnesses/filtering/flinksql.yml @@ -0,0 +1,155 @@ +dev: + steps: + - title: Prerequisites + content: + - action: skip + render: + file: shared/markup/dev/docker-prerequisite.adoc + + - title: Initialize the project + content: + - action: execute + file: tutorial-steps/dev/init.sh + render: + file: tutorials/filtering/flinksql/markup/dev/init.adoc + + - title: Get Confluent Platform + content: + - action: make_file + file: docker-compose.yml + render: + file: tutorials/filtering/flinksql/markup/dev/make-docker-compose.adoc + + - action: execute_async + file: tutorial-steps/dev/docker-compose-up.sh + render: + file: tutorials/filtering/flinksql/markup/dev/start-compose.adoc + + - action: execute + file: tutorial-steps/dev/wait-for-containers.sh + render: + skip: true + + - title: Write the program interactively using the CLI + content: + - action: docker_flinksql_cli_session + container: flink-sql-client + docker_bootup_file: tutorial-steps/dev/start-cli.sh + column_width: 20 + render: + file: tutorials/filtering/flinksql/markup/dev/start-cli.adoc + stdin: + - file: tutorial-steps/dev/create-all-publications.sql + render: + file: tutorials/filtering/flinksql/markup/dev/create-all-publications.adoc + + - file: tutorial-steps/dev/populate-publication-events.sql + render: + file: tutorials/filtering/flinksql/markup/dev/populate-publication-events.adoc + + - file: tutorial-steps/dev/transient-query.sql + render: + file: tutorials/filtering/flinksql/markup/dev/transient-query.adoc + + - file: tutorial-steps/dev/create-publications-by-author.sql + render: + file: tutorials/filtering/flinksql/markup/dev/create-publications-by-author.adoc + + - file: tutorial-steps/dev/populate-publications-by-author.sql + render: + file: tutorials/filtering/flinksql/markup/dev/populate-publications-by-author.adoc + + + stdout: + directory: tutorial-steps/dev/outputs + + - title: Validate output + content: + - action: execute + file: tutorial-steps/dev/validate-publications-by-author.sh + stdout: tutorial-steps/dev/outputs/validate-publications-by-author.log + render: + file: tutorials/filtering/flinksql/markup/dev/validate-publications-by-author.adoc + +test: + steps: + - title: Decide what testing tools to use + content: + - action: skip + render: + file: tutorials/filtering/flinksql/markup/test/test-architecture.adoc + + - title: Create the test skeleton + content: + - action: execute + file: tutorial-steps/test/make-test-dirs.sh + render: + file: tutorials/filtering/flinksql/markup/test/make-test-dirs.adoc + + - action: make_file + file: build.gradle + render: + file: tutorials/filtering/flinksql/markup/test/make-build-gradle.adoc + + - action: execute + file: tutorial-steps/test/gradle-wrapper.sh + render: + file: tutorials/filtering/flinksql/markup/test/make-gradle-wrapper.adoc + + - title: Create SQL resources + content: + - action: make_file + file: src/test/resources/create-all-publications.sql.template + render: + file: 
tutorials/filtering/flinksql/markup/test/create-all-publications.sql.template.adoc + + - action: make_file + file: src/test/resources/populate-publication-events.sql + render: + file: tutorials/filtering/flinksql/markup/test/create-resource-populate-publication-events.sql.adoc + - action: make_file + file: src/test/resources/create-publications-by-author.sql.template + render: + file: tutorials/filtering/flinksql/markup/test/create-resource-create-publications-by-author.sql.template.adoc + + - action: make_file + file: src/test/resources/populate-publications-by-author.sql + render: + file: tutorials/filtering/flinksql/markup/test/create-resource-populate-publications-by-author.sql.adoc + + - action: make_file + file: src/test/resources/query-publications-by-author.sql + render: + file: tutorials/filtering/flinksql/markup/test/create-resource-query-publications-by-author.sql.adoc + + - action: make_file + file: src/test/resources/expected-publications-by-author.txt + render: + file: tutorials/filtering/flinksql/markup/test/create-resource-expected-publications-by-author.txt.adoc + + - title: Write a test + content: + - action: make_file + file: src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java + render: + file: tutorials/filtering/flinksql/markup/test/make-test-base.adoc + + - action: make_file + file: src/test/java/io/confluent/developer/FlinkSqlFilteringTest.java + render: + file: tutorials/filtering/flinksql/markup/test/make-test.adoc + + - title: Invoke the test + content: + - action: execute + file: tutorial-steps/test/invoke-test.sh + render: + file: tutorials/filtering/flinksql/markup/test/invoke-test.adoc + +ccloud: + steps: + - title: Run your app to Confluent Cloud + content: + - action: skip + render: + file: shared/markup/ccloud/try-ccloud.adoc diff --git a/_data/tutorials.yaml b/_data/tutorials.yaml index 3bb712bdb2..d59c651b8b 100644 --- a/_data/tutorials.yaml +++ b/_data/tutorials.yaml @@ -28,6 +28,7 @@ filtering: kstreams: enabled kafka: enabled confluent: enabled + flinksql: enabled splitting: title: How to split a stream of events into substreams meta-description: split a stream of events into substreams @@ -621,7 +622,8 @@ kafka-producer-application-callback: the Callback interface canonical: confluent slug: /kafka-producer-callback-application - question: How can you use callbacks with a KafkaProducer to handle responses from the broker? + question: How can you use callbacks with a KafkaProducer to handle responses from + the broker? introduction: You have an application using an Apache Kafka producer, and you want an automatic way of handling responses after producing records. 
In this tutorial, you'll learn how to use the Callback interface to automatically handle responses diff --git a/_includes/tutorials/filtering/flinksql/code/.gitignore b/_includes/tutorials/filtering/flinksql/code/.gitignore new file mode 100644 index 0000000000..c1a4a4ee90 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/.gitignore @@ -0,0 +1,7 @@ +tutorial-steps/dev/outputs/ + +# Ignore Gradle project-specific cache directory +.gradle + +# Ignore Gradle build output directory +build diff --git a/_includes/tutorials/filtering/flinksql/code/Makefile b/_includes/tutorials/filtering/flinksql/code/Makefile new file mode 100644 index 0000000000..03f102b2d7 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/Makefile @@ -0,0 +1,11 @@ +STEPS_DIR := tutorial-steps +DEV_OUTPUTS_DIR := $(STEPS_DIR)/dev/outputs +TEMP_DIR := $(shell mktemp -d) +SEQUENCE := "dev, test, ccloud" + +tutorial: + rm -r $(DEV_OUTPUTS_DIR) || true + mkdir $(DEV_OUTPUTS_DIR) + harness-runner ../../../../../_data/harnesses/filtering/flinksql.yml $(TEMP_DIR) $(SEQUENCE) + diff --strip-trailing-cr $(STEPS_DIR)/dev/expected-books-by-author.log $(DEV_OUTPUTS_DIR)/validate-publications-by-author.log + reset diff --git a/_includes/tutorials/filtering/flinksql/code/build.gradle b/_includes/tutorials/filtering/flinksql/code/build.gradle new file mode 100644 index 0000000000..2ea77181f0 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/build.gradle @@ -0,0 +1,34 @@ +buildscript { + repositories { + mavenCentral() + } +} + +plugins { + id "java" + id "idea" +} + +sourceCompatibility = JavaVersion.VERSION_11 +targetCompatibility = JavaVersion.VERSION_11 +version = "0.0.1" + +repositories { + mavenCentral() +} + +dependencies { + testImplementation "com.google.guava:guava:31.1-jre" + testImplementation "junit:junit:4.13.2" + testImplementation 'org.testcontainers:testcontainers:1.17.6' + testImplementation 'org.testcontainers:kafka:1.17.6' + testImplementation "org.apache.flink:flink-sql-connector-kafka:1.17.1" + testImplementation "org.apache.flink:flink-sql-avro-confluent-registry:1.17.1" + testImplementation "org.apache.flink:flink-test-utils:1.17.1" + testImplementation "org.apache.flink:flink-test-utils-junit:1.17.1" + testImplementation 'org.apache.flink:flink-json:1.17.1' + testImplementation "org.apache.flink:flink-table-api-java-bridge:1.17.0" + testImplementation "org.apache.flink:flink-table-planner_2.12:1.17.1" + testImplementation "org.apache.flink:flink-table-planner_2.12:1.17.1:tests" + testImplementation "org.apache.flink:flink-statebackend-rocksdb:1.17.1" +} diff --git a/_includes/tutorials/filtering/flinksql/code/docker-compose.yml b/_includes/tutorials/filtering/flinksql/code/docker-compose.yml new file mode 100644 index 0000000000..ac72cce11c --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/docker-compose.yml @@ -0,0 +1,59 @@ +version: '2' +services: + broker: + image: confluentinc/cp-kafka:7.4.1 + hostname: broker + container_name: broker + ports: + - 29092:29092 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_NODE_ID: 1 + KAFKA_CONTROLLER_QUORUM_VOTERS: 
1@broker:29093 + KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092 + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LOG_DIRS: /tmp/kraft-combined-logs + CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk + flink-sql-client: + image: cnfldemos/flink-sql-client-kafka:1.16.0-scala_2.12-java11 + hostname: flink-sql-client + container_name: flink-sql-client + depends_on: + - flink-jobmanager + environment: + FLINK_JOBMANAGER_HOST: flink-jobmanager + volumes: + - ./settings/:/settings + flink-jobmanager: + image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11 + hostname: flink-jobmanager + container_name: flink-jobmanager + ports: + - 9081:9081 + command: jobmanager + environment: + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: flink-jobmanager + rest.bind-port: 9081 + flink-taskmanager: + image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11 + hostname: flink-taskmanager + container_name: flink-taskmanager + depends_on: + - flink-jobmanager + command: taskmanager + scale: 1 + environment: + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: flink-jobmanager + taskmanager.numberOfTaskSlots: 10 diff --git a/_includes/tutorials/filtering/flinksql/code/gradle/wrapper/gradle-wrapper.jar b/_includes/tutorials/filtering/flinksql/code/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000000..249e5832f0 Binary files /dev/null and b/_includes/tutorials/filtering/flinksql/code/gradle/wrapper/gradle-wrapper.jar differ diff --git a/_includes/tutorials/filtering/flinksql/code/gradle/wrapper/gradle-wrapper.properties b/_includes/tutorials/filtering/flinksql/code/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000000..ae04661ee7 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,5 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/_includes/tutorials/filtering/flinksql/code/gradlew b/_includes/tutorials/filtering/flinksql/code/gradlew new file mode 100755 index 0000000000..a69d9cb6c2 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/gradlew @@ -0,0 +1,240 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. 
If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +APP_NAME="Gradle" +APP_BASE_NAME=${0##*/} + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 
+ +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. 
+# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/_includes/tutorials/filtering/flinksql/code/gradlew.bat b/_includes/tutorials/filtering/flinksql/code/gradlew.bat new file mode 100644 index 0000000000..f127cfd49d --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/gradlew.bat @@ -0,0 +1,91 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if %ERRORLEVEL% equ 0 goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! 
+set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/_includes/tutorials/filtering/flinksql/code/settings.gradle b/_includes/tutorials/filtering/flinksql/code/settings.gradle new file mode 100644 index 0000000000..9a5bf50f31 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/settings.gradle @@ -0,0 +1,10 @@ +/* + * This file was generated by the Gradle 'init' task. + * + * The settings file is used to specify which projects to include in your build. + * + * Detailed information about configuring a multi-project build in Gradle can be found + * in the user manual at https://docs.gradle.org/6.7.1/userguide/multi_project_builds.html + */ + +rootProject.name = 'filtering-events-flinksql' diff --git a/_includes/tutorials/filtering/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java b/_includes/tutorials/filtering/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java new file mode 100644 index 0000000000..878611a46f --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java @@ -0,0 +1,175 @@ +package io.confluent.developer; + + +import com.google.common.io.Resources; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.runtime.client.JobCancellationException; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.assertj.core.util.Sets; +import org.junit.BeforeClass; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Optional; +import java.util.Set; + +import static org.testcontainers.containers.KafkaContainer.KAFKA_PORT; + +/** + * Base class for Flink SQL integration tests that use Flink's Kafka connectors. Encapsulates + * Kafka broker and Schema Registry Testcontainer management and includes utility methods for + * dynamically configuring Flink SQL Kafka connectors and processing Table API results. + */ +public class AbstractFlinkKafkaTest { + + protected static StreamTableEnvironment streamTableEnv; + protected static Integer schemaRegistryPort, kafkaPort; + + @BeforeClass + public static void setup() { + // create Flink table environment that test subclasses will use to execute SQL statements + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env.setStateBackend(new EmbeddedRocksDBStateBackend()); + streamTableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().inStreamingMode().build()); + + + // Start Kafka and Schema Registry Testcontainers. 
Set the exposed ports that test subclasses + // can use to dynamically configure Kafka connectors. Schema Registry enables connectors to + // be configured with 'value.format' = 'avro-confluent' + Network network = Network.newNetwork(); + + KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.2")) + .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") + .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") + .withEnv("KAFKA_TRANSACTION_STATE_LOG_NUM_PARTITIONS", "1") + .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "500") + .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true") + .withReuse(true) + .withNetwork(network); + kafka.start(); + kafkaPort = kafka.getMappedPort(KAFKA_PORT); + + GenericContainer schemaRegistry = new GenericContainer(DockerImageName.parse("confluentinc/cp-schema-registry:7.3.2")) + .withExposedPorts(8081) + .withNetwork(kafka.getNetwork()) + .withEnv("SCHEMA_REGISTRY_HOST_NAME", "localhost") + .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081") + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + kafka.getNetworkAliases().get(0) + ":9092"); + schemaRegistry.start(); + schemaRegistryPort = schemaRegistry.getMappedPort(8081); + } + + /** + * Given a resource filename and optional Kafka / Schema Registry ports, return the resource + * file contents as a String with ports substituted for KAFKA_PORT and SCHEMA_REGISTRY_PORT + * placeholders. + * + * @param resourceFileName the resource file name + * @param kafkaPort the port that Kafka broker exposes + * @param schemaRegistryPort the port that Schema Registry exposes + * @return resource file contents with port values substituted for placeholders + * @throws IOException if resource file can't be read + */ + protected static String getResourceFileContents( + String resourceFileName, + Optional kafkaPort, + Optional schemaRegistryPort + ) throws IOException { + URL url = Resources.getResource(resourceFileName); + String contents = Resources.toString(url, StandardCharsets.UTF_8); + if (kafkaPort.isPresent()) { + contents = contents.replaceAll("KAFKA_PORT", kafkaPort.get().toString()); + } + if (schemaRegistryPort.isPresent()) { + contents = contents.replaceAll("SCHEMA_REGISTRY_PORT", schemaRegistryPort.get().toString()); + } + return contents; + } + + /** + * Given a resource filename, return the resource file contents as a String. + * + * @param resourceFileName the resource file name + * @return resource file contents + * @throws IOException if resource file can't be read + */ + protected static String getResourceFileContents( + String resourceFileName + ) throws IOException { + // no Kafka / Schema Registry ports + return getResourceFileContents(resourceFileName, Optional.empty(), Optional.empty()); + } + + /** + * Utility method to convert a String containing multiple lines into a set of String's where + * each String is one line. This is useful for creating Flink SQL integration tests based on + * the tableau results printed via the Table API where the order of results is nondeterministic. + * + * @param s multiline String + * @return set of String's where each member is one line + */ + protected static Set stringToLineSet(String s) { + return Sets.newHashSet(Arrays.asList(s.split("\\r?\\n"))); + } + + /** + * Given a Flink Table API `TableResult` respresenting a SELECT statement result, + * capture and return the statement's tableau results. 
+ * + * @param tableResult Flink Table API `TableResult` representing a SELECT statement result + * @return the SELECT statement's tableau results + */ + protected static String tableauResults(TableResult tableResult) { + // capture tableau results printed to stdout in a String + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(baos)); + + // The given table result may come from a table backed by the Kafka or Upsert Kafka connector, + // both of which perform unbounded (never-ending) scans. So, in order to prevent tests from blocking + // on calls to this method, we kick off a thread to kill the underlying job once output has + // been printed. + // + // Note: as of Flink 1.17.0, the Kafka connector will support bounded scanning, which would obviate + // the need to do this. However, the Upsert Kafka connector will still be unbounded. + new Thread(() -> { + while (0 == baos.size()) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + // do nothing; keep waiting + } + } + tableResult.getJobClient().get().cancel(); + }).start(); + + try { + tableResult.print(); + } catch (RuntimeException rte) { + if (ExceptionUtils.indexOfThrowable(rte, JobCancellationException.class) != -1) { + // a JobCancellationException in the exception stack is expected due to delayed + // job cancellation in separate thread; do nothing + } else { + rte.printStackTrace(); + System.exit(1); + } + } + System.setOut(System.out); + return baos.toString(); + } + +} diff --git a/_includes/tutorials/filtering/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlFilteringTest.java b/_includes/tutorials/filtering/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlFilteringTest.java new file mode 100644 index 0000000000..ec05fdc9c3 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlFilteringTest.java @@ -0,0 +1,38 @@ +package io.confluent.developer; + + +import org.apache.flink.table.api.TableResult; +import org.junit.Test; + +import java.util.Optional; + +import static org.junit.Assert.assertEquals; + +public class FlinkSqlFilteringTest extends AbstractFlinkKafkaTest { + + @Test + public void simpleSelect() throws Exception { + // create the publication events table and the filtered books-by-author table, and populate them with test data + streamTableEnv.executeSql(getResourceFileContents("create-all-publications.sql.template", + Optional.of(kafkaPort),Optional.of(schemaRegistryPort))).await(); + streamTableEnv.executeSql(getResourceFileContents("create-publications-by-author.sql.template", + Optional.of(kafkaPort),Optional.of(schemaRegistryPort))).await(); + + + // We can't call await() on this result since it won't return. In Flink 1.17 and later this can change + // by setting 'scan.bounded.mode' = 'latest-offset' in the publication_events CREATE TABLE statement, which will + // cause this INSERT to terminate once the latest offset of the publication_events table is reached. + streamTableEnv.executeSql(getResourceFileContents("populate-publication-events.sql")); + streamTableEnv.executeSql(getResourceFileContents("populate-publications-by-author.sql")); + + // execute query on the result table, which should contain only the books written by George R. R. Martin + TableResult tableResult = streamTableEnv.executeSql(getResourceFileContents("query-publications-by-author.sql")); + + // Compare actual and expected results.
Convert result output to line sets to compare so that order + // doesn't matter, because the grouped result order doesn't matter -- 2017's could come before or after 2019's. + String actualTableauResults = tableauResults(tableResult); + String expectedTableauResults = getResourceFileContents("expected-publications-by-author.txt"); + assertEquals(stringToLineSet(actualTableauResults), stringToLineSet(expectedTableauResults)); + } + +} diff --git a/_includes/tutorials/filtering/flinksql/code/src/test/resources/create-all-publications.sql.template b/_includes/tutorials/filtering/flinksql/code/src/test/resources/create-all-publications.sql.template new file mode 100644 index 0000000000..22ac3a863d --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/src/test/resources/create-all-publications.sql.template @@ -0,0 +1,14 @@ +CREATE TABLE publication_events ( + book_id INT, + author STRING, + title STRING +) WITH ( + 'connector' = 'kafka', + 'topic' = 'publication_events', + 'properties.bootstrap.servers' = 'localhost:KAFKA_PORT', + 'scan.startup.mode' = 'earliest-offset', + 'key.format' = 'raw', + 'key.fields' = 'book_id', + 'value.format' = 'json', + 'value.fields-include' = 'EXCEPT_KEY' +); \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/src/test/resources/create-publications-by-author.sql.template b/_includes/tutorials/filtering/flinksql/code/src/test/resources/create-publications-by-author.sql.template new file mode 100644 index 0000000000..2ace024991 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/src/test/resources/create-publications-by-author.sql.template @@ -0,0 +1,13 @@ +CREATE TABLE george_martin_books ( + book_id INT, + title STRING +) WITH ( + 'connector' = 'kafka', + 'topic' = 'george_martin_books', + 'properties.bootstrap.servers' = 'localhost:KAFKA_PORT', + 'scan.startup.mode' = 'earliest-offset', + 'key.format' = 'raw', + 'key.fields' = 'book_id', + 'value.format' = 'json', + 'value.fields-include' = 'EXCEPT_KEY' +); \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/src/test/resources/expected-publications-by-author.txt b/_includes/tutorials/filtering/flinksql/code/src/test/resources/expected-publications-by-author.txt new file mode 100644 index 0000000000..a3a0e28b01 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/src/test/resources/expected-publications-by-author.txt @@ -0,0 +1,7 @@ ++----+-------------+--------------------------------+ +| op | book_id | title | ++----+-------------+--------------------------------+ +| +I | 1 | A Song of Ice and Fire | +| +I | 3 | Fire & Blood | +| +I | 6 | A Dream of Spring | +| +I | 8 | The Ice Dragon | diff --git a/_includes/tutorials/filtering/flinksql/code/src/test/resources/populate-publication-events.sql b/_includes/tutorials/filtering/flinksql/code/src/test/resources/populate-publication-events.sql new file mode 100644 index 0000000000..ccebe07bbc --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/src/test/resources/populate-publication-events.sql @@ -0,0 +1,11 @@ +INSERT INTO publication_events VALUES + (0, 'C.S. Lewis', 'The Silver Chair'), + (1, 'George R. R. Martin', 'A Song of Ice and Fire'), + (2, 'C.S. Lewis', 'Perelandra'), + (3, 'George R. R. Martin', 'Fire & Blood'), + (4, 'J. R. R. Tolkien', 'The Hobbit'), + (5, 'J. R. R. Tolkien', 'The Lord of the Rings'), + (6, 'George R. R. Martin', 'A Dream of Spring'), + (7, 'J. R. R. Tolkien', 'The Fellowship of the Ring'), + (8, 'George R. R. 
Martin', 'The Ice Dragon'), + (9, 'Mario Puzo', 'The Godfather'); \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/src/test/resources/populate-publications-by-author.sql b/_includes/tutorials/filtering/flinksql/code/src/test/resources/populate-publications-by-author.sql new file mode 100644 index 0000000000..b24f717e04 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/src/test/resources/populate-publications-by-author.sql @@ -0,0 +1,6 @@ +INSERT INTO george_martin_books +SELECT + book_id, + title +FROM publication_events +WHERE author = 'George R. R. Martin'; \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/src/test/resources/query-publications-by-author.sql b/_includes/tutorials/filtering/flinksql/code/src/test/resources/query-publications-by-author.sql new file mode 100644 index 0000000000..ad36e91b5d --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/src/test/resources/query-publications-by-author.sql @@ -0,0 +1 @@ +SELECT * FROM george_martin_books; \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/clean-up.sh b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/clean-up.sh new file mode 100644 index 0000000000..36f5aa9872 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/clean-up.sh @@ -0,0 +1 @@ +docker compose down diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/create-all-publications.sql b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/create-all-publications.sql new file mode 100644 index 0000000000..9d03fef043 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/create-all-publications.sql @@ -0,0 +1,14 @@ +CREATE TABLE publication_events ( + book_id INT, + author STRING, + title STRING +) WITH ( + 'connector' = 'kafka', + 'topic' = 'publication_events', + 'properties.bootstrap.servers' = 'broker:9092', + 'scan.startup.mode' = 'earliest-offset', + 'key.format' = 'raw', + 'key.fields' = 'book_id', + 'value.format' = 'json', + 'value.fields-include' = 'EXCEPT_KEY' +); \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/create-publications-by-author.sql b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/create-publications-by-author.sql new file mode 100644 index 0000000000..d5d845f9e3 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/create-publications-by-author.sql @@ -0,0 +1,13 @@ +CREATE TABLE george_martin_books ( + book_id INT, + title STRING +) WITH ( + 'connector' = 'kafka', + 'topic' = 'george_martin_books', + 'properties.bootstrap.servers' = 'broker:9092', + 'scan.startup.mode' = 'earliest-offset', + 'key.format' = 'raw', + 'key.fields' = 'book_id', + 'value.format' = 'json', + 'value.fields-include' = 'EXCEPT_KEY' +); \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/docker-compose-up.sh b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/docker-compose-up.sh new file mode 100644 index 0000000000..e92332ec8e --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/docker-compose-up.sh @@ -0,0 +1 @@ +docker compose up -d \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/expected-books-by-author.log 
b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/expected-books-by-author.log new file mode 100644 index 0000000000..99c1d23609 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/expected-books-by-author.log @@ -0,0 +1,5 @@ +1-{"title":"A Song of Ice and Fire"} +3-{"title":"Fire & Blood"} +6-{"title":"A Dream of Spring"} +8-{"title":"The Ice Dragon"} +Processed a total of 4 messages diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/expected-print-output-topic.log b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/expected-print-output-topic.log new file mode 100644 index 0000000000..3faf84b261 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/expected-print-output-topic.log @@ -0,0 +1,5 @@ + book_id title + 1 A Song of Ice and Fire + 3 Fire & Blood + 6 A Dream of Spring + 8 The Ice Dragon \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/expected-transient-query.log b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/expected-transient-query.log new file mode 100644 index 0000000000..d21d7a16f2 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/expected-transient-query.log @@ -0,0 +1,5 @@ + book_id author title + 1 George R. R. Martin A Song of Ice and Fire + 3 George R. R. Martin Fire & Blood + 6 George R. R. Martin A Dream of Spring + 8 George R. R. Martin The Ice Dragon \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/init.sh b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/init.sh new file mode 100644 index 0000000000..e92b6d5364 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/init.sh @@ -0,0 +1 @@ +mkdir filter-events && cd filter-events diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/populate-publication-events.sql b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/populate-publication-events.sql new file mode 100644 index 0000000000..ccebe07bbc --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/populate-publication-events.sql @@ -0,0 +1,11 @@ +INSERT INTO publication_events VALUES + (0, 'C.S. Lewis', 'The Silver Chair'), + (1, 'George R. R. Martin', 'A Song of Ice and Fire'), + (2, 'C.S. Lewis', 'Perelandra'), + (3, 'George R. R. Martin', 'Fire & Blood'), + (4, 'J. R. R. Tolkien', 'The Hobbit'), + (5, 'J. R. R. Tolkien', 'The Lord of the Rings'), + (6, 'George R. R. Martin', 'A Dream of Spring'), + (7, 'J. R. R. Tolkien', 'The Fellowship of the Ring'), + (8, 'George R. R. Martin', 'The Ice Dragon'), + (9, 'Mario Puzo', 'The Godfather'); \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/populate-publications-by-author.sql b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/populate-publications-by-author.sql new file mode 100644 index 0000000000..40d3587a26 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/populate-publications-by-author.sql @@ -0,0 +1,6 @@ +INSERT INTO george_martin_books +SELECT + book_id, + title +FROM publication_events +WHERE author = 'George R. R. 
Martin'; \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/print-output-topic.sql b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/print-output-topic.sql new file mode 100644 index 0000000000..df45f9654a --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/print-output-topic.sql @@ -0,0 +1,4 @@ +SELECT + book_id, + title +FROM george_martin_books; \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/set-result-mode-table.sql b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/set-result-mode-table.sql new file mode 100644 index 0000000000..34e9467e44 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/set-result-mode-table.sql @@ -0,0 +1 @@ +SET 'sql-client.execution.result-mode' = 'table'; \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/start-cli.sh b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/start-cli.sh new file mode 100644 index 0000000000..0dfa90dca3 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/start-cli.sh @@ -0,0 +1 @@ +docker exec -it flink-sql-client sql-client.sh \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/transient-query.sql b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/transient-query.sql new file mode 100644 index 0000000000..575cf9c7bc --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/transient-query.sql @@ -0,0 +1 @@ +SELECT * FROM publication_events WHERE author = 'George R. R. Martin' ; \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/validate-publications-by-author.sh b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/validate-publications-by-author.sh new file mode 100644 index 0000000000..4559e610f8 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/validate-publications-by-author.sh @@ -0,0 +1,9 @@ +docker exec -it broker /usr/bin/kafka-console-consumer\ + --topic george_martin_books \ + --from-beginning \ + --max-messages 4 \ + --timeout-ms 10000 \ + --bootstrap-server broker:9092 \ + --property key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer \ + --property print.key=true \ + --property key.separator="-" \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/wait-for-containers.sh b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/wait-for-containers.sh new file mode 100644 index 0000000000..a48d3b7ec6 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/dev/wait-for-containers.sh @@ -0,0 +1,3 @@ +while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:9081/) -eq 000 ] ; do sleep 5 ; done; +# Back off for Flink SQL client container to start. 
+sleep 5 diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/test/gradle-wrapper.sh b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/test/gradle-wrapper.sh new file mode 100644 index 0000000000..da5bba080f --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/test/gradle-wrapper.sh @@ -0,0 +1 @@ +gradle wrapper \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/test/invoke-test.sh b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/test/invoke-test.sh new file mode 100644 index 0000000000..19e0855bcd --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/test/invoke-test.sh @@ -0,0 +1 @@ +./gradlew test \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/code/tutorial-steps/test/make-test-dirs.sh b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/test/make-test-dirs.sh new file mode 100644 index 0000000000..b3f1266d49 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/code/tutorial-steps/test/make-test-dirs.sh @@ -0,0 +1,2 @@ +mkdir -p src/test/java/io/confluent/developer +mkdir src/test/resources \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/markup/dev/create-all-publications.adoc b/_includes/tutorials/filtering/flinksql/markup/dev/create-all-publications.adoc new file mode 100644 index 0000000000..e3fb5be0fc --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/dev/create-all-publications.adoc @@ -0,0 +1,8 @@ +Our tutorial demonstrates how to filter results when selecting from a table. To keep things simple, we're going to create a table backed by a Kafka topic with published book events. + +This Flink SQL DDL creates a table and its underlying Kafka topic to represent events generated when a publisher releases a new book. +Note that we are defining the schema for the table, which includes three fields: `book_id`, `author`, and `title`. We are also specifying that the underlying Kafka topic—which Flink SQL will auto-create—be called `publication_events` and have just one partition (the default `num.partitions` configured in the broker), and that its messages will be in JSON format. + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/dev/create-all-publications.sql %}
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/dev/create-publications-by-author.adoc b/_includes/tutorials/filtering/flinksql/markup/dev/create-publications-by-author.adoc new file mode 100644 index 0000000000..1a1a8711d6 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/dev/create-publications-by-author.adoc @@ -0,0 +1,6 @@ +Since the output of our transient query looks right, the next step is to make the query persistent. This looks exactly like the transient query, except we first create a new table. The `INSERT INTO` statement returns to the CLI prompt right away, having created a persistent stream processing program running in the Flink cluster, continuously processing input records and updating the resulting `george_martin_books` table. + +Now go ahead and run the following two commands in your Flink SQL session: ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/dev/create-publications-by-author.sql %}
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/dev/init.adoc b/_includes/tutorials/filtering/flinksql/markup/dev/init.adoc new file mode 100644 index 0000000000..0d4678bfb4 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/dev/init.adoc @@ -0,0 +1,5 @@ +To get started, make a new directory anywhere you'd like for this project: + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/dev/init.sh %}
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/dev/make-docker-compose.adoc b/_includes/tutorials/filtering/flinksql/markup/dev/make-docker-compose.adoc new file mode 100644 index 0000000000..3d2d5c3e9d --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/dev/make-docker-compose.adoc @@ -0,0 +1,4 @@ +Next, create the following `docker-compose.yml` file to obtain Confluent Platform (for Kafka in the cloud, see https://www.confluent.io/confluent-cloud/tryfree/[Confluent Cloud]). The Docker Compose file will start three Apache Flink® containers that have Kafka connector dependencies preinstalled: an interactive Flink SQL client (`flink-sql-client`) that sends streaming SQL jobs to the Flink Job Manager (`flink-jobmanager`), which in turn assigns tasks to the Flink Task Manager (`flink-taskmanager`) in the Flink cluster. ++++++ +
{% include_raw tutorials/filtering/flinksql/code/docker-compose.yml %}
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/dev/populate-publication-events.adoc b/_includes/tutorials/filtering/flinksql/markup/dev/populate-publication-events.adoc new file mode 100644 index 0000000000..030b610cf1 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/dev/populate-publication-events.adoc @@ -0,0 +1,5 @@ +Let's add a small amount of data to our table, so we can see our query work. Go ahead and paste this statement into the Flink SQL CLI now. + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/dev/populate-publication-events.sql %}
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/dev/populate-publications-by-author.adoc b/_includes/tutorials/filtering/flinksql/markup/dev/populate-publications-by-author.adoc new file mode 100644 index 0000000000..f473b96a35 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/dev/populate-publications-by-author.adoc @@ -0,0 +1,3 @@ ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/dev/populate-publications-by-author.sql %}
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/dev/print-output-topic.adoc b/_includes/tutorials/filtering/flinksql/markup/dev/print-output-topic.adoc new file mode 100644 index 0000000000..df3def501f --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/dev/print-output-topic.adoc @@ -0,0 +1,10 @@ +Seeing is believing, so let's query the persistent `george_martin_books` table. + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/dev/print-output-topic.sql %}
++++++ + +This should yield the following output: ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/dev/expected-print-output-topic.log %}
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/dev/start-cli.adoc b/_includes/tutorials/filtering/flinksql/markup/dev/start-cli.adoc new file mode 100644 index 0000000000..401b7aa65e --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/dev/start-cli.adoc @@ -0,0 +1,5 @@ +The best way to interact with Flink SQL when you're learning how things work is with the Flink SQL CLI. Fire it up as follows: + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/dev/start-cli.sh %}
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/dev/start-compose.adoc b/_includes/tutorials/filtering/flinksql/markup/dev/start-compose.adoc new file mode 100644 index 0000000000..f1de2eaf0d --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/dev/start-compose.adoc @@ -0,0 +1,5 @@ +Launch it by running: + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/dev/docker-compose-up.sh %}
++++++ \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/markup/dev/transient-query.adoc b/_includes/tutorials/filtering/flinksql/markup/dev/transient-query.adoc new file mode 100644 index 0000000000..e83230d647 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/dev/transient-query.adoc @@ -0,0 +1,17 @@ +With our test data in place, let's try a query that filters the publication events down to a single author. A `SELECT` statement in Flink SQL is called a _continuous query_ because it will continue to run and produce results dynamically. This query is _transient_, meaning that after we stop it, it is gone and will not keep processing the input stream. Further, the results aren't persisted anywhere. We'll create a _persistent_ query, in contrast to this transient one, a few steps from now. + +If you're familiar with SQL, the text of the query itself is fairly self-explanatory. We are only selecting books written by George R. R. Martin using a `WHERE` clause. This query will keep running, continuing to return results until you use `Ctrl-C`. + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/dev/transient-query.sql %}
++++++ + +This should yield the following output: + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/dev/expected-transient-query.log %}
++++++ + +Enter `Q` to return to the Flink SQL prompt. + +Note that these results were materialized in memory and printed in a human readable table representation because the default `sql-client.execution.result-mode` configuration value is `table`. diff --git a/_includes/tutorials/filtering/flinksql/markup/dev/validate-publications-by-author.adoc b/_includes/tutorials/filtering/flinksql/markup/dev/validate-publications-by-author.adoc new file mode 100644 index 0000000000..3f43b864f1 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/dev/validate-publications-by-author.adoc @@ -0,0 +1,29 @@ +Seeing is believing, so let's query the persistent `george_martin_books` table. First let's ensure the result mode is set to `table`: + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/dev/set-result-mode-table.sql %}
++++++ + +Then query the `george_martin_books` table: + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/dev/print-output-topic.sql %}
++++++ + +This will yield the same books that the transient query did (without the `author` column, and perhaps in a different order): + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/dev/expected-print-output-topic.log %}
++++++ + +We could also query the underlying topic directly using `kafka-console-consumer`. Open a new terminal window and run the following command: + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/dev/validate-publications-by-author.sh %}
++++++ + +This will yield the following results: + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/dev/expected-books-by-author.log %}
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/test/create-all-publications.sql.template.adoc b/_includes/tutorials/filtering/flinksql/markup/test/create-all-publications.sql.template.adoc new file mode 100644 index 0000000000..8c683072be --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/test/create-all-publications.sql.template.adoc @@ -0,0 +1,9 @@ +We could always inline the SQL statements in our Java test code, but creating separate resource files makes our test more readable and easier to maintain. Further, we can imagine parametrizing URLs as well so that we can have a single set of source-controlled queries to use in tests as well as staging or production environments. + +There are a handful of resources to create for our test. These mirror the queries that we developed earlier. + +Create the following file at `src/test/resources/create-all-publications.sql.template`. Note the `KAFKA_PORT` placeholder in this file. Our test will dynamically replace it with the port that Testcontainers assigns. + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/src/test/resources/create-all-publications.sql.template %}
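-- A hedged sketch of the general shape such a template takes: a Kafka-backed source table
-- whose bootstrap servers contain the KAFKA_PORT placeholder for the test to substitute.
-- The table name, columns, and format below are assumptions; the included template above
-- is what the test actually uses.
CREATE TABLE publication_events (
    book_id INT,
    author STRING,
    title STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'publication_events',
    'properties.bootstrap.servers' = 'localhost:KAFKA_PORT',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);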
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/test/create-resource-create-publications-by-author.sql.template.adoc b/_includes/tutorials/filtering/flinksql/markup/test/create-resource-create-publications-by-author.sql.template.adoc new file mode 100644 index 0000000000..a2e9e114a9 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/test/create-resource-create-publications-by-author.sql.template.adoc @@ -0,0 +1,4 @@ +Create the following file at `src/test/resources/create-publications-by-author.sql.template`. Again, note the `KAFKA_PORT` placeholder since this table uses the Kafka connector and hence must be able to communicate with Kafka. ++++++ +
{% include_raw tutorials/filtering/flinksql/code/src/test/resources/create-publications-by-author.sql.template %}
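-- Likewise, a sketch of a Kafka-backed sink table for the filtered records, again carrying
-- the KAFKA_PORT placeholder (table name, columns, and format are assumptions; the included
-- template above is authoritative):
CREATE TABLE george_martin_books (
    book_id INT,
    author STRING,
    title STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'george_martin_books',
    'properties.bootstrap.servers' = 'localhost:KAFKA_PORT',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);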
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/test/create-resource-expected-publications-by-author.txt.adoc b/_includes/tutorials/filtering/flinksql/markup/test/create-resource-expected-publications-by-author.txt.adoc new file mode 100644 index 0000000000..b2eeb84d10 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/test/create-resource-expected-publications-by-author.txt.adoc @@ -0,0 +1,4 @@ +Finally, create the following file at `src/test/resources/expected-publications-by-author.txt` that contains our test's expected output: ++++++ +
{% include_raw tutorials/filtering/flinksql/code/src/test/resources/expected-publications-by-author.txt %}
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/test/create-resource-populate-publication-events.sql.adoc b/_includes/tutorials/filtering/flinksql/markup/test/create-resource-populate-publication-events.sql.adoc new file mode 100644 index 0000000000..844fbf0bc2 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/test/create-resource-populate-publication-events.sql.adoc @@ -0,0 +1,4 @@ +Create the following file at `src/test/resources/populate-publication-events.sql`: ++++++ +
{% include_raw tutorials/filtering/flinksql/code/src/test/resources/populate-publication-events.sql %}
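-- A sketch of the kind of INSERT used to seed test data. The columns, authors, and titles
-- below are made up for illustration; the included file above is the real seed data.
INSERT INTO publication_events VALUES
    (0, 'George R. R. Martin', 'A Song of Ice and Fire'),
    (1, 'C.S. Lewis', 'The Silver Chair'),
    (2, 'George R. R. Martin', 'Fire & Blood');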
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/test/create-resource-populate-publications-by-author.sql.adoc b/_includes/tutorials/filtering/flinksql/markup/test/create-resource-populate-publications-by-author.sql.adoc new file mode 100644 index 0000000000..d407b76f49 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/test/create-resource-populate-publications-by-author.sql.adoc @@ -0,0 +1,4 @@ +Create the following file at `src/test/resources/populate-publications-by-author.sql`: ++++++ +
{% include_raw tutorials/filtering/flinksql/code/src/test/resources/populate-publications-by-author.sql %}
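-- The filtering step itself typically lands in an INSERT INTO ... SELECT, with the WHERE
-- clause doing the work, along these lines (table names are assumptions; the included file
-- above is authoritative):
INSERT INTO george_martin_books
SELECT *
FROM publication_events
WHERE author = 'George R. R. Martin';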
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/test/create-resource-query-publications-by-author.sql.adoc b/_includes/tutorials/filtering/flinksql/markup/test/create-resource-query-publications-by-author.sql.adoc new file mode 100644 index 0000000000..b50b2df7ab --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/test/create-resource-query-publications-by-author.sql.adoc @@ -0,0 +1,4 @@ +Next, create the following file at `src/test/resources/query-publications-by-author.sql`: ++++++ +
{% include_raw tutorials/filtering/flinksql/code/src/test/resources/query-publications-by-author.sql %}
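-- A sketch of the verification query the test runs against the output table
-- (the included file above contains the actual statement):
SELECT * FROM george_martin_books;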
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/test/invoke-test.adoc b/_includes/tutorials/filtering/flinksql/markup/test/invoke-test.adoc new file mode 100644 index 0000000000..b636584bed --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/test/invoke-test.adoc @@ -0,0 +1,5 @@ +Now run the test, which is as simple as: + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/test/invoke-test.sh %}
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/test/make-build-gradle.adoc b/_includes/tutorials/filtering/flinksql/markup/test/make-build-gradle.adoc new file mode 100644 index 0000000000..7de98c2b5f --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/test/make-build-gradle.adoc @@ -0,0 +1,10 @@ +Create the following Gradle build file, named `build.gradle`, in the `filtering-events` directory. + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/build.gradle %}
++++++ + +There are a couple of important points to note in the Gradle build file: + +. Java `sourceCompatibility` and `targetCompatibility` are set to Java 11. Flink supports Java 8 (deprecated) and Java 11 as of this writing. +. The `dependencies` section declares test dependencies for Testcontainers and Flink. Among the handful of Flink dependencies are ones providing local execution (e.g., `flink-statebackend-rocksdb`), the Table API (`flink-table-api-java-bridge`), and Kafka connectors that can use Schema Registry (`flink-sql-connector-kafka` and `flink-sql-avro-confluent-registry`). diff --git a/_includes/tutorials/filtering/flinksql/markup/test/make-gradle-wrapper.adoc b/_includes/tutorials/filtering/flinksql/markup/test/make-gradle-wrapper.adoc new file mode 100644 index 0000000000..a0fc0c3ac6 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/test/make-gradle-wrapper.adoc @@ -0,0 +1,5 @@ +And be sure to run the following command to obtain the Gradle wrapper: + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/test/gradle-wrapper.sh %}
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/test/make-test-base.adoc b/_includes/tutorials/filtering/flinksql/markup/test/make-test-base.adoc new file mode 100644 index 0000000000..3b8c0bc784 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/test/make-test-base.adoc @@ -0,0 +1,8 @@ +Create the following abstract test class at `src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java`: + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java %}
++++++ + +Take a look at this class. It contains the functionality and utility methods that any Flink SQL test would use. Namely, it +encapsulates Kafka broker and Schema Registry Testcontainer management and includes utility methods for dynamically configuring Flink SQL Kafka connectors and processing Table API results. diff --git a/_includes/tutorials/filtering/flinksql/markup/test/make-test-dirs.adoc b/_includes/tutorials/filtering/flinksql/markup/test/make-test-dirs.adoc new file mode 100644 index 0000000000..699bdd3451 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/test/make-test-dirs.adoc @@ -0,0 +1,7 @@ +Given the need for Flink's Table API, the primary programming language choices for our tests are Java and Python. We'll write ours in Java. + +To start our test project, make new directories for test source code and resources within the same `filtering-events` folder that you created earlier: + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/tutorial-steps/test/make-test-dirs.sh %}
++++++ diff --git a/_includes/tutorials/filtering/flinksql/markup/test/make-test.adoc b/_includes/tutorials/filtering/flinksql/markup/test/make-test.adoc new file mode 100644 index 0000000000..d001a013a9 --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/test/make-test.adoc @@ -0,0 +1,7 @@ +Next, create the test implementation at `src/test/java/io/confluent/developer/FlinkSqlFilteringTest.java`: + ++++++ +
{% include_raw tutorials/filtering/flinksql/code/src/test/java/io/confluent/developer/FlinkSqlFilteringTest.java %}
++++++ + +The test itself is straightforward to follow. It executes the SQL from our resource files, then runs a select statement against the final output `TABLE` of our application and compares the results to what's expected. \ No newline at end of file diff --git a/_includes/tutorials/filtering/flinksql/markup/test/test-architecture.adoc b/_includes/tutorials/filtering/flinksql/markup/test/test-architecture.adoc new file mode 100644 index 0000000000..9d4416b39e --- /dev/null +++ b/_includes/tutorials/filtering/flinksql/markup/test/test-architecture.adoc @@ -0,0 +1,15 @@ +Now that you have manually developed and tested your Flink SQL application, how might you create an automated test for +it so that it's easier to maintain and upgrade over time? Imagine how painful it would be to have to manually test every change or +software dependency upgrade of your application, and then imagine having to do this for many applications. The benefits of +automated testing are clear, but how do we get there? + +First, what do we want in an automated integration test? For starters: + +. *Real running services* that our application depends on +. *Small resource footprint* so that developers can run the test locally +. *Low enough latency* so that development iterations aren't hampered -- not as low as a unit test requires, but test duration should be on the order of seconds +. *Isolation* so that many tests can run concurrently on the same machine when this test gets run on a build automation server, e.g., no hardcoded ports + +Luckily, tools are at our disposal to solve these problems. We'll use https://www.testcontainers.org/[Testcontainers] to run +containerized Kafka and Schema Registry servers on dynamic ports, Flink's support for local execution environments so that we don't need to spin up a Flink cluster, and Flink's Table API +in order to execute the Flink SQL statements that comprise our application. diff --git a/tutorials/filtering/flinksql.html b/tutorials/filtering/flinksql.html new file mode 100644 index 0000000000..a987732aaa --- /dev/null +++ b/tutorials/filtering/flinksql.html @@ -0,0 +1,6 @@ +--- +layout: tutorial +permalink: /filter-a-stream-of-events/flinksql +stack: flinksql +static_data: filtering +---