diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java index 63a212204..1e780ec92 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java @@ -16,10 +16,9 @@ package dev.responsive.kafka.api.async; -import static dev.responsive.kafka.api.async.internals.AsyncProcessor.createAsyncFixedKeyProcessor; import static dev.responsive.kafka.api.async.internals.AsyncUtils.initializeAsyncBuilders; -import dev.responsive.kafka.api.async.internals.AsyncProcessor; +import dev.responsive.kafka.api.async.internals.MaybeAsyncProcessor; import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder; import java.util.HashSet; import java.util.Map; @@ -75,8 +74,11 @@ private AsyncFixedKeyProcessorSupplier( } @Override - public AsyncProcessor get() { - return createAsyncFixedKeyProcessor(userProcessorSupplier.get(), asyncStoreBuilders); + public MaybeAsyncProcessor get() { + return MaybeAsyncProcessor.createFixedKeyProcessor( + userProcessorSupplier.get(), + asyncStoreBuilders + ); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java index ddcb30e4b..353558714 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java @@ -16,10 +16,9 @@ package dev.responsive.kafka.api.async; -import static dev.responsive.kafka.api.async.internals.AsyncProcessor.createAsyncProcessor; import static dev.responsive.kafka.api.async.internals.AsyncUtils.initializeAsyncBuilders; -import 
dev.responsive.kafka.api.async.internals.AsyncProcessor; +import dev.responsive.kafka.api.async.internals.MaybeAsyncProcessor; import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder; import java.util.HashSet; import java.util.Map; @@ -155,8 +154,8 @@ private AsyncProcessorSupplier( } @Override - public AsyncProcessor get() { - return createAsyncProcessor(userProcessorSupplier.get(), asyncStoreBuilders); + public MaybeAsyncProcessor get() { + return MaybeAsyncProcessor.createProcessor(userProcessorSupplier.get(), asyncStoreBuilders); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java index 748653cea..ad8e02c29 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java @@ -52,11 +52,8 @@ import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.api.FixedKeyProcessor; -import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; import org.apache.kafka.streams.processor.api.FixedKeyRecord; import org.apache.kafka.streams.processor.api.ProcessingContext; -import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; @@ -69,12 +66,18 @@ * -Coordinates the handoff of records between the StreamThread and AyncThreads * -The starting and ending point of all async events -- see {@link AsyncEvent} */ -public class AsyncProcessor - implements Processor, FixedKeyProcessor { +public class AsyncProcessor { - // Exactly one of 
these is non-null and the other is null - private final Processor userProcessor; - private final FixedKeyProcessor userFixedKeyProcessor; + private final String logPrefix; + private final Logger log; + + private final String streamThreadName; + private final String asyncProcessorName; + private final TaskId taskId; + + private final AsyncThreadPool threadPool; + private final SchedulingQueue schedulingQueue; + private final FinalizingQueue finalizingQueue; private final Map> connectedStoreBuilders; @@ -84,106 +87,32 @@ public class AsyncProcessor // the stream thread should access this. private final Map pendingEvents = new ConcurrentHashMap<>(); - - // This is set at most once. When its set, the thread should immediately throw, and no longer - // try to process further events for this processor. This minimizes the chance of producing - // bad results, particularly with ALOS. - private FatalAsyncException fatalException = null; - - // Everything below this line is effectively final and just has to be initialized in #init // - - private String logPrefix; - private Logger log; - - private String streamThreadName; - private String asyncProcessorName; - private TaskId taskId; - - private AsyncThreadPool threadPool; - private SchedulingQueue schedulingQueue; - private FinalizingQueue finalizingQueue; - - private Cancellable punctuator; + private final Cancellable punctuator; // the context passed to us in init, ie the one created for this task and owned by Kafka Streams - private ProcessingContext taskContext; + private final ProcessingContext taskContext; // the async context owned by the StreamThread that is running this processor/task - private StreamThreadProcessorContext streamThreadContext; + private final StreamThreadProcessorContext streamThreadContext; // the context we pass in to the user so it routes to the actual context based on calling thread - private AsyncUserProcessorContext userContext; - private boolean hasProcessedSomething = false; + private final 
AsyncUserProcessorContext userContext; - private AsyncProcessorMetricsRecorder metricsRecorder; + private final AsyncProcessorMetricsRecorder metricsRecorder; - public static AsyncProcessor createAsyncProcessor( - final Processor userProcessor, - final Map> connectedStoreBuilders - ) { - return new AsyncProcessor<>(userProcessor, null, connectedStoreBuilders); - } + private boolean hasProcessedSomething = false; - public static AsyncProcessor createAsyncFixedKeyProcessor( - final FixedKeyProcessor userProcessor, - final Map> connectedStoreBuilders - ) { - return new AsyncProcessor<>(null, userProcessor, connectedStoreBuilders); - } + // This is set at most once. When its set, the thread should immediately throw, and no longer + // try to process further events for this processor. This minimizes the chance of producing + // bad results, particularly with ALOS. + private FatalAsyncException fatalException = null; - // Note: the constructor will be called from the main application thread (ie the - // one that creates/starts the KafkaStreams object) so we have to delay the creation - // of most objects until #init since (a) that will be invoked by the actual - // StreamThread processing this, and (b) we need the context supplied to init for - // some of the setup - private AsyncProcessor( - final Processor userProcessor, - final FixedKeyProcessor userFixedKeyProcessor, - final Map> connectedStoreBuilders + public AsyncProcessor( + final Map> connectedStoreBuilders, + final InternalProcessorContext internalContext, + final Runnable userInit ) { - this.userProcessor = userProcessor; - this.userFixedKeyProcessor = userFixedKeyProcessor; this.connectedStoreBuilders = connectedStoreBuilders; - - if (userProcessor == null && userFixedKeyProcessor == null) { - throw new IllegalStateException("Both the Processor and FixedKeyProcessor were null"); - } else if (userProcessor != null && userFixedKeyProcessor != null) { - throw new IllegalStateException("Both the Processor and 
FixedKeyProcessor were non-null"); - } - } - - @Override - public void init(final ProcessorContext context) { - - initFields((InternalProcessorContext) context); - - userProcessor.init(userContext); - - completeInitialization(); - } - - // Note: we have to cast and suppress warnings in this version of #init but - // not the other due to the KOut parameter being squashed into KIn in the - // fixed-key version of the processor. However, we know this cast is safe, - // since by definition KIn and KOut are the same type - @SuppressWarnings("unchecked") - @Override - public void init(final FixedKeyProcessorContext context) { - - initFields((InternalProcessorContext) context); - - userFixedKeyProcessor.init((FixedKeyProcessorContext) userContext); - - completeInitialization(); - } - - /** - * Performs the first half of initialization by setting all the class fields - * that have to wait for the context to be passed in to #init to be initialized. - */ - private void initFields( - final InternalProcessorContext internalContext - ) { this.taskContext = internalContext; this.streamThreadName = Thread.currentThread().getName(); @@ -235,6 +164,10 @@ private void initFields( PunctuationType.WALL_CLOCK_TIME, this::punctuate ); + + userInit.run(); + + completeInitialization(); } /** @@ -278,8 +211,7 @@ void assertQueuesEmpty() { } } - @Override - public void process(final Record record) { + public void processRegular(final Record record, final Runnable userProcess) { assertQueuesEmptyOnFirstProcess(); final AsyncEvent newEvent = new AsyncEvent( @@ -290,15 +222,14 @@ public void process(final Record record) { extractRecordContext(taskContext), taskContext.currentStreamTimeMs(), taskContext.currentSystemTimeMs(), - () -> userProcessor.process(record), + userProcess, List.of(metricsRecorder::recordStateTransition) ); - processNewAsyncEvent(newEvent); + process(newEvent); } - @Override - public void process(final FixedKeyRecord record) { + public void processFixedKey(final 
FixedKeyRecord record, final Runnable userProcess) { assertQueuesEmptyOnFirstProcess(); final AsyncEvent newEvent = new AsyncEvent( @@ -309,14 +240,14 @@ public void process(final FixedKeyRecord record) { extractRecordContext(taskContext), taskContext.currentStreamTimeMs(), taskContext.currentSystemTimeMs(), - () -> userFixedKeyProcessor.process(record), + userProcess, List.of(metricsRecorder::recordStateTransition) ); - processNewAsyncEvent(newEvent); + process(newEvent); } - private void processNewAsyncEvent(final AsyncEvent event) { + private void process(final AsyncEvent event) { if (fatalException != null) { log.error("process called when processor already hit fatal exception", fatalException); throw new IllegalStateException( @@ -351,8 +282,7 @@ private ProcessorRecordContext extractRecordContext(final ProcessingContext cont return ((InternalProcessorContext) context).recordContext(); } - @Override - public void close() { + public void close(final Runnable userClose) { if (!isCleared()) { // This doesn't necessarily indicate an issue; it just should only ever // happen if the task is closed dirty, but unfortunately we can't tell @@ -369,11 +299,7 @@ public void close() { punctuator.cancel(); threadPool.removeProcessor(asyncProcessorName, taskId.partition()); - if (userProcessor != null) { - userProcessor.close(); - } else { - userFixedKeyProcessor.close(); - } + userClose.run(); } private static void registerFlushListenerForStoreBuilders( @@ -660,7 +586,7 @@ private StreamThreadProcessorContext.PreviousRecordContextAndNode preFinalize( throw new IllegalStateException(String.format( "routed event from %d to the wrong processor for %s", event.partition(), - taskId.toString())); + taskId)); } // Make sure to check for a failed event before preparing finalization. 
prepareToFinalizeEvent diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java index 04abdf898..ff81c0886 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java @@ -91,6 +91,7 @@ public static boolean isStreamThreadOrAsyncThread( ); } else { + // TODO: lift this restriction by finding another way to get the storeType from the builder throw new IllegalStateException(String.format( "Detected the StoreBuilder for %s was not created via the ResponsiveStores factory, " + "please ensure that all store builders and suppliers are provided through the "
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/MaybeAsyncProcessor.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/MaybeAsyncProcessor.java
new file mode 100644
index 000000000..3567d777e
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/MaybeAsyncProcessor.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2024 Responsive Computing, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.responsive.kafka.api.async.internals;
+
+import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG;
+import static dev.responsive.kafka.api.config.ResponsiveConfig.responsiveConfig;
+
+import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+
+/**
+ * This class is in charge of two things:
+ * 1. deduplicating and delegating code between the regular and fixed-key processor versions
+ * 2. disabling async processing altogether if the thread pool size is 0
+ * <p>
+ * The choice is made in {@code init}, which reads {@code ASYNC_THREAD_POOL_SIZE_CONFIG} from
+ * the app configs of the context it is given; until then no async processor exists.
+ */
+public class MaybeAsyncProcessor<KIn, VIn, KOut, VOut>
+    implements Processor<KIn, VIn, KOut, VOut>, FixedKeyProcessor<KIn, VIn, VOut> {
+
+  // Exactly one of these is non-null and the other is null
+  private final Processor<KIn, VIn, KOut, VOut> userProcessor;
+  private final FixedKeyProcessor<KIn, VIn, VOut> userFixedKeyProcessor;
+
+  // NOTE(review): wildcard arity of AbstractAsyncStoreBuilder is assumed here -- confirm
+  private final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders;
+
+  // Holds the AsyncProcessor once #init has run with a thread pool size above 0. Starts out
+  // empty rather than null so that #close never dereferences a null Optional
+  private Optional<AsyncProcessor<KIn, VIn, KOut, VOut>> asyncProcessor = Optional.empty();
+
+  // Set at the very end of #init; lets #process fail fast with a clear error instead of
+  // quietly bypassing async processing when the processor was never initialized
+  private boolean initialized = false;
+
+  /**
+   * Creates a wrapper around a regular (non fixed-key) user processor.
+   *
+   * @param userProcessor          the user's processor that all calls end up delegating to
+   * @param connectedStoreBuilders passed on to the {@link AsyncProcessor} if one is created
+   * @return a new wrapper with only the regular processor set
+   */
+  public static <KIn, VIn, KOut, VOut> MaybeAsyncProcessor<KIn, VIn, KOut, VOut> createProcessor(
+      final Processor<KIn, VIn, KOut, VOut> userProcessor,
+      final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders
+  ) {
+    return new MaybeAsyncProcessor<>(userProcessor, null, connectedStoreBuilders);
+  }
+
+  /**
+   * Creates a wrapper around a fixed-key user processor.
+   *
+   * @param userProcessor          the user's fixed-key processor that all calls delegate to
+   * @param connectedStoreBuilders passed on to the {@link AsyncProcessor} if one is created
+   * @return a new wrapper with only the fixed-key processor set
+   */
+  public static <KIn, VIn, VOut> MaybeAsyncProcessor<KIn, VIn, KIn, VOut> createFixedKeyProcessor(
+      final FixedKeyProcessor<KIn, VIn, VOut> userProcessor,
+      final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders
+  ) {
+    return new MaybeAsyncProcessor<>(null, userProcessor, connectedStoreBuilders);
+  }
+
+  /**
+   * @throws IllegalStateException if the two processors are both null or both non-null
+   */
+  private MaybeAsyncProcessor(
+      final Processor<KIn, VIn, KOut, VOut> userProcessor,
+      final FixedKeyProcessor<KIn, VIn, VOut> userFixedKeyProcessor,
+      final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders
+  ) {
+    this.userProcessor = userProcessor;
+    this.userFixedKeyProcessor = userFixedKeyProcessor;
+    this.connectedStoreBuilders = connectedStoreBuilders;
+
+    if (userProcessor == null && userFixedKeyProcessor == null) {
+      throw new IllegalStateException("Both the Processor and FixedKeyProcessor were null");
+    } else if (userProcessor != null && userFixedKeyProcessor != null) {
+      throw new IllegalStateException("Both the Processor and FixedKeyProcessor were non-null");
+    }
+  }
+
+  @Override
+  public void init(final ProcessorContext<KOut, VOut> context) {
+    sharedInit(
+        (InternalProcessorContext<KOut, VOut>) context,
+        () -> userProcessor.init(context)
+    );
+  }
+
+  @Override
+  @SuppressWarnings("unchecked") // needed for fixed-key only since KOut is replaced with KIn
+  public void init(final FixedKeyProcessorContext<KIn, VOut> context) {
+    sharedInit(
+        (InternalProcessorContext<KOut, VOut>) context,
+        () -> userFixedKeyProcessor.init(context)
+    );
+  }
+
+  /**
+   * Reads the async thread pool size from the app configs. If it is above 0, builds the
+   * {@link AsyncProcessor}, whose constructor runs {@code userInit}; otherwise runs
+   * {@code userInit} directly and leaves async processing disabled.
+   * <p>
+   * NOTE(review): {@code userInit} calls the user's init with the context that was passed to
+   * this class, in both modes. The AsyncProcessor#init methods this replaces passed the
+   * AsyncUserProcessorContext to the user instead -- confirm dropping that is intended
+   */
+  private void sharedInit(
+      final InternalProcessorContext<KOut, VOut> context,
+      final Runnable userInit
+  ) {
+    final int asyncThreadPoolSize =
+        responsiveConfig(context.appConfigs()).getInt(ASYNC_THREAD_POOL_SIZE_CONFIG);
+
+    if (asyncThreadPoolSize > 0) {
+      this.asyncProcessor = Optional.of(
+          new AsyncProcessor<>(connectedStoreBuilders, context, userInit)
+      );
+    } else {
+      this.asyncProcessor = Optional.empty();
+      userInit.run();
+    }
+
+    this.initialized = true;
+  }
+
+  @Override
+  public void process(final Record<KIn, VIn> record) {
+    assertInitialized();
+
+    final Runnable userProcess = () -> userProcessor.process(record);
+    asyncProcessor.ifPresentOrElse(
+        async -> async.processRegular(record, userProcess),
+        userProcess
+    );
+  }
+
+  @Override
+  public void process(final FixedKeyRecord<KIn, VIn> record) {
+    assertInitialized();
+
+    final Runnable userProcess = () -> userFixedKeyProcessor.process(record);
+    asyncProcessor.ifPresentOrElse(
+        async -> async.processFixedKey(record, userProcess),
+        userProcess
+    );
+  }
+
+  /**
+   * Closes the {@link AsyncProcessor} if one was created, handing it the user's close to
+   * run, or else closes the user's processor directly. Safe to call even if init never
+   * completed, in which case only the user's processor is closed.
+   */
+  @Override
+  public void close() {
+    final Runnable userClose =
+        userProcessor != null ? userProcessor::close : userFixedKeyProcessor::close;
+
+    asyncProcessor.ifPresentOrElse(async -> async.close(userClose), userClose);
+  }
+
+  private void assertInitialized() {
+    if (!initialized) {
+      throw new IllegalStateException("process was called before init completed");
+    }
+  }
+}