From 3aff8c2d3a09ec9f7a8169bc60ffe154bbc04122 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 6 Jun 2024 18:14:48 -0700 Subject: [PATCH 1/3] add wrapper around AsyncProcessor --- .../api/async/internals/AsyncProcessor.java | 128 +++++++-------- .../async/internals/MaybeAsyncProcessor.java | 146 ++++++++++++++++++ 2 files changed, 198 insertions(+), 76 deletions(-) create mode 100644 kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/MaybeAsyncProcessor.java diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java index 748653cea..0b40bb8cb 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java @@ -69,12 +69,7 @@ * -Coordinates the handoff of records between the StreamThread and AyncThreads * -The starting and ending point of all async events -- see {@link AsyncEvent} */ -public class AsyncProcessor - implements Processor, FixedKeyProcessor { - - // Exactly one of these is non-null and the other is null - private final Processor userProcessor; - private final FixedKeyProcessor userFixedKeyProcessor; +public class AsyncProcessor { private final Map> connectedStoreBuilders; @@ -84,12 +79,36 @@ public class AsyncProcessor // the stream thread should access this. private final Map pendingEvents = new ConcurrentHashMap<>(); + private final String logPrefix; + private final Logger log; + + private final String streamThreadName; + private final String asyncProcessorName; + private final TaskId taskId; + + private final AsyncThreadPool threadPool; + private final SchedulingQueue schedulingQueue; + private final FinalizingQueue finalizingQueue; + + private final Cancellable punctuator; + + // the context passed to us in init, ie the one created for this task and owned by Kafka Streams + private final ProcessingContext taskContext; + + // the async context owned by the StreamThread that is running this processor/task + private final StreamThreadProcessorContext streamThreadContext; + + // the context we pass in to the user so it routes to the actual context based on calling thread + private final AsyncUserProcessorContext userContext; + + private boolean hasProcessedSomething = false; // This is set at most once. When its set, the thread should immediately throw, and no longer // try to process further events for this processor. This minimizes the chance of producing // bad results, particularly with ALOS. private FatalAsyncException fatalException = null; +<<<<<<< Updated upstream // Everything below this line is effectively final and just has to be initialized in #init // private String logPrefix; @@ -120,70 +139,15 @@ public class AsyncProcessor public static AsyncProcessor createAsyncProcessor( final Processor userProcessor, final Map> connectedStoreBuilders +======= + public AsyncProcessor( + final Map> connectedStoreBuilders, + final InternalProcessorContext internalContext, + final Runnable userInit +>>>>>>> Stashed changes ) { - return new AsyncProcessor<>(userProcessor, null, connectedStoreBuilders); - } - - public static AsyncProcessor createAsyncFixedKeyProcessor( - final FixedKeyProcessor userProcessor, - final Map> connectedStoreBuilders - ) { - return new AsyncProcessor<>(null, userProcessor, connectedStoreBuilders); - } - - // Note: the constructor will be called from the main application thread (ie the - // one that creates/starts the KafkaStreams object) so we have to delay the creation - // of most objects until #init since (a) that will be invoked by the actual - // StreamThread processing this, and (b) we need the context supplied to init for - // some of the setup - private AsyncProcessor( - final Processor userProcessor, - final FixedKeyProcessor userFixedKeyProcessor, - final Map> connectedStoreBuilders - ) { - this.userProcessor = userProcessor; - this.userFixedKeyProcessor = userFixedKeyProcessor; this.connectedStoreBuilders = connectedStoreBuilders; - if (userProcessor == null && userFixedKeyProcessor == null) { - throw new IllegalStateException("Both the Processor and FixedKeyProcessor were null"); - } else if (userProcessor != null && userFixedKeyProcessor != null) { - throw new IllegalStateException("Both the Processor and FixedKeyProcessor were non-null"); - } - } - - @Override - public void init(final ProcessorContext context) { - - initFields((InternalProcessorContext) context); - - userProcessor.init(userContext); - - completeInitialization(); - } - - // Note: we have to cast and suppress warnings in this version of #init but - // not the other due to the KOut parameter being squashed into KIn in the - // fixed-key version of the processor. However, we know this cast is safe, - // since by definition KIn and KOut are the same type - @SuppressWarnings("unchecked") - @Override - public void init(final FixedKeyProcessorContext context) { - - initFields((InternalProcessorContext) context); - - userFixedKeyProcessor.init((FixedKeyProcessorContext) userContext); - - completeInitialization(); - } - - /** - * Performs the first half of initialization by setting all the class fields - * that have to wait for the context to be passed in to #init to be initialized. - */ - private void initFields( - final InternalProcessorContext internalContext - ) { this.taskContext = internalContext; this.streamThreadName = Thread.currentThread().getName(); @@ -235,6 +199,11 @@ private void initFields( PunctuationType.WALL_CLOCK_TIME, this::punctuate ); + + // calls #init on the user's processor + userInit.run(); + + completeInitialization(); } /** @@ -278,8 +247,7 @@ void assertQueuesEmpty() { } } - @Override - public void process(final Record record) { + public void process(final Record record, final Runnable userProcess) { assertQueuesEmptyOnFirstProcess(); final AsyncEvent newEvent = new AsyncEvent( @@ -290,15 +258,18 @@ public void process(final Record record) { extractRecordContext(taskContext), taskContext.currentStreamTimeMs(), taskContext.currentSystemTimeMs(), +<<<<<<< Updated upstream () -> userProcessor.process(record), List.of(metricsRecorder::recordStateTransition) +======= + userProcess +>>>>>>> Stashed changes ); processNewAsyncEvent(newEvent); } - @Override - public void process(final FixedKeyRecord record) { + public void process(final FixedKeyRecord record, final Runnable userProcess) { assertQueuesEmptyOnFirstProcess(); final AsyncEvent newEvent = new AsyncEvent( @@ -309,8 +280,12 @@ public void process(final FixedKeyRecord record) { extractRecordContext(taskContext), taskContext.currentStreamTimeMs(), taskContext.currentSystemTimeMs(), +<<<<<<< Updated upstream () -> userFixedKeyProcessor.process(record), List.of(metricsRecorder::recordStateTransition) +======= + userProcess +>>>>>>> Stashed changes ); processNewAsyncEvent(newEvent); @@ -351,8 +326,13 @@ private ProcessorRecordContext extractRecordContext(final ProcessingContext cont return ((InternalProcessorContext) context).recordContext(); } +<<<<<<< Updated upstream @Override public void close() { +======= + public void close(final Runnable userClose) { + +>>>>>>> Stashed changes if (!isCleared()) { // This doesn't necessarily indicate an issue; it just should only ever // happen if the task is closed dirty, but unfortunately we can't tell @@ -369,11 +349,7 @@ public void close() { punctuator.cancel(); threadPool.removeProcessor(asyncProcessorName, taskId.partition()); - if (userProcessor != null) { - userProcessor.close(); - } else { - userFixedKeyProcessor.close(); - } + userClose.run(); } private static void registerFlushListenerForStoreBuilders( diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/MaybeAsyncProcessor.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/MaybeAsyncProcessor.java new file mode 100644 index 000000000..ebb7bd1ae --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/MaybeAsyncProcessor.java @@ -0,0 +1,146 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.responsive.kafka.api.async.internals; + +import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG; +import static dev.responsive.kafka.api.config.ResponsiveConfig.responsiveConfig; + +import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder; +import dev.responsive.kafka.api.config.ResponsiveConfig; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; + +/** + * This class is in charge of two things: + * 1. deduplicating and delegating code between the regular and fixed-key processor versions + * 2. disabling async processing altogether if the thread pool size is 0 + */ +public class MaybeAsyncProcessor + implements Processor, FixedKeyProcessor { + + // Exactly one of these is non-null and the other is null + private final Processor userProcessor; + private final FixedKeyProcessor userFixedKeyProcessor; + + private final Map> connectedStoreBuilders; + + private Optional> asyncProcessor = null; + + public static MaybeAsyncProcessor createProcessor( + final Processor userProcessor, + final Map> connectedStoreBuilders + ) { + return new MaybeAsyncProcessor<>(userProcessor, null, connectedStoreBuilders); + } + + public static MaybeAsyncProcessor createFixedKeyProcessor( + final FixedKeyProcessor userProcessor, + final Map> connectedStoreBuilders + ) { + return new MaybeAsyncProcessor<>(null, userProcessor, connectedStoreBuilders); + } + + private MaybeAsyncProcessor( + final Processor userProcessor, + final FixedKeyProcessor userFixedKeyProcessor, + final Map> connectedStoreBuilders + ) { + this.userProcessor = userProcessor; + this.userFixedKeyProcessor = userFixedKeyProcessor; + this.connectedStoreBuilders = connectedStoreBuilders; + + if (userProcessor == null && userFixedKeyProcessor == null) { + throw new IllegalStateException("Both the Processor and FixedKeyProcessor were null"); + } else if (userProcessor != null && userFixedKeyProcessor != null) { + throw new IllegalStateException("Both the Processor and FixedKeyProcessor were non-null"); + } + } + + @Override + public void init(final ProcessorContext context) { + sharedInit( + (InternalProcessorContext) context, + () -> userProcessor.init(context) + ); + } + + @Override + @SuppressWarnings("unchecked") // needed for fixed-key only since KOut is replaced with KIn + public void init(final FixedKeyProcessorContext context) { + sharedInit( + (InternalProcessorContext) context, + () -> userFixedKeyProcessor.init(context) + ); + } + + private void sharedInit(final InternalProcessorContext context, final Runnable userInit) { + final int asyncThreadPoolSize = + responsiveConfig(context.appConfigs()).getInt(ASYNC_THREAD_POOL_SIZE_CONFIG); + + if (asyncThreadPoolSize > 0) { + this.asyncProcessor = Optional.of(new AsyncProcessor<>(connectedStoreBuilders, context, userInit)); + } else { + this.asyncProcessor = Optional.empty(); + userInit.run(); + } + } + + @Override + public void process(final Record record) { + final Runnable userProcess = () -> userProcessor.process(record); + + if (asyncProcessor.isPresent()) { + asyncProcessor.get().process(record, userProcess); + } else { + userProcess.run(); + } + } + + @Override + public void process(final FixedKeyRecord record) { + final Runnable userProcess = () -> userFixedKeyProcessor.process(record); + + if (asyncProcessor.isPresent()) { + asyncProcessor.get().process(record, userProcess); + } else { + userProcess.run(); + } + } + + @Override + public void close() { + final Runnable userClose; + if (userProcessor != null) { + userClose = userProcessor::close; + } else { + userClose = userFixedKeyProcessor::close; + } + + if (asyncProcessor.isPresent()) { + asyncProcessor.get().close(userClose); + } else { + userClose.run(); + } + } +} From a20139524cb52f1bf8a6c01c01ef526d1a997001 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 6 Jun 2024 20:51:48 -0700 Subject: [PATCH 2/3] fix git issues and update everywhere --- .../async/AsyncFixedKeyProcessorSupplier.java | 7 +- .../api/async/AsyncProcessorSupplier.java | 6 +- .../api/async/internals/AsyncProcessor.java | 86 ++++--------------- .../async/internals/MaybeAsyncProcessor.java | 5 +- 4 files changed, 26 insertions(+), 78 deletions(-) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java index 63a212204..05ac889e1 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java @@ -16,10 +16,9 @@ package dev.responsive.kafka.api.async; -import static dev.responsive.kafka.api.async.internals.AsyncProcessor.createAsyncFixedKeyProcessor; import static dev.responsive.kafka.api.async.internals.AsyncUtils.initializeAsyncBuilders; -import dev.responsive.kafka.api.async.internals.AsyncProcessor; +import dev.responsive.kafka.api.async.internals.MaybeAsyncProcessor; import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder; import java.util.HashSet; import java.util.Map; @@ -75,8 +74,8 @@ private AsyncFixedKeyProcessorSupplier( } @Override - public AsyncProcessor get() { - return createAsyncFixedKeyProcessor(userProcessorSupplier.get(), asyncStoreBuilders); + public MaybeAsyncProcessor get() { + return MaybeAsyncProcessor.createFixedKeyProcessor(userProcessorSupplier.get(), asyncStoreBuilders); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java index ddcb30e4b..180ee787c 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java @@ -16,10 +16,10 @@ package dev.responsive.kafka.api.async; -import static dev.responsive.kafka.api.async.internals.AsyncProcessor.createAsyncProcessor; import static dev.responsive.kafka.api.async.internals.AsyncUtils.initializeAsyncBuilders; import dev.responsive.kafka.api.async.internals.AsyncProcessor; +import dev.responsive.kafka.api.async.internals.MaybeAsyncProcessor; import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder; import java.util.HashSet; import java.util.Map; @@ -155,8 +155,8 @@ private AsyncProcessorSupplier( } @Override - public AsyncProcessor get() { - return createAsyncProcessor(userProcessorSupplier.get(), asyncStoreBuilders); + public MaybeAsyncProcessor get() { + return MaybeAsyncProcessor.createProcessor(userProcessorSupplier.get(), asyncStoreBuilders); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java index 0b40bb8cb..ad8e02c29 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java @@ -52,11 +52,8 @@ import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.api.FixedKeyProcessor; -import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; import org.apache.kafka.streams.processor.api.FixedKeyRecord; import org.apache.kafka.streams.processor.api.ProcessingContext; -import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; @@ -71,14 +68,6 @@ */ public class AsyncProcessor { - private final Map> connectedStoreBuilders; - - // Tracks all pending events, ie from moment of creation to end of finalization/transition - // to "DONE" state. Used to make sure all events are flushed during a commit. - // We use a concurrent map here so metrics readers can query its size. Otherwise, only - // the stream thread should access this. - private final Map pendingEvents = new ConcurrentHashMap<>(); - private final String logPrefix; private final Logger log; @@ -90,6 +79,14 @@ public class AsyncProcessor { private final SchedulingQueue schedulingQueue; private final FinalizingQueue finalizingQueue; + private final Map> connectedStoreBuilders; + + // Tracks all pending events, ie from moment of creation to end of finalization/transition + // to "DONE" state. Used to make sure all events are flushed during a commit. + // We use a concurrent map here so metrics readers can query its size. Otherwise, only + // the stream thread should access this. + private final Map pendingEvents = new ConcurrentHashMap<>(); + private final Cancellable punctuator; // the context passed to us in init, ie the one created for this task and owned by Kafka Streams @@ -101,6 +98,8 @@ public class AsyncProcessor { // the context we pass in to the user so it routes to the actual context based on calling thread private final AsyncUserProcessorContext userContext; + private final AsyncProcessorMetricsRecorder metricsRecorder; + private boolean hasProcessedSomething = false; // This is set at most once. When its set, the thread should immediately throw, and no longer @@ -108,46 +107,12 @@ public class AsyncProcessor { // bad results, particularly with ALOS. private FatalAsyncException fatalException = null; -<<<<<<< Updated upstream - // Everything below this line is effectively final and just has to be initialized in #init // - - private String logPrefix; - private Logger log; - - private String streamThreadName; - private String asyncProcessorName; - private TaskId taskId; - - private AsyncThreadPool threadPool; - private SchedulingQueue schedulingQueue; - private FinalizingQueue finalizingQueue; - - private Cancellable punctuator; - - // the context passed to us in init, ie the one created for this task and owned by Kafka Streams - private ProcessingContext taskContext; - - // the async context owned by the StreamThread that is running this processor/task - private StreamThreadProcessorContext streamThreadContext; - - // the context we pass in to the user so it routes to the actual context based on calling thread - private AsyncUserProcessorContext userContext; - private boolean hasProcessedSomething = false; - - private AsyncProcessorMetricsRecorder metricsRecorder; - - public static AsyncProcessor createAsyncProcessor( - final Processor userProcessor, - final Map> connectedStoreBuilders -======= public AsyncProcessor( final Map> connectedStoreBuilders, final InternalProcessorContext internalContext, final Runnable userInit ->>>>>>> Stashed changes ) { this.connectedStoreBuilders = connectedStoreBuilders; - this.taskContext = internalContext; this.streamThreadName = Thread.currentThread().getName(); @@ -200,7 +165,6 @@ public AsyncProcessor( this::punctuate ); - // calls #init on the user's processor userInit.run(); completeInitialization(); @@ -247,7 +211,7 @@ void assertQueuesEmpty() { } } - public void process(final Record record, final Runnable userProcess) { + public void processRegular(final Record record, final Runnable userProcess) { assertQueuesEmptyOnFirstProcess(); final AsyncEvent newEvent = new AsyncEvent( @@ -258,18 +222,14 @@ public void process(final Record record, final Runnable userProcess) { extractRecordContext(taskContext), taskContext.currentStreamTimeMs(), taskContext.currentSystemTimeMs(), -<<<<<<< Updated upstream - () -> userProcessor.process(record), + userProcess, List.of(metricsRecorder::recordStateTransition) -======= - userProcess ->>>>>>> Stashed changes ); - processNewAsyncEvent(newEvent); + process(newEvent); } - public void process(final FixedKeyRecord record, final Runnable userProcess) { + public void processFixedKey(final FixedKeyRecord record, final Runnable userProcess) { assertQueuesEmptyOnFirstProcess(); final AsyncEvent newEvent = new AsyncEvent( @@ -280,18 +240,14 @@ public void process(final FixedKeyRecord record, final Runnable userPr extractRecordContext(taskContext), taskContext.currentStreamTimeMs(), taskContext.currentSystemTimeMs(), -<<<<<<< Updated upstream - () -> userFixedKeyProcessor.process(record), + userProcess, List.of(metricsRecorder::recordStateTransition) -======= - userProcess ->>>>>>> Stashed changes ); - processNewAsyncEvent(newEvent); + process(newEvent); } - private void processNewAsyncEvent(final AsyncEvent event) { + private void process(final AsyncEvent event) { if (fatalException != null) { log.error("process called when processor already hit fatal exception", fatalException); throw new IllegalStateException( @@ -326,13 +282,7 @@ private ProcessorRecordContext extractRecordContext(final ProcessingContext cont return ((InternalProcessorContext) context).recordContext(); } -<<<<<<< Updated upstream - @Override - public void close() { -======= public void close(final Runnable userClose) { - ->>>>>>> Stashed changes if (!isCleared()) { // This doesn't necessarily indicate an issue; it just should only ever // happen if the task is closed dirty, but unfortunately we can't tell @@ -636,7 +586,7 @@ private StreamThreadProcessorContext.PreviousRecordContextAndNode preFinalize( throw new IllegalStateException(String.format( "routed event from %d to the wrong processor for %s", event.partition(), - taskId.toString())); + taskId)); } // Make sure to check for a failed event before preparing finalization. prepareToFinalizeEvent diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/MaybeAsyncProcessor.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/MaybeAsyncProcessor.java index ebb7bd1ae..a86115e41 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/MaybeAsyncProcessor.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/MaybeAsyncProcessor.java @@ -20,7 +20,6 @@ import static dev.responsive.kafka.api.config.ResponsiveConfig.responsiveConfig; import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder; -import dev.responsive.kafka.api.config.ResponsiveConfig; import java.util.Map; import java.util.Optional; import org.apache.kafka.streams.processor.api.FixedKeyProcessor; @@ -111,7 +110,7 @@ public void process(final Record record) { final Runnable userProcess = () -> userProcessor.process(record); if (asyncProcessor.isPresent()) { - asyncProcessor.get().process(record, userProcess); + asyncProcessor.get().processRegular(record, userProcess); } else { userProcess.run(); } @@ -122,7 +121,7 @@ public void process(final FixedKeyRecord record) { final Runnable userProcess = () -> userFixedKeyProcessor.process(record); if (asyncProcessor.isPresent()) { - asyncProcessor.get().process(record, userProcess); + asyncProcessor.get().processFixedKey(record, userProcess); } else { userProcess.run(); } From 4016bc7b093501c0fea4f5af7189788db83ed3a2 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 6 Jun 2024 22:10:16 -0700 Subject: [PATCH 3/3] checkstyle --- .../kafka/api/async/AsyncFixedKeyProcessorSupplier.java | 5 ++++- .../kafka/api/async/AsyncProcessorSupplier.java | 1 - .../responsive/kafka/api/async/internals/AsyncUtils.java | 1 + .../kafka/api/async/internals/MaybeAsyncProcessor.java | 9 +++++++-- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java index 05ac889e1..1e780ec92 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java @@ -75,7 +75,10 @@ private AsyncFixedKeyProcessorSupplier( @Override public MaybeAsyncProcessor get() { - return MaybeAsyncProcessor.createFixedKeyProcessor(userProcessorSupplier.get(), asyncStoreBuilders); + return MaybeAsyncProcessor.createFixedKeyProcessor( + userProcessorSupplier.get(), + asyncStoreBuilders + ); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java index 180ee787c..353558714 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java @@ -18,7 +18,6 @@ import static dev.responsive.kafka.api.async.internals.AsyncUtils.initializeAsyncBuilders; -import dev.responsive.kafka.api.async.internals.AsyncProcessor; import dev.responsive.kafka.api.async.internals.MaybeAsyncProcessor; import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder; import java.util.HashSet; diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java index 04abdf898..ff81c0886 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java @@ -91,6 +91,7 @@ public static boolean isStreamThreadOrAsyncThread( ); } else { + // TODO: lift this restriction by finding another way to get the storeType from the builder throw new IllegalStateException(String.format( "Detected the StoreBuilder for %s was not created via the ResponsiveStores factory, " + "please ensure that all store builders and suppliers are provided through the " diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/MaybeAsyncProcessor.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/MaybeAsyncProcessor.java index a86115e41..3567d777e 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/MaybeAsyncProcessor.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/MaybeAsyncProcessor.java @@ -93,12 +93,17 @@ public void init(final FixedKeyProcessorContext context) { ); } - private void sharedInit(final InternalProcessorContext context, final Runnable userInit) { + private void sharedInit( + final InternalProcessorContext context, + final Runnable userInit + ) { final int asyncThreadPoolSize = responsiveConfig(context.appConfigs()).getInt(ASYNC_THREAD_POOL_SIZE_CONFIG); if (asyncThreadPoolSize > 0) { - this.asyncProcessor = Optional.of(new AsyncProcessor<>(connectedStoreBuilders, context, userInit)); + this.asyncProcessor = Optional.of( + new AsyncProcessor<>(connectedStoreBuilders, context, userInit) + ); } else { this.asyncProcessor = Optional.empty(); userInit.run();