Skip to content

Commit

Permalink
Fix no acknowledgments for closed shard issue (opensearch-project#3651)
Browse files Browse the repository at this point in the history
Signed-off-by: Aiden Dai <[email protected]>
  • Loading branch information
daixba authored Nov 14, 2023
1 parent 8922a0f commit 3322c8a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ private void compareAndCreateChildrenPartitions(List<EnhancedSourcePartition> so
sourcePartitions.forEach(sourcePartition -> {
StreamPartition streamPartition = (StreamPartition) sourcePartition;
List<String> childShardIds = shardManager.findChildShardIds(streamPartition.getStreamArn(), streamPartition.getShardId());
if (!childShardIds.isEmpty()) {
if (childShardIds != null && !childShardIds.isEmpty()) {
childShardIds.forEach(
shardId -> {
if (!completedShardIds.contains(shardId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,13 @@ public ShardConsumer build() {
public void run() {
LOG.debug("Shard Consumer start to run...");
// Check should skip processing or not.
if (shouldSkip()) return;
if (shouldSkip()) {
if (acknowledgementSet != null) {
checkpointer.updateShardForAcknowledgmentWait(shardAcknowledgmentTimeout);
acknowledgementSet.complete();
}
return;
}

long lastCheckpointTime = System.currentTimeMillis();
String sequenceNumber = "";
Expand Down

0 comments on commit 3322c8a

Please sign in to comment.