Skip to content

Commit

Permalink
Fix bug where s3 scan could skip when lastModifiedTimestamps are the same (opensearch-project#4124)
Browse files Browse the repository at this point in the history

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Feb 14, 2024
1 parent 503b774 commit a8024e9
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 3 deletions.
2 changes: 1 addition & 1 deletion data-prepper-plugins/dynamodb-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

This source ingests data to Data Prepper from DynamoDB

See the [`dynamodb` source documentation](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/dynamo-db/)
See the [`dynamodb` source documentation](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/dynamo-db/)
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ private boolean isLastModifiedTimeAfterMostRecentScanForBucket(final String buck

final Instant lastProcessedObjectTimestamp = Instant.parse((String) globalStateMap.get(bucketName));

return s3Object.lastModified().compareTo(lastProcessedObjectTimestamp) > 0;
return s3Object.lastModified().compareTo(lastProcessedObjectTimestamp.minusSeconds(1)) >= 0;
}

private Instant getMostRecentLastModifiedTimestamp(final ListObjectsV2Response listObjectsV2Response,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio

final S3Object invalidForFirstBucketSuffixObject = mock(S3Object.class);
given(invalidForFirstBucketSuffixObject.key()).willReturn("test.invalid");
given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now());
given(invalidForFirstBucketSuffixObject.lastModified()).willReturn(Instant.now().minusSeconds(2));
s3ObjectsList.add(invalidForFirstBucketSuffixObject);
expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + invalidForFirstBucketSuffixObject.key()).build());

Expand All @@ -223,7 +223,9 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio

final List<PartitionIdentifier> expectedPartitionIdentifiersSecondScan = new ArrayList<>();
expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + secondScanObject.key()).build());
expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(firstBucket + "|" + validObject.key()).build());
expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + secondScanObject.key()).build());
expectedPartitionIdentifiersSecondScan.add(PartitionIdentifier.builder().withPartitionKey(secondBucket + "|" + validObject.key()).build());

final List<S3Object> secondScanObjects = new ArrayList<>(s3ObjectsList);
secondScanObjects.add(secondScanObject);
Expand Down

0 comments on commit a8024e9

Please sign in to comment.