Skip to content

Commit

Permalink
done
Browse files Browse the repository at this point in the history
  • Loading branch information
ableegoldman committed Nov 22, 2024
1 parent 9e29126 commit 5a9e2dc
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
import static dev.responsive.kafka.api.config.ResponsiveConfig.CASSANDRA_PORT_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG;

import com.antithesis.sdk.Assert;
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.schema.CreateKeyspace;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -129,7 +132,31 @@ public static void retryFor(final Runnable task, Duration timeout) {
*/
public static void maybeCreateKeyspace(final Map<String, Object> properties) {
if (properties.get(STORAGE_BACKEND_TYPE_CONFIG).equals(StorageBackend.CASSANDRA.name())) {
E2ETestUtils.retryFor(() -> doMaybeCreateKeyspace(properties), Duration.ofMinutes(5));
try {
E2ETestUtils.retryFor(() -> doMaybeCreateKeyspace(properties), Duration.ofMinutes(5));
} catch (final Exception e) {
final String errorMessage = "Failed to create Scylla keyspace within the timeout";
LOG.error(errorMessage, e);

final var errorDetails = buildAssertionContext(errorMessage);
errorDetails.put("exceptionType", e.getClass().getName());
if (e instanceof AllNodesFailedException) {
final Map<Node, List<Throwable>> allErrors = ((AllNodesFailedException) e).getAllErrors();
int node_i = 1;
for (final var node : allErrors.entrySet()) {
final String nodeId = "node-" + node_i;
errorDetails.put(nodeId + "_endPoint", node.getKey().getEndPoint().asMetricPrefix());

final var nodeErrors = node.getValue();
LOG.error("All errors for node at {}: {}", nodeId, nodeErrors);
int error_i = 1;
for (final var error : nodeErrors) {
errorDetails.put(nodeId + "_err-" + error_i, error.getMessage());
}
}
Assert.unreachable(errorMessage, errorDetails);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ public void init(final FixedKeyProcessorContext<Long, OutputRecord> context) {
public void process(final FixedKeyRecord<Long, InputRecord> record) {
final var random = Math.abs(randomGenerator.nextLong() % 10000);
if (random < exceptionThreshold) {
LOG.info("Injecting test exception");
throw new InjectedE2ETestException();
}
final ValueAndTimestamp<OutputRecord> old = store.get(record.key());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,18 @@ private synchronized ListenersForThread getAndMaybeInitListenersForThread(
}

private synchronized void derefListenersForThread(final String threadId) {
if (threadListeners.get(threadId).deref()) {
final var listener = threadListeners.get(threadId);
if (listener == null) {
final String errorMsg = String.format("Could not find thread listener for thread id %s. "
+ "Current thread name: %s. Other registered "
+ "thread listener ids: %s",
threadId, Thread.currentThread().getName(),
threadListeners.keySet());
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}

if (listener.deref()) {
threadListeners.remove(threadId);
}
}
Expand Down

0 comments on commit 5a9e2dc

Please sign in to comment.