Skip to content

Commit

Permalink
[FLINK-35029][state/forst] Store timer in JVM heap when use async sta…
Browse files Browse the repository at this point in the history
…te backend (#25501)
  • Loading branch information
fredia authored Oct 23, 2024
1 parent a83f24f commit 039a07d
Show file tree
Hide file tree
Showing 25 changed files with 630 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre
public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {
super.initializeState(streamTaskStateManager);
this.timeServiceManager = stateHandler.getAsyncInternalTimerServiceManager();
getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null));
final StreamTask<?, ?> containingTask = checkNotNull(getContainingTask());
environment = containingTask.getEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public AbstractAsyncStateStreamOperatorV2(
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {
super.initializeState(streamTaskStateManager);
this.timeServiceManager = stateHandler.getAsyncInternalTimerServiceManager();
getRuntimeContext().setKeyedStateStoreV2(stateHandler.getKeyedStateStoreV2().orElse(null));

final int inFlightRecordsLimit = getExecutionConfig().getAsyncInflightRecordsLimit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.util.Disposable;

Expand All @@ -41,6 +42,7 @@
public interface AsyncKeyedStateBackend<K>
extends Snapshotable<SnapshotResult<KeyedStateHandle>>,
InternalCheckpointListener,
PriorityQueueSetFactory,
Disposable,
Closeable,
AsyncExecutionController.SwitchContextListener<K> {
Expand Down Expand Up @@ -83,10 +85,24 @@ <N, S extends State, SV> S createState(
@Nonnull
StateExecutor createStateExecutor();

/** Returns the key groups which this state backend is responsible for. */
KeyGroupRange getKeyGroupRange();

/** By default, a state backend does nothing when a key is switched in async processing. */
@Override
default void switchContext(RecordContext<K> context) {}

// TODO remove this once heap-based timers are working with ForSt incremental snapshots!
/**
* Whether the keyed state backend requires legacy synchronous timer snapshots.
*
* @param checkpointType
* @return true as default in case of AsyncKeyedStateBackend
*/
default boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointType) {
return true;
}

@Override
void dispose();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,19 @@
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
Expand Down Expand Up @@ -96,6 +103,11 @@ public StateExecutor createStateExecutor() {
return null;
}

@Override
public KeyGroupRange getKeyGroupRange() {
return keyedStateBackend.getKeyGroupRange();
}

@Override
public void switchContext(RecordContext<K> context) {
keyedStateBackend.setCurrentKeyAndKeyGroup(context.getKey(), context.getKeyGroup());
Expand Down Expand Up @@ -139,4 +151,32 @@ public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
return keyedStateBackend.snapshot(
checkpointId, timestamp, streamFactory, checkpointOptions);
}

@Nonnull
@Override
public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
KeyGroupedInternalPriorityQueue<T> create(
@Nonnull String stateName,
@Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
return keyedStateBackend.create(stateName, byteOrderedElementSerializer);
}

@Override
public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
KeyGroupedInternalPriorityQueue<T> create(
@Nonnull String stateName,
@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
boolean allowFutureMetadataUpdates) {
return keyedStateBackend.create(
stateName, byteOrderedElementSerializer, allowFutureMetadataUpdates);
}

@Override
public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointType) {
if (keyedStateBackend instanceof AbstractKeyedStateBackend) {
return ((AbstractKeyedStateBackend) keyedStateBackend)
.requiresLegacySynchronousTimerSnapshots(checkpointType);
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
Expand Down Expand Up @@ -110,7 +111,8 @@ void snapshotToRawKeyedState(
interface Provider extends Serializable {
<K> InternalTimeServiceManager<K> create(
TaskIOMetricGroup taskIOMetricGroup,
CheckpointableKeyedStateBackend<K> keyedStatedBackend,
PriorityQueueSetFactory factory,
KeyGroupRange keyGroupRange,
ClassLoader userClassloader,
KeyContext keyContext,
ProcessingTimeService processingTimeService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
Expand Down Expand Up @@ -101,21 +100,21 @@ private InternalTimeServiceManagerImpl(
*/
public static <K> InternalTimeServiceManagerImpl<K> create(
TaskIOMetricGroup taskIOMetricGroup,
CheckpointableKeyedStateBackend<K> keyedStateBackend,
PriorityQueueSetFactory factory,
KeyGroupRange keyGroupRange,
ClassLoader userClassloader,
KeyContext keyContext,
ProcessingTimeService processingTimeService,
Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates,
StreamTaskCancellationContext cancellationContext)
throws Exception {
final KeyGroupRange keyGroupRange = keyedStateBackend.getKeyGroupRange();

final InternalTimeServiceManagerImpl<K> timeServiceManager =
new InternalTimeServiceManagerImpl<>(
taskIOMetricGroup,
keyGroupRange,
keyContext,
keyedStateBackend,
factory,
processingTimeService,
cancellationContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ default boolean isRestored() {
*/
InternalTimeServiceManager<?> internalTimerServiceManager();

/**
* Returns the internal timer service manager create by async state backend for the stream
* operator. This method returns null for non-keyed operators.
*/
InternalTimeServiceManager<?> asyncInternalTimerServiceManager();

/**
* Returns an iterable to obtain input streams for previously stored operator state partitions
* that are assigned to this stream operator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ void snapshotState(
&& ((AbstractKeyedStateBackend<?>) keyedStateBackend)
.requiresLegacySynchronousTimerSnapshots(
checkpointOptions.getCheckpointType());
requiresLegacyRawKeyedStateSnapshots |=
keyedStateBackend instanceof AsyncKeyedStateBackend
&& ((AsyncKeyedStateBackend<?>) keyedStateBackend)
.requiresLegacySynchronousTimerSnapshots(
checkpointOptions.getCheckpointType());

if (requiresLegacyRawKeyedStateSnapshots) {
checkState(
Expand Down Expand Up @@ -459,6 +464,10 @@ public Object getCurrentKey() {
}
}

public InternalTimeServiceManager<?> getAsyncInternalTimerServiceManager() {
return context.asyncInternalTimerServiceManager();
}

public Optional<KeyedStateStore> getKeyedStateStore() {
return Optional.ofNullable(keyedStateStore);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ public StreamOperatorStateContext streamOperatorStateContext(
OperatorStateBackend operatorStateBackend = null;
CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;
CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;
InternalTimeServiceManager<?> timeServiceManager;
InternalTimeServiceManager<?> timeServiceManager = null;
InternalTimeServiceManager<?> asyncTimeServiceManager = null;

final StateObject.StateObjectSizeStatsCollector statsCollector =
StateObject.StateObjectSizeStatsCollector.create();
Expand Down Expand Up @@ -241,34 +242,42 @@ public StreamOperatorStateContext streamOperatorStateContext(
streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);

// -------------- Internal Timer Service Manager --------------
// if the operator indicates that it is using custom raw keyed state,
// then whatever was written in the raw keyed state snapshot was NOT written
// by the internal timer services (because there is only ever one user of raw keyed
// state);
// in this case, timers should not attempt to restore timers from the raw keyed
// state.
final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =
(prioritizedOperatorSubtaskStates.isRestored() && !isUsingCustomRawKeyedState)
? rawKeyedStateInputs
: Collections.emptyList();
if (keyedStatedBackend != null) {

// if the operator indicates that it is using custom raw keyed state,
// then whatever was written in the raw keyed state snapshot was NOT written
// by the internal timer services (because there is only ever one user of raw keyed
// state);
// in this case, timers should not attempt to restore timers from the raw keyed
// state.
final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =
(prioritizedOperatorSubtaskStates.isRestored()
&& !isUsingCustomRawKeyedState)
? rawKeyedStateInputs
: Collections.emptyList();

timeServiceManager =
timeServiceManagerProvider.create(
environment.getMetricGroup().getIOMetricGroup(),
keyedStatedBackend,
keyedStatedBackend.getKeyGroupRange(),
environment.getUserCodeClassLoader().asClassLoader(),
keyContext,
processingTimeService,
restoredRawKeyedStateTimers,
cancellationContext);
}
if (stateBackend.supportsAsyncKeyedStateBackend()) {
asyncTimeServiceManager =
timeServiceManagerProvider.create(
environment.getMetricGroup().getIOMetricGroup(),
asyncKeyedStateBackend,
asyncKeyedStateBackend.getKeyGroupRange(),
environment.getUserCodeClassLoader().asClassLoader(),
keyContext,
processingTimeService,
restoredRawKeyedStateTimers,
cancellationContext);
} else {
timeServiceManager = null;
asyncTimeServiceManager = timeServiceManager;
}
// TODO: Support Timer for AsyncKeyedStateBackend

// Add stats for input channel and result partition state
Stream.concat(
prioritizedOperatorSubtaskStates.getPrioritizedInputChannelState()
Expand All @@ -295,6 +304,7 @@ public StreamOperatorStateContext streamOperatorStateContext(
keyedStatedBackend,
asyncKeyedStateBackend,
timeServiceManager,
asyncTimeServiceManager,
rawOperatorStateInputs,
rawKeyedStateInputs);
} catch (Exception ex) {
Expand Down Expand Up @@ -778,6 +788,7 @@ private static class StreamOperatorStateContextImpl implements StreamOperatorSta
private final CheckpointableKeyedStateBackend<?> keyedStateBackend;
private final AsyncKeyedStateBackend asyncKeyedStateBackend;
private final InternalTimeServiceManager<?> internalTimeServiceManager;
private final InternalTimeServiceManager<?> asyncInternalTimeServiceManager;

private final CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs;
private final CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs;
Expand All @@ -788,6 +799,7 @@ private static class StreamOperatorStateContextImpl implements StreamOperatorSta
CheckpointableKeyedStateBackend<?> keyedStateBackend,
AsyncKeyedStateBackend asyncKeyedStateBackend,
InternalTimeServiceManager<?> internalTimeServiceManager,
InternalTimeServiceManager<?> asyncInternalTimeServiceManager,
CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs,
CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs) {

Expand All @@ -796,6 +808,7 @@ private static class StreamOperatorStateContextImpl implements StreamOperatorSta
this.keyedStateBackend = keyedStateBackend;
this.asyncKeyedStateBackend = asyncKeyedStateBackend;
this.internalTimeServiceManager = internalTimeServiceManager;
this.asyncInternalTimeServiceManager = asyncInternalTimeServiceManager;
this.rawOperatorStateInputs = rawOperatorStateInputs;
this.rawKeyedStateInputs = rawKeyedStateInputs;
}
Expand Down Expand Up @@ -827,6 +840,11 @@ public InternalTimeServiceManager<?> internalTimerServiceManager() {
return internalTimeServiceManager;
}

@Override
public InternalTimeServiceManager<?> asyncInternalTimerServiceManager() {
return asyncInternalTimeServiceManager;
}

@Override
public CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs() {
return rawOperatorStateInputs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
Expand Down Expand Up @@ -106,19 +107,21 @@ public void snapshotToRawKeyedState(

public static <K> InternalTimeServiceManager<K> create(
TaskIOMetricGroup taskIOMetricGroup,
CheckpointableKeyedStateBackend<K> keyedStatedBackend,
PriorityQueueSetFactory factory,
KeyGroupRange keyGroupRange,
ClassLoader userClassloader,
KeyContext keyContext, // the operator
ProcessingTimeService processingTimeService,
Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates,
StreamTaskCancellationContext cancellationContext) {
checkState(
keyedStatedBackend instanceof BatchExecutionKeyedStateBackend,
factory instanceof BatchExecutionKeyedStateBackend,
"Batch execution specific time service can work only with BatchExecutionKeyedStateBackend");

BatchExecutionInternalTimeServiceManager<K> timeServiceManager =
new BatchExecutionInternalTimeServiceManager<>(processingTimeService);
keyedStatedBackend.registerKeySelectionListener(timeServiceManager);
((BatchExecutionKeyedStateBackend) factory)
.registerKeySelectionListener(timeServiceManager);
return timeServiceManager;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -222,7 +221,6 @@ void testCheckpointDrain() throws Exception {
}
}

@Disabled("Support Timer for AsyncKeyedStateBackend")
@Test
void testTimerServiceIsAsync() throws Exception {
try (KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
Expand Down
Loading

0 comments on commit 039a07d

Please sign in to comment.