feat: Add example of reordering the events of a Kafka Streams application #53

Merged · 6 commits · Jul 1, 2024
2 changes: 1 addition & 1 deletion common/build.gradle
@@ -39,7 +39,7 @@ dependencies {
     implementation 'org.slf4j:slf4j-simple:2.0.7'
     implementation('org.apache.kafka:kafka-clients') {
         version {
-            strictly '3.6.0'
+            strictly '3.7.0'
         }
     }
     implementation 'io.confluent:kafka-streams-avro-serde:7.5.1'
116 changes: 116 additions & 0 deletions reordering-streams/kstreams/README.md
@@ -0,0 +1,116 @@
<!-- title: How to reorder out-of-order events in Kafka Streams -->
<!-- description: In this tutorial, learn how to reorder out-of-order events in Kafka Streams, with step-by-step instructions and supporting code. -->

# How to reorder events in Kafka Streams

Consider the case where the events in a Kafka topic are out of order.
Specifically, the producer delivered the events in order, but they are out of order from the perspective of the timestamps embedded in the event payload.

In this tutorial, we'll cover how you can reorder these records in the event stream using the embedded event timestamps.
The reordering will only occur per-partition and within a specific time window provided at startup.

NOTE: This tutorial was adapted from an [original contribution](https://github.com/confluentinc/kafka-streams-examples/pull/411) by [Sergey Shcherbakov](https://github.com/sshcherbakov).

## Setup

To accomplish the reordering, we'll leverage the fact that RocksDB stores all entries sorted by key. Since Kafka's `Serdes.Long()` serializes longs in big-endian byte order, the store's lexicographic byte ordering matches the numeric ordering of (non-negative) timestamp keys.
We'll use the [KStream.process](https://javadoc.io/static/org.apache.kafka/kafka-streams/3.7.0/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.api.ProcessorSupplier-java.lang.String...-) method to store incoming records in a state store, keyed by the timestamp embedded in each event. Then, we'll schedule a [punctuation](https://docs.confluent.io/platform/current/streams/developer-guide/processor-api.html#defining-a-stream-processor) to occur at a given interval that iterates over the contents of the store and forwards the records to downstream operators, now in order with respect to the embedded timestamps.
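
You can convince yourself of that ordering property with a quick standalone sketch (`Arrays.compareUnsigned` mirrors RocksDB's default bytewise comparator; the topic name passed to the serializer is irrelevant here):

```java
import org.apache.kafka.common.serialization.Serdes;

import java.util.Arrays;

public class KeyOrderCheck {
    public static void main(String[] args) {
        // Kafka's LongSerializer writes big-endian bytes, so unsigned
        // lexicographic byte order matches numeric order for non-negative longs
        byte[] earlier = Serdes.Long().serializer().serialize("any-topic", 1_000L);
        byte[] later = Serdes.Long().serializer().serialize("any-topic", 2_000L);
        System.out.println(Arrays.compareUnsigned(earlier, later) < 0); // prints: true
    }
}
```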

## Reordering by event timestamp

While the code is fairly straightforward, let's take a step-by-step walk-through of the key parts of the application.
First we'll look at the `Processor.init` method details:

```java
@Override
public void init(ProcessorContext<K, V> context) {
    this.reorderStore = context.getStateStore(this.storeName);
    this.context = context;
    context.schedule(
            this.reorderWindow,
            PunctuationType.STREAM_TIME,
            this::forwardOrderedByEventTime
    );
}
```

Kafka Streams calls the `Processor.init` method when creating the topology; this method performs setup actions defined by the developer.
In this case, the initialization steps are:
1. Store a reference to the state store used to reorder the records.
2. Store a [ProcessorContext](https://javadoc.io/static/org.apache.kafka/kafka-streams/3.7.0/org/apache/kafka/streams/processor/api/ProcessorContext.html) reference, which you'll use to forward records to downstream operators.
3. Use the `ProcessorContext` to schedule a punctuation - the main part of this tutorial.
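
Note that the punctuation is registered with `PunctuationType.STREAM_TIME`, so the flush only fires as incoming records advance stream time. If you also need the store flushed during idle periods, `PunctuationType.WALL_CLOCK_TIME` is the alternative (a sketch of the one-line change, not what this tutorial uses):

```java
// Fires on system time, even when no new records arrive to advance stream time
context.schedule(
        this.reorderWindow,
        PunctuationType.WALL_CLOCK_TIME,
        this::forwardOrderedByEventTime
);
```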

```java
@Override
public void process(Record<K, V> kvRecord) {
    final KOrder storeKey = storeKeyGenerator.key(kvRecord.key(), kvRecord.value());
    final V storeValue = reorderStore.get(storeKey);

    if (storeValue == null) {
        reorderStore.put(storeKey, kvRecord.value());
    }
}
```
Here is the `process` method, where the `Processor` acts on each incoming record.
A `ReorderKeyGenerator` interface takes the incoming key and value and returns the new key used to order the records; in our case, it simply returns the timestamp embedded in the event. Note that a record is stored only if no entry already exists for that key, so a later event with a duplicate timestamp is dropped. We'll discuss the `ReorderKeyGenerator` interface later in the tutorial.
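
Concretely, for the `Event` record used in this tutorial, the generator is just a lambda that pulls the embedded timestamp out of the value:

```java
// New store key = the timestamp embedded in the event payload
ReorderingProcessorSupplier.ReorderKeyGenerator<String, Event, Long> storeKeyGenerator =
        (key, value) -> value.eventTime();
```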

Having seen how to update the key needed for sorting, let's now look at how Kafka Streams propagates this new order to downstream operators:
```java
void forwardOrderedByEventTime(final long timestamp) {
    try (KeyValueIterator<KOrder, V> it = reorderStore.all()) {
        while (it.hasNext()) {
            final KeyValue<KOrder, V> kv = it.next();
            K origKey = originalKeyExtractor.key(kv.key, kv.value);
            context.forward(new Record<>(origKey, kv.value, timestamp));
            reorderStore.delete(kv.key);
        }
    }
}
```

The `forwardOrderedByEventTime` method does the following:
1. Iterates over the current contents of the store.
2. Reverses the `ReorderKeyGenerator` operation via the `OriginalKeyExtractor` interface, recovering the original key.
3. Forwards each record to the next downstream operator, and then deletes it from the store.

It's critical that whatever operation you use to extract the sorting key can be reversed, so that you can forward each record with its original key. This is essential because any downstream
aggregation, or results written out to a topic, relies on the original key to keep the record on the correct partition.
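
In this tutorial the original key is recoverable from the value itself, so the matching extractor is another small lambda, the mirror image of the generator above:

```java
// Recover the original record key from the event payload (the name doubles as the key)
ReorderingProcessorSupplier.OriginalKeyExtractor<Long, Event, String> originalKeyExtractor =
        (storeKey, value) -> value.name();
```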

Now let's take a look at how you'll write the Kafka Streams application:

```java
StreamsBuilder builder = new StreamsBuilder();
builder.stream(INPUT, Consumed.with(stringSerde, eventSerde))
        .process(new ReorderingProcessorSupplier<>(reorderStore,
                Duration.ofHours(10),
                (k, v) -> v.eventTime(),
                (k, v) -> v.name(),
                Serdes.Long(),
                eventSerde))
        .to(OUTPUT, Produced.with(stringSerde, eventSerde));
```

This is a simple Kafka Streams topology. In the `process` operator you pass a [ProcessorSupplier](https://javadoc.io/static/org.apache.kafka/kafka-streams/3.7.0/org/apache/kafka/streams/processor/api/ProcessorSupplier.html), which Kafka Streams uses to obtain your `Processor` implementation. The third parameter is a lambda implementing the `ReorderKeyGenerator` interface, and the fourth is a lambda for the `OriginalKeyExtractor` interface. The `ReorderingProcessorSupplier` defines these two interfaces, which you've seen earlier in the tutorial:

```java
public class ReorderingProcessorSupplier<KOrder, K, V> implements ProcessorSupplier<K, V, K, V> {
    // Details left out for clarity
    public interface ReorderKeyGenerator<K, V, KOrder> {
        KOrder key(K key, V val);
    }

    public interface OriginalKeyExtractor<KOrder, V, K> {
        K key(KOrder key, V val);
    }
}
```
## Important Notes

You've seen in this tutorial how to reorder events in the stream by a timestamp on the event object, but you're not limited to timestamps -- you could use the same approach to order events in the stream by any attribute of the event. There are a few points to keep in mind when doing so:

1. It's essential to have a way to restore the original key; you don't want to lose the original key-partition mapping.
2. This reordering strategy applies within a single partition only, not across partitions.
3. Since an event stream is infinite, reordering can only be applied to distinct windows of time, and you'll need to balance the trade-off between larger windows (which can repair more disorder) and the cost of iterating over the entire contents of the state store at each punctuation.
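
To see the reordering end to end, here is a minimal `TopologyTestDriver` sketch (an illustrative test, not part of this PR; it assumes the `StreamsSerde` helper from this repo's `common` module, and the exact flush timing depends on how stream time advances past the 10-hour reorder window):

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;

class ReorderStreamsTest {

    @Test
    void eventsComeOutInTimestampOrder() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "reorder-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
        Topology topology = new ReorderStreams().buildTopology(props);

        try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            Serde<Event> eventSerde = StreamsSerde.serdeFor(Event.class);
            TestInputTopic<String, Event> input = driver.createInputTopic(
                    ReorderStreams.INPUT, Serdes.String().serializer(), eventSerde.serializer());
            TestOutputTopic<String, Event> output = driver.createOutputTopic(
                    ReorderStreams.OUTPUT, Serdes.String().deserializer(), eventSerde.deserializer());

            Instant start = Instant.parse("2024-01-01T00:00:00Z");
            // Pipe events whose embedded eventTime is out of arrival order
            input.pipeInput("a", new Event("a", start.plus(Duration.ofHours(3)).toEpochMilli()), start);
            input.pipeInput("b", new Event("b", start.plus(Duration.ofHours(1)).toEpochMilli()), start.plusSeconds(1));
            input.pipeInput("c", new Event("c", start.plus(Duration.ofHours(2)).toEpochMilli()), start.plusSeconds(2));

            // Advance stream time past the 10-hour reorder window so the
            // STREAM_TIME punctuation fires and flushes the store in key order
            input.pipeInput("d", new Event("d", start.plus(Duration.ofHours(4)).toEpochMilli()),
                    start.plus(Duration.ofHours(11)));

            // Records come out ordered by embedded eventTime:
            // b, c, a (with d in this or a later flush, depending on punctuation timing)
            output.readKeyValuesToList().forEach(kv ->
                    System.out.printf("key=%s eventTime=%d%n", kv.key, kv.value.eventTime()));
        }
    }
}
```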
65 changes: 65 additions & 0 deletions reordering-streams/kstreams/build.gradle
@@ -0,0 +1,65 @@
buildscript {
    repositories {
        mavenCentral()
    }
}

plugins {
    id "java"
    id "application"
    id 'com.github.johnrengelman.shadow' version '8.1.1'
}

java {
    sourceCompatibility = JavaVersion.VERSION_17
    targetCompatibility = JavaVersion.VERSION_17
}

application {
    mainClass = "io.confluent.developer.ReorderStreams"
}

repositories {
    mavenCentral()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

dependencies {
    implementation project(':common')
    implementation "org.slf4j:slf4j-simple:2.0.7"
    implementation 'org.apache.kafka:kafka-streams:3.7.0'
    implementation 'org.apache.kafka:kafka-clients:3.7.0'

    testImplementation 'org.apache.kafka:kafka-streams-test-utils:3.7.0'
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.2'
    testImplementation 'org.hamcrest:hamcrest:2.2'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.2'
}

test {
    useJUnitPlatform()
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        events "PASSED", "SKIPPED", "FAILED", "STANDARD_OUT", "STANDARD_ERROR"
        exceptionFormat = "full"
    }
}

jar {
    manifest {
        attributes(
                "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" ")
        )
    }
}

shadowJar {
    archiveBaseName = "reorder-streams"
    archiveClassifier = ''
}
3 changes: 3 additions & 0 deletions reordering-streams/kstreams/settings.gradle
@@ -0,0 +1,3 @@
rootProject.name = 'reorder-streams'
include ':common'
project(':common').projectDir = file('../../common')
4 changes: 4 additions & 0 deletions reordering-streams/kstreams/src/main/java/io/confluent/developer/Event.java
@@ -0,0 +1,4 @@
package io.confluent.developer;

/** A simple event carrying a name and the event timestamp (epoch millis) embedded in the payload. */
public record Event(String name, long eventTime) {
}
69 changes: 69 additions & 0 deletions reordering-streams/kstreams/src/main/java/io/confluent/developer/ReorderStreams.java
@@ -0,0 +1,69 @@
package io.confluent.developer;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class ReorderStreams {
    private static final Logger LOG = LoggerFactory.getLogger(ReorderStreams.class);
    public static final String INPUT = "input";
    public static final String OUTPUT = "output";

    public Topology buildTopology(Properties allProps) {
        final StreamsBuilder builder = new StreamsBuilder();

        Serde<String> stringSerde = Serdes.String();
        Serde<Event> eventSerde = StreamsSerde.serdeFor(Event.class);
        String reorderStore = "reorder-store";
        builder.stream(INPUT, Consumed.with(stringSerde, eventSerde))
                .peek((key, value) -> LOG.info("Incoming event key[{}] value[{}]", key, value))
                .process(new ReorderingProcessorSupplier<>(reorderStore,
                        Duration.ofHours(10),
                        (k, v) -> v.eventTime(),
                        (k, v) -> v.name(),
                        Serdes.Long(),
                        eventSerde))
                .to(OUTPUT, Produced.with(stringSerde, eventSerde));

        return builder.build(allProps);
    }

    public static void main(String[] args) {
        Properties properties;
        if (args.length > 0) {
            properties = Utils.loadProperties(args[0]);
        } else {
            properties = Utils.loadProperties();
        }
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "reorder-streams");
        ReorderStreams reorderStreams = new ReorderStreams();

        Topology topology = reorderStreams.buildTopology(properties);

        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, properties)) {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                kafkaStreams.close(Duration.ofSeconds(5));
                countDownLatch.countDown();
            }));
            // For local runs only; don't do this in production as it wipes out all local state
            kafkaStreams.cleanUp();
            kafkaStreams.start();
            countDownLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
122 changes: 122 additions & 0 deletions reordering-streams/kstreams/src/main/java/io/confluent/developer/ReorderingProcessorSupplier.java
@@ -0,0 +1,122 @@
package io.confluent.developer;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

import java.time.Duration;
import java.util.Collections;
import java.util.Set;

public class ReorderingProcessorSupplier<KOrder, K, V> implements ProcessorSupplier<K, V, K, V> {
    private final String storeName;
    private final Duration reorderWindow;
    private final ReorderKeyGenerator<K, V, KOrder> storeKeyGenerator;
    private final OriginalKeyExtractor<KOrder, V, K> originalKeyExtractor;
    private final Serde<KOrder> keySerde;
    private final Serde<V> valueSerde;

    public interface ReorderKeyGenerator<K, V, KOrder> {
        KOrder key(K key, V val);
    }

    public interface OriginalKeyExtractor<KOrder, V, K> {
        K key(KOrder key, V val);
    }

    public ReorderingProcessorSupplier(String storeName,
                                       Duration reorderWindow,
                                       ReorderKeyGenerator<K, V, KOrder> storeKeyGenerator,
                                       OriginalKeyExtractor<KOrder, V, K> originalKeyExtractor,
                                       Serde<KOrder> keySerde,
                                       Serde<V> valueSerde) {
        this.storeName = storeName;
        this.reorderWindow = reorderWindow;
        this.storeKeyGenerator = storeKeyGenerator;
        this.originalKeyExtractor = originalKeyExtractor;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    @Override
    public Processor<K, V, K, V> get() {
        return new ReorderProcessor(storeName, reorderWindow, storeKeyGenerator, originalKeyExtractor);
    }

    @Override
    public Set<StoreBuilder<?>> stores() {
        return Collections.singleton(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), keySerde, valueSerde));
    }

    private class ReorderProcessor implements Processor<K, V, K, V> {
        private final String storeName;
        private final Duration reorderWindow;
        private ProcessorContext<K, V> context;
        private KeyValueStore<KOrder, V> reorderStore;
        private final ReorderKeyGenerator<K, V, KOrder> storeKeyGenerator;
        private final OriginalKeyExtractor<KOrder, V, K> originalKeyExtractor;

        public ReorderProcessor(String storeName,
                                Duration reorderWindow,
                                ReorderKeyGenerator<K, V, KOrder> reorderKeyGenerator,
                                OriginalKeyExtractor<KOrder, V, K> originalKeyExtractor) {

            this.storeName = storeName;
            this.reorderWindow = reorderWindow;
            this.storeKeyGenerator = reorderKeyGenerator;
            this.originalKeyExtractor = originalKeyExtractor;
        }

        @Override
        public void init(ProcessorContext<K, V> context) {
            this.reorderStore = context.getStateStore(this.storeName);
            this.context = context;
            context.schedule(
                    this.reorderWindow,
                    PunctuationType.STREAM_TIME,
                    this::forwardOrderedByEventTime
            );
        }

        @Override
        public void process(Record<K, V> kvRecord) {
            final KOrder storeKey = storeKeyGenerator.key(kvRecord.key(), kvRecord.value());
            final V storeValue = reorderStore.get(storeKey);
            // Only store the record if no entry exists for this key yet;
            // a later record with a duplicate reorder key is dropped
            if (storeValue == null) {
                reorderStore.put(storeKey, kvRecord.value());
            }
        }

        /**
         * Scheduled to be called automatically when the period
         * within which message reordering occurs expires.
         * <p>
         * Outputs downstream accumulated records sorted by their timestamp.
         * <p>
         * 1) read the store
         * 2) send the fetched messages in order using context.forward()
         * 3) delete the forwarded messages from the store
         *
         * @param timestamp stream time of the punctuate function call
         */
        void forwardOrderedByEventTime(final long timestamp) {
            try (KeyValueIterator<KOrder, V> it = reorderStore.all()) {
                while (it.hasNext()) {
                    final KeyValue<KOrder, V> kv = it.next();
                    K origKey = originalKeyExtractor.key(kv.key, kv.value);
                    context.forward(new Record<>(origKey, kv.value, timestamp));
                    reorderStore.delete(kv.key);
                }
            }
        }
    }
}