Skip to content

Commit

Permalink
Feat add filtering flinksql (#1619)
Browse files Browse the repository at this point in the history
* Initial commit for FlinkSQL filtering; first four sections complete

* Completed validation section

* Completed the create sql resources section

* Completed tutorial

* Update makefile for expected results file

* Add new line end of expected log

* Fixes for running integration test

* update test for missing sql file

* review comments - fix typos
  • Loading branch information
bbejeck authored Sep 18, 2023
1 parent 72d71ec commit 548459a
Show file tree
Hide file tree
Showing 64 changed files with 1,158 additions and 1 deletion.
4 changes: 4 additions & 0 deletions .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -496,3 +496,7 @@ blocks:
- name: Flink SQL test for splitting
commands:
- make -C _includes/tutorials/splitting/flinksql/code tutorial
- name: Flink SQL test for filtering
commands:
- make -C _includes/tutorials/filtering/flinksql/code tutorial

155 changes: 155 additions & 0 deletions _data/harnesses/filtering/flinksql.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
---
# Harness definition for the FlinkSQL "filtering" tutorial.
# Consumed by harness-runner (see the tutorial Makefile); each top-level
# section (dev / test / ccloud) is a sequence of titled steps whose actions
# are executed and/or rendered into the published tutorial page.
# NOTE(review): indentation was reconstructed from a flat extraction —
# verify against a sibling harness (e.g. splitting/flinksql.yml) before merge.
dev:
  steps:
    - title: Prerequisites
      content:
        # Rendered-only step: shared Docker prerequisite blurb.
        - action: skip
          render:
            file: shared/markup/dev/docker-prerequisite.adoc

    - title: Initialize the project
      content:
        - action: execute
          file: tutorial-steps/dev/init.sh
          render:
            file: tutorials/filtering/flinksql/markup/dev/init.adoc

    - title: Get Confluent Platform
      content:
        - action: make_file
          file: docker-compose.yml
          render:
            file: tutorials/filtering/flinksql/markup/dev/make-docker-compose.adoc

        # Compose up runs async so the harness can continue while brokers boot.
        - action: execute_async
          file: tutorial-steps/dev/docker-compose-up.sh
          render:
            file: tutorials/filtering/flinksql/markup/dev/start-compose.adoc

        # Wait step is executed but intentionally not rendered in the page.
        - action: execute
          file: tutorial-steps/dev/wait-for-containers.sh
          render:
            skip: true

    - title: Write the program interactively using the CLI
      content:
        # One interactive Flink SQL CLI session; each stdin entry is a SQL
        # script piped into the session, in order.
        - action: docker_flinksql_cli_session
          container: flink-sql-client
          docker_bootup_file: tutorial-steps/dev/start-cli.sh
          column_width: 20
          render:
            file: tutorials/filtering/flinksql/markup/dev/start-cli.adoc
          stdin:
            - file: tutorial-steps/dev/create-all-publications.sql
              render:
                file: tutorials/filtering/flinksql/markup/dev/create-all-publications.adoc

            - file: tutorial-steps/dev/populate-publication-events.sql
              render:
                file: tutorials/filtering/flinksql/markup/dev/populate-publication-events.adoc

            - file: tutorial-steps/dev/transient-query.sql
              render:
                file: tutorials/filtering/flinksql/markup/dev/transient-query.adoc

            - file: tutorial-steps/dev/create-publications-by-author.sql
              render:
                file: tutorials/filtering/flinksql/markup/dev/create-publications-by-author.adoc

            - file: tutorial-steps/dev/populate-publications-by-author.sql
              render:
                file: tutorials/filtering/flinksql/markup/dev/populate-publications-by-author.adoc

          # Session output is captured here; the Makefile diffs it against
          # the expected log.
          stdout:
            directory: tutorial-steps/dev/outputs

    - title: Validate output
      content:
        - action: execute
          file: tutorial-steps/dev/validate-publications-by-author.sh
          stdout: tutorial-steps/dev/outputs/validate-publications-by-author.log
          render:
            file: tutorials/filtering/flinksql/markup/dev/validate-publications-by-author.adoc

test:
  steps:
    - title: Decide what testing tools to use
      content:
        - action: skip
          render:
            file: tutorials/filtering/flinksql/markup/test/test-architecture.adoc

    - title: Create the test skeleton
      content:
        - action: execute
          file: tutorial-steps/test/make-test-dirs.sh
          render:
            file: tutorials/filtering/flinksql/markup/test/make-test-dirs.adoc

        - action: make_file
          file: build.gradle
          render:
            file: tutorials/filtering/flinksql/markup/test/make-build-gradle.adoc

        - action: execute
          file: tutorial-steps/test/gradle-wrapper.sh
          render:
            file: tutorials/filtering/flinksql/markup/test/make-gradle-wrapper.adoc

    - title: Create SQL resources
      content:
        # .template files get placeholders substituted by the test harness.
        - action: make_file
          file: src/test/resources/create-all-publications.sql.template
          render:
            file: tutorials/filtering/flinksql/markup/test/create-all-publications.sql.template.adoc

        - action: make_file
          file: src/test/resources/populate-publication-events.sql
          render:
            file: tutorials/filtering/flinksql/markup/test/create-resource-populate-publication-events.sql.adoc

        - action: make_file
          file: src/test/resources/create-publications-by-author.sql.template
          render:
            file: tutorials/filtering/flinksql/markup/test/create-resource-create-publications-by-author.sql.template.adoc

        - action: make_file
          file: src/test/resources/populate-publications-by-author.sql
          render:
            file: tutorials/filtering/flinksql/markup/test/create-resource-populate-publications-by-author.sql.adoc

        - action: make_file
          file: src/test/resources/query-publications-by-author.sql
          render:
            file: tutorials/filtering/flinksql/markup/test/create-resource-query-publications-by-author.sql.adoc

        - action: make_file
          file: src/test/resources/expected-publications-by-author.txt
          render:
            file: tutorials/filtering/flinksql/markup/test/create-resource-expected-publications-by-author.txt.adoc

    - title: Write a test
      content:
        - action: make_file
          file: src/test/java/io/confluent/developer/AbstractFlinkKafkaTest.java
          render:
            file: tutorials/filtering/flinksql/markup/test/make-test-base.adoc

        - action: make_file
          file: src/test/java/io/confluent/developer/FlinkSqlFilteringTest.java
          render:
            file: tutorials/filtering/flinksql/markup/test/make-test.adoc

    - title: Invoke the test
      content:
        - action: execute
          file: tutorial-steps/test/invoke-test.sh
          render:
            file: tutorials/filtering/flinksql/markup/test/invoke-test.adoc

ccloud:
  steps:
    - title: Run your app to Confluent Cloud
      content:
        - action: skip
          render:
            file: shared/markup/ccloud/try-ccloud.adoc
4 changes: 3 additions & 1 deletion _data/tutorials.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ filtering:
kstreams: enabled
kafka: enabled
confluent: enabled
flinksql: enabled
splitting:
title: How to split a stream of events into substreams
meta-description: split a stream of events into substreams
Expand Down Expand Up @@ -621,7 +622,8 @@ kafka-producer-application-callback:
the Callback interface
canonical: confluent
slug: /kafka-producer-callback-application
question: How can you use callbacks with a KafkaProducer to handle responses from the broker?
question: How can you use callbacks with a KafkaProducer to handle responses from
the broker?
introduction: You have an application using an Apache Kafka producer, and you want
an automatic way of handling responses after producing records. In this tutorial,
you'll learn how to use the Callback interface to automatically handle responses
Expand Down
7 changes: 7 additions & 0 deletions _includes/tutorials/filtering/flinksql/code/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Ignore generated tutorial output captured by the harness runner
tutorial-steps/dev/outputs/

# Ignore Gradle project-specific cache directory
.gradle

# Ignore Gradle build output directory
build
11 changes: 11 additions & 0 deletions _includes/tutorials/filtering/flinksql/code/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Makefile for the FlinkSQL filtering tutorial: runs the harness and diffs
# the captured CLI output against the expected log.
STEPS_DIR := tutorial-steps
DEV_OUTPUTS_DIR := $(STEPS_DIR)/dev/outputs
TEMP_DIR := $(shell mktemp -d)
SEQUENCE := "dev, test, ccloud"

tutorial:
	# Start from a clean outputs directory (ignore failure if absent).
	rm -r $(DEV_OUTPUTS_DIR) || true
	mkdir $(DEV_OUTPUTS_DIR)
	harness-runner ../../../../../_data/harnesses/filtering/flinksql.yml $(TEMP_DIR) $(SEQUENCE)
	# NOTE(review): expected file is named "books" while the tutorial uses
	# "publications" everywhere else — confirm the file exists under this name.
	diff --strip-trailing-cr $(STEPS_DIR)/dev/expected-books-by-author.log $(DEV_OUTPUTS_DIR)/validate-publications-by-author.log
	# Restore the terminal after the interactive CLI session.
	reset
34 changes: 34 additions & 0 deletions _includes/tutorials/filtering/flinksql/code/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Gradle build for the FlinkSQL filtering tutorial tests.
// Test-only project: all dependencies are testImplementation; the tutorial
// itself runs SQL against containers started by Testcontainers.
buildscript {
    repositories {
        mavenCentral()
    }
}

plugins {
    id "java"
    id "idea"
}

sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
version = "0.0.1"

repositories {
    mavenCentral()
}

dependencies {
    testImplementation "com.google.guava:guava:31.1-jre"
    testImplementation "junit:junit:4.13.2"
    testImplementation "org.testcontainers:testcontainers:1.17.6"
    testImplementation "org.testcontainers:kafka:1.17.6"
    testImplementation "org.apache.flink:flink-sql-connector-kafka:1.17.1"
    testImplementation "org.apache.flink:flink-sql-avro-confluent-registry:1.17.1"
    testImplementation "org.apache.flink:flink-test-utils:1.17.1"
    testImplementation "org.apache.flink:flink-test-utils-junit:1.17.1"
    testImplementation "org.apache.flink:flink-json:1.17.1"
    // Aligned to 1.17.1 — was 1.17.0, out of step with the other Flink
    // table modules; mixed Flink patch versions can fail at runtime.
    testImplementation "org.apache.flink:flink-table-api-java-bridge:1.17.1"
    testImplementation "org.apache.flink:flink-table-planner_2.12:1.17.1"
    testImplementation "org.apache.flink:flink-table-planner_2.12:1.17.1:tests"
    testImplementation "org.apache.flink:flink-statebackend-rocksdb:1.17.1"
}
59 changes: 59 additions & 0 deletions _includes/tutorials/filtering/flinksql/code/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Docker Compose stack for the FlinkSQL filtering tutorial:
# a single KRaft-mode Kafka broker plus a Flink SQL client,
# jobmanager, and taskmanager.
# NOTE(review): indentation was reconstructed from a flat extraction —
# verify against a sibling tutorial's compose file before merge.
version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
      # Quoted: unquoted host:container pairs risk YAML 1.1 sexagesimal
      # misparsing and are quoted by Compose convention.
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      # KRaft combined mode: this node is both broker and controller.
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  flink-sql-client:
    image: cnfldemos/flink-sql-client-kafka:1.16.0-scala_2.12-java11
    hostname: flink-sql-client
    container_name: flink-sql-client
    depends_on:
      - flink-jobmanager
    environment:
      FLINK_JOBMANAGER_HOST: flink-jobmanager
    volumes:
      - ./settings/:/settings
  flink-jobmanager:
    image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
    hostname: flink-jobmanager
    container_name: flink-jobmanager
    ports:
      - "9081:9081"
    command: jobmanager
    environment:
      # Literal block scalar: the whole value (including newlines) is
      # passed as the FLINK_PROPERTIES variable.
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        rest.bind-port: 9081
  flink-taskmanager:
    image: cnfldemos/flink-kafka:1.16.0-scala_2.12-java11
    hostname: flink-taskmanager
    container_name: flink-taskmanager
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 10
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Gradle wrapper configuration: pins the Gradle distribution (7.5.1, -bin
# variant) and caches it under GRADLE_USER_HOME.
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Loading

0 comments on commit 548459a

Please sign in to comment.