-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: add wrapper around AsyncProcessor to enable async processing engine only with async threads configured #306
base: main
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,11 +52,8 @@ | |
import org.apache.kafka.streams.processor.Cancellable; | ||
import org.apache.kafka.streams.processor.PunctuationType; | ||
import org.apache.kafka.streams.processor.TaskId; | ||
import org.apache.kafka.streams.processor.api.FixedKeyProcessor; | ||
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; | ||
import org.apache.kafka.streams.processor.api.FixedKeyRecord; | ||
import org.apache.kafka.streams.processor.api.ProcessingContext; | ||
import org.apache.kafka.streams.processor.api.Processor; | ||
import org.apache.kafka.streams.processor.api.ProcessorContext; | ||
import org.apache.kafka.streams.processor.api.Record; | ||
import org.apache.kafka.streams.processor.internals.InternalProcessorContext; | ||
|
@@ -69,12 +66,18 @@ | |
* -Coordinates the handoff of records between the StreamThread and AsyncThreads | ||
* -The starting and ending point of all async events -- see {@link AsyncEvent} | ||
*/ | ||
public class AsyncProcessor<KIn, VIn, KOut, VOut> | ||
implements Processor<KIn, VIn, KOut, VOut>, FixedKeyProcessor<KIn, VIn, VOut> { | ||
public class AsyncProcessor<KIn, VIn, KOut, VOut> { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since this no longer implements the |
||
|
||
// Exactly one of these is non-null and the other is null | ||
private final Processor<KIn, VIn, KOut, VOut> userProcessor; | ||
private final FixedKeyProcessor<KIn, VIn, VOut> userFixedKeyProcessor; | ||
private final String logPrefix; | ||
private final Logger log; | ||
|
||
private final String streamThreadName; | ||
private final String asyncProcessorName; | ||
private final TaskId taskId; | ||
|
||
private final AsyncThreadPool threadPool; | ||
private final SchedulingQueue<KIn> schedulingQueue; | ||
private final FinalizingQueue finalizingQueue; | ||
Comment on lines
+71
to
+80
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The nice thing about this change is that we only instantiate the AsyncProcessor after the processor is initialized, so we can actually make all these variables final. I was hoping to do this anyway. |
||
|
||
private final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders; | ||
|
||
|
@@ -84,106 +87,32 @@ public class AsyncProcessor<KIn, VIn, KOut, VOut> | |
// the stream thread should access this. | ||
private final Map<AsyncEvent, Object> pendingEvents = new ConcurrentHashMap<>(); | ||
|
||
|
||
// This is set at most once. When it's set, the thread should immediately throw, and no longer | ||
// try to process further events for this processor. This minimizes the chance of producing | ||
// bad results, particularly with ALOS. | ||
private FatalAsyncException fatalException = null; | ||
|
||
// Everything below this line is effectively final and just has to be initialized in #init // | ||
|
||
private String logPrefix; | ||
private Logger log; | ||
|
||
private String streamThreadName; | ||
private String asyncProcessorName; | ||
private TaskId taskId; | ||
|
||
private AsyncThreadPool threadPool; | ||
private SchedulingQueue<KIn> schedulingQueue; | ||
private FinalizingQueue finalizingQueue; | ||
|
||
private Cancellable punctuator; | ||
private final Cancellable punctuator; | ||
|
||
// the context passed to us in init, ie the one created for this task and owned by Kafka Streams | ||
private ProcessingContext taskContext; | ||
private final ProcessingContext taskContext; | ||
|
||
// the async context owned by the StreamThread that is running this processor/task | ||
private StreamThreadProcessorContext<KOut, VOut> streamThreadContext; | ||
private final StreamThreadProcessorContext<KOut, VOut> streamThreadContext; | ||
|
||
// the context we pass in to the user so it routes to the actual context based on calling thread | ||
private AsyncUserProcessorContext<KOut, VOut> userContext; | ||
private boolean hasProcessedSomething = false; | ||
private final AsyncUserProcessorContext<KOut, VOut> userContext; | ||
|
||
private AsyncProcessorMetricsRecorder metricsRecorder; | ||
private final AsyncProcessorMetricsRecorder metricsRecorder; | ||
|
||
public static <KIn, VIn, KOut, VOut> AsyncProcessor<KIn, VIn, KOut, VOut> createAsyncProcessor( | ||
final Processor<KIn, VIn, KOut, VOut> userProcessor, | ||
final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders | ||
) { | ||
return new AsyncProcessor<>(userProcessor, null, connectedStoreBuilders); | ||
} | ||
private boolean hasProcessedSomething = false; | ||
|
||
public static <KIn, VIn, VOut> AsyncProcessor<KIn, VIn, KIn, VOut> createAsyncFixedKeyProcessor( | ||
final FixedKeyProcessor<KIn, VIn, VOut> userProcessor, | ||
final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders | ||
) { | ||
return new AsyncProcessor<>(null, userProcessor, connectedStoreBuilders); | ||
} | ||
// This is set at most once. When it's set, the thread should immediately throw, and no longer | ||
// try to process further events for this processor. This minimizes the chance of producing | ||
// bad results, particularly with ALOS. | ||
private FatalAsyncException fatalException = null; | ||
|
||
// Note: the constructor will be called from the main application thread (ie the | ||
// one that creates/starts the KafkaStreams object) so we have to delay the creation | ||
// of most objects until #init since (a) that will be invoked by the actual | ||
// StreamThread processing this, and (b) we need the context supplied to init for | ||
// some of the setup | ||
private AsyncProcessor( | ||
final Processor<KIn, VIn, KOut, VOut> userProcessor, | ||
final FixedKeyProcessor<KIn, VIn, VOut> userFixedKeyProcessor, | ||
final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders | ||
public AsyncProcessor( | ||
final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders, | ||
final InternalProcessorContext<KOut, VOut> internalContext, | ||
final Runnable userInit | ||
) { | ||
this.userProcessor = userProcessor; | ||
this.userFixedKeyProcessor = userFixedKeyProcessor; | ||
this.connectedStoreBuilders = connectedStoreBuilders; | ||
|
||
if (userProcessor == null && userFixedKeyProcessor == null) { | ||
throw new IllegalStateException("Both the Processor and FixedKeyProcessor were null"); | ||
} else if (userProcessor != null && userFixedKeyProcessor != null) { | ||
throw new IllegalStateException("Both the Processor and FixedKeyProcessor were non-null"); | ||
} | ||
} | ||
|
||
@Override | ||
public void init(final ProcessorContext<KOut, VOut> context) { | ||
|
||
initFields((InternalProcessorContext<KOut, VOut>) context); | ||
|
||
userProcessor.init(userContext); | ||
|
||
completeInitialization(); | ||
} | ||
|
||
// Note: we have to cast and suppress warnings in this version of #init but | ||
// not the other due to the KOut parameter being squashed into KIn in the | ||
// fixed-key version of the processor. However, we know this cast is safe, | ||
// since by definition KIn and KOut are the same type | ||
@SuppressWarnings("unchecked") | ||
@Override | ||
public void init(final FixedKeyProcessorContext<KIn, VOut> context) { | ||
|
||
initFields((InternalProcessorContext<KOut, VOut>) context); | ||
|
||
userFixedKeyProcessor.init((FixedKeyProcessorContext<KIn, VOut>) userContext); | ||
|
||
completeInitialization(); | ||
} | ||
|
||
/** | ||
* Performs the first half of initialization by setting all the class fields | ||
* that have to wait for the context to be passed in to #init to be initialized. | ||
*/ | ||
private void initFields( | ||
final InternalProcessorContext<KOut, VOut> internalContext | ||
) { | ||
this.taskContext = internalContext; | ||
|
||
this.streamThreadName = Thread.currentThread().getName(); | ||
|
@@ -235,6 +164,10 @@ private void initFields( | |
PunctuationType.WALL_CLOCK_TIME, | ||
this::punctuate | ||
); | ||
|
||
userInit.run(); | ||
|
||
completeInitialization(); | ||
} | ||
|
||
/** | ||
|
@@ -278,8 +211,7 @@ void assertQueuesEmpty() { | |
} | ||
} | ||
|
||
@Override | ||
public void process(final Record<KIn, VIn> record) { | ||
public void processRegular(final Record<KIn, VIn> record, final Runnable userProcess) { | ||
assertQueuesEmptyOnFirstProcess(); | ||
|
||
final AsyncEvent newEvent = new AsyncEvent( | ||
|
@@ -290,15 +222,14 @@ public void process(final Record<KIn, VIn> record) { | |
extractRecordContext(taskContext), | ||
taskContext.currentStreamTimeMs(), | ||
taskContext.currentSystemTimeMs(), | ||
() -> userProcessor.process(record), | ||
userProcess, | ||
List.of(metricsRecorder::recordStateTransition) | ||
); | ||
|
||
processNewAsyncEvent(newEvent); | ||
process(newEvent); | ||
} | ||
|
||
@Override | ||
public void process(final FixedKeyRecord<KIn, VIn> record) { | ||
public void processFixedKey(final FixedKeyRecord<KIn, VIn> record, final Runnable userProcess) { | ||
assertQueuesEmptyOnFirstProcess(); | ||
|
||
final AsyncEvent newEvent = new AsyncEvent( | ||
|
@@ -309,14 +240,14 @@ public void process(final FixedKeyRecord<KIn, VIn> record) { | |
extractRecordContext(taskContext), | ||
taskContext.currentStreamTimeMs(), | ||
taskContext.currentSystemTimeMs(), | ||
() -> userFixedKeyProcessor.process(record), | ||
userProcess, | ||
List.of(metricsRecorder::recordStateTransition) | ||
); | ||
|
||
processNewAsyncEvent(newEvent); | ||
process(newEvent); | ||
} | ||
|
||
private void processNewAsyncEvent(final AsyncEvent event) { | ||
private void process(final AsyncEvent event) { | ||
if (fatalException != null) { | ||
log.error("process called when processor already hit fatal exception", fatalException); | ||
throw new IllegalStateException( | ||
|
@@ -351,8 +282,7 @@ private ProcessorRecordContext extractRecordContext(final ProcessingContext cont | |
return ((InternalProcessorContext<KOut, VOut>) context).recordContext(); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
public void close(final Runnable userClose) { | ||
if (!isCleared()) { | ||
// This doesn't necessarily indicate an issue; it just should only ever | ||
// happen if the task is closed dirty, but unfortunately we can't tell | ||
|
@@ -369,11 +299,7 @@ public void close() { | |
punctuator.cancel(); | ||
threadPool.removeProcessor(asyncProcessorName, taskId.partition()); | ||
|
||
if (userProcessor != null) { | ||
userProcessor.close(); | ||
} else { | ||
userFixedKeyProcessor.close(); | ||
} | ||
userClose.run(); | ||
} | ||
|
||
private static void registerFlushListenerForStoreBuilders( | ||
|
@@ -660,7 +586,7 @@ private StreamThreadProcessorContext.PreviousRecordContextAndNode preFinalize( | |
throw new IllegalStateException(String.format( | ||
"routed event from %d to the wrong processor for %s", | ||
event.partition(), | ||
taskId.toString())); | ||
taskId)); | ||
} | ||
|
||
// Make sure to check for a failed event before preparing finalization. prepareToFinalizeEvent | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The one thing I haven't implemented yet is that we initialize the store builders as AsyncStoreBuilders right from the start. I still need to push that back so that we don't end up wrapping the user's store suppliers in the async stuff if that's not enabled.