From 56425db4e9bf06759f0111673961075bb4988cef Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Fri, 14 Jun 2024 00:22:29 -0700 Subject: [PATCH] add sometimes/never asserts --- .../examples/e2etest/E2ETestApplication.java | 12 ++++++++++-- .../responsive/examples/e2etest/E2ETestDriver.java | 14 +++++++++++++- kafka-client/build.gradle.kts | 2 +- .../kafka/api/async/internals/AsyncThreadPool.java | 4 ++++ 4 files changed, 28 insertions(+), 4 deletions(-) diff --git a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java index 97d7a38ee..448e5fd6a 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestApplication.java @@ -4,10 +4,13 @@ import static dev.responsive.kafka.api.config.ResponsiveConfig.CASSANDRA_HOSTNAME_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.CASSANDRA_PORT_CONFIG; +import com.datastax.oss.driver.api.core.AllNodesFailedException; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.DriverTimeoutException; import com.datastax.oss.driver.api.core.config.DriverConfigLoader; import com.datastax.oss.driver.api.core.connection.ConnectionInitException; +import com.datastax.oss.driver.api.core.servererrors.ReadFailureException; +import com.datastax.oss.driver.api.core.servererrors.WriteFailureException; import com.datastax.oss.driver.api.querybuilder.SchemaBuilder; import com.datastax.oss.driver.api.querybuilder.schema.CreateKeyspace; import dev.responsive.examples.e2etest.Schema.InputRecord; @@ -160,7 +163,9 @@ private KafkaStreams buildTopology(final Map properties) { ); streams.setUncaughtExceptionHandler(exception -> { if (shouldLogError(exception, new LinkedList<>())) { - LOG.error("uncaught exception on test app stream thread {}", + LOG.error("uncaught exception {}({}) on test app stream thread {}", + exception.getClass().getName(), + exception.getMessage(), causalSummary(exception, new LinkedList<>()), exception ); @@ -193,7 +198,10 @@ private boolean shouldLogError(final Throwable throwable, List seen) TaskCorruptedException.class, TimeoutException.class, java.util.concurrent.TimeoutException.class, - TransactionAbortedException.class + TransactionAbortedException.class, + WriteFailureException.class, + AllNodesFailedException.class, + ReadFailureException.class ); for (final var c : dontcare) { if (c.isInstance(throwable)) { diff --git a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestDriver.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestDriver.java index b3da70f1f..7cdc49d41 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestDriver.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/e2etest/E2ETestDriver.java @@ -314,6 +314,10 @@ private synchronized List popOffsets(final long upTo) { throw new RuntimeException(e); } if (Duration.between(start, Instant.now()).getSeconds() > 300) { + LOG.error("ANTITHESIS NEVER: waited longer than 300 seconds for offset {} {}", + upTo, + partition + ); throw new IllegalStateException(String.format( "waited longer than 300 seconds for offset %d %d", upTo, partition )); @@ -365,7 +369,7 @@ private void updateReceived( } final var expectedChecksum = checksum.current(); if (!Arrays.equals(expectedChecksum, observedChecksum)) { - LOG.error("checksum mismatch - key({}), recvdCount({}), {} {}", + LOG.error("ANTITHESIS NEVER: checksum mismatch - key({}), recvdCount({}), {} {}", key, recvdCount, Arrays.toString(checksum.current()), @@ -380,6 +384,14 @@ private void checkStalled(final RecordMetadata earliestUnconsumed, final int par if (earliestUnconsumed.offset() == stalled.offset()) { // the earliest unconsumed record has not advanced if (Duration.between(faultsStoppedAt, Instant.now()).compareTo(receivedThreshold) > 0) { + LOG.error( + "ANTITHESIS NEVER: have not seen results for {} on {} in {}. last sent/rcvd {}/{}", + key, + partition, + receivedThreshold.plus(faultStopThreshold), + Instant.ofEpochMilli(stalled.timestamp()), + lastReceived + ); throw new IllegalStateException(String.format( "have not seen any results for %d on %d in %s. earliest sent %s. last recvd %s", key, diff --git a/kafka-client/build.gradle.kts b/kafka-client/build.gradle.kts index 272fbd564..ce51d40d8 100644 --- a/kafka-client/build.gradle.kts +++ b/kafka-client/build.gradle.kts @@ -16,7 +16,7 @@ import java.io.ByteArrayOutputStream * limitations under the License. */ -// counter to change sha: 32 +// counter to change sha: 33 plugins { id("responsive.java-library-conventions") diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java index 7d0532de1..ba022f7f8 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java @@ -108,6 +108,10 @@ public void removeProcessor(final String processorName, final int partition) { if (inFlightForTask != null) { log.info("Cancelling {} pending records for {}[{}]", inFlightForTask.size(), processorName, partition); + if (!inFlightForTask.isEmpty()) { + log.info("ANTITHESIS SOMETIMES: cancelling {} pending records for {}[{}]", + inFlightForTask.size(), processorName, partition); + } inFlightForTask.values().forEach(f -> f.future().cancel(true)); } }