Skip to content

Commit

Permalink
dont bubble up offset polling errors (#206)
Browse files Browse the repository at this point in the history
dont bubble up offset polling errors. If we do, then the poller makes no more
attempts to poll. Instead, just log the error and the poller will retry on the
next iteration. If its still failing after 3 iterations, it reinitializes the
admin client.

Co-authored-by: Rohan Desai <[email protected]>
  • Loading branch information
rodesai and rodesai authored Dec 19, 2023
1 parent 5acacc9 commit 3a07f18
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
*/
public class EndOffsetsPoller {
private static final Logger LOG = LoggerFactory.getLogger(EndOffsetsPoller.class);
private static final int ADMIN_RECREATE_THRESHOLD = 3;

private final Map<String, Listener> threadIdToMetrics = new HashMap<>();
private final ResponsiveMetrics metrics;
Expand Down Expand Up @@ -148,7 +149,9 @@ private void stopPoller() {
}

private static class Poller {
private final Admin adminClient;
private Admin adminClient;
private int failuresWithoutReinit = 0;
private final Supplier<Admin> adminSupplier;
private final ScheduledFuture<?> future;
private final ScheduledExecutorService executor;
private final Supplier<Collection<Listener>> threadMetricsSupplier;
Expand All @@ -158,20 +161,40 @@ private Poller(
final ScheduledExecutorService executor,
final Supplier<Collection<Listener>> threadMetricsSupplier
) {
this.adminClient = adminClientSupplier.get();
this.adminSupplier = adminClientSupplier;
this.executor = executor;
this.threadMetricsSupplier = threadMetricsSupplier;
init();
this.future = executor.scheduleAtFixedRate(this::pollEndOffsets, 0, 30, TimeUnit.SECONDS);
}

private void stop() {
future.cancel(true);
executor.schedule(() -> adminClient.close(Duration.ofNanos(0)), 0, TimeUnit.SECONDS);
executor.schedule(this::close, 0, TimeUnit.SECONDS);
}

private void pollEndOffsets() {
private void init() {
adminClient = adminSupplier.get();
failuresWithoutReinit = 0;
}

private void close() {
adminClient.close(Duration.ofNanos(0));
}

private void maybeReinit() {
if (failuresWithoutReinit >= ADMIN_RECREATE_THRESHOLD) {
LOG.info("reinitializing admin client");
close();
init();
}
}

private void doPollEndOffsets() {
LOG.info("Polling end offsets");

maybeReinit();

final var partitions = new HashMap<TopicPartition, OffsetSpec>();
final Collection<Listener> threadMetrics = threadMetricsSupplier.get();
for (final var tm : threadMetrics) {
Expand All @@ -188,8 +211,19 @@ private void pollEndOffsets() {
}
threadMetrics.forEach(tm -> tm.update(endOffsets));

failuresWithoutReinit = 0;

LOG.info("Finished updating end offsets");
}

private void pollEndOffsets() {
try {
doPollEndOffsets();
} catch (final RuntimeException e) {
LOG.error("error polling end offsets. will retry at next poll interval", e);
failuresWithoutReinit += 1;
}
}
}

public static class Listener implements ResponsiveConsumer.Listener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,40 @@ public void shouldRemoveEndOffsetMetricForThread() {
verify(metrics).removeMetric(metricName(PARTITION1));
}

@Test
public void shouldNotBubbleErrorsUpToExecutor() {
// given:
when(adminClient.listOffsets(anyMap())).thenThrow(new RuntimeException("oops"));
final var callback = endOffsetsPoller.addForThread(THREAD_ID);
callback.onPartitionsAssigned(List.of(PARTITION1, PARTITION2));
verify(executor).scheduleAtFixedRate(taskCaptor.capture(), eq(0L), anyLong(), any());
final var task = taskCaptor.getValue();

// when/then:
task.run();
task.run();
}

@Test
public void shouldRecreateAdminClientAfterRepeatedFailures() {
// given:
when(adminClient.listOffsets(anyMap())).thenThrow(new RuntimeException("oops"));
final var callback = endOffsetsPoller.addForThread(THREAD_ID);
callback.onPartitionsAssigned(List.of(PARTITION1, PARTITION2));
verify(executor).scheduleAtFixedRate(taskCaptor.capture(), eq(0L), anyLong(), any());
final var task = taskCaptor.getValue();

// when:
task.run();
task.run();
task.run();
task.run();

// then:
verify(factories, times(2)).createAdminClient(anyMap());
}


@Test
public void shouldPollAllEndOffsetsForThread() {
// given:
Expand Down

0 comments on commit 3a07f18

Please sign in to comment.