Skip to content

Commit

Permalink
add sometimes/never asserts
Browse files Browse the repository at this point in the history
  • Loading branch information
rodesai committed Jun 14, 2024
1 parent f0f8944 commit 56425db
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
import static dev.responsive.kafka.api.config.ResponsiveConfig.CASSANDRA_HOSTNAME_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.CASSANDRA_PORT_CONFIG;

import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.connection.ConnectionInitException;
import com.datastax.oss.driver.api.core.servererrors.ReadFailureException;
import com.datastax.oss.driver.api.core.servererrors.WriteFailureException;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.schema.CreateKeyspace;
import dev.responsive.examples.e2etest.Schema.InputRecord;
Expand Down Expand Up @@ -160,7 +163,9 @@ private KafkaStreams buildTopology(final Map<String, Object> properties) {
);
streams.setUncaughtExceptionHandler(exception -> {
if (shouldLogError(exception, new LinkedList<>())) {
LOG.error("uncaught exception on test app stream thread {}",
LOG.error("uncaught exception {}({}) on test app stream thread {}",
exception.getClass().getName(),
exception.getMessage(),
causalSummary(exception, new LinkedList<>()),
exception
);
Expand Down Expand Up @@ -193,7 +198,10 @@ private boolean shouldLogError(final Throwable throwable, List<Throwable> seen)
TaskCorruptedException.class,
TimeoutException.class,
java.util.concurrent.TimeoutException.class,
TransactionAbortedException.class
TransactionAbortedException.class,
WriteFailureException.class,
AllNodesFailedException.class,
ReadFailureException.class
);
for (final var c : dontcare) {
if (c.isInstance(throwable)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,10 @@ private synchronized List<RecordMetadata> popOffsets(final long upTo) {
throw new RuntimeException(e);
}
if (Duration.between(start, Instant.now()).getSeconds() > 300) {
LOG.error("ANTITHESIS NEVER: waited longer than 300 seconds for offset {} {}",
upTo,
partition
);
throw new IllegalStateException(String.format(
"waited longer than 300 seconds for offset %d %d", upTo, partition
));
Expand Down Expand Up @@ -365,7 +369,7 @@ private void updateReceived(
}
final var expectedChecksum = checksum.current();
if (!Arrays.equals(expectedChecksum, observedChecksum)) {
LOG.error("checksum mismatch - key({}), recvdCount({}), {} {}",
LOG.error("ANTITHESIS NEVER: checksum mismatch - key({}), recvdCount({}), {} {}",
key,
recvdCount,
Arrays.toString(checksum.current()),
Expand All @@ -380,6 +384,14 @@ private void checkStalled(final RecordMetadata earliestUnconsumed, final int par
if (earliestUnconsumed.offset() == stalled.offset()) {
// the earliest unconsumed record has not advanced
if (Duration.between(faultsStoppedAt, Instant.now()).compareTo(receivedThreshold) > 0) {
LOG.error(
"ANTITHESIS NEVER: have not seen results for {} on {} in {}. last sent/rcvd {}/{}",
key,
partition,
receivedThreshold.plus(faultStopThreshold),
Instant.ofEpochMilli(stalled.timestamp()),
lastReceived
);
throw new IllegalStateException(String.format(
"have not seen any results for %d on %d in %s. earliest sent %s. last recvd %s",
key,
Expand Down
2 changes: 1 addition & 1 deletion kafka-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import java.io.ByteArrayOutputStream
* limitations under the License.
*/

// counter to change sha: 32
// counter to change sha: 33

plugins {
id("responsive.java-library-conventions")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ public void removeProcessor(final String processorName, final int partition) {
if (inFlightForTask != null) {
log.info("Cancelling {} pending records for {}[{}]",
inFlightForTask.size(), processorName, partition);
if (!inFlightForTask.isEmpty()) {
log.info("ANTITHESIS SOMETIMES: cancelling {} pending records for {}[{}]",
inFlightForTask.size(), processorName, partition);
}
inFlightForTask.values().forEach(f -> f.future().cancel(true));
}
}
Expand Down

0 comments on commit 56425db

Please sign in to comment.