diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderScheduler.java index 2a4bbfd68b..9a9b17ffa6 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderScheduler.java @@ -201,7 +201,7 @@ private void compareAndCreateChildrenPartitions(List so sourcePartitions.forEach(sourcePartition -> { StreamPartition streamPartition = (StreamPartition) sourcePartition; List childShardIds = shardManager.findChildShardIds(streamPartition.getStreamArn(), streamPartition.getShardId()); - if (!childShardIds.isEmpty()) { + if (childShardIds != null && !childShardIds.isEmpty()) { childShardIds.forEach( shardId -> { if (!completedShardIds.contains(shardId)) { diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index 99e2cb3211..87491c3896 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -198,7 +198,13 @@ public ShardConsumer build() { public void run() { LOG.debug("Shard Consumer start to run..."); // Check should skip processing or not. - if (shouldSkip()) return; + if (shouldSkip()) { + if (acknowledgementSet != null) { + checkpointer.updateShardForAcknowledgmentWait(shardAcknowledgmentTimeout); + acknowledgementSet.complete(); + } + return; + } long lastCheckpointTime = System.currentTimeMillis(); String sequenceNumber = "";