Skip to content

Commit

Permalink
Barrage Client: fix race conditions in subscription and static-table …
Browse files Browse the repository at this point in the history
…logic (deephaven#4630)

Co-authored-by: Ryan Caudy <[email protected]>
  • Loading branch information
nbauernfeind and rcaudy authored Oct 17, 2023
1 parent 186891c commit cbaf19b
Show file tree
Hide file tree
Showing 14 changed files with 524 additions and 376 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ protected BarrageBlinkTable(
final LinkedHashMap<String, ColumnSource<?>> columns,
final WritableColumnSource<?>[] writableSources,
final Map<String, Object> attributes,
final long initialViewPortRows) {
super(registrar, notificationQueue, executorService, columns, writableSources, attributes, initialViewPortRows);
@Nullable final ViewportChangedCallback vpCallback) {
super(registrar, notificationQueue, executorService, columns, writableSources, attributes, vpCallback);
}

private void processUpdate(final BarrageMessage update) {
Expand All @@ -73,9 +73,7 @@ private void processUpdate(final BarrageMessage update) {
}

if (update.isSnapshot) {
serverViewport = update.snapshotRowSet == null ? null : update.snapshotRowSet.copy();
serverReverseViewport = update.snapshotRowSetIsReversed;
serverColumns = update.snapshotColumns == null ? null : (BitSet) update.snapshotColumns.clone();
updateServerViewport(update.snapshotRowSet, update.snapshotColumns, update.snapshotRowSetIsReversed);
}

if (update.shifted.nonempty()) {
Expand Down Expand Up @@ -127,10 +125,6 @@ private void processUpdate(final BarrageMessage update) {
}
}

private boolean isSubscribedColumn(int i) {
return serverColumns == null || serverColumns.get(i);
}

private void ensureCapacity(long size) {
if (capacity < size) {
capacity = Long.highestOneBit(Math.max(size * 2, 8));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ protected BarrageRedirectedTable(final UpdateSourceRegistrar registrar,
final WritableColumnSource<?>[] writableSources,
final WritableRowRedirection rowRedirection,
final Map<String, Object> attributes,
final long initialViewPortRows) {
super(registrar, notificationQueue, executorService, columns, writableSources, attributes, initialViewPortRows);
@Nullable final ViewportChangedCallback vpCallback) {
super(registrar, notificationQueue, executorService, columns, writableSources, attributes, vpCallback);
this.rowRedirection = rowRedirection;
}

Expand Down Expand Up @@ -90,16 +90,17 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateC
}

if (update.isSnapshot) {
serverViewport = update.snapshotRowSet == null ? null : update.snapshotRowSet.copy();
serverReverseViewport = update.snapshotRowSetIsReversed;
serverColumns = update.snapshotColumns == null ? null : (BitSet) update.snapshotColumns.clone();
updateServerViewport(update.snapshotRowSet, update.snapshotColumns, update.snapshotRowSetIsReversed);
}

// make sure that these RowSet updates make some sense compared with each other, and our current view of the
// table
final WritableRowSet currentRowSet = getRowSet().writableCast();
final boolean mightBeInitialSnapshot = currentRowSet.isEmpty() && update.isSnapshot;

final RowSet serverViewport = getServerViewport();
final boolean serverReverseViewport = getServerReverseViewport();

try (final RowSet currRowsFromPrev = currentRowSet.copy();
final WritableRowSet populatedRows =
(serverViewport != null
Expand Down Expand Up @@ -249,10 +250,6 @@ private UpdateCoalescer processUpdate(final BarrageMessage update, final UpdateC
}
}

private boolean isSubscribedColumn(int i) {
return serverColumns == null || serverColumns.get(i);
}

private RowSet getFreeRows(long size) {
if (size <= 0) {
return RowSetFactory.empty();
Expand Down
Loading

0 comments on commit cbaf19b

Please sign in to comment.