Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix async ttd code and add test #398

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry;
import dev.responsive.kafka.api.async.internals.AsyncUtils;
import dev.responsive.kafka.api.config.CompatibilityMode;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.stores.ResponsiveDslStoreSuppliers;
Expand Down Expand Up @@ -510,26 +511,16 @@ public Params withTime(final Time time) {
// that it's impossible to use a Params instance that hasn't called build(),
// but that felt a little extra
public Params build() {
final int asyncThreadPoolSize = responsiveConfig.getInt(ASYNC_THREAD_POOL_SIZE_CONFIG);

final KafkaClientSupplier delegateKafkaClientSupplier;
if (asyncThreadPoolSize > 0) {

final AsyncThreadPoolRegistry asyncRegistry = new AsyncThreadPoolRegistry(
streamsConfig.getInt(NUM_STREAM_THREADS_CONFIG),
asyncThreadPoolSize,
responsiveConfig.getInt(ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG),
metrics
);
delegateKafkaClientSupplier = new AsyncStreamsKafkaClientSupplier(
innerClientSupplier,
asyncRegistry
);
this.asyncThreadPoolRegistry = Optional.of(asyncRegistry);
} else {
delegateKafkaClientSupplier = innerClientSupplier;
this.asyncThreadPoolRegistry = Optional.empty();
}
this.asyncThreadPoolRegistry = AsyncUtils.configuredAsyncThreadPool(
responsiveConfig,
streamsConfig.getInt(NUM_STREAM_THREADS_CONFIG),
metrics
);
final KafkaClientSupplier delegateKafkaClientSupplier =
asyncThreadPoolRegistry.isPresent() ? new AsyncStreamsKafkaClientSupplier(
innerClientSupplier,
asyncThreadPoolRegistry.get()
) : innerClientSupplier;

this.responsiveKafkaClientSupplier = new ResponsiveKafkaClientSupplier(
delegateKafkaClientSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public <KOut, VOut> void scheduleForProcessing(
= inFlight.computeIfAbsent(asyncProcessorId, k -> new ConcurrentHashMap<>());

for (final AsyncEvent event : events) {
log.info("Scheduled event {}", event.inputRecord());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this really should be at DEBUG at the very least...

try {
queueSemaphore.acquire();
} catch (final InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@
package dev.responsive.kafka.api.async.internals;

import static dev.responsive.kafka.api.async.internals.AsyncThreadPool.ASYNC_THREAD_NAME;
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG;

import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder;
import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.StoreType;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
Expand Down Expand Up @@ -130,4 +135,24 @@ public static int processorRecordContextHashCode(
return result;
}

/**
 * Creates the {@link AsyncThreadPoolRegistry} for this application, if async
 * processing has been enabled via the {@code ASYNC_THREAD_POOL_SIZE_CONFIG}.
 *
 * @param responsiveConfig the Responsive configs, used to look up the async
 *                         thread pool size and per-thread event queue limit
 * @param numStreamThreads the number of StreamThreads in this client
 * @param metrics          the Responsive metrics registry
 * @return a registry wrapping the configured pool, or {@link Optional#empty()}
 *         when the configured pool size is zero or negative (async disabled)
 */
public static Optional<AsyncThreadPoolRegistry> configuredAsyncThreadPool(
    final ResponsiveConfig responsiveConfig,
    final int numStreamThreads,
    final ResponsiveMetrics metrics
) {
  final int poolSize = responsiveConfig.getInt(ASYNC_THREAD_POOL_SIZE_CONFIG);

  // A non-positive pool size means async processing is turned off entirely
  if (poolSize <= 0) {
    return Optional.empty();
  }

  return Optional.of(new AsyncThreadPoolRegistry(
      numStreamThreads,
      poolSize,
      responsiveConfig.getInt(ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG),
      metrics
  ));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@

import static dev.responsive.kafka.internal.stores.TTDRestoreListener.mockRestoreListener;

import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry;
import dev.responsive.kafka.api.async.internals.AsyncUtils;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.clients.TTDCassandraClient;
import dev.responsive.kafka.internal.clients.TTDMockAdmin;
import dev.responsive.kafka.internal.config.InternalSessionConfigs;
import dev.responsive.kafka.internal.metrics.ClientVersionMetadata;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.utils.SessionClients;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
Expand Down Expand Up @@ -139,12 +145,28 @@ private static Properties testDriverProps(
final var restoreListener = mockRestoreListener(props);
sessionClients.initialize(restoreListener.metrics(), restoreListener);

props.putAll(new InternalSessionConfigs.Builder()
final var metrics = new ResponsiveMetrics(new Metrics());
final String appId = userProps.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
metrics.initializeTags(
appId,
appId + "-client",
ClientVersionMetadata.loadVersionMetadata(),
Collections.emptyMap()
);

final var sessionConfig = new InternalSessionConfigs.Builder()
.withSessionClients(sessionClients)
.withStoreRegistry(client.storeRegistry())
.withTopologyDescription(topologyDescription)
.build()
);
.withMetrics(metrics)
.withTopologyDescription(topologyDescription);

AsyncUtils.configuredAsyncThreadPool(ResponsiveConfig.responsiveConfig(props), 1, metrics)
.ifPresent(threadPoolRegistry -> {
threadPoolRegistry.startNewAsyncThreadPool("Test worker");
sessionConfig.withAsyncThreadPoolRegistry(threadPoolRegistry);
});

props.putAll(sessionConfig.build());
return props;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@

package dev.responsive.kafka.api;

import static dev.responsive.kafka.api.async.AsyncFixedKeyProcessorSupplier.createAsyncProcessorSupplier;
import static dev.responsive.kafka.testutils.processors.Deduplicator.deduplicatorApp;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.api.stores.ResponsiveStores;
import dev.responsive.kafka.api.stores.TtlProvider;
Expand All @@ -38,6 +40,9 @@
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.hamcrest.MatcherAssert;
Expand Down Expand Up @@ -152,7 +157,8 @@ public void shouldEnforceKeyBasedTtlByAdvancingStreamTime(final KVSchema type) {

@ParameterizedTest
@EnumSource(SchemaTypes.KVSchema.class)
public void shouldEnforceKeyBasedTtlByAdvancingWallclockTime(final KVSchema type) {
public void shouldEnforceKeyBasedTtlByAdvancingWallclockTime(final KVSchema type)
throws InterruptedException {
// Given:
final Duration defaultTtl = Duration.ofMillis(15);

Expand Down Expand Up @@ -261,6 +267,7 @@ private ResponsiveTopologyTestDriver setupDriver(final Topology topology) {
final Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG, 2);

return new ResponsiveTopologyTestDriver(topology, props, STARTING_TIME);
}
Expand All @@ -283,6 +290,20 @@ private Topology topology(final ResponsiveKeyValueParams storeParams) {
.join(people, (bid, person) -> bid + "," + person)
// state is the 6th column
.filter((k, v) -> v.split(",")[5].equals("CA"))
.processValues(createAsyncProcessorSupplier(() -> new FixedKeyProcessor<String, String, String>() {

private FixedKeyProcessorContext<String, String> context;

@Override
public void init(final FixedKeyProcessorContext<String, String> context) {
this.context = context;
}

@Override
public void process(final FixedKeyRecord<String, String> fixedKeyRecord) {
context.forward(fixedKeyRecord);
}
}))
.to("output");

return builder.build();
Expand Down
Loading