diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java
index 686ec4e2ef..7ada401383 100644
--- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java
@@ -7,10 +7,10 @@ import org.opensearch.dataprepper.metrics.PluginMetrics;
 import org.opensearch.dataprepper.model.source.SourceCoordinationStore;
-import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
 import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStatus;
 import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
 import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
+import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
 import org.opensearch.dataprepper.parser.model.SourceCoordinationConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -118,7 +118,7 @@ public Optional acquireAvailablePartition(String partit
         LOG.debug("Try to acquire an available {} partition", partitionType);
         Optional sourceItem = coordinationStore.tryAcquireAvailablePartition(this.sourceIdentifier + "|" + partitionType, hostName, DEFAULT_LEASE_TIMEOUT);
         if (sourceItem.isEmpty()) {
-            LOG.info("Partition owner {} failed to acquire a partition, no available {} partitions now", hostName, partitionType);
+            LOG.debug("Partition owner {} failed to acquire a partition, no available {} partitions now", hostName, partitionType);
             return Optional.empty();
         }
@@ -145,7 +145,7 @@ public void saveProgressStateForPartition(EnhancedSourcePartition partiti
         updateItem.setPartitionProgressState(partition.convertPartitionProgressStatetoString(partition.getProgressState()));
         coordinationStore.tryUpdateSourcePartitionItem(updateItem);
-        LOG.info("Progress for for partition {} (Type {}) was saved", partition.getPartitionKey(), partitionType);
+        LOG.debug("Progress for partition {} (Type {}) was saved", partition.getPartitionKey(), partitionType);
     }
 
     @Override
@@ -167,7 +167,7 @@ public void giveUpPartition(EnhancedSourcePartition partition) {
         // Throws UpdateException if update failed.
         coordinationStore.tryUpdateSourcePartitionItem(updateItem);
-        LOG.info("Partition key {} was given up by owner {}", partition.getPartitionKey(), hostName);
+        LOG.debug("Partition key {} was given up by owner {}", partition.getPartitionKey(), hostName);
     }
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java
index 107032c373..9b5a172ef9 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java
@@ -130,15 +130,15 @@ public void shutdown() {
      * Future optimization can be done to accept configuration changes
      */
     public void init() {
-        LOG.info("Start initialize DynamoDB service");
+        LOG.info("Try to initialize DynamoDB service");
         final Optional initPartition = coordinator.acquireAvailablePartition(InitPartition.PARTITION_TYPE);
         if (initPartition.isEmpty()) {
             // Already initialized. Do nothing.
+            LOG.info("DynamoDB service is already initialized.");
             return;
         }
-
-        LOG.info("Start validating table configurations");
+        LOG.info("Start initialization process");
         List tableInfos;
         try {
             tableInfos = tableConfigs.stream().map(this::getTableInfo).collect(Collectors.toList());
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java
index 58337f871a..6d1d0aedd9 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java
@@ -120,7 +120,7 @@ public DataFileLoader build() {
     @Override
     public void run() {
-        LOG.debug("Read export data from s3://" + bucketName + "/" + key + " with start line " + startLine);
+        LOG.info("Start loading s3://{}/{} with start line {}", bucketName, key, startLine);
         long lastCheckpointTime = System.currentTimeMillis();
         List lines = new ArrayList<>();
@@ -171,12 +171,12 @@ public void run() {
             lines.clear();
             reader.close();
-            LOG.debug("Data Loader completed successfully");
+            LOG.info("Complete loading s3://{}/{}", bucketName, key);
         } catch (Exception e) {
             checkpointer.checkpoint(lineCount);
-            throw new RuntimeException("Data Loader completed with Exception: " + e.getMessage());
+            String errorMessage = String.format("Loading of s3://%s/%s completed with Exception: %s", bucketName, key, e.getMessage());
+            throw new RuntimeException(errorMessage);
         }
-
     }
 
     /**
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java
index be1d1fc175..3e5a74eb5f 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java
@@ -31,8 +31,15 @@ public class DataFileScheduler implements Runnable {
     private final AtomicInteger numOfWorkers = new AtomicInteger(0);
-    private static final int MAX_JOB_COUNT = 2;
-    private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 30_000;
+    /**
+     * Maximum concurrent data loader per node
+     */
+    private static final int MAX_JOB_COUNT = 4;
+
+    /**
+     * Default interval to acquire a lease from coordination store
+     */
+    private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 15_000;
 
     static final String EXPORT_FILE_SUCCESS_COUNT = "exportFileSuccess";
@@ -74,7 +81,7 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) {
     @Override
     public void run() {
-        LOG.debug("Start running Data file Scheduler");
+        LOG.info("Start running Data File Scheduler");
         while (!Thread.interrupted()) {
             if (numOfWorkers.get() < MAX_JOB_COUNT) {
@@ -93,7 +100,7 @@ public void run() {
             }
         }
-        LOG.debug("Data file scheduler is interrupted, Stop all data file loaders...");
+        LOG.warn("Data file scheduler is interrupted, Stop all data file loaders...");
         // Cannot call executor.shutdownNow() here
         // Otherwise the final checkpoint will fail due to SDK interruption.
         executor.shutdown();
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java
index 6c9d9108c2..86fc52572f 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java
@@ -92,7 +92,7 @@ public ExportScheduler(EnhancedSourceCoordinator enhancedSourceCoordinator, Dyna
     @Override
     public void run() {
-        LOG.debug("Start running Export Scheduler");
+        LOG.info("Start running Export Scheduler");
         while (!Thread.interrupted()) {
             // Does not have limit on max leases
             // As most of the time it's just to wait
@@ -121,7 +121,7 @@ public void run() {
             }
         }
-        LOG.debug("Export scheduler interrupted, looks like shutdown has triggered");
+        LOG.warn("Export scheduler interrupted, looks like shutdown has triggered");
         executor.shutdownNow();
     }
@@ -173,7 +173,7 @@ private BiConsumer completeExport(ExportPartition exportParti
     private void createDataFilePartitions(String exportArn, String bucketName, Map dataFileInfo) {
-        LOG.debug("Totally {} data files generated for export {}", dataFileInfo.size(), exportArn);
+        LOG.info("Totally {} data files generated for export {}", dataFileInfo.size(), exportArn);
         AtomicInteger totalRecords = new AtomicInteger();
         AtomicInteger totalFiles = new AtomicInteger();
         dataFileInfo.forEach((key, size) -> {
@@ -225,7 +225,7 @@ private String checkExportStatus(ExportPartition exportPartition) {
         String status = exportTaskManager.checkExportStatus(exportArn);
         if (!"IN_PROGRESS".equals(status)) {
-            LOG.debug("Export {} is completed with final status {}", exportArn, status);
+            LOG.info("Export {} is completed with final status {}", exportArn, status);
             return status;
         }
         LOG.debug("Export {} is still running in progress, sleep and recheck later", exportArn);
@@ -243,19 +243,19 @@ private String getOrCreateExportArn(ExportPartition exportPartition) {
         ExportProgressState state = exportPartition.getProgressState().get();
         // Check the progress state
         if (state.getExportArn() != null) {
-            LOG.debug("Export Job has already submitted for table {} with export time {}", exportPartition.getTableArn(), exportPartition.getExportTime());
+            LOG.info("Export Job has already been submitted for table {} with export time {}", exportPartition.getTableArn(), exportPartition.getExportTime());
             // Export job already submitted
             return state.getExportArn();
         }
-        LOG.debug("Try to submit a new export job for table {} with export time {}", exportPartition.getTableArn(), exportPartition.getExportTime());
+        LOG.info("Try to submit a new export job for table {} with export time {}", exportPartition.getTableArn(), exportPartition.getExportTime());
         // submit a new export request
         String exportArn = exportTaskManager.submitExportJob(exportPartition.getTableArn(), state.getBucket(), state.getPrefix(), exportPartition.getExportTime());
         // Update state with export Arn in the coordination table.
         // So that it won't be submitted again after a restart.
         if (exportArn != null) {
-            LOG.debug("Export arn is " + exportArn);
+            LOG.info("Export arn is " + exportArn);
             state.setExportArn(exportArn);
             enhancedSourceCoordinator.saveProgressStateForPartition(exportPartition);
         }
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ManifestFileReader.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ManifestFileReader.java
index 4e8468114b..00919f1a5d 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ManifestFileReader.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ManifestFileReader.java
@@ -60,7 +60,7 @@ public ExportSummary parseSummaryFile(String bucket, String key) {
     }
 
     public Map parseDataFile(String bucket, String key) {
-        LOG.debug("Try to read the manifest data file");
+        LOG.info("Try to read the manifest data file");
         Map result = new HashMap<>();
         InputStream object = objectReader.readFile(bucket, key);
@@ -72,12 +72,12 @@ public Map parseDataFile(String bucket, String key) {
                 // An example line as below:
                 // {"itemCount":46331,"md5Checksum":"a0k21IY3eelgr2PuWJLjJw==","etag":"51f9f394903c5d682321c6211aae8b6a-1","dataFileS3Key":"test-table-export/AWSDynamoDB/01692350182719-6de2c037/data/fpgzwz7ome3s7a5gqn2mu3ogtq.json.gz"}
                 Map map = MAPPER.readValue(line, MAP_TYPE_REFERENCE);
-                LOG.debug("Get a file {} with item count {}", map.get(DATA_FILE_S3_KEY), map.get(DATA_FILE_ITEM_COUNT_KEY));
+                LOG.info("Export data file: {} with item count {}", map.get(DATA_FILE_S3_KEY), map.get(DATA_FILE_ITEM_COUNT_KEY));
                 result.put(map.get(DATA_FILE_S3_KEY), Integer.valueOf(map.get(DATA_FILE_ITEM_COUNT_KEY)));
             }
         } catch (IOException e) {
-            LOG.error("IO Exception due to " + e.getMessage());
+            LOG.error("IO Exception due to {}", e.getMessage());
         }
         return result;
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
index fd9855547b..ef47eb39ef 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
@@ -39,7 +39,7 @@ public class ShardConsumer implements Runnable {
     /**
      * Idle Time between GetRecords Reads
      */
-    private static final int GET_RECORD_INTERVAL_MILLS = 200;
+    private static final int GET_RECORD_INTERVAL_MILLS = 300;
 
     /**
      * Default interval to check if export is completed.
@@ -131,7 +131,7 @@ public ShardConsumer build() {
     @Override
     public void run() {
-        LOG.debug("Shard Consumer start to run...");
+        LOG.info("Shard Consumer start to run...");
         long lastCheckpointTime = System.currentTimeMillis();
         String sequenceNumber = "";
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java
index c6e1fea6ce..1c35f882ad 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java
@@ -52,7 +52,7 @@ public ShardConsumerFactory(final EnhancedSourceCoordinator enhancedSourceCoordi
     public Runnable createConsumer(StreamPartition streamPartition) {
-        LOG.debug("Try to create a thread for shard " + streamPartition.getShardId());
+        LOG.info("Try to start a Shard Consumer for " + streamPartition.getShardId());
         // Check and get the current state.
         Optional progressState = streamPartition.getProgressState();
@@ -69,7 +69,8 @@ public Runnable createConsumer(StreamPartition streamPartition) {
         String shardIter = shardManager.getShardIterator(streamPartition.getStreamArn(), streamPartition.getShardId(), sequenceNumber);
         if (shardIter == null) {
-            LOG.error("Unable to get a shard iterator, looks like the shard has expired");
+            LOG.info("Unable to get a shard iterator, looks like the shard has expired");
+            LOG.error("Failed to start a Shard Consumer for " + streamPartition.getShardId());
             return null;
         }
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardManager.java
index c4383d9c5a..1e342178de 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardManager.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardManager.java
@@ -16,6 +16,7 @@ import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
 import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -26,32 +27,42 @@ public class ShardManager {
     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
+    /**
+     * Max number of shards to return in the DescribeStream API call, maximum 100.
+     */
+    private static final int MAX_SHARD_COUNT = 100;
+
     private final DynamoDbStreamsClient streamsClient;
 
     public ShardManager(DynamoDbStreamsClient streamsClient) {
         this.streamsClient = streamsClient;
     }
-
     private List listShards(String streamArn) {
+        LOG.info("Start getting all shards from {}", streamArn);
+        long startTime = System.currentTimeMillis();
         // Get all the shard IDs from the stream.
-        List shards;
+        List shards = new ArrayList<>();
         String lastEvaluatedShardId = null;
         do {
             DescribeStreamRequest req = DescribeStreamRequest.builder()
                     .streamArn(streamArn)
+                    .limit(MAX_SHARD_COUNT)
                     .exclusiveStartShardId(lastEvaluatedShardId)
                     .build();
             DescribeStreamResponse describeStreamResult = streamsClient.describeStream(req);
-            shards = describeStreamResult.streamDescription().shards();
+            shards.addAll(describeStreamResult.streamDescription().shards());
             // If LastEvaluatedShardId is set,
             // at least one more page of shard IDs to retrieve
             lastEvaluatedShardId = describeStreamResult.streamDescription().lastEvaluatedShardId();
+
         } while (lastEvaluatedShardId != null);
 
-        LOG.debug("Stream {} has {} shards found", streamArn, shards.size());
+        long endTime = System.currentTimeMillis();
+        LOG.info("Stream {} has {} shards found, took {} milliseconds", streamArn, shards.size(), endTime - startTime);
         return shards;
     }
@@ -151,7 +162,7 @@ public List getRootShardIds(String streamArn) {
                 .filter(shard -> shard.parentShardId() == null || !childIds.contains(shard.parentShardId()))
                 .map(shard -> shard.shardId())
                 .collect(Collectors.toList());
-
+        LOG.info("Found {} root shards for {}", rootIds.size(), streamArn);
         return rootIds;
     }
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
index 0e9f2ef910..170d2c0f49 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
@@ -26,8 +26,20 @@ public class StreamScheduler implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(StreamScheduler.class);
-    private static final int MAX_JOB_COUNT = 50;
-    private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 30_000;
+    /**
+     * Max number of shards each node can handle in parallel
+     */
+    private static final int MAX_JOB_COUNT = 250;
+
+    /**
+     * Default interval to acquire a lease from coordination store
+     */
+    private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 15_000;
+
+    /**
+     * Delay before getting child shards after the parent shard is finished.
+     */
+    private static final int DELAY_TO_GET_CHILD_SHARDS_MILLIS = 1_500;
 
     private final AtomicInteger numOfWorkers = new AtomicInteger(0);
     private final EnhancedSourceCoordinator coordinator;
@@ -59,7 +71,7 @@ private void processStreamPartition(StreamPartition streamPartition) {
     @Override
     public void run() {
-        LOG.debug("Stream Scheduler start to run...");
+        LOG.info("Start running Stream Scheduler");
         while (!Thread.interrupted()) {
             if (numOfWorkers.get() < MAX_JOB_COUNT) {
                 final Optional sourcePartition = coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE);
@@ -77,7 +89,7 @@ public void run() {
             }
         }
         // Should Stop
-        LOG.debug("Stream Scheduler is interrupted, looks like shutdown has triggered");
+        LOG.warn("Stream Scheduler is interrupted, looks like shutdown has triggered");
         // Cannot call executor.shutdownNow() here
         // Otherwise the final checkpoint will fail due to SDK interruption.
@@ -89,14 +101,20 @@ private BiConsumer completeConsumer(StreamPartition streamPartition) {
         return (v, ex) -> {
             numOfWorkers.decrementAndGet();
             if (ex == null) {
-                LOG.debug("Shard consumer is completed");
+                LOG.info("Shard consumer for {} is completed", streamPartition.getShardId());
                 LOG.debug("Start creating new stream partitions for Child Shards");
+                try {
+                    // Add a delay as the Child shards may not be ready yet.
+                    Thread.sleep(DELAY_TO_GET_CHILD_SHARDS_MILLIS);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
                 List childShardIds = shardManager.getChildShardIds(streamPartition.getStreamArn(), streamPartition.getShardId());
-                LOG.debug("Child Ids Retrieved: {}", childShardIds);
+                LOG.info("{} child shards for {} have been found", childShardIds.size(), streamPartition.getShardId());
                 createStreamPartitions(streamPartition.getStreamArn(), childShardIds);
-                LOG.debug("Create child shard completed");
+                LOG.info("Creation of all child shard partitions is completed");
                 // Finally mask the partition as completed.
                 coordinator.completePartition(streamPartition);
diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java
index fc25226f78..535ef994fc 100644
--- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java
+++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java
@@ -130,7 +130,7 @@ void test_run_loadFile_correctly() throws InterruptedException {
         loader.run();
 
         // Run for a while
-        Thread.sleep(500);
+        Thread.sleep(1000);
 
         // Should call s3 getObject
         verify(s3Client).getObject(any(GetObjectRequest.class));
diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java
index 60ce8dd254..875fce4a85 100644
--- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java
+++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java
@@ -94,7 +94,7 @@ public void test_normal_run() throws InterruptedException {
         executor.submit(scheduler);
 
         // Need to run a while
-        Thread.sleep(500);
+        Thread.sleep(2000);
 
         // Should acquire the stream partition
         verify(coordinator).acquireAvailablePartition(StreamPartition.PARTITION_TYPE);
         // Should start a new consumer