Skip to content

Commit

Permalink
perf: Add ForkJoinPool.commonPool()-based OperationInitializer for nested parallel DataIndex building (deephaven#5789)
Browse files Browse the repository at this point in the history

Introduce a very simple `ForkJoinPool`-based `OperationInitializer`,
wrapping the common pool, and use that to ensure parallelism in
`MergedDataIndex` and `TransformedDataIndex` deferred initialization.

Fixes deephaven#5736
  • Loading branch information
rcaudy authored Jul 18, 2024
1 parent 9effbc8 commit d9d1b71
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.engine.table.impl;

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.updategraph.OperationInitializer;
import org.jetbrains.annotations.NotNull;

import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
import java.util.function.Supplier;

/**
* Implementation of {@link OperationInitializer} that delegates to a {@link ForkJoinPool}.
*/
public class ForkJoinPoolOperationInitializer implements OperationInitializer {

    /**
     * Singleton instance delegating to the JVM-wide {@link ForkJoinPool#commonPool() common pool}. Work submitted
     * through it runs under an {@link ExecutionContext} whose operation initializer is {@code NON_PARALLELIZABLE},
     * so a task already executing on the common pool does not try to fan out again recursively.
     */
    private static final ForkJoinPoolOperationInitializer COMMON =
            new ForkJoinPoolOperationInitializer(ForkJoinPool.commonPool()) {

                // Context installed around every submitted task; forbids further parallelization from within.
                private final ExecutionContext executionContext = ExecutionContext.newBuilder()
                        .setOperationInitializer(NON_PARALLELIZABLE)
                        .build();

                @Override
                public @NotNull Future<?> submit(@NotNull final Runnable task) {
                    // Wrap the task so it observes the non-parallelizable ExecutionContext while running.
                    return super.submit(() -> executionContext.apply(task));
                }
            };

    /**
     * Get the shared {@link OperationInitializer} backed by {@link ForkJoinPool#commonPool()}.
     *
     * @return The common-pool-backed instance
     */
    @NotNull
    public static OperationInitializer fromCommonPool() {
        return COMMON;
    }

    /** The pool that actually executes submitted tasks. */
    private final ForkJoinPool delegate;

    private ForkJoinPoolOperationInitializer(@NotNull final ForkJoinPool pool) {
        this.delegate = Objects.requireNonNull(pool);
    }

    @Override
    public boolean canParallelize() {
        // Parallelize only if the pool has spare parallelism, and never from a thread that is
        // already executing inside this same pool (which would risk recursive fan-out).
        if (parallelismFactor() <= 1) {
            return false;
        }
        return ForkJoinTask.getPool() != delegate;
    }

    @Override
    @NotNull
    public Future<?> submit(@NotNull final Runnable taskRunnable) {
        return delegate.submit(taskRunnable);
    }

    @Override
    public int parallelismFactor() {
        return delegate.getParallelism();
    }

    /**
     * Ensure that {@code task} is parallelizable within the current {@link ExecutionContext}, by wrapping it with a
     * new {@code ExecutionContext} that uses {@link #fromCommonPool()} if the current {@code ExecutionContext} does
     * not {@link OperationInitializer#canParallelize() allow parallelization}.
     *
     * @param task The task to possibly wrap
     * @return The possibly-wrapped task
     */
    public static Runnable ensureParallelizable(@NotNull final Runnable task) {
        final boolean alreadyParallelizable =
                ExecutionContext.getContext().getOperationInitializer().canParallelize();
        if (alreadyParallelizable) {
            return task;
        }
        // Re-read the context at execution time, then run the task under a derived context
        // whose operation initializer is the common-pool-backed one.
        return () -> ExecutionContext.getContext()
                .withOperationInitializer(fromCommonPool())
                .apply(task);
    }

    /**
     * Ensure that {@code task} is parallelizable within the current {@link ExecutionContext}, by wrapping it with a
     * new {@code ExecutionContext} that uses {@link #fromCommonPool()} if the current {@code ExecutionContext} does
     * not {@link OperationInitializer#canParallelize() allow parallelization}.
     *
     * @param task The task to possibly wrap
     * @return The possibly-wrapped task
     */
    public static <T> Supplier<T> ensureParallelizable(@NotNull final Supplier<T> task) {
        final boolean alreadyParallelizable =
                ExecutionContext.getContext().getOperationInitializer().canParallelize();
        if (alreadyParallelizable) {
            return task;
        }
        return () -> ExecutionContext.getContext()
                .withOperationInitializer(fromCommonPool())
                .apply(task);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import io.deephaven.engine.table.DataIndex;
import io.deephaven.engine.table.DataIndexTransformer;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.ForkJoinPoolOperationInitializer;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.table.impl.select.FunctionalColumn;
import io.deephaven.engine.table.impl.select.FunctionalColumnLong;
import io.deephaven.engine.table.impl.select.SelectColumn;
Expand Down Expand Up @@ -79,25 +81,28 @@ public Table table() {
if ((localIndexTable = indexTable) != null) {
return localIndexTable;
}
indexTable = localIndexTable = QueryPerformanceRecorder.withNugget("Transform Data Index",
ForkJoinPoolOperationInitializer.ensureParallelizable(this::buildTable));
// Don't hold onto the transformer after the index table is computed, we don't need to maintain
// reachability for its RowSets anymore.
transformer = null;
return localIndexTable;
}
}

try (final SafeCloseable ignored = parentIndex.isRefreshing() ? LivenessScopeStack.open() : null) {
localIndexTable = parentIndex.table();
localIndexTable = maybeIntersectAndInvert(localIndexTable);
localIndexTable = maybeSortByFirstKey(localIndexTable);
localIndexTable = localIndexTable.isRefreshing() && transformer.snapshotResult()
? localIndexTable.snapshot()
: localIndexTable;

if (localIndexTable.isRefreshing()) {
manage(localIndexTable);
}

indexTable = localIndexTable;
// Don't hold onto the transformer after the index table is computed, we don't need to maintain
// reachability for its RowSets anymore.
transformer = null;
return localIndexTable;
/**
 * Build the transformed index table from the parent index's table, applying the configured transformations
 * (intersect/invert, sort-by-first-key, optional snapshot) in order.
 *
 * @return The resulting (possibly refreshing) index table; refreshing results are managed by this object
 */
private Table buildTable() {
// Open a liveness scope only when the parent is refreshing, so intermediate refreshing tables created
// below are released when the scope closes; a static parent needs no scope (hence the null resource).
try (final SafeCloseable ignored = parentIndex.isRefreshing() ? LivenessScopeStack.open() : null) {
Table localIndexTable = parentIndex.table();
// NOTE(review): helper semantics inferred from names — presumably conditional intersect/invert and
// conditional sort driven by the transformer's settings; confirm against their definitions.
localIndexTable = maybeIntersectAndInvert(localIndexTable);
localIndexTable = maybeSortByFirstKey(localIndexTable);
// If the transformer requests a snapshot of a refreshing result, replace it with a static snapshot.
localIndexTable = localIndexTable.isRefreshing() && transformer.snapshotResult()
? localIndexTable.snapshot()
: localIndexTable;

// A still-refreshing result must outlive the enclosing liveness scope, so take management of it here.
if (localIndexTable.isRefreshing()) {
manage(localIndexTable);
}
return localIndexTable;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.PartitionedTableFactory;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.ForkJoinPoolOperationInitializer;
import io.deephaven.engine.table.impl.by.AggregationProcessor;
import io.deephaven.engine.table.impl.by.AggregationRowLookup;
import io.deephaven.engine.table.impl.dataindex.AbstractDataIndex;
Expand All @@ -33,7 +34,7 @@
/**
* DataIndex that accumulates the individual per-{@link TableLocation} data indexes of a {@link Table} backed by a
* {@link RegionedColumnSourceManager}.
*
*
* @implNote This implementation is responsible for ensuring that the provided table accounts for the relative positions
* of individual table locations in the provided table of indices. Work to coalesce the index table is
* deferred until the first call to {@link #table()}. Refreshing inputs/indexes are not supported at this time
Expand Down Expand Up @@ -123,7 +124,7 @@ public Table table() {
try {
return QueryPerformanceRecorder.withNugget(
String.format("Merge Data Indexes [%s]", String.join(", ", keyColumnNames)),
this::buildTable);
ForkJoinPoolOperationInitializer.ensureParallelizable(this::buildTable));
} catch (Throwable t) {
isCorrupt = true;
throw t;
Expand Down

0 comments on commit d9d1b71

Please sign in to comment.