Commit 95dd099 (opensearch-project#3629)

Add metric for shards actively being processed, lower ownership timeout from 10 minutes to 5 minutes for ddb source

Signed-off-by: Taylor Gray <[email protected]>
graytaylor0 authored Nov 10, 2023
1 parent 9f0c92a commit 95dd099
Showing 5 changed files with 29 additions and 5 deletions.
DataFileCheckpointer.java
@@ -21,6 +21,8 @@
public class DataFileCheckpointer {
private static final Logger LOG = LoggerFactory.getLogger(DataFileCheckpointer.class);

static final Duration CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE = Duration.ofMinutes(5);


private final EnhancedSourceCoordinator enhancedSourceCoordinator;

@@ -48,7 +50,7 @@ private void setProgressState(int lineNumber) {
public void checkpoint(int lineNumber) {
LOG.debug("Checkpoint data file " + dataFilePartition.getKey() + " with line number " + lineNumber);
setProgressState(lineNumber);
enhancedSourceCoordinator.saveProgressStateForPartition(dataFilePartition, null);
enhancedSourceCoordinator.saveProgressStateForPartition(dataFilePartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
}

/**
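Both checkpointers now pass the explicit 5-minute CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE instead of null when saving progress, which per the commit message lowers the effective ownership timeout from 10 minutes to 5 minutes. Below is a minimal, hypothetical sketch of the lease semantics this implies; LeaseCoordinatorSketch and its methods are illustrative stand-ins, not the actual EnhancedSourceCoordinator API.

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: a toy coordinator that renews a partition's ownership
// lease each time progress is saved. Assumes (not confirmed by this diff)
// that the Duration argument sets how long ownership is retained after a
// checkpoint, and that null falls back to a larger default.
public class LeaseCoordinatorSketch {

    private final Map<String, Instant> ownershipExpiry = new ConcurrentHashMap<>();

    public void saveProgressStateForPartition(final String partitionKey, final Duration ownershipTimeout) {
        // Per the commit message, the previous default timeout was 10 minutes.
        final Duration effectiveTimeout = ownershipTimeout != null ? ownershipTimeout : Duration.ofMinutes(10);
        ownershipExpiry.put(partitionKey, Instant.now().plus(effectiveTimeout));
    }

    public boolean isOwnershipExpired(final String partitionKey) {
        final Instant expiry = ownershipExpiry.get(partitionKey);
        return expiry == null || Instant.now().isAfter(expiry);
    }

    public static void main(String[] args) {
        final LeaseCoordinatorSketch coordinator = new LeaseCoordinatorSketch();
        // Mirrors the checkpoint() change: an explicit 5-minute extension instead of null.
        coordinator.saveProgressStateForPartition("data-file-partition-1", Duration.ofMinutes(5));
        System.out.println("Ownership expired? " + coordinator.isOwnershipExpired("data-file-partition-1"));
    }
}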
StreamCheckpointer.java
@@ -22,6 +22,8 @@
public class StreamCheckpointer {
private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointer.class);

static final Duration CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE = Duration.ofMinutes(5);

private final EnhancedSourceCoordinator coordinator;

private final StreamPartition streamPartition;
@@ -52,7 +54,7 @@ private void setSequenceNumber(String sequenceNumber) {
public void checkpoint(String sequenceNumber) {
LOG.debug("Checkpoint shard " + streamPartition.getShardId() + " with sequenceNumber " + sequenceNumber);
setSequenceNumber(sequenceNumber);
coordinator.saveProgressStateForPartition(streamPartition, null);
coordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
}

/**
StreamScheduler.java
@@ -47,6 +47,7 @@ public class StreamScheduler implements Runnable {
private static final int DELAY_TO_GET_CHILD_SHARDS_MILLIS = 1_500;

static final String ACTIVE_CHANGE_EVENT_CONSUMERS = "activeChangeEventConsumers";
static final String SHARDS_IN_PROCESSING = "activeShardsInProcessing";

private final AtomicInteger numOfWorkers = new AtomicInteger(0);
private final EnhancedSourceCoordinator coordinator;
@@ -55,6 +56,7 @@ public class StreamScheduler implements Runnable {
private final ShardManager shardManager;
private final PluginMetrics pluginMetrics;
private final AtomicLong activeChangeEventConsumers;
private final AtomicLong shardsInProcessing;
private final AcknowledgementSetManager acknowledgementSetManager;
private final DynamoDBSourceConfig dynamoDBSourceConfig;

@@ -74,6 +76,7 @@ public StreamScheduler(final EnhancedSourceCoordinator coordinator,

executor = Executors.newFixedThreadPool(MAX_JOB_COUNT);
activeChangeEventConsumers = pluginMetrics.gauge(ACTIVE_CHANGE_EVENT_CONSUMERS, new AtomicLong());
shardsInProcessing = pluginMetrics.gauge(SHARDS_IN_PROCESSING, new AtomicLong());
}

private void processStreamPartition(StreamPartition streamPartition) {
@@ -98,11 +101,15 @@ private void processStreamPartition(StreamPartition streamPartition) {
CompletableFuture runConsumer = CompletableFuture.runAsync(shardConsumer, executor);

if (acknowledgmentsEnabled) {
runConsumer.whenComplete((v, ex) -> numOfWorkers.decrementAndGet());
runConsumer.whenComplete((v, ex) -> {
numOfWorkers.decrementAndGet();
shardsInProcessing.decrementAndGet();
});
} else {
runConsumer.whenComplete(completeConsumer(streamPartition));
}
numOfWorkers.incrementAndGet();
shardsInProcessing.incrementAndGet();
} else {
// If failed to create a new consumer.
coordinator.completePartition(streamPartition);
@@ -153,6 +160,7 @@ private BiConsumer completeConsumer(StreamPartition streamPartition) {
return (v, ex) -> {
if (!dynamoDBSourceConfig.isAcknowledgmentsEnabled()) {
numOfWorkers.decrementAndGet();
shardsInProcessing.decrementAndGet();
}
if (ex == null) {
LOG.info("Shard consumer for {} is completed", streamPartition.getShardId());
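The scheduler change brackets each shard consumer's lifetime with the new activeShardsInProcessing gauge: it is incremented when the consumer is submitted and decremented in the whenComplete callback, whether or not acknowledgments are enabled. Below is a self-contained sketch of that pattern using only JDK types; InFlightGaugeSketch and submitShard are illustrative names, and a plain AtomicLong stands in for the PluginMetrics gauge.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch: count in-flight tasks with a gauge-backed AtomicLong,
// incrementing on submit and decrementing when the async work completes
// (normally or exceptionally), mirroring the whenComplete callback above.
public class InFlightGaugeSketch {

    private final AtomicLong shardsInProcessing = new AtomicLong();
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    public CompletableFuture<Void> submitShard(final Runnable shardConsumer) {
        shardsInProcessing.incrementAndGet();
        return CompletableFuture.runAsync(shardConsumer, executor)
                .whenComplete((v, ex) -> shardsInProcessing.decrementAndGet());
    }

    public static void main(String[] args) throws InterruptedException {
        final InFlightGaugeSketch sketch = new InFlightGaugeSketch();
        sketch.submitShard(() -> { /* consume one shard's records */ });
        System.out.println("Shards in processing: " + sketch.shardsInProcessing.get());
        sketch.executor.shutdown();
        sketch.executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}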
ShardConsumerTest.java
@@ -54,6 +54,7 @@
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.BUFFER_TIMEOUT;
import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.DEFAULT_BUFFER_BATCH_SIZE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamCheckpointer.CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE;

@ExtendWith(MockitoExtension.class)
class ShardConsumerTest {
@@ -169,7 +170,7 @@ void test_run_shardConsumer_correctly() throws Exception {
verify(bufferAccumulator, times(total)).add(any(org.opensearch.dataprepper.model.record.Record.class));
verify(bufferAccumulator).flush();
// Should complete the consumer as reach to end of shard
verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(null));
verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE));
}

@Test
@@ -203,7 +204,7 @@ void test_run_shardConsumer_with_acknowledgments_correctly() throws Exception {
verify(bufferAccumulator).flush();

// Should complete the consumer as reach to end of shard
verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(null));
verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE));

verify(acknowledgementSet).complete();
}
StreamSchedulerTest.java
@@ -44,6 +44,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler.ACTIVE_CHANGE_EVENT_CONSUMERS;
import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler.SHARDS_IN_PROCESSING;

@ExtendWith(MockitoExtension.class)
class StreamSchedulerTest {
@@ -79,6 +80,9 @@ class StreamSchedulerTest {
@Mock
private AtomicLong activeShardConsumers;

@Mock
private AtomicLong activeShardsInProcessing;


private final String tableName = UUID.randomUUID().toString();
private final String tableArn = "arn:aws:dynamodb:us-west-2:123456789012:table/" + tableName;
@@ -108,6 +112,7 @@ void setup() {
lenient().when(shardManager.getChildShardIds(anyString(), anyString())).thenReturn(List.of(shardId));

when(pluginMetrics.gauge(eq(ACTIVE_CHANGE_EVENT_CONSUMERS), any(AtomicLong.class))).thenReturn(activeShardConsumers);
when(pluginMetrics.gauge(eq(SHARDS_IN_PROCESSING), any(AtomicLong.class))).thenReturn(activeShardsInProcessing);

}

@@ -135,6 +140,9 @@ public void test_normal_run() throws InterruptedException {
// Should mask the stream partition as completed.
verify(coordinator).completePartition(any(StreamPartition.class));

verify(activeShardsInProcessing).incrementAndGet();
verify(activeShardsInProcessing).decrementAndGet();

executorService.shutdownNow();
}

@@ -174,6 +182,9 @@ public void test_normal_run_with_acknowledgments() throws InterruptedException {
// Should mask the stream partition as completed.
verify(coordinator).completePartition(any(StreamPartition.class));

verify(activeShardsInProcessing).incrementAndGet();
verify(activeShardsInProcessing).decrementAndGet();

executorService.shutdownNow();
}

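The test updates stub the new gauge with a mock AtomicLong returned from pluginMetrics.gauge(...) and verify one incrementAndGet()/decrementAndGet() pair per processed shard. A self-contained sketch of that Mockito pattern follows; MetricsRegistry and Worker are hypothetical stand-ins for PluginMetrics and the scheduler, and verifying AtomicLong's final methods assumes Mockito's inline mock maker (the default in Mockito 5).

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.concurrent.atomic.AtomicLong;
import org.junit.jupiter.api.Test;

// Illustrative test sketch: stub a metrics factory so the code under test
// receives a mock gauge, then verify the increment/decrement calls.
// MetricsRegistry and Worker below are hypothetical stand-ins.
class InFlightGaugeTestSketch {

    interface MetricsRegistry {
        AtomicLong gauge(String name, AtomicLong backing);
    }

    static class Worker {
        private final AtomicLong inFlight;

        Worker(final MetricsRegistry metrics) {
            this.inFlight = metrics.gauge("activeShardsInProcessing", new AtomicLong());
        }

        void process(final Runnable task) {
            inFlight.incrementAndGet();
            try {
                task.run();
            } finally {
                inFlight.decrementAndGet();
            }
        }
    }

    @Test
    void gauge_is_incremented_and_decremented_per_task() {
        final MetricsRegistry metrics = mock(MetricsRegistry.class);
        // Mocking AtomicLong requires the inline mock maker because its methods are final.
        final AtomicLong gauge = mock(AtomicLong.class);
        when(metrics.gauge(anyString(), any(AtomicLong.class))).thenReturn(gauge);

        new Worker(metrics).process(() -> { });

        verify(gauge).incrementAndGet();
        verify(gauge).decrementAndGet();
    }
}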
