From 1593f91c405bd8e0d65518c26eeba67f5104bbff Mon Sep 17 00:00:00 2001 From: Nate Bauernfeind Date: Fri, 19 Jan 2024 12:30:58 -0700 Subject: [PATCH] Eliminate Double Error Notifications in jsapi and Error Log (#5048) Co-authored-by: Colin Alworth Co-authored-by: Ryan Caudy --- .../extensions/barrage/util/GrpcUtil.java | 15 ---- .../barrage/BarrageMessageProducer.java | 1 - .../server/session/SessionService.java | 64 +++++++++++++++- .../table/ExportedTableUpdateListener.java | 28 +++++-- .../table/ops/TableServiceGrpcImpl.java | 12 ++- .../server/session/SessionServiceTest.java | 74 +++++++++++++++++++ .../table/ExportTableUpdateListenerTest.java | 30 +++++--- .../web/client/api/HasEventHandling.java | 2 - .../web/client/api/WorkerConnection.java | 12 +-- .../barrage/stream/ResponseStreamWrapper.java | 5 ++ .../web/client/state/ClientTableState.java | 5 ++ 11 files changed, 197 insertions(+), 51 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/GrpcUtil.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/GrpcUtil.java index d9223ce1909..d2231daa020 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/GrpcUtil.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/GrpcUtil.java @@ -19,21 +19,6 @@ public class GrpcUtil { private static final Logger log = LoggerFactory.getLogger(GrpcUtil.class); - public static StatusRuntimeException securelyWrapError(final Logger log, final Throwable err) { - return securelyWrapError(log, err, Code.INVALID_ARGUMENT); - } - - public static StatusRuntimeException securelyWrapError(final Logger log, final Throwable err, - final Code statusCode) { - if (err instanceof StatusRuntimeException) { - return (StatusRuntimeException) err; - } - - final UUID errorId = UUID.randomUUID(); - log.error().append("Internal Error '").append(errorId.toString()).append("' ").append(err).endl(); - return Exceptions.statusRuntimeException(statusCode, "Details Logged w/ID '" + errorId + "'"); - } - /** * Wraps the provided runner in a try/catch block to minimize damage caused by a failing externally supplied helper. * diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java index b341e15b48c..66529776278 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java @@ -1495,7 +1495,6 @@ private void updateSubscriptionsSnapshotAndPropagate() { if (pendingError != null) { StatusRuntimeException ex = errorTransformer.transform(pendingError); for (final Subscription subscription : activeSubscriptions) { - // TODO (core#801): effective error reporting to api clients GrpcUtil.safelyError(subscription.listener, ex); } } diff --git a/server/src/main/java/io/deephaven/server/session/SessionService.java b/server/src/main/java/io/deephaven/server/session/SessionService.java index 52bcf46f2e1..dacadc99064 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionService.java +++ b/server/src/main/java/io/deephaven/server/session/SessionService.java @@ -4,6 +4,8 @@ package io.deephaven.server.session; import com.github.f4b6a3.uuid.UuidCreator; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.protobuf.ByteString; import com.google.rpc.Code; import io.deephaven.auth.AuthenticationException; @@ -13,6 +15,7 @@ import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; import io.deephaven.proto.backplane.grpc.TerminationNotificationResponse; +import io.deephaven.proto.util.Exceptions; import io.deephaven.server.util.Scheduler; import io.deephaven.auth.AuthContext; import io.deephaven.util.process.ProcessEnvironment; @@ -22,6 +25,7 @@ import org.apache.arrow.flight.auth2.Auth2Constants; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.VisibleForTesting; import javax.inject.Inject; import javax.inject.Named; @@ -54,8 +58,21 @@ public interface ErrorTransformer { @Singleton public static class ObfuscatingErrorTransformer implements ErrorTransformer { + @VisibleForTesting + static final int MAX_STACK_TRACE_CAUSAL_DEPTH = 25; + private static final int MAX_CACHE_BUILDER_SIZE = 1009; + private static final int MAX_CACHE_DURATION_MIN = 1; + + private final Cache idCache; + @Inject - public ObfuscatingErrorTransformer() {} + public ObfuscatingErrorTransformer() { + idCache = CacheBuilder.newBuilder() + .expireAfterAccess(MAX_CACHE_DURATION_MIN, TimeUnit.MINUTES) + .maximumSize(MAX_CACHE_BUILDER_SIZE) + .weakKeys() + .build(); + } @Override public StatusRuntimeException transform(final Throwable err) { @@ -70,10 +87,51 @@ public StatusRuntimeException transform(final Throwable err) { } return sre; } else if (err instanceof InterruptedException) { - return GrpcUtil.securelyWrapError(log, err, Code.UNAVAILABLE); + return securelyWrapError(err, Code.UNAVAILABLE); } else { - return GrpcUtil.securelyWrapError(log, err); + return securelyWrapError(err, Code.INVALID_ARGUMENT); + } + } + + private StatusRuntimeException securelyWrapError(@NotNull final Throwable err, final Code statusCode) { + UUID errorId; + final boolean shouldLog; + + synchronized (idCache) { + errorId = idCache.getIfPresent(err); + shouldLog = errorId == null; + + int currDepth = 0; + // @formatter:off + for (Throwable causeToCheck = err.getCause(); + errorId == null && ++currDepth < MAX_STACK_TRACE_CAUSAL_DEPTH && causeToCheck != null; + causeToCheck = causeToCheck.getCause()) { + // @formatter:on + errorId = idCache.getIfPresent(causeToCheck); + } + + if (errorId == null) { + errorId = UuidCreator.getRandomBased(); + } + + // @formatter:off + for (Throwable throwableToAdd = err; + currDepth > 0; + throwableToAdd = throwableToAdd.getCause(), --currDepth) { + // @formatter:on + if (throwableToAdd.getStackTrace().length > 0) { + // Note that stackless exceptions are singletons, so it would be a bad idea to cache them + idCache.put(throwableToAdd, errorId); + } + } } + + if (shouldLog) { + // this is a new top-level error; log it, possibly using an existing errorId + log.error().append("Internal Error '").append(errorId.toString()).append("' ").append(err).endl(); + } + + return Exceptions.statusRuntimeException(statusCode, "Details Logged w/ID '" + errorId + "'"); } } diff --git a/server/src/main/java/io/deephaven/server/table/ExportedTableUpdateListener.java b/server/src/main/java/io/deephaven/server/table/ExportedTableUpdateListener.java index dcb41f5ae30..778d1da62cd 100644 --- a/server/src/main/java/io/deephaven/server/table/ExportedTableUpdateListener.java +++ b/server/src/main/java/io/deephaven/server/table/ExportedTableUpdateListener.java @@ -4,6 +4,9 @@ package io.deephaven.server.table; import com.google.rpc.Code; +import dagger.assisted.Assisted; +import dagger.assisted.AssistedFactory; +import dagger.assisted.AssistedInject; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.exceptions.SnapshotUnsuccessfulException; import io.deephaven.engine.exceptions.TableAlreadyFailedException; @@ -16,7 +19,6 @@ import io.deephaven.engine.table.impl.OperationSnapshotControl; import io.deephaven.engine.table.impl.UncoalescedTable; import io.deephaven.engine.updategraph.NotificationQueue; -import io.deephaven.extensions.barrage.util.GrpcUtil; import io.deephaven.hash.KeyedLongObjectHashMap; import io.deephaven.hash.KeyedLongObjectKey; import io.deephaven.internal.log.LoggerFactory; @@ -26,6 +28,7 @@ import io.deephaven.proto.backplane.grpc.Ticket; import io.deephaven.proto.util.Exceptions; import io.deephaven.proto.util.ExportTicketHelper; +import io.deephaven.server.session.SessionService; import io.deephaven.server.session.SessionState; import io.deephaven.util.SafeCloseable; import io.grpc.stub.StreamObserver; @@ -42,6 +45,12 @@ * sent a notification for exportId == 0 (which is otherwise an invalid export id). */ public class ExportedTableUpdateListener implements StreamObserver { + @AssistedFactory + public interface Factory { + ExportedTableUpdateListener create( + SessionState session, + StreamObserver responseObserver); + } private static final Logger log = LoggerFactory.getLogger(ExportedTableUpdateListener.class); @@ -49,16 +58,20 @@ public class ExportedTableUpdateListener implements StreamObserver responseObserver; + private final SessionService.ErrorTransformer errorTransformer; private final KeyedLongObjectHashMap updateListenerMap = new KeyedLongObjectHashMap<>(EXPORT_KEY); private volatile boolean isDestroyed = false; + @AssistedInject public ExportedTableUpdateListener( - final SessionState session, - final StreamObserver responseObserver) { + @Assisted final SessionState session, + @Assisted final StreamObserver responseObserver, + final SessionService.ErrorTransformer errorTransformer) { this.session = session; this.logPrefix = "ExportedTableUpdateListener(" + Integer.toHexString(System.identityHashCode(this)) + ") "; this.responseObserver = responseObserver; + this.errorTransformer = errorTransformer; } /** @@ -174,7 +187,7 @@ private synchronized void onNewTableExport(final Ticket ticket, final int export sendUpdateMessage(ticket, -1, Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Exported Table Already Failed")); } else { - sendUpdateMessage(ticket, -1, GrpcUtil.securelyWrapError(log, err, Code.FAILED_PRECONDITION)); + sendUpdateMessage(ticket, -1, errorTransformer.transform(err)); } } } @@ -213,10 +226,10 @@ private synchronized void sendUpdateMessage(final Ticket ticket, final long size * The table listener implementation that propagates updates to our internal queue. */ private class ListenerImpl extends InstrumentedTableUpdateListener { - final private BaseTable table; + final private BaseTable table; final private int exportId; - private ListenerImpl(final BaseTable table, final int exportId) { + private ListenerImpl(final BaseTable table, final int exportId) { super("ExportedTableUpdateListener (" + exportId + ")"); this.table = table; this.exportId = exportId; @@ -230,7 +243,8 @@ public void onUpdate(final TableUpdate upstream) { @Override public void onFailureInternal(final Throwable error, final Entry sourceEntry) { - sendUpdateMessage(ExportTicketHelper.wrapExportIdInTicket(exportId), table.size(), error); + sendUpdateMessage(ExportTicketHelper.wrapExportIdInTicket(exportId), table.size(), + errorTransformer.transform(error)); } @Override diff --git a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java index 7574253f2c7..55bd6b607b0 100644 --- a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java @@ -100,15 +100,20 @@ public class TableServiceGrpcImpl extends TableServiceGrpc.TableServiceImplBase private final TableServiceContextualAuthWiring authWiring; private final Map> operationMap; + private final ExportedTableUpdateListener.Factory exportedTableUpdateListenerFactory; + @Inject - public TableServiceGrpcImpl(final TicketRouter ticketRouter, + public TableServiceGrpcImpl( + final TicketRouter ticketRouter, final SessionService sessionService, final TableServiceContextualAuthWiring authWiring, - final Map> operationMap) { + final Map> operationMap, + final ExportedTableUpdateListener.Factory exportedTableUpdateListenerFactory) { this.ticketRouter = ticketRouter; this.sessionService = sessionService; this.authWiring = authWiring; this.operationMap = operationMap; + this.exportedTableUpdateListenerFactory = exportedTableUpdateListenerFactory; } private GrpcTableOperation getOp(final BatchTableRequest.Operation.OpCase op) { @@ -609,7 +614,8 @@ public void exportedTableUpdates( @NotNull final StreamObserver responseObserver) { final SessionState session = sessionService.getCurrentSession(); authWiring.checkPermissionExportedTableUpdates(session.getAuthContext(), request, Collections.emptyList()); - final ExportedTableUpdateListener listener = new ExportedTableUpdateListener(session, responseObserver); + final ExportedTableUpdateListener listener = + exportedTableUpdateListenerFactory.create(session, responseObserver); session.addExportListener(listener); ((ServerCallStreamObserver) responseObserver).setOnCancelHandler( () -> session.removeExportListener(listener)); diff --git a/server/src/test/java/io/deephaven/server/session/SessionServiceTest.java b/server/src/test/java/io/deephaven/server/session/SessionServiceTest.java index 841d9f70822..b65a2a6a363 100644 --- a/server/src/test/java/io/deephaven/server/session/SessionServiceTest.java +++ b/server/src/test/java/io/deephaven/server/session/SessionServiceTest.java @@ -9,6 +9,7 @@ import io.deephaven.server.util.TestControlledScheduler; import io.deephaven.util.SafeCloseable; import io.deephaven.auth.AuthContext; +import io.grpc.StatusRuntimeException; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -198,4 +199,77 @@ public void testSessionsAreIndependent() { Assert.eqNull(sessionService.getSessionForToken(expiration2.token), "sessionService.getSessionForToken(initialToken.token)"); } + + @Test + public void testErrorIdDeDupesIdentity() { + final Exception e1 = new RuntimeException("e1"); + final SessionService.ObfuscatingErrorTransformer transformer = new SessionService.ObfuscatingErrorTransformer(); + + final StatusRuntimeException t1 = transformer.transform(e1); + final StatusRuntimeException t2 = transformer.transform(e1); + Assert.neq(t1, "t1", t2, "t2"); + Assert.equals(t1.getMessage(), "t1.getMessage()", t2.getMessage(), "t2.getMessage()"); + } + + @Test + public void testErrorIdDeDupesParentCause() { + final Exception parent = new RuntimeException("parent"); + final Exception child = new RuntimeException("child", parent); + final SessionService.ObfuscatingErrorTransformer transformer = new SessionService.ObfuscatingErrorTransformer(); + + // important to transform parent then child for this test + final StatusRuntimeException t1 = transformer.transform(parent); + final StatusRuntimeException t2 = transformer.transform(child); + Assert.neq(t1, "t1", t2, "t2"); + Assert.equals(t1.getMessage(), "t1.getMessage()", t2.getMessage(), "t2.getMessage()"); + } + + @Test + public void testErrorIdDeDupesChildCause() { + final Exception parent = new RuntimeException("parent"); + final Exception child = new RuntimeException("child", parent); + final SessionService.ObfuscatingErrorTransformer transformer = new SessionService.ObfuscatingErrorTransformer(); + + // important to transform child then parent for this test + final StatusRuntimeException t1 = transformer.transform(child); + final StatusRuntimeException t2 = transformer.transform(parent); + Assert.neq(t1, "t1", t2, "t2"); + Assert.equals(t1.getMessage(), "t1.getMessage()", t2.getMessage(), "t2.getMessage()"); + } + + @Test + public void testErrorIdDeDupesSharedAncestorCause() { + final Exception parent = new RuntimeException("parent"); + final Exception child1 = new RuntimeException("child1", parent); + final Exception child2 = new RuntimeException("child2", parent); + final SessionService.ObfuscatingErrorTransformer transformer = new SessionService.ObfuscatingErrorTransformer(); + + final StatusRuntimeException t1 = transformer.transform(child1); + final StatusRuntimeException t2 = transformer.transform(child2); + Assert.neq(t1, "t1", t2, "t2"); + Assert.equals(t1.getMessage(), "t1.getMessage()", t2.getMessage(), "t2.getMessage()"); + + final StatusRuntimeException t3 = transformer.transform(parent); + Assert.neq(t1, "t1", t3, "t3"); + Assert.equals(t1.getMessage(), "t1.getMessage()", t3.getMessage(), "t3.getMessage()"); + } + + @Test + public void testErrorCausalLimit() { + final Exception leaf = new RuntimeException("leaf"); + final Exception p1 = new RuntimeException("lastIncluded", leaf); + Exception p0 = p1; + for (int i = SessionService.ObfuscatingErrorTransformer.MAX_STACK_TRACE_CAUSAL_DEPTH - 1; i > 0; --i) { + p0 = new RuntimeException("e" + i, p0); + } + + final SessionService.ObfuscatingErrorTransformer transformer = new SessionService.ObfuscatingErrorTransformer(); + final StatusRuntimeException t0 = transformer.transform(p0); + final StatusRuntimeException t1 = transformer.transform(p1); + Assert.equals(t0.getMessage(), "t0.getMessage()", t1.getMessage(), "t1.getMessage()"); + + // this one should not have made it + final StatusRuntimeException tleaf = transformer.transform(leaf); + Assert.notEquals(t0.getMessage(), "t0.getMessage()", tleaf.getMessage(), "tleaf.getMessage()"); + } } diff --git a/server/src/test/java/io/deephaven/server/table/ExportTableUpdateListenerTest.java b/server/src/test/java/io/deephaven/server/table/ExportTableUpdateListenerTest.java index e6eff349461..3aaf9e4e349 100644 --- a/server/src/test/java/io/deephaven/server/table/ExportTableUpdateListenerTest.java +++ b/server/src/test/java/io/deephaven/server/table/ExportTableUpdateListenerTest.java @@ -47,6 +47,7 @@ public class ExportTableUpdateListenerTest { private TestControlledScheduler scheduler; private TestSessionState session; private QueuingResponseObserver observer; + private SessionService.ErrorTransformer errorTransformer; @Before public void setup() { @@ -59,6 +60,8 @@ public void setup() { scheduler = new TestControlledScheduler(); session = new TestSessionState(); observer = new QueuingResponseObserver(); + + errorTransformer = new SessionService.ObfuscatingErrorTransformer(); } @After @@ -72,9 +75,15 @@ public void tearDown() { executionContext.close(); } + private ExportedTableUpdateListener createListener( + final SessionState session, + final QueuingResponseObserver observer) { + return new ExportedTableUpdateListener(session, observer, errorTransformer); + } + @Test public void testLifeCycleStaticTable() { - final ExportedTableUpdateListener listener = new ExportedTableUpdateListener(session, observer); + final ExportedTableUpdateListener listener = createListener(session, observer); try (final SafeCloseable scope = LivenessScopeStack.open()) { session.addExportListener(listener); } @@ -99,7 +108,7 @@ public void testRefreshStaticTable() { final SessionState.ExportObject t1 = session.newServerSideExport(src); // now add the listener - final ExportedTableUpdateListener listener = new ExportedTableUpdateListener(session, observer); + final ExportedTableUpdateListener listener = createListener(session, observer); try (final SafeCloseable scope = LivenessScopeStack.open()) { session.addExportListener(listener); } @@ -114,7 +123,7 @@ public void testRefreshStaticTable() { @Test public void testLifeCycleTickingTable() { - final ExportedTableUpdateListener listener = new ExportedTableUpdateListener(session, observer); + final ExportedTableUpdateListener listener = createListener(session, observer); try (final SafeCloseable scope = LivenessScopeStack.open()) { session.addExportListener(listener); } @@ -150,7 +159,7 @@ public void testRefreshTickingTable() { } // now add the listener - final ExportedTableUpdateListener listener = new ExportedTableUpdateListener(session, observer); + final ExportedTableUpdateListener listener = createListener(session, observer); try (final SafeCloseable scope = LivenessScopeStack.open()) { session.addExportListener(listener); } @@ -176,7 +185,7 @@ public void testSessionClose() { final SessionState.ExportObject t1 = session.newServerSideExport(src); // now add the listener - final ExportedTableUpdateListener listener = new ExportedTableUpdateListener(session, observer); + final ExportedTableUpdateListener listener = createListener(session, observer); try (final SafeCloseable scope = LivenessScopeStack.open()) { session.addExportListener(listener); } @@ -209,7 +218,7 @@ public void testPropagatesError() { } // now add the listener - final ExportedTableUpdateListener listener = new ExportedTableUpdateListener(session, observer); + final ExportedTableUpdateListener listener = createListener(session, observer); try (final SafeCloseable scope = LivenessScopeStack.open()) { session.addExportListener(listener); } @@ -226,10 +235,7 @@ public void testPropagatesError() { Assert.equals(updateId, "updateId", t1.getExportId(), "t1.getExportId()"); Assert.eq(msg.getSize(), "msg.getSize()", 42); Assert.eqFalse(msg.getUpdateFailureMessage().isEmpty(), "msg.getUpdateFailureMessage().isEmpty()"); - - // TODO (core#801): validate that our error is not directly embedded in the update (that would be a security - // concern) - Assert.eqTrue(msg.getUpdateFailureMessage().contains("awful"), "msg.contains('awful')"); + Assert.eqFalse(msg.getUpdateFailureMessage().contains("awful"), "msg.contains('awful')"); } @Test @@ -242,7 +248,7 @@ public void testListenerClosed() { } // now add the listener - final ExportedTableUpdateListener listener = new ExportedTableUpdateListener(session, observer); + final ExportedTableUpdateListener listener = createListener(session, observer); try (final SafeCloseable scope = LivenessScopeStack.open()) { session.addExportListener(listener); } @@ -276,7 +282,7 @@ public void testTableSizeUsesPrev() { final MutableObject> t1 = new MutableObject<>(); // now add the listener - final ExportedTableUpdateListener listener = new ExportedTableUpdateListener(session, observer); + final ExportedTableUpdateListener listener = createListener(session, observer); try (final SafeCloseable scope = LivenessScopeStack.open()) { session.addExportListener(listener); } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/HasEventHandling.java b/web/client-api/src/main/java/io/deephaven/web/client/api/HasEventHandling.java index 29a0f174dff..bb78177eaea 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/HasEventHandling.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/HasEventHandling.java @@ -10,7 +10,6 @@ import elemental2.dom.CustomEvent; import elemental2.dom.CustomEventInit; import elemental2.dom.DomGlobal; -import elemental2.dom.Event; import elemental2.promise.Promise; import io.deephaven.web.client.fu.JsLog; import io.deephaven.web.client.fu.LazyPromise; @@ -18,7 +17,6 @@ import javaemul.internal.annotations.DoNotAutobox; import jsinterop.annotations.JsMethod; import jsinterop.annotations.JsOptional; -import jsinterop.annotations.JsProperty; import jsinterop.base.Js; import jsinterop.base.JsArrayLike; import jsinterop.base.JsPropertyMap; diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java b/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java index 7d4ea477962..7996138b9c2 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java @@ -404,8 +404,7 @@ public boolean checkStatus(ResponseStreamWrapper.Status status) { // signal that the user needs to re-authenticate, make a new session // TODO (deephaven-core#3501) in theory we could make a new session for some auth types info.fireEvent(CoreClient.EVENT_RECONNECT_AUTH_FAILED); - } else if (status.getCode() == Code.Internal || status.getCode() == Code.Unknown - || status.getCode() == Code.Unavailable) { + } else if (status.isTransportError()) { // fire deprecated event for now info.notifyConnectionError(status); @@ -455,7 +454,7 @@ private void startExportNotificationsStream() { exportNotifications.onData(update -> { if (update.getUpdateFailureMessage() != null && !update.getUpdateFailureMessage().isEmpty()) { cache.get(new TableTicket(update.getExportId().getTicket_asU8())).ifPresent(state1 -> { - state1.forActiveTables(t -> t.failureHandled(update.getUpdateFailureMessage())); + state1.setResolution(ClientTableState.ResolutionState.FAILED, update.getUpdateFailureMessage()); }); } else { exportedTableUpdateMessage(new TableTicket(update.getExportId().getTicket_asU8()), @@ -1537,11 +1536,8 @@ private void startAndMaybeFlush(boolean isSnapshot, RecordBatch header, ByteBuff }); stream.onStatus(err -> { checkStatus(err); - if (!err.isOk()) { - // TODO (core#1181): fix this hack that enables barrage errors to propagate to the UI widget - state.forActiveSubscriptions((table, subscription) -> { - table.failureHandled(err.getDetails()); - }); + if (!err.isOk() && !err.isTransportError()) { + state.setResolution(ClientTableState.ResolutionState.FAILED, err.getDetails()); } }); BiDiStream oldStream = subscriptionStreams.put(state, stream); diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/stream/ResponseStreamWrapper.java b/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/stream/ResponseStreamWrapper.java index a0f95c8498a..0a637e47fc5 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/stream/ResponseStreamWrapper.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/barrage/stream/ResponseStreamWrapper.java @@ -45,6 +45,11 @@ static Status of(int code, String details, BrowserHeaders metadata) { default boolean isOk() { return getCode() == Code.OK; } + + @JsOverlay + default boolean isTransportError() { + return getCode() == Code.Internal || getCode() == Code.Unknown || getCode() == Code.Unavailable; + } } @JsType(isNative = true) public interface ServiceError { diff --git a/web/client-api/src/main/java/io/deephaven/web/client/state/ClientTableState.java b/web/client-api/src/main/java/io/deephaven/web/client/state/ClientTableState.java index 37e64b60e64..dd246bf737a 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/state/ClientTableState.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/state/ClientTableState.java @@ -284,6 +284,9 @@ public void setResolution(ResolutionState resolution, String failMsg) { assert resolution == ResolutionState.RELEASED : "Trying to unrelease CTS " + this + " to " + resolution; return; } + if (this.resolution == resolution) { + return; + } this.resolution = resolution; if (resolution == ResolutionState.RUNNING) { if (onRunning != null) { @@ -302,6 +305,8 @@ public void setResolution(ResolutionState resolution, String failMsg) { // after a failure, we should discard onRunning (and this state entirely) onRunning = null; onReleased = null; + + forActiveTables(t -> t.failureHandled(failMsg)); } else if (resolution == ResolutionState.RELEASED) { if (onReleased != null) { onReleased.forEach(JsRunnable::run);