diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageBlinkTable.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageBlinkTable.java index ffe6dde5a3f..71425e7e234 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageBlinkTable.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageBlinkTable.java @@ -48,8 +48,8 @@ protected BarrageBlinkTable( final LinkedHashMap> columns, final WritableColumnSource[] writableSources, final Map attributes, - final long initialViewPortRows) { - super(registrar, notificationQueue, executorService, columns, writableSources, attributes, initialViewPortRows); + @Nullable final ViewportChangedCallback vpCallback) { + super(registrar, notificationQueue, executorService, columns, writableSources, attributes, vpCallback); } private void processUpdate(final BarrageMessage update) { @@ -73,9 +73,7 @@ private void processUpdate(final BarrageMessage update) { } if (update.isSnapshot) { - serverViewport = update.snapshotRowSet == null ? null : update.snapshotRowSet.copy(); - serverReverseViewport = update.snapshotRowSetIsReversed; - serverColumns = update.snapshotColumns == null ? null : (BitSet) update.snapshotColumns.clone(); + updateServerViewport(update.snapshotRowSet, update.snapshotColumns, update.snapshotRowSetIsReversed); } if (update.shifted.nonempty()) { @@ -127,10 +125,6 @@ private void processUpdate(final BarrageMessage update) { } } - private boolean isSubscribedColumn(int i) { - return serverColumns == null || serverColumns.get(i); - } - private void ensureCapacity(long size) { if (capacity < size) { capacity = Long.highestOneBit(Math.max(size * 2, 8)); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageRedirectedTable.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageRedirectedTable.java index c68faeeb3ce..858862712e7 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageRedirectedTable.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageRedirectedTable.java @@ -55,8 +55,8 @@ protected BarrageRedirectedTable(final UpdateSourceRegistrar registrar, final WritableColumnSource[] writableSources, final WritableRowRedirection rowRedirection, final Map attributes, - final long initialViewPortRows) { - super(registrar, notificationQueue, executorService, columns, writableSources, attributes, initialViewPortRows); + @Nullable final ViewportChangedCallback vpCallback) { + super(registrar, notificationQueue, executorService, columns, writableSources, attributes, vpCallback); this.rowRedirection = rowRedirection; } @@ -90,9 +90,7 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateC } if (update.isSnapshot) { - serverViewport = update.snapshotRowSet == null ? null : update.snapshotRowSet.copy(); - serverReverseViewport = update.snapshotRowSetIsReversed; - serverColumns = update.snapshotColumns == null ? null : (BitSet) update.snapshotColumns.clone(); + updateServerViewport(update.snapshotRowSet, update.snapshotColumns, update.snapshotRowSetIsReversed); } // make sure that these RowSet updates make some sense compared with each other, and our current view of the @@ -100,6 +98,9 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateC final WritableRowSet currentRowSet = getRowSet().writableCast(); final boolean mightBeInitialSnapshot = currentRowSet.isEmpty() && update.isSnapshot; + final RowSet serverViewport = getServerViewport(); + final boolean serverReverseViewport = getServerReverseViewport(); + try (final RowSet currRowsFromPrev = currentRowSet.copy(); final WritableRowSet populatedRows = (serverViewport != null @@ -249,10 +250,6 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateC } } - private boolean isSubscribedColumn(int i) { - return serverColumns == null || serverColumns.get(i); - } - private RowSet getFreeRows(long size) { if (size <= 0) { return RowSetFactory.empty(); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java index 43f728b0773..745cbf6ee4a 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java @@ -55,6 +55,33 @@ */ public abstract class BarrageTable extends QueryTable implements BarrageMessage.Listener { + public interface ViewportChangedCallback { + /** + * Called when the viewport has changed. Note that the server may send many viewport changes for a single + * request; as the server may choose to expand the viewport slowly to avoid update-graph lock contention. + * + * @param rowSet the new position space viewport - is null if the server is now respecting a full subscription + * @param columns the columns that are included in the viewport - is null if all columns are subscribed + * @param reverse whether the viewport is reversed - a reversed viewport + * + * @return true to continue to receive viewport changes, false to stop receiving viewport changes + */ + boolean viewportChanged(@Nullable RowSet rowSet, @Nullable BitSet columns, boolean reverse); + + /** + * Called when there is an unexpected error. Both remote and local failures will be reported. Once a failure + * occurs, this barrage table will stop receiving and processing updates from the remote server. + * + * @param t the error + */ + void onError(Throwable t); + + /** + * Called when the subscription is closed; will not be invoked after an onError. + */ + void onClose(); + } + public static final boolean DEBUG_ENABLED = Configuration.getInstance().getBooleanWithDefault("BarrageTable.debug", false); @@ -76,11 +103,6 @@ public abstract class BarrageTable extends QueryTable implements BarrageMessage. /** unsubscribed must never be reset to false once it has been set to true */ private volatile boolean unsubscribed = false; - /** sealed must never be reset to false once it has been set to true */ - private volatile boolean sealed = false; - /** the callback to run once sealing is complete */ - private Runnable onSealRunnable = null; - private Runnable onSealFailure = null; /** * The client and the server update asynchronously with respect to one another. The client requests a viewport, the @@ -93,9 +115,15 @@ public abstract class BarrageTable extends QueryTable implements BarrageMessage. * the server assumes that the client has maintained its state prior to these server-side viewport acks and will not * re-send data that the client should already have within the existing viewport. */ - protected RowSet serverViewport; - protected boolean serverReverseViewport; - protected BitSet serverColumns; + private RowSet serverViewport; + private BitSet serverColumns; + private boolean serverReverseViewport; + + /** + * A batch of updates may change the viewport more than once, but we cannot deliver until the updates have been + * propagated to this BarrageTable and its last notification step has been updated. + */ + private final ArrayDeque pendingVpChangeNotifications = new ArrayDeque<>(); /** synchronize access to pendingUpdates */ private final Object pendingUpdatesLock = new Object(); @@ -119,13 +147,20 @@ public abstract class BarrageTable extends QueryTable implements BarrageMessage. private final SourceRefresher refresher; + /** + * Used to notify a listener that the viewport has changed. This is typically used by the caller to know when the + * server has acknowledged a viewport change request. + */ + @Nullable + private ViewportChangedCallback viewportChangedCallback; + protected BarrageTable(final UpdateSourceRegistrar registrar, final NotificationQueue notificationQueue, @Nullable final ScheduledExecutorService executorService, final LinkedHashMap> columns, final WritableColumnSource[] writableSources, final Map attributes, - final long initialViewPortRows) { + @Nullable final ViewportChangedCallback viewportChangedCallback) { super(RowSetFactory.empty().toTracking(), columns); attributes.entrySet().stream() .filter(e -> !e.getKey().equals(Table.SYSTEMIC_TABLE_ATTRIBUTE)) @@ -142,12 +177,6 @@ protected BarrageTable(final UpdateSourceRegistrar registrar, stats = new Stats(tableKey); } - if (initialViewPortRows == -1) { - serverViewport = null; - } else { - serverViewport = RowSetFactory.empty(); - } - this.destSources = new WritableColumnSource[writableSources.length]; for (int ii = 0; ii < writableSources.length; ++ii) { destSources[ii] = ReinterpretUtils.maybeConvertToWritablePrimitive(writableSources[ii]); @@ -168,6 +197,7 @@ protected BarrageTable(final UpdateSourceRegistrar registrar, } this.refresher = new SourceRefresher(); + this.viewportChangedCallback = viewportChangedCallback; } /** @@ -210,36 +240,30 @@ public BitSet getServerColumns() { return serverColumns; } - /** - * Invoke sealTable to prevent further updates from being processed and to mark this source table as static. - * - * @param onSealRunnable pass a callback that gets invoked once the table has finished applying updates - * @param onSealFailure pass a callback that gets invoked if the table fails to finish applying updates - */ - public synchronized void sealTable(final Runnable onSealRunnable, final Runnable onSealFailure) { - // TODO (core#803): sealing of static table data acquired over flight/barrage - if (stats != null) { - stats.stop(); - } - setRefreshing(false); - sealed = true; - this.onSealRunnable = onSealRunnable; - this.onSealFailure = onSealFailure; - - doWakeup(); - } - @Override public void handleBarrageMessage(final BarrageMessage update) { - if (unsubscribed || sealed) { - beginLog(LogLevel.INFO).append(": Discarding update for unsubscribed/sealed table!").endl(); + if (unsubscribed) { + beginLog(LogLevel.INFO).append(": Discarding update for unsubscribed table!").endl(); return; } synchronized (pendingUpdatesLock) { pendingUpdates.add(update.clone()); } - doWakeup(); + + if (!isRefreshing()) { + try { + realRefresh(); + } catch (Throwable err) { + if (viewportChangedCallback != null) { + viewportChangedCallback.onError(err); + viewportChangedCallback = null; + } + throw err; + } + } else { + doWakeup(); + } } @Override @@ -260,16 +284,67 @@ protected void instrumentedRefresh() { final long startTm = System.nanoTime(); realRefresh(); recordMetric(stats -> stats.refresh, System.nanoTime() - startTm); - } catch (Exception e) { - beginLog(LogLevel.ERROR).append(": Failure during BarrageTable run: ").append(e).endl(); - notifyListenersOnError(e, null); + } catch (Throwable err) { + beginLog(LogLevel.ERROR).append(": Failure during BarrageTable instrumentedRefresh: ") + .append(err).endl(); + notifyListenersOnError(err, null); + + if (viewportChangedCallback != null) { + viewportChangedCallback.onError(err); + viewportChangedCallback = null; + } + if (err instanceof Error) { + // rethrow if this was an error (which should not be swallowed) + throw err; + } } } } + protected void updateServerViewport( + final RowSet viewport, + final BitSet columns, + final boolean reverseViewport) { + Assert.holdsLock(this, "BarrageTable.this"); + + final RowSet finalViewport = viewport == null ? null : viewport.copy(); + final BitSet finalColumns = (columns == null || columns.cardinality() == numColumns()) + ? null + : (BitSet) columns.clone(); + + serverViewport = finalViewport; + serverColumns = finalColumns; + serverReverseViewport = reverseViewport; + + if (viewportChangedCallback == null) { + return; + } + + // We cannot deliver the vp change until the updates have been propagated to this BarrageTable and its last + // notification step has been updated. + pendingVpChangeNotifications.add(() -> { + if (viewportChangedCallback == null) { + return; + } + if (!viewportChangedCallback.viewportChanged(finalViewport, finalColumns, reverseViewport)) { + viewportChangedCallback = null; + } + }); + } + + protected boolean isSubscribedColumn(int i) { + return serverColumns == null || serverColumns.get(i); + } + private synchronized void realRefresh() { if (pendingError != null) { - notifyListenersOnError(pendingError, null); + if (viewportChangedCallback != null) { + viewportChangedCallback.onError(pendingError); + viewportChangedCallback = null; + } + if (isRefreshing()) { + notifyListenersOnError(pendingError, null); + } // once we notify on error we are done, we can not notify any further, we are failed cleanup(); return; @@ -279,7 +354,13 @@ private synchronized void realRefresh() { // publish one last clear downstream; this data would be stale final RowSet allRows = getRowSet().copy(); getRowSet().writableCast().remove(allRows); - notifyListeners(RowSetFactory.empty(), allRows, RowSetFactory.empty()); + if (isRefreshing()) { + notifyListeners(RowSetFactory.empty(), allRows, RowSetFactory.empty()); + } + } + if (viewportChangedCallback != null) { + viewportChangedCallback.onClose(); + viewportChangedCallback = null; } cleanup(); return; @@ -301,42 +382,34 @@ private synchronized void realRefresh() { localPendingUpdates.clear(); if (update != null) { - maybeEnablePrevTracking(); - notifyListeners(update); + if (isRefreshing()) { + maybeEnablePrevTracking(); + notifyListeners(update); + } else { + update.release(); + } } - if (sealed) { - // remove all unpopulated rows from viewport snapshots - if (this.serverViewport != null) { - WritableRowSet currentRowSet = getRowSet().writableCast(); - try (final RowSet populated = currentRowSet.subSetForPositions(serverViewport, serverReverseViewport)) { - currentRowSet.retain(populated); - } - } - if (onSealRunnable != null) { - onSealRunnable.run(); - } - onSealRunnable = null; - onSealFailure = null; - cleanup(); + if (!pendingVpChangeNotifications.isEmpty()) { + pendingVpChangeNotifications.forEach(Runnable::run); + pendingVpChangeNotifications.clear(); } } private void cleanup() { unsubscribed = true; - registrar.removeSource(refresher); + if (stats != null) { + stats.stop(); + } + if (isRefreshing()) { + registrar.removeSource(refresher); + } synchronized (pendingUpdatesLock) { // release any pending snapshots, as we will never process them pendingUpdates.clear(); } // we are quite certain the shadow copies should have been drained on the last run Assert.eqZero(shadowPendingUpdates.size(), "shadowPendingUpdates.size()"); - - if (onSealFailure != null) { - onSealFailure.run(); - } - onSealRunnable = null; - onSealFailure = null; } @Override @@ -352,6 +425,18 @@ protected NotificationQueue getNotificationQueue() { private void enqueueError(final Throwable e) { synchronized (pendingUpdatesLock) { pendingError = e; + } + if (!isRefreshing()) { + try { + realRefresh(); + } catch (Throwable err) { + if (viewportChangedCallback != null) { + viewportChangedCallback.onError(err); + viewportChangedCallback = null; + } + throw err; + } + } else { doWakeup(); } } @@ -363,7 +448,6 @@ private void enqueueError(final Throwable e) { * @param executorService an executor service used to flush stats * @param tableDefinition the table definition * @param attributes Key-Value pairs of attributes to forward to the QueryTable's metadata - * @param initialViewPortRows the number of rows in the intial viewport (-1 if the table will be a full sub) * * @return a properly initialized {@link BarrageTable} */ @@ -372,9 +456,9 @@ public static BarrageTable make( @Nullable final ScheduledExecutorService executorService, final TableDefinition tableDefinition, final Map attributes, - final long initialViewPortRows) { + @Nullable final ViewportChangedCallback vpCallback) { final UpdateGraph ug = ExecutionContext.getContext().getUpdateGraph(); - return make(ug, ug, executorService, tableDefinition, attributes, initialViewPortRows); + return make(ug, ug, executorService, tableDefinition, attributes, vpCallback); } @VisibleForTesting @@ -384,7 +468,7 @@ public static BarrageTable make( @Nullable final ScheduledExecutorService executor, final TableDefinition tableDefinition, final Map attributes, - final long initialViewPortRows) { + @Nullable final ViewportChangedCallback vpCallback) { final List> columns = tableDefinition.getColumns(); final WritableColumnSource[] writableSources = new WritableColumnSource[columns.size()]; @@ -394,22 +478,16 @@ public static BarrageTable make( if (isBlinkTable instanceof Boolean && (Boolean) isBlinkTable) { final LinkedHashMap> finalColumns = makeColumns(columns, writableSources); table = new BarrageBlinkTable( - registrar, queue, executor, finalColumns, writableSources, attributes, initialViewPortRows); + registrar, queue, executor, finalColumns, writableSources, attributes, vpCallback); } else { final WritableRowRedirection rowRedirection = new LongColumnSourceWritableRowRedirection(new LongSparseArraySource()); final LinkedHashMap> finalColumns = makeColumns(columns, writableSources, rowRedirection); table = new BarrageRedirectedTable( - registrar, queue, executor, finalColumns, writableSources, rowRedirection, attributes, - initialViewPortRows); + registrar, queue, executor, finalColumns, writableSources, rowRedirection, attributes, vpCallback); } - // Even if this source table will eventually be static, the data isn't here already. Static tables need to - // have refreshing set to false after processing data but prior to publishing the object to consumers. - table.setRefreshing(true); - table.addSourceToRegistrar(); - return table; } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java index 9e55b026754..072e149f41d 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java @@ -12,14 +12,13 @@ import io.deephaven.chunk.ChunkType; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.RowSetShiftData; -import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.util.BarrageMessage; -import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator; import io.deephaven.extensions.barrage.table.BarrageTable; import io.deephaven.io.streams.ByteBufferInputStream; import io.deephaven.proto.util.Exceptions; +import io.deephaven.util.annotations.ScriptApi; import io.deephaven.util.datastructures.LongSizedDataStructure; import org.apache.arrow.flatbuf.Message; import org.apache.arrow.flatbuf.MessageHeader; @@ -31,7 +30,6 @@ import java.nio.ByteOrder; import java.util.ArrayList; import java.util.Iterator; -import java.util.concurrent.locks.Condition; import static io.deephaven.extensions.barrage.util.BarrageProtoUtil.DEFAULT_SER_OPTIONS; @@ -49,7 +47,6 @@ public class ArrowToTableConverter { protected BarrageSubscriptionOptions options = DEFAULT_SER_OPTIONS; private volatile boolean completed = false; - private volatile Throwable exceptionWhileCompleting = null; private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final byte[] ipcMessage) throws IOException { final BarrageProtoUtil.MessageInfo mi = new BarrageProtoUtil.MessageInfo(); @@ -72,6 +69,7 @@ private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final byte[] ip return mi; } + @ScriptApi public synchronized void setSchema(final byte[] ipcMessage) { if (completed) { throw new IllegalStateException("Conversion is complete; cannot process additional messages"); @@ -83,6 +81,7 @@ public synchronized void setSchema(final byte[] ipcMessage) { parseSchema((Schema) mi.header.header(new Schema())); } + @ScriptApi public synchronized void addRecordBatch(final byte[] ipcMessage) { if (completed) { throw new IllegalStateException("Conversion is complete; cannot process additional messages"); @@ -105,6 +104,7 @@ public synchronized void addRecordBatch(final byte[] ipcMessage) { resultTable.handleBarrageMessage(msg); } + @ScriptApi public synchronized BarrageTable getResultTable() { if (!completed) { throw new IllegalStateException("Conversion must be completed prior to requesting the result"); @@ -112,50 +112,12 @@ public synchronized BarrageTable getResultTable() { return resultTable; } + @ScriptApi public synchronized void onCompleted() throws InterruptedException { if (completed) { throw new IllegalStateException("Conversion cannot be completed twice"); } - - final Condition completedCondition; - final UpdateGraph updateGraph = resultTable.getUpdateGraph(); - if (updateGraph.exclusiveLock().isHeldByCurrentThread()) { - completedCondition = updateGraph.exclusiveLock().newCondition(); - } else { - completedCondition = null; - } - - resultTable.sealTable(() -> { - completed = true; - signalCompletion(completedCondition); - }, () -> { - exceptionWhileCompleting = new Exception(); - signalCompletion(completedCondition); - }); - - while (!completed && exceptionWhileCompleting == null) { - // handle the condition where this function may have the exclusive lock - if (completedCondition != null) { - completedCondition.await(); - } else { - wait(); // ArrowToTableConverter lock - } - } - - if (exceptionWhileCompleting != null) { - throw new UncheckedDeephavenException("Error while sealing result table:", exceptionWhileCompleting); - } - } - - private void signalCompletion(final Condition completedCondition) { - if (completedCondition != null) { - UpdateGraph updateGraph = resultTable.getUpdateGraph(); - updateGraph.requestSignal(completedCondition); - } else { - synchronized (ArrowToTableConverter.this) { - ArrowToTableConverter.this.notifyAll(); - } - } + completed = true; } protected void parseSchema(final Schema header) { @@ -164,9 +126,7 @@ protected void parseSchema(final Schema header) { } final BarrageUtil.ConvertedArrowSchema result = BarrageUtil.convertArrowSchema(header); - result.attributes.put(Table.ADD_ONLY_TABLE_ATTRIBUTE, true); - result.attributes.put(Table.APPEND_ONLY_TABLE_ATTRIBUTE, true); - resultTable = BarrageTable.make(null, result.tableDef, result.attributes, -1); + resultTable = BarrageTable.make(null, result.tableDef, result.attributes, null); resultTable.setFlat(); columnConversionFactors = result.conversionFactors; diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshot.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshot.java index 746b4408f66..31870548a15 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshot.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshot.java @@ -3,6 +3,7 @@ */ package io.deephaven.client.impl; +import io.deephaven.UncheckedDeephavenException; import io.deephaven.engine.liveness.LivenessReferent; import io.deephaven.engine.rowset.RowSet; import io.deephaven.extensions.barrage.BarrageSnapshotOptions; @@ -45,6 +46,15 @@ BarrageSnapshot snapshot(TableSpec tableSpec, BarrageSnapshotOptions options) */ BarrageTable entireTable() throws InterruptedException; + /** + * Request a full snapshot of the data and populate a {@link BarrageTable} with the data that is received. + * + * @param blockUntilComplete Whether to block execution until all rows for the subscribed table are available + * + * @return the {@code BarrageTable} + */ + BarrageTable entireTable(boolean blockUntilComplete) throws InterruptedException; + /** * Request a partial snapshot of the data limited by viewport or column set and populate a {@link BarrageTable} with * the data that is received. @@ -69,6 +79,35 @@ BarrageSnapshot snapshot(TableSpec tableSpec, BarrageSnapshotOptions options) */ BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport) throws InterruptedException; + /** + * Request a partial snapshot of the data limited by viewport or column set and populate a {@link BarrageTable} with + * the data that is received. Allows the viewport to be reversed. + * + * @param viewport the position-space viewport to use for the subscription + * @param columns the columns to include in the subscription + * @param reverseViewport Whether to treat {@code posRowSet} as offsets from + * {@link io.deephaven.engine.table.Table#size()} rather than {@code 0} + * @param blockUntilComplete Whether to block execution until the subscribed table viewport is satisfied + * + * @return the {@code BarrageTable} + */ + BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport, boolean blockUntilComplete) + throws InterruptedException; + + /** + * Block until the snapshot is complete. + *

+ * It is an error to {@code blockUntilComplete} if the current thread holds the result table's UpdateGraph shared + * lock. If the current thread holds the result table's UpdateGraph exclusive lock, then this method will use an + * update graph condition variable to wait for completion. Otherwise, this method will use the snapshot's object + * monitor to wait for completion. + * + * @throws InterruptedException if the current thread is interrupted while waiting for completion + * @throws UncheckedDeephavenException if an error occurred while handling the snapshot + * @return the {@code BarrageTable} + */ + BarrageTable blockUntilComplete() throws InterruptedException; + @Override void close(); } diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java index 1cf704e6aa6..cd80dae4d99 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java @@ -77,7 +77,7 @@ public BarrageSnapshotImpl( final BarrageUtil.ConvertedArrowSchema schema = BarrageUtil.convertArrowSchema(tableHandle.response()); final TableDefinition tableDefinition = schema.tableDef; - resultTable = BarrageTable.make(executorService, tableDefinition, schema.attributes, -1); + resultTable = BarrageTable.make(executorService, tableDefinition, schema.attributes, new CheckForCompletion()); resultTable.addParentReference(this); final MethodDescriptor snapshotDescriptor = @@ -117,8 +117,8 @@ public void onNext(final BarrageMessage barrageMessage) { return; } try (barrageMessage) { - final Listener listener = resultTable; - if (!connected || listener == null) { + final Listener localResultTable = resultTable; + if (!connected || localResultTable == null) { return; } @@ -135,7 +135,7 @@ public void onNext(final BarrageMessage barrageMessage) { rowsReceived += resultSize; - listener.handleBarrageMessage(barrageMessage); + localResultTable.handleBarrageMessage(barrageMessage); } } @@ -145,11 +145,11 @@ public void onError(final Throwable t) { .append(": Error detected in snapshot: ") .append(t).endl(); - final Listener listener = resultTable; - if (!connected || listener == null) { + final Listener localResultTable = resultTable; + if (!connected || localResultTable == null) { return; } - listener.handleBarrageError(t); + localResultTable.handleBarrageError(t); handleDisconnect(); } @@ -161,36 +161,39 @@ public void onCompleted() { @Override public BarrageTable entireTable() throws InterruptedException { - return partialTable(null, null); + return partialTable(null, null, false, true); } @Override - public synchronized BarrageTable partialTable(RowSet viewport, BitSet columns) throws InterruptedException { - return partialTable(viewport, columns, false); + public BarrageTable entireTable(boolean blockUntilComplete) throws InterruptedException { + return partialTable(null, null, false, blockUntilComplete); } @Override - public synchronized BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport) - throws InterruptedException { - // notify user when connection has already been used and closed - if (prevUsed) { - throw new UnsupportedOperationException("Snapshot object already used"); - } - - // test lock conditions - if (resultTable.getUpdateGraph().sharedLock().isHeldByCurrentThread()) { - throw new UnsupportedOperationException( - "Cannot snapshot while holding the UpdateGraph shared lock"); - } + public BarrageTable partialTable(RowSet viewport, BitSet columns) throws InterruptedException { + return partialTable(viewport, columns, false, true); + } - prevUsed = true; + @Override + public BarrageTable partialTable( + RowSet viewport, BitSet columns, boolean reverseViewport) throws InterruptedException { + return partialTable(viewport, columns, reverseViewport, true); + } - if (resultTable.getUpdateGraph().exclusiveLock().isHeldByCurrentThread()) { - completedCondition = resultTable.getUpdateGraph().exclusiveLock().newCondition(); - } + @Override + public BarrageTable partialTable( + RowSet viewport, BitSet columns, boolean reverseViewport, boolean blockUntilComplete) + throws InterruptedException { + synchronized (this) { + if (!connected) { + throw new UncheckedDeephavenException(this + " is not connected"); + } - if (!connected) { - throw new UncheckedDeephavenException(this + " is not connected"); + // notify user when connection has already been used and closed + if (prevUsed) { + throw new UnsupportedOperationException("Snapshot object already used"); + } + prevUsed = true; } // store this for streamreader parser @@ -203,20 +206,57 @@ public synchronized BarrageTable partialTable(RowSet viewport, BitSet columns, b observer.onCompleted(); - while (!completed && exceptionWhileCompleting == null) { - // handle the condition where this function may have the exclusive lock - if (completedCondition != null) { - completedCondition.await(); - } else { - wait(); // barragesnapshotimpl lock - } + if (blockUntilComplete) { + return blockUntilComplete(); + } + + return resultTable; + } + + private boolean checkIfCompleteOrThrow() { + if (exceptionWhileCompleting != null) { + throw new UncheckedDeephavenException("Error while handling subscription:", exceptionWhileCompleting); } + return completed; + } - if (exceptionWhileCompleting == null) { + @Override + public BarrageTable blockUntilComplete() throws InterruptedException { + if (checkIfCompleteOrThrow()) { return resultTable; + } + + // test lock conditions + if (resultTable.getUpdateGraph().sharedLock().isHeldByCurrentThread()) { + throw new UnsupportedOperationException( + "Cannot wait for snapshot to complete while holding the UpdateGraph shared lock"); + } + + final boolean holdingUpdateGraphLock = resultTable.getUpdateGraph().exclusiveLock().isHeldByCurrentThread(); + if (completedCondition == null && holdingUpdateGraphLock) { + synchronized (this) { + if (checkIfCompleteOrThrow()) { + return resultTable; + } + if (completedCondition == null) { + completedCondition = resultTable.getUpdateGraph().exclusiveLock().newCondition(); + } + } + } + + if (holdingUpdateGraphLock) { + while (!checkIfCompleteOrThrow()) { + completedCondition.await(); + } } else { - throw new UncheckedDeephavenException("Error while handling snapshot:", exceptionWhileCompleting); + synchronized (this) { + while (!checkIfCompleteOrThrow()) { + wait(); // BarrageSnapshotImpl lock + } + } } + + return resultTable; } @Override @@ -230,24 +270,17 @@ private void handleDisconnect() { return; } - resultTable.sealTable(() -> { - completed = true; - signalCompletion(); - }, () -> { - exceptionWhileCompleting = new Exception(); - signalCompletion(); - }); + completed = true; + signalCompletion(); cleanup(); } - private void signalCompletion() { + private synchronized void signalCompletion() { if (completedCondition != null) { resultTable.getUpdateGraph().requestSignal(completedCondition); - } else { - synchronized (BarrageSnapshotImpl.this) { - BarrageSnapshotImpl.this.notifyAll(); - } } + + notifyAll(); } @Override @@ -372,4 +405,22 @@ public BarrageMessage parse(final InputStream stream) { stream); } } + + private class CheckForCompletion implements BarrageTable.ViewportChangedCallback { + @Override + public boolean viewportChanged(@Nullable RowSet rowSet, @Nullable BitSet columns, boolean reverse) { + return true; + } + + @Override + public void onError(Throwable t) { + exceptionWhileCompleting = t; + signalCompletion(); + } + + @Override + public void onClose() { + signalCompletion(); + } + } } diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscription.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscription.java index 8d68f52d0a9..97e2e91c9eb 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscription.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscription.java @@ -3,6 +3,7 @@ */ package io.deephaven.client.impl; +import io.deephaven.UncheckedDeephavenException; import io.deephaven.engine.rowset.RowSet; import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; import io.deephaven.engine.liveness.LivenessReferent; @@ -45,14 +46,6 @@ BarrageSubscription subscribe(TableSpec tableSpec, BarrageSubscriptionOptions op */ boolean isCompleted(); - /** - * This call will return the number of rows received by the subscription handler. This is the sum of all the - * `rowsIncluded` in the BarrageMessages - * - * @return number of rows received by the subscription handler - */ - long getRowsReceived(); - /** * Request a full subscription of the data and populate a {@link BarrageTable} with the incrementally updating data * that is received. This call will block until all rows for the subscribed table are available. @@ -172,6 +165,20 @@ BarrageTable snapshotPartialTable(RowSet viewport, BitSet columns, boolean rever boolean blockUntilComplete) throws InterruptedException; + /** + * Block until the subscription is complete. + *

+ * It is an error to {@code blockUntilComplete} if the current thread holds the result table's UpdateGraph shared + * lock. If the current thread holds the result table's UpdateGraph exclusive lock, then this method will use an + * update graph condition variable to wait for completion. Otherwise, this method will use the subscription's object + * monitor to wait for completion. + * + * @throws InterruptedException if the current thread is interrupted while waiting for completion + * @throws UncheckedDeephavenException if an error occurred while handling the subscription + * @return the {@code BarrageTable} + */ + BarrageTable blockUntilComplete() throws InterruptedException; + @Override void close(); } diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java index 694b7fb7c46..c328eb06146 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java @@ -13,21 +13,14 @@ import io.deephaven.chunk.ChunkType; import io.deephaven.engine.liveness.ReferenceCountedLivenessNode; import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.rowset.RowSetFactory; -import io.deephaven.engine.rowset.RowSetShiftData; -import io.deephaven.engine.table.ModifiedColumnSet; +import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.table.TableDefinition; -import io.deephaven.engine.table.TableUpdate; -import io.deephaven.engine.table.impl.InstrumentedTableUpdateListener; -import io.deephaven.engine.table.impl.TableUpdateImpl; import io.deephaven.engine.table.impl.util.BarrageMessage; -import io.deephaven.engine.table.impl.util.BarrageMessage.Listener; import io.deephaven.extensions.barrage.BarrageSubscriptionOptions; import io.deephaven.extensions.barrage.table.BarrageTable; import io.deephaven.extensions.barrage.util.*; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; -import io.deephaven.util.annotations.ReferentialIntegrity; import io.deephaven.util.annotations.VisibleForTesting; import io.grpc.CallOptions; import io.grpc.ClientCall; @@ -55,18 +48,17 @@ public class BarrageSubscriptionImpl extends ReferenceCountedLivenessNode implem private final TableHandle tableHandle; private final BarrageSubscriptionOptions options; private final ClientCallStreamObserver observer; + private final CheckForCompletion checkForCompletion; + private final BarrageTable resultTable; - private BarrageTable resultTable; + private boolean subscribed; + private boolean isSnapshot; private volatile Condition completedCondition; - private volatile boolean completed = false; - private volatile long rowsReceived = 0L; - private volatile Throwable exceptionWhileCompleting = null; - private InstrumentedTableUpdateListener listener = null; + private volatile boolean completed; + private volatile Throwable exceptionWhileCompleting; - private boolean subscribed = false; private volatile boolean connected = true; - private boolean isSnapshot = false; /** * Represents a BarrageSubscription. @@ -82,13 +74,13 @@ public BarrageSubscriptionImpl( super(false); this.logName = tableHandle.exportId().toString(); - this.options = options; this.tableHandle = tableHandle; + this.options = options; final BarrageUtil.ConvertedArrowSchema schema = BarrageUtil.convertArrowSchema(tableHandle.response()); final TableDefinition tableDefinition = schema.tableDef; - resultTable = BarrageTable.make(executorService, tableDefinition, schema.attributes, -1); - resultTable.addParentReference(this); + checkForCompletion = new CheckForCompletion(); + resultTable = BarrageTable.make(executorService, tableDefinition, schema.attributes, checkForCompletion); final MethodDescriptor subscribeDescriptor = getClientDoExchangeDescriptor(options, schema.computeWireChunkTypes(), schema.computeWireTypes(), @@ -125,31 +117,17 @@ public void onNext(final BarrageMessage barrageMessage) { return; } try (barrageMessage) { - final Listener localResultTable = resultTable; - if (!connected || localResultTable == null) { + if (!connected) { return; } - long numRows = barrageMessage.rowsIncluded.size(); - rowsReceived += numRows; - localResultTable.handleBarrageMessage(barrageMessage); - - // if the message was empty, then BaseTable prevents propagating the empty update, and our listener was - // not invoked, so let's invoke it ourselves - if (numRows == 0) { - final TableUpdate emptyUpdate = new TableUpdateImpl( - RowSetFactory.empty(), RowSetFactory.empty(), RowSetFactory.empty(), RowSetShiftData.EMPTY, - ModifiedColumnSet.EMPTY); - listener.onUpdate(emptyUpdate); - emptyUpdate.release(); - } + resultTable.handleBarrageMessage(barrageMessage); } } @Override public void onError(final Throwable t) { - final Listener listener = resultTable; - if (!connected || listener == null) { + if (!connected) { return; } @@ -157,7 +135,8 @@ public void onError(final Throwable t) { .append(": Error detected in subscription: ") .append(t).endl(); - listener.handleBarrageError(t); + exceptionWhileCompleting = t; + resultTable.handleBarrageError(t); handleDisconnect(); } @@ -172,11 +151,6 @@ public boolean isCompleted() { return completed; } - @Override - public long getRowsReceived() { - return rowsReceived; - } - @Override public BarrageTable entireTable() throws InterruptedException { return entireTable(true); @@ -199,123 +173,101 @@ public BarrageTable partialTable(RowSet viewport, BitSet columns, boolean revers } @Override - public synchronized BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport, + public BarrageTable partialTable(RowSet viewport, BitSet columns, boolean reverseViewport, boolean blockUntilComplete) throws InterruptedException { - if (!connected) { - throw new UncheckedDeephavenException( - this + " is no longer an active subscription and cannot be retained further"); - } - if (subscribed) { - throw new UncheckedDeephavenException( - "BarrageSubscription objects cannot be reused."); - } else { - // test lock conditions - if (resultTable.getUpdateGraph().sharedLock().isHeldByCurrentThread()) { - throw new UnsupportedOperationException( - "Cannot create subscription while holding the UpdateGraph shared lock"); + synchronized (this) { + if (!connected) { + throw new UncheckedDeephavenException( + this + " is no longer an active subscription and cannot be retained further"); } - - if (resultTable.getUpdateGraph().exclusiveLock().isHeldByCurrentThread()) { - completedCondition = resultTable.getUpdateGraph().exclusiveLock().newCondition(); + if (subscribed) { + throw new UncheckedDeephavenException( + "BarrageSubscription objects cannot be reused."); } - - // Send the initial subscription: - observer.onNext(FlightData.newBuilder() - .setAppMetadata(ByteStringAccess.wrap(makeRequestInternal( - viewport, columns, reverseViewport, options, tableHandle.ticketId().bytes()))) - .build()); subscribed = true; + } - // use a listener to decide when the table is complete - listener = new InstrumentedTableUpdateListener("completeness-listener") { - @ReferentialIntegrity - final BarrageTable tableRef = resultTable; - { - // Maintain a liveness ownership relationship with resultTable for the lifetime of the - // listener - manage(tableRef); - } + checkForCompletion.setExpected( + viewport == null ? null : viewport.copy(), + columns == null ? null : (BitSet) (columns.clone()), + reverseViewport); - @Override - protected void destroy() { - super.destroy(); - tableRef.removeUpdateListener(this); - } + if (!isSnapshot) { + resultTable.addSourceToRegistrar(); + resultTable.addParentReference(this); + } - @Override - protected void onFailureInternal(final Throwable originalException, final Entry sourceEntry) { - exceptionWhileCompleting = originalException; - if (completedCondition != null) { - resultTable.getUpdateGraph().requestSignal(completedCondition); - } else { - synchronized (BarrageSubscriptionImpl.this) { - BarrageSubscriptionImpl.this.notifyAll(); - } - } - } + // Send the initial subscription: + observer.onNext(FlightData.newBuilder() + .setAppMetadata(ByteStringAccess.wrap(makeRequestInternal( + viewport, columns, reverseViewport, options, tableHandle.ticketId().bytes()))) + .build()); - @Override - public void onUpdate(final TableUpdate upstream) { - boolean isComplete = false; + if (blockUntilComplete) { + return blockUntilComplete(); + } - // test to see if the viewport matches the requested - if (viewport == null && resultTable.getServerViewport() == null) { - isComplete = true; - } else if (viewport != null && resultTable.getServerViewport() != null - && reverseViewport == resultTable.getServerReverseViewport()) { - isComplete = viewport.subsetOf(resultTable.getServerViewport()); - } + return resultTable; + } - if (isComplete) { - if (isSnapshot) { - resultTable.sealTable(() -> { - // signal that we are closing the connection - observer.onCompleted(); - signalCompletion(); - }, () -> { - exceptionWhileCompleting = new Exception(); - }); - } else { - signalCompletion(); - } - - // no longer need to listen for completion - resultTable.removeUpdateListener(this); - listener = null; - } - } - }; + private boolean checkIfCompleteOrThrow() { + if (exceptionWhileCompleting != null) { + throw new UncheckedDeephavenException("Error while handling subscription:", exceptionWhileCompleting); + } + return completed; + } - resultTable.addUpdateListener(listener); + @Override + public BarrageTable blockUntilComplete() throws InterruptedException { + if (checkIfCompleteOrThrow()) { + return resultTable; + } - if (blockUntilComplete) { - while (!completed && exceptionWhileCompleting == null) { - // handle the condition where this function may have the exclusive lock - if (completedCondition != null) { - completedCondition.await(); - } else { - wait(); // barragesubscriptionimpl lock - } + // test lock conditions + if (resultTable.getUpdateGraph().sharedLock().isHeldByCurrentThread()) { + throw new UnsupportedOperationException( + "Cannot wait for subscription to complete while holding the UpdateGraph shared lock"); + } + + final boolean holdingUpdateGraphLock = resultTable.getUpdateGraph().exclusiveLock().isHeldByCurrentThread(); + if (completedCondition == null && holdingUpdateGraphLock) { + synchronized (this) { + if (checkIfCompleteOrThrow()) { + return resultTable; + } + if (completedCondition == null) { + completedCondition = resultTable.getUpdateGraph().exclusiveLock().newCondition(); } } } - if (exceptionWhileCompleting == null) { - return resultTable; + if (holdingUpdateGraphLock) { + while (!checkIfCompleteOrThrow()) { + completedCondition.await(); + } } else { - throw new UncheckedDeephavenException("Error while handling subscription:", exceptionWhileCompleting); + synchronized (this) { + while (!checkIfCompleteOrThrow()) { + wait(); // BarrageSubscriptionImpl lock + } + } } + + return resultTable; } - private void signalCompletion() { + private synchronized void signalCompletion() { completed = true; + + // if we are building a snapshot via a growing viewport subscription, then cancel our subscription + if (isSnapshot) { + observer.onCompleted(); + } + if (completedCondition != null) { resultTable.getUpdateGraph().requestSignal(completedCondition); - } else { - synchronized (BarrageSubscriptionImpl.this) { - BarrageSubscriptionImpl.this.notifyAll(); - } } + notifyAll(); } @Override @@ -340,7 +292,7 @@ public BarrageTable snapshotPartialTable(RowSet viewport, BitSet columns, boolea } @Override - public synchronized BarrageTable snapshotPartialTable(RowSet viewport, BitSet columns, boolean reverseViewport, + public BarrageTable snapshotPartialTable(RowSet viewport, BitSet columns, boolean reverseViewport, boolean blockUntilComplete) throws InterruptedException { isSnapshot = true; return partialTable(viewport, columns, reverseViewport, blockUntilComplete); @@ -375,7 +327,6 @@ public synchronized void close() { private void cleanup() { this.connected = false; this.tableHandle.close(); - resultTable = null; } @Override @@ -489,4 +440,70 @@ public BarrageMessage parse(final InputStream stream) { return streamReader.safelyParseFrom(options, null, columnChunkTypes, columnTypes, componentTypes, stream); } } + + private class CheckForCompletion implements BarrageTable.ViewportChangedCallback { + private RowSet expectedViewport; + private BitSet expectedColumns; + private boolean expectedReverseViewport; + + private synchronized void setExpected(RowSet viewport, BitSet columns, boolean reverseViewport) { + expectedViewport = viewport == null ? null : viewport.copy(); + expectedColumns = columns == null ? null : (BitSet) (columns.clone()); + expectedReverseViewport = reverseViewport; + } + + @Override + public synchronized boolean viewportChanged( + @Nullable final RowSet serverViewport, + @Nullable final BitSet serverColumns, + final boolean serverReverseViewport) { + if (completed) { + return false; + } + + // @formatter:off + final boolean correctColumns = + // all columns are expected + (expectedColumns == null + && (serverColumns == null || serverColumns.cardinality() == resultTable.numColumns())) + // only specific set of columns are expected + || (expectedColumns != null && expectedColumns.equals(serverColumns)); + + final boolean isComplete = exceptionWhileCompleting != null + // Full subscription is completed + || (correctColumns && expectedViewport == null && serverViewport == null) + // Viewport subscription is completed + || (correctColumns && expectedViewport != null + && expectedReverseViewport == resultTable.getServerReverseViewport() + && expectedViewport.equals(serverViewport)); + // @formatter:on + + if (isComplete) { + // remove all unpopulated rows from viewport snapshots + if (isSnapshot && serverViewport != null) { + // noinspection resource + WritableRowSet currentRowSet = resultTable.getRowSet().writableCast(); + try (final RowSet populated = + currentRowSet.subSetForPositions(serverViewport, serverReverseViewport)) { + currentRowSet.retain(populated); + } + } + + signalCompletion(); + } + + return !isComplete; + } + + @Override + public void onError(Throwable t) { + exceptionWhileCompleting = t; + signalCompletion(); + } + + @Override + public void onClose() { + signalCompletion(); + } + } } diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java index 83cec575b0e..dc53cb716e6 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java @@ -251,14 +251,11 @@ public void onCompleted() { } localResultTable.dropReference(); - // no more changes allowed; this is officially static content - localResultTable.sealTable(() -> localExportBuilder.submit(() -> { + // let's finally export the table to our listener + localExportBuilder.submit(() -> { GrpcUtil.safelyComplete(observer); session.removeOnCloseCallback(this); return localResultTable; - }), () -> { - GrpcUtil.safelyError(observer, Code.DATA_LOSS, "Do put could not be sealed"); - session.removeOnCloseCallback(this); }); } diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java index 8f7dd5250c8..d06c189d4be 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java @@ -603,33 +603,39 @@ private class DeltaListener extends InstrumentedTableUpdateListener { @Override public void onUpdate(final TableUpdate upstream) { synchronized (BarrageMessageProducer.this) { - if (lastUpdateClockStep >= parent.getUpdateGraph().clock().currentStep()) { - throw new IllegalStateException(logPrefix + "lastUpdateClockStep=" + lastUpdateClockStep - + " >= notification on " - + parent.getUpdateGraph().clock().currentStep()); - } + try { + if (lastUpdateClockStep >= parent.getUpdateGraph().clock().currentStep()) { + throw new IllegalStateException(logPrefix + "lastUpdateClockStep=" + lastUpdateClockStep + + " >= notification on " + + parent.getUpdateGraph().clock().currentStep()); + } - final boolean shouldEnqueueDelta = !activeSubscriptions.isEmpty(); - if (shouldEnqueueDelta) { - final long startTm = System.nanoTime(); - enqueueUpdate(upstream); - recordMetric(stats -> stats.enqueue, System.nanoTime() - startTm); - schedulePropagation(); - } - parentTableSize = parent.size(); - - // mark when the last indices are from, so that terminal notifications can make use of them if required - lastUpdateClockStep = parent.getUpdateGraph().clock().currentStep(); - if (log.isDebugEnabled()) { - try (final RowSet prevRowSet = parent.getRowSet().copyPrev()) { - log.debug().append(logPrefix) - .append("lastUpdateClockStep=").append(lastUpdateClockStep) - .append(", upstream=").append(upstream).append(", shouldEnqueueDelta=") - .append(shouldEnqueueDelta) - .append(", rowSet=").append(parent.getRowSet()).append(", prevRowSet=") - .append(prevRowSet) - .endl(); + final boolean shouldEnqueueDelta = !activeSubscriptions.isEmpty(); + if (shouldEnqueueDelta) { + final long startTm = System.nanoTime(); + enqueueUpdate(upstream); + recordMetric(stats -> stats.enqueue, System.nanoTime() - startTm); + schedulePropagation(); + } + parentTableSize = parent.size(); + + lastUpdateClockStep = parent.getUpdateGraph().clock().currentStep(); + if (log.isDebugEnabled()) { + try (final RowSet prevRowSet = parent.getRowSet().copyPrev()) { + log.debug().append(logPrefix) + .append("lastUpdateClockStep=").append(lastUpdateClockStep) + .append(", upstream=").append(upstream).append(", shouldEnqueueDelta=") + .append(shouldEnqueueDelta) + .append(", rowSet=").append(parent.getRowSet()).append(", prevRowSet=") + .append(prevRowSet) + .endl(); + } } + } catch (Exception err) { + // the BMP is failing not the parent table; so we need to remove the BMP from the update graph + forceReferenceCountToZero(); + pendingError = err; + schedulePropagation(); } } } @@ -637,10 +643,8 @@ public void onUpdate(final TableUpdate upstream) { @Override protected void onFailureInternal(final Throwable originalException, Entry sourceEntry) { synchronized (BarrageMessageProducer.this) { - if (pendingError != null) { - pendingError = originalException; - schedulePropagation(); - } + pendingError = originalException; + schedulePropagation(); } } diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java b/server/src/main/java/io/deephaven/server/session/SessionState.java index a5fd42fff69..ebdd08834ef 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java +++ b/server/src/main/java/io/deephaven/server/session/SessionState.java @@ -49,7 +49,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Objects; diff --git a/server/src/main/java/io/deephaven/server/table/ExportedTableUpdateListener.java b/server/src/main/java/io/deephaven/server/table/ExportedTableUpdateListener.java index abf3956ebd9..cfafae01416 100644 --- a/server/src/main/java/io/deephaven/server/table/ExportedTableUpdateListener.java +++ b/server/src/main/java/io/deephaven/server/table/ExportedTableUpdateListener.java @@ -140,7 +140,12 @@ private synchronized void onNewTableExport(final Ticket ticket, final int export } final SwapListener swapListener = new SwapListener(table); - swapListener.subscribeForUpdates(); + try { + swapListener.subscribeForUpdates(); + } catch (IllegalStateException ise) { + // It's possible that the table has already failed or been destroyed. + return; + } final ListenerImpl listener = new ListenerImpl(table, exportId); listener.tryRetainReference(); updateListenerMap.put(exportId, listener); diff --git a/server/src/test/java/io/deephaven/server/barrage/BarrageBlinkTableTest.java b/server/src/test/java/io/deephaven/server/barrage/BarrageBlinkTableTest.java index 0c42a2f639b..7c80902a289 100644 --- a/server/src/test/java/io/deephaven/server/barrage/BarrageBlinkTableTest.java +++ b/server/src/test/java/io/deephaven/server/barrage/BarrageBlinkTableTest.java @@ -183,9 +183,9 @@ private class RemoteClient { final ByteString schemaBytes = BarrageUtil.schemaBytesFromTable(blinkTable); final Schema flatbufSchema = SchemaHelper.flatbufSchema(schemaBytes.asReadOnlyByteBuffer()); final BarrageUtil.ConvertedArrowSchema schema = BarrageUtil.convertArrowSchema(flatbufSchema); - this.barrageTable = BarrageTable.make(updateSourceCombiner, - ExecutionContext.getContext().getUpdateGraph(), - null, schema.tableDef, schema.attributes, viewport == null ? -1 : viewport.size()); + this.barrageTable = BarrageTable.make(updateSourceCombiner, ExecutionContext.getContext().getUpdateGraph(), + null, schema.tableDef, schema.attributes, null); + this.barrageTable.addSourceToRegistrar(); final BarrageSubscriptionOptions options = BarrageSubscriptionOptions.builder() .useDeephavenNulls(useDeephavenNulls) diff --git a/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java b/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java index e7b19b3ed30..202e7e7d11a 100644 --- a/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java +++ b/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java @@ -179,8 +179,8 @@ private class RemoteClient { this.barrageTable = BarrageTable.make(updateSourceCombiner, ExecutionContext.getContext().getUpdateGraph(), - null, barrageMessageProducer.getTableDefinition(), new HashMap<>(), - viewport == null ? -1 : viewport.size()); + null, barrageMessageProducer.getTableDefinition(), new HashMap<>(), null); + this.barrageTable.addSourceToRegistrar(); final BarrageSubscriptionOptions options = BarrageSubscriptionOptions.builder() .useDeephavenNulls(useDeephavenNulls)