Row-level TTL PR 9: TTD test with ttl deduplicator #381

Merged (2 commits) on Nov 7, 2024
13 changes: 13 additions & 0 deletions kafka-client/build.gradle.kts
@@ -62,6 +62,19 @@ tasks.publish {
dependsOn(tasks[writeVersionPropertiesFile])
}

configurations {
  create("testArtifacts")
}

tasks.register<Jar>("testJar") {
  from(sourceSets["test"].output)
  archiveClassifier.set("test")
}

artifacts {
  add("testArtifacts", tasks["testJar"])
}

Contributor Author: I'll admit I had to ask ChatGPT how to access the stuff in the testutils of kafka-client from the tests in responsive-test-utils, and this was how it said to do it. Not sure it's necessarily the right way to do so; let me know if you have a different suggestion.

Contributor: 🤷 when it comes to build systems, if it works it works
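(In short: the testJar task packages kafka-client's compiled test classes, testutils included, into a jar with a test classifier, and the testArtifacts configuration exposes that jar to other modules; responsive-test-utils picks it up further down via testImplementation(project(":kafka-client", configuration = "testArtifacts")).)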

/********************************************/

dependencies {
RowLevelTtlIntegrationTest.java
@@ -21,6 +21,7 @@
import static dev.responsive.kafka.testutils.IntegrationTestUtils.pipeTimestampedRecords;
import static dev.responsive.kafka.testutils.IntegrationTestUtils.readOutputWithTimestamps;
import static dev.responsive.kafka.testutils.IntegrationTestUtils.startAppAndAwaitRunning;
import static dev.responsive.kafka.testutils.processors.Deduplicator.deduplicatorApp;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
@@ -39,33 +40,23 @@
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.api.stores.ResponsiveStores;
import dev.responsive.kafka.api.stores.TtlProvider;
import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration;
import dev.responsive.kafka.testutils.KeyValueTimestamp;
import dev.responsive.kafka.testutils.ResponsiveConfigParam;
import dev.responsive.kafka.testutils.ResponsiveExtension;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -202,35 +193,13 @@ public void shouldApplyRowLevelTtlForKeyAndValue() throws Exception {
}

private ResponsiveKafkaStreams buildStreams(final Map<String, Object> properties) {
final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, String> input = builder.stream(inputTopic());
input.process(new TtlProcessorSupplier(), STORE_NAME)
.to(outputTopic());

return new ResponsiveKafkaStreams(builder.build(), properties);
}

@SuppressWarnings("checkstyle:linelength")
private static class TtlProcessorSupplier implements ProcessorSupplier<String, String, String, String> {

@Override
public Processor<String, String, String, String> get() {
return new TtlDeduplicator();
}
    final var params = ResponsiveKeyValueParams.fact(STORE_NAME).withTtlProvider(
        TtlProvider.<String, ValueAndTimestamp<String>>withDefault(DEFAULT_TTL)
            .fromKeyAndValue(RowLevelTtlIntegrationTest::ttlForKeyAndValue)
    );
    final Topology topology = deduplicatorApp(inputTopic(), outputTopic(), params);

Contributor Author: No actual changes to this test at all, just moving some things to the testutils and getting rid of the duplicate code here.

@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(ResponsiveStores.timestampedKeyValueStoreBuilder(
ResponsiveStores.keyValueStore(
ResponsiveKeyValueParams.fact(STORE_NAME).withTtlProvider(
TtlProvider.<String, ValueAndTimestamp<String>>withDefault(DEFAULT_TTL)
.fromKeyAndValue(RowLevelTtlIntegrationTest::ttlForKeyAndValue)
)),
Serdes.String(),
Serdes.String()
));
}
return new ResponsiveKafkaStreams(topology, properties);
}

private static Optional<TtlDuration> ttlForKeyAndValue(
@@ -266,31 +235,6 @@ private static Optional<TtlDuration> ttlForKeyAndValue(
return Optional.empty();
}

private static class TtlDeduplicator implements Processor<String, String, String, String> {

private ProcessorContext<String, String> context;
private TimestampedKeyValueStore<String, String> ttlStore;

@Override
public void init(final ProcessorContext<String, String> context) {
this.context = context;
this.ttlStore = context.getStateStore(STORE_NAME);
}

@Override
public void process(final Record<String, String> record) {
final ValueAndTimestamp<String> previous = ttlStore.putIfAbsent(
record.key(),
ValueAndTimestamp.make(record.value(), record.timestamp())
);

if (previous == null) {
context.forward(record);
}

}
}

private Map<String, Object> getMutableProperties() {
final Map<String, Object> properties = new HashMap<>(responsiveProps);

@@ -1,6 +1,6 @@
package dev.responsive.kafka.internal.db.inmemory;

import static dev.responsive.kafka.internal.db.testutils.Matchers.sameKeyValue;
import static dev.responsive.kafka.testutils.Matchers.sameKeyValue;
import static dev.responsive.kafka.internal.stores.TtlResolver.NO_TTL;
import static dev.responsive.kafka.testutils.IntegrationTestUtils.defaultOnlyTtl;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -17,7 +17,7 @@
package dev.responsive.kafka.internal.db.mongo;

import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG;
import static dev.responsive.kafka.internal.db.testutils.Matchers.sameKeyValue;
import static dev.responsive.kafka.testutils.Matchers.sameKeyValue;
import static dev.responsive.kafka.internal.stores.TtlResolver.NO_TTL;
import static dev.responsive.kafka.testutils.IntegrationTestUtils.defaultOnlyTtl;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -1,7 +1,7 @@
package dev.responsive.kafka.internal.db.mongo;

import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG;
import static dev.responsive.kafka.internal.db.testutils.Matchers.sameKeyValue;
import static dev.responsive.kafka.testutils.Matchers.sameKeyValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
Matchers.java (moved from dev.responsive.kafka.internal.db.testutils to dev.responsive.kafka.testutils)
@@ -1,4 +1,20 @@
package dev.responsive.kafka.internal.db.testutils;
/*
* Copyright 2024 Responsive Computing, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.responsive.kafka.testutils;

import java.util.Arrays;
import org.apache.kafka.streams.KeyValue;
Deduplicator.java (new file)
@@ -0,0 +1,84 @@
/*
* Copyright 2024 Responsive Computing, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.responsive.kafka.testutils.processors;

import static dev.responsive.kafka.testutils.processors.GenericProcessorSuppliers.getFixedKeySupplier;

import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.api.stores.ResponsiveStores;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

public class Deduplicator {

public static Topology deduplicatorApp(
final String inputTopicName,
final String outputTopicName,
final ResponsiveKeyValueParams params
) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> input = builder.stream(inputTopicName);

final var storeBuilder = ResponsiveStores.timestampedKeyValueStoreBuilder(
ResponsiveStores.keyValueStore(params), Serdes.String(), Serdes.String()
);
final String storeName = params.name().kafkaName();
input
.processValues(getFixedKeySupplier(DeduplicatorProcessor::new, storeBuilder), storeName)
.to(outputTopicName);

return builder.build();
}

private static class DeduplicatorProcessor implements FixedKeyProcessor<String, String, String> {

private final String storeName;

private FixedKeyProcessorContext<String, String> context;
private TimestampedKeyValueStore<String, String> ttlStore;

public DeduplicatorProcessor(final String storeName) {
this.storeName = storeName;
}

@Override
public void init(final FixedKeyProcessorContext<String, String> context) {
this.context = context;
this.ttlStore = context.getStateStore(storeName);
}

@Override
public void process(final FixedKeyRecord<String, String> record) {
final ValueAndTimestamp<String> previous = ttlStore.putIfAbsent(
record.key(),
ValueAndTimestamp.make(record.value(), record.timestamp())
);

if (previous == null) {
context.forward(record);
}

}
}
}
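A minimal sketch of how a TopologyTestDriver ("TTD") test might drive this helper. STORE_NAME, DEFAULT_TTL, the topic names, and props below are hypothetical stand-ins for whatever the calling test defines, and it assumes ResponsiveTopologyTestDriver is constructed from a (Topology, Properties) pair like the plain TopologyTestDriver, with default String serdes set in props (the topology reads its input without explicit Consumed serdes):

  // Hedged usage sketch; not code from this PR.
  final var params = ResponsiveKeyValueParams.fact(STORE_NAME).withTtlProvider(
      TtlProvider.<String, ValueAndTimestamp<String>>withDefault(DEFAULT_TTL)
  );
  final Topology topology = Deduplicator.deduplicatorApp("input", "output", params);

  // props: Streams config with application id, bootstrap servers, and String default serdes
  try (final var driver = new ResponsiveTopologyTestDriver(topology, props)) {
    final TestInputTopic<String, String> in =
        driver.createInputTopic("input", new StringSerializer(), new StringSerializer());
    final TestOutputTopic<String, String> out =
        driver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer());

    in.pipeInput("k", "first");   // no entry for "k" yet: putIfAbsent misses, record is forwarded
    in.pipeInput("k", "second");  // entry still live in the ttl store: the duplicate is dropped

    assertThat(out.readValuesToList(), contains("first"));  // only the first value per key gets through
  }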
GenericProcessorSuppliers.java (new file)
@@ -0,0 +1,100 @@
/*
* Copyright 2024 Responsive Computing, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.responsive.kafka.testutils.processors;

import java.util.Collections;
import java.util.Set;
import java.util.function.Function;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;

public class GenericProcessorSuppliers {

public static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> getSupplier(
final Function<String, Processor<KIn, VIn, KOut, VOut>> processorForStoreName,
final StoreBuilder<?> storeBuilder
) {
return new GenericProcessorSupplier<>(processorForStoreName, storeBuilder);
}

public static <KIn, VIn, VOut> FixedKeyProcessorSupplier<KIn, VIn, VOut> getFixedKeySupplier(
final Function<String, FixedKeyProcessor<KIn, VIn, VOut>> processorForStoreName,
final StoreBuilder<?> storeBuilder
) {
return new GenericFixedKeyProcessorSupplier<>(processorForStoreName, storeBuilder);
}

private static class GenericProcessorSupplier<KIn, VIn, KOut, VOut>
implements ProcessorSupplier<KIn, VIn, KOut, VOut> {

private final Function<String, Processor<KIn, VIn, KOut, VOut>> processorForStoreName;
private final StoreBuilder<?> storeBuilder;

public GenericProcessorSupplier(
final Function<String, Processor<KIn, VIn, KOut, VOut>> processorForStoreName,
final StoreBuilder<?> storeBuilder
) {
this.processorForStoreName = processorForStoreName;
this.storeBuilder = storeBuilder;
}

@Override
public Processor<KIn, VIn, KOut, VOut> get() {
return processorForStoreName.apply(storeBuilder.name());
}

@Override
public Set<StoreBuilder<?>> stores() {
if (storeBuilder != null) {
return Collections.singleton(storeBuilder);
}
return null;
}
}

private static class GenericFixedKeyProcessorSupplier<KIn, VIn, VOut>
implements FixedKeyProcessorSupplier<KIn, VIn, VOut> {

private final Function<String, FixedKeyProcessor<KIn, VIn, VOut>> processorForStoreName;
private final StoreBuilder<?> storeBuilder;

public GenericFixedKeyProcessorSupplier(
final Function<String, FixedKeyProcessor<KIn, VIn, VOut>> processorForStoreName,
final StoreBuilder<?> storeBuilder
) {
this.processorForStoreName = processorForStoreName;
this.storeBuilder = storeBuilder;
}

@Override
public FixedKeyProcessor<KIn, VIn, VOut> get() {
return processorForStoreName.apply(storeBuilder.name());
}

@Override
public Set<StoreBuilder<?>> stores() {
if (storeBuilder != null) {
return Collections.singleton(storeBuilder);
}
return null;
}
}

}
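A short sketch of the intended call pattern. The processor and store here are hypothetical examples, not code from this PR; the point is that the Function<String, Processor> receives the store builder's name, so a processor only needs to accept its store name as a constructor argument (exactly how DeduplicatorProcessor is wired above), and stores() hands the builder to Streams so no separate builder.addStateStore(...) call is needed:

  // Hedged sketch; CountingProcessor and the "counts" store are hypothetical.
  final StoreBuilder<KeyValueStore<String, Long>> countsStore = Stores.keyValueStoreBuilder(
      Stores.inMemoryKeyValueStore("counts"), Serdes.String(), Serdes.Long());

  class CountingProcessor implements Processor<String, String, String, Long> {
    private final String storeName;
    private ProcessorContext<String, Long> context;
    private KeyValueStore<String, Long> store;

    CountingProcessor(final String storeName) {
      this.storeName = storeName;
    }

    @Override
    public void init(final ProcessorContext<String, Long> context) {
      this.context = context;
      this.store = context.getStateStore(storeName);  // name was supplied by the store builder
    }

    @Override
    public void process(final Record<String, String> record) {
      final Long prev = store.get(record.key());
      final long next = prev == null ? 1L : prev + 1L;
      store.put(record.key(), next);                  // running count per key
      context.forward(record.withValue(next));
    }
  }

  builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
      .process(GenericProcessorSuppliers.getSupplier(CountingProcessor::new, countsStore))
      .to("output", Produced.with(Serdes.String(), Serdes.Long()));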
4 changes: 3 additions & 1 deletion responsive-test-utils/build.gradle.kts
@@ -24,11 +24,13 @@ dependencies {
implementation(project(":kafka-client"))
api(libs.kafka.streams.test.utils)

implementation(libs.bundles.scylla)

implementation(variantOf(libs.kafka.clients) {
classifier("test")
})

implementation(libs.bundles.scylla)
testImplementation(project(":kafka-client", configuration = "testArtifacts"))

testImplementation(testlibs.bundles.base)
testImplementation(libs.bundles.logging)
@@ -27,8 +27,10 @@
import java.time.Instant;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.TopologyTestDriver;
Expand Down