From 156a78d6f1dfc9eb666accbda45e00fa4df1084d Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Thu, 23 May 2024 15:24:27 -0700 Subject: [PATCH 1/4] bump AK to 3.8 and default DSL stores to Responsive --- .../kafka/api/ResponsiveKafkaStreams.java | 7 +- .../stores/ResponsiveDslStoreSuppliers.java | 56 ++++++ .../api/stores/ResponsiveSessionParams.java | 31 ++-- .../kafka/api/stores/ResponsiveStores.java | 13 ++ .../internal/clients/DelegatingConsumer.java | 6 + .../internal/clients/ResponsiveConsumer.java | 1 + .../internal/clients/ResponsiveProducer.java | 6 + .../integration/MinimalIntegrationTest.java | 3 +- ...ResponsiveKafkaStreamsIntegrationTest.java | 171 ++++++++++++++++++ ...esponsiveKeyValueStoreIntegrationTest.java | 58 +----- .../kafka/testutils/IntegrationTestUtils.java | 40 ++++ .../kafka/testutils/ResponsiveExtension.java | 4 + .../GlobalStreamThreadIntegrationTest.java | 13 +- settings.gradle.kts | 2 +- 14 files changed, 330 insertions(+), 81 deletions(-) create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveDslStoreSuppliers.java create mode 100644 kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKafkaStreamsIntegrationTest.java diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index 59c08a990..932a3b27b 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -34,6 +34,7 @@ import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry; import dev.responsive.kafka.api.config.CompatibilityMode; import dev.responsive.kafka.api.config.ResponsiveConfig; +import dev.responsive.kafka.api.stores.ResponsiveDslStoreSuppliers; import dev.responsive.kafka.internal.clients.ResponsiveKafkaClientSupplier; import dev.responsive.kafka.internal.config.ConfigUtils; import dev.responsive.kafka.internal.config.InternalSessionConfigs; @@ -271,7 +272,6 @@ private static Properties propsWithOverrides( ) { final Properties propsWithOverrides = new Properties(); - final InternalSessionConfigs.Builder internalConfBuilder = new InternalSessionConfigs.Builder() .withSessionClients(sessionClients) .withStoreRegistry(storeRegistry) @@ -313,6 +313,11 @@ private static Properties propsWithOverrides( throw new ConfigException(errorMsg); } + propsWithOverrides.putIfAbsent( + StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, + ResponsiveDslStoreSuppliers.class.getCanonicalName() + ); + return propsWithOverrides; } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveDslStoreSuppliers.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveDslStoreSuppliers.java new file mode 100644 index 000000000..2c9d7c693 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveDslStoreSuppliers.java @@ -0,0 +1,56 @@ +/* + * Copyright 2023 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.responsive.kafka.api.stores; + +import org.apache.kafka.streams.state.DslKeyValueParams; +import org.apache.kafka.streams.state.DslSessionParams; +import org.apache.kafka.streams.state.DslStoreSuppliers; +import org.apache.kafka.streams.state.DslWindowParams; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.SessionBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; + +public class ResponsiveDslStoreSuppliers implements DslStoreSuppliers { + + @Override + public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams dslKeyValueParams) { + return ResponsiveStores.keyValueStore( + ResponsiveKeyValueParams.keyValue(dslKeyValueParams.name()) + ); + } + + @Override + public WindowBytesStoreSupplier windowStore(final DslWindowParams dslWindowParams) { + return ResponsiveStores.windowStoreSupplier( + ResponsiveWindowParams.window( + dslWindowParams.name(), + dslWindowParams.windowSize(), + dslWindowParams.retentionPeriod().minus(dslWindowParams.windowSize()) + ) + ); + } + + @Override + public SessionBytesStoreSupplier sessionStore(final DslSessionParams dslSessionParams) { + return ResponsiveStores.sessionStoreSupplier( + ResponsiveSessionParams.session( + dslSessionParams.name(), + dslSessionParams.retentionPeriod().toMillis() + ) + ); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionParams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionParams.java index 43b18ad8f..348c097dd 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionParams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionParams.java @@ -28,35 +28,36 @@ public final class ResponsiveSessionParams { private final TableName name; private final SessionSchema schemaType; - private final long inactivityGapMs; - private final long gracePeriodMs; private final long retentionPeriodMs; - - private long numSegments; + private final long numSegments; private ResponsiveSessionParams( final String name, final SessionSchema schemaType, - final Duration inactivityGap, - final Duration gracePeriod + final long retentionPeriodMs ) { this.name = new TableName(name); this.schemaType = schemaType; - this.inactivityGapMs = durationToMillis(inactivityGap, "inactivityGap"); - this.gracePeriodMs = durationToMillis(gracePeriod, "gracePeriod"); - - this.retentionPeriodMs = this.inactivityGapMs + this.gracePeriodMs; + this.retentionPeriodMs = retentionPeriodMs; this.numSegments = computeDefaultNumSegments(retentionPeriodMs); } + public static ResponsiveSessionParams session( + final String name, + final long retentionPeriodMs + ) { + return new ResponsiveSessionParams(name, SessionSchema.SESSION, retentionPeriodMs); + } + public static ResponsiveSessionParams session( final String name, final Duration inactivityGap, final Duration gracePeriod ) { - return new ResponsiveSessionParams( - name, SessionSchema.SESSION, inactivityGap, gracePeriod - ); + final long inactivityGapMs = durationToMillis(inactivityGap, "inactivityGap"); + final long gracePeriodMs = durationToMillis(gracePeriod, "gracePeriod"); + final long retentionPeriodMs = inactivityGapMs + gracePeriodMs; + return new ResponsiveSessionParams(name, SessionSchema.SESSION, retentionPeriodMs); } public SessionSchema schemaType() { @@ -75,10 +76,6 @@ public long numSegments() { return this.numSegments; } - public long gracePeriodMs() { - return this.gracePeriodMs; - } - private static long computeDefaultNumSegments(final long retentionPeriodMs) { // TODO: Smart implementation. return DEFAULT_NUM_SEGMENTS; diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java index d71a359c6..fc113c4dd 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java @@ -316,6 +316,19 @@ public static Materialized> windowMateri ); } + /** + * See for example {@link Stores#inMemorySessionStore(String, Duration)} + * + * @param params the {@link ResponsiveSessionParams} for this store + * @return a supplier for a session store with the given options + * that uses Responsive's storage for its backend + */ + public static SessionBytesStoreSupplier sessionStoreSupplier( + final ResponsiveSessionParams params + ) { + return new ResponsiveSessionStoreSupplier(params); + } + /** * Create a {@link StoreBuilder} that can be used to build a Responsive * {@link SessionStore} and connect it via the Processor API. If using the DSL, use diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/DelegatingConsumer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/DelegatingConsumer.java index b24cd8ba0..94a65f061 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/DelegatingConsumer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/DelegatingConsumer.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; public abstract class DelegatingConsumer implements Consumer { @@ -292,4 +293,9 @@ public void close(final Duration timeout) { public void wakeup() { delegate.wakeup(); } + + @Override + public Uuid clientInstanceId(final Duration duration) { + return delegate.clientInstanceId(duration); + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java index 948951cb3..01997bbd0 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java @@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveProducer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveProducer.java index ad5d6890b..507342634 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveProducer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveProducer.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ProducerFencedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -146,6 +147,11 @@ public List partitionsFor(final String topic) { return wrapped.metrics(); } + @Override + public Uuid clientInstanceId(final Duration duration) { + return wrapped.clientInstanceId(duration); + } + @Override public void close() { wrapped.close(); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java index e93ea2b33..d3526bb88 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java @@ -56,6 +56,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Named; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -138,7 +139,7 @@ private ResponsiveKafkaStreams buildStreams(final Map properties final KStream input = builder.stream(inputTopic()); input .groupByKey() - .count(ResponsiveStores.materialized(ResponsiveKeyValueParams.keyValue("countStore"))) + .count(Named.as("count")) .toStream() .to(outputTopic()); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKafkaStreamsIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKafkaStreamsIntegrationTest.java new file mode 100644 index 000000000..4ba44fae5 --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKafkaStreamsIntegrationTest.java @@ -0,0 +1,171 @@ +/* + * Copyright 2023 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.responsive.kafka.integration; + +import static dev.responsive.kafka.testutils.IntegrationTestUtils.pipeTimestampedRecords; +import static dev.responsive.kafka.testutils.IntegrationTestUtils.startAppAndAwaitRunning; +import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.is; + +import dev.responsive.kafka.api.ResponsiveKafkaStreams; +import dev.responsive.kafka.api.config.StorageBackend; +import dev.responsive.kafka.internal.utils.SessionUtil; +import dev.responsive.kafka.testutils.IntegrationTestUtils; +import dev.responsive.kafka.testutils.KeyValueTimestamp; +import dev.responsive.kafka.testutils.ResponsiveConfigParam; +import dev.responsive.kafka.testutils.ResponsiveExtension; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Named; +import org.bson.types.Binary; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.containers.MongoDBContainer; + +public class ResponsiveKafkaStreamsIntegrationTest { + + public static final String COUNT_TABLE_NAME = "count"; + @RegisterExtension + static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.MONGO_DB); + + private static final String INPUT_TOPIC = "input"; + private static final String OUTPUT_TOPIC = "output"; + + private final Map responsiveProps = new HashMap<>(); + + private String name; + private MongoDBContainer mongo; + + @BeforeEach + public void before( + final TestInfo info, + final Admin admin, + final MongoDBContainer mongo, + @ResponsiveConfigParam final Map responsiveProps + ) throws InterruptedException, ExecutionException { + // add displayName to name to account for parameterized tests + name = info.getTestMethod().orElseThrow().getName() + "-" + new Random().nextInt(); + this.mongo= mongo; + + this.responsiveProps.putAll(responsiveProps); + + final var result = admin.createTopics( + List.of( + new NewTopic(inputTopic(), Optional.of(1), Optional.empty()), + new NewTopic(outputTopic(), Optional.of(1), Optional.empty()) + ) + ); + result.all().get(); + } + + private String inputTopic() { + return name + "." + INPUT_TOPIC; + } + + private String outputTopic() { + return name + "." + OUTPUT_TOPIC; + } + + @Test + public void shouldDefaultToResponsiveStoresWhenUsingDsl() throws Exception { + // Given: + final List> inputEvents = Arrays.asList( + new KeyValueTimestamp<>("key", "a", 0L), + new KeyValueTimestamp<>("key", "b", 2_000L), + new KeyValueTimestamp<>("key", "c", 3_000L), + new KeyValueTimestamp<>("STOP", "ignored", 18_000L) + ); + final CountDownLatch outputLatch = new CountDownLatch(1); + + final StreamsBuilder builder = new StreamsBuilder(); + final KStream input = builder.stream(inputTopic()); + input + .groupByKey() + .count(Named.as(COUNT_TABLE_NAME)) + .toStream() + .peek((k, v) -> { + if (k.equals("STOP")) { + outputLatch.countDown(); + } + }) + .selectKey((k, v) -> k) + .to(outputTopic()); + + // When: + final Map properties = + IntegrationTestUtils.getDefaultMutablePropertiesWithStringSerdes(responsiveProps, name); + properties.put(STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); + final KafkaProducer producer = new KafkaProducer<>(properties); + try ( + final ResponsiveKafkaStreams kafkaStreams = + new ResponsiveKafkaStreams(builder.build(), properties) + ) { + startAppAndAwaitRunning(Duration.ofSeconds(15), kafkaStreams); + pipeTimestampedRecords(producer, inputTopic(), inputEvents); + + final long maxWait = inputEvents.get(inputEvents.size() - 1).timestamp() + 2_000; + assertThat( + outputLatch.await(maxWait, TimeUnit.MILLISECONDS), + Matchers.equalTo(true) + ); + } + + // Then: + try ( + final var mongoClient = SessionUtil.connect(mongo.getConnectionString(), null, null); + final var deserializer = new StringDeserializer(); + ) { + final List dbs = new ArrayList<>(); + mongoClient.listDatabaseNames().into(dbs); + assertThat(dbs, hasItem("kstream_aggregate_state_store_0000000001")); + + final var db = mongoClient.getDatabase("kstream_aggregate_state_store_0000000001"); + final var collection = db.getCollection("kv_data"); + final long numDocs = collection.countDocuments(); + assertThat(numDocs, is(2L)); + + final List keys = new ArrayList<>(); + collection.find() + .map(doc -> doc.get("_id", Binary.class).getData()) + .map(doc -> deserializer.deserialize("", doc)) + .into(keys); + assertThat(keys, hasItems("key", "STOP")); + } + } + +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreIntegrationTest.java index 907f72aa1..825221821 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreIntegrationTest.java @@ -18,26 +18,14 @@ import static dev.responsive.kafka.testutils.IntegrationTestUtils.pipeTimestampedRecords; import static dev.responsive.kafka.testutils.IntegrationTestUtils.startAppAndAwaitRunning; -import static org.apache.kafka.clients.CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import static org.hamcrest.MatcherAssert.assertThat; import dev.responsive.kafka.api.ResponsiveKafkaStreams; -import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.api.config.StorageBackend; import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; import dev.responsive.kafka.api.stores.ResponsiveStores; +import dev.responsive.kafka.testutils.IntegrationTestUtils; import dev.responsive.kafka.testutils.KeyValueTimestamp; import dev.responsive.kafka.testutils.ResponsiveConfigParam; import dev.responsive.kafka.testutils.ResponsiveExtension; @@ -55,11 +43,6 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; @@ -84,7 +67,6 @@ public class ResponsiveKeyValueStoreIntegrationTest { private final Map responsiveProps = new HashMap<>(); private String name; - private Admin admin; @BeforeEach public void before( @@ -97,7 +79,6 @@ public void before( this.responsiveProps.putAll(responsiveProps); - this.admin = admin; final var result = admin.createTopics( List.of( new NewTopic(inputTopic(), Optional.of(1), Optional.empty()), @@ -169,7 +150,8 @@ public void shouldMatchRocksDB() throws Exception { .to(outputTopic()); // When: - final Map properties = getMutablePropertiesWithStringSerdes(); + final Map properties = + IntegrationTestUtils.getDefaultMutablePropertiesWithStringSerdes(responsiveProps, name); properties.put(STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); final KafkaProducer producer = new KafkaProducer<>(properties); try ( @@ -187,38 +169,4 @@ public void shouldMatchRocksDB() throws Exception { } } - private Map getMutablePropertiesWithStringSerdes() { - final Map properties = getMutableProperties(); - properties.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - properties.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); - properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); - return properties; - } - - private Map getMutableProperties() { - final Map properties = new HashMap<>(responsiveProps); - - properties.put(KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); - properties.put(VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class); - properties.put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - - properties.put(APPLICATION_ID_CONFIG, name); - properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName()); - properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName()); - properties.put(NUM_STREAM_THREADS_CONFIG, 1); - properties.put(COMMIT_INTERVAL_MS_CONFIG, 1); // commit as often as possible - - properties.put(consumerPrefix(SESSION_TIMEOUT_MS_CONFIG), 5_000 - 1); - - properties.put(consumerPrefix(MAX_POLL_RECORDS_CONFIG), 1); - - properties.put(ResponsiveConfig.STORE_FLUSH_RECORDS_TRIGGER_CONFIG, 1); - - return properties; - } - } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java index 869860238..1e6dfaaec 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java @@ -1,6 +1,18 @@ package dev.responsive.kafka.testutils; +import static org.apache.kafka.clients.CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import dev.responsive.kafka.api.ResponsiveKafkaStreams; import dev.responsive.kafka.api.config.ResponsiveConfig; @@ -34,6 +46,11 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; @@ -368,6 +385,29 @@ public static void startAppAndAwaitRunning( } } + public static Map getDefaultMutablePropertiesWithStringSerdes( + final Map responsiveProps, + final String name + ) { + final Map properties = new HashMap<>(responsiveProps); + properties.put(APPLICATION_ID_CONFIG, name); + properties.put(NUM_STREAM_THREADS_CONFIG, 1); + properties.put(COMMIT_INTERVAL_MS_CONFIG, 1); // commit as often as possible + + properties.put(consumerPrefix(SESSION_TIMEOUT_MS_CONFIG), 5_000 - 1); + properties.put(consumerPrefix(MAX_POLL_RECORDS_CONFIG), 1); + properties.put(ResponsiveConfig.STORE_FLUSH_RECORDS_TRIGGER_CONFIG, 1); + + properties.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + + return properties; + } + private IntegrationTestUtils() { } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java index 5a903f253..3ee5ee515 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java @@ -89,6 +89,7 @@ public void beforeAll(final ExtensionContext context) throws Exception { @Override public void afterAll(final ExtensionContext context) throws Exception { cassandra.stop(); + mongo.stop(); kafka.stop(); admin.close(); } @@ -100,6 +101,7 @@ public boolean supportsParameter( ) throws ParameterResolutionException { return parameterContext.getParameter().getType().equals(CassandraContainer.class) || parameterContext.getParameter().getType().equals(KafkaContainer.class) + || parameterContext.getParameter().getType().equals(MongoDBContainer.class) || parameterContext.getParameter().getType().equals(Admin.class) || isContainerConfig(parameterContext); } @@ -111,6 +113,8 @@ public Object resolveParameter( ) throws ParameterResolutionException { if (parameterContext.getParameter().getType() == CassandraContainer.class) { return cassandra; + } else if (parameterContext.getParameter().getType() == MongoDBContainer.class) { + return mongo; } else if (parameterContext.getParameter().getType() == KafkaContainer.class) { return kafka; } else if (parameterContext.getParameter().getType() == Admin.class) { diff --git a/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java b/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java index 62a329a18..3425f96eb 100644 --- a/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java +++ b/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java @@ -270,12 +270,13 @@ private GlobalStreamThread getThread( final Time time = new SystemTime(); final InternalTopologyBuilder builder = new InternalTopologyBuilder(); builder.addGlobalStore( - new KeyValueStoreBuilder<>( - storeSupplier, - new ByteArraySerde(), - new ByteArraySerde(), - time - ).withLoggingDisabled(), + new StoreBuilderWrapper( + new KeyValueStoreBuilder<>( + storeSupplier, + new ByteArraySerde(), + new ByteArraySerde(), + time).withLoggingDisabled() + ), "global", null, null, diff --git a/settings.gradle.kts b/settings.gradle.kts index 20b033a15..cb272c9f4 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -45,7 +45,7 @@ dependencyResolutionManagement { versionCatalogs { create("libs") { version("jackson", "2.14.2") - version("kafka", "3.6.0") + version("kafka", "3.7.0") version("scylla", "4.15.0.0") version("javaoperatorsdk", "4.3.0") version("grpc", "1.52.1") From 10ab454b9b25d1771af2238d3e01c159c7ff1c52 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Thu, 23 May 2024 15:48:14 -0700 Subject: [PATCH 2/4] checkstyle --- .../responsive/kafka/internal/clients/ResponsiveConsumer.java | 1 - .../responsive/kafka/integration/MinimalIntegrationTest.java | 2 -- .../integration/ResponsiveKafkaStreamsIntegrationTest.java | 2 +- .../dev/responsive/kafka/testutils/IntegrationTestUtils.java | 2 -- 4 files changed, 1 insertion(+), 6 deletions(-) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java index 01997bbd0..948951cb3 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java @@ -28,7 +28,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.Uuid; import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java index d3526bb88..cba1aad00 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java @@ -37,8 +37,6 @@ import dev.responsive.kafka.api.ResponsiveKafkaStreams; import dev.responsive.kafka.api.config.StorageBackend; -import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; -import dev.responsive.kafka.api.stores.ResponsiveStores; import dev.responsive.kafka.testutils.ResponsiveConfigParam; import dev.responsive.kafka.testutils.ResponsiveExtension; import java.time.Duration; diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKafkaStreamsIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKafkaStreamsIntegrationTest.java index 4ba44fae5..66b0234ec 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKafkaStreamsIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKafkaStreamsIntegrationTest.java @@ -80,7 +80,7 @@ public void before( ) throws InterruptedException, ExecutionException { // add displayName to name to account for parameterized tests name = info.getTestMethod().orElseThrow().getName() + "-" + new Random().nextInt(); - this.mongo= mongo; + this.mongo = mongo; this.responsiveProps.putAll(responsiveProps); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java index 1e6dfaaec..c36e984eb 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java @@ -46,8 +46,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; From b19ca6fb120e5d2fc8ba034fbbc0dcc27a24e57e Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Tue, 4 Jun 2024 08:32:18 -0700 Subject: [PATCH 3/4] address feedback --- .../java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java | 2 +- .../kafka/api/stores/ResponsiveDslStoreSuppliers.java | 2 +- .../responsive/kafka/api/stores/ResponsiveSessionParams.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index 932a3b27b..ad02e7306 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -315,7 +315,7 @@ private static Properties propsWithOverrides( propsWithOverrides.putIfAbsent( StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, - ResponsiveDslStoreSuppliers.class.getCanonicalName() + ResponsiveDslStoreSuppliers.class.getName() ); return propsWithOverrides; diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveDslStoreSuppliers.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveDslStoreSuppliers.java index 2c9d7c693..fcfa97372 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveDslStoreSuppliers.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveDslStoreSuppliers.java @@ -49,7 +49,7 @@ public SessionBytesStoreSupplier sessionStore(final DslSessionParams dslSessionP return ResponsiveStores.sessionStoreSupplier( ResponsiveSessionParams.session( dslSessionParams.name(), - dslSessionParams.retentionPeriod().toMillis() + dslSessionParams.retentionPeriod() ) ); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionParams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionParams.java index 348c097dd..d7db019ea 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionParams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionParams.java @@ -44,9 +44,9 @@ private ResponsiveSessionParams( public static ResponsiveSessionParams session( final String name, - final long retentionPeriodMs + final Duration retention ) { - return new ResponsiveSessionParams(name, SessionSchema.SESSION, retentionPeriodMs); + return new ResponsiveSessionParams(name, SessionSchema.SESSION, retention.toMillis()); } public static ResponsiveSessionParams session( From d773320a8f1a5a9bbf8a3b7cf5be5a24315a645a Mon Sep 17 00:00:00 2001 From: Antoine Pourchet Date: Fri, 7 Jun 2024 12:09:39 -0600 Subject: [PATCH 4/4] KIP-924: Custom assignment integration test This PR includes a new integration test that uses a simple custom assignor. In this case, the simplest way to make sure the custom assignment is working is to not assign any of the tasks and make sure the streams application is not making progress. --- buildSrc/build.gradle.kts | 1 + ...ponsive.java-common-conventions.gradle.kts | 1 + .../integration/CustomAssignmentTest.java | 260 ++++++++++++++++++ .../GlobalStreamThreadIntegrationTest.java | 3 +- settings.gradle.kts | 2 +- 5 files changed, 265 insertions(+), 2 deletions(-) create mode 100644 kafka-client/src/test/java/dev/responsive/kafka/integration/CustomAssignmentTest.java diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts index f55a2fb1a..b3eabb4d1 100644 --- a/buildSrc/build.gradle.kts +++ b/buildSrc/build.gradle.kts @@ -19,6 +19,7 @@ plugins { } repositories { + mavenLocal() gradlePluginPortal() } diff --git a/buildSrc/src/main/kotlin/responsive.java-common-conventions.gradle.kts b/buildSrc/src/main/kotlin/responsive.java-common-conventions.gradle.kts index af46b8cf5..5d362a331 100644 --- a/buildSrc/src/main/kotlin/responsive.java-common-conventions.gradle.kts +++ b/buildSrc/src/main/kotlin/responsive.java-common-conventions.gradle.kts @@ -44,6 +44,7 @@ checkstyle { repositories { mavenCentral() + mavenLocal() } tasks.test { diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/CustomAssignmentTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/CustomAssignmentTest.java new file mode 100644 index 000000000..35405a60b --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/CustomAssignmentTest.java @@ -0,0 +1,260 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.responsive.kafka.integration; + +import static dev.responsive.kafka.testutils.IntegrationTestUtils.createTopicsAndWait; +import static dev.responsive.kafka.testutils.IntegrationTestUtils.pipeInput; +import static dev.responsive.kafka.testutils.IntegrationTestUtils.startAppAndAwaitRunning; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTION_TIMEOUT_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.APPLICATION_SERVER_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2; +import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; +import static org.apache.kafka.streams.StreamsConfig.producerPrefix; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +import dev.responsive.kafka.api.ResponsiveKafkaStreams; +import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; +import dev.responsive.kafka.api.stores.ResponsiveStores; +import dev.responsive.kafka.internal.stores.SchemaTypes; +import dev.responsive.kafka.testutils.ResponsiveConfigParam; +import dev.responsive.kafka.testutils.ResponsiveExtension; +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ExtendWith(ResponsiveExtension.class) +public class CustomAssignmentTest { + private static final Logger LOG = LoggerFactory.getLogger(CustomAssignmentTest.class); + + private static final int MAX_POLL_MS = 5000; + private static final String INPUT_TOPIC = "input"; + private static final String OUTPUT_TOPIC = "output"; + + private final Map responsiveProps = new HashMap<>(); + + private String name; + private Admin admin; + private ScheduledExecutorService executor; + + @BeforeEach + public void before( + final TestInfo info, + final Admin admin, + @ResponsiveConfigParam final Map responsiveProps + ) { + // add displayName to name to account for parameterized tests + name = info.getTestMethod().orElseThrow().getName() + "-" + new Random().nextInt(); + executor = new ScheduledThreadPoolExecutor(2); + + this.responsiveProps.putAll(responsiveProps); + + this.admin = admin; + createTopicsAndWait(admin, Map.of(inputTopic(), 2, outputTopic(), 1)); + } + + @AfterEach + public void after() { + admin.deleteTopics(List.of(inputTopic(), outputTopic())); + } + + private String inputTopic() { + return name + "." + INPUT_TOPIC; + } + + private String outputTopic() { + return name + "." + OUTPUT_TOPIC; + } + + @Test + public void shouldUseCustomAssignorInRebalance() throws Exception { + // Given: + final Map properties = getMutableProperties(); + final KafkaProducer producer = new KafkaProducer<>(properties); + final SharedState state = new SharedState(); + + // When: + try ( + final ResponsiveKafkaStreams streamsA = buildStreams(properties, "a", state); + ) { + startAppAndAwaitRunning(Duration.ofSeconds(10), streamsA); + pipeInput(inputTopic(), 2, producer, System::currentTimeMillis, 0, 10, 0, 1, 2, 3); + + Thread.sleep(5_000); + assertThat(state.numRecords.get(), equalTo(0)); + } + } + + private Map getMutableProperties() { + final Map properties = new HashMap<>(responsiveProps); + + properties.put(KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); + properties.put(VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class); + properties.put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + + properties.put(APPLICATION_ID_CONFIG, name); + properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName()); + properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName()); + properties.put(PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2); + properties.put(NUM_STREAM_THREADS_CONFIG, 1); + + // this ensures we can control the commits by explicitly requesting a commit + properties.put(COMMIT_INTERVAL_MS_CONFIG, 20_000); + properties.put(producerPrefix(TRANSACTION_TIMEOUT_CONFIG), 20_000); + + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); + properties.put(consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000"); + properties.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); + properties.put(consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_MS); + properties.put(consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), MAX_POLL_MS); + properties.put(consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), MAX_POLL_MS - 1); + + properties.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, CompleteStallAssignor.class.getName()); + + return properties; + } + + private StoreBuilder> storeSupplier(SchemaTypes.KVSchema type) { + return ResponsiveStores.keyValueStoreBuilder( + ResponsiveStores.keyValueStore( + type == SchemaTypes.KVSchema.FACT + ? ResponsiveKeyValueParams.fact(name) + : ResponsiveKeyValueParams.keyValue(name) + ), + Serdes.Long(), + Serdes.Long() + ).withLoggingEnabled( + Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)); + } + + private ResponsiveKafkaStreams buildStreams( + final Map originals, + final String instance, + final SharedState state + ) { + final Map properties = new HashMap<>(originals); + properties.put(APPLICATION_SERVER_CONFIG, instance + ":1024"); + + final StreamsBuilder builder = new StreamsBuilder(); + builder.addStateStore(storeSupplier(SchemaTypes.KVSchema.KEY_VALUE)); + + final KStream input = builder.stream(inputTopic()); + input + .process(() -> new TestProcessor(instance, state, name), name) + .to(outputTopic()); + + return new ResponsiveKafkaStreams(builder.build(), properties); + } + + public static class CompleteStallAssignor implements TaskAssignor { + @Override + public TaskAssignment assign(final ApplicationState applicationState) { + final Map assignments = new HashMap<>(); + final Collection states = applicationState.kafkaStreamsStates(false).values(); + for (final KafkaStreamsState state : states) { + LOG.info("Client state: {} | {}", state.processId(), state.numProcessingThreads()); + assignments.put(state.processId(), KafkaStreamsAssignment.of(state.processId(), new HashSet<>())); + } + + return new TaskAssignment(assignments.values()); + } + } + + private static class SharedState { + private final AtomicInteger numRecords = new AtomicInteger(0); + } + + private static class TestProcessor implements Processor { + + private final String instance; + private final SharedState state; + private final String name; + private ProcessorContext context; + private KeyValueStore store; + + public TestProcessor(final String instance, final SharedState state, String name) { + this.instance = instance; + this.state = state; + this.name = name; + } + + @Override + public void init(final ProcessorContext context) { + this.context = context; + this.store = context.getStateStore(name); + } + + @Override + public void process(final Record record) { + state.numRecords.incrementAndGet(); + final long sum = updateSum(record.key(), record.value()); + context.forward(new Record<>(record.key(), sum, System.currentTimeMillis())); + } + + private long updateSum(final long key, final long value) { + Long sum = store.get(key); + sum = (sum == null) ? value : sum + value; + store.put(key, sum); + return sum; + } + } +} diff --git a/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java b/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java index 3425f96eb..e293a8548 100644 --- a/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java +++ b/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java @@ -295,7 +295,8 @@ public void init(final ProcessorContext context) { public void process(final Record record) { global.put(record.key(), record.value()); } - } + }, + false ); final String baseDirectoryName = tempDir.getAbsolutePath(); diff --git a/settings.gradle.kts b/settings.gradle.kts index cb272c9f4..8a2ffb945 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -45,7 +45,7 @@ dependencyResolutionManagement { versionCatalogs { create("libs") { version("jackson", "2.14.2") - version("kafka", "3.7.0") + version("kafka", "3.9.0-SNAPSHOT") version("scylla", "4.15.0.0") version("javaoperatorsdk", "4.3.0") version("grpc", "1.52.1")