Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: add wrapper around AsyncProcessor to enable async processing engine only with async threads configured #306

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

package dev.responsive.kafka.api.async;

import static dev.responsive.kafka.api.async.internals.AsyncProcessor.createAsyncFixedKeyProcessor;
import static dev.responsive.kafka.api.async.internals.AsyncUtils.initializeAsyncBuilders;

import dev.responsive.kafka.api.async.internals.AsyncProcessor;
import dev.responsive.kafka.api.async.internals.MaybeAsyncProcessor;
import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -75,8 +74,11 @@ private AsyncFixedKeyProcessorSupplier(
}

@Override
public AsyncProcessor<KIn, VIn, KIn, VOut> get() {
return createAsyncFixedKeyProcessor(userProcessorSupplier.get(), asyncStoreBuilders);
public MaybeAsyncProcessor<KIn, VIn, KIn, VOut> get() {
return MaybeAsyncProcessor.createFixedKeyProcessor(
userProcessorSupplier.get(),
asyncStoreBuilders
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

package dev.responsive.kafka.api.async;

import static dev.responsive.kafka.api.async.internals.AsyncProcessor.createAsyncProcessor;
import static dev.responsive.kafka.api.async.internals.AsyncUtils.initializeAsyncBuilders;

import dev.responsive.kafka.api.async.internals.AsyncProcessor;
import dev.responsive.kafka.api.async.internals.MaybeAsyncProcessor;
import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -155,8 +154,8 @@ private AsyncProcessorSupplier(
}

@Override
public AsyncProcessor<KIn, VIn, KOut, VOut> get() {
return createAsyncProcessor(userProcessorSupplier.get(), asyncStoreBuilders);
public MaybeAsyncProcessor<KIn, VIn, KOut, VOut> get() {
return MaybeAsyncProcessor.createProcessor(userProcessorSupplier.get(), asyncStoreBuilders);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,8 @@
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.ProcessingContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
Expand All @@ -69,12 +66,18 @@
* -Coordinates the handoff of records between the StreamThread and AyncThreads
* -The starting and ending point of all async events -- see {@link AsyncEvent}
*/
public class AsyncProcessor<KIn, VIn, KOut, VOut>
implements Processor<KIn, VIn, KOut, VOut>, FixedKeyProcessor<KIn, VIn, VOut> {
public class AsyncProcessor<KIn, VIn, KOut, VOut> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this no longer implements the Processor interfaces, I think I'll rename it to "AsyncProcessingEngine" or something to that effect (suggestions welcome). And then rename the new wrapper class that creates/delegates to this class to "AsyncProcessor". But I'll wait until the PR has been reviewed/approved to do the renamings since that might impact a bunch of code


// Exactly one of these is non-null and the other is null
private final Processor<KIn, VIn, KOut, VOut> userProcessor;
private final FixedKeyProcessor<KIn, VIn, VOut> userFixedKeyProcessor;
private final String logPrefix;
private final Logger log;

private final String streamThreadName;
private final String asyncProcessorName;
private final TaskId taskId;

private final AsyncThreadPool threadPool;
private final SchedulingQueue<KIn> schedulingQueue;
private final FinalizingQueue finalizingQueue;
Comment on lines +71 to +80
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nice thing about this change is that we only instantiate the AsyncProcessor after the processor is initialized, so we can actually make all these variables final. I was hoping to do this anyway


private final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders;

Expand All @@ -84,106 +87,32 @@ public class AsyncProcessor<KIn, VIn, KOut, VOut>
// the stream thread should access this.
private final Map<AsyncEvent, Object> pendingEvents = new ConcurrentHashMap<>();


// This is set at most once. When its set, the thread should immediately throw, and no longer
// try to process further events for this processor. This minimizes the chance of producing
// bad results, particularly with ALOS.
private FatalAsyncException fatalException = null;

// Everything below this line is effectively final and just has to be initialized in #init //

private String logPrefix;
private Logger log;

private String streamThreadName;
private String asyncProcessorName;
private TaskId taskId;

private AsyncThreadPool threadPool;
private SchedulingQueue<KIn> schedulingQueue;
private FinalizingQueue finalizingQueue;

private Cancellable punctuator;
private final Cancellable punctuator;

// the context passed to us in init, ie the one created for this task and owned by Kafka Streams
private ProcessingContext taskContext;
private final ProcessingContext taskContext;

// the async context owned by the StreamThread that is running this processor/task
private StreamThreadProcessorContext<KOut, VOut> streamThreadContext;
private final StreamThreadProcessorContext<KOut, VOut> streamThreadContext;

// the context we pass in to the user so it routes to the actual context based on calling thread
private AsyncUserProcessorContext<KOut, VOut> userContext;
private boolean hasProcessedSomething = false;
private final AsyncUserProcessorContext<KOut, VOut> userContext;

private AsyncProcessorMetricsRecorder metricsRecorder;
private final AsyncProcessorMetricsRecorder metricsRecorder;

public static <KIn, VIn, KOut, VOut> AsyncProcessor<KIn, VIn, KOut, VOut> createAsyncProcessor(
final Processor<KIn, VIn, KOut, VOut> userProcessor,
final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders
) {
return new AsyncProcessor<>(userProcessor, null, connectedStoreBuilders);
}
private boolean hasProcessedSomething = false;

public static <KIn, VIn, VOut> AsyncProcessor<KIn, VIn, KIn, VOut> createAsyncFixedKeyProcessor(
final FixedKeyProcessor<KIn, VIn, VOut> userProcessor,
final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders
) {
return new AsyncProcessor<>(null, userProcessor, connectedStoreBuilders);
}
// This is set at most once. When its set, the thread should immediately throw, and no longer
// try to process further events for this processor. This minimizes the chance of producing
// bad results, particularly with ALOS.
private FatalAsyncException fatalException = null;

// Note: the constructor will be called from the main application thread (ie the
// one that creates/starts the KafkaStreams object) so we have to delay the creation
// of most objects until #init since (a) that will be invoked by the actual
// StreamThread processing this, and (b) we need the context supplied to init for
// some of the setup
private AsyncProcessor(
final Processor<KIn, VIn, KOut, VOut> userProcessor,
final FixedKeyProcessor<KIn, VIn, VOut> userFixedKeyProcessor,
final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders
public AsyncProcessor(
final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders,
final InternalProcessorContext<KOut, VOut> internalContext,
final Runnable userInit
) {
this.userProcessor = userProcessor;
this.userFixedKeyProcessor = userFixedKeyProcessor;
this.connectedStoreBuilders = connectedStoreBuilders;

if (userProcessor == null && userFixedKeyProcessor == null) {
throw new IllegalStateException("Both the Processor and FixedKeyProcessor were null");
} else if (userProcessor != null && userFixedKeyProcessor != null) {
throw new IllegalStateException("Both the Processor and FixedKeyProcessor were non-null");
}
}

@Override
public void init(final ProcessorContext<KOut, VOut> context) {

initFields((InternalProcessorContext<KOut, VOut>) context);

userProcessor.init(userContext);

completeInitialization();
}

// Note: we have to cast and suppress warnings in this version of #init but
// not the other due to the KOut parameter being squashed into KIn in the
// fixed-key version of the processor. However, we know this cast is safe,
// since by definition KIn and KOut are the same type
@SuppressWarnings("unchecked")
@Override
public void init(final FixedKeyProcessorContext<KIn, VOut> context) {

initFields((InternalProcessorContext<KOut, VOut>) context);

userFixedKeyProcessor.init((FixedKeyProcessorContext<KIn, VOut>) userContext);

completeInitialization();
}

/**
* Performs the first half of initialization by setting all the class fields
* that have to wait for the context to be passed in to #init to be initialized.
*/
private void initFields(
final InternalProcessorContext<KOut, VOut> internalContext
) {
this.taskContext = internalContext;

this.streamThreadName = Thread.currentThread().getName();
Expand Down Expand Up @@ -235,6 +164,10 @@ private void initFields(
PunctuationType.WALL_CLOCK_TIME,
this::punctuate
);

userInit.run();

completeInitialization();
}

/**
Expand Down Expand Up @@ -278,8 +211,7 @@ void assertQueuesEmpty() {
}
}

@Override
public void process(final Record<KIn, VIn> record) {
public void processRegular(final Record<KIn, VIn> record, final Runnable userProcess) {
assertQueuesEmptyOnFirstProcess();

final AsyncEvent newEvent = new AsyncEvent(
Expand All @@ -290,15 +222,14 @@ public void process(final Record<KIn, VIn> record) {
extractRecordContext(taskContext),
taskContext.currentStreamTimeMs(),
taskContext.currentSystemTimeMs(),
() -> userProcessor.process(record),
userProcess,
List.of(metricsRecorder::recordStateTransition)
);

processNewAsyncEvent(newEvent);
process(newEvent);
}

@Override
public void process(final FixedKeyRecord<KIn, VIn> record) {
public void processFixedKey(final FixedKeyRecord<KIn, VIn> record, final Runnable userProcess) {
assertQueuesEmptyOnFirstProcess();

final AsyncEvent newEvent = new AsyncEvent(
Expand All @@ -309,14 +240,14 @@ public void process(final FixedKeyRecord<KIn, VIn> record) {
extractRecordContext(taskContext),
taskContext.currentStreamTimeMs(),
taskContext.currentSystemTimeMs(),
() -> userFixedKeyProcessor.process(record),
userProcess,
List.of(metricsRecorder::recordStateTransition)
);

processNewAsyncEvent(newEvent);
process(newEvent);
}

private void processNewAsyncEvent(final AsyncEvent event) {
private void process(final AsyncEvent event) {
if (fatalException != null) {
log.error("process called when processor already hit fatal exception", fatalException);
throw new IllegalStateException(
Expand Down Expand Up @@ -351,8 +282,7 @@ private ProcessorRecordContext extractRecordContext(final ProcessingContext cont
return ((InternalProcessorContext<KOut, VOut>) context).recordContext();
}

@Override
public void close() {
public void close(final Runnable userClose) {
if (!isCleared()) {
// This doesn't necessarily indicate an issue; it just should only ever
// happen if the task is closed dirty, but unfortunately we can't tell
Expand All @@ -369,11 +299,7 @@ public void close() {
punctuator.cancel();
threadPool.removeProcessor(asyncProcessorName, taskId.partition());

if (userProcessor != null) {
userProcessor.close();
} else {
userFixedKeyProcessor.close();
}
userClose.run();
}

private static void registerFlushListenerForStoreBuilders(
Expand Down Expand Up @@ -660,7 +586,7 @@ private StreamThreadProcessorContext.PreviousRecordContextAndNode preFinalize(
throw new IllegalStateException(String.format(
"routed event from %d to the wrong processor for %s",
event.partition(),
taskId.toString()));
taskId));
}

// Make sure to check for a failed event before preparing finalization. prepareToFinalizeEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public static boolean isStreamThreadOrAsyncThread(
);

} else {
// TODO: lift this restriction by finding another way to get the storeType from the builder
throw new IllegalStateException(String.format(
"Detected the StoreBuilder for %s was not created via the ResponsiveStores factory, "
+ "please ensure that all store builders and suppliers are provided through the "
Expand Down
Loading
Loading