Skip to content

Commit

Permalink
Support async in Responsive TTD (#400)
Browse files Browse the repository at this point in the history
Combination of #398 and #399 for the full fix
  • Loading branch information
ableegoldman authored Dec 10, 2024
1 parent d7f3641 commit ee47f12
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

package dev.responsive.kafka.api;

import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.METRICS_ENABLED_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG;
Expand All @@ -32,6 +30,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry;
import dev.responsive.kafka.api.async.internals.AsyncUtils;
import dev.responsive.kafka.api.config.CompatibilityMode;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.stores.ResponsiveDslStoreSuppliers;
Expand Down Expand Up @@ -510,26 +509,16 @@ public Params withTime(final Time time) {
// that it's impossible to use a Params instance that hasn't called build(),
// but that felt a little extra
public Params build() {
final int asyncThreadPoolSize = responsiveConfig.getInt(ASYNC_THREAD_POOL_SIZE_CONFIG);

final KafkaClientSupplier delegateKafkaClientSupplier;
if (asyncThreadPoolSize > 0) {

final AsyncThreadPoolRegistry asyncRegistry = new AsyncThreadPoolRegistry(
streamsConfig.getInt(NUM_STREAM_THREADS_CONFIG),
asyncThreadPoolSize,
responsiveConfig.getInt(ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG),
metrics
);
delegateKafkaClientSupplier = new AsyncStreamsKafkaClientSupplier(
innerClientSupplier,
asyncRegistry
);
this.asyncThreadPoolRegistry = Optional.of(asyncRegistry);
} else {
delegateKafkaClientSupplier = innerClientSupplier;
this.asyncThreadPoolRegistry = Optional.empty();
}
this.asyncThreadPoolRegistry = AsyncUtils.configuredAsyncThreadPool(
responsiveConfig,
streamsConfig.getInt(NUM_STREAM_THREADS_CONFIG),
metrics
);
final KafkaClientSupplier delegateKafkaClientSupplier =
asyncThreadPoolRegistry.isPresent() ? new AsyncStreamsKafkaClientSupplier(
innerClientSupplier,
asyncThreadPoolRegistry.get()
) : innerClientSupplier;

this.responsiveKafkaClientSupplier = new ResponsiveKafkaClientSupplier(
delegateKafkaClientSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@

package dev.responsive.kafka.api.async.internals;

import static dev.responsive.kafka.api.async.internals.AsyncUtils.getAsyncThreadPool;
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_FLUSH_INTERVAL_MS_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_KEY_CONFIG;
import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadAsyncThreadPoolRegistry;

import dev.responsive.kafka.api.async.AsyncProcessorSupplier;
import dev.responsive.kafka.api.async.internals.contexts.AsyncUserProcessorContext;
Expand All @@ -40,7 +40,6 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
Expand Down Expand Up @@ -218,7 +217,7 @@ private void initFields(
final long punctuationInterval = configs.getLong(ASYNC_FLUSH_INTERVAL_MS_CONFIG);
final int maxEventsPerKey = configs.getInt(ASYNC_MAX_EVENTS_QUEUED_PER_KEY_CONFIG);

this.asyncThreadPoolRegistration = getAsyncThreadPool(taskContext, streamThreadName);
this.asyncThreadPoolRegistration = getAsyncThreadPool(appConfigs, streamThreadName);
asyncThreadPoolRegistration.registerAsyncProcessor(taskId, this::flushPendingEventsForCommit);
asyncThreadPoolRegistration.threadPool().maybeInitThreadPoolMetrics();

Expand Down Expand Up @@ -756,17 +755,5 @@ private void verifyConnectedStateStores(
}
}

private static AsyncThreadPoolRegistration getAsyncThreadPool(
final ProcessingContext context,
final String streamThreadName
) {
try {
final AsyncThreadPoolRegistry registry = loadAsyncThreadPoolRegistry(context.appConfigs());
return registry.asyncThreadPoolForStreamThread(streamThreadName);
} catch (final Exception e) {
throw new ConfigException(
"Unable to locate async thread pool registry. Make sure to configure "
+ ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG, e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public <KOut, VOut> void scheduleForProcessing(
= inFlight.computeIfAbsent(asyncProcessorId, k -> new ConcurrentHashMap<>());

for (final AsyncEvent event : events) {
log.trace("Scheduled event {}", event.inputRecord());
try {
queueSemaphore.acquire();
} catch (final InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,21 @@
package dev.responsive.kafka.api.async.internals;

import static dev.responsive.kafka.api.async.internals.AsyncThreadPool.ASYNC_THREAD_NAME;
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG;
import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadAsyncThreadPoolRegistry;

import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder;
import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.StoreType;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.StoreBuilder;
Expand Down Expand Up @@ -130,4 +137,38 @@ public static int processorRecordContextHashCode(
return result;
}

public static Optional<AsyncThreadPoolRegistry> configuredAsyncThreadPool(
final ResponsiveConfig responsiveConfig,
final int numStreamThreads,
final ResponsiveMetrics metrics
) {
final int asyncThreadPoolSize = responsiveConfig.getInt(ASYNC_THREAD_POOL_SIZE_CONFIG);

if (asyncThreadPoolSize > 0) {
final AsyncThreadPoolRegistry asyncRegistry = new AsyncThreadPoolRegistry(
numStreamThreads,
asyncThreadPoolSize,
responsiveConfig.getInt(ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG),
metrics
);
return Optional.of(asyncRegistry);
} else {
return Optional.empty();
}
}

public static AsyncThreadPoolRegistration getAsyncThreadPool(
final Map<String, Object> appConfigs,
final String streamThreadName
) {
try {
final AsyncThreadPoolRegistry registry = loadAsyncThreadPoolRegistry(appConfigs);
return registry.asyncThreadPoolForStreamThread(streamThreadName);
} catch (final Exception e) {
throw new ConfigException(
"Unable to locate async thread pool registry. Make sure to configure "
+ ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG, e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ public Segmenter(
);
}

public long retentionPeriodMs() {
return retentionPeriodMs;
}

public long segmentIntervalMs() {
return segmentIntervalMs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,42 @@

package dev.responsive.kafka.api;

import static dev.responsive.kafka.api.async.internals.AsyncUtils.getAsyncThreadPool;
import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG;
import static dev.responsive.kafka.internal.stores.TTDRestoreListener.mockRestoreListener;

import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistration;
import dev.responsive.kafka.api.async.internals.AsyncUtils;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.clients.TTDCassandraClient;
import dev.responsive.kafka.internal.clients.TTDMockAdmin;
import dev.responsive.kafka.internal.config.InternalSessionConfigs;
import dev.responsive.kafka.internal.metrics.ClientVersionMetadata;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.utils.SessionClients;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TTDUtils.TopologyTestDriverAccessor;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.TestRecord;

public class ResponsiveTopologyTestDriver extends TopologyTestDriver {
public class ResponsiveTopologyTestDriver extends TopologyTestDriverAccessor {
public static final String RESPONSIVE_TTD_ORG = "Responsive";
public static final String RESPONSIVE_TTD_ENV = "TopologyTestDriver";

private final TTDCassandraClient client;
private final Optional<AsyncThreadPoolRegistration> asyncThreadPool;

/**
* Create a new test diver instance with default test properties.
Expand Down Expand Up @@ -91,7 +105,7 @@ public ResponsiveTopologyTestDriver(
) {
this(
topology,
config,
baseProps(config),
initialWallClockTime,
new TTDCassandraClient(
new TTDMockAdmin(baseProps(config), topology),
Expand All @@ -109,9 +123,16 @@ public ResponsiveTopologyTestDriver(
@Override
public void advanceWallClockTime(final Duration advance) {
client.advanceWallClockTime(advance);
client.flush();
super.advanceWallClockTime(advance);
}

public void flush() {
asyncThreadPool.ifPresent(AsyncThreadPoolRegistration::flushAllAsyncEvents);
client.flush();
super.advanceWallClockTime(Duration.ZERO);
}

private ResponsiveTopologyTestDriver(
final Topology topology,
final Properties config,
Expand All @@ -124,28 +145,55 @@ private ResponsiveTopologyTestDriver(
initialWallClockTime
);
this.client = cassandraClient;
this.asyncThreadPool = getAsyncThreadPoolRegistration(super.props());
}

@Override
protected <K, V> void pipeRecord(
final String topic,
final TestRecord<K, V> record,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
final Instant time
) {
super.pipeRecord(topic, record, keySerializer, valueSerializer, time);
flush();
}

private static Properties testDriverProps(
final Properties userProps,
final Properties baseProps,
final TopologyDescription topologyDescription,
final TTDCassandraClient client
) {
final Properties props = baseProps(userProps);

final SessionClients sessionClients = new SessionClients(
Optional.empty(), Optional.of(client), Optional.empty(), false, client.mockAdmin()
);
final var restoreListener = mockRestoreListener(props);
final var restoreListener = mockRestoreListener(baseProps);
sessionClients.initialize(restoreListener.metrics(), restoreListener);

props.putAll(new InternalSessionConfigs.Builder()
final var metrics = new ResponsiveMetrics(new Metrics());
final String appId = baseProps.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
metrics.initializeTags(
appId,
appId + "-client",
ClientVersionMetadata.loadVersionMetadata(),
Collections.emptyMap()
);

final var sessionConfig = new InternalSessionConfigs.Builder()
.withSessionClients(sessionClients)
.withStoreRegistry(client.storeRegistry())
.withTopologyDescription(topologyDescription)
.build()
);
return props;
.withMetrics(metrics)
.withTopologyDescription(topologyDescription);

AsyncUtils.configuredAsyncThreadPool(ResponsiveConfig.responsiveConfig(baseProps), 1, metrics)
.ifPresent(threadPoolRegistry -> {
threadPoolRegistry.startNewAsyncThreadPool(Thread.currentThread().getName());
sessionConfig.withAsyncThreadPoolRegistry(threadPoolRegistry);
});

baseProps.putAll(sessionConfig.build());
return baseProps;
}

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -175,6 +223,20 @@ private static MockTime mockTime(final Instant initialWallClockTime) {
return mockTime;
}

private static Optional<AsyncThreadPoolRegistration> getAsyncThreadPoolRegistration(
final Properties props
) {
final int asyncThreadPoolSize = (int) props.getOrDefault(ASYNC_THREAD_POOL_SIZE_CONFIG, 0);

if (asyncThreadPoolSize > 0) {
final Map<String, Object> configMap = new HashMap<>();
// stupid conversion to deal with Map<String, Object> vs Properties type discrepancy
props.forEach((key, value) -> configMap.put(key.toString(), value));

return Optional.of(getAsyncThreadPool(configMap, Thread.currentThread().getName()));
} else {
return Optional.empty();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public long currentWallClockTimeMs() {
}

public void advanceWallClockTime(final Duration advance) {
flush();
time.sleep(advance.toMillis());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public TTDWindowTable(
) {
super(client);
this.name = spec.tableName();
this.stub = new WindowStoreStub();
this.stub = new WindowStoreStub(partitioner.segmenter().retentionPeriodMs());
this.partitioner = partitioner;
}

Expand Down Expand Up @@ -207,17 +207,17 @@ protected RemoteWriteResult<SegmentPartition> updateOffsetAndStreamTime(
final long consumedOffset,
final long streamTime
) {
return null;
return RemoteWriteResult.success(null);
}

@Override
protected RemoteWriteResult<SegmentPartition> createSegment(final SegmentPartition partition) {
return null;
return RemoteWriteResult.success(null);
}

@Override
protected RemoteWriteResult<SegmentPartition> deleteSegment(final SegmentPartition partition) {
return null;
return RemoteWriteResult.success(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ public class WindowStoreStub {
private final long retentionPeriod;
private long observedStreamTime = 0L;

public WindowStoreStub() {
// TODO: how can we pass the actual retention period through to the store stub?
this.retentionPeriod = 15L;
public WindowStoreStub(final long retentionPeriod) {
this.retentionPeriod = retentionPeriod;
}

public void put(final WindowedKey key, final byte[] value) {
Expand All @@ -48,7 +47,7 @@ public byte[] fetch(
final long windowStart
) {
final WindowedKey windowedKey = new WindowedKey(key, windowStart);
if (windowStart < minValidTimestamp() && records.containsKey(windowedKey)) {
if (windowStart > minValidTimestamp() && records.containsKey(windowedKey)) {
return records.get(windowedKey);
} else {
return null;
Expand Down
Loading

0 comments on commit ee47f12

Please sign in to comment.