diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index f5a05485cd..aab820ed88 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,2 +1,2 @@ -# See go/codeowners - automatically generated for confluentinc/kafka-streams-examples: -* @confluentinc/ksql +# assign ksql team as reviewers for all PRs: +* @confluentinc/ksql diff --git a/.mvn/wrapper/MavenWrapperDownloader.java b/.mvn/wrapper/MavenWrapperDownloader.java new file mode 100644 index 0000000000..b901097f2d --- /dev/null +++ b/.mvn/wrapper/MavenWrapperDownloader.java @@ -0,0 +1,117 @@ +/* + * Copyright 2007-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.net.*; +import java.io.*; +import java.nio.channels.*; +import java.util.Properties; + +public class MavenWrapperDownloader { + + private static final String WRAPPER_VERSION = "0.5.6"; + /** + * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided. + */ + private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/" + + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar"; + + /** + * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to + * use instead of the default one. + */ + private static final String MAVEN_WRAPPER_PROPERTIES_PATH = + ".mvn/wrapper/maven-wrapper.properties"; + + /** + * Path where the maven-wrapper.jar will be saved to. + */ + private static final String MAVEN_WRAPPER_JAR_PATH = + ".mvn/wrapper/maven-wrapper.jar"; + + /** + * Name of the property which should be used to override the default download url for the wrapper. + */ + private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl"; + + public static void main(String args[]) { + System.out.println("- Downloader started"); + File baseDirectory = new File(args[0]); + System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath()); + + // If the maven-wrapper.properties exists, read it and check if it contains a custom + // wrapperUrl parameter. + File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH); + String url = DEFAULT_DOWNLOAD_URL; + if(mavenWrapperPropertyFile.exists()) { + FileInputStream mavenWrapperPropertyFileInputStream = null; + try { + mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile); + Properties mavenWrapperProperties = new Properties(); + mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream); + url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url); + } catch (IOException e) { + System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'"); + } finally { + try { + if(mavenWrapperPropertyFileInputStream != null) { + mavenWrapperPropertyFileInputStream.close(); + } + } catch (IOException e) { + // Ignore ... 
+ } + } + } + System.out.println("- Downloading from: " + url); + + File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH); + if(!outputFile.getParentFile().exists()) { + if(!outputFile.getParentFile().mkdirs()) { + System.out.println( + "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'"); + } + } + System.out.println("- Downloading to: " + outputFile.getAbsolutePath()); + try { + downloadFileFromURL(url, outputFile); + System.out.println("Done"); + System.exit(0); + } catch (Throwable e) { + System.out.println("- Error downloading"); + e.printStackTrace(); + System.exit(1); + } + } + + private static void downloadFileFromURL(String urlString, File destination) throws Exception { + if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) { + String username = System.getenv("MVNW_USERNAME"); + char[] password = System.getenv("MVNW_PASSWORD").toCharArray(); + Authenticator.setDefault(new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(username, password); + } + }); + } + URL website = new URL(urlString); + ReadableByteChannel rbc; + rbc = Channels.newChannel(website.openStream()); + FileOutputStream fos = new FileOutputStream(destination); + fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); + fos.close(); + rbc.close(); + } + +} diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 0000000000..a9a4b38f68 --- /dev/null +++ b/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1,4 @@ +# custmized maven that help avoid download only highest poms from version range +# https://issues.apache.org/jira/browse/MRESOLVER-164 +distributionUrl=https://confluent-packaging-tools.s3-us-west-2.amazonaws.com/apache-maven-3.8.1.2-bin.zip +wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar diff --git a/Jenkinsfile b/Jenkinsfile index 1cc643814a..7b9bbdae11 100755 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -11,4 +11,5 @@ dockerfile { osTypes = ['ubi8'] slackChannel = 'ksqldb-warn' nanoVersion = true + buildArm = true } diff --git a/README.md b/README.md index 406c57efdb..20ee75c17a 100644 --- a/README.md +++ b/README.md @@ -319,7 +319,7 @@ The next step is to create a standalone jar ("fat jar") of the [application exam # Create a standalone jar ("fat jar") $ mvn clean package -# >>> Creates target/kafka-streams-examples-7.0.0-standalone.jar +# >>> Creates target/kafka-streams-examples-7.4.0-0-standalone.jar ``` > Tip: If needed, you can disable the test suite during packaging, for example to speed up the packaging or to lower @@ -333,7 +333,7 @@ You can now run the application examples as follows: ```shell # Run an example application from the standalone jar. Here: `WordCountLambdaExample` -$ java -cp target/kafka-streams-examples-7.0.0-standalone.jar \ +$ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar \ io.confluent.examples.streams.WordCountLambdaExample ``` @@ -348,7 +348,7 @@ If you want to turn on log4j while running your example application, you can edi ```shell # Run an example application from the standalone jar. 
Here: `WordCountLambdaExample` -$ java -cp target/kafka-streams-examples-7.0.0-standalone.jar \ +$ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar \ -Dlog4j.configuration=file:src/main/resources/log4j.properties \ io.confluent.examples.streams.WordCountLambdaExample ``` diff --git a/docker-compose.yml b/docker-compose.yml index 8bd53ec86d..f4b0bdd6a9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ version: '2' services: zookeeper: - image: confluentinc/cp-zookeeper:7.0.0 + image: confluentinc/cp-zookeeper:7.4.x-latest hostname: zookeeper ports: - '32181:32181' @@ -13,7 +13,7 @@ services: - "moby:127.0.0.1" kafka: - image: confluentinc/cp-enterprise-kafka:7.0.0 + image: confluentinc/cp-enterprise-kafka:7.4.x-latest hostname: kafka ports: - '9092:9092' @@ -38,7 +38,7 @@ services: - "moby:127.0.0.1" schema-registry: - image: confluentinc/cp-schema-registry:7.0.0 + image: confluentinc/cp-schema-registry:7.4.x-latest hostname: schema-registry depends_on: - zookeeper @@ -47,6 +47,8 @@ services: - '8081:8081' environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092' + SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081" SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:32181 extra_hosts: - "moby:127.0.0.1" @@ -54,7 +56,7 @@ services: # This "container" is a workaround to pre-create topics for the Kafka Music application # until we have a more elegant way to do that. kafka-create-topics: - image: confluentinc/cp-kafka:7.0.0 + image: confluentinc/cp-kafka:7.4.x-latest depends_on: - kafka hostname: kafka-create-topics @@ -65,8 +67,9 @@ services: # See https://docs.docker.com/compose/startup-order/ command: "bash -c 'echo Waiting for Kafka to be ready... && \ cub kafka-ready -b kafka:29092 1 20 && \ - kafka-topics --create --topic play-events --if-not-exists --zookeeper zookeeper:32181 --partitions 4 --replication-factor 1 && \ - kafka-topics --create --topic song-feed --if-not-exists --zookeeper zookeeper:32181 --partitions 4 --replication-factor 1 && \ + kafka-topics --create --topic play-events --if-not-exists --bootstrap-server kafka:29092 --partitions 4 --replication-factor 1 && \ + kafka-topics --create --topic play-events-per-session --if-not-exists --bootstrap-server kafka:29092 --partitions 4 --replication-factor 1 && \ + kafka-topics --create --topic song-feed --if-not-exists --bootstrap-server kafka:29092 --partitions 4 --replication-factor 1 && \ sleep infinity'" environment: # The following settings are listed here only to satisfy the image's requirements. @@ -78,7 +81,7 @@ services: # Continuously generates input data for the Kafka Music application. kafka-music-data-generator: - image: confluentinc/kafka-streams-examples:7.0.0 + image: confluentinc/kafka-streams-examples:7.4.x-latest hostname: kafka-music-data-generator depends_on: - kafka @@ -89,7 +92,7 @@ services: cub kafka-ready -b kafka:29092 1 20 && \ echo Waiting for Confluent Schema Registry to be ready... && \ cub sr-ready schema-registry 8081 20 && \ - java -cp /usr/share/java/kafka-streams-examples/kafka-streams-examples-7.0.0-standalone.jar \ + java -cp /usr/share/java/kafka-streams-examples/kafka-streams-examples-7.4.0-0-standalone.jar \ io.confluent.examples.streams.interactivequeries.kafkamusic.KafkaMusicExampleDriver \ kafka:29092 http://schema-registry:8081'" environment: @@ -103,7 +106,7 @@ services: # Runs the Kafka Music application. 
kafka-music-application: - image: confluentinc/kafka-streams-examples:7.0.0 + image: confluentinc/kafka-streams-examples:7.4.x-latest hostname: kafka-music-application depends_on: - kafka diff --git a/mvnw b/mvnw new file mode 100755 index 0000000000..41c0f0c23d --- /dev/null +++ b/mvnw @@ -0,0 +1,310 @@ +#!/bin/sh +# ---------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- +# Maven Start Up Batch script +# +# Required ENV vars: +# ------------------ +# JAVA_HOME - location of a JDK home dir +# +# Optional ENV vars +# ----------------- +# M2_HOME - location of maven2's installed home dir +# MAVEN_OPTS - parameters passed to the Java VM when running Maven +# e.g. to debug Maven itself, use +# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +# MAVEN_SKIP_RC - flag to disable loading of mavenrc files +# ---------------------------------------------------------------------------- + +if [ -z "$MAVEN_SKIP_RC" ] ; then + + if [ -f /etc/mavenrc ] ; then + . /etc/mavenrc + fi + + if [ -f "$HOME/.mavenrc" ] ; then + . "$HOME/.mavenrc" + fi + +fi + +# OS specific support. $var _must_ be set to either true or false. +cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + export JAVA_HOME="`/usr/libexec/java_home`" + else + export JAVA_HOME="/Library/Java/Home" + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. 
+ + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Mingw, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." +fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=`cd "$wdir/.."; pwd` + fi + # end of workaround + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +BASE_DIR=`find_maven_basedir "$(pwd)"` +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +########################################################################################## +# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +# This allows using the maven wrapper in projects that prohibit checking in binary data. +########################################################################################## +if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found .mvn/wrapper/maven-wrapper.jar" + fi +else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..." 
+ fi + if [ -n "$MVNW_REPOURL" ]; then + jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + else + jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + fi + while IFS="=" read key value; do + case "$key" in (wrapperUrl) jarUrl="$value"; break ;; + esac + done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties" + if [ "$MVNW_VERBOSE" = true ]; then + echo "Downloading from: $jarUrl" + fi + wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" + if $cygwin; then + wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"` + fi + + if command -v wget > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found wget ... using wget" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + wget "$jarUrl" -O "$wrapperJarPath" + else + wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath" + fi + elif command -v curl > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found curl ... using curl" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + curl -o "$wrapperJarPath" "$jarUrl" -f + else + curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f + fi + + else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Falling back to using Java to download" + fi + javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java" + # For Cygwin, switch paths to Windows format before running javac + if $cygwin; then + javaClass=`cygpath --path --windows "$javaClass"` + fi + if [ -e "$javaClass" ]; then + if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Compiling MavenWrapperDownloader.java ..." + fi + # Compiling the Java class + ("$JAVA_HOME/bin/javac" "$javaClass") + fi + if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + # Running the downloader + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Running MavenWrapperDownloader.java ..." + fi + ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR") + fi + fi + fi +fi +########################################################################################## +# End of extension +########################################################################################## + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} +if [ "$MVNW_VERBOSE" = true ]; then + echo $MAVEN_PROJECTBASEDIR +fi +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` +fi + +# Provide a "standardized" way to retrieve the CLI args that will +# work with both Windows and non-Windows executions. 
+MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" +export MAVEN_CMD_LINE_ARGS + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" diff --git a/pom.xml b/pom.xml index a4ede43089..02cb880cb8 100644 --- a/pom.xml +++ b/pom.xml @@ -7,12 +7,12 @@ io.confluent rest-utils-parent - 7.0.0 + [7.4.0-0, 7.4.1-0) kafka-streams-examples jar - 7.0.0 + 7.4.0-0 Confluent, Inc. @@ -56,8 +56,6 @@ false 1.8 UTF-8 - 2.13 - 2.13.2 3.1.2 ubi8 @@ -66,7 +64,7 @@ ${project.version}-${docker.os_type} ${confluent.version.range} ${confluent.version.range} - 7.0.0 + 7.4.0-0 @@ -131,12 +129,12 @@ org.slf4j - slf4j-log4j12 + slf4j-reload4j io.confluent - confluent-log4j + logredactor javax.ws.rs diff --git a/src/main/java/io/confluent/examples/streams/AnomalyDetectionLambdaExample.java b/src/main/java/io/confluent/examples/streams/AnomalyDetectionLambdaExample.java index 28bbad1266..3533b16c69 100644 --- a/src/main/java/io/confluent/examples/streams/AnomalyDetectionLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/AnomalyDetectionLambdaExample.java @@ -58,7 +58,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.AnomalyDetectionLambdaExample
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.AnomalyDetectionLambdaExample
  * }
*

* 4) Write some input data to the source topic (e.g. via {@code kafka-console-producer}). The already diff --git a/src/main/java/io/confluent/examples/streams/ApplicationResetExample.java b/src/main/java/io/confluent/examples/streams/ApplicationResetExample.java index f7d400829d..653c271baa 100644 --- a/src/main/java/io/confluent/examples/streams/ApplicationResetExample.java +++ b/src/main/java/io/confluent/examples/streams/ApplicationResetExample.java @@ -59,7 +59,7 @@ * Once packaged you can then run: *

  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.ApplicationResetExample
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.ApplicationResetExample
  * }
  * 
* 4) Write some input data to the source topic (e.g. via {@code kafka-console-producer}). @@ -114,7 +114,7 @@ * Thus, restart the application via: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.ApplicationResetExample localhost:9092 --reset
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.ApplicationResetExample localhost:9092 --reset
  * }
* 9) If your console consumer (from step 5) is still running, you should see the same output data again. * If it was stopped and you restart it, it will print the result "twice". diff --git a/src/main/java/io/confluent/examples/streams/GlobalKTablesExample.java b/src/main/java/io/confluent/examples/streams/GlobalKTablesExample.java index 6b256e3b8e..54f16445cc 100644 --- a/src/main/java/io/confluent/examples/streams/GlobalKTablesExample.java +++ b/src/main/java/io/confluent/examples/streams/GlobalKTablesExample.java @@ -76,7 +76,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.GlobalKTablesExample
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.GlobalKTablesExample
  * }
  * 
* 4) Write some input data to the source topics (e.g. via {@link GlobalKTablesAndStoresExampleDriver}). The @@ -86,7 +86,7 @@ * {@code * # Here: Write input data using the example driver. The driver will exit once it has received * # all EnrichedOrders - * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.GlobalKTablesAndStoresExampleDriver + * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.GlobalKTablesAndStoresExampleDriver * } * *

diff --git a/src/main/java/io/confluent/examples/streams/JsonToAvroExample.java b/src/main/java/io/confluent/examples/streams/JsonToAvroExample.java index f450245ff2..e7095d0cfc 100644 --- a/src/main/java/io/confluent/examples/streams/JsonToAvroExample.java +++ b/src/main/java/io/confluent/examples/streams/JsonToAvroExample.java @@ -57,7 +57,7 @@ * Once packaged you can then run: *

  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.JsonToAvroExample
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.JsonToAvroExample
  * }
  * 
* 4) Write some input data to the source topics (e.g. via {@link JsonToAvroExampleDriver}). The @@ -68,7 +68,7 @@ * {@code * # Here: Write input data using the example driver. Once the driver has stopped generating data, * # you can terminate it via Ctrl-C. - * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.JsonToAvroExampleDriver + * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.JsonToAvroExampleDriver * } * */ diff --git a/src/main/java/io/confluent/examples/streams/JsonToAvroExampleDriver.java b/src/main/java/io/confluent/examples/streams/JsonToAvroExampleDriver.java index 58abb194e5..e490031bca 100644 --- a/src/main/java/io/confluent/examples/streams/JsonToAvroExampleDriver.java +++ b/src/main/java/io/confluent/examples/streams/JsonToAvroExampleDriver.java @@ -49,7 +49,7 @@ * Once packaged you can then run: *
  * {@code
- * java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.JsonToAvroExampleDriver
+ * java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.JsonToAvroExampleDriver
  * }
  * 
* diff --git a/src/main/java/io/confluent/examples/streams/MapFunctionLambdaExample.java b/src/main/java/io/confluent/examples/streams/MapFunctionLambdaExample.java index e35f81b516..b80128846a 100644 --- a/src/main/java/io/confluent/examples/streams/MapFunctionLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/MapFunctionLambdaExample.java @@ -60,7 +60,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.MapFunctionLambdaExample
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.MapFunctionLambdaExample
  * }
  * 
* 4) Write some input data to the source topic (e.g. via {@code kafka-console-producer}). The already diff --git a/src/main/java/io/confluent/examples/streams/PageViewRegionExample.java b/src/main/java/io/confluent/examples/streams/PageViewRegionExample.java index f2aa9559ae..2b6cae7716 100644 --- a/src/main/java/io/confluent/examples/streams/PageViewRegionExample.java +++ b/src/main/java/io/confluent/examples/streams/PageViewRegionExample.java @@ -78,7 +78,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.PageViewRegionLambdaExample
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.PageViewRegionLambdaExample
  * }
  * 
* 4) Write some input data to the source topics (e.g. via {@link PageViewRegionExampleDriver}). The @@ -88,7 +88,7 @@ * {@code * # Here: Write input data using the example driver. Once the driver has stopped generating data, * # you can terminate it via `Ctrl-C`. - * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.PageViewRegionExampleDriver + * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.PageViewRegionExampleDriver * } * * 5) Inspect the resulting data in the output topic, e.g. via {@code kafka-console-consumer}. diff --git a/src/main/java/io/confluent/examples/streams/PageViewRegionExampleDriver.java b/src/main/java/io/confluent/examples/streams/PageViewRegionExampleDriver.java index de2b77b262..61cb90b425 100644 --- a/src/main/java/io/confluent/examples/streams/PageViewRegionExampleDriver.java +++ b/src/main/java/io/confluent/examples/streams/PageViewRegionExampleDriver.java @@ -49,7 +49,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.PageViewRegionExampleDriver
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.PageViewRegionExampleDriver
  * }
  * 
* You should terminate with {@code Ctrl-C}. diff --git a/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java b/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java index 6cab0625fa..8b416393be 100644 --- a/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java @@ -75,7 +75,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.PageViewRegionLambdaExample
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.PageViewRegionLambdaExample
  * }
  * 
* 4) Write some input data to the source topics (e.g. via {@link PageViewRegionExampleDriver}). @@ -85,7 +85,7 @@ * {@code * # Here: Write input data using the example driver. Once the driver has stopped generating data, * # you can terminate it via `Ctrl-C`. - * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.PageViewRegionExampleDriver + * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.PageViewRegionExampleDriver * } * * 5) Inspect the resulting data in the output topic, e.g. via {@code kafka-console-consumer}. diff --git a/src/main/java/io/confluent/examples/streams/SecureKafkaStreamsExample.java b/src/main/java/io/confluent/examples/streams/SecureKafkaStreamsExample.java index aceebd8074..9952891303 100644 --- a/src/main/java/io/confluent/examples/streams/SecureKafkaStreamsExample.java +++ b/src/main/java/io/confluent/examples/streams/SecureKafkaStreamsExample.java @@ -100,7 +100,7 @@ * [vagrant@kafka ~]$ mvn clean -DskipTests=true package * * # Now we can start this example application - * [vagrant@kafka ~]$ java -cp target/kafka-streams-examples-7.0.0-standalone.jar \ + * [vagrant@kafka ~]$ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar \ * io.confluent.examples.streams.SecureKafkaStreamsExample * } * diff --git a/src/main/java/io/confluent/examples/streams/SessionWindowsExample.java b/src/main/java/io/confluent/examples/streams/SessionWindowsExample.java index 14a245b7f0..0002b329b7 100644 --- a/src/main/java/io/confluent/examples/streams/SessionWindowsExample.java +++ b/src/main/java/io/confluent/examples/streams/SessionWindowsExample.java @@ -70,7 +70,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.SessionWindowsExample
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.SessionWindowsExample
  * }
  * 
* 4) Write some input data to the source topics (e.g. via {@link SessionWindowsExampleDriver}). The @@ -80,7 +80,7 @@ * {@code * # Here: Write input data using the example driver. The driver will also consume, and print, the data from the output * topic. The driver will stop when it has received all output records - * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.SessionWindowsExampleDriver + * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.SessionWindowsExampleDriver * } * * You should see output data similar to: diff --git a/src/main/java/io/confluent/examples/streams/SessionWindowsExampleDriver.java b/src/main/java/io/confluent/examples/streams/SessionWindowsExampleDriver.java index bd085b43c1..8b443591f2 100644 --- a/src/main/java/io/confluent/examples/streams/SessionWindowsExampleDriver.java +++ b/src/main/java/io/confluent/examples/streams/SessionWindowsExampleDriver.java @@ -42,7 +42,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.SessionWindowsExampleDriver
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.SessionWindowsExampleDriver
  * }
  * 
*/ diff --git a/src/main/java/io/confluent/examples/streams/SumLambdaExample.java b/src/main/java/io/confluent/examples/streams/SumLambdaExample.java index 5a42497371..5b566ed4c5 100644 --- a/src/main/java/io/confluent/examples/streams/SumLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/SumLambdaExample.java @@ -53,7 +53,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.SumLambdaExample
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.SumLambdaExample
  * }
  * 
* 4) Write some input data to the source topic (e.g. via {@link SumLambdaExampleDriver}). The @@ -63,7 +63,7 @@ * {@code * # Here: Write input data using the example driver. Once the driver has stopped generating data, * # you can terminate it via `Ctrl-C`. - * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.SumLambdaExampleDriver + * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.SumLambdaExampleDriver * } * * 5) Inspect the resulting data in the output topics, e.g. via {@code kafka-console-consumer}. diff --git a/src/main/java/io/confluent/examples/streams/SumLambdaExampleDriver.java b/src/main/java/io/confluent/examples/streams/SumLambdaExampleDriver.java index 30ce201d1d..23af0221ab 100644 --- a/src/main/java/io/confluent/examples/streams/SumLambdaExampleDriver.java +++ b/src/main/java/io/confluent/examples/streams/SumLambdaExampleDriver.java @@ -42,7 +42,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.SumLambdaExampleDriver
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.SumLambdaExampleDriver
  * }
  * 
* You should terminate with {@code Ctrl-C}. diff --git a/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java b/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java index 324b0ab41e..ac9d8c1086 100644 --- a/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java +++ b/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java @@ -52,7 +52,7 @@ * Once packaged you can then run: *
  * {@code
- * java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.TopArticlesLambdaExample
+ * java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.TopArticlesLambdaExample
  * }
  * 
* diff --git a/src/main/java/io/confluent/examples/streams/TopArticlesLambdaExample.java b/src/main/java/io/confluent/examples/streams/TopArticlesLambdaExample.java index e554f3a443..3d3974d4e9 100644 --- a/src/main/java/io/confluent/examples/streams/TopArticlesLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/TopArticlesLambdaExample.java @@ -81,7 +81,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.TopArticlesLambdaExample
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.TopArticlesLambdaExample
  * }
  * 
* 4) Write some input data to the source topics (e.g. via {@link TopArticlesExampleDriver}). @@ -92,7 +92,7 @@ * {@code * # Here: Write input data using the example driver. Once the driver has stopped generating data, * # you can terminate it via Ctrl-C. - * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.TopArticlesExampleDriver + * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.TopArticlesExampleDriver * } * */ diff --git a/src/main/java/io/confluent/examples/streams/UserRegionLambdaExample.java b/src/main/java/io/confluent/examples/streams/UserRegionLambdaExample.java index 410b09a294..6945957895 100644 --- a/src/main/java/io/confluent/examples/streams/UserRegionLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/UserRegionLambdaExample.java @@ -54,7 +54,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.UserRegionLambdaExample
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.UserRegionLambdaExample
  * }
  * 
* 4) Write some input data to the source topics (e.g. via {@code kafka-console-producer}). The already diff --git a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java index 467de6b06a..c1fecbff58 100644 --- a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java +++ b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java @@ -58,7 +58,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExample
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExample
  * }
  * 
* 4) Write some input data to the source topics (e.g. via {@link WikipediaFeedAvroExampleDriver}). @@ -69,7 +69,7 @@ * {@code * # Here: Write input data using the example driver. Once the driver has stopped generating data, * # you can terminate it via Ctrl-C. - * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExampleDriver + * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExampleDriver * } * */ diff --git a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExampleDriver.java b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExampleDriver.java index ad028dd8a5..ed39a23903 100644 --- a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExampleDriver.java +++ b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExampleDriver.java @@ -45,7 +45,7 @@ * Once packaged you can then run: *
  * {@code
- * java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExampleDriver
+ * java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExampleDriver
  * }
  * 
* diff --git a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExample.java b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExample.java index 348b7b29e5..4ca3cce478 100644 --- a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExample.java @@ -55,7 +55,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroLambdaExample
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroLambdaExample
  * }
  * 
* 4) Write some input data to the source topics (e.g. via {@link WikipediaFeedAvroExampleDriver}). @@ -66,7 +66,7 @@ * {@code * # Here: Write input data using the example driver. Once the driver has stopped generating data, * # you can terminate it via Ctrl-C. - * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExampleDriver + * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExampleDriver * } * */ diff --git a/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java b/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java index 71ae5dc9c9..eeadcc9415 100644 --- a/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java @@ -62,7 +62,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.WordCountLambdaExample
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.WordCountLambdaExample
  * }
  * 
* 4) Write some input data to the source topic "streams-plaintext-input" (e.g. via {@code kafka-console-producer}). diff --git a/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesDriver.java b/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesDriver.java index 8bd310f5d8..3bf1e4ef33 100644 --- a/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesDriver.java +++ b/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesDriver.java @@ -36,7 +36,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.interactivequeries.WordCountInteractiveQueriesDriver
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.interactivequeries.WordCountInteractiveQueriesDriver
  * }
  * 
* You should terminate with Ctrl-C @@ -62,7 +62,7 @@ public static void main(final String [] args) throws Exception { final Properties producerConfig = new Properties(); producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); + producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1); producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); diff --git a/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java b/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java index 3725641cfe..084cff27f5 100644 --- a/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java +++ b/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java @@ -83,7 +83,7 @@ * *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar \
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar \
  *      io.confluent.examples.streams.interactivequeries.WordCountInteractiveQueriesExample 7070
  * }
  * 
@@ -94,7 +94,7 @@ * *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar \
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar \
  *      io.confluent.examples.streams.interactivequeries.WordCountInteractiveQueriesExample 7071
  * }
  * 
diff --git a/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java b/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java index e319fe6376..0ac54986da 100644 --- a/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java +++ b/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java @@ -110,7 +110,7 @@ * *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar \
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar \
  *      io.confluent.examples.streams.interactivequeries.kafkamusic.KafkaMusicExample 7070
  * }
  * 
@@ -121,7 +121,7 @@ * *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar \
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar \
  *      io.confluent.examples.streams.interactivequeries.kafkamusic.KafkaMusicExample 7071
  * }
  * 
diff --git a/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleDriver.java b/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleDriver.java index 7cf81ab8f1..27bd6f1253 100644 --- a/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleDriver.java +++ b/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleDriver.java @@ -50,7 +50,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar \
+ * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar \
  *      io.confluent.examples.streams.interactivequeries.kafkamusic.KafkaMusicExampleDriver
  * }
  * 
diff --git a/src/main/java/io/confluent/examples/streams/microservices/AddInventory.java b/src/main/java/io/confluent/examples/streams/microservices/AddInventory.java index 6ea9acd08d..7365a424c4 100644 --- a/src/main/java/io/confluent/examples/streams/microservices/AddInventory.java +++ b/src/main/java/io/confluent/examples/streams/microservices/AddInventory.java @@ -33,7 +33,7 @@ private static void sendInventory(final List> invento producerConfig.putAll(defaultConfig); producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); + producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1); producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "inventory-generator"); MonitoringInterceptorUtils.maybeConfigureInterceptorsProducer(producerConfig); diff --git a/src/main/java/io/confluent/examples/streams/microservices/PostOrdersAndPayments.java b/src/main/java/io/confluent/examples/streams/microservices/PostOrdersAndPayments.java index 181d82b0bc..06bfe14e3a 100644 --- a/src/main/java/io/confluent/examples/streams/microservices/PostOrdersAndPayments.java +++ b/src/main/java/io/confluent/examples/streams/microservices/PostOrdersAndPayments.java @@ -51,7 +51,7 @@ private static KafkaProducer buildPaymentProducer(final String producerConfig.putAll(defaultConfig); producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); + producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1); producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "payment-generator"); MonitoringInterceptorUtils.maybeConfigureInterceptorsProducer(producerConfig); diff --git a/src/main/java/io/confluent/examples/streams/microservices/README.md b/src/main/java/io/confluent/examples/streams/microservices/README.md index ae6d680e48..5b31189325 100644 --- a/src/main/java/io/confluent/examples/streams/microservices/README.md +++ b/src/main/java/io/confluent/examples/streams/microservices/README.md @@ -34,7 +34,7 @@ Then run the fully-working demo [end-to-end](https://docs.confluent.io/current/t It runs the ecosystem and all the microservices for you including Kafka Connect, Elasticsearch, KSQL and Control Center. To play with this ecosystem the simplest way is to run the tests and fiddle with the code (stand alone execution is only supported in branch 5.0.0+ so go there if you want stand alone or docker support). Each test boots a self-contained Kafka cluster so it's easy to play with different queries and configurations. 
-The best place to start is [EndToEndTest.java](https://github.com/confluentinc/kafka-streams-examples/blob/7.0.x/src/test/java/io/confluent/examples/streams/microservices/EndToEndTest.java) +The best place to start is [EndToEndTest.java](https://github.com/confluentinc/kafka-streams-examples/blob/master/src/test/java/io/confluent/examples/streams/microservices/EndToEndTest.java) # Running the Examples: * Requires Java 1.8 diff --git a/src/main/java/io/confluent/examples/streams/microservices/util/ProduceCustomers.java b/src/main/java/io/confluent/examples/streams/microservices/util/ProduceCustomers.java index 3ebbd617b7..d7419c7693 100644 --- a/src/main/java/io/confluent/examples/streams/microservices/util/ProduceCustomers.java +++ b/src/main/java/io/confluent/examples/streams/microservices/util/ProduceCustomers.java @@ -61,7 +61,7 @@ public static void main(final String[] args) throws Exception { props.putAll(defaultConfig); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.ACKS_CONFIG, "all"); - props.put(ProducerConfig.RETRIES_CONFIG, 0); + props.put(ProducerConfig.RETRIES_CONFIG, 1); MonitoringInterceptorUtils.maybeConfigureInterceptorsProducer(props); try (final KafkaProducer producer = new KafkaProducer<>(props, new LongSerializer(), mySerializer)) { diff --git a/src/main/java/io/confluent/examples/streams/microservices/util/ProduceOrders.java b/src/main/java/io/confluent/examples/streams/microservices/util/ProduceOrders.java index 2c64086433..06726ac33e 100644 --- a/src/main/java/io/confluent/examples/streams/microservices/util/ProduceOrders.java +++ b/src/main/java/io/confluent/examples/streams/microservices/util/ProduceOrders.java @@ -63,7 +63,7 @@ public static void main(final String[] args) throws Exception { props.putAll(defaultConfig); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.ACKS_CONFIG, "all"); - props.put(ProducerConfig.RETRIES_CONFIG, 0); + props.put(ProducerConfig.RETRIES_CONFIG, 1); MonitoringInterceptorUtils.maybeConfigureInterceptorsProducer(props); try (final KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), mySerializer)) { diff --git a/src/main/java/io/confluent/examples/streams/microservices/util/ProducePayments.java b/src/main/java/io/confluent/examples/streams/microservices/util/ProducePayments.java index 23a8b89149..8c1d7a6f92 100644 --- a/src/main/java/io/confluent/examples/streams/microservices/util/ProducePayments.java +++ b/src/main/java/io/confluent/examples/streams/microservices/util/ProducePayments.java @@ -61,7 +61,7 @@ public static void main(final String[] args) throws Exception { props.putAll(defaultConfig); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.ACKS_CONFIG, "all"); - props.put(ProducerConfig.RETRIES_CONFIG, 0); + props.put(ProducerConfig.RETRIES_CONFIG, 1); MonitoringInterceptorUtils.maybeConfigureInterceptorsProducer(props); try (final KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), mySerializer)) { diff --git a/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala b/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala index f44178356c..83dbcb4218 100644 --- a/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala +++ b/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala @@ -54,7 +54,7 @@ import org.apache.kafka.streams.{KafkaStreams, StreamsConfig} * Once 
packaged you can then run: * * {{{ - * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.MapFunctionScalaExample + * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.MapFunctionScalaExample + * }}} * * 4) Write some input data to the source topics (e.g. via `kafka-console-producer`). The already diff --git a/src/main/scala/io/confluent/examples/streams/WordCountScalaExample.scala index ee4c2ce1a2..42a50b6a19 100644 --- a/src/main/scala/io/confluent/examples/streams/WordCountScalaExample.scala +++ b/src/main/scala/io/confluent/examples/streams/WordCountScalaExample.scala @@ -41,7 +41,7 @@ import org.apache.kafka.streams.{KafkaStreams, StreamsConfig} * Once packaged you can then run: * * {{{ - * $ java -cp target/kafka-streams-examples-7.0.0-standalone.jar io.confluent.examples.streams.WordCountLambdaExample + * $ java -cp target/kafka-streams-examples-7.4.0-0-standalone.jar io.confluent.examples.streams.WordCountLambdaExample + * }}} * * 4) Write some input data to the source topic "streams-plaintext-input" (e.g. via `kafka-console-producer`). diff --git a/src/main/scala/io/confluent/examples/streams/algebird/CMSStore.scala index c50fe726f0..c35ca8a4c4 100644 --- a/src/main/scala/io/confluent/examples/streams/algebird/CMSStore.scala +++ b/src/main/scala/io/confluent/examples/streams/algebird/CMSStore.scala @@ -194,7 +194,7 @@ class CMSStore[T: CMSHasher](override val name: String, name, Serdes.Integer(), TopCMSSerde[T]) - changeLogger = new CMSStoreChangeLogger[Integer, TopCMS[T]](name, context, serdes) + changeLogger = new CMSStoreChangeLogger[Integer, TopCMS[T]](name, context, serdes, name) // Note: We must manually guard with `loggingEnabled` here because `context.register()` ignores // that parameter.
diff --git a/src/main/scala/io/confluent/examples/streams/algebird/CMSStoreChangeLogger.scala b/src/main/scala/io/confluent/examples/streams/algebird/CMSStoreChangeLogger.scala index 362f0810cc..4bd0d0f0a9 100644 --- a/src/main/scala/io/confluent/examples/streams/algebird/CMSStoreChangeLogger.scala +++ b/src/main/scala/io/confluent/examples/streams/algebird/CMSStoreChangeLogger.scala @@ -16,7 +16,7 @@ package io.confluent.examples.streams.algebird import org.apache.kafka.streams.processor.StateStoreContext -import org.apache.kafka.streams.processor.internals.{ProcessorStateManager, RecordCollector} +import org.apache.kafka.streams.processor.internals.{InternalProcessorContext, ProcessorStateManager, RecordCollector} import org.apache.kafka.streams.state.StateSerdes /** @@ -31,20 +31,21 @@ import org.apache.kafka.streams.state.StateSerdes class CMSStoreChangeLogger[K, V](val storeName: String, val context: StateStoreContext, val partition: Int, - val serialization: StateSerdes[K, V]) { + val serialization: StateSerdes[K, V], + val processorNodeId: String) { - private val topic = ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName) + private val topic = ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName, context.taskId().topologyName()) private val collector = context.asInstanceOf[RecordCollector.Supplier].recordCollector - def this(storeName: String, context: StateStoreContext, serialization: StateSerdes[K, V]) = { - this(storeName, context, context.taskId.partition(), serialization) + def this(storeName: String, context: StateStoreContext, serialization: StateSerdes[K, V], processorNodeId: String) = { + this(storeName, context, context.taskId.partition(), serialization, processorNodeId) } def logChange(key: K, value: V, timestamp: Long): Unit = { if (collector != null) { val keySerializer = serialization.keySerializer val valueSerializer = serialization.valueSerializer - collector.send(this.topic, key, value, null, this.partition, timestamp, keySerializer, valueSerializer) + collector.send(this.topic, key, value, null, this.partition, timestamp, keySerializer, valueSerializer, processorNodeId, context.asInstanceOf[InternalProcessorContext[Void,Void]]) } } diff --git a/src/test/java/io/confluent/examples/streams/ApplicationResetIntegrationTest.java b/src/test/java/io/confluent/examples/streams/ApplicationResetIntegrationTest.java index f1198fadb7..56ed770bf7 100644 --- a/src/test/java/io/confluent/examples/streams/ApplicationResetIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/ApplicationResetIntegrationTest.java @@ -100,7 +100,7 @@ public void shouldReprocess() throws Exception { final Properties producerConfig = new Properties(); producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); + producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1); producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues, producerConfig); diff --git a/src/test/java/io/confluent/examples/streams/CustomStreamTableJoinIntegrationTest.java b/src/test/java/io/confluent/examples/streams/CustomStreamTableJoinIntegrationTest.java index 0b2b562bd0..8b69e4fe45 100644 --- 
a/src/test/java/io/confluent/examples/streams/CustomStreamTableJoinIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/CustomStreamTableJoinIntegrationTest.java @@ -345,7 +345,7 @@ private void writeInputDataToStream(final List - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); + producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1); diff --git a/src/test/java/io/confluent/examples/streams/ReorderIntegrationTest.java b/src/test/java/io/confluent/examples/streams/ReorderIntegrationTest.java new file mode 100644 --- /dev/null +++ b/src/test/java/io/confluent/examples/streams/ReorderIntegrationTest.java +/** + * Demonstrates how to reorder records by a timestamp embedded in the message value, using a transformer backed by a persistent key-value store and a stream-time punctuator. + * <p> + * Makes sense only on a per-partition basis: records are reordered independently within each partition. + * <p>
+ * Reordering occurs within the time windows defined by the {@code grace} constructor parameter. + + * Note: This example uses lambda expressions and thus works with Java 8+ only. + */ +public class ReorderIntegrationTest { + + public static class ReorderTransformer<K, V> + implements Transformer<K, V, KeyValue<K, V>> { + + public interface StoreKeyGenerator<K, V> { + K getStoreKey(K key, V val); + } + + private final String storeName; + private final Duration grace; + private KeyValueStore<K, V> reorderStore; + private ProcessorContext context; + private final StoreKeyGenerator<K, V> storeKeyGenerator; + + public ReorderTransformer(final String storeName, final Duration grace, + final StoreKeyGenerator<K, V> storeKeyGenerator) { + this.storeName = storeName; + this.grace = grace; + this.storeKeyGenerator = storeKeyGenerator; + } + + @Override + public void init(final ProcessorContext context) { + this.reorderStore = context.getStateStore(this.storeName); + this.context = context; + context.schedule( + this.grace, + PunctuationType.STREAM_TIME, + this::punctuate + ); + } + + /** + * 1) reads the timestamp from the message value; + * 2) inserts into a KeyValueStore, using the (timestamp, message-key) pair + * as the key and the message value as the value, which also provides + * de-duplication. + * + * @param key the record key + * @param value the record value + * @return always {@code null}; buffered records are emitted later by the punctuator + */ + @Override + public KeyValue<K, V> transform(final K key, final V value) { + // Keys need to contain and be sortable by time + final K storeKey = storeKeyGenerator.getStoreKey(key, value); + final V storeValue = reorderStore.get(storeKey); + if (storeValue == null) { + reorderStore.put(storeKey, value); + } + return null; // null suppresses sending to downstream + }
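+ + // Note on ordering: punctuate() below drains the store via all(), and the built-in + // (RocksDB and in-memory) key-value stores iterate in lexicographic byte order of the + // serialized keys. The StoreKeyGenerator must therefore produce keys whose lexicographic + // order matches event-time order; the test below relies on all of its epoch-millisecond + // timestamps having the same number of digits.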
+ + /** + * Scheduled to be called automatically when the period + * within which message reordering occurs expires. + * + * Outputs downstream accumulated records sorted by their timestamp: + * + * 1) reads the entire store; + * 2) sends the fetched messages in order using context.forward() and deletes + * them from the store. + * + * @param timestamp stream time at which the punctuator was triggered + */ + void punctuate(final long timestamp) { + try (KeyValueIterator<K, V> it = reorderStore.all()) { + while (it.hasNext()) { + final KeyValue<K, V> kv = it.next(); + context.forward(kv.key, kv.value); + reorderStore.delete(kv.key); + } + } + } + + @Override + public void close() { + } + } + + + public static class TestTimestampExtractor implements TimestampExtractor { + @Override + public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) { + return (long) record.value(); + } + } + + private static long ts(final String timeString) throws ParseException { + return Instant.parse(timeString).toEpochMilli(); + } + + @Test + public void shouldReorderTheInput() throws ParseException { + + // Input not ordered by time + final List<Long> inputValues = Arrays.asList( + ts("2021-11-03T23:00:00Z"), // stream time calibration + ts("2021-11-04T01:05:00Z"), // 10-hour interval border is at "2021-11-04T00:00:00Z" + ts("2021-11-04T01:10:00Z"), + ts("2021-11-04T01:40:00Z"), + ts("2021-11-04T02:25:00Z"), + ts("2021-11-04T01:20:00Z"), + ts("2021-11-04T02:45:00Z"), + ts("2021-11-04T02:00:00Z"), + ts("2021-11-04T03:00:00Z"), + ts("2021-11-04T02:40:00Z"), + ts("2021-11-04T02:20:00Z"), // 10-hour interval border is at "2021-11-04T10:00:00Z" + ts("2021-11-05T00:00:00Z") // stream time calibration + ); + + // Expected ordered by time + final List<Long> expectedValues = Arrays.asList( + ts("2021-11-03T23:00:00Z"), // stream time calibration + ts("2021-11-04T01:05:00Z"), + ts("2021-11-04T01:10:00Z"), + ts("2021-11-04T01:20:00Z"), + ts("2021-11-04T01:40:00Z"), + ts("2021-11-04T02:00:00Z"), + ts("2021-11-04T02:20:00Z"), + ts("2021-11-04T02:25:00Z"), + ts("2021-11-04T02:40:00Z"), + ts("2021-11-04T02:45:00Z"), + ts("2021-11-04T03:00:00Z"), + ts("2021-11-05T00:00:00Z") // stream time calibration + );
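+ + // The first and last values exist only to advance stream time: the punctuator is + // scheduled on STREAM_TIME, so it fires only as extracted record timestamps move + // stream time forward. The final record at 2021-11-05 pushes stream time past the + // 10-hour grace period, forcing the buffered records to be flushed downstream in order.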
+ + // + // Step 1: Configure and start the processor topology. + // + final StreamsBuilder builder = new StreamsBuilder(); + + final Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "reorder-integration-test"); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName()); + streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class.getName()); + + final String inputTopic = "inputTopic"; + final String outputTopic = "outputTopic"; + final String persistentStore = "reorderStore"; + final String transformerName = "reorderTransformer"; + + final StoreBuilder<KeyValueStore<String, Long>> reorderStoreSupplier = + Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(persistentStore), + Serdes.String(), + Serdes.Long()); + + final KStream<String, Long> stream = builder.stream(inputTopic); + final KStream<String, Long> reordered = stream + .transform(() -> new ReorderTransformer<>(persistentStore, + Duration.of(10, HOURS), (k, v) -> String.format("key-%d", v)), + Named.as(transformerName)); + reordered.to(outputTopic); + + final Topology topology = builder.build(); + topology.addStateStore(reorderStoreSupplier, transformerName); + + try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration)) { + // + // Step 2: Setup input and output topics. + // + final TestInputTopic<String, Long> input = topologyTestDriver + .createInputTopic(inputTopic, + new Serdes.StringSerde().serializer(), + new LongSerializer()); + final TestOutputTopic<String, Long> output = topologyTestDriver + .createOutputTopic(outputTopic, + new Serdes.StringSerde().deserializer(), + new LongDeserializer()); + + // + // Step 3: Produce some input data to the input topic. + // + input.pipeValueList(inputValues);
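+ + // TopologyTestDriver processes each piped record synchronously and advances stream + // time from the timestamps returned by TestTimestampExtractor, so the STREAM_TIME + // punctuator fires deterministically while the values are piped in.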
+ + // + // Step 4: Verify the application's output data. + // + assertThat(output.readValuesToList(), equalTo(expectedValues)); + } + } +} \ No newline at end of file diff --git a/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java b/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java index 23b73934f2..8dcf4d3ca7 100644 --- a/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java @@ -113,7 +113,7 @@ public void shouldRoundTripSpecificAvroDataThroughKafka() throws Exception { final Properties producerConfig = new Properties(); producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); + producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1); producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); producerConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl()); diff --git a/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java b/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java index 0510f22a4d..3625e25588 100644 --- a/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java +++ b/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java @@ -102,7 +102,7 @@ public static void createTopicsAndProduceDataToInputTopics() throws Exception { final Properties producerConfig = new Properties(); producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); + producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1); producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); IntegrationTestUtils.produceValuesSynchronously( diff --git a/src/test/java/io/confluent/examples/streams/microservices/InventoryServiceTest.java b/src/test/java/io/confluent/examples/streams/microservices/InventoryServiceTest.java index 141f8491d6..cd697baefd 100644 --- a/src/test/java/io/confluent/examples/streams/microservices/InventoryServiceTest.java +++ b/src/test/java/io/confluent/examples/streams/microservices/InventoryServiceTest.java @@ -92,10 +92,17 @@ public void shouldProcessOrdersWithSufficientStockAndRejectOrdersWithInsufficien private List> readInventoryStateStore(final int numberOfRecordsToWaitFor) throws InterruptedException { + // named topologies are experimental, using `null` for now return IntegrationTestUtils - .waitUntilMinKeyValueRecordsReceived(inventoryConsumerProperties(CLUSTER), - ProcessorStateManager.storeChangelogTopic(InventoryService.SERVICE_APP_ID, - InventoryService.RESERVED_STOCK_STORE_NAME), numberOfRecordsToWaitFor); + .waitUntilMinKeyValueRecordsReceived( + inventoryConsumerProperties(CLUSTER), + ProcessorStateManager.storeChangelogTopic( + InventoryService.SERVICE_APP_ID, + InventoryService.RESERVED_STOCK_STORE_NAME, + null + ), + numberOfRecordsToWaitFor + ); } private static Properties inventoryConsumerProperties(final
EmbeddedSingleNodeKafkaCluster cluster) { diff --git a/src/test/java/io/confluent/examples/streams/microservices/util/MicroserviceTestUtils.java b/src/test/java/io/confluent/examples/streams/microservices/util/MicroserviceTestUtils.java index 46f79c6a77..f27ddeb666 100644 --- a/src/test/java/io/confluent/examples/streams/microservices/util/MicroserviceTestUtils.java +++ b/src/test/java/io/confluent/examples/streams/microservices/util/MicroserviceTestUtils.java @@ -71,7 +71,7 @@ protected static Properties producerConfig(final EmbeddedSingleNodeKafkaCluster final Properties producerConfig = new Properties(); producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); + producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1); return producerConfig; } diff --git a/src/test/scala/io/confluent/examples/streams/GenericAvroScalaIntegrationTest.scala b/src/test/scala/io/confluent/examples/streams/GenericAvroScalaIntegrationTest.scala index e5d5d6b1f6..df903bf47c 100644 --- a/src/test/scala/io/confluent/examples/streams/GenericAvroScalaIntegrationTest.scala +++ b/src/test/scala/io/confluent/examples/streams/GenericAvroScalaIntegrationTest.scala @@ -99,7 +99,7 @@ class GenericAvroScalaIntegrationTest extends AssertionsForJUnit { val p = new Properties() p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) p.put(ProducerConfig.ACKS_CONFIG, "all") - p.put(ProducerConfig.RETRIES_CONFIG, "0") + p.put(ProducerConfig.RETRIES_CONFIG, "1") p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer]) p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer]) p.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl) diff --git a/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala b/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala index 56e0a426c2..cc082ae08a 100644 --- a/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala +++ b/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala @@ -17,9 +17,9 @@ package io.confluent.examples.streams import java.util import java.util.Properties - import io.confluent.examples.streams.algebird.{CMSStoreBuilder, ProbabilisticCounter} import org.apache.kafka.common.serialization._ +import org.apache.kafka.streams.kstream.Named import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.serialization.Serdes._ import org.apache.kafka.streams.scala.StreamsBuilder @@ -105,7 +105,7 @@ class ProbabilisticCountingScalaIntegrationTest extends AssertionsForJUnit { val textLines: KStream[String, String] = builder.stream[String, String](inputTopic) val approximateWordCounts: KStream[String, Long] = textLines .flatMapValues(textLine => textLine.toLowerCase.split("\\W+")) - .transform(() => new ProbabilisticCounter(cmsStoreName), cmsStoreName) + .transform(() => new ProbabilisticCounter(cmsStoreName), Named.as("cms-store"), cmsStoreName) approximateWordCounts.to(outputTopic) builder } diff --git a/src/test/scala/io/confluent/examples/streams/SpecificAvroScalaIntegrationTest.scala b/src/test/scala/io/confluent/examples/streams/SpecificAvroScalaIntegrationTest.scala index 176425629d..b7d3eabe41 100644 --- 
a/src/test/scala/io/confluent/examples/streams/SpecificAvroScalaIntegrationTest.scala +++ b/src/test/scala/io/confluent/examples/streams/SpecificAvroScalaIntegrationTest.scala @@ -92,7 +92,7 @@ class SpecificAvroScalaIntegrationTest extends AssertionsForJUnit { val p = new Properties() p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) p.put(ProducerConfig.ACKS_CONFIG, "all") - p.put(ProducerConfig.RETRIES_CONFIG, "0") + p.put(ProducerConfig.RETRIES_CONFIG, "1") p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer]) p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer]) p.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, cluster.schemaRegistryUrl) diff --git a/src/test/scala/io/confluent/examples/streams/algebird/CMSStoreTest.scala b/src/test/scala/io/confluent/examples/streams/algebird/CMSStoreTest.scala index 8cc792d060..542a5557c3 100644 --- a/src/test/scala/io/confluent/examples/streams/algebird/CMSStoreTest.scala +++ b/src/test/scala/io/confluent/examples/streams/algebird/CMSStoreTest.scala @@ -15,8 +15,6 @@ */ package io.confluent.examples.streams.algebird -import java.lang.Long - import com.twitter.algebird.TopCMS import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.header.Headers @@ -24,7 +22,7 @@ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.serialization.{Serdes, Serializer} import org.apache.kafka.common.utils.LogContext import org.apache.kafka.streams.processor.StateStoreContext -import org.apache.kafka.streams.processor.internals.MockStreamsMetrics +import org.apache.kafka.streams.processor.internals.{InternalProcessorContext, MockStreamsMetrics} import org.apache.kafka.streams.state.KeyValueStoreTestDriver import org.apache.kafka.streams.state.internals.ThreadCache import org.apache.kafka.test.{InternalMockProcessorContext, MockRecordCollector, TestUtils} @@ -172,9 +170,11 @@ class CMSStoreTest extends AssertionsForJUnit with MockitoSugar { value: V, headers: Headers, partition: Integer, - timestamp: Long, + timestamp: java.lang.Long, keySerializer: Serializer[K], - valueSerializer: Serializer[V]): Unit = { + valueSerializer: Serializer[V], + processorNodeId: String, + internalContext: InternalProcessorContext[Void,Void]): Unit = { observedChangelogRecords.add(new ProducerRecord[K, V](topic, partition, timestamp, key, value)) } }
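A note on the recurring `RETRIES_CONFIG` change from 0 to 1 across the test producer configs in this diff: as of Apache Kafka 3.0 the producer enables idempotence by default, which requires `acks=all` and `retries > 0`, so an explicit `retries=0` is rejected at configuration time. The diff itself does not state this motivation, so treat it as an inference. A sketch of the resulting test-producer settings (bootstrap servers and serializers vary per test):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestProducerConfigSketch {

  // Builds the producer settings used by these integration tests.
  public static Properties producerConfig(final String bootstrapServers) {
    final Properties p = new Properties();
    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    p.put(ProducerConfig.ACKS_CONFIG, "all"); // required by the idempotent producer
    p.put(ProducerConfig.RETRIES_CONFIG, 1);  // must be > 0 when idempotence is enabled
    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return p;
  }
}
```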