From 3322c8a453d4f072963fe4c519e1cca0ca4fa36c Mon Sep 17 00:00:00 2001 From: Aiden Dai <68811299+daixba@users.noreply.github.com> Date: Tue, 14 Nov 2023 23:51:10 +0800 Subject: [PATCH] Fix no acknowledgments for closed shard issue (#3651) Signed-off-by: Aiden Dai --- .../plugins/source/dynamodb/leader/LeaderScheduler.java | 2 +- .../plugins/source/dynamodb/stream/ShardConsumer.java | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderScheduler.java index 2a4bbfd68b..9a9b17ffa6 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderScheduler.java @@ -201,7 +201,7 @@ private void compareAndCreateChildrenPartitions(List so sourcePartitions.forEach(sourcePartition -> { StreamPartition streamPartition = (StreamPartition) sourcePartition; List childShardIds = shardManager.findChildShardIds(streamPartition.getStreamArn(), streamPartition.getShardId()); - if (!childShardIds.isEmpty()) { + if (childShardIds != null && !childShardIds.isEmpty()) { childShardIds.forEach( shardId -> { if (!completedShardIds.contains(shardId)) { diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index 99e2cb3211..87491c3896 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -198,7 +198,13 @@ public ShardConsumer build() { public void run() { LOG.debug("Shard Consumer start to run..."); // Check should skip processing or not. - if (shouldSkip()) return; + if (shouldSkip()) { + if (acknowledgementSet != null) { + checkpointer.updateShardForAcknowledgmentWait(shardAcknowledgmentTimeout); + acknowledgementSet.complete(); + } + return; + } long lastCheckpointTime = System.currentTimeMillis(); String sequenceNumber = "";