Skip to content

Commit

Permalink
Cluster freshness should fail if no single partition succeeds
Browse files Browse the repository at this point in the history
Across all the partitions for all topics for all consumers for a given
cluster, if we succeed at reading the freshness for at least one of
the consumers then the cluster freshness succeeds. Without this, if
none the consumers could be evaluated successfully then the cluster
would still be marked successful, but that often indicated an
incorrectly configured cluster. However, if we are able to read even
a single partition, then we can reach the cluster. Maybe its a transient
so we should be allowed a next round to try to get more successes.

Addresses #51
  • Loading branch information
jyates committed Jun 29, 2022
1 parent c88ad31 commit 70afff1
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -253,7 +254,7 @@ void run() {
* @throws RuntimeException if there is a systemic problem that should shutdown the application.
*/
private ListenableFuture<String> measureCluster(Burrow.ClusterClient client) {
List<ListenableFuture<List<Object>>> completedConsumers = new ArrayList<>();
List<ListenableFuture<List<PartitionResult>>> completedConsumers = new ArrayList<>();
List<String> consumerGroups;
try {
consumerGroups = client.consumerGroups();
Expand All @@ -265,8 +266,9 @@ private ListenableFuture<String> measureCluster(Burrow.ClusterClient client) {
return Futures.immediateFailedFuture(e);
}

String cluster = client.getCluster();
try {
ArrayBlockingQueue<KafkaConsumer> workers = this.availableWorkers.get(client.getCluster());
ArrayBlockingQueue<KafkaConsumer> workers = this.availableWorkers.get(cluster);
for (String consumerGroup : consumerGroups) {
completedConsumers.add(measureConsumer(client, workers, consumerGroup));
}
Expand All @@ -278,10 +280,40 @@ private ListenableFuture<String> measureCluster(Burrow.ClusterClient client) {
throw e;
}

// if all the consumer measurements succeed, then we return the cluster name
// if at least one consumer measurement succeeds, then we return the cluster name
// otherwise, Future.get will throw an exception representing the failure to measure a consumer (and thus the
// failure to successfully monitor the cluster).
return Futures.whenAllSucceed(completedConsumers).call(client::getCluster, this.executor);
return Futures.whenAllSucceed(completedConsumers).call(() -> {
List<PartitionResult> allPartitions = completedConsumers.stream()
.flatMap(f -> {
// recall, these have all completed successfully by this point, unless it's something catastrophic, so
// this is safe to just re-throw if we do find an exception
try {
return f.get().stream();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());

long successes = allPartitions.stream()
.filter(result -> result.success)
.count();

// if any single partition for any single consumer succeeded, then we count the cluster as having been successful
if (successes != allPartitions.size()) {
LOG.info("Freshness for {} / {} partitions across all consumers succeeded for cluster {}",
successes, allPartitions.size(), cluster);
}

if (successes > 0) {
LOG.info("Got freshness for at least one partition for one consumer partition for {} marking the cluster " +
"successful", cluster);
return cluster;
}

throw new RuntimeException("No single partition for any topic for any consumer for cluster {}" + cluster +
" returned successfully - is the cluster configured correctly?");
}, this.executor);
}

/**
Expand All @@ -299,7 +331,7 @@ private ListenableFuture<String> measureCluster(Burrow.ClusterClient client) {
* failed to be measured the entire future (i.e. calls to {@link Future#get()}) will be considered a failure.
* @throws InterruptedException if the application is interrupted while waiting for an available worker
*/
private ListenableFuture<List<Object>> measureConsumer(Burrow.ClusterClient burrow,
private ListenableFuture<List<PartitionResult>> measureConsumer(Burrow.ClusterClient burrow,
ArrayBlockingQueue<KafkaConsumer> workers,
String consumerGroup) throws InterruptedException {
Map<String, Object> status;
Expand All @@ -316,7 +348,7 @@ private ListenableFuture<List<Object>> measureConsumer(Burrow.ClusterClient burr

boolean anyEndOffsetFound = false;
List<Map<String, Object>> partitions = (List<Map<String, Object>>) status.get("partitions");
List<ListenableFuture<?>> partitionFreshnessComputation = new ArrayList<>(partitions.size());
List<ListenableFuture<PartitionResult>> partitionFreshnessComputation = new ArrayList<>(partitions.size());
for (Map<String, Object> state : partitions) {
String topic = (String) state.get("topic");
int partition = (int) state.get("partition");
Expand All @@ -336,7 +368,22 @@ private ListenableFuture<List<Object>> measureConsumer(Burrow.ClusterClient burr

// wait for a consumer to become available
KafkaConsumer consumer = workers.take();
ListenableFuture<?> result = this.executor.submit(new FreshnessTracker(consumerState, consumer, metrics));
ListenableFuture<PartitionResult> result = this.executor.submit(new Callable<PartitionResult>() {
FreshnessTracker tracker = new FreshnessTracker(consumerState, consumer, metrics);

@Override
public PartitionResult call() {
try {
tracker.run();
return new PartitionResult(consumerState);
} catch (Exception e) {
// intentionally at debug - there are many reasons for failures and often many partitions will fail for
// one reason or another, which can clog the logs.
LOG.debug("Failed to evaluate freshness for {}", consumerState, e);
return new PartitionResult(consumerState, e);
}
}
});
// Hand back the consumer to the available workers when the task is complete
Futures.addCallback(result, new FutureCallback<Object>() {
@Override
Expand Down Expand Up @@ -367,16 +414,33 @@ public void onFailure(Throwable throwable) {
.map(partition -> {
try {
return partition.get();
} catch (Exception e) {
// skip it!
return null;
} catch (Exception e){
// only can happen if we are interrupted, or something catastrophic, which both fit our criteria for
// failing the consumer
throw new RuntimeException(e);
}
})
.filter(Objects::isNull)
.collect(Collectors.toList());
}, this.executor);
}

class PartitionResult {
FreshnessTracker.ConsumerOffset consumerOffset;
boolean success;
Optional<Throwable> errorCause;


public PartitionResult(FreshnessTracker.ConsumerOffset consumerOffset) {
this(consumerOffset, null);
}

public PartitionResult(FreshnessTracker.ConsumerOffset consumerOffset, Throwable errorCause) {
this.consumerOffset = consumerOffset;
this.errorCause = Optional.ofNullable(errorCause);
this.success = !this.errorCause.isPresent();
}
}

private void stop() {
if (this.executor != null) {
this.executor.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;
Expand Down Expand Up @@ -105,9 +106,9 @@ public void testLargeCurrentLag() throws Exception {
freshness.run();

FreshnessMetrics metrics = freshness.getMetricsForTesting();

assertSuccessfulClusterMeasurement(freshness, "cluster1");
Gauge.Child measurement = metrics.freshness.labels("cluster1", "group1", "topic1", "1");
assertTrue("Should have at least the specified lag for the group "+lagMs+", but found"+measurement.get(),
assertTrue("Should have at least the specified lag for the group " + lagMs + ", but found" + measurement.get(),
measurement.get() >= lagMs);
});
}
Expand Down Expand Up @@ -146,12 +147,17 @@ public void testFailClusterWhenInterruptedWaitingForAConsumer() throws Exception
public void testFailConsumerButNotClusterIfComputationFails() throws Exception {
Burrow burrow = mock(Burrow.class);
Burrow.ClusterClient client = mockClusterState("cluster1", "group1",
partitionState("topic1", 1, 10, 10L));
partitionState("topic1", 0, 10, 10L),
partitionState("topic1", 1, 10, 10L)
);
when(burrow.getClusters()).thenReturn(newArrayList(client));

withExecutor(executor -> {
KafkaConsumer consumer = mock(KafkaConsumer.class);
when(consumer.poll(Mockito.any(Duration.class))).thenThrow(new RuntimeException("injected"));
// first consumer lookup fails, the second one success
when(consumer.poll(Mockito.any(Duration.class)))
.thenThrow(new RuntimeException("injected"))
.thenReturn(records("topic", 1, 10, 10L));

ConsumerFreshness freshness = new ConsumerFreshness();
freshness.setupForTesting(burrow, workers("cluster1", consumer), executor);
Expand Down Expand Up @@ -219,7 +225,7 @@ public void testBurrowFailingToReadConsumerGroupStatusMarksGroupError() throws E
freshness.run();
assertEquals(1.0, freshness.getMetricsForTesting().error.labels("cluster", "group").get(), 0.0);
// failing all the groups status lookup should not fail the cluster. Feels weird, but it's the current behavior
assertSuccessfulClusterMeasurement(freshness, "cluster");
assertNoSuccessfulClusterMeasurement(freshness, "cluster");
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -240,8 +246,8 @@ public void testBurrowMissingConsumerGroupPartitionsMarksErrorForGroup() throws
when(client.getConsumerGroupStatus("group")).thenReturn(new HashMap<>());
freshness.run();
assertEquals(1.0, freshness.getMetricsForTesting().error.labels("cluster", "group").get(), 0.0);
// the cluster is overall successful, even though the group fails
assertSuccessfulClusterMeasurement(freshness, "cluster");
// no consumer group was successful, cluster is not successful
assertNoSuccessfulClusterMeasurement(freshness, "cluster");
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -267,8 +273,8 @@ public void testBurrowMissingConsumerGroupPartitionEndOffsetMarksMissing() throw
)));
freshness.run();
assertEquals(1.0, freshness.getMetricsForTesting().missing.get(), 0.0);
// the cluster is overall successful, even though the group fails
assertSuccessfulClusterMeasurement(freshness, "cluster");
// no consumer group was successful, cluster is not successful
assertNoSuccessfulClusterMeasurement(freshness, "cluster");
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -420,7 +426,7 @@ public void testFailToSubmitTaskExitsTracker() throws Exception {
Burrow.ClusterClient client = mockClusterState("cluster", "group", partitionState("t", 1, 1, 0));
when(burrow.getClusters()).thenReturn(newArrayList(client));
Exception cause = new RejectedExecutionException("injected");
when(executor.submit(any(FreshnessTracker.class))).thenThrow(cause);
when(executor.submit(any(Callable.class))).thenThrow(cause);

thrown.expect(RuntimeException.class);
thrown.expectCause(org.hamcrest.CoreMatchers.equalTo(cause));
Expand Down

0 comments on commit 70afff1

Please sign in to comment.