diff --git a/common/build.gradle b/common/build.gradle
index 9e87c18..cbc07b7 100644
--- a/common/build.gradle
+++ b/common/build.gradle
@@ -39,7 +39,7 @@ dependencies {
     implementation 'org.slf4j:slf4j-simple:2.0.7'
     implementation('org.apache.kafka:kafka-clients') {
         version {
-            strictly '3.6.0'
+            strictly '3.7.0'
         }
     }
     implementation 'io.confluent:kafka-streams-avro-serde:7.5.1'
diff --git a/reordering-streams/kstreams/README.md b/reordering-streams/kstreams/README.md
new file mode 100644
index 0000000..0e2668c
--- /dev/null
+++ b/reordering-streams/kstreams/README.md
@@ -0,0 +1,116 @@
+# How to reorder events in Kafka Streams
+
+Consider the case where the events in a Kafka topic are out of order.
+Specifically, the producer delivered the events in order, but they are out of order from the perspective of the timestamps embedded in the event payload.
+
+In this tutorial, we'll cover how you can reorder these records in the event stream using the embedded event timestamps.
+The reordering will only occur per-partition and within a specific time window provided at startup.
+
+NOTE: This tutorial was adapted from an [original contribution](https://github.com/confluentinc/kafka-streams-examples/pull/411) by [Sergey Shcherbakov](https://github.com/sshcherbakov).
+
+## Setup
+
+To accomplish the reordering, we'll leverage the fact that RocksDB stores all entries sorted by key.
+So we'll use the [KStream.process](https://javadoc.io/static/org.apache.kafka/kafka-streams/3.7.0/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.api.ProcessorSupplier-java.lang.String...-) method to store incoming records in a state store, using the embedded timestamp as the key. Then we'll schedule a [punctuation](https://docs.confluent.io/platform/current/streams/developer-guide/processor-api.html#defining-a-stream-processor) at a given interval that iterates over the contents of the store and forwards each record to downstream operators, now in order with respect to the embedded timestamps.
+
+## Reordering by event timestamp
+
+While the code is fairly straightforward, let's take a step-by-step walk-through of the key parts of the application.
+First we'll look at the `Processor.init` method details:
+
+```java
+@Override
+public void init(ProcessorContext<K, V> context) {
+    this.reorderStore = context.getStateStore(this.storeName);
+    this.context = context;
+    context.schedule(
+        this.reorderWindow,
+        PunctuationType.STREAM_TIME,
+        this::forwardOrderedByEventTime
+    );
+}
+```
+
+Kafka Streams calls the `Processor.init` method when creating the topology, and the method performs setup actions defined by the developer.
+In this case, the initialization steps are:
+1. Store a reference to the state store used to reorder the records.
+2. Store a [ProcessorContext](https://javadoc.io/static/org.apache.kafka/kafka-streams/3.7.0/org/apache/kafka/streams/processor/api/ProcessorContext.html) reference, which you'll use to forward records to downstream operators.
+3. Use the `ProcessorContext` to schedule a punctuation - the main part of this tutorial.
+
+Here is the `process` method, which is where the `Processor` takes action for each incoming record:
+
+```java
+@Override
+public void process(Record<K, V> kvRecord) {
+    final KOrder storeKey = storeKeyGenerator.key(kvRecord.key(), kvRecord.value());
+    final V storeValue = reorderStore.get(storeKey);
+
+    if (storeValue == null) {
+        reorderStore.put(storeKey, kvRecord.value());
+    }
+}
+```
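+One subtlety worth noting: because `process` only writes to the store when no entry exists for the generated key, two events that produce the same key (for example, identical embedded timestamps) collide, and the later event is silently dropped. If that matters for your data, one option is a composite key that appends the original key to the timestamp. The sketch below is illustrative only and is not part of this tutorial's code; the `CompositeKeys` helper is hypothetical, and it assumes epoch-millisecond (non-negative) timestamps and a `String` original key. Using `byte[]` store keys like this would also mean configuring the store with `Serdes.ByteArray()` as the key serde.
+
+```java
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+public final class CompositeKeys {
+    // Hypothetical helper: an 8-byte big-endian timestamp followed by the original
+    // key bytes. RocksDB sorts entries by the serialized key bytes, so the
+    // big-endian timestamp prefix keeps entries ordered by time, while distinct
+    // original keys prevent collisions between events with equal timestamps.
+    public static byte[] compositeKey(long timestampMillis, String originalKey) {
+        byte[] keyBytes = originalKey.getBytes(StandardCharsets.UTF_8);
+        return ByteBuffer.allocate(Long.BYTES + keyBytes.length)
+                .putLong(timestampMillis)  // ByteBuffer is big-endian by default
+                .put(keyBytes)
+                .array();
+    }
+}
+```
+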
+There's a `ReorderKeyGenerator` interface that takes the incoming key and value and returns the new key used to order the records. In our case, it simply returns the
+timestamp embedded in the event. We'll discuss the `ReorderKeyGenerator` interface later in the tutorial.
+
+Having seen how to update the key needed for sorting, now let's take a look at how Kafka Streams propagates this new order to any downstream
+operators:
+```java
+void forwardOrderedByEventTime(final long timestamp) {
+    try (KeyValueIterator<KOrder, V> it = reorderStore.all()) {
+        while (it.hasNext()) {
+            final KeyValue<KOrder, V> kv = it.next();
+            K origKey = originalKeyExtractor.key(kv.key, kv.value);
+            context.forward(new Record<>(origKey, kv.value, timestamp));
+            reorderStore.delete(kv.key);
+        }
+    }
+}
+```
+
+The `forwardOrderedByEventTime` method does the following:
+1. Iterates over the current contents of the store.
+2. Performs the reverse operation of the `ReorderKeyGenerator` via an `OriginalKeyExtractor` implementation to recover the original key.
+3. Forwards each record to the next downstream operator, then deletes it from the store.
+
+It's critical that whatever operation you use to extract the key for sorting is reversible, so you can forward records with the original key. This is essential because if you do any downstream
+aggregations or write results out to a topic, each record will remain on the correct partition.
+
+Now let's take a look at how you'll write the Kafka Streams application:
+
+```java
+StreamsBuilder builder = new StreamsBuilder();
+builder.stream(INPUT, Consumed.with(stringSerde, eventSerde))
+       .process(new ReorderingProcessorSupplier<>(reorderStore,
+               Duration.ofHours(10),
+               (k, v) -> v.eventTime(),
+               (k, v) -> v.name(),
+               Serdes.Long(),
+               eventSerde))
+       .to(OUTPUT, Produced.with(stringSerde, eventSerde));
+```
+
+This is a simple Kafka Streams topology. In the `process` operator you pass a [ProcessorSupplier](https://javadoc.io/static/org.apache.kafka/kafka-streams/3.7.0/org/apache/kafka/streams/processor/api/ProcessorSupplier.html), which Kafka Streams will use to extract your `Processor` implementation. The third parameter is a lambda implementation of the `ReorderKeyGenerator` interface, and the fourth is the same for the `OriginalKeyExtractor` interface. The `ReorderingProcessorSupplier` defines these two interfaces you've seen before in the tutorial:
+
+```java
+public class ReorderingProcessorSupplier<K, KOrder, V> implements ProcessorSupplier<K, V, K, V> {
+    // Details left out for clarity
+    public interface ReorderKeyGenerator<K, KOrder, V> {
+        KOrder key(K key, V val);
+    }
+
+    public interface OriginalKeyExtractor<K, KOrder, V> {
+        K key(KOrder key, V val);
+    }
+}
+```
+## Important Notes
+
+You've seen in this tutorial how to reorder events in the stream by timestamps on the event object, but you're not limited to timestamps only -- you could use the same approach to order events in the stream by any attribute on the event. There are a couple of points you need to keep in mind when doing so:
+
+1. It's essential to have a way to restore the incoming key; you don't want to lose the original key-partition mapping.
+2. This reordering strategy only applies to a single partition, not across multiple partitions.
+3. Since an event stream is infinite, reordering can only be applied to distinct windows of time, and you'll need to balance the trade-off between larger windows and the cost of iterating over the entire contents of the state store.
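+
+To see the reordering in action without a running cluster, you can drive the topology with Kafka Streams' `TopologyTestDriver`. The sketch below is a condensed, illustrative version of this tutorial's `ReorderProcessorTest`, not a drop-in program; it assumes the `Event` record and the `StreamsSerde` helper from this repository's `common` module are on the classpath:
+
+```java
+Properties props = new Properties();
+Serde<String> stringSerde = Serdes.String();
+Serde<Event> eventSerde = StreamsSerde.serdeFor(Event.class);
+
+try (TopologyTestDriver driver = new TopologyTestDriver(new ReorderStreams().buildTopology(props))) {
+    TestInputTopic<String, Event> input =
+        driver.createInputTopic(ReorderStreams.INPUT, stringSerde.serializer(), eventSerde.serializer());
+    TestOutputTopic<String, Event> output =
+        driver.createOutputTopic(ReorderStreams.OUTPUT, stringSerde.deserializer(), eventSerde.deserializer());
+
+    Instant now = Instant.now();
+    // Embedded event times arrive out of order: B's timestamp is earlier than A's
+    input.pipeInput("A", new Event("A", 2_000L), now);
+    input.pipeInput("B", new Event("B", 1_000L), now.plusSeconds(60));
+    // Advance stream time past the 10-hour reorder window to fire the punctuation
+    input.pipeInput("C", new Event("C", 3_000L), now.plusSeconds(60 * 60 * 11));
+
+    // Records come back ordered by the embedded timestamp
+    output.readKeyValuesToList().forEach(System.out::println);
+}
+```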
\ No newline at end of file
diff --git a/reordering-streams/kstreams/build.gradle b/reordering-streams/kstreams/build.gradle
new file mode 100644
index 0000000..7365e07
--- /dev/null
+++ b/reordering-streams/kstreams/build.gradle
@@ -0,0 +1,65 @@
+buildscript {
+    repositories {
+        mavenCentral()
+    }
+}
+
+plugins {
+    id "java"
+    id "application"
+    id 'com.github.johnrengelman.shadow' version '8.1.1'
+}
+
+java {
+    sourceCompatibility = JavaVersion.VERSION_17
+    targetCompatibility = JavaVersion.VERSION_17
+}
+
+application {
+    mainClass = "io.confluent.developer.ReorderStreams"
+}
+
+repositories {
+    mavenCentral()
+
+    maven {
+        url "https://packages.confluent.io/maven"
+    }
+}
+
+dependencies {
+    implementation project(':common')
+    implementation "org.slf4j:slf4j-simple:2.0.7"
+    implementation 'org.apache.kafka:kafka-streams:3.7.0'
+    implementation 'org.apache.kafka:kafka-clients:3.7.0'
+
+    testImplementation 'org.apache.kafka:kafka-streams-test-utils:3.7.0'
+    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.2'
+    testImplementation 'org.hamcrest:hamcrest:2.2'
+    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
+    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.2'
+}
+
+test {
+    useJUnitPlatform()
+    testLogging {
+        outputs.upToDateWhen { false }
+        showStandardStreams = true
+        events "PASSED", "SKIPPED", "FAILED", "STANDARD_OUT", "STANDARD_ERROR"
+        exceptionFormat = "full"
+    }
+}
+
+
+jar {
+    manifest {
+        attributes(
+                "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" ")
+        )
+    }
+}
+
+shadowJar {
+    archiveBaseName = "reorder-streams"
+    archiveClassifier = ''
+}
diff --git a/reordering-streams/kstreams/settings.gradle b/reordering-streams/kstreams/settings.gradle
new file mode 100644
index 0000000..e3cf887
--- /dev/null
+++ b/reordering-streams/kstreams/settings.gradle
@@ -0,0 +1,3 @@
+rootProject.name = 'reorder-streams'
+include ':common'
+project(':common').projectDir = file('../../common')
diff --git a/reordering-streams/kstreams/src/main/java/io/confluent/developer/Event.java b/reordering-streams/kstreams/src/main/java/io/confluent/developer/Event.java
new file mode 100644
index 0000000..c364da1
--- /dev/null
+++ b/reordering-streams/kstreams/src/main/java/io/confluent/developer/Event.java
@@ -0,0 +1,4 @@
+package io.confluent.developer;
+
+public record Event(String name, long eventTime) {
+}
diff --git a/reordering-streams/kstreams/src/main/java/io/confluent/developer/ReorderStreams.java b/reordering-streams/kstreams/src/main/java/io/confluent/developer/ReorderStreams.java
new file mode 100644
index 0000000..77f0b82
--- /dev/null
+++ b/reordering-streams/kstreams/src/main/java/io/confluent/developer/ReorderStreams.java
@@ -0,0 +1,69 @@
+package io.confluent.developer;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+public class ReorderStreams {
+    private static final Logger LOG = LoggerFactory.getLogger(ReorderStreams.class);
+    public static final String INPUT = "input";
+    public static final String OUTPUT = "output";
+
+    public Topology buildTopology(Properties allProps) {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        Serde<String> stringSerde = Serdes.String();
+        Serde<Event> eventSerde = StreamsSerde.serdeFor(Event.class);
+        String reorderStore = "reorder-store";
+        builder.stream(INPUT, Consumed.with(stringSerde, eventSerde))
+                .peek((key, value) -> LOG.info("Incoming event key[{}] value[{}]", key, value))
+                .process(new ReorderingProcessorSupplier<>(reorderStore,
+                        Duration.ofHours(10),
+                        (k, v) -> v.eventTime(),
+                        (k, v) -> v.name(),
+                        Serdes.Long(),
+                        eventSerde))
+                .to(OUTPUT, Produced.with(stringSerde, eventSerde));
+
+        return builder.build(allProps);
+    }
+
+    public static void main(String[] args) {
+        Properties properties;
+        if (args.length > 0) {
+            properties = Utils.loadProperties(args[0]);
+        } else {
+            properties = Utils.loadProperties();
+        }
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "reordering-streams");
+        ReorderStreams reorderStreams = new ReorderStreams();
+
+        Topology topology = reorderStreams.buildTopology(properties);
+
+        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, properties)) {
+            CountDownLatch countDownLatch = new CountDownLatch(1);
+            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                kafkaStreams.close(Duration.ofSeconds(5));
+                countDownLatch.countDown();
+            }));
+            // For local running only; don't do this in production as it wipes out all local state
+            kafkaStreams.cleanUp();
+            kafkaStreams.start();
+            countDownLatch.await();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/reordering-streams/kstreams/src/main/java/io/confluent/developer/ReorderingProcessorSupplier.java b/reordering-streams/kstreams/src/main/java/io/confluent/developer/ReorderingProcessorSupplier.java
new file mode 100644
index 0000000..08af2b7
--- /dev/null
+++ b/reordering-streams/kstreams/src/main/java/io/confluent/developer/ReorderingProcessorSupplier.java
@@ -0,0 +1,122 @@
+package io.confluent.developer;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Set;
+
+public class ReorderingProcessorSupplier<K, KOrder, V> implements ProcessorSupplier<K, V, K, V> {
+    private final String storeName;
+    private final Duration reorderWindow;
+    private final ReorderKeyGenerator<K, KOrder, V> storeKeyGenerator;
+    private final OriginalKeyExtractor<K, KOrder, V> originalKeyExtractor;
+    private final Serde<KOrder> keySerde;
+    private final Serde<V> valueSerde;
+
+
+    public interface ReorderKeyGenerator<K, KOrder, V> {
+        KOrder key(K key, V val);
+    }
+
+    public interface OriginalKeyExtractor<K, KOrder, V> {
+        K key(KOrder key, V val);
+    }
+
+    public ReorderingProcessorSupplier(String storeName,
+                                       Duration reorderWindow,
+                                       ReorderKeyGenerator<K, KOrder, V> storeKeyGenerator,
+                                       OriginalKeyExtractor<K, KOrder, V> originalKeyExtractor,
+                                       Serde<KOrder> keySerde,
+                                       Serde<V> valueSerde) {
+        this.storeName = storeName;
+        this.reorderWindow = reorderWindow;
+        this.storeKeyGenerator = storeKeyGenerator;
+        this.originalKeyExtractor = originalKeyExtractor;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+    }
+
+    public Processor<K, V, K, V> get() {
+        return new ReorderProcessor(storeName, reorderWindow, storeKeyGenerator, originalKeyExtractor);
+    }
+
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        return Collections.singleton(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), keySerde, valueSerde));
+    }
+
+    private class ReorderProcessor implements Processor<K, V, K, V> {
+        private final String storeName;
+        private final Duration reorderWindow;
+        private ProcessorContext<K, V> context;
+        private KeyValueStore<KOrder, V> reorderStore;
+        private final ReorderKeyGenerator<K, KOrder, V> storeKeyGenerator;
+        private final OriginalKeyExtractor<K, KOrder, V> originalKeyExtractor;
+
+        public ReorderProcessor(String storeName,
+                                Duration reorderWindow,
+                                ReorderKeyGenerator<K, KOrder, V> reorderKeyGenerator,
+                                OriginalKeyExtractor<K, KOrder, V> originalKeyExtractor) {
+
+            this.storeName = storeName;
+            this.reorderWindow = reorderWindow;
+            this.storeKeyGenerator = reorderKeyGenerator;
+            this.originalKeyExtractor = originalKeyExtractor;
+        }
+
+        @Override
+        public void init(ProcessorContext<K, V> context) {
+            this.reorderStore = context.getStateStore(this.storeName);
+            this.context = context;
+            context.schedule(
+                this.reorderWindow,
+                PunctuationType.STREAM_TIME,
+                this::forwardOrderedByEventTime
+            );
+        }
+
+        @Override
+        public void process(Record<K, V> kvRecord) {
+            final KOrder storeKey = storeKeyGenerator.key(kvRecord.key(), kvRecord.value());
+            final V storeValue = reorderStore.get(storeKey);
+            if (storeValue == null) {
+                reorderStore.put(storeKey, kvRecord.value());
+            }
+        }
+
+
+        /**
+         * Scheduled to be called automatically when the period
+         * within which message reordering occurs expires.
+         * <p>
+         * Outputs downstream accumulated records sorted by their timestamp:
+         * <p>
+         * 1) reads the store
+         * 2) sends the fetched messages in order using context.forward() and deletes
+         *    them from the store
+         *
+         * @param timestamp the stream time of the punctuate function call
+         */
+        void forwardOrderedByEventTime(final long timestamp) {
+            try (KeyValueIterator<KOrder, V> it = reorderStore.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<KOrder, V> kv = it.next();
+                    K origKey = originalKeyExtractor.key(kv.key, kv.value);
+                    context.forward(new Record<>(origKey, kv.value, timestamp));
+                    reorderStore.delete(kv.key);
+                }
+            }
+        }
+    }
+}
diff --git a/reordering-streams/kstreams/src/test/java/io/confluent/developer/ReorderProcessorTest.java b/reordering-streams/kstreams/src/test/java/io/confluent/developer/ReorderProcessorTest.java
new file mode 100644
index 0000000..7d60fbf
--- /dev/null
+++ b/reordering-streams/kstreams/src/test/java/io/confluent/developer/ReorderProcessorTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.confluent.developer;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.junit.jupiter.api.Test;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test that demonstrates how to reorder the stream of incoming messages
+ * by the timestamp embedded in the message payload.
+ * <p>
+ * Makes sense only on a per-partition basis.
+ * <p>
+ * Reordering occurs within time windows defined by the reorder window
+ * duration passed to the {@code ReorderingProcessorSupplier}.
+ * <p>
+ * Note: This example uses Java records and lambda expressions, and requires Java 17 as configured in the build.
+ */
+class ReorderProcessorTest {
+
+    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    private static long parseTimeStringToLong(final String timeString) throws ParseException {
+        return dateFormat.parse(timeString).getTime();
+    }
+
+    @Test
+    void shouldReorderTheInput() throws ParseException {
+
+        String inputTopic = ReorderStreams.INPUT;
+        String outputTopic = ReorderStreams.OUTPUT;
+        ReorderStreams reorderStreams = new ReorderStreams();
+        Serde<String> stringSerde = Serdes.String();
+        Serde<Event> eventSerde = StreamsSerde.serdeFor(Event.class);
+        Properties props = new Properties();
+        try (TopologyTestDriver driver = new TopologyTestDriver(reorderStreams.buildTopology(props))) {
+
+            TestInputTopic<String, Event> testInputTopic = driver.createInputTopic(inputTopic, stringSerde.serializer(), eventSerde.serializer());
+            TestOutputTopic<String, Event> testOutputTopic = driver.createOutputTopic(outputTopic, stringSerde.deserializer(), eventSerde.deserializer());
+
+            // Input not ordered by time
+            final List<KeyValue<String, Event>> inputValues = Arrays.asList(
+                    KeyValue.pair("A", new Event("A", parseTimeStringToLong("2021-11-03 23:00:00Z"))),  // stream time calibration
+                    KeyValue.pair("B", new Event("B", parseTimeStringToLong("2021-11-04 01:05:00Z"))),  // 10-hours interval border is at "2021-11-04 01:00:00Z"
+                    KeyValue.pair("C", new Event("C", parseTimeStringToLong("2021-11-04 01:10:00Z"))),
+                    KeyValue.pair("D", new Event("D", parseTimeStringToLong("2021-11-04 01:40:00Z"))),
+                    KeyValue.pair("E", new Event("E", parseTimeStringToLong("2021-11-04 02:25:00Z"))),
+                    KeyValue.pair("F", new Event("F", parseTimeStringToLong("2021-11-04 01:20:00Z"))),
+                    KeyValue.pair("G", new Event("G", parseTimeStringToLong("2021-11-04 02:45:00Z"))),
+                    KeyValue.pair("H", new Event("H", parseTimeStringToLong("2021-11-04 02:00:00Z"))),
+                    KeyValue.pair("I", new Event("I", parseTimeStringToLong("2021-11-04 03:00:00Z"))),
+                    KeyValue.pair("J", new Event("J", parseTimeStringToLong("2021-11-04 02:40:00Z"))),
+                    KeyValue.pair("K", new Event("K", parseTimeStringToLong("2021-11-04 02:20:00Z"))),  // 10-hours interval border is at "2021-11-04 11:00:00Z"
+                    KeyValue.pair("L", new Event("L", parseTimeStringToLong("2021-11-05 00:00:00Z")))   // stream time calibration
+            );
+
+            // Expected ordered by time
+            final List<KeyValue<String, Event>> expectedValues = Arrays.asList(
+                    KeyValue.pair("A", new Event("A", parseTimeStringToLong("2021-11-03 23:00:00Z"))),  // stream time calibration
+                    KeyValue.pair("B", new Event("B", parseTimeStringToLong("2021-11-04 01:05:00Z"))),
+                    KeyValue.pair("C", new Event("C", parseTimeStringToLong("2021-11-04 01:10:00Z"))),
+                    KeyValue.pair("F", new Event("F", parseTimeStringToLong("2021-11-04 01:20:00Z"))),
+                    KeyValue.pair("D", new Event("D", parseTimeStringToLong("2021-11-04 01:40:00Z"))),
+                    KeyValue.pair("H", new Event("H", parseTimeStringToLong("2021-11-04 02:00:00Z"))),
+                    KeyValue.pair("K", new Event("K", parseTimeStringToLong("2021-11-04 02:20:00Z"))),
+                    KeyValue.pair("E", new Event("E", parseTimeStringToLong("2021-11-04 02:25:00Z"))),
+                    KeyValue.pair("J", new Event("J", parseTimeStringToLong("2021-11-04 02:40:00Z"))),
+                    KeyValue.pair("G", new Event("G", parseTimeStringToLong("2021-11-04 02:45:00Z"))),
+                    KeyValue.pair("I", new Event("I", parseTimeStringToLong("2021-11-04 03:00:00Z"))),
+                    KeyValue.pair("L", new Event("L", parseTimeStringToLong("2021-11-05 00:00:00Z")))
+            );
+            Instant now = Instant.now();
+            var recordCount = 1;
+            var secondsToAdvance = 60L;
+
+            for (KeyValue<String, Event> input : inputValues) {
+                testInputTopic.pipeInput(input.key, input.value, now.plusSeconds(recordCount++ * secondsToAdvance));
+                if (recordCount == 11) {
+                    // Need to advance stream-time beyond 10 hours to trigger punctuation
+                    secondsToAdvance = 60 * 60 * 10;
+                }
+            }
+            List<KeyValue<String, Event>> actualValues = testOutputTopic.readKeyValuesToList();
+            assertEquals(expectedValues.size(), actualValues.size());
+            assertEquals(expectedValues, actualValues);
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 6ed2b87..78d2cee 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -40,6 +40,7 @@ include 'multiple-event-types-protobuf:kafka'
 include 'naming-changelog-repartition-topics:kstreams'
 include 'over-aggregations:flinksql'
 include 'pattern-matching:flinksql'
+include 'reordering-streams:kstreams'
 include 'schedule-ktable-ttl:kstreams'
 include 'schedule-ktable-ttl-aggregate:kstreams'
 include 'serialization:kstreams'