diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/ForkJoinPoolOperationInitializer.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/ForkJoinPoolOperationInitializer.java new file mode 100644 index 00000000000..6954380db88 --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/ForkJoinPoolOperationInitializer.java @@ -0,0 +1,94 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.engine.table.impl; + +import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.updategraph.OperationInitializer; +import org.jetbrains.annotations.NotNull; + +import java.util.Objects; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.Future; +import java.util.function.Supplier; + +/** + * Implementation of {@link OperationInitializer} that delegates to a {@link ForkJoinPool}. + */ +public class ForkJoinPoolOperationInitializer implements OperationInitializer { + + @NotNull + public static OperationInitializer fromCommonPool() { + return COMMON; + } + + private static final ForkJoinPoolOperationInitializer COMMON = + new ForkJoinPoolOperationInitializer(ForkJoinPool.commonPool()) { + + private final ExecutionContext executionContext = ExecutionContext.newBuilder() + .setOperationInitializer(NON_PARALLELIZABLE) + .build(); + + @Override + public @NotNull Future submit(@NotNull final Runnable task) { + return super.submit(() -> executionContext.apply(task)); + } + }; + + private final ForkJoinPool pool; + + private ForkJoinPoolOperationInitializer(@NotNull final ForkJoinPool pool) { + this.pool = Objects.requireNonNull(pool); + } + + @Override + public boolean canParallelize() { + return parallelismFactor() > 1 && ForkJoinTask.getPool() != pool; + } + + @Override + @NotNull + public Future submit(@NotNull final Runnable taskRunnable) { + return pool.submit(taskRunnable); + } + + @Override + public int parallelismFactor() { + return pool.getParallelism(); + } + + /** + * Ensure that {@code task} is parallelizable within the current {@link ExecutionContext}, by wrapping it with a new + * {@code ExecutionContext} that uses {@link #fromCommonPool()} if the current {@code ExecutionContext} does not + * {@link OperationInitializer#canParallelize() allow parallelization}. + * + * @param task The task to possible wrap + * @return The possibly-wrapped task + */ + public static Runnable ensureParallelizable(@NotNull final Runnable task) { + if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) { + return task; + } + return () -> ExecutionContext.getContext() + .withOperationInitializer(ForkJoinPoolOperationInitializer.fromCommonPool()) + .apply(task); + } + + /** + * Ensure that {@code task} is parallelizable within the current {@link ExecutionContext}, by wrapping it with a new + * {@code ExecutionContext} that uses {@link #fromCommonPool()} if the current {@code ExecutionContext} does not + * {@link OperationInitializer#canParallelize() allow parallelization}. + * + * @param task The task to possible wrap + * @return The possibly-wrapped task + */ + public static Supplier ensureParallelizable(@NotNull final Supplier task) { + if (ExecutionContext.getContext().getOperationInitializer().canParallelize()) { + return task; + } + return () -> ExecutionContext.getContext() + .withOperationInitializer(ForkJoinPoolOperationInitializer.fromCommonPool()) + .apply(task); + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TransformedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TransformedDataIndex.java index a48ab67b977..7759adcaa49 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TransformedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/dataindex/TransformedDataIndex.java @@ -15,6 +15,8 @@ import io.deephaven.engine.table.DataIndex; import io.deephaven.engine.table.DataIndexTransformer; import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.impl.ForkJoinPoolOperationInitializer; +import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.table.impl.select.FunctionalColumn; import io.deephaven.engine.table.impl.select.FunctionalColumnLong; import io.deephaven.engine.table.impl.select.SelectColumn; @@ -79,25 +81,28 @@ public Table table() { if ((localIndexTable = indexTable) != null) { return localIndexTable; } + indexTable = localIndexTable = QueryPerformanceRecorder.withNugget("Transform Data Index", + ForkJoinPoolOperationInitializer.ensureParallelizable(this::buildTable)); + // Don't hold onto the transformer after the index table is computed, we don't need to maintain + // reachability for its RowSets anymore. + transformer = null; + return localIndexTable; + } + } - try (final SafeCloseable ignored = parentIndex.isRefreshing() ? LivenessScopeStack.open() : null) { - localIndexTable = parentIndex.table(); - localIndexTable = maybeIntersectAndInvert(localIndexTable); - localIndexTable = maybeSortByFirstKey(localIndexTable); - localIndexTable = localIndexTable.isRefreshing() && transformer.snapshotResult() - ? localIndexTable.snapshot() - : localIndexTable; - - if (localIndexTable.isRefreshing()) { - manage(localIndexTable); - } - - indexTable = localIndexTable; - // Don't hold onto the transformer after the index table is computed, we don't need to maintain - // reachability for its RowSets anymore. - transformer = null; - return localIndexTable; + private Table buildTable() { + try (final SafeCloseable ignored = parentIndex.isRefreshing() ? LivenessScopeStack.open() : null) { + Table localIndexTable = parentIndex.table(); + localIndexTable = maybeIntersectAndInvert(localIndexTable); + localIndexTable = maybeSortByFirstKey(localIndexTable); + localIndexTable = localIndexTable.isRefreshing() && transformer.snapshotResult() + ? localIndexTable.snapshot() + : localIndexTable; + + if (localIndexTable.isRefreshing()) { + manage(localIndexTable); } + return localIndexTable; } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java index 8bbe9f15532..44735004f52 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/MergedDataIndex.java @@ -15,6 +15,7 @@ import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.PartitionedTableFactory; import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.impl.ForkJoinPoolOperationInitializer; import io.deephaven.engine.table.impl.by.AggregationProcessor; import io.deephaven.engine.table.impl.by.AggregationRowLookup; import io.deephaven.engine.table.impl.dataindex.AbstractDataIndex; @@ -33,7 +34,7 @@ /** * DataIndex that accumulates the individual per-{@link TableLocation} data indexes of a {@link Table} backed by a * {@link RegionedColumnSourceManager}. - * + * * @implNote This implementation is responsible for ensuring that the provided table accounts for the relative positions * of individual table locations in the provided table of indices. Work to coalesce the index table is * deferred until the first call to {@link #table()}. Refreshing inputs/indexes are not supported at this time @@ -123,7 +124,7 @@ public Table table() { try { return QueryPerformanceRecorder.withNugget( String.format("Merge Data Indexes [%s]", String.join(", ", keyColumnNames)), - this::buildTable); + ForkJoinPoolOperationInitializer.ensureParallelizable(this::buildTable)); } catch (Throwable t) { isCorrupt = true; throw t;