From 5a9e2dc0161654aa4ab4ee6c6e3f079c4ad2e084 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 21 Nov 2024 21:07:40 -0800 Subject: [PATCH] done --- .../examples/common/E2ETestUtils.java | 29 ++++++++++++++++++- .../examples/e2etest/E2ETestApplication.java | 1 + .../ResponsiveKafkaClientSupplier.java | 13 ++++++++- 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/common/E2ETestUtils.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/common/E2ETestUtils.java index cbe9f1b64..334849f9d 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/common/E2ETestUtils.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/common/E2ETestUtils.java @@ -17,10 +17,13 @@ import static dev.responsive.kafka.api.config.ResponsiveConfig.CASSANDRA_PORT_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG; +import com.antithesis.sdk.Assert; +import com.datastax.oss.driver.api.core.AllNodesFailedException; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.config.DriverConfigLoader; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.querybuilder.SchemaBuilder; import com.datastax.oss.driver.api.querybuilder.schema.CreateKeyspace; import com.fasterxml.jackson.databind.ObjectMapper; @@ -129,7 +132,31 @@ public static void retryFor(final Runnable task, Duration timeout) { */ public static void maybeCreateKeyspace(final Map properties) { if (properties.get(STORAGE_BACKEND_TYPE_CONFIG).equals(StorageBackend.CASSANDRA.name())) { - E2ETestUtils.retryFor(() -> doMaybeCreateKeyspace(properties), Duration.ofMinutes(5)); + try { + E2ETestUtils.retryFor(() -> doMaybeCreateKeyspace(properties), Duration.ofMinutes(5)); + } catch (final Exception e) { + final String errorMessage = "Failed to create Scylla keyspace within the timeout"; + LOG.error(errorMessage, e); + + final var errorDetails = buildAssertionContext(errorMessage); + errorDetails.put("exceptionType", e.getClass().getName()); + if (e instanceof AllNodesFailedException) { + final Map> allErrors = ((AllNodesFailedException) e).getAllErrors(); + int node_i = 1; + for (final var node : allErrors.entrySet()) { + final String nodeId = "node-" + node_i; + errorDetails.put(nodeId + "_endPoint", node.getKey().getEndPoint().asMetricPrefix()); + + final var nodeErrors = node.getValue(); + LOG.error("All errors for node at {}: {}", nodeId, nodeErrors); + int error_i = 1; + for (final var error : nodeErrors) { + errorDetails.put(nodeId + "_err-" + error_i, error.getMessage()); + } + } + Assert.unreachable(errorMessage, errorDetails); + } + } } } diff --git a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java index 15a7317b8..314430c63 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java @@ -172,6 +172,7 @@ public void init(final FixedKeyProcessorContext context) { public void process(final FixedKeyRecord record) { final var random = Math.abs(randomGenerator.nextLong() % 10000); if (random < exceptionThreshold) { + LOG.info("Injecting test exception"); throw new InjectedE2ETestException(); } final ValueAndTimestamp old = store.get(record.key()); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java index dd0b0aa45..9ca1024aa 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java @@ -264,7 +264,18 @@ private synchronized ListenersForThread getAndMaybeInitListenersForThread( } private synchronized void derefListenersForThread(final String threadId) { - if (threadListeners.get(threadId).deref()) { + final var listener = threadListeners.get(threadId); + if (listener == null) { + final String errorMsg = String.format("Could not find thread listener for thread id %s. " + + "Current thread name: %s. Other registered " + + "thread listener ids: %s", + threadId, Thread.currentThread().getName(), + threadListeners.keySet()); + LOG.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + + if (listener.deref()) { threadListeners.remove(threadId); } }