From 0974068a552893d77371a92fef8375f41f4f3b4d Mon Sep 17 00:00:00 2001 From: Ryan Caudy Date: Tue, 28 Nov 2023 20:42:33 -0500 Subject: [PATCH] Add optional extra dependencies to PartitionedTable.transform and PartitionedTable.p (#4889) --- .../engine/table/PartitionedTable.java | 82 ++++++---- .../engine/table/PartitionedTableFactory.java | 8 +- .../by/ChunkedOperatorAggregationHelper.java | 4 + .../BaseTableTransformationColumn.java | 2 - .../BiTableTransformationColumn.java | 4 +- .../partitioned/PartitionedTableImpl.java | 144 +++++++++++------- .../PartitionedTableProxyImpl.java | 121 ++++++++++----- .../TableTransformationColumn.java | 5 +- .../updategraph/impl/PeriodicUpdateGraph.java | 26 +++- .../table/impl/PartitionedTableTest.java | 92 ++++++++++- 10 files changed, 342 insertions(+), 146 deletions(-) diff --git a/engine/api/src/main/java/io/deephaven/engine/table/PartitionedTable.java b/engine/api/src/main/java/io/deephaven/engine/table/PartitionedTable.java index 4f1452f36fd..375fc291a69 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/PartitionedTable.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/PartitionedTable.java @@ -12,6 +12,7 @@ import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.liveness.LivenessNode; import io.deephaven.engine.liveness.LivenessReferent; +import io.deephaven.engine.updategraph.NotificationQueue.Dependency; import io.deephaven.util.annotations.FinalDefault; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -27,11 +28,11 @@ *

* A partitioned table is a {@link Table} with one or more columns containing non-{@code null}, like-defined constituent * tables, optionally with "key" columns defined to allow - * {@link #partitionedTransform(PartitionedTable, BinaryOperator)} or proxied joins with other like-keyed partitioned - * tables. + * {@link #partitionedTransform(PartitionedTable, BinaryOperator, Dependency...)} or proxied joins with other like-keyed + * partitioned tables. *

- * Note that partitioned tables should {@link io.deephaven.engine.updategraph.NotificationQueue.Dependency depend} on - * and {@link io.deephaven.engine.liveness.LivenessManager#manage(LivenessReferent) manage} their + * Note that partitioned tables should {@link Dependency depend} on and + * {@link io.deephaven.engine.liveness.LivenessManager#manage(LivenessReferent) manage} their * {@link Table#isRefreshing() refreshing} constituents. */ public interface PartitionedTable extends LivenessNode, LogOutputAppendable { @@ -151,12 +152,13 @@ default Proxy proxy() { * PartitionedTable. *

* Each operation thus applied will produce a new PartitionedTable with the results as in - * {@link #transform(UnaryOperator)} or {@link #partitionedTransform(PartitionedTable, BinaryOperator)}, and return - * a new proxy to that PartitionedTable. + * {@link #transform(UnaryOperator, Dependency...)} or + * {@link #partitionedTransform(PartitionedTable, BinaryOperator, Dependency...)}, and return a new proxy to that + * PartitionedTable. * * @param requireMatchingKeys Whether to ensure that both partitioned tables have all the same keys present when a * proxied operation uses {@code this} and another {@link PartitionedTable} as inputs for a - * {@link #partitionedTransform(PartitionedTable, BinaryOperator) partitioned transform} + * {@link #partitionedTransform(PartitionedTable, BinaryOperator, Dependency...) partitioned transform} * @param sanityCheckJoinOperations Whether to check that proxied join operations will only find a given join key in * one constituent table for {@code this} and the {@link Table table} argument if it is also a * {@link PartitionedTable.Proxy proxy} @@ -211,17 +213,25 @@ default Proxy proxy() { * PartitionedTable's {@link #table() underlying table} is refreshing. *

* - * @apiNote {@code transformer} must be stateless, safe for concurrent use, and able to return a valid result for an - * empty input table. It is required to install an ExecutionContext to access any - * QueryLibrary/QueryScope/QueryCompiler functionality from the {@code transformer}. - * * @param transformer The {@link UnaryOperator} to apply to all constituent {@link Table tables} + * @param dependencies Additional dependencies that must be satisfied before applying {@code transformer} to added + * or modified constituents during update processing; use this when {@code transformer} uses additional + * {@code Table} or {@code PartitionedTable} inputs besides the constituents of {@code this} * @return The new PartitionedTable containing the resulting constituents * @throws IllegalStateException On instantiation or update if {@code !table().isRefreshing()} and * {@code transformer} produces a refreshing result for any constituent + * @apiNote {@code transformer} must be stateless, safe for concurrent use, and able to return a valid result for an + * empty input table. It is required to install an ExecutionContext to access any + * QueryLibrary/QueryScope/QueryCompiler functionality from the {@code transformer}. */ - default PartitionedTable transform(@NotNull UnaryOperator transformer) { - return transform(ExecutionContext.getContextToRecord(), transformer, table().isRefreshing()); + default PartitionedTable transform( + @NotNull final UnaryOperator
transformer, + @NotNull final Dependency... dependencies) { + return transform( + ExecutionContext.getContextToRecord(), + transformer, + table().isRefreshing(), + dependencies); } /** @@ -239,6 +249,9 @@ default PartitionedTable transform(@NotNull UnaryOperator
transformer) { * backed by a refreshing {@link #table() table}. This hint is important for transforms to static inputs that * might produce refreshing output, in order to ensure correct liveness management; incorrectly specifying * {@code false} will result in exceptions. + * @param dependencies Additional dependencies that must be satisfied before applying {@code transformer} to added + * or modified constituents during update processing; use this when {@code transformer} uses additional + * {@code Table} or {@code PartitionedTable} inputs besides the constituents of {@code this} * @return The new PartitionedTable containing the resulting constituents * @throws IllegalStateException On instantiation or update if * {@code !table().isRefreshing() && !expectRefreshingResults} and {@code transformer} produces a refreshing @@ -247,7 +260,8 @@ default PartitionedTable transform(@NotNull UnaryOperator
transformer) { PartitionedTable transform( @Nullable ExecutionContext executionContext, @NotNull UnaryOperator
transformer, - boolean expectRefreshingResults); + boolean expectRefreshingResults, + @NotNull Dependency... dependencies); /** *

@@ -270,22 +284,30 @@ PartitionedTable transform( * {@code other} has a refreshing {@link #table() underlying table}. *

* - * @apiNote {@code transformer} must be stateless, safe for concurrent use, and able to return a valid result for - * empty input tables. It is required to install an ExecutionContext to access any - * QueryLibrary/QueryScope/QueryCompiler functionality from the {@code transformer}. - * * @param other The other PartitionedTable to find constituents in * @param transformer The {@link BinaryOperator} to apply to all pairs of constituent {@link Table tables} + * @param dependencies Additional dependencies that must be satisfied before applying {@code transformer} to added, + * modified, or newly-matched constituents during update processing; use this when {@code transformer} uses + * additional {@code Table} or {@code PartitionedTable} inputs besides the constituents of {@code this} or + * {@code other} * @return The new PartitionedTable containing the resulting constituents * @throws IllegalStateException On instantiation or update if * {@code !table().isRefreshing() && !other.table().isRefreshing()} and {@code transformer} produces a * refreshing result for any constituent + * @apiNote {@code transformer} must be stateless, safe for concurrent use, and able to return a valid result for + * empty input tables. It is required to install an ExecutionContext to access any + * QueryLibrary/QueryScope/QueryCompiler functionality from the {@code transformer}. */ default PartitionedTable partitionedTransform( - @NotNull PartitionedTable other, - @NotNull BinaryOperator

transformer) { - return partitionedTransform(other, ExecutionContext.getContextToRecord(), transformer, - table().isRefreshing() || other.table().isRefreshing()); + @NotNull final PartitionedTable other, + @NotNull final BinaryOperator
transformer, + @NotNull final Dependency... dependencies) { + return partitionedTransform( + other, + ExecutionContext.getContextToRecord(), + transformer, + table().isRefreshing() || other.table().isRefreshing(), + dependencies); } /** @@ -317,16 +339,22 @@ default PartitionedTable partitionedTransform( * backed by a refreshing {@link #table() table}. This hint is important for transforms to static inputs that * might produce refreshing output, in order to ensure correct liveness management; incorrectly specifying * {@code false} will result in exceptions. + * @param dependencies Additional dependencies that must be satisfied before applying {@code transformer} to added, + * modified, or newly-matched constituents during update processing; use this when {@code transformer} uses + * additional {@code Table} or {@code PartitionedTable} inputs besides the constituents of {@code this} or + * {@code other} * @return The new PartitionedTable containing the resulting constituents * @throws IllegalStateException On instantiation or update if - * {@code !table().isRefreshing() && !other.table().isRefreshing() && !expectRefreshingResults} and - * {@code transformer} produces a refreshing result for any constituent + * {@code !table().isRefreshing() && !other.table().isRefreshing() && + * !expectRefreshingResults} and {@code transformer} produces a refreshing result for + * any constituent */ PartitionedTable partitionedTransform( @NotNull PartitionedTable other, @Nullable ExecutionContext executionContext, @NotNull BinaryOperator
transformer, - boolean expectRefreshingResults); + boolean expectRefreshingResults, + @NotNull Dependency... dependencies); /** *

@@ -343,10 +371,10 @@ PartitionedTable partitionedTransform( * times. * * @param keyColumnValues Ordered, boxed values for the key columns in the same order as {@link #keyColumnNames()} - * @throws IllegalArgumentException If {@code keyColumnValues.length != keyColumnNames().size()} - * @throws UnsupportedOperationException If multiple matching rows for the {@code keyColumnValues} were found * @return The {@link Table constituent} at the single row in {@link #table()} matching the {@code keyColumnValues}, * or {@code null} if no matches were found + * @throws IllegalArgumentException If {@code keyColumnValues.length != keyColumnNames().size()} + * @throws UnsupportedOperationException If multiple matching rows for the {@code keyColumnValues} were found */ @ConcurrentMethod Table constituentFor(@NotNull Object... keyColumnValues); diff --git a/engine/api/src/main/java/io/deephaven/engine/table/PartitionedTableFactory.java b/engine/api/src/main/java/io/deephaven/engine/table/PartitionedTableFactory.java index 9aeea73fc3b..f9908a3dc87 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/PartitionedTableFactory.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/PartitionedTableFactory.java @@ -7,8 +7,6 @@ import java.util.Collection; import java.util.ServiceLoader; -import java.util.function.BinaryOperator; -import java.util.function.UnaryOperator; /** * Factory for producing Deephaven engine {@link PartitionedTable} instances. @@ -73,9 +71,9 @@ private static Creator partitionedTableCreator() { * @param table The "raw" {@link Table table} of {@link Table tables}. Should be {@link Table#isRefreshing() * refreshing} if any constituents are. * @param keyColumnNames The "key" column names from {@code table}. Key columns are used in - * {@link PartitionedTable#transform(UnaryOperator)} to validate the safety and correctness of join - * operations and in {@link PartitionedTable#partitionedTransform(PartitionedTable, BinaryOperator)} to - * correlate tables that should be transformed together. Passing an ordered set is highly recommended. + * {@link PartitionedTable#transform transform} to validate the safety and correctness of join operations and + * in {@link PartitionedTable#partitionedTransform partitionedTransform} to correlate tables that should be + * transformed together. Passing an ordered set is highly recommended. * @param uniqueKeys Whether the keys (key column values for a row considered as a tuple) in {@code table} are * guaranteed to be unique * @param constituentColumnName The "constituent" column name from {@code table}. The constituent column contains diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java index 2d6c8f8dde6..5c600e2a63f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java @@ -1698,6 +1698,10 @@ private static OperatorAggregationStateManager makeInitializedStateManager( outputPosition.setValue(0); final OperatorAggregationStateManager stateManager = stateManagerSupplier.get(); + if (initialKeys.isEmpty()) { + return stateManager; + } + final ColumnSource[] keyColumnsToInsert; final boolean closeRowsToInsert; final RowSequence rowsToInsert; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/BaseTableTransformationColumn.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/BaseTableTransformationColumn.java index 81776ba100b..be54278bcd9 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/BaseTableTransformationColumn.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/BaseTableTransformationColumn.java @@ -14,8 +14,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.function.BiFunction; -import java.util.function.Function; /** * Base {@link SelectColumn} implementation to wrap transformer functions for {@link PartitionedTable#transform} and diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/BiTableTransformationColumn.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/BiTableTransformationColumn.java index a78458837af..796cef1b07b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/BiTableTransformationColumn.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/BiTableTransformationColumn.java @@ -23,8 +23,8 @@ import java.util.function.BinaryOperator; /** - * {@link SelectColumn} implementation to wrap transformer functions for - * {@link PartitionedTable#partitionedTransform(PartitionedTable, BinaryOperator) partitioned transformations}. + * {@link SelectColumn} implementation to wrap transformer functions for {@link PartitionedTable#partitionedTransform + * partitioned transformations}. */ class BiTableTransformationColumn extends BaseTableTransformationColumn { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java index ff4edfc1500..e3ecf0a6b03 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java @@ -29,8 +29,10 @@ import io.deephaven.engine.table.impl.sources.NullValueColumnSource; import io.deephaven.engine.table.impl.sources.UnionSourceManager; import io.deephaven.engine.table.iterators.ChunkedObjectColumnIterator; +import io.deephaven.engine.updategraph.NotificationQueue.Dependency; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.util.SafeCloseable; +import io.deephaven.util.annotations.InternalUseOnly; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.mutable.MutableObject; import org.jetbrains.annotations.NotNull; @@ -64,10 +66,11 @@ public class PartitionedTableImpl extends LivenessArtifact implements Partitione private volatile WeakReference memoizedMerge; /** + * @apiNote Only engine-internal tools should call this constructor directly * @see PartitionedTableFactory#of(Table, Collection, boolean, String, TableDefinition, boolean) Factory method that * delegates to this method - * @apiNote Only engine-internal tools should call this constructor directly */ + @InternalUseOnly public PartitionedTableImpl( @NotNull final Table table, @NotNull final Collection keyColumnNames, @@ -239,14 +242,17 @@ public PartitionedTableImpl filter(@NotNull final Collection f throw new IllegalArgumentException("Unsupported filter against constituent column " + constituentColumnName + " found in filters: " + filters); } - return new PartitionedTableImpl( - table.where(Filter.and(whereFilters)), - keyColumnNames, - uniqueKeys, - constituentColumnName, - constituentDefinition, - constituentChangesPermitted || table.isRefreshing(), - false); + return LivenessScopeStack.computeEnclosed( + () -> new PartitionedTableImpl( + table.where(Filter.and(whereFilters)), + keyColumnNames, + uniqueKeys, + constituentColumnName, + constituentDefinition, + constituentChangesPermitted || table.isRefreshing(), + false), + table::isRefreshing, + pt -> pt.table().isRefreshing()); } @ConcurrentMethod @@ -259,59 +265,66 @@ public PartitionedTable sort(@NotNull final Collection sortColumns) throw new IllegalArgumentException("Unsupported sort on constituent column " + constituentColumnName + " found in sort columns: " + sortColumns); } - return new PartitionedTableImpl( - table.sort(sortColumns), - keyColumnNames, - uniqueKeys, - constituentColumnName, - constituentDefinition, - constituentChangesPermitted || table.isRefreshing(), - false); + return LivenessScopeStack.computeEnclosed( + () -> new PartitionedTableImpl( + table.sort(sortColumns), + keyColumnNames, + uniqueKeys, + constituentColumnName, + constituentDefinition, + constituentChangesPermitted || table.isRefreshing(), + false), + table::isRefreshing, + pt -> pt.table().isRefreshing()); } @ConcurrentMethod @Override - public PartitionedTableImpl transform( + public PartitionedTable transform( @Nullable final ExecutionContext executionContext, @NotNull final UnaryOperator

transformer, - final boolean expectRefreshingResults) { - final Table resultTable; + final boolean expectRefreshingResults, + @NotNull final Dependency... dependencies) { + final PartitionedTable resultPartitionedTable; final TableDefinition resultConstituentDefinition; final LivenessManager enclosingScope = LivenessScopeStack.peek(); try (final SafeCloseable ignored1 = executionContext == null ? null : executionContext.open(); final SafeCloseable ignored2 = LivenessScopeStack.open()) { - final Table asRefreshingIfNeeded = maybeCopyAsRefreshing(table, expectRefreshingResults); + final Table prepared = prepareForTransform(table, expectRefreshingResults, dependencies); // Perform the transformation - resultTable = asRefreshingIfNeeded.update(List.of(new TableTransformationColumn( - constituentColumnName, executionContext, - asRefreshingIfNeeded.isRefreshing() ? transformer : assertResultsStatic(transformer)))); - enclosingScope.manage(resultTable); + final Table resultTable = prepared.update(List.of(new TableTransformationColumn( + constituentColumnName, + executionContext, + prepared.isRefreshing() ? transformer : assertResultsStatic(transformer)))); // Make sure we have a valid result constituent definition final Table emptyConstituent = emptyConstituent(constituentDefinition); final Table resultEmptyConstituent = transformer.apply(emptyConstituent); resultConstituentDefinition = resultEmptyConstituent.getDefinition(); - } - // Build the result partitioned table - return new PartitionedTableImpl( - resultTable, - keyColumnNames, - uniqueKeys, - constituentColumnName, - resultConstituentDefinition, - constituentChangesPermitted, - true); + // Build the result partitioned table + resultPartitionedTable = new PartitionedTableImpl( + resultTable, + keyColumnNames, + uniqueKeys, + constituentColumnName, + resultConstituentDefinition, + constituentChangesPermitted, + true); + enclosingScope.manage(resultPartitionedTable); + } + return resultPartitionedTable; } @Override - public PartitionedTableImpl partitionedTransform( + public PartitionedTable partitionedTransform( @NotNull final PartitionedTable other, @Nullable final ExecutionContext executionContext, @NotNull final BinaryOperator
transformer, - final boolean expectRefreshingResults) { + final boolean expectRefreshingResults, + @NotNull final Dependency... dependencies) { // Check safety before doing any extra work final UpdateGraph updateGraph = table.getUpdateGraph(other.table()); if (table.isRefreshing() || other.table().isRefreshing()) { @@ -321,7 +334,7 @@ public PartitionedTableImpl partitionedTransform( // Validate join compatibility final MatchPair[] joinPairs = matchKeyColumns(this, other); - final Table resultTable; + final PartitionedTable resultPartitionedTable; final TableDefinition resultConstituentDefinition; final LivenessManager enclosingScope = LivenessScopeStack.peek(); try (final SafeCloseable ignored1 = executionContext == null ? null : executionContext.open(); @@ -334,39 +347,55 @@ public PartitionedTableImpl partitionedTransform( .where(new MatchFilter(Inverted, RHS_CONSTITUENT, (Object) null)) : table.join(other.table(), Arrays.asList(joinPairs), Arrays.asList(joinAdditions)); - final Table asRefreshingIfNeeded = maybeCopyAsRefreshing(joined, expectRefreshingResults); + final Table prepared = prepareForTransform(joined, expectRefreshingResults, dependencies); - resultTable = asRefreshingIfNeeded - .update(List.of(new BiTableTransformationColumn(constituentColumnName, RHS_CONSTITUENT, + final Table resultTable = prepared + .update(List.of(new BiTableTransformationColumn( + constituentColumnName, + RHS_CONSTITUENT, executionContext, - asRefreshingIfNeeded.isRefreshing() ? transformer : assertResultsStatic(transformer)))) + prepared.isRefreshing() ? transformer : assertResultsStatic(transformer)))) .dropColumns(RHS_CONSTITUENT); - enclosingScope.manage(resultTable); // Make sure we have a valid result constituent definition final Table emptyConstituent1 = emptyConstituent(constituentDefinition); final Table emptyConstituent2 = emptyConstituent(other.constituentDefinition()); final Table resultEmptyConstituent = transformer.apply(emptyConstituent1, emptyConstituent2); resultConstituentDefinition = resultEmptyConstituent.getDefinition(); - } - // Build the result partitioned table - return new PartitionedTableImpl( - resultTable, - keyColumnNames, - uniqueKeys, - constituentColumnName, - resultConstituentDefinition, - constituentChangesPermitted || other.constituentChangesPermitted(), - true); + // Build the result partitioned table + resultPartitionedTable = new PartitionedTableImpl( + resultTable, + keyColumnNames, + uniqueKeys, + constituentColumnName, + resultConstituentDefinition, + constituentChangesPermitted || other.constituentChangesPermitted(), + true); + enclosingScope.manage(resultPartitionedTable); + } + return resultPartitionedTable; } - private Table maybeCopyAsRefreshing(Table table, boolean expectRefreshingResults) { - if (!expectRefreshingResults || table.isRefreshing()) { + private static Table prepareForTransform( + @NotNull final Table table, + final boolean expectRefreshingResults, + @Nullable final Dependency[] dependencies) { + + final boolean addDependencies = dependencies != null && dependencies.length > 0; + final boolean setRefreshing = (expectRefreshingResults || addDependencies) && !table.isRefreshing(); + + if (!addDependencies && !setRefreshing) { return table; } + final Table copied = ((QueryTable) table.coalesce()).copy(); - copied.setRefreshing(true); + if (setRefreshing) { + copied.setRefreshing(true); + } + if (addDependencies) { + Arrays.stream(dependencies).forEach(copied::addParentReference); + } return copied; } @@ -468,7 +497,8 @@ private Table[] fetchConstituents(final boolean usePrev) { /** * Validate that {@code lhs} and {@code rhs} have compatible key columns, allowing - * {@link #partitionedTransform(PartitionedTable, BinaryOperator)}. Compute the matching pairs of key column names. + * {@link PartitionedTable#partitionedTransform partitionedTransform}. Compute the matching pairs of key column + * names. * * @param lhs The first partitioned table * @param rhs The second partitioned table diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java index 224ffe086d5..89e789cea6e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java @@ -13,20 +13,18 @@ import io.deephaven.api.updateby.UpdateByControl; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.liveness.LivenessArtifact; +import io.deephaven.engine.liveness.LivenessScopeStack; +import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.MatchPair; -import io.deephaven.engine.table.PartitionedTable; -import io.deephaven.engine.table.Table; -import io.deephaven.engine.table.TableDefinition; -import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.impl.*; import io.deephaven.engine.table.impl.select.MatchFilter; import io.deephaven.engine.table.impl.select.SelectColumn; import io.deephaven.engine.table.impl.select.SourceColumn; import io.deephaven.engine.table.impl.select.WhereFilter; import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzer; +import io.deephaven.engine.updategraph.NotificationQueue.Dependency; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.util.TableTools; -import io.deephaven.util.SafeCloseable; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -182,12 +180,20 @@ private PartitionedTable.Proxy complexTransform( if (refreshingResults && joinMatches != null) { updateGraph.checkInitiateSerialTableOperation(); } - try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) { - return new PartitionedTableProxyImpl( - target.transform(context, ct -> transformer.apply(ct, otherTable), refreshingResults), - requireMatchingKeys, - sanityCheckJoins); - } + + final Dependency[] dependencies = otherTable.isRefreshing() + ? new Dependency[] {otherTable} + : new Dependency[0]; + + return ExecutionContext.getContext().withUpdateGraph(updateGraph).apply( + () -> new PartitionedTableProxyImpl( + target.transform( + context, + ct -> transformer.apply(ct, otherTable), + refreshingResults, + dependencies), + requireMatchingKeys, + sanityCheckJoins)); } if (other instanceof PartitionedTable.Proxy) { final PartitionedTable.Proxy otherProxy = (PartitionedTable.Proxy) other; @@ -197,32 +203,45 @@ private PartitionedTable.Proxy complexTransform( if (refreshingResults) { updateGraph.checkInitiateSerialTableOperation(); } - try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) { - - final MatchPair[] keyColumnNamePairs = PartitionedTableImpl.matchKeyColumns(target, otherTarget); - final DependentValidation uniqueKeys = requireMatchingKeys - ? matchingKeysValidation(target, otherTarget, keyColumnNamePairs) - : null; - final DependentValidation overlappingLhsJoinKeys = sanityCheckJoins && joinMatches != null - ? overlappingLhsJoinKeysValidation(target, joinMatches) - : null; - final DependentValidation overlappingRhsJoinKeys = sanityCheckJoins && joinMatches != null - ? overlappingRhsJoinKeysValidation(otherTarget, joinMatches) - : null; - - final Table validatedLhsTable = validated(target.table(), uniqueKeys, overlappingLhsJoinKeys); - final Table validatedRhsTable = validated(otherTarget.table(), uniqueKeys, overlappingRhsJoinKeys); - final PartitionedTable lhsToUse = maybeRewrap(validatedLhsTable, target); - final PartitionedTable rhsToUse = maybeRewrap(validatedRhsTable, otherTarget); - - return new PartitionedTableProxyImpl( - lhsToUse.partitionedTransform(rhsToUse, context, transformer, refreshingResults), - requireMatchingKeys, - sanityCheckJoins); - } + return ExecutionContext.getContext().withUpdateGraph(updateGraph) + .apply(() -> LivenessScopeStack.computeEnclosed( + () -> { + final MatchPair[] keyColumnNamePairs = + PartitionedTableImpl.matchKeyColumns(target, otherTarget); + final DependentValidation uniqueKeys = requireMatchingKeys + ? matchingKeysValidation(target, otherTarget, keyColumnNamePairs) + : null; + final DependentValidation overlappingLhsJoinKeys = + sanityCheckJoins && joinMatches != null + ? overlappingLhsJoinKeysValidation(target, joinMatches) + : null; + final DependentValidation overlappingRhsJoinKeys = + sanityCheckJoins && joinMatches != null + ? overlappingRhsJoinKeysValidation(otherTarget, joinMatches) + : null; + + final Table validatedLhsTable = + validated(target.table(), uniqueKeys, overlappingLhsJoinKeys); + final Table validatedRhsTable = + validated(otherTarget.table(), uniqueKeys, overlappingRhsJoinKeys); + final PartitionedTable lhsToUse = maybeRewrap(validatedLhsTable, target); + final PartitionedTable rhsToUse = maybeRewrap(validatedRhsTable, otherTarget); + + return new PartitionedTableProxyImpl( + lhsToUse.partitionedTransform(rhsToUse, context, transformer, + refreshingResults), + requireMatchingKeys, + sanityCheckJoins); + }, + () -> refreshingResults, + ptp -> refreshingResults)); } - throw new IllegalArgumentException("Unexpected TableOperations input " + other - + ", expected Table or PartitionedTable.Proxy"); + throw onUnexpectedTableOperations(other); + } + + private static IllegalArgumentException onUnexpectedTableOperations(@NotNull TableOperations other) { + return new IllegalArgumentException(String.format( + "Unexpected TableOperations input %s, expected Table or PartitionedTable.Proxy", other)); } /** @@ -246,7 +265,7 @@ private DependentValidation( private static Table validated( @NotNull final Table parent, - @NotNull final DependentValidation... dependentValidationsIn) { + final DependentValidation... dependentValidationsIn) { if (dependentValidationsIn.length == 0 || !parent.isRefreshing()) { return parent; } @@ -389,7 +408,8 @@ private static void checkOverlappingJoinKeys( } } - private static PartitionedTable maybeRewrap(@NotNull final Table table, @NotNull final PartitionedTable existing) { + private static PartitionedTable maybeRewrap(@NotNull final Table table, + @NotNull final PartitionedTable existing) { return table == existing.table() ? existing : new PartitionedTableImpl(table, existing.keyColumnNames(), existing.uniqueKeys(), @@ -553,11 +573,28 @@ public PartitionedTable.Proxy aggAllBy(AggSpec spec, ColumnName... groupByColumn public PartitionedTable.Proxy aggBy(Collection aggregations, boolean preserveEmpty, TableOperations initialGroups, Collection groupByColumns) { if (initialGroups == null) { - return basicTransform(true, ct -> ct.aggBy(aggregations, preserveEmpty, null, groupByColumns)); + return basicTransform( + true, + ct -> ct.aggBy(aggregations, preserveEmpty, null, groupByColumns)); + } + if (initialGroups instanceof Table) { + // Force a consistent view of initial groups table to be used for all current and future constituents + final Table initialGroupsTable = LivenessScopeStack.computeEnclosed( + () -> ((Table) initialGroups).selectDistinct(groupByColumns).snapshot(), + () -> ((Table) initialGroups).isRefreshing(), + Table::isRefreshing); + return basicTransform( + true, + ct -> ct.aggBy(aggregations, preserveEmpty, initialGroupsTable, groupByColumns)); + } + if (initialGroups instanceof PartitionedTable.Proxy) { + return complexTransform( + true, + initialGroups, + (ct, ot) -> ct.aggBy(aggregations, preserveEmpty, ot, groupByColumns), + null); } - return complexTransform(true, initialGroups, - (ct, ot) -> ct.aggBy(aggregations, preserveEmpty, ot, groupByColumns), - null); + throw onUnexpectedTableOperations(initialGroups); } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/TableTransformationColumn.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/TableTransformationColumn.java index 262d12c08c7..6a7cb5db1c1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/TableTransformationColumn.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/TableTransformationColumn.java @@ -21,11 +21,10 @@ import java.util.List; import java.util.Map; import java.util.function.Function; -import java.util.function.UnaryOperator; /** - * {@link SelectColumn} implementation to wrap transformer functions for - * {@link PartitionedTable#transform(UnaryOperator) transformations}. + * {@link SelectColumn} implementation to wrap transformer functions for {@link PartitionedTable#transform + * transformations}. */ public class TableTransformationColumn extends BaseTableTransformationColumn { diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index 13c3b14af50..a2fbaee6581 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -1035,13 +1035,26 @@ public void refreshUpdateSourceForUnitTests(@NotNull final Runnable updateSource */ @TestUseOnly public boolean flushOneNotificationForUnitTests() { + return flushOneNotificationForUnitTests(false); + } + + /** + * Flush a single notification from the UpdateGraph queue. Note that this happens on a simulated UpdateGraph run + * thread, rather than this thread. + * + * @param expectOnlyUnsatisfiedNotifications Whether we expect there to be only unsatisfied notifications pending + * @return whether a notification was found in the queue + */ + @TestUseOnly + public boolean flushOneNotificationForUnitTests(final boolean expectOnlyUnsatisfiedNotifications) { Assert.assertion(unitTestMode, "unitTestMode"); final NotificationProcessor existingNotificationProcessor = notificationProcessor; try { this.notificationProcessor = new ControlledNotificationProcessor(); // noinspection AutoUnboxing,AutoBoxing - return unitTestRefreshThreadPool.submit(this::flushOneNotificationForUnitTestsInternal).get(); + return unitTestRefreshThreadPool.submit( + () -> flushOneNotificationForUnitTestsInternal(expectOnlyUnsatisfiedNotifications)).get(); } catch (InterruptedException | ExecutionException e) { throw new UncheckedDeephavenException(e); } finally { @@ -1050,7 +1063,7 @@ public boolean flushOneNotificationForUnitTests() { } @TestUseOnly - public boolean flushOneNotificationForUnitTestsInternal() { + private boolean flushOneNotificationForUnitTestsInternal(final boolean expectOnlyUnsatisfiedNotifications) { final IntrusiveDoublyLinkedQueue pendingToEvaluate = new IntrusiveDoublyLinkedQueue<>(IntrusiveDoublyLinkedNode.Adapter.getInstance()); notificationProcessor.beforeNotificationsDrained(); @@ -1077,7 +1090,12 @@ public boolean flushOneNotificationForUnitTestsInternal() { } if (satisfied != null) { notificationProcessor.submit(satisfied); - } else if (somethingWasPending) { + if (expectOnlyUnsatisfiedNotifications) { + // noinspection ThrowableNotThrown + Assert.statementNeverExecuted( + "Flushed a notification in unit test mode, but expected only unsatisfied pending notifications"); + } + } else if (somethingWasPending && !expectOnlyUnsatisfiedNotifications) { // noinspection ThrowableNotThrown Assert.statementNeverExecuted( "Did not flush any notifications in unit test mode, yet there were outstanding notifications"); @@ -1113,7 +1131,7 @@ public Runnable flushAllNormalNotificationsForUnitTests(@NotNull final BooleanSu final Future flushJobFuture = unitTestRefreshThreadPool.submit(() -> { final long deadlineNanoTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis); boolean flushed; - while ((flushed = flushOneNotificationForUnitTestsInternal()) || !done.getAsBoolean()) { + while ((flushed = flushOneNotificationForUnitTestsInternal(false)) || !done.getAsBoolean()) { if (!flushed) { final long remainingNanos = deadlineNanoTime - System.nanoTime(); if (!controlledNotificationProcessor.blockUntilNotificationAdded(remainingNanos)) { diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java index b99e1efac38..22c45cca47f 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java @@ -7,8 +7,8 @@ import io.deephaven.api.SortColumn; import io.deephaven.api.agg.Partition; import io.deephaven.base.SleepUtil; +import io.deephaven.base.log.LogOutput; import io.deephaven.base.verify.Assert; -import io.deephaven.configuration.Configuration; import io.deephaven.datastructures.util.CollectionUtil; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.context.QueryScope; @@ -25,13 +25,14 @@ import io.deephaven.engine.testutil.generator.SetGenerator; import io.deephaven.engine.testutil.generator.SortedLongGenerator; import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; +import io.deephaven.engine.updategraph.NotificationQueue; +import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.util.TableTools; import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker; -import io.deephaven.io.logger.StreamLoggerImpl; import io.deephaven.test.types.OutOfBandTest; import io.deephaven.util.SafeCloseable; -import io.deephaven.util.process.ProcessEnvironment; import junit.framework.TestCase; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableLong; import org.junit.experimental.categories.Category; @@ -43,6 +44,8 @@ import java.util.stream.IntStream; import java.util.stream.LongStream; +import static io.deephaven.api.agg.Aggregation.AggLast; +import static io.deephaven.api.agg.Aggregation.AggSum; import static io.deephaven.engine.testutil.TstUtils.*; import static io.deephaven.engine.util.TableTools.*; import static org.assertj.core.api.Assertions.assertThat; @@ -215,6 +218,9 @@ public void testProxy() { rightTable.partitionedAggBy(List.of(), true, testTable(col("Sym", "aa", "bb", "cc", "dd")), "Sym"); final PartitionedTable.Proxy rightProxy = rightPT.proxy(false, false); + final Table initialKeys = newTable(col("Sym", "cc", "dd", "aa", "bb"), intCol("intCol", 0, 2, 3, 4)); + final PartitionedTable.Proxy initialKeysProxy = initialKeys.partitionBy("Sym").proxy(); + final EvalNuggetInterface[] en = new EvalNuggetInterface[] { new EvalNugget() { public Table e() { @@ -232,8 +238,21 @@ public Table e() { leftProxy.naturalJoin(rightTable.lastBy("Sym"), "Sym").target().merge().sort("K", "Sym")), new QueryTableTest.TableComparator(withK.naturalJoin(rightTable.lastBy("Sym"), "Sym").sort("K", "Sym"), leftProxy.naturalJoin(rightProxy.lastBy(), "Sym").target().merge().sort("K", "Sym")), + new QueryTableTest.TableComparator( + withK.aggBy(List.of(AggLast("K"), AggSum("doubleCol")), "Sym", "intCol").sort("Sym", "intCol"), + leftProxy.aggBy(List.of(AggLast("Sym", "K"), AggSum("doubleCol")), "intCol").target().merge() + .moveColumnsUp("Sym").sort("Sym", "intCol")), + new QueryTableTest.TableComparator( + withK.aggBy(List.of(AggLast("K"), AggSum("doubleCol")), false, initialKeys, + ColumnName.from("Sym", "intCol")).sort("Sym", "intCol"), + leftProxy.aggBy(List.of(AggLast("K"), AggSum("doubleCol")), false, initialKeys, + ColumnName.from("Sym", "intCol")).target().merge().sort("Sym", "intCol")), + new QueryTableTest.TableComparator( + withK.aggBy(List.of(AggLast("K"), AggSum("doubleCol")), false, initialKeys, + ColumnName.from("Sym", "intCol")).sort("Sym", "intCol"), + leftProxy.aggBy(List.of(AggLast("K"), AggSum("doubleCol")), false, initialKeysProxy, + ColumnName.from("Sym", "intCol")).target().merge().sort("Sym", "intCol")), }; - for (int i = 0; i < 100; i++) { simulateShiftAwareStep(size, random, table, columnInfo, en); } @@ -407,6 +426,71 @@ public void testDependencies() { }); } + public void testTransformDependencies() { + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + + final QueryTable sourceTable = testRefreshingTable(i(1, 2, 4, 6).toTracking(), + col("USym", "aa", "bb", "aa", "bb"), + col("Sentinel", 10, 20, 40, 60)); + + final QueryTable extraTable = testRefreshingTable(i(0).toTracking(), + col("Value", "0.2")); + final MutableBoolean extraParentSatisfied = new MutableBoolean(false); + final NotificationQueue.Dependency extraParentDependency = new NotificationQueue.Dependency() { + @Override + public boolean satisfied(long step) { + return extraParentSatisfied.booleanValue(); + } + + @Override + public UpdateGraph getUpdateGraph() { + return updateGraph; + } + + @Override + public LogOutput append(LogOutput logOutput) { + return logOutput.append("extra dependency"); + } + }; + extraTable.addParentReference(extraParentDependency); + + final PartitionedTable partitioned = sourceTable.partitionBy("USym"); + final PartitionedTable transformed = partitioned.transform(t -> t.join(extraTable), extraTable); + + // We need to flush one notification: one for the source table because we do not require an intermediate + // view table in this case + updateGraph.runWithinUnitTestCycle(() -> { + // Add "dd" to source + addToTable(sourceTable, i(8), col("USym", "dd"), col("Sentinel", 80)); + sourceTable.notifyListeners(i(8), i(), i()); + TestCase.assertTrue(updateGraph.flushOneNotificationForUnitTests()); + + // PartitionBy has processed "dd" + TestCase.assertTrue(partitioned.table().satisfied(updateGraph.clock().currentStep())); + TestCase.assertNotNull(partitioned.constituentFor("dd")); + + // Transform has not processed "dd" yet + TestCase.assertFalse(transformed.table().satisfied(updateGraph.clock().currentStep())); + TestCase.assertNull(transformed.constituentFor("dd")); + + // Flush the notification for transform's internal copy() of partitioned.table() + TestCase.assertTrue(updateGraph.flushOneNotificationForUnitTests()); + + // Add a row to extra + addToTable(extraTable, i(1), col("Value", "0.3")); + extraTable.notifyListeners(i(1), i(), i()); + TestCase.assertFalse(updateGraph.flushOneNotificationForUnitTests(true)); // Fail to update anything + + extraParentSatisfied.setTrue(); // Allow updates to propagate + updateGraph.flushAllNormalNotificationsForUnitTests(); + + TestCase.assertTrue(transformed.table().satisfied(updateGraph.clock().currentStep())); + final Table transformedDD = transformed.constituentFor("dd"); + TestCase.assertTrue(transformedDD.satisfied(updateGraph.clock().currentStep())); + TestCase.assertEquals(2, transformedDD.size()); + }); + } + public static class PauseHelper { private final long start = System.currentTimeMillis(); private volatile boolean released = false;