
feat: Add example of reordering the events of a Kafka Streams application #53

Merged
merged 6 commits into from
Jul 1, 2024
Changes from 4 commits
2 changes: 1 addition & 1 deletion common/build.gradle
@@ -39,7 +39,7 @@ dependencies {
  implementation 'org.slf4j:slf4j-simple:2.0.7'
  implementation('org.apache.kafka:kafka-clients') {
      version {
-         strictly '3.6.0'
+         strictly '3.7.0'
      }
  }
  implementation 'io.confluent:kafka-streams-avro-serde:7.5.1'
124 changes: 124 additions & 0 deletions reordering-streams/kstreams/README.md
@@ -0,0 +1,124 @@
<!-- title: How to reorder out-of-order events in Kafka Streams -->
<!-- description: In this tutorial, learn how to reorder out-of-order events in Kafka Streams, with step-by-step instructions and supporting code. -->

# How to reorder out-of-order events in Kafka Streams

Consider the case where the events in a topic are out of order.
To be clear, the producer delivered the events in order, but they are out of order from the perspective of the timestamps embedded in the event payload.

In this tutorial, we'll cover how you can re-order these records in the event stream using the embedded event timestamps.
The reordering will only occur per-partition and within a specific time window provided at startup.
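
To make that concrete, here's a minimal sketch of what the reordering effectively does within one window. It uses the `Event(name, eventTime)` record added in this PR; the event names and timestamps are purely illustrative:

```java
import java.util.Comparator;
import java.util.List;

// Events as they arrive in a single partition (names and timestamps are made up)
List<Event> arrivalOrder = List.of(
        new Event("click", 1_000L),
        new Event("purchase", 3_000L),   // arrives early relative to its embedded timestamp
        new Event("page-view", 2_000L));

// What the processor effectively emits downstream once the punctuation fires:
// the same events, ordered by the timestamp embedded in the payload
List<Event> reordered = arrivalOrder.stream()
        .sorted(Comparator.comparingLong(Event::eventTime))
        .toList();
```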

NOTE: This tutorial was adapted from an [original contribution](https://github.com/confluentinc/kafka-streams-examples/pull/411) by [Sergey Shcherbakov](https://github.com/sshcherbakov).

## Setup

To accomplish the re-ordering, we'll leverage the fact that RocksDB stores all entries sorted by key.
So you'll use the [KStream.process](https://javadoc.io/static/org.apache.kafka/kafka-streams/3.7.0/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.api.ProcessorSupplier-java.lang.String...-) method, which stores incoming records in a state store using an embedded timestamp for the key, then schedules a [punctuation](https://docs.confluent.io/platform/current/streams/developer-guide/processor-api.html#defining-a-stream-processor) to occur at a given interval that iterates over the contents of the store and forwards the records to downstream operators, now in order with respect to the embedded timestamps.

## Reordering by event timestamp

While the code is fairly straightforward, let's take a step-by-step walkthrough of the key parts of the application.
First, we'll look at the `Processor.init` method details:

```java
@Override
public void init(ProcessorContext<K, V> context) {
    this.reorderStore = context.getStateStore(this.storeName);
    this.context = context;
    context.schedule(
            this.reorderWindow,
            PunctuationType.STREAM_TIME,
            this::forwardOrderedByEventTime
    );
}
```

Kafka Streams calls the `Processor.init` method when creating the topology, and the method performs setup actions defined by the developer.
In this case, the initialization steps are:
1. Store a reference to the state store used to re-order the records.
2. Store a [ProcessorContext](https://javadoc.io/static/org.apache.kafka/kafka-streams/3.7.0/org/apache/kafka/streams/processor/api/ProcessorContext.html) reference which you'll use to forward records to downstream operators.
3. Use the `ProcessorContext` to schedule a punctuation - the main part of this tutorial (see the note on punctuation types just below).
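
On that last point, note that `PunctuationType.STREAM_TIME` only advances as new records arrive, whereas `PunctuationType.WALL_CLOCK_TIME` fires based on system time even when the topic is idle. As a sketch, the alternative wall-clock scheduling (not what this tutorial uses) would look like:

```java
// Hypothetical alternative: trigger the punctuation on wall-clock time.
// Stream time stalls on an idle partition, which would delay forwarding the buffered records.
context.schedule(
        this.reorderWindow,
        PunctuationType.WALL_CLOCK_TIME,
        this::forwardOrderedByEventTime
);
```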

```java
@Override
public void process(Record<K, V> kvRecord) {
    final KOrder storeKey = storeKeyGenerator.key(kvRecord.key(), kvRecord.value());
    final V storeValue = reorderStore.get(storeKey);

    if (storeValue == null) {
        reorderStore.put(storeKey, kvRecord.value());
    }
}
```
Here is the `process` method, which is where the `Processor` takes action for each incoming record.
There's a `ReorderKeyGenerator` interface that takes the incoming key and value and returns the new key used to order the records. In our case, it simply returns the timestamp embedded in the event. We'll discuss the `ReorderKeyGenerator` interface later in the tutorial.
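
For the `Event` record in this example, a minimal sketch of that key generator is just a lambda pulling the timestamp out of the value (the variable name is illustrative):

```java
// Use the timestamp embedded in the event payload as the new store key,
// so the state store keeps records sorted by event time within the partition.
ReorderingProcessorSupplier.ReorderKeyGenerator<String, Event, Long> byEventTime =
        (key, event) -> event.eventTime();
```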

So we've seen how you'll update the key needed for sorting; now let's take a look at how Kafka Streams propagates this new order to any downstream operators:
```java
void forwardOrderedByEventTime(final long timestamp) {
    try (KeyValueIterator<KOrder, V> it = reorderStore.all()) {
        while (it.hasNext()) {
            final KeyValue<KOrder, V> kv = it.next();
            K origKey = originalKeyExtractor.key(kv.key, kv.value);
            context.forward(new Record<>(origKey, kv.value, timestamp));
            reorderStore.delete(kv.key);
        }
    }
}
```

The `forwardOrderedByEventTime` method does the following:
1. Iterate over the current contents of the store.
2. Perform the reverse operation of the `ReorderKeyGenerator` with an `OriginalKeyExtractor` interface to recover the original key.
3. Forward each record to the next downstream operator, then delete it from the store.

It's critical that whatever operation you use to extract the key for sorting can be reversed, so you can forward records with the original key. This is essential because if you do any downstream aggregations or write results out to a topic, the records will remain on the correct partition.

Now let's take a look at how you'll write the Kafka Streams application:

```java
StreamsBuilder builder = new StreamsBuilder();
builder.stream(INPUT, Consumed.with(stringSerde, eventSerde))
        .process(new ReorderingProcessorSupplier<>(reorderStore,
                Duration.ofHours(10),
                (k, v) -> v.eventTime(),
                (k, v) -> v.name(),
                Serdes.Long(),
                eventSerde))
        .to(OUTPUT, Produced.with(stringSerde, eventSerde));
```

This is a simple Kafka Streams topology. In the `process` operator you pass in a [ProcessorSupplier](https://javadoc.io/static/org.apache.kafka/kafka-streams/3.7.0/org/apache/kafka/streams/processor/api/ProcessorSupplier.html) which Kafka Streams will use to extract your `Processor` implementation. The third parameter is a lambda implementation of the `ReorderKeyGenerator` interface, and the fourth is the same for the `OriginalKeyExtractor` interface. The `ReorderingProcessorSupplier` defines these two interfaces you've seen before in the tutorial:

```java
public class ReorderingProcessorSupplier<KOrder, K, V> implements ProcessorSupplier<K, V, K, V> {
    // Details left out for clarity

    public interface ReorderKeyGenerator<K, V, KOrder> {
        KOrder key(K key, V val);
    }

    public interface OriginalKeyExtractor<KOrder, V, K> {
        K key(KOrder key, V val);
    }
}
```
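
The full supplier is left out of this walkthrough, but a rough sketch of how it could wire things together is shown below; the field names and the `ReorderProcessor` inner class are assumptions for illustration, not necessarily the exact implementation in this PR. The supplier hands Kafka Streams a store builder via `stores()` so the state store is created and connected automatically, and builds the `Processor` in `get()`:

```java
// Inside ReorderingProcessorSupplier (sketch only, field names are hypothetical)
@Override
public Set<StoreBuilder<?>> stores() {
    return Set.of(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(storeName), // e.g. "reorder-store"
            keySerde,                                  // e.g. Serdes.Long() for the event timestamp
            valueSerde));                              // e.g. the Event serde
}

@Override
public Processor<K, V, K, V> get() {
    // Hypothetical processor implementing the init/process/forwardOrderedByEventTime methods shown above
    return new ReorderProcessor(storeName, reorderWindow, keyGenerator, keyExtractor);
}
```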
## Important Notes

You've seen in this tutorial how to re-order events in the stream by timestamps on the event object. But you're not limited to timestamps: you could use the same approach to order events in the stream by any attribute on the event. There are a couple of points you need to keep in mind when doing so:

1. It's essential to have a way to restore the incoming key; you don't want to lose the original key-partition mapping.
2. This re-ordering strategy only applies to a single partition, not across multiple partitions.
3. Since an event stream is infinite, re-ordering can only be applied to distinct windows of time, and you'll need to balance the trade-off between larger windows and the cost of iterating over the entire contents of a state store.
Contributor review comment, suggested change: use "reordering" instead of "re-ordering".
Contributor review comment: delete whitespace
65 changes: 65 additions & 0 deletions reordering-streams/kstreams/build.gradle
@@ -0,0 +1,65 @@
buildscript {
    repositories {
        mavenCentral()
    }
}

plugins {
    id "java"
    id "application"
    id 'com.github.johnrengelman.shadow' version '8.1.1'
}

java {
    sourceCompatibility = JavaVersion.VERSION_17
    targetCompatibility = JavaVersion.VERSION_17
}

application {
    mainClass = "io.confluent.developer.ReorderStreams"
}

repositories {
    mavenCentral()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

dependencies {
    implementation project(':common')
    implementation "org.slf4j:slf4j-simple:2.0.7"
    implementation 'org.apache.kafka:kafka-streams:3.7.0'
    implementation 'org.apache.kafka:kafka-clients:3.7.0'

    testImplementation 'org.apache.kafka:kafka-streams-test-utils:3.7.0'
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.2'
    testImplementation 'org.hamcrest:hamcrest:2.2'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.2'
}

test {
    useJUnitPlatform()
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        events "PASSED", "SKIPPED", "FAILED", "STANDARD_OUT", "STANDARD_ERROR"
        exceptionFormat = "full"
    }
}


jar {
    manifest {
        attributes(
                "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" ")
        )
    }
}

shadowJar {
    archiveBaseName = "reorder-streams"
    archiveClassifier = ''
}
3 changes: 3 additions & 0 deletions reordering-streams/kstreams/settings.gradle
@@ -0,0 +1,3 @@
rootProject.name = 'reorder-streams'
include ':common'
project(':common').projectDir = file('../../common')
@@ -0,0 +1,9 @@
package io.confluent.developer;

/**
* Bill Bejeck
* 6/7/24
*/
Contributor review comment: remove? did IDE add this?

Contributor (author) reply: But I don't want to be forgotten! Ha!
Yep it's IDE generated, I'll have to disable for this repo

public record Event(String name, long eventTime) {
}
@@ -0,0 +1,69 @@
package io.confluent.developer;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class ReorderStreams {
    private static final Logger LOG = LoggerFactory.getLogger(ReorderStreams.class);
    public static final String INPUT = "input";
    public static final String OUTPUT = "output";


    public Topology buildTopology(Properties allProps) {
        final StreamsBuilder builder = new StreamsBuilder();

        Serde<String> stringSerde = Serdes.String();
        Serde<Event> eventSerde = StreamsSerde.serdeFor(Event.class);
        String reorderStore = "reorder-store";
        builder.stream(INPUT, Consumed.with(stringSerde, eventSerde))
                .peek((key, value) -> LOG.info("Incoming event key[{}] value[{}]", key, value))
                .process(new ReorderingProcessorSupplier<>(reorderStore,
                        Duration.ofHours(10),
                        (k, v) -> v.eventTime(),
                        (k, v) -> v.name(),
                        Serdes.Long(),
                        eventSerde))
                .to(OUTPUT, Produced.with(stringSerde, eventSerde));

        return builder.build(allProps);
    }

    public static void main(String[] args) {
        Properties properties;
        if (args.length > 0) {
            properties = Utils.loadProperties(args[0]);
        } else {
            properties = Utils.loadProperties();
        }
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "merge-streams");
        ReorderStreams reorderStreams = new ReorderStreams();

        Topology topology = reorderStreams.buildTopology(properties);

        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, properties)) {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                kafkaStreams.close(Duration.ofSeconds(5));
                countDownLatch.countDown();
            }));
            // For local running only don't do this in production as it wipes out all local state
            kafkaStreams.cleanUp();
            kafkaStreams.start();
            countDownLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}