From ca3d6acb077d1305d0214a07b9843814ca5190b2 Mon Sep 17 00:00:00 2001
From: Taylor Gray
Date: Fri, 27 Oct 2023 10:37:39 -0500
Subject: [PATCH] Catch exceptions and backoff and retry ddb source threads
 instead of shutting down on exception (#3554)

Signed-off-by: Taylor Gray
---
 .../dynamodb/converter/RecordConverter.java   |  2 +-
 .../dynamodb/export/DataFileScheduler.java    | 39 +++++++++------
 .../dynamodb/export/ExportScheduler.java      | 49 +++++++++++--------
 .../dynamodb/stream/StreamScheduler.java      | 37 +++++++++-----
 .../converter/ExportRecordConverterTest.java  |  2 +-
 .../converter/StreamRecordConverterTest.java  |  2 +-
 .../export/DataFileSchedulerTest.java         | 15 ++++++
 .../dynamodb/export/ExportSchedulerTest.java  | 47 ++++++++++++------
 .../dynamodb/stream/StreamSchedulerTest.java  | 17 ++++++-
 9 files changed, 144 insertions(+), 66 deletions(-)

diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java
index d5c5a059a8..4d5693db64 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java
@@ -86,7 +86,7 @@ public void addToBuffer(Map<String, Object> data, Map<String, Object> keys, long
         String sortKey = getAttributeValue(keys, tableInfo.getMetadata().getSortKeyAttributeName());
         if (sortKey != null) {
             eventMetadata.setAttribute(SORT_KEY_METADATA_ATTRIBUTE, sortKey);
-            eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, partitionKey + "_" + sortKey);
+            eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, partitionKey + "|" + sortKey);
         } else {
             eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, partitionKey);
         }
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java
index 8c1979e320..91432f3dad 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java
@@ -88,23 +88,32 @@ public void run() {
         LOG.debug("Start running Data File Scheduler");
 
         while (!Thread.currentThread().isInterrupted()) {
-            if (numOfWorkers.get() < MAX_JOB_COUNT) {
-                final Optional<EnhancedSourcePartition> sourcePartition = coordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE);
-
-                if (sourcePartition.isPresent()) {
-                    activeExportS3ObjectConsumersGauge.incrementAndGet();
-                    DataFilePartition dataFilePartition = (DataFilePartition) sourcePartition.get();
-                    processDataFilePartition(dataFilePartition);
-                    activeExportS3ObjectConsumersGauge.decrementAndGet();
-                }
-            }
             try {
-                Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS);
-            } catch (final InterruptedException e) {
-                LOG.info("InterruptedException occurred");
-                break;
+                if (numOfWorkers.get() < MAX_JOB_COUNT) {
+                    final Optional<EnhancedSourcePartition> sourcePartition = coordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE);
+
+                    if (sourcePartition.isPresent()) {
+                        activeExportS3ObjectConsumersGauge.incrementAndGet();
+                        DataFilePartition dataFilePartition = (DataFilePartition) sourcePartition.get();
+                        processDataFilePartition(dataFilePartition);
+                        activeExportS3ObjectConsumersGauge.decrementAndGet();
+                    }
+                }
+                try {
+                    Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS);
+                } catch (final InterruptedException e) {
+                    LOG.info("The DataFileScheduler was interrupted while waiting to retry, stopping processing");
+                    break;
+                }
+            } catch (final Exception e) {
+                LOG.error("Received an exception while processing an S3 data file, backing off and retrying", e);
+                try {
+                    Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS);
+                } catch (final InterruptedException ex) {
+                    LOG.info("The DataFileScheduler was interrupted while waiting to retry, stopping processing");
+                    break;
+                }
             }
-
         }
 
         LOG.warn("Data file scheduler is interrupted, Stop all data file loaders...");
         // Cannot call executor.shutdownNow() here
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java
index 93f9e5b51a..9ea0d0dde9 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java
@@ -94,32 +94,41 @@ public ExportScheduler(EnhancedSourceCoordinator enhancedSourceCoordinator, Dyna
     public void run() {
         LOG.debug("Start running Export Scheduler");
         while (!Thread.currentThread().isInterrupted()) {
-            // Does not have limit on max leases
-            // As most of the time it's just to wait
-            final Optional<EnhancedSourcePartition> sourcePartition = enhancedSourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE);
+            try {
+                // Does not have limit on max leases
+                // As most of the time it's just to wait
+                final Optional<EnhancedSourcePartition> sourcePartition = enhancedSourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE);
 
-            if (sourcePartition.isPresent()) {
+                if (sourcePartition.isPresent()) {
 
-                ExportPartition exportPartition = (ExportPartition) sourcePartition.get();
-                LOG.debug("Acquired an export partition: " + exportPartition.getPartitionKey());
+                    ExportPartition exportPartition = (ExportPartition) sourcePartition.get();
+                    LOG.debug("Acquired an export partition: " + exportPartition.getPartitionKey());
 
-                String exportArn = getOrCreateExportArn(exportPartition);
+                    String exportArn = getOrCreateExportArn(exportPartition);
 
-                if (exportArn == null) {
-                    closeExportPartitionWithError(exportPartition);
-                } else {
-                    CompletableFuture<String> checkStatus = CompletableFuture.supplyAsync(() -> checkExportStatus(exportPartition), executor);
-                    checkStatus.whenComplete(completeExport(exportPartition));
-                }
+                    if (exportArn == null) {
+                        closeExportPartitionWithError(exportPartition);
+                    } else {
+                        CompletableFuture<String> checkStatus = CompletableFuture.supplyAsync(() -> checkExportStatus(exportPartition), executor);
+                        checkStatus.whenComplete(completeExport(exportPartition));
+                    }
+                }
+                try {
+                    Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
+                } catch (final InterruptedException e) {
+                    LOG.info("The ExportScheduler was interrupted while waiting to retry, stopping processing");
+                    break;
+                }
+            } catch (final Exception e) {
+                LOG.error("Received an exception during export from DynamoDB to S3, backing off and retrying", e);
+                try {
+                    Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
+                } catch (final InterruptedException ex) {
+                    LOG.info("The ExportScheduler was interrupted while waiting to retry, stopping processing");
+                    break;
+                }
             }
-            try {
-                Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
-            } catch (final InterruptedException e) {
-                LOG.info("InterruptedException occurred");
-                break;
-            }
         }
-
         LOG.warn("Export scheduler interrupted, looks like shutdown has triggered");
         executor.shutdownNow();
diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
index 19e4dd28e0..cd4d43baaf 100644
--- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
+++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
@@ -83,21 +83,32 @@ private void processStreamPartition(StreamPartition streamPartition) {
     public void run() {
         LOG.debug("Stream Scheduler start to run...");
         while (!Thread.currentThread().isInterrupted()) {
-            if (numOfWorkers.get() < MAX_JOB_COUNT) {
-                final Optional<EnhancedSourcePartition> sourcePartition = coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE);
-                if (sourcePartition.isPresent()) {
-                    activeChangeEventConsumers.incrementAndGet();
-                    StreamPartition streamPartition = (StreamPartition) sourcePartition.get();
-                    processStreamPartition(streamPartition);
-                    activeChangeEventConsumers.decrementAndGet();
+            try {
+
+                if (numOfWorkers.get() < MAX_JOB_COUNT) {
+                    final Optional<EnhancedSourcePartition> sourcePartition = coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE);
+                    if (sourcePartition.isPresent()) {
+                        activeChangeEventConsumers.incrementAndGet();
+                        StreamPartition streamPartition = (StreamPartition) sourcePartition.get();
+                        processStreamPartition(streamPartition);
+                        activeChangeEventConsumers.decrementAndGet();
+                    }
                 }
-            }
-            try {
-                Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS);
-            } catch (final InterruptedException e) {
-                LOG.info("InterruptedException occurred");
-                break;
+                try {
+                    Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS);
+                } catch (final InterruptedException e) {
+                    LOG.info("The StreamScheduler was interrupted while waiting to retry, stopping processing");
+                    break;
+                }
+            } catch (final Exception e) {
+                LOG.error("Received an exception while processing a DynamoDB stream partition, backing off and retrying", e);
+                try {
+                    Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS);
+                } catch (final InterruptedException ex) {
+                    LOG.info("The StreamScheduler was interrupted while waiting to retry, stopping processing");
+                    break;
+                }
             }
         }
         // Should Stop
diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java
index e79d231cb3..a31a642b8e 100644
--- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java
+++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java
@@ -137,7 +137,7 @@ void test_writeSingleRecordToBuffer() throws Exception {
         assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(pk));
         assertThat(event.getMetadata().getAttribute(SORT_KEY_METADATA_ATTRIBUTE), equalTo(sk));
-        assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(pk + "_" + sk));
+        assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(pk + "|" + sk));
         assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString()));
         assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), notNullValue());
     }
diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java
index 23ac32f6df..cd02172da2 100644
--- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java
+++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java
@@ -133,7 +133,7 @@ void test_writeSingleRecordToBuffer() throws Exception {
         String sortKey = record.dynamodb().keys().get(sortKeyAttrName).s();
         assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(partitionKey));
         assertThat(event.getMetadata().getAttribute(SORT_KEY_METADATA_ATTRIBUTE), equalTo(sortKey));
-        assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(partitionKey + "_" + sortKey));
+        assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(partitionKey + "|" + sortKey));
         assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.CREATE.toString()));
         assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(record.dynamodb().approximateCreationDateTime().toEpochMilli()));
 
diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java
index dd7562341b..66378ee406 100644
--- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java
+++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java
@@ -151,5 +151,20 @@ public void test_run_DataFileLoader_correctly() throws InterruptedException {
 
     }
 
+    @Test
+    void run_catches_exception_and_retries_when_exception_is_thrown_during_processing() throws InterruptedException {
+        given(coordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE)).willThrow(RuntimeException.class);
+
+        scheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics);
+
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        final Future<?> future = executorService.submit(() -> scheduler.run());
+        Thread.sleep(100);
+        assertThat(future.isDone(), equalTo(false));
+        executorService.shutdown();
+        future.cancel(true);
+        assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true));
+    }
+
 }
\ No newline at end of file
diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java
index 2a1506643f..0a74f2c94f 100644
--- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java
+++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java
@@ -33,7 +33,11 @@
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.BDDMockito.given;
@@ -43,10 +47,10 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
-import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_S3_OBJECTS_TOTAL_COUNT;
 import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_JOB_FAILURE_COUNT;
 import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_JOB_SUCCESS_COUNT;
 import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_RECORDS_TOTAL_COUNT;
+import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_S3_OBJECTS_TOTAL_COUNT;
 
 
 @ExtendWith(MockitoExtension.class)
@@ -99,19 +103,6 @@ class ExportSchedulerTest {
 
     @BeforeEach
     void setup() {
-        when(exportPartition.getTableArn()).thenReturn(tableArn);
-        when(exportPartition.getExportTime()).thenReturn(exportTime);
-
-        ExportProgressState state = new ExportProgressState();
-        state.setBucket(bucketName);
-        state.setPrefix(prefix);
-        when(exportPartition.getProgressState()).thenReturn(Optional.of(state));
-
-        given(pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT)).willReturn(exportJobSuccess);
-        given(pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT)).willReturn(exportJobErrors);
-        given(pluginMetrics.counter(EXPORT_S3_OBJECTS_TOTAL_COUNT)).willReturn(exportFilesTotal);
-        given(pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT)).willReturn(exportRecordsTotal);
-
         ExportSummary summary = mock(ExportSummary.class);
         lenient().when(manifestFileReader.parseSummaryFile(anyString(), anyString())).thenReturn(summary);
         lenient().when(summary.getS3Bucket()).thenReturn(bucketName);
@@ -127,6 +118,19 @@ void setup() {
 
     @Test
     public void test_run_exportJob_correctly() throws InterruptedException {
+        when(exportPartition.getTableArn()).thenReturn(tableArn);
+        when(exportPartition.getExportTime()).thenReturn(exportTime);
+
+        ExportProgressState state = new ExportProgressState();
+        state.setBucket(bucketName);
+        state.setPrefix(prefix);
+        when(exportPartition.getProgressState()).thenReturn(Optional.of(state));
+
+        given(pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT)).willReturn(exportJobSuccess);
+        given(pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT)).willReturn(exportJobErrors);
+        given(pluginMetrics.counter(EXPORT_S3_OBJECTS_TOTAL_COUNT)).willReturn(exportFilesTotal);
+        given(pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT)).willReturn(exportRecordsTotal);
+
         given(coordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).willReturn(Optional.of(exportPartition)).willReturn(Optional.empty());
 
         // Set up mock behavior
@@ -169,4 +173,19 @@ public void test_run_exportJob_correctly() throws InterruptedException {
 
     }
 
+    @Test
+    void run_catches_exception_and_retries_when_exception_is_thrown_during_processing() throws InterruptedException {
+        given(coordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).willThrow(RuntimeException.class);
+
+        scheduler = new ExportScheduler(coordinator, dynamoDBClient, manifestFileReader, pluginMetrics);
+
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        final Future<?> future = executorService.submit(() -> scheduler.run());
+        Thread.sleep(100);
+        assertThat(future.isDone(), equalTo(false));
+        executorService.shutdown();
+        future.cancel(true);
+        assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true));
+    }
+
 }
\ No newline at end of file
diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java
index c041323a0d..586a3c188a 100644
--- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java
+++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java
@@ -40,7 +40,6 @@
 import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler.ACTIVE_CHANGE_EVENT_CONSUMERS;
 
 @ExtendWith(MockitoExtension.class)
-@Disabled
 class StreamSchedulerTest {
 
     @Mock
@@ -104,6 +103,7 @@ void setup() {
 
 
     @Test
+    @Disabled
     public void test_normal_run() throws InterruptedException {
         given(coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willReturn(Optional.of(streamPartition)).willReturn(Optional.empty());
 
@@ -128,4 +128,19 @@ public void test_normal_run() throws InterruptedException {
         executorService.shutdownNow();
     }
 
+
+    @Test
+    void run_catches_exception_and_retries_when_exception_is_thrown_during_processing() throws InterruptedException {
+        given(coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willThrow(RuntimeException.class);
+
+        scheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics);
+
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        final Future<?> future = executorService.submit(() -> scheduler.run());
+        Thread.sleep(100);
+        assertThat(future.isDone(), equalTo(false));
+        executorService.shutdown();
+        future.cancel(true);
+        assertThat(executorService.awaitTermination(1000, TimeUnit.MILLISECONDS), equalTo(true));
+    }
 }
\ No newline at end of file
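Note on the pattern: all three schedulers above now share the same catch-all, back-off, and retry loop. Below is a minimal standalone sketch of that loop for reference; it is an illustration only, and the class name, the BACKOFF_MILLIS constant, and the work callback are hypothetical stand-ins, not names taken from the plugin:

    import java.util.concurrent.TimeUnit;

    final class RetryLoopSketch implements Runnable {
        // Hypothetical stand-in for the schedulers' DEFAULT_LEASE_INTERVAL_MILLIS.
        private static final long BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(2);

        private final Runnable work;

        RetryLoopSketch(final Runnable work) {
            this.work = work;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    work.run();                   // one unit of work; may throw a RuntimeException
                    Thread.sleep(BACKOFF_MILLIS); // normal poll interval between lease attempts
                } catch (final InterruptedException e) {
                    break;                        // shutdown was requested while sleeping
                } catch (final Exception e) {
                    // Back off and retry instead of letting the scheduler thread die,
                    // mirroring the catch blocks added in the diff above.
                    try {
                        Thread.sleep(BACKOFF_MILLIS);
                    } catch (final InterruptedException ex) {
                        break;
                    }
                }
            }
        }
    }

The tests added in the diff exercise exactly this property: with the coordinator stubbed to always throw, the scheduler future must still be running after a short sleep (isDone() is false) and must stop promptly once cancelled.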