Skip to content

Commit

Permalink
Eliminate Double Error Notifications in jsapi and Error Log (deephave…
Browse files Browse the repository at this point in the history
…n#5048)

Co-authored-by: Colin Alworth <[email protected]>
Co-authored-by: Ryan Caudy <[email protected]>
  • Loading branch information
3 people authored Jan 19, 2024
1 parent ca5751c commit 1593f91
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,6 @@
public class GrpcUtil {
private static final Logger log = LoggerFactory.getLogger(GrpcUtil.class);

public static StatusRuntimeException securelyWrapError(final Logger log, final Throwable err) {
return securelyWrapError(log, err, Code.INVALID_ARGUMENT);
}

public static StatusRuntimeException securelyWrapError(final Logger log, final Throwable err,
final Code statusCode) {
if (err instanceof StatusRuntimeException) {
return (StatusRuntimeException) err;
}

final UUID errorId = UUID.randomUUID();
log.error().append("Internal Error '").append(errorId.toString()).append("' ").append(err).endl();
return Exceptions.statusRuntimeException(statusCode, "Details Logged w/ID '" + errorId + "'");
}

/**
* Wraps the provided runner in a try/catch block to minimize damage caused by a failing externally supplied helper.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1495,7 +1495,6 @@ private void updateSubscriptionsSnapshotAndPropagate() {
if (pendingError != null) {
StatusRuntimeException ex = errorTransformer.transform(pendingError);
for (final Subscription subscription : activeSubscriptions) {
// TODO (core#801): effective error reporting to api clients
GrpcUtil.safelyError(subscription.listener, ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package io.deephaven.server.session;

import com.github.f4b6a3.uuid.UuidCreator;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.ByteString;
import com.google.rpc.Code;
import io.deephaven.auth.AuthenticationException;
Expand All @@ -13,6 +15,7 @@
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.backplane.grpc.TerminationNotificationResponse;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.server.util.Scheduler;
import io.deephaven.auth.AuthContext;
import io.deephaven.util.process.ProcessEnvironment;
Expand All @@ -22,6 +25,7 @@
import org.apache.arrow.flight.auth2.Auth2Constants;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.VisibleForTesting;

import javax.inject.Inject;
import javax.inject.Named;
Expand Down Expand Up @@ -54,8 +58,21 @@ public interface ErrorTransformer {

@Singleton
public static class ObfuscatingErrorTransformer implements ErrorTransformer {
@VisibleForTesting
static final int MAX_STACK_TRACE_CAUSAL_DEPTH = 25;
private static final int MAX_CACHE_BUILDER_SIZE = 1009;
private static final int MAX_CACHE_DURATION_MIN = 1;

private final Cache<Throwable, UUID> idCache;

@Inject
public ObfuscatingErrorTransformer() {}
public ObfuscatingErrorTransformer() {
idCache = CacheBuilder.newBuilder()
.expireAfterAccess(MAX_CACHE_DURATION_MIN, TimeUnit.MINUTES)
.maximumSize(MAX_CACHE_BUILDER_SIZE)
.weakKeys()
.build();
}

@Override
public StatusRuntimeException transform(final Throwable err) {
Expand All @@ -70,10 +87,51 @@ public StatusRuntimeException transform(final Throwable err) {
}
return sre;
} else if (err instanceof InterruptedException) {
return GrpcUtil.securelyWrapError(log, err, Code.UNAVAILABLE);
return securelyWrapError(err, Code.UNAVAILABLE);
} else {
return GrpcUtil.securelyWrapError(log, err);
return securelyWrapError(err, Code.INVALID_ARGUMENT);
}
}

private StatusRuntimeException securelyWrapError(@NotNull final Throwable err, final Code statusCode) {
UUID errorId;
final boolean shouldLog;

synchronized (idCache) {
errorId = idCache.getIfPresent(err);
shouldLog = errorId == null;

int currDepth = 0;
// @formatter:off
for (Throwable causeToCheck = err.getCause();
errorId == null && ++currDepth < MAX_STACK_TRACE_CAUSAL_DEPTH && causeToCheck != null;
causeToCheck = causeToCheck.getCause()) {
// @formatter:on
errorId = idCache.getIfPresent(causeToCheck);
}

if (errorId == null) {
errorId = UuidCreator.getRandomBased();
}

// @formatter:off
for (Throwable throwableToAdd = err;
currDepth > 0;
throwableToAdd = throwableToAdd.getCause(), --currDepth) {
// @formatter:on
if (throwableToAdd.getStackTrace().length > 0) {
// Note that stackless exceptions are singletons, so it would be a bad idea to cache them
idCache.put(throwableToAdd, errorId);
}
}
}

if (shouldLog) {
// this is a new top-level error; log it, possibly using an existing errorId
log.error().append("Internal Error '").append(errorId.toString()).append("' ").append(err).endl();
}

return Exceptions.statusRuntimeException(statusCode, "Details Logged w/ID '" + errorId + "'");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
package io.deephaven.server.table;

import com.google.rpc.Code;
import dagger.assisted.Assisted;
import dagger.assisted.AssistedFactory;
import dagger.assisted.AssistedInject;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.SnapshotUnsuccessfulException;
import io.deephaven.engine.exceptions.TableAlreadyFailedException;
Expand All @@ -16,7 +19,6 @@
import io.deephaven.engine.table.impl.OperationSnapshotControl;
import io.deephaven.engine.table.impl.UncoalescedTable;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.hash.KeyedLongObjectHashMap;
import io.deephaven.hash.KeyedLongObjectKey;
import io.deephaven.internal.log.LoggerFactory;
Expand All @@ -26,6 +28,7 @@
import io.deephaven.proto.backplane.grpc.Ticket;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.proto.util.ExportTicketHelper;
import io.deephaven.server.session.SessionService;
import io.deephaven.server.session.SessionState;
import io.deephaven.util.SafeCloseable;
import io.grpc.stub.StreamObserver;
Expand All @@ -42,23 +45,33 @@
* sent a notification for exportId == 0 (which is otherwise an invalid export id).
*/
public class ExportedTableUpdateListener implements StreamObserver<ExportNotification> {
@AssistedFactory
public interface Factory {
ExportedTableUpdateListener create(
SessionState session,
StreamObserver<ExportedTableUpdateMessage> responseObserver);
}

private static final Logger log = LoggerFactory.getLogger(ExportedTableUpdateListener.class);

private final SessionState session;

private final String logPrefix;
private final StreamObserver<ExportedTableUpdateMessage> responseObserver;
private final SessionService.ErrorTransformer errorTransformer;
private final KeyedLongObjectHashMap<ListenerImpl> updateListenerMap = new KeyedLongObjectHashMap<>(EXPORT_KEY);

private volatile boolean isDestroyed = false;

@AssistedInject
public ExportedTableUpdateListener(
final SessionState session,
final StreamObserver<ExportedTableUpdateMessage> responseObserver) {
@Assisted final SessionState session,
@Assisted final StreamObserver<ExportedTableUpdateMessage> responseObserver,
final SessionService.ErrorTransformer errorTransformer) {
this.session = session;
this.logPrefix = "ExportedTableUpdateListener(" + Integer.toHexString(System.identityHashCode(this)) + ") ";
this.responseObserver = responseObserver;
this.errorTransformer = errorTransformer;
}

/**
Expand Down Expand Up @@ -174,7 +187,7 @@ private synchronized void onNewTableExport(final Ticket ticket, final int export
sendUpdateMessage(ticket, -1, Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION,
"Exported Table Already Failed"));
} else {
sendUpdateMessage(ticket, -1, GrpcUtil.securelyWrapError(log, err, Code.FAILED_PRECONDITION));
sendUpdateMessage(ticket, -1, errorTransformer.transform(err));
}
}
}
Expand Down Expand Up @@ -213,10 +226,10 @@ private synchronized void sendUpdateMessage(final Ticket ticket, final long size
* The table listener implementation that propagates updates to our internal queue.
*/
private class ListenerImpl extends InstrumentedTableUpdateListener {
final private BaseTable table;
final private BaseTable<?> table;
final private int exportId;

private ListenerImpl(final BaseTable table, final int exportId) {
private ListenerImpl(final BaseTable<?> table, final int exportId) {
super("ExportedTableUpdateListener (" + exportId + ")");
this.table = table;
this.exportId = exportId;
Expand All @@ -230,7 +243,8 @@ public void onUpdate(final TableUpdate upstream) {

@Override
public void onFailureInternal(final Throwable error, final Entry sourceEntry) {
sendUpdateMessage(ExportTicketHelper.wrapExportIdInTicket(exportId), table.size(), error);
sendUpdateMessage(ExportTicketHelper.wrapExportIdInTicket(exportId), table.size(),
errorTransformer.transform(error));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,20 @@ public class TableServiceGrpcImpl extends TableServiceGrpc.TableServiceImplBase
private final TableServiceContextualAuthWiring authWiring;
private final Map<BatchTableRequest.Operation.OpCase, GrpcTableOperation<?>> operationMap;

private final ExportedTableUpdateListener.Factory exportedTableUpdateListenerFactory;

@Inject
public TableServiceGrpcImpl(final TicketRouter ticketRouter,
public TableServiceGrpcImpl(
final TicketRouter ticketRouter,
final SessionService sessionService,
final TableServiceContextualAuthWiring authWiring,
final Map<BatchTableRequest.Operation.OpCase, GrpcTableOperation<?>> operationMap) {
final Map<BatchTableRequest.Operation.OpCase, GrpcTableOperation<?>> operationMap,
final ExportedTableUpdateListener.Factory exportedTableUpdateListenerFactory) {
this.ticketRouter = ticketRouter;
this.sessionService = sessionService;
this.authWiring = authWiring;
this.operationMap = operationMap;
this.exportedTableUpdateListenerFactory = exportedTableUpdateListenerFactory;
}

private <T> GrpcTableOperation<T> getOp(final BatchTableRequest.Operation.OpCase op) {
Expand Down Expand Up @@ -609,7 +614,8 @@ public void exportedTableUpdates(
@NotNull final StreamObserver<ExportedTableUpdateMessage> responseObserver) {
final SessionState session = sessionService.getCurrentSession();
authWiring.checkPermissionExportedTableUpdates(session.getAuthContext(), request, Collections.emptyList());
final ExportedTableUpdateListener listener = new ExportedTableUpdateListener(session, responseObserver);
final ExportedTableUpdateListener listener =
exportedTableUpdateListenerFactory.create(session, responseObserver);
session.addExportListener(listener);
((ServerCallStreamObserver<ExportedTableUpdateMessage>) responseObserver).setOnCancelHandler(
() -> session.removeExportListener(listener));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.deephaven.server.util.TestControlledScheduler;
import io.deephaven.util.SafeCloseable;
import io.deephaven.auth.AuthContext;
import io.grpc.StatusRuntimeException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -198,4 +199,77 @@ public void testSessionsAreIndependent() {
Assert.eqNull(sessionService.getSessionForToken(expiration2.token),
"sessionService.getSessionForToken(initialToken.token)");
}

@Test
public void testErrorIdDeDupesIdentity() {
final Exception e1 = new RuntimeException("e1");
final SessionService.ObfuscatingErrorTransformer transformer = new SessionService.ObfuscatingErrorTransformer();

final StatusRuntimeException t1 = transformer.transform(e1);
final StatusRuntimeException t2 = transformer.transform(e1);
Assert.neq(t1, "t1", t2, "t2");
Assert.equals(t1.getMessage(), "t1.getMessage()", t2.getMessage(), "t2.getMessage()");
}

@Test
public void testErrorIdDeDupesParentCause() {
final Exception parent = new RuntimeException("parent");
final Exception child = new RuntimeException("child", parent);
final SessionService.ObfuscatingErrorTransformer transformer = new SessionService.ObfuscatingErrorTransformer();

// important to transform parent then child for this test
final StatusRuntimeException t1 = transformer.transform(parent);
final StatusRuntimeException t2 = transformer.transform(child);
Assert.neq(t1, "t1", t2, "t2");
Assert.equals(t1.getMessage(), "t1.getMessage()", t2.getMessage(), "t2.getMessage()");
}

@Test
public void testErrorIdDeDupesChildCause() {
final Exception parent = new RuntimeException("parent");
final Exception child = new RuntimeException("child", parent);
final SessionService.ObfuscatingErrorTransformer transformer = new SessionService.ObfuscatingErrorTransformer();

// important to transform child then parent for this test
final StatusRuntimeException t1 = transformer.transform(child);
final StatusRuntimeException t2 = transformer.transform(parent);
Assert.neq(t1, "t1", t2, "t2");
Assert.equals(t1.getMessage(), "t1.getMessage()", t2.getMessage(), "t2.getMessage()");
}

@Test
public void testErrorIdDeDupesSharedAncestorCause() {
final Exception parent = new RuntimeException("parent");
final Exception child1 = new RuntimeException("child1", parent);
final Exception child2 = new RuntimeException("child2", parent);
final SessionService.ObfuscatingErrorTransformer transformer = new SessionService.ObfuscatingErrorTransformer();

final StatusRuntimeException t1 = transformer.transform(child1);
final StatusRuntimeException t2 = transformer.transform(child2);
Assert.neq(t1, "t1", t2, "t2");
Assert.equals(t1.getMessage(), "t1.getMessage()", t2.getMessage(), "t2.getMessage()");

final StatusRuntimeException t3 = transformer.transform(parent);
Assert.neq(t1, "t1", t3, "t3");
Assert.equals(t1.getMessage(), "t1.getMessage()", t3.getMessage(), "t3.getMessage()");
}

@Test
public void testErrorCausalLimit() {
final Exception leaf = new RuntimeException("leaf");
final Exception p1 = new RuntimeException("lastIncluded", leaf);
Exception p0 = p1;
for (int i = SessionService.ObfuscatingErrorTransformer.MAX_STACK_TRACE_CAUSAL_DEPTH - 1; i > 0; --i) {
p0 = new RuntimeException("e" + i, p0);
}

final SessionService.ObfuscatingErrorTransformer transformer = new SessionService.ObfuscatingErrorTransformer();
final StatusRuntimeException t0 = transformer.transform(p0);
final StatusRuntimeException t1 = transformer.transform(p1);
Assert.equals(t0.getMessage(), "t0.getMessage()", t1.getMessage(), "t1.getMessage()");

// this one should not have made it
final StatusRuntimeException tleaf = transformer.transform(leaf);
Assert.notEquals(t0.getMessage(), "t0.getMessage()", tleaf.getMessage(), "tleaf.getMessage()");
}
}
Loading

0 comments on commit 1593f91

Please sign in to comment.