diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/BlinkTableTools.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/BlinkTableTools.java
index 2d56e0a3370..12dc55fb52a 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/BlinkTableTools.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/BlinkTableTools.java
@@ -7,15 +7,13 @@
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.table.*;
-import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
-import io.deephaven.engine.table.impl.remote.ConstructSnapshot;
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import io.deephaven.engine.table.impl.util.*;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.SafeCloseable;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -26,21 +24,55 @@
* @see Table#BLINK_TABLE_ATTRIBUTE
*/
public class BlinkTableTools {
+ public static final Object DEFAULT_MEMO_KEY = new Object() {
+ @Override
+ public String toString() {
+ return "DEFAULT_MEMOIZATION_KEY";
+ }
+ };
/**
* Convert a Blink Table to an in-memory append only table.
*
- * Note, this table will grow without bound as new blink table rows are encountered.
+ * Note, this table will grow without bound as new blink table rows are encountered. The result is memoized under
+ * {@link #DEFAULT_MEMO_KEY}.
*
* @param blinkTable The input blink table
* @return An append-only in-memory table representing all data encountered in the blink table across all cycles
*/
public static Table blinkToAppendOnly(final Table blinkTable) {
- final UpdateGraph updateGraph = blinkTable.getUpdateGraph();
- try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
- // Setting the size limit as maximum allowed value
- return internalBlinkToAppendOnly(blinkTable, Long.MAX_VALUE);
- }
+ // Setting the size limit as maximum allowed value
+ return blinkToAppendOnly(blinkTable, Long.MAX_VALUE, DEFAULT_MEMO_KEY);
+ }
+
+ /**
+ * Convert a Blink Table to an in-memory append only table.
+ *
+ * Note, this table will grow without bound as new blink table rows are encountered.
+ *
+ * @param blinkTable The input blink table
+ * @param memoKey saves a weak reference to the result of the given operation under the given memoization key (null
+ * to disable memoization)
+ * @return An append-only in-memory table representing all data encountered in the blink table across all cycles
+ */
+ public static Table blinkToAppendOnly(@NotNull final Table blinkTable, @Nullable final Object memoKey) {
+ // Setting the size limit as maximum allowed value
+ return blinkToAppendOnly(blinkTable, Long.MAX_VALUE, memoKey);
+ }
+
+ /**
+ * Convert a Blink Table to an in-memory append only table with a limit on maximum size. Any updates beyond that
+ * limit won't be appended to the table.
+ *
+ * The result is memoized under {@link #DEFAULT_MEMO_KEY}.
+ *
+ * @param blinkTable The input blink table
+ * @param sizeLimit The maximum number of rows in the append-only table
+ * @return An append-only in-memory table representing all data encountered in the blink table across all cycles
+ * till maximum row count
+ */
+ public static Table blinkToAppendOnly(@NotNull final Table blinkTable, long sizeLimit) {
+ return blinkToAppendOnly(blinkTable, sizeLimit, DEFAULT_MEMO_KEY);
}
/**
@@ -49,127 +81,28 @@ public static Table blinkToAppendOnly(final Table blinkTable) {
*
* @param blinkTable The input blink table
* @param sizeLimit The maximum number of rows in the append-only table
+ * @param memoKey saves a weak reference to the result of the given operation under the given size limit and
+ * memoization key (null to disable memoization)
* @return An append-only in-memory table representing all data encountered in the blink table across all cycles
* till maximum row count
*/
- public static Table blinkToAppendOnly(final Table blinkTable, long sizeLimit) {
+ public static Table blinkToAppendOnly(
+ final Table blinkTable,
+ final long sizeLimit,
+ @Nullable final Object memoKey) {
if (sizeLimit < 0) {
throw new IllegalArgumentException("Size limit cannot be negative, limit=" + sizeLimit);
}
+ if (!isBlink(blinkTable)) {
+ throw new IllegalArgumentException("Input is not a blink table!");
+ }
final UpdateGraph updateGraph = blinkTable.getUpdateGraph();
try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
- return internalBlinkToAppendOnly(blinkTable, sizeLimit);
+ final QueryTable coalesced = (QueryTable) (blinkTable.coalesce());
+ return coalesced.getResult(new BlinkToAppendOnlyOperation(coalesced, sizeLimit, memoKey));
}
}
- private static Table internalBlinkToAppendOnly(final Table blinkTable, long sizeLimit) {
- return QueryPerformanceRecorder.withNugget("blinkToAppendOnly", () -> {
- if (!isBlink(blinkTable)) {
- throw new IllegalArgumentException("Input is not a blink table!");
- }
-
- final BaseTable> baseBlinkTable = (BaseTable>) blinkTable.coalesce();
- final OperationSnapshotControl snapshotControl =
- baseBlinkTable.createSnapshotControlIfRefreshing(OperationSnapshotControl::new);
- // blink tables must tick
- Assert.neqNull(snapshotControl, "snapshotControl");
-
- final Mutable resultHolder = new MutableObject<>();
-
- ConstructSnapshot.callDataSnapshotFunction("blinkToAppendOnly", snapshotControl,
- (boolean usePrev, long beforeClockValue) -> {
- final Map> columns = new LinkedHashMap<>();
- final Map> columnSourceMap =
- baseBlinkTable.getColumnSourceMap();
- final int columnCount = columnSourceMap.size();
- final ColumnSource>[] sourceColumns = new ColumnSource[columnCount];
- final WritableColumnSource>[] destColumns = new WritableColumnSource[columnCount];
- int colIdx = 0;
- for (Map.Entry> nameColumnSourceEntry : columnSourceMap
- .entrySet()) {
- final ColumnSource> existingColumn = nameColumnSourceEntry.getValue();
- final WritableColumnSource> newColumn = ArrayBackedColumnSource.getMemoryColumnSource(
- 0, existingColumn.getType(), existingColumn.getComponentType());
- columns.put(nameColumnSourceEntry.getKey(), newColumn);
- // for the source columns, we would like to read primitives instead of objects in cases
- // where it is possible
- sourceColumns[colIdx] = ReinterpretUtils.maybeConvertToPrimitive(existingColumn);
- // for the destination sources, we know they are array backed sources that will actually
- // store primitives and we can fill efficiently
- destColumns[colIdx++] =
- (WritableColumnSource>) ReinterpretUtils.maybeConvertToPrimitive(newColumn);
- }
-
- final RowSet baseRowSet =
- (usePrev ? baseBlinkTable.getRowSet().prev() : baseBlinkTable.getRowSet());
- final RowSet useRowSet;
- if (baseRowSet.size() > sizeLimit) {
- useRowSet = baseRowSet.subSetByPositionRange(0, sizeLimit);
- } else {
- useRowSet = baseRowSet;
- }
- final TrackingWritableRowSet rowSet = RowSetFactory.flat(useRowSet.size()).toTracking();
- ChunkUtils.copyData(sourceColumns, useRowSet, destColumns, rowSet, usePrev);
-
- final QueryTable result = new QueryTable(rowSet, columns);
- result.setRefreshing(true);
- result.setAttribute(Table.ADD_ONLY_TABLE_ATTRIBUTE, true);
- result.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);
- result.setFlat();
- resultHolder.setValue(result);
-
- Assert.leq(result.size(), "result.size()", sizeLimit, "sizeLimit");
-
- snapshotControl.setListenerAndResult(new BaseTable.ListenerImpl("streamToAppendOnly",
- baseBlinkTable, result) {
- @Override
- public void onUpdate(TableUpdate upstream) {
- if (upstream.modified().isNonempty() || upstream.shifted().nonempty()) {
- throw new IllegalArgumentException("Blink tables should not modify or shift!");
- }
- long newRowsSize = upstream.added().size();
- if (newRowsSize == 0) {
- return;
- }
- RowSet subsetAdded = null;
- final long currentSize = rowSet.size();
- if (currentSize + newRowsSize >= sizeLimit) {
- newRowsSize = (sizeLimit - currentSize);
- subsetAdded = upstream.added().subSetByPositionRange(0, newRowsSize);
- }
- final long totalSize = currentSize + newRowsSize;
- columns.values().forEach(c -> c.ensureCapacity(totalSize));
- final RowSet newRange = RowSetFactory.fromRange(currentSize, totalSize - 1);
-
- try (final SafeCloseable ignored = subsetAdded) {
- final RowSet newRowSet = (subsetAdded == null) ? upstream.added() : subsetAdded;
- ChunkUtils.copyData(sourceColumns, newRowSet, destColumns, newRange, false);
- }
- rowSet.insertRange(currentSize, totalSize - 1);
- Assert.leq(totalSize, "totalSize", sizeLimit, "sizeLimit");
-
- final TableUpdateImpl downstream = new TableUpdateImpl();
- downstream.added = newRange;
- downstream.modified = RowSetFactory.empty();
- downstream.removed = RowSetFactory.empty();
- downstream.modifiedColumnSet = ModifiedColumnSet.EMPTY;
- downstream.shifted = RowSetShiftData.EMPTY;
- result.notifyListeners(downstream);
-
- if (totalSize == sizeLimit) {
- // No more rows can be appended, so remove the listener and remove all references
- forceReferenceCountToZero();
- }
- }
- }, result);
-
- return true;
- });
-
- return resultHolder.getValue();
- });
- }
-
/**
* Returns true if table is a blink table.
*
@@ -183,4 +116,133 @@ public static boolean isBlink(Table table) {
}
return Boolean.TRUE.equals(table.getAttribute(Table.BLINK_TABLE_ATTRIBUTE));
}
+
+ private static class BlinkToAppendOnlyOperation implements QueryTable.MemoizableOperation {
+ private final QueryTable parent;
+ private final long sizeLimit;
+ private final Object memoKey;
+ private final ColumnSource>[] sourceColumns;
+ private final WritableColumnSource>[] destColumns;
+
+ private QueryTable resultTable;
+ private BaseTable.ListenerImpl resultListener;
+
+ private BlinkToAppendOnlyOperation(
+ @NotNull final QueryTable parent,
+ final long sizeLimit,
+ @Nullable final Object memoKey) {
+ this.parent = parent;
+ this.sizeLimit = sizeLimit;
+ this.memoKey = memoKey;
+
+ this.sourceColumns = new ColumnSource>[parent.numColumns()];
+ this.destColumns = new WritableColumnSource>[parent.numColumns()];
+ }
+
+ @Override
+ public String getDescription() {
+ final String sizeLimitStr = sizeLimit == Long.MAX_VALUE ? "unbounded" : Long.toString(sizeLimit);
+ final String memoKeyStr = memoKey == null ? "none" : memoKey.toString();
+ return String.format("BlinkTableTools.blinkToAppendOnly(%s, %s)", sizeLimitStr, memoKeyStr);
+ }
+
+ @Override
+ public String getLogPrefix() {
+ return "BlinkTableTools.blinkToAppendOnly";
+ }
+
+ @Override
+ public MemoizedOperationKey getMemoizedOperationKey() {
+ return memoKey == null ? null : MemoizedOperationKey.blinkToAppendOnly(sizeLimit, memoKey);
+ }
+
+ @Override
+ public Result initialize(boolean usePrev, long beforeClock) {
+ final Map> parentColumns = parent.getColumnSourceMap();
+ final Map> resultColumns = new LinkedHashMap<>(parentColumns.size());
+
+ // note that we do not need to enable prev tracking for an add-only table
+ int colIdx = 0;
+ for (Map.Entry> sourceEntry : parentColumns
+ .entrySet()) {
+ final ColumnSource> sourceColumn = sourceEntry.getValue();
+ final WritableColumnSource> newColumn = ArrayBackedColumnSource.getMemoryColumnSource(
+ 0, sourceColumn.getType(), sourceColumn.getComponentType());
+ resultColumns.put(sourceEntry.getKey(), newColumn);
+
+ // read and write primitives whenever possible
+ sourceColumns[colIdx] = ReinterpretUtils.maybeConvertToPrimitive(sourceColumn);
+ destColumns[colIdx++] = ReinterpretUtils.maybeConvertToWritablePrimitive(newColumn);
+ }
+
+ final RowSet rowSetToUse = usePrev ? parent.getRowSet().prev() : parent.getRowSet();
+ final RowSet initialRowSet = rowSetToUse.isEmpty()
+ ? RowSetFactory.empty()
+ : appendRows(0, rowSetToUse, usePrev);
+ resultTable = new QueryTable(initialRowSet.writableCast().toTracking(), resultColumns);
+ resultTable.setRefreshing(true);
+ resultTable.setAttribute(Table.ADD_ONLY_TABLE_ATTRIBUTE, true);
+ resultTable.setAttribute(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true);
+ resultTable.setFlat();
+
+ if (resultTable.size() < sizeLimit) {
+ resultListener = new BaseTable.ListenerImpl(getDescription(), parent, resultTable) {
+ @Override
+ public void onUpdate(TableUpdate upstream) {
+ BlinkToAppendOnlyOperation.this.onUpdate(upstream);
+ }
+ };
+ }
+
+ return new Result<>(resultTable, resultListener);
+ }
+
+ private void onUpdate(final TableUpdate upstream) {
+ if (upstream.modified().isNonempty() || upstream.shifted().nonempty()) {
+ throw new IllegalStateException("Blink tables should not modify or shift!");
+ }
+ if (upstream.added().isEmpty()) {
+ return;
+ }
+
+ final TableUpdateImpl downstream = new TableUpdateImpl();
+ downstream.added = appendRows(resultTable.size(), upstream.added(), false);
+ Assert.eqTrue(downstream.added.isNonempty(), "downstream.added.isNonempty()");
+ resultTable.getRowSet().writableCast().insertRange(
+ downstream.added.firstRowKey(), downstream.added.lastRowKey());
+ downstream.modified = RowSetFactory.empty();
+ downstream.removed = RowSetFactory.empty();
+ downstream.modifiedColumnSet = ModifiedColumnSet.EMPTY;
+ downstream.shifted = RowSetShiftData.EMPTY;
+ resultTable.notifyListeners(downstream);
+
+ if (resultTable.size() == sizeLimit) {
+ // No more rows can be appended, so remove the listener and remove all references
+ resultListener.forceReferenceCountToZero();
+ resultListener = null;
+ }
+ }
+
+ private RowSet appendRows(final long currentSize, final RowSet newRows, final boolean usePrev) {
+ long newRowsSize = newRows.size();
+ Assert.gtZero(newRowsSize, "newRowsSize");
+ RowSet rowsToAdd = null;
+ if (currentSize > sizeLimit - newRowsSize) {
+ newRowsSize = (sizeLimit - currentSize);
+ rowsToAdd = newRows.subSetByPositionRange(0, newRowsSize);
+ }
+ final long totalSize = currentSize + newRowsSize;
+ final RowSet newRange = RowSetFactory.fromRange(currentSize, totalSize - 1);
+
+ try (final SafeCloseable ignored = rowsToAdd) {
+ if (rowsToAdd == null) {
+ rowsToAdd = newRows;
+ }
+ ChunkUtils.copyData(sourceColumns, rowsToAdd, destColumns, newRange, usePrev);
+ }
+ Assert.leq(totalSize, "totalSize", sizeLimit, "sizeLimit");
+
+ return newRange;
+ }
+ }
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/MemoizedOperationKey.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/MemoizedOperationKey.java
index 63de1d0389e..1aa88d6a160 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/MemoizedOperationKey.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/MemoizedOperationKey.java
@@ -118,6 +118,10 @@ public static MemoizedOperationKey rollup(Collection extends Aggregation> aggr
includeConstituents);
}
+ public static MemoizedOperationKey blinkToAppendOnly(final long sizeLimit, @NotNull final Object key) {
+ return new BlinkToAppendOnly(sizeLimit, key);
+ }
+
private static boolean isMemoizable(SelectColumn[] selectColumn) {
return Arrays.stream(selectColumn)
.allMatch(sc -> sc instanceof SourceColumn || sc instanceof ReinterpretedColumn);
@@ -540,7 +544,7 @@ BaseTable.CopyAttributeOperation copyType() {
}
}
- public static WouldMatch wouldMatch(WouldMatchPair... pairs) {
+ public static MemoizedOperationKey wouldMatch(WouldMatchPair... pairs) {
return new WouldMatch(pairs);
}
@@ -592,7 +596,7 @@ BaseTable.CopyAttributeOperation copyType() {
}
}
- public static CrossJoin crossJoin(final Table rightTable, final MatchPair[] columnsToMatch,
+ public static MemoizedOperationKey crossJoin(final Table rightTable, final MatchPair[] columnsToMatch,
final MatchPair[] columnsToAdd, final int numRightBitsToReserve) {
return new CrossJoin(rightTable, columnsToMatch, columnsToAdd, numRightBitsToReserve);
}
@@ -650,7 +654,7 @@ BaseTable.CopyAttributeOperation copyType() {
}
}
- public static RangeJoin rangeJoin(
+ public static MemoizedOperationKey rangeJoin(
@NotNull final Table rightTable,
@NotNull final Collection extends JoinMatch> exactMatches,
@NotNull final RangeJoinMatch rangeMatch,
@@ -672,4 +676,38 @@ protected static boolean equalWeakRefsByReferentIdentity(final WeakReference>
}
return t1 == t2;
}
+
+ private static class BlinkToAppendOnly extends AttributeAgnosticMemoizedOperationKey {
+ private final long sizeLimit;
+ private final Object key;
+
+ private BlinkToAppendOnly(final long sizeLimit, @NotNull final Object key) {
+ this.sizeLimit = sizeLimit;
+ this.key = Objects.requireNonNull(key);
+ }
+
+ @Override
+ public boolean equals(final Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ final BlinkToAppendOnly blinkToAppendOnly = (BlinkToAppendOnly) other;
+
+ return sizeLimit == blinkToAppendOnly.sizeLimit && key.equals(blinkToAppendOnly.key);
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * key.hashCode() + Long.hashCode(sizeLimit);
+ }
+
+ @Override
+ BaseTable.CopyAttributeOperation copyType() {
+ return BaseTable.CopyAttributeOperation.None;
+ }
+ }
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationSnapshotControl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationSnapshotControl.java
index f2ba3febdb5..f63cfa42e10 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationSnapshotControl.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationSnapshotControl.java
@@ -105,9 +105,6 @@ public synchronized boolean snapshotCompletedConsistently(
final boolean usedPreviousValues) {
final boolean snapshotConsistent;
if (isInInitialNotificationWindow()) {
- if (eventualListener == null) {
- throw new IllegalStateException("Listener has not been set on end!");
- }
if (eventualResult == null) {
throw new IllegalStateException("Result has not been set on end!");
}
@@ -132,7 +129,7 @@ public synchronized boolean snapshotCompletedConsistently(
// Be sure to record initial last notification step before subscribing
eventualResult.setLastNotificationStep(lastNotificationStep);
- return subscribeForUpdates(eventualListener);
+ return eventualListener == null || subscribeForUpdates(eventualListener);
}
/**
@@ -160,7 +157,7 @@ boolean subscribeForUpdates(@NotNull final TableUpdateListener listener) {
* @param resultTable The table that will result from this operation
*/
public synchronized void setListenerAndResult(
- @NotNull final TableUpdateListener listener,
+ final TableUpdateListener listener,
@NotNull final NotificationStepReceiver resultTable) {
eventualListener = listener;
eventualResult = resultTable;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationSnapshotControlEx.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationSnapshotControlEx.java
index 654f630f47a..93d02418885 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationSnapshotControlEx.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationSnapshotControlEx.java
@@ -100,7 +100,7 @@ public synchronized boolean snapshotCompletedConsistently(long afterClockValue,
}
@Override
- public synchronized void setListenerAndResult(@NotNull final TableUpdateListener listener,
+ public synchronized void setListenerAndResult(final TableUpdateListener listener,
@NotNull final NotificationStepReceiver resultTable) {
super.setListenerAndResult(listener, resultTable);
if (DEBUG) {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java
index 5301eb54b48..53e6dc4f776 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java
@@ -116,15 +116,19 @@ default boolean snapshotNeeded() {
*/
class Result {
public final T resultNode;
- public final TableUpdateListener resultListener; // may be null if parent is non-ticking
+ /**
+ * The listener that should be attached to the parent. The listener may be null if the table does not need
+ * to respond to ticks from other sources (e.g. the parent is non-refreshing).
+ */
+ public final TableUpdateListener resultListener;
public Result(@NotNull final T resultNode) {
this(resultNode, null);
}
/**
- * Construct the result of an operation. The listener may be null if the parent is non-ticking and the table
- * does not need to respond to ticks from other sources.
+ * Construct the result of an operation. The listener may be null if the table does not need to respond to
+ * ticks from other sources (e.g. the parent is non-refreshing).
*
* @param resultNode the result of the operation
* @param resultListener the listener that should be attached to the parent (or null)
@@ -3537,8 +3541,7 @@ private T getResultNoMemo(fin
resultTable.setValue(result.resultNode);
if (snapshotControl != null) {
- snapshotControl.setListenerAndResult(Require.neqNull(result.resultListener, "resultListener"),
- result.resultNode);
+ snapshotControl.setListenerAndResult(result.resultListener, result.resultNode);
}
return true;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/EngineMetrics.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/EngineMetrics.java
index e55dbcc3c8e..efc298559f2 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/EngineMetrics.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/EngineMetrics.java
@@ -29,6 +29,7 @@ public class EngineMetrics {
private static final Logger log = LoggerFactory.getLogger(EngineMetrics.class);
private static final boolean STATS_LOGGING_ENABLED = Configuration.getInstance().getBooleanWithDefault(
"statsLoggingEnabled", true);
+
private static volatile ProcessInfo PROCESS_INFO;
private static volatile EngineMetrics ENGINE_METRICS;
@@ -105,7 +106,9 @@ public QueryTable getProcessInfoQueryTable() {
}
public QueryTable getProcessMetricsQueryTable() {
- return statsImpl == null ? null : (QueryTable) BlinkTableTools.blinkToAppendOnly(statsImpl.blinkTable());
+ return statsImpl == null
+ ? null
+ : (QueryTable) BlinkTableTools.blinkToAppendOnly(statsImpl.blinkTable());
}
private StatsIntradayLogger getStatsLogger() {
diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java
index 447243b0f0c..13c3b14af50 100644
--- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java
+++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java
@@ -934,13 +934,28 @@ public void markSourcesRefreshedForUnitTests() {
*/
@TestUseOnly
public void completeCycleForUnitTests() {
+ completeCycleForUnitTests(false);
+ }
+
+ /**
+ * Do the second half of the update cycle, including flushing notifications, and completing the
+ * {@link LogicalClockImpl#completeUpdateCycle() LogicalClock} update cycle. Note that this happens on a simulated
+ * UpdateGraph run thread, rather than this thread.
+ *
+ * @param errorCaughtAndInFinallyBlock Whether an error was caught, and we are in a {@code finally} block
+ */
+ private void completeCycleForUnitTests(boolean errorCaughtAndInFinallyBlock) {
Assert.assertion(unitTestMode, "unitTestMode");
- Assert.eq(sourcesLastSatisfiedStep, "sourcesLastSatisfiedStep", logicalClock.currentStep(),
- "logicalClock.currentStep()");
+ if (!errorCaughtAndInFinallyBlock) {
+ Assert.eq(sourcesLastSatisfiedStep, "sourcesLastSatisfiedStep", logicalClock.currentStep(),
+ "logicalClock.currentStep()");
+ }
try {
unitTestRefreshThreadPool.submit(this::completeCycleForUnitTestsInternal).get();
} catch (InterruptedException | ExecutionException e) {
- throw new UncheckedDeephavenException(e);
+ if (!errorCaughtAndInFinallyBlock) {
+ throw new UncheckedDeephavenException(e);
+ }
}
}
@@ -986,10 +1001,14 @@ public void runWithinUnitTestCycle(
final boolean sourcesSatisfied)
throws T {
startCycleForUnitTests(sourcesSatisfied);
+ boolean errorCaught = false;
try {
runnable.run();
+ } catch (final Throwable err) {
+ errorCaught = true;
+ throw err;
} finally {
- completeCycleForUnitTests();
+ completeCycleForUnitTests(errorCaught);
}
}
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestBlinkTableTools.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestBlinkTableTools.java
index a171f2640e9..ccfcff88b50 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestBlinkTableTools.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestBlinkTableTools.java
@@ -4,7 +4,9 @@
package io.deephaven.engine.table.impl;
import io.deephaven.engine.context.ExecutionContext;
+import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.rowset.RowSet;
+import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.testutil.ControlledUpdateGraph;
@@ -12,11 +14,14 @@
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.engine.util.TableTools;
+import io.deephaven.util.SafeCloseable;
import junit.framework.TestCase;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import java.time.Instant;
+import java.util.concurrent.atomic.AtomicReference;
import static io.deephaven.engine.util.TableTools.*;
import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
@@ -45,11 +50,11 @@ public void testBlinkToAppendOnlyTable() {
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
updateGraph.runWithinUnitTestCycle(() -> {
- RowSet removed1 = blinkTable.getRowSet().copyPrev();
+ RowSet removed = blinkTable.getRowSet().copyPrev();
((WritableRowSet) blinkTable.getRowSet()).clear();
TstUtils.addToTable(blinkTable, i(7), intCol("I", 1), doubleCol("D", Math.PI), instantCol("DT", dt2),
col("B", true));
- blinkTable.notifyListeners(i(7), removed1, i());
+ blinkTable.notifyListeners(i(7), removed, i());
});
assertTableEquals(TableTools.newTable(intCol("I", 7, 1), doubleCol("D", Double.NEGATIVE_INFINITY, Math.PI),
@@ -68,4 +73,203 @@ public void testBlinkToAppendOnlyTable() {
appendOnly);
}
+ @Test
+ public void testBlinkTableHasPrevData() throws InterruptedException {
+ // we need to tell the UG that we intend to modify source tables mid-cycle
+ final boolean sourceTablesAlreadySatisfied = false;
+
+ final AtomicReference result = new AtomicReference<>();
+ final QueryTable blinkSource = createBlinkTableForConcurrentInstantiationTests();
+ final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
+
+ updateGraph.runWithinUnitTestCycle(() -> {
+ final RowSet added = RowSetFactory.fromRange(10, 19);
+ final RowSet removed = blinkSource.getRowSet().copyPrev();
+ ((WritableRowSet) blinkSource.getRowSet()).update(added, removed);
+
+ createResultOffThread(blinkSource, result, Long.MAX_VALUE);
+
+ TstUtils.assertTableEquals(TableTools.emptyTable(10).select("K = k"), result.get());
+ blinkSource.notifyListeners(added, removed, i());
+ updateGraph.markSourcesRefreshedForUnitTests();
+ }, sourceTablesAlreadySatisfied);
+
+ TstUtils.assertTableEquals(TableTools.emptyTable(20).select("K = k"), result.get());
+ }
+
+ @Test
+ public void testBlinkTableHasCurrData() throws InterruptedException {
+ // we need to tell the UG that we intend to modify source tables mid-cycle
+ final boolean sourceTablesAlreadySatisfied = false;
+
+ final AtomicReference result = new AtomicReference<>();
+ final QueryTable blinkSource = createBlinkTableForConcurrentInstantiationTests();
+ final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
+
+ updateGraph.runWithinUnitTestCycle(() -> {
+ final RowSet added = RowSetFactory.fromRange(10, 19);
+ final RowSet removed = blinkSource.getRowSet().copyPrev();
+ ((WritableRowSet) blinkSource.getRowSet()).update(added, removed);
+
+ blinkSource.notifyListeners(added, removed, i());
+ updateGraph.markSourcesRefreshedForUnitTests();
+
+ createResultOffThread(blinkSource, result, Long.MAX_VALUE);
+ TstUtils.assertTableEquals(TableTools.emptyTable(10).select("K = k + 10"), result.get());
+ }, sourceTablesAlreadySatisfied);
+
+ TstUtils.assertTableEquals(TableTools.emptyTable(10).select("K = k + 10"), result.get());
+ }
+
+ @Test
+ public void testSizeLimitDuringInstantiation() throws InterruptedException {
+ // we need to tell the UG that we intend to modify source tables mid-cycle
+ final boolean sourceTablesAlreadySatisfied = false;
+
+ final AtomicReference result = new AtomicReference<>();
+ final QueryTable blinkSource = createBlinkTableForConcurrentInstantiationTests();
+ final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
+
+ updateGraph.runWithinUnitTestCycle(() -> {
+ final RowSet added = RowSetFactory.fromRange(10, 19);
+ final RowSet removed = blinkSource.getRowSet().copyPrev();
+ ((WritableRowSet) blinkSource.getRowSet()).update(added, removed);
+
+ createResultOffThread(blinkSource, result, 8);
+
+ TstUtils.assertTableEquals(TableTools.emptyTable(8).select("K = k"), result.get());
+ blinkSource.notifyListeners(added, removed, i());
+ updateGraph.markSourcesRefreshedForUnitTests();
+ }, sourceTablesAlreadySatisfied);
+
+ // ensure that the append only table did not tick after instantiation
+ Assert.assertTrue(result.get().getLastNotificationStep() < blinkSource.getLastNotificationStep());
+ TstUtils.assertTableEquals(TableTools.emptyTable(8).select("K = k"), result.get());
+ }
+
+ @Test
+ public void testSizeLimitDuringUpdate() throws InterruptedException {
+ // we need to tell the UG that we intend to modify source tables mid-cycle
+ final boolean sourceTablesAlreadySatisfied = false;
+
+ final AtomicReference result = new AtomicReference<>();
+ final QueryTable blinkSource = createBlinkTableForConcurrentInstantiationTests();
+ final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
+
+ updateGraph.runWithinUnitTestCycle(() -> {
+ final RowSet added = RowSetFactory.fromRange(10, 14);
+ final RowSet removed = blinkSource.getRowSet().copyPrev();
+ ((WritableRowSet) blinkSource.getRowSet()).update(added, removed);
+
+ createResultOffThread(blinkSource, result, 15);
+
+ TstUtils.assertTableEquals(TableTools.emptyTable(10).select("K = k"), result.get());
+ blinkSource.notifyListeners(added, removed, i());
+ updateGraph.markSourcesRefreshedForUnitTests();
+ }, sourceTablesAlreadySatisfied);
+
+ final long lastNotificationStep = result.get().getLastNotificationStep();
+ TstUtils.assertTableEquals(TableTools.emptyTable(15).select("K = k"), result.get());
+
+ updateGraph.runWithinUnitTestCycle(() -> {
+ final RowSet added = RowSetFactory.fromRange(15, 19);
+ final RowSet removed = blinkSource.getRowSet().copyPrev();
+ ((WritableRowSet) blinkSource.getRowSet()).update(added, removed);
+
+ blinkSource.notifyListeners(added, removed, i());
+ updateGraph.markSourcesRefreshedForUnitTests();
+ }, sourceTablesAlreadySatisfied);
+
+ // ensure that the append only table did not tick
+ Assert.assertEquals(lastNotificationStep, result.get().getLastNotificationStep());
+ TstUtils.assertTableEquals(TableTools.emptyTable(15).select("K = k"), result.get());
+ }
+
+ @Test
+ public void testMemoKey() {
+ // Note that EngineCleanup will clean this change during tearDown.
+ QueryTable.setMemoizeResults(true);
+
+ final QueryTable blinkSource = TstUtils.testRefreshingTable(RowSetFactory.empty().toTracking());
+ blinkSource.setAttribute(Table.BLINK_TABLE_ATTRIBUTE, true);
+
+ final Object memoKey = new Object();
+ final Object otherMemoKey = new Object();
+
+ final Table r1;
+ try (final SafeCloseable ignored = LivenessScopeStack.open()) {
+ r1 = BlinkTableTools.blinkToAppendOnly(blinkSource);
+ final Table r2 = BlinkTableTools.blinkToAppendOnly(blinkSource);
+ Assert.assertSame(r1, r2);
+
+ // test memo key
+ final Table r_memo = BlinkTableTools.blinkToAppendOnly(blinkSource, memoKey);
+ Assert.assertNotSame(r1, r_memo);
+
+ // test another memo key
+ final Table r_other_memo = BlinkTableTools.blinkToAppendOnly(blinkSource, otherMemoKey);
+ Assert.assertNotSame(r1, r_other_memo);
+ Assert.assertNotSame(r_memo, r_other_memo);
+
+ // test different size limit
+ final Table r_sz = BlinkTableTools.blinkToAppendOnly(blinkSource, 32);
+ Assert.assertNotSame(r1, r_sz);
+
+ // test reuse different size limit
+ final Table r_sz_2 = BlinkTableTools.blinkToAppendOnly(blinkSource, 32);
+ Assert.assertSame(r_sz, r_sz_2);
+
+ // test memo key different size limit
+ final Table r_sz_memo = BlinkTableTools.blinkToAppendOnly(blinkSource, 32, memoKey);
+ Assert.assertNotSame(r_sz, r_sz_memo);
+
+ // test another memo key
+ final Table r_sz_other_memo = BlinkTableTools.blinkToAppendOnly(blinkSource, 32, otherMemoKey);
+ Assert.assertNotSame(r_sz, r_sz_other_memo);
+ Assert.assertNotSame(r_sz_memo, r_sz_other_memo);
+
+ // test null memo key
+ final Table r_null = BlinkTableTools.blinkToAppendOnly(blinkSource, null);
+ Assert.assertNotSame(r1, r_null);
+
+ // test that null memo key is not memoized
+ final Table r_null_2 = BlinkTableTools.blinkToAppendOnly(blinkSource, null);
+ Assert.assertNotSame(r1, r_null);
+ Assert.assertNotSame(r_null, r_null_2);
+ }
+
+ // test that it was memoized only until the end of the liveness scope
+ Assert.assertFalse(r1.tryRetainReference());
+ final Table r2 = BlinkTableTools.blinkToAppendOnly(blinkSource);
+ Assert.assertNotSame(r1, r2);
+ }
+
+ private static QueryTable createBlinkTableForConcurrentInstantiationTests() {
+ final long[] colData = new long[20];
+ for (int ii = 0; ii < colData.length; ++ii) {
+ colData[ii] = ii;
+ }
+ final QueryTable blinkSource = TstUtils.testRefreshingTable(RowSetFactory.flat(colData.length).toTracking(),
+ longCol("K", colData));
+ blinkSource.setAttribute(Table.BLINK_TABLE_ATTRIBUTE, true);
+ blinkSource.getRowSet().writableCast().clear();
+ blinkSource.getRowSet().writableCast().insert(RowSetFactory.flat(10));
+ blinkSource.getRowSet().writableCast().initializePreviousValue();
+ return blinkSource;
+ }
+
+ private static void createResultOffThread(
+ final QueryTable blinkSource,
+ final AtomicReference result,
+ final long sizeLimit) throws InterruptedException {
+ final ExecutionContext execContext = ExecutionContext.getContext();
+ final Thread thread = new Thread(() -> {
+ try (final SafeCloseable ignored = execContext.open()) {
+ result.set((QueryTable) BlinkTableTools.blinkToAppendOnly(blinkSource, sizeLimit));
+ }
+ });
+ thread.start();
+ // don't forget to make this timeout generous when debugging
+ thread.join(500);
+ }
}