Skip to content

Commit

Permalink
Improve error logging for failure during Antithesis e2e test setup (#397
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ableegoldman authored Nov 22, 2024
1 parent 9e29126 commit 2669a78
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
import static dev.responsive.kafka.api.config.ResponsiveConfig.CASSANDRA_PORT_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG;

import com.antithesis.sdk.Assert;
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.schema.CreateKeyspace;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -129,7 +132,31 @@ public static void retryFor(final Runnable task, Duration timeout) {
*/
public static void maybeCreateKeyspace(final Map<String, Object> properties) {
if (properties.get(STORAGE_BACKEND_TYPE_CONFIG).equals(StorageBackend.CASSANDRA.name())) {
E2ETestUtils.retryFor(() -> doMaybeCreateKeyspace(properties), Duration.ofMinutes(5));
try {
E2ETestUtils.retryFor(() -> doMaybeCreateKeyspace(properties), Duration.ofMinutes(5));
} catch (final Exception e) {
final String errorMessage = "Failed to create Scylla keyspace within the timeout";
LOG.error(errorMessage, e);

final var errorDetails = buildAssertionContext(errorMessage);
errorDetails.put("exceptionType", e.getClass().getName());
if (e instanceof AllNodesFailedException) {
final Map<Node, List<Throwable>> allErrors = ((AllNodesFailedException) e).getAllErrors();
int nodeIdx = 1;
for (final var node : allErrors.entrySet()) {
final String nodeId = "node-" + nodeIdx;
errorDetails.put(nodeId + "_endPoint", node.getKey().getEndPoint().asMetricPrefix());

final var nodeErrors = node.getValue();
LOG.error("All errors for node at {}: {}", nodeId, nodeErrors);
int errorIdx = 1;
for (final var error : nodeErrors) {
errorDetails.put(nodeId + "_err-" + errorIdx, error.getMessage());
}
}
Assert.unreachable(errorMessage, errorDetails);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ public void init(final FixedKeyProcessorContext<Long, OutputRecord> context) {
public void process(final FixedKeyRecord<Long, InputRecord> record) {
final var random = Math.abs(randomGenerator.nextLong() % 10000);
if (random < exceptionThreshold) {
LOG.info("Injecting test exception");
throw new InjectedE2ETestException();
}
final ValueAndTimestamp<OutputRecord> old = store.get(record.key());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ protected Topology buildTopology() {
})
.groupByKey()
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(1), Duration.ofHours(12)))
.reduce(EnrichedOrder::combineWith, Materialized.with(Serdes.String(), RegressionSchema.enrichedOrderSerde()))
.reduce(EnrichedOrder::combineWith,
Materialized.with(Serdes.String(), RegressionSchema.enrichedOrderSerde()))
.toStream()
.selectKey((w, v) -> w.key())
.to(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,18 @@ private synchronized ListenersForThread getAndMaybeInitListenersForThread(
}

private synchronized void derefListenersForThread(final String threadId) {
if (threadListeners.get(threadId).deref()) {
final var listener = threadListeners.get(threadId);
if (listener == null) {
final String errorMsg = String.format("Could not find thread listener for thread id %s. "
+ "Current thread name: %s. Other registered "
+ "thread listener ids: %s",
threadId, Thread.currentThread().getName(),
threadListeners.keySet());
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}

if (listener.deref()) {
threadListeners.remove(threadId);
}
}
Expand Down

0 comments on commit 2669a78

Please sign in to comment.