feat: Add example of reordering the events of a Kafka Streams application (#53)

* Add stream reordering functionality for Kafka Streams

Added functionality to reorder events in a Kafka Streams application by the timestamp embedded in the message payload. Created new Java classes ReorderStreams, ReorderProcessorSupplier and Event, and respective unit tests to support the functionality. Updated the project structure and build.gradle as needed.

* Refactor test method and update dependencies in ReorderStreams module

Imported java.time.Instant and refactored the test method in the ReorderProcessorTest class for better readability. Also updated the Kafka Streams and clients dependencies from version 3.6.0 to 3.7.0 in the build.gradle files of both the ReorderStreams and common modules.

* Update README and ReorderingProcessorSupplier for reordering out-of-order events

The README for the reordering-streams/kstreams module has been significantly improved to provide more detailed instructions on how to reorder out-of-order events in Kafka Streams. Additionally, the ReorderingProcessorSupplier.java file has been slightly edited to improve its readability.

* Update reordering streams code and documentation

The method name `punctuate` was updated to `forwardOrderedByEventTime` to more accurately reflect its function. The variable `grace` was also renamed to `reorderWindow` for clarity. Both the code and the README were updated with a detailed explanation of the reordering strategy for events in the stream, including restoring the original key, the limitation to a single partition, and considerations for choosing the window size for reordering.

* Remove unnecessary comments and redundant lines

Unnecessary comments were removed from several files, specifically Event.java and ReorderingProcessorSupplier.java. Redundant lines in README.md were also deleted for a cleaner, more concise document.

* Apply suggestions from code review

Co-authored-by: Dave Troiano <[email protected]>

---------

Co-authored-by: Dave Troiano <[email protected]>
bbejeck and davetroiano authored Jul 1, 2024
1 parent d7c1a7e commit 6984828
Showing 9 changed files with 494 additions and 1 deletion.
2 changes: 1 addition & 1 deletion common/build.gradle
@@ -39,7 +39,7 @@ dependencies {
     implementation 'org.slf4j:slf4j-simple:2.0.7'
     implementation('org.apache.kafka:kafka-clients') {
         version {
-            strictly '3.6.0'
+            strictly '3.7.0'
         }
     }
     implementation 'io.confluent:kafka-streams-avro-serde:7.5.1'
116 changes: 116 additions & 0 deletions reordering-streams/kstreams/README.md
@@ -0,0 +1,116 @@
<!-- title: How to reorder out-of-order events in Kafka Streams -->
<!-- description: In this tutorial, learn how to reorder out-of-order events in Kafka Streams, with step-by-step instructions and supporting code. -->

# How to reorder events in Kafka Streams

Consider the case where the events in a Kafka topic are out of order.
Specifically, the producer delivered the events in order, but they are out of order with respect to the timestamps embedded in the event payload.

In this tutorial, we'll cover how you can reorder these records in the event stream using the embedded event timestamps.
The reordering will only occur per-partition and within a specific time window provided at startup.
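
For example, using the `Event` record that appears later in this tutorial, a partition might receive events whose offsets increase while their embedded timestamps do not. The values here are made up for illustration:

```java
import java.util.List;

// Arrival order on the partition: offsets increase, but eventTime values don't.
List<Event> arrivalOrder = List.of(
        new Event("a", 30L),   // offset 0
        new Event("b", 10L),   // offset 1
        new Event("c", 20L));  // offset 2
// Desired output order after reordering by eventTime: b (10), c (20), a (30)
```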

NOTE: This tutorial was adapted from an [original contribution](https://github.com/confluentinc/kafka-streams-examples/pull/411) by [Sergey Shcherbakov](https://github.com/sshcherbakov).

## Setup

To accomplish the reordering, we'll leverage the fact that a RocksDB state store keeps its entries sorted by key (more precisely, by the serialized bytes of the key; with `Serdes.Long()` this matches numeric order for non-negative timestamps).
We'll use the [KStream.process](https://javadoc.io/static/org.apache.kafka/kafka-streams/3.7.0/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.api.ProcessorSupplier-java.lang.String...-) method to store incoming records in a state store, keyed by the timestamp embedded in each record. Then, we'll schedule a [punctuation](https://docs.confluent.io/platform/current/streams/developer-guide/processor-api.html#defining-a-stream-processor) to run at a given interval, iterate over the contents of the store, and forward the records to downstream operators, now in order with respect to the embedded timestamps.

## Reordering by event timestamp

While the code is fairly straightforward, let's take a step-by-step walk-through of the key parts of the application.
First we'll look at the `Processor.init` method details:

```java
@Override
public void init(ProcessorContext<K, V> context) {
    this.reorderStore = context.getStateStore(this.storeName);
    this.context = context;
    context.schedule(
        this.reorderWindow,
        PunctuationType.STREAM_TIME,
        this::forwardOrderedByEventTime
    );
}
```

Kafka Streams calls the `Processor.init` method when creating the topology; the method performs setup actions defined by the developer.
In this case, the initialization steps are:
1. Store a reference to the state store used to reorder the records.
2. Store a [ProcessorContext](https://javadoc.io/static/org.apache.kafka/kafka-streams/3.7.0/org/apache/kafka/streams/processor/api/ProcessorContext.html) reference, which you'll use to forward records to downstream operators.
3. Use the `ProcessorContext` to schedule a punctuation, the main subject of this tutorial.

```java
@Override
public void process(Record<K, V> kvRecord) {
    final KOrder storeKey = storeKeyGenerator.key(kvRecord.key(), kvRecord.value());
    final V storeValue = reorderStore.get(storeKey);

    if (storeValue == null) {
        reorderStore.put(storeKey, kvRecord.value());
    }
}
```
Here is the `process` method, which is where the `Processor` acts on each incoming record.
A `ReorderKeyGenerator` interface takes the incoming key and value and returns the new key used to order the records; in our case, it simply returns the timestamp embedded in the event. Note that a record is stored only when no entry with that store key exists yet, so two events that map to the same store key (for example, identical timestamps) result in the second one being dropped. We'll discuss the `ReorderKeyGenerator` interface later in the tutorial.
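
For the `Event` record used in this tutorial, the generator is a lambda over the value. Here's a minimal sketch; the variable name is ours, not from the codebase:

```java
// Orders records by the timestamp embedded in the event payload.
ReorderingProcessorSupplier.ReorderKeyGenerator<String, Event, Long> byEventTime =
        (key, value) -> value.eventTime();
```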

Having seen how the sorting key is generated, let's look at how Kafka Streams propagates the new order to downstream operators:
```java
void forwardOrderedByEventTime(final long timestamp) {
    try (KeyValueIterator<KOrder, V> it = reorderStore.all()) {
        while (it.hasNext()) {
            final KeyValue<KOrder, V> kv = it.next();
            K origKey = originalKeyExtractor.key(kv.key, kv.value);
            context.forward(new Record<>(origKey, kv.value, timestamp));
            reorderStore.delete(kv.key);
        }
    }
}
```

The `forwardOrderedByEventTime` method does the following:
1. Iterates over the current contents of the store.
2. Reverses the `ReorderKeyGenerator` operation via an `OriginalKeyExtractor` interface to recover the original key.
3. Forwards each record to the next downstream operator, then deletes it from the store.

It's critical that whatever operation you use to extract the sorting key be reversible, so that you can forward each record with its original key. This matters because any downstream aggregations, or writes of results to a topic, depend on the key to keep each record on the correct partition.
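
In this tutorial's topology, the extractor simply mirrors the generator by pulling the original key (the event name) back out of the value. A minimal sketch, with an illustrative variable name:

```java
// Recovers the original record key so downstream operators see the same
// key-partition mapping as the input topic.
ReorderingProcessorSupplier.OriginalKeyExtractor<Long, Event, String> backToName =
        (storeKey, value) -> value.name();
```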

Now let's take a look at how you'll write the Kafka Streams application:

```java
StreamsBuilder builder = new StreamsBuilder();
builder.stream(INPUT, Consumed.with(stringSerde, eventSerde))
        .process(new ReorderingProcessorSupplier<>(reorderStore,
                Duration.ofHours(10),
                (k, v) -> v.eventTime(),
                (k, v) -> v.name(),
                Serdes.Long(),
                eventSerde))
        .to(OUTPUT, Produced.with(stringSerde, eventSerde));
```

This is a simple Kafka Streams topology. In the `process` operator you pass a [ProcessorSupplier](https://javadoc.io/static/org.apache.kafka/kafka-streams/3.7.0/org/apache/kafka/streams/processor/api/ProcessorSupplier.html), which Kafka Streams uses to obtain your `Processor` implementation. The third constructor parameter is a lambda implementation of the `ReorderKeyGenerator` interface, and the fourth is the same for the `OriginalKeyExtractor` interface. The `ReorderingProcessorSupplier` defines both interfaces you've seen earlier in the tutorial:

```java
public class ReorderingProcessorSupplier<KOrder, K, V> implements ProcessorSupplier<K, V, K, V> {
    // Details left out for clarity
    public interface ReorderKeyGenerator<K, V, KOrder> {
        KOrder key(K key, V val);
    }

    public interface OriginalKeyExtractor<KOrder, V, K> {
        K key(KOrder key, V val);
    }
}
```

## Important Notes

You've seen in this tutorial how to reorder events in the stream by timestamps on the event object, but you're not limited to timestamps -- you could use the same approach to order events in the stream by any attribute of the event. There are a few points to keep in mind when doing so:

1. It's essential to have a way to restore the incoming key; otherwise you lose the original key-partition mapping.
2. This reordering strategy only applies to a single partition, not across multiple partitions.
3. Since an event stream is infinite, reordering can only be applied to distinct windows of time, and you'll need to balance the trade-off between larger windows, which tolerate later-arriving events, and the cost of buffering more records and iterating over the entire contents of the state store on each punctuation; the sketch below illustrates the choice.
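
For instance, with the same `reorderStore` name and `eventSerde` from the topology above, a tighter reorder window might look like the following; the duration is purely illustrative, not a recommendation:

```java
// A 10-minute window holds less state and flushes (and scans) the store more
// often, but events arriving more than 10 minutes late may still be emitted
// out of order.
new ReorderingProcessorSupplier<>(reorderStore,
        Duration.ofMinutes(10),
        (k, v) -> v.eventTime(),
        (k, v) -> v.name(),
        Serdes.Long(),
        eventSerde);
```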
65 changes: 65 additions & 0 deletions reordering-streams/kstreams/build.gradle
@@ -0,0 +1,65 @@
buildscript {
    repositories {
        mavenCentral()
    }
}

plugins {
    id "java"
    id "application"
    id 'com.github.johnrengelman.shadow' version '8.1.1'
}

java {
    sourceCompatibility = JavaVersion.VERSION_17
    targetCompatibility = JavaVersion.VERSION_17
}

application {
    mainClass = "io.confluent.developer.ReorderStreams"
}

repositories {
    mavenCentral()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

dependencies {
    implementation project(':common')
    implementation "org.slf4j:slf4j-simple:2.0.7"
    implementation 'org.apache.kafka:kafka-streams:3.7.0'
    implementation 'org.apache.kafka:kafka-clients:3.7.0'

    testImplementation 'org.apache.kafka:kafka-streams-test-utils:3.7.0'
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.2'
    testImplementation 'org.hamcrest:hamcrest:2.2'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.2'
}

test {
    useJUnitPlatform()
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        events "PASSED", "SKIPPED", "FAILED", "STANDARD_OUT", "STANDARD_ERROR"
        exceptionFormat = "full"
    }
}

jar {
    manifest {
        attributes(
            "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" ")
        )
    }
}

shadowJar {
    archiveBaseName = "reorder-streams"
    archiveClassifier = ''
}
3 changes: 3 additions & 0 deletions reordering-streams/kstreams/settings.gradle
@@ -0,0 +1,3 @@
rootProject.name = 'reorder-streams'
include ':common'
project(':common').projectDir = file('../../common')
4 changes: 4 additions & 0 deletions reordering-streams/kstreams/src/main/java/io/confluent/developer/Event.java
@@ -0,0 +1,4 @@
package io.confluent.developer;

/**
 * A simple event carrying a name and the event-time timestamp embedded in the payload.
 */
public record Event(String name, long eventTime) {
}
69 changes: 69 additions & 0 deletions reordering-streams/kstreams/src/main/java/io/confluent/developer/ReorderStreams.java
@@ -0,0 +1,69 @@
package io.confluent.developer;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class ReorderStreams {
    private static final Logger LOG = LoggerFactory.getLogger(ReorderStreams.class);
    public static final String INPUT = "input";
    public static final String OUTPUT = "output";

    public Topology buildTopology(Properties allProps) {
        final StreamsBuilder builder = new StreamsBuilder();

        Serde<String> stringSerde = Serdes.String();
        Serde<Event> eventSerde = StreamsSerde.serdeFor(Event.class);
        String reorderStore = "reorder-store";
        builder.stream(INPUT, Consumed.with(stringSerde, eventSerde))
                .peek((key, value) -> LOG.info("Incoming event key[{}] value[{}]", key, value))
                .process(new ReorderingProcessorSupplier<>(reorderStore,
                        Duration.ofHours(10),
                        (k, v) -> v.eventTime(),
                        (k, v) -> v.name(),
                        Serdes.Long(),
                        eventSerde))
                .to(OUTPUT, Produced.with(stringSerde, eventSerde));

        return builder.build(allProps);
    }

    public static void main(String[] args) {
        Properties properties;
        if (args.length > 0) {
            properties = Utils.loadProperties(args[0]);
        } else {
            properties = Utils.loadProperties();
        }
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "reorder-streams");
        ReorderStreams reorderStreams = new ReorderStreams();

        Topology topology = reorderStreams.buildTopology(properties);

        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, properties)) {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                kafkaStreams.close(Duration.ofSeconds(5));
                countDownLatch.countDown();
            }));
            // For local runs only; don't do this in production as it wipes out all local state
            kafkaStreams.cleanUp();
            kafkaStreams.start();
            countDownLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
122 changes: 122 additions & 0 deletions reordering-streams/kstreams/src/main/java/io/confluent/developer/ReorderingProcessorSupplier.java
@@ -0,0 +1,122 @@
package io.confluent.developer;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

import java.time.Duration;
import java.util.Collections;
import java.util.Set;

public class ReorderingProcessorSupplier<KOrder, K, V> implements ProcessorSupplier<K, V, K, V> {
    private final String storeName;
    private final Duration reorderWindow;
    private final ReorderKeyGenerator<K, V, KOrder> storeKeyGenerator;
    private final OriginalKeyExtractor<KOrder, V, K> originalKeyExtractor;
    private final Serde<KOrder> keySerde;
    private final Serde<V> valueSerde;

    public interface ReorderKeyGenerator<K, V, KOrder> {
        KOrder key(K key, V val);
    }

    public interface OriginalKeyExtractor<KOrder, V, K> {
        K key(KOrder key, V val);
    }

    public ReorderingProcessorSupplier(String storeName,
                                       Duration reorderWindow,
                                       ReorderKeyGenerator<K, V, KOrder> storeKeyGenerator,
                                       OriginalKeyExtractor<KOrder, V, K> originalKeyExtractor,
                                       Serde<KOrder> keySerde,
                                       Serde<V> valueSerde) {
        this.storeName = storeName;
        this.reorderWindow = reorderWindow;
        this.storeKeyGenerator = storeKeyGenerator;
        this.originalKeyExtractor = originalKeyExtractor;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    @Override
    public Processor<K, V, K, V> get() {
        return new ReorderProcessor(storeName, reorderWindow, storeKeyGenerator, originalKeyExtractor);
    }

    @Override
    public Set<StoreBuilder<?>> stores() {
        return Collections.singleton(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), keySerde, valueSerde));
    }

    private class ReorderProcessor implements Processor<K, V, K, V> {
        private final String storeName;
        private final Duration reorderWindow;
        private ProcessorContext<K, V> context;
        private KeyValueStore<KOrder, V> reorderStore;
        private final ReorderKeyGenerator<K, V, KOrder> storeKeyGenerator;
        private final OriginalKeyExtractor<KOrder, V, K> originalKeyExtractor;

        public ReorderProcessor(String storeName,
                                Duration reorderWindow,
                                ReorderKeyGenerator<K, V, KOrder> reorderKeyGenerator,
                                OriginalKeyExtractor<KOrder, V, K> originalKeyExtractor) {
            this.storeName = storeName;
            this.reorderWindow = reorderWindow;
            this.storeKeyGenerator = reorderKeyGenerator;
            this.originalKeyExtractor = originalKeyExtractor;
        }

        @Override
        public void init(ProcessorContext<K, V> context) {
            this.reorderStore = context.getStateStore(this.storeName);
            this.context = context;
            context.schedule(
                this.reorderWindow,
                PunctuationType.STREAM_TIME,
                this::forwardOrderedByEventTime
            );
        }

        @Override
        public void process(Record<K, V> kvRecord) {
            final KOrder storeKey = storeKeyGenerator.key(kvRecord.key(), kvRecord.value());
            final V storeValue = reorderStore.get(storeKey);
            if (storeValue == null) {
                reorderStore.put(storeKey, kvRecord.value());
            }
        }

        /**
         * Scheduled to be called automatically when the period
         * within which message reordering occurs expires.
         * <p>
         * Outputs downstream accumulated records sorted by their timestamp.
         * <p>
         * 1) read the store
         * 2) forward the fetched messages in order using context.forward() and delete
         *    them from the store
         *
         * @param timestamp stream time of the punctuate function call
         */
        void forwardOrderedByEventTime(final long timestamp) {
            try (KeyValueIterator<KOrder, V> it = reorderStore.all()) {
                while (it.hasNext()) {
                    final KeyValue<KOrder, V> kv = it.next();
                    K origKey = originalKeyExtractor.key(kv.key, kv.value);
                    context.forward(new Record<>(origKey, kv.value, timestamp));
                    reorderStore.delete(kv.key);
                }
            }
        }
    }
}
2 more files not shown.
