
Commit

Add more verbose logging for the DynamoDb source (opensearch-project#3500)

Add more verbose logging to the DynamoDB source

Signed-off-by: Aiden Dai <[email protected]>
daixba authored Oct 16, 2023
1 parent ccbe50c commit 7605763
Showing 12 changed files with 80 additions and 43 deletions.
@@ -7,10 +7,10 @@

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.source.SourceCoordinationStore;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStatus;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.parser.model.SourceCoordinationConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,7 +118,7 @@ public Optional<EnhancedSourcePartition> acquireAvailablePartition(String partit
LOG.debug("Try to acquire an available {} partition", partitionType);
Optional<SourcePartitionStoreItem> sourceItem = coordinationStore.tryAcquireAvailablePartition(this.sourceIdentifier + "|" + partitionType, hostName, DEFAULT_LEASE_TIMEOUT);
if (sourceItem.isEmpty()) {
LOG.info("Partition owner {} failed to acquire a partition, no available {} partitions now", hostName, partitionType);
LOG.debug("Partition owner {} failed to acquire a partition, no available {} partitions now", hostName, partitionType);
return Optional.empty();
}

@@ -145,7 +145,7 @@ public <T> void saveProgressStateForPartition(EnhancedSourcePartition<T> partiti
updateItem.setPartitionProgressState(partition.convertPartitionProgressStatetoString(partition.getProgressState()));

coordinationStore.tryUpdateSourcePartitionItem(updateItem);
LOG.info("Progress for for partition {} (Type {}) was saved", partition.getPartitionKey(), partitionType);
LOG.debug("Progress for for partition {} (Type {}) was saved", partition.getPartitionKey(), partitionType);
}

@Override
@@ -167,7 +167,7 @@ public <T> void giveUpPartition(EnhancedSourcePartition<T> partition) {

// Throws UpdateException if update failed.
coordinationStore.tryUpdateSourcePartitionItem(updateItem);
LOG.info("Partition key {} was given up by owner {}", partition.getPartitionKey(), hostName);
LOG.debug("Partition key {} was given up by owner {}", partition.getPartitionKey(), hostName);

}
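For context, the hunks in this coordinator class follow one pattern: routine per-operation messages (a failed acquire, a progress save, a give-up) move from info down to debug so that steady-state churn stays out of the default log output. A minimal sketch of the SLF4J idiom in use, with a hypothetical example class and partition key (not part of the commit):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CoordinatorLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLoggingSketch.class);

    void saveProgress(String partitionKey, String partitionType) {
        // {} placeholders defer message formatting until the level is enabled,
        // so frequent debug-level calls are cheap when debug logging is off.
        LOG.debug("Progress for partition {} (Type {}) was saved", partitionKey, partitionType);
    }
}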

@@ -130,15 +130,15 @@ public void shutdown() {
* Future optimization can be done to accept configuration changes
*/
public void init() {
LOG.info("Start initialize DynamoDB service");
LOG.info("Try to initialize DynamoDB service");

final Optional<EnhancedSourcePartition> initPartition = coordinator.acquireAvailablePartition(InitPartition.PARTITION_TYPE);
if (initPartition.isEmpty()) {
// Already initialized. Do nothing.
LOG.info("DynamoDB service is already initialized.");
return;
}

LOG.info("Start validating table configurations");
LOG.info("Start initialization process");
List<TableInfo> tableInfos;
try {
tableInfos = tableConfigs.stream().map(this::getTableInfo).collect(Collectors.toList());
@@ -120,7 +120,7 @@ public DataFileLoader build() {

@Override
public void run() {
LOG.debug("Read export data from s3://" + bucketName + "/" + key + " with start line " + startLine);
LOG.info("Start loading s3://{}/{} with start line {}", bucketName, key, startLine);
long lastCheckpointTime = System.currentTimeMillis();
List<String> lines = new ArrayList<>();

@@ -171,12 +171,12 @@ public void run() {

lines.clear();
reader.close();
LOG.debug("Data Loader completed successfully");
LOG.info("Complete loading s3://{}/{}", bucketName, key);
} catch (Exception e) {
checkpointer.checkpoint(lineCount);
throw new RuntimeException("Data Loader completed with Exception: " + e.getMessage());
String errorMessage = String.format("Loading of s3://%s/%s completed with Exception: %s", bucketName, key, e.getMessage());
throw new RuntimeException(errorMessage);
}

}
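For reference, the two placeholder styles in this hunk differ: the SLF4J logger substitutes {} markers itself, while String.format expects java.util.Formatter specifiers such as %s (a {} inside a format string would be emitted literally). A minimal illustration with hypothetical values, assuming an SLF4J LOG field like the one in this class:

String bucketName = "example-bucket";
String key = "export/data/file1.json.gz";

// SLF4J performs the {} substitution:
LOG.info("Start loading s3://{}/{}", bucketName, key);

// String.format needs %s specifiers:
String errorMessage = String.format("Loading of s3://%s/%s failed", bucketName, key);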

/**
@@ -31,8 +31,15 @@ public class DataFileScheduler implements Runnable {

private final AtomicInteger numOfWorkers = new AtomicInteger(0);

private static final int MAX_JOB_COUNT = 2;
private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 30_000;
/**
* Maximum concurrent data loader per node
*/
private static final int MAX_JOB_COUNT = 4;

/**
* Default interval to acquire a lease from coordination store
*/
private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 15_000;

static final String EXPORT_FILE_SUCCESS_COUNT = "exportFileSuccess";

@@ -74,7 +81,7 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) {

@Override
public void run() {
LOG.debug("Start running Data file Scheduler");
LOG.info("Start running Data File Scheduler");

while (!Thread.interrupted()) {
if (numOfWorkers.get() < MAX_JOB_COUNT) {
@@ -93,7 +100,7 @@ public void run() {
}

}
LOG.debug("Data file scheduler is interrupted, Stop all data file loaders...");
LOG.warn("Data file scheduler is interrupted, Stop all data file loaders...");
// Cannot call executor.shutdownNow() here
// Otherwise the final checkpoint will fail due to SDK interruption.
executor.shutdown();
@@ -92,7 +92,7 @@ public ExportScheduler(EnhancedSourceCoordinator enhancedSourceCoordinator, Dyna

@Override
public void run() {
LOG.debug("Start running Export Scheduler");
LOG.info("Start running Export Scheduler");
while (!Thread.interrupted()) {
// Does not have limit on max leases
// As most of the time it's just to wait
@@ -121,7 +121,7 @@ public void run() {
}

}
LOG.debug("Export scheduler interrupted, looks like shutdown has triggered");
LOG.warn("Export scheduler interrupted, looks like shutdown has triggered");
executor.shutdownNow();

}
@@ -173,7 +173,7 @@ private BiConsumer<String, Throwable> completeExport(ExportPartition exportParti


private void createDataFilePartitions(String exportArn, String bucketName, Map<String, Integer> dataFileInfo) {
LOG.debug("Totally {} data files generated for export {}", dataFileInfo.size(), exportArn);
LOG.info("Totally {} data files generated for export {}", dataFileInfo.size(), exportArn);
AtomicInteger totalRecords = new AtomicInteger();
AtomicInteger totalFiles = new AtomicInteger();
dataFileInfo.forEach((key, size) -> {
@@ -225,7 +225,7 @@ private String checkExportStatus(ExportPartition exportPartition) {

String status = exportTaskManager.checkExportStatus(exportArn);
if (!"IN_PROGRESS".equals(status)) {
LOG.debug("Export {} is completed with final status {}", exportArn, status);
LOG.info("Export {} is completed with final status {}", exportArn, status);
return status;
}
LOG.debug("Export {} is still running in progress, sleep and recheck later", exportArn);
@@ -243,19 +243,19 @@ private String getOrCreateExportArn(ExportPartition exportPartition) {
ExportProgressState state = exportPartition.getProgressState().get();
// Check the progress state
if (state.getExportArn() != null) {
LOG.debug("Export Job has already submitted for table {} with export time {}", exportPartition.getTableArn(), exportPartition.getExportTime());
LOG.info("Export Job has already submitted for table {} with export time {}", exportPartition.getTableArn(), exportPartition.getExportTime());
// Export job already submitted
return state.getExportArn();
}

LOG.debug("Try to submit a new export job for table {} with export time {}", exportPartition.getTableArn(), exportPartition.getExportTime());
LOG.info("Try to submit a new export job for table {} with export time {}", exportPartition.getTableArn(), exportPartition.getExportTime());
// submit a new export request
String exportArn = exportTaskManager.submitExportJob(exportPartition.getTableArn(), state.getBucket(), state.getPrefix(), exportPartition.getExportTime());

// Update state with export Arn in the coordination table.
// So that it won't be submitted again after a restart.
if (exportArn != null) {
LOG.debug("Export arn is " + exportArn);
LOG.info("Export arn is " + exportArn);
state.setExportArn(exportArn);
enhancedSourceCoordinator.saveProgressStateForPartition(exportPartition);
}
@@ -60,7 +60,7 @@ public ExportSummary parseSummaryFile(String bucket, String key) {
}

public Map<String, Integer> parseDataFile(String bucket, String key) {
LOG.debug("Try to read the manifest data file");
LOG.info("Try to read the manifest data file");

Map<String, Integer> result = new HashMap<>();
InputStream object = objectReader.readFile(bucket, key);
@@ -72,12 +72,12 @@ public Map<String, Integer> parseDataFile(String bucket, String key) {
// An example line as below:
// {"itemCount":46331,"md5Checksum":"a0k21IY3eelgr2PuWJLjJw==","etag":"51f9f394903c5d682321c6211aae8b6a-1","dataFileS3Key":"test-table-export/AWSDynamoDB/01692350182719-6de2c037/data/fpgzwz7ome3s7a5gqn2mu3ogtq.json.gz"}
Map<String, String> map = MAPPER.readValue(line, MAP_TYPE_REFERENCE);
LOG.debug("Get a file {} with item count {}", map.get(DATA_FILE_S3_KEY), map.get(DATA_FILE_ITEM_COUNT_KEY));
LOG.info("Export data file: {} with item count {}", map.get(DATA_FILE_S3_KEY), map.get(DATA_FILE_ITEM_COUNT_KEY));
result.put(map.get(DATA_FILE_S3_KEY), Integer.valueOf(map.get(DATA_FILE_ITEM_COUNT_KEY)));

}
} catch (IOException e) {
LOG.error("IO Exception due to " + e.getMessage());
LOG.error("IO Exception due to {}", e.getMessage());
}

return result;
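Each line of the manifest data file is a small JSON object like the sample shown above. A self-contained sketch of parsing one such line with Jackson, assuming the DATA_FILE_S3_KEY and DATA_FILE_ITEM_COUNT_KEY constants resolve to the dataFileS3Key and itemCount fields in that sample (hypothetical standalone code, not the reader class itself):

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

class ManifestLineSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        String line = "{\"itemCount\":46331,\"dataFileS3Key\":\"export/data/file1.json.gz\"}";
        // Read the line into a flat string-to-string map, then pick out the fields of interest.
        Map<String, String> map = mapper.readValue(line, new TypeReference<Map<String, String>>() {});
        String s3Key = map.get("dataFileS3Key");
        int itemCount = Integer.parseInt(map.get("itemCount"));
        System.out.println(s3Key + " has " + itemCount + " items");
    }
}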
@@ -39,7 +39,7 @@ public class ShardConsumer implements Runnable {
/**
* Idle Time between GetRecords Reads
*/
private static final int GET_RECORD_INTERVAL_MILLS = 200;
private static final int GET_RECORD_INTERVAL_MILLS = 300;

/**
* Default interval to check if export is completed.
@@ -131,7 +131,7 @@ public ShardConsumer build() {

@Override
public void run() {
LOG.debug("Shard Consumer start to run...");
LOG.info("Shard Consumer start to run...");

long lastCheckpointTime = System.currentTimeMillis();
String sequenceNumber = "";
@@ -52,7 +52,7 @@ public ShardConsumerFactory(final EnhancedSourceCoordinator enhancedSourceCoordi

public Runnable createConsumer(StreamPartition streamPartition) {

LOG.debug("Try to create a thread for shard " + streamPartition.getShardId());
LOG.info("Try to start a Shard Consumer for " + streamPartition.getShardId());

// Check and get the current state.
Optional<StreamProgressState> progressState = streamPartition.getProgressState();
@@ -69,7 +69,8 @@ public Runnable createConsumer(StreamPartition streamPartition) {

String shardIter = shardManager.getShardIterator(streamPartition.getStreamArn(), streamPartition.getShardId(), sequenceNumber);
if (shardIter == null) {
LOG.error("Unable to get a shard iterator, looks like the shard has expired");
LOG.info("Unable to get a shard iterator, looks like the shard has expired");
LOG.error("Failed to start a Shard Consumer for " + streamPartition.getShardId());
return null;
}

@@ -16,6 +16,7 @@
import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

@@ -26,32 +27,42 @@ public class ShardManager {

private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);

/**
* Max number of shards to return in the DescribeStream API call, maximum 100.
*/
private static final int MAX_SHARD_COUNT = 100;

private final DynamoDbStreamsClient streamsClient;

public ShardManager(DynamoDbStreamsClient streamsClient) {
this.streamsClient = streamsClient;
}


private List<Shard> listShards(String streamArn) {
LOG.info("Start getting all shards from {}", streamArn);
long startTime = System.currentTimeMillis();
// Get all the shard IDs from the stream.
List<Shard> shards;
List<Shard> shards = new ArrayList<>();
String lastEvaluatedShardId = null;
do {
DescribeStreamRequest req = DescribeStreamRequest.builder()
.streamArn(streamArn)
.limit(MAX_SHARD_COUNT)
.exclusiveStartShardId(lastEvaluatedShardId)
.build();

DescribeStreamResponse describeStreamResult = streamsClient.describeStream(req);
shards = describeStreamResult.streamDescription().shards();
shards.addAll(describeStreamResult.streamDescription().shards());

// If LastEvaluatedShardId is set,
// at least one more page of shard IDs to retrieve
lastEvaluatedShardId = describeStreamResult.streamDescription().lastEvaluatedShardId();


} while (lastEvaluatedShardId != null);

LOG.debug("Stream {} has {} shards found", streamArn, shards.size());
long endTime = System.currentTimeMillis();
LOG.info("Stream {} has {} shards found, took {} milliseconds", streamArn, shards.size(), endTime - startTime);
return shards;
}
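Because removed and added lines are interleaved in the hunk above, the paginated shard listing is easier to read in one piece. A sketch assembled from the added lines (not the verbatim file), assuming the streamsClient field and MAX_SHARD_COUNT constant shown earlier in this diff:

private List<Shard> listShards(String streamArn) {
    LOG.info("Start getting all shards from {}", streamArn);
    long startTime = System.currentTimeMillis();
    List<Shard> shards = new ArrayList<>();
    String lastEvaluatedShardId = null;
    do {
        // Ask for at most MAX_SHARD_COUNT shards per DescribeStream call,
        // resuming from the last shard id of the previous page.
        DescribeStreamRequest req = DescribeStreamRequest.builder()
                .streamArn(streamArn)
                .limit(MAX_SHARD_COUNT)
                .exclusiveStartShardId(lastEvaluatedShardId)
                .build();
        DescribeStreamResponse describeStreamResult = streamsClient.describeStream(req);
        shards.addAll(describeStreamResult.streamDescription().shards());
        // A non-null LastEvaluatedShardId means at least one more page remains.
        lastEvaluatedShardId = describeStreamResult.streamDescription().lastEvaluatedShardId();
    } while (lastEvaluatedShardId != null);
    long endTime = System.currentTimeMillis();
    LOG.info("Stream {} has {} shards found, took {} milliseconds", streamArn, shards.size(), endTime - startTime);
    return shards;
}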

@@ -151,7 +162,7 @@ public List<String> getRootShardIds(String streamArn) {
.filter(shard -> shard.parentShardId() == null || !childIds.contains(shard.parentShardId()))
.map(shard -> shard.shardId())
.collect(Collectors.toList());

LOG.info("Found {} root shards for {}", rootIds.size(), streamArn);
return rootIds;
}

@@ -26,8 +26,20 @@
public class StreamScheduler implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(StreamScheduler.class);

private static final int MAX_JOB_COUNT = 50;
private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 30_000;
/**
* Max number of shards each node can handle in parallel
*/
private static final int MAX_JOB_COUNT = 250;

/**
* Default interval to acquire a lease from coordination store
*/
private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 15_000;

/**
* Add a delay of getting child shards when the parent finished.
*/
private static final int DELAY_TO_GET_CHILD_SHARDS_MILLIS = 1_500;

private final AtomicInteger numOfWorkers = new AtomicInteger(0);
private final EnhancedSourceCoordinator coordinator;
@@ -59,7 +71,7 @@ private void processStreamPartition(StreamPartition streamPartition) {

@Override
public void run() {
LOG.debug("Stream Scheduler start to run...");
LOG.info("Start running Stream Scheduler");
while (!Thread.interrupted()) {
if (numOfWorkers.get() < MAX_JOB_COUNT) {
final Optional<EnhancedSourcePartition> sourcePartition = coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE);
@@ -77,7 +89,7 @@ public void run() {
}
}
// Should Stop
LOG.debug("Stream Scheduler is interrupted, looks like shutdown has triggered");
LOG.warn("Stream Scheduler is interrupted, looks like shutdown has triggered");

// Cannot call executor.shutdownNow() here
// Otherwise the final checkpoint will fail due to SDK interruption.
@@ -89,14 +101,20 @@ private BiConsumer completeConsumer(StreamPartition streamPartition) {
return (v, ex) -> {
numOfWorkers.decrementAndGet();
if (ex == null) {
LOG.debug("Shard consumer is completed");
LOG.info("Shard consumer for {} is completed", streamPartition.getShardId());
LOG.debug("Start creating new stream partitions for Child Shards");

try {
// Add a delay as the Child shards may not be ready yet.
Thread.sleep(DELAY_TO_GET_CHILD_SHARDS_MILLIS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
List<String> childShardIds = shardManager.getChildShardIds(streamPartition.getStreamArn(), streamPartition.getShardId());
LOG.debug("Child Ids Retrieved: {}", childShardIds);
LOG.info("{} child shards for {} have been found", childShardIds.size(), streamPartition.getShardId());

createStreamPartitions(streamPartition.getStreamArn(), childShardIds);
LOG.debug("Create child shard completed");
LOG.info("Creation of all child shards partitions is completed");
// Finally mark the partition as completed.
coordinator.completePartition(streamPartition);
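Read together, the added lines give the success branch this shape: log at info, wait DELAY_TO_GET_CHILD_SHARDS_MILLIS because child shards may not be ready immediately after the parent finishes, look the children up, create their stream partitions, then complete the parent. A sketch assembled from the added lines above (not the verbatim file):

if (ex == null) {
    LOG.info("Shard consumer for {} is completed", streamPartition.getShardId());
    try {
        // Child shards may not be ready yet; give the stream a moment before asking.
        Thread.sleep(DELAY_TO_GET_CHILD_SHARDS_MILLIS);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    List<String> childShardIds = shardManager.getChildShardIds(streamPartition.getStreamArn(), streamPartition.getShardId());
    LOG.info("{} child shards for {} have been found", childShardIds.size(), streamPartition.getShardId());
    createStreamPartitions(streamPartition.getStreamArn(), childShardIds);
    LOG.info("Creation of all child shards partitions is completed");
    // Finally mark the parent partition as completed.
    coordinator.completePartition(streamPartition);
}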

@@ -130,7 +130,7 @@ void test_run_loadFile_correctly() throws InterruptedException {

loader.run();
// Run for a while
Thread.sleep(500);
Thread.sleep(1000);

// Should call s3 getObject
verify(s3Client).getObject(any(GetObjectRequest.class));
@@ -94,7 +94,7 @@ public void test_normal_run() throws InterruptedException {
executor.submit(scheduler);

// Need to run a while
Thread.sleep(500);
Thread.sleep(2000);
// Should acquire the stream partition
verify(coordinator).acquireAvailablePartition(StreamPartition.PARTITION_TYPE);
// Should start a new consumer
