Catch exceptions and backoff and retry ddb source threads instead of shutting down on exception (opensearch-project#3554)
Signed-off-by: Taylor Gray <[email protected]>
graytaylor0 authored Oct 27, 2023
1 parent 267d3bc commit ca3d6ac
Showing 9 changed files with 144 additions and 66 deletions.
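The core change is that each DynamoDB source scheduler thread (DataFileScheduler, ExportScheduler, StreamScheduler) now wraps the body of its run() loop in a try/catch: an unexpected exception is logged, the thread sleeps for its lease interval, and the loop retries, instead of the exception escaping and killing the thread. Below is a minimal, self-contained sketch of that backoff-and-retry shape; the names RetryLoopSketch, acquireAndProcess and BACKOFF_MILLIS are illustrative stand-ins, not the actual Data Prepper classes or constants.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RetryLoopSketch implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(RetryLoopSketch.class);
    private static final long BACKOFF_MILLIS = 30_000L;   // illustrative lease/backoff interval

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                acquireAndProcess();              // acquire an available partition and process it
                Thread.sleep(BACKOFF_MILLIS);     // normal wait between iterations
            } catch (final InterruptedException e) {
                LOG.info("Interrupted while waiting to retry, stopping processing");
                break;                            // shutdown was requested
            } catch (final Exception e) {
                LOG.error("Processing failed, backing off and retrying", e);
                try {
                    Thread.sleep(BACKOFF_MILLIS); // back off before the next attempt
                } catch (final InterruptedException ex) {
                    LOG.info("Interrupted while waiting to retry, stopping processing");
                    break;
                }
            }
        }
    }

    private void acquireAndProcess() {
        // stand-in for acquiring a source partition from the coordinator and processing it
    }
}

In this pattern only an InterruptedException (shutdown) ends the loop; every other exception is treated as retryable.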
@@ -86,7 +86,7 @@ public void addToBuffer(Map<String, Object> data, Map<String, Object> keys, long
         String sortKey = getAttributeValue(keys, tableInfo.getMetadata().getSortKeyAttributeName());
         if (sortKey != null) {
             eventMetadata.setAttribute(SORT_KEY_METADATA_ATTRIBUTE, sortKey);
-            eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, partitionKey + "_" + sortKey);
+            eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, partitionKey + "|" + sortKey);
         } else {
             eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, partitionKey);
         }
@@ -88,23 +88,32 @@ public void run() {
         LOG.debug("Start running Data File Scheduler");

         while (!Thread.currentThread().isInterrupted()) {
-            if (numOfWorkers.get() < MAX_JOB_COUNT) {
-                final Optional<EnhancedSourcePartition> sourcePartition = coordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE);
-
-                if (sourcePartition.isPresent()) {
-                    activeExportS3ObjectConsumersGauge.incrementAndGet();
-                    DataFilePartition dataFilePartition = (DataFilePartition) sourcePartition.get();
-                    processDataFilePartition(dataFilePartition);
-                    activeExportS3ObjectConsumersGauge.decrementAndGet();
-                }
-            }
             try {
-                Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS);
-            } catch (final InterruptedException e) {
-                LOG.info("InterruptedException occurred");
-                break;
+                if (numOfWorkers.get() < MAX_JOB_COUNT) {
+                    final Optional<EnhancedSourcePartition> sourcePartition = coordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE);
+
+                    if (sourcePartition.isPresent()) {
+                        activeExportS3ObjectConsumersGauge.incrementAndGet();
+                        DataFilePartition dataFilePartition = (DataFilePartition) sourcePartition.get();
+                        processDataFilePartition(dataFilePartition);
+                        activeExportS3ObjectConsumersGauge.decrementAndGet();
+                    }
+                }
+                try {
+                    Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS);
+                } catch (final InterruptedException e) {
+                    LOG.info("The DataFileScheduler was interrupted while waiting to retry, stopping processing");
+                    break;
+                }
+            } catch (final Exception e) {
+                LOG.error("Received an exception while processing an S3 data file, backing off and retrying", e);
+                try {
+                    Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS);
+                } catch (final InterruptedException ex) {
+                    LOG.info("The DataFileScheduler was interrupted while waiting to retry, stopping processing");
+                    break;
+                }
             }
-
         }
         LOG.warn("Data file scheduler is interrupted, Stop all data file loaders...");
         // Cannot call executor.shutdownNow() here
@@ -94,32 +94,41 @@ public ExportScheduler(EnhancedSourceCoordinator enhancedSourceCoordinator, Dyna
     public void run() {
         LOG.debug("Start running Export Scheduler");
         while (!Thread.currentThread().isInterrupted()) {
-            // Does not have limit on max leases
-            // As most of the time it's just to wait
-            final Optional<EnhancedSourcePartition> sourcePartition = enhancedSourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE);
+            try {
+                // Does not have limit on max leases
+                // As most of the time it's just to wait
+                final Optional<EnhancedSourcePartition> sourcePartition = enhancedSourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE);

-            if (sourcePartition.isPresent()) {
+                if (sourcePartition.isPresent()) {

-                ExportPartition exportPartition = (ExportPartition) sourcePartition.get();
-                LOG.debug("Acquired an export partition: " + exportPartition.getPartitionKey());
+                    ExportPartition exportPartition = (ExportPartition) sourcePartition.get();
+                    LOG.debug("Acquired an export partition: " + exportPartition.getPartitionKey());

-                String exportArn = getOrCreateExportArn(exportPartition);
+                    String exportArn = getOrCreateExportArn(exportPartition);

-                if (exportArn == null) {
-                    closeExportPartitionWithError(exportPartition);
-                } else {
-                    CompletableFuture<String> checkStatus = CompletableFuture.supplyAsync(() -> checkExportStatus(exportPartition), executor);
-                    checkStatus.whenComplete(completeExport(exportPartition));
-                }
+                    if (exportArn == null) {
+                        closeExportPartitionWithError(exportPartition);
+                    } else {
+                        CompletableFuture<String> checkStatus = CompletableFuture.supplyAsync(() -> checkExportStatus(exportPartition), executor);
+                        checkStatus.whenComplete(completeExport(exportPartition));
+                    }

+                }
+                try {
+                    Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
+                } catch (final InterruptedException e) {
+                    LOG.info("The ExportScheduler was interrupted while waiting to retry, stopping processing");
+                    break;
+                }
+            } catch (final Exception e) {
+                LOG.error("Received an exception during export from DynamoDB to S3, backing off and retrying", e);
+                try {
+                    Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
+                } catch (final InterruptedException ex) {
+                    LOG.info("The ExportScheduler was interrupted while waiting to retry, stopping processing");
+                    break;
+                }
+            }
             }
-            try {
-                Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
-            } catch (final InterruptedException e) {
-                LOG.info("InterruptedException occurred");
-                break;
-            }
-
         }
         LOG.warn("Export scheduler interrupted, looks like shutdown has triggered");
         executor.shutdownNow();
@@ -83,21 +83,32 @@ private void processStreamPartition(StreamPartition streamPartition) {
     public void run() {
         LOG.debug("Stream Scheduler start to run...");
         while (!Thread.currentThread().isInterrupted()) {
-            if (numOfWorkers.get() < MAX_JOB_COUNT) {
-                final Optional<EnhancedSourcePartition> sourcePartition = coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE);
-                if (sourcePartition.isPresent()) {
-                    activeChangeEventConsumers.incrementAndGet();
-                    StreamPartition streamPartition = (StreamPartition) sourcePartition.get();
-                    processStreamPartition(streamPartition);
-                    activeChangeEventConsumers.decrementAndGet();
+            try {
+
+                if (numOfWorkers.get() < MAX_JOB_COUNT) {
+                    final Optional<EnhancedSourcePartition> sourcePartition = coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE);
+                    if (sourcePartition.isPresent()) {
+                        activeChangeEventConsumers.incrementAndGet();
+                        StreamPartition streamPartition = (StreamPartition) sourcePartition.get();
+                        processStreamPartition(streamPartition);
+                        activeChangeEventConsumers.decrementAndGet();
+                    }
                 }
-            }

-            try {
-                Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS);
-            } catch (final InterruptedException e) {
-                LOG.info("InterruptedException occurred");
-                break;
+                try {
+                    Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS);
+                } catch (final InterruptedException e) {
+                    LOG.info("InterruptedException occurred");
+                    break;
+                }
+            } catch (final Exception e) {
+                LOG.error("Received an exception while trying to get an S3 data file to process, backing off and retrying", e);
+                try {
+                    Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS);
+                } catch (final InterruptedException ex) {
+                    LOG.info("The StreamScheduler was interrupted while waiting to retry, stopping processing");
+                    break;
+                }
             }
         }
         // Should Stop
@@ -137,7 +137,7 @@ void test_writeSingleRecordToBuffer() throws Exception {

         assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(pk));
         assertThat(event.getMetadata().getAttribute(SORT_KEY_METADATA_ATTRIBUTE), equalTo(sk));
-        assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(pk + "_" + sk));
+        assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(pk + "|" + sk));
         assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString()));
         assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), notNullValue());
     }
@@ -133,7 +133,7 @@ void test_writeSingleRecordToBuffer() throws Exception {
         String sortKey = record.dynamodb().keys().get(sortKeyAttrName).s();
         assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(partitionKey));
         assertThat(event.getMetadata().getAttribute(SORT_KEY_METADATA_ATTRIBUTE), equalTo(sortKey));
-        assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(partitionKey + "_" + sortKey));
+        assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(partitionKey + "|" + sortKey));
         assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.CREATE.toString()));
         assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(record.dynamodb().approximateCreationDateTime().toEpochMilli()));

@@ -151,5 +151,20 @@ public void test_run_DataFileLoader_correctly() throws InterruptedException {

     }

+    @Test
+    void run_catches_exception_and_retries_when_exception_is_thrown_during_processing() throws InterruptedException {
+        given(coordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE)).willThrow(RuntimeException.class);
+
+        scheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics);
+
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        final Future<?> future = executorService.submit(() -> scheduler.run());
+        Thread.sleep(100);
+        assertThat(future.isDone(), equalTo(false));
+        executorService.shutdown();
+        future.cancel(true);
+        assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true));
+    }
+

 }
@@ -33,7 +33,11 @@
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;

+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.BDDMockito.given;
@@ -43,10 +47,10 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
-import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_S3_OBJECTS_TOTAL_COUNT;
 import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_JOB_FAILURE_COUNT;
 import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_JOB_SUCCESS_COUNT;
 import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_RECORDS_TOTAL_COUNT;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_S3_OBJECTS_TOTAL_COUNT;


 @ExtendWith(MockitoExtension.class)
@@ -99,19 +103,6 @@ class ExportSchedulerTest {
     @BeforeEach
     void setup() {

-        when(exportPartition.getTableArn()).thenReturn(tableArn);
-        when(exportPartition.getExportTime()).thenReturn(exportTime);
-
-        ExportProgressState state = new ExportProgressState();
-        state.setBucket(bucketName);
-        state.setPrefix(prefix);
-        when(exportPartition.getProgressState()).thenReturn(Optional.of(state));
-
-        given(pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT)).willReturn(exportJobSuccess);
-        given(pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT)).willReturn(exportJobErrors);
-        given(pluginMetrics.counter(EXPORT_S3_OBJECTS_TOTAL_COUNT)).willReturn(exportFilesTotal);
-        given(pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT)).willReturn(exportRecordsTotal);
-
         ExportSummary summary = mock(ExportSummary.class);
         lenient().when(manifestFileReader.parseSummaryFile(anyString(), anyString())).thenReturn(summary);
         lenient().when(summary.getS3Bucket()).thenReturn(bucketName);
@@ -127,6 +118,19 @@ void setup() {

     @Test
     public void test_run_exportJob_correctly() throws InterruptedException {
+        when(exportPartition.getTableArn()).thenReturn(tableArn);
+        when(exportPartition.getExportTime()).thenReturn(exportTime);
+
+        ExportProgressState state = new ExportProgressState();
+        state.setBucket(bucketName);
+        state.setPrefix(prefix);
+        when(exportPartition.getProgressState()).thenReturn(Optional.of(state));
+
+        given(pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT)).willReturn(exportJobSuccess);
+        given(pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT)).willReturn(exportJobErrors);
+        given(pluginMetrics.counter(EXPORT_S3_OBJECTS_TOTAL_COUNT)).willReturn(exportFilesTotal);
+        given(pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT)).willReturn(exportRecordsTotal);
+
         given(coordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).willReturn(Optional.of(exportPartition)).willReturn(Optional.empty());

         // Set up mock behavior
@@ -169,4 +173,19 @@ public void test_run_exportJob_correctly() throws InterruptedException {

     }

+    @Test
+    void run_catches_exception_and_retries_when_exception_is_thrown_during_processing() throws InterruptedException {
+        given(coordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).willThrow(RuntimeException.class);
+
+        scheduler = new ExportScheduler(coordinator, dynamoDBClient, manifestFileReader, pluginMetrics);
+
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        final Future<?> future = executorService.submit(() -> scheduler.run());
+        Thread.sleep(100);
+        assertThat(future.isDone(), equalTo(false));
+        executorService.shutdown();
+        future.cancel(true);
+        assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true));
+    }
+
 }
@@ -40,7 +40,6 @@
 import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler.ACTIVE_CHANGE_EVENT_CONSUMERS;

 @ExtendWith(MockitoExtension.class)
-@Disabled
 class StreamSchedulerTest {

     @Mock
@@ -104,6 +103,7 @@ void setup() {


     @Test
+    @Disabled
     public void test_normal_run() throws InterruptedException {
         given(coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willReturn(Optional.of(streamPartition)).willReturn(Optional.empty());

@@ -128,4 +128,19 @@ public void test_normal_run() throws InterruptedException {

         executorService.shutdownNow();
     }
+
+    @Test
+    void run_catches_exception_and_retries_when_exception_is_thrown_during_processing() throws InterruptedException {
+        given(coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willThrow(RuntimeException.class);
+
+        scheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics);
+
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        final Future<?> future = executorService.submit(() -> scheduler.run());
+        Thread.sleep(100);
+        assertThat(future.isDone(), equalTo(false));
+        executorService.shutdown();
+        future.cancel(true);
+        assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true));
+    }
 }
