Commit: Change s3 scan and opensearch to only save state every 5 minutes, fix bug where any action was valid in OpenSearch sink (opensearch-project#3581)

Signed-off-by: Taylor Gray <[email protected]>
graytaylor0 authored Nov 5, 2023
1 parent 377671c commit fa69169
Showing 14 changed files with 81 additions and 65 deletions.
@@ -322,6 +322,7 @@ public static boolean isValidFormatExpressions(final String format, final Expres
if (Objects.isNull(expressionEvaluator)) {
return false;
}

int fromIndex = 0;
int position = 0;
while ((position = format.indexOf("${", fromIndex)) != -1) {
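The rest of this method is collapsed in the diff view. For orientation, here is a minimal sketch of how a ${...} scanning loop of this shape can validate each placeholder. This is an illustration, not the actual fix; it assumes java.util.Objects on the classpath, and the isValidExpressionStatement call is an assumed ExpressionEvaluator method, not a quote of the real API.

// Sketch only: scan a format string for ${...} placeholders and validate each.
public static boolean isValidFormatExpressionsSketch(final String format,
                                                     final ExpressionEvaluator expressionEvaluator) {
    if (Objects.isNull(expressionEvaluator)) {
        return false;
    }

    int fromIndex = 0;
    int position;
    while ((position = format.indexOf("${", fromIndex)) != -1) {
        final int endPosition = format.indexOf("}", position + 1);
        if (endPosition == -1) {
            return false; // unterminated ${ placeholder
        }
        final String expression = format.substring(position + 2, endPosition);
        if (!expressionEvaluator.isValidExpressionStatement(expression)) {
            return false; // placeholder is not a valid expression
        }
        fromIndex = endPosition + 1;
    }
    return true;
}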
@@ -5,11 +5,11 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.export;

-import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
+import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
@@ -18,8 +18,7 @@
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
-import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
-import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
+import org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.DataFileProgressState;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
@@ -41,14 +40,12 @@
import java.util.zip.GZIPOutputStream;

import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileLoader.BUFFER_TIMEOUT;
@@ -57,9 +54,6 @@
@ExtendWith(MockitoExtension.class)
class DataFileLoaderTest {

-@Mock
-private EnhancedSourceCoordinator coordinator;
-
@Mock
private S3Client s3Client;

@@ -73,12 +67,13 @@ class DataFileLoaderTest {
private BufferAccumulator<Record<Event>> bufferAccumulator;

@Mock
-private Counter testCounter;
+private ExportRecordConverter exportRecordConverter;

+@Mock
+private DataFileCheckpointer checkpointer;

+private S3ObjectReader objectReader;

-private DataFileCheckpointer checkpointer;
-private S3ObjectReader objectReader;

private DataFilePartition dataFilePartition;

@@ -93,7 +88,6 @@ class DataFileLoaderTest {

private final String manifestKey = UUID.randomUUID().toString();
private final String bucketName = UUID.randomUUID().toString();
-private final String prefix = UUID.randomUUID().toString();

private final String exportArn = tableArn + "/export/01693291918297-bfeccbea";

@@ -102,7 +96,7 @@
private final int total = random.nextInt(10);

@BeforeEach
-void setup() throws Exception {
+void setup() {

DataFileProgressState state = new DataFileProgressState();
state.setLoaded(0);
@@ -120,18 +114,6 @@ void setup() throws Exception {

when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(generateGzipInputStream(total));
objectReader = new S3ObjectReader(s3Client);

-lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true);
-lenient().doNothing().when(coordinator).completePartition(any(EnhancedSourcePartition.class));
-lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class), eq(null));
-lenient().doNothing().when(coordinator).giveUpPartition(any(EnhancedSourcePartition.class));
-
-doNothing().when(bufferAccumulator).add(any(Record.class));
-doNothing().when(bufferAccumulator).flush();
-checkpointer = new DataFileCheckpointer(coordinator, dataFilePartition);
-
-given(pluginMetrics.counter(anyString())).willReturn(testCounter);
-
}
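With DataFileCheckpointer now injected as a @Mock, the coordinator and buffer-accumulator stubbings above become dead weight and are removed, which is presumably also why setup() can drop its throws Exception clause.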

private ResponseInputStream<GetObjectResponse> generateGzipInputStream(int numberOfRecords) {
@@ -166,10 +148,13 @@ private ResponseInputStream<GetObjectResponse> generateGzipInputStream(int numbe
}

@Test
-void test_run_loadFile_correctly() throws Exception {
+void test_run_loadFile_correctly() {
DataFileLoader loader;
try (
-final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) {
+final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class);
+final MockedConstruction<ExportRecordConverter> recordConverterMockedConstruction = mockConstruction(ExportRecordConverter.class, (mock, context) -> {
+exportRecordConverter = mock;
+})) {
bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator);
loader = DataFileLoader.builder(objectReader, pluginMetrics, buffer)
.bucketName(bucketName)
@@ -184,23 +169,24 @@ void test_run_loadFile_correctly() throws Exception {
// Should call s3 getObject
verify(s3Client).getObject(any(GetObjectRequest.class));

-// Should write to buffer
-verify(bufferAccumulator, times(total)).add(any(Record.class));
-verify(bufferAccumulator).flush();
+verify(exportRecordConverter).writeToBuffer(eq(null), anyList());

-// Should do one last checkpoint when done.
-verify(coordinator).saveProgressStateForPartition(any(DataFilePartition.class), eq(null));
+verify(checkpointer).checkpoint(total);
+verify(checkpointer, never()).updateDatafileForAcknowledgmentWait(any(Duration.class));
}

@Test
-void run_loadFile_with_acknowledgments_processes_correctly() throws Exception {
+void run_loadFile_with_acknowledgments_processes_correctly() {

final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class);
final Duration acknowledgmentTimeout = Duration.ofSeconds(30);

DataFileLoader loader;
try (
-final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) {
+final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class);
+final MockedConstruction<ExportRecordConverter> recordConverterMockedConstruction = mockConstruction(ExportRecordConverter.class, (mock, context) -> {
+exportRecordConverter = mock;
+})) {
bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator);
loader = DataFileLoader.builder(objectReader, pluginMetrics, buffer)
.bucketName(bucketName)
@@ -217,12 +203,11 @@ void run_loadFile_with_acknowledgments_processes_correctly() throws Exception {
// Should call s3 getObject
verify(s3Client).getObject(any(GetObjectRequest.class));

-// Should write to buffer
-verify(bufferAccumulator, times(total)).add(any(Record.class));
-verify(bufferAccumulator).flush();
+verify(exportRecordConverter).writeToBuffer(eq(acknowledgementSet), anyList());

+verify(checkpointer).checkpoint(total);
+verify(checkpointer).updateDatafileForAcknowledgmentWait(acknowledgmentTimeout);

-// Should do one last checkpoint when done.
-verify(coordinator).saveProgressStateForPartition(any(DataFilePartition.class), eq(null));

verify(acknowledgementSet).complete();
}
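The test above stops verifying bufferAccumulator directly and instead verifies the ExportRecordConverter that DataFileLoader constructs internally, captured with Mockito's mockConstruction. Below is a self-contained sketch of that capture pattern; Converter and Loader are hypothetical stand-ins, not Data Prepper classes.

import java.util.List;

import org.mockito.MockedConstruction;

import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.verify;

class ConstructionMockingSketch {

    // Stand-in for ExportRecordConverter: a collaborator the class under test
    // news up internally, so it cannot simply be injected as a @Mock.
    static class Converter {
        void writeToBuffer(final Object acknowledgementSet, final List<String> lines) {
        }
    }

    static class Loader {
        void run() {
            new Converter().writeToBuffer(null, List.of("record"));
        }
    }

    void testRunWritesThroughConverter() {
        final Converter[] captured = new Converter[1];
        // Every Converter constructed while this scope is open becomes a mock;
        // the initializer hands each constructed instance to the test.
        try (MockedConstruction<Converter> ignored =
                     mockConstruction(Converter.class, (mock, context) -> captured[0] = mock)) {
            new Loader().run();
        }
        verify(captured[0]).writeToBuffer(isNull(), anyList());
    }
}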
@@ -31,6 +31,7 @@
import java.util.Optional;

import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.BACKOFF_ON_EXCEPTION;
+import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.DEFAULT_CHECKPOINT_INTERVAL_MILLS;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet;
@@ -128,6 +129,8 @@ public void run() {
private void processIndex(final SourcePartition<OpenSearchIndexProgressState> openSearchIndexPartition,
final AcknowledgementSet acknowledgementSet) {
final String indexName = openSearchIndexPartition.getPartitionKey();
+long lastCheckpointTime = System.currentTimeMillis();
+
LOG.info("Started processing for index: '{}'", indexName);

Optional<OpenSearchIndexProgressState> openSearchIndexProgressStateOptional = openSearchIndexPartition.getPartitionState();
@@ -165,7 +168,12 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
});

openSearchIndexProgressState.setSearchAfter(searchWithSearchAfterResults.getNextSearchAfter());
-sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
+
+if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
+LOG.debug("Renew ownership of index {}", indexName);
+sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
+lastCheckpointTime = System.currentTimeMillis();
+}
} while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize());

try {
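This worker, and the two that follow, make the same change: instead of saving progress after every batch, state is persisted (which also renews ownership of the partition) only once DEFAULT_CHECKPOINT_INTERVAL_MILLS has elapsed since the last save. A condensed, compilable sketch of the pattern; the types and helpers below are stand-ins, not the workers' real API.

// Sketch of the time-gated checkpointing now shared by the three workers.
class CheckpointSketch {
    static final long DEFAULT_CHECKPOINT_INTERVAL_MILLS = 5 * 60_000;

    interface SearchBatch { java.util.List<Object> getDocuments(); }

    interface SourceCoordinator {
        void saveProgressStateForPartition(String partitionKey, Object state);
    }

    SourceCoordinator sourceCoordinator; // stand-in for the worker's coordinator
    int batchSize;

    void processIndex(final String indexName, final Object progressState) {
        long lastCheckpointTime = System.currentTimeMillis();
        SearchBatch batch;
        do {
            batch = doSearch(indexName); // fetch one batch of documents
            writeDocumentsToBuffer(batch.getDocuments());

            // Before this commit the state was saved after every batch; now it is
            // saved (renewing partition ownership) at most once per interval.
            if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
                sourceCoordinator.saveProgressStateForPartition(indexName, progressState);
                lastCheckpointTime = System.currentTimeMillis();
            }
        } while (batch.getDocuments().size() == batchSize);
    }

    SearchBatch doSearch(String indexName) { throw new UnsupportedOperationException(); }
    void writeDocumentsToBuffer(java.util.List<Object> documents) { }
}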
@@ -35,6 +35,7 @@
import java.util.Optional;

import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.BACKOFF_ON_EXCEPTION;
+import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.DEFAULT_CHECKPOINT_INTERVAL_MILLS;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet;
@@ -150,6 +151,7 @@ public void run() {
private void processIndex(final SourcePartition<OpenSearchIndexProgressState> openSearchIndexPartition,
final AcknowledgementSet acknowledgementSet) {
final String indexName = openSearchIndexPartition.getPartitionKey();
+long lastCheckpointTime = System.currentTimeMillis();

LOG.info("Starting processing for index: '{}'", indexName);
Optional<OpenSearchIndexProgressState> openSearchIndexProgressStateOptional = openSearchIndexPartition.getPartitionState();
@@ -203,7 +205,12 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op

openSearchIndexProgressState.setSearchAfter(searchWithSearchAfterResults.getNextSearchAfter());
openSearchIndexProgressState.setKeepAlive(Duration.ofMillis(openSearchIndexProgressState.getKeepAlive()).plus(EXTEND_KEEP_ALIVE_DURATION).toMillis());
-sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
+
+if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
+LOG.debug("Renew ownership of index {}", indexName);
+sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
+lastCheckpointTime = System.currentTimeMillis();
+}
} while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize());

try {
@@ -34,6 +34,7 @@
import java.util.Optional;

import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.BACKOFF_ON_EXCEPTION;
+import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.DEFAULT_CHECKPOINT_INTERVAL_MILLS;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet;
@@ -145,6 +146,8 @@ public void run() {
private void processIndex(final SourcePartition<OpenSearchIndexProgressState> openSearchIndexPartition,
final AcknowledgementSet acknowledgementSet) {
final String indexName = openSearchIndexPartition.getPartitionKey();
+long lastCheckpointTime = System.currentTimeMillis();
+
LOG.info("Started processing for index: '{}'", indexName);

final Integer batchSize = openSearchSourceConfiguration.getSearchConfiguration().getBatchSize();
Expand All @@ -168,7 +171,12 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
.build());

writeDocumentsToBuffer(searchScrollResponse.getDocuments(), acknowledgementSet);
-sourceCoordinator.saveProgressStateForPartition(indexName, null);
+
+if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
+LOG.debug("Renew ownership of index {}", indexName);
+sourceCoordinator.saveProgressStateForPartition(indexName, null);
+lastCheckpointTime = System.currentTimeMillis();
+}
} catch (final Exception e) {
deleteScroll(createScrollResponse.getScrollId());
throw e;
@@ -27,7 +27,8 @@ public class WorkerCommonUtils {

static final Duration BACKOFF_ON_EXCEPTION = Duration.ofSeconds(60);

-static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(2);
+static final long DEFAULT_CHECKPOINT_INTERVAL_MILLS = 5 * 60_000;
+static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(1);
static final Duration STARTING_BACKOFF = Duration.ofMillis(500);
static final Duration MAX_BACKOFF = Duration.ofSeconds(60);
static final int BACKOFF_RATE = 2;
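For reference, 5 * 60_000 works out to 300,000 ms, the five-minute checkpoint interval named in the commit title. The same hunk also tightens ACKNOWLEDGEMENT_SET_TIMEOUT from two hours to one.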
@@ -206,7 +206,7 @@ void run_with_getNextPartition_with_non_empty_partition_processes_and_closes_tha
assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true));

verify(searchAccessor, times(2)).searchWithoutSearchContext(any(NoSearchContextSearchRequest.class));
-verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class));
+verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class));

final List<NoSearchContextSearchRequest> noSearchContextSearchRequests = searchRequestArgumentCaptor.getAllValues();
assertThat(noSearchContextSearchRequests.size(), equalTo(2));
@@ -283,7 +283,7 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa
assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true));

verify(searchAccessor, times(2)).searchWithoutSearchContext(any(NoSearchContextSearchRequest.class));
-verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class));
+verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class));

final List<NoSearchContextSearchRequest> noSearchContextSearchRequests = searchRequestArgumentCaptor.getAllValues();
assertThat(noSearchContextSearchRequests.size(), equalTo(2));
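In this and the following two test files the expected invocation counts change from times(2) to times(0): these test runs finish in well under five minutes, so with the new time gate saveProgressStateForPartition is never called mid-scan.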
@@ -191,7 +191,7 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_
assertThat(createPointInTimeRequest.getKeepAlive(), equalTo(STARTING_KEEP_ALIVE));

verify(searchAccessor, times(2)).searchWithPit(any(SearchPointInTimeRequest.class));
-verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class));
+verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class));

final List<SearchPointInTimeRequest> searchPointInTimeRequestList = searchPointInTimeRequestArgumentCaptor.getAllValues();
assertThat(searchPointInTimeRequestList.size(), equalTo(2));
@@ -292,7 +292,7 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa
assertThat(createPointInTimeRequest.getKeepAlive(), equalTo(STARTING_KEEP_ALIVE));

verify(searchAccessor, times(2)).searchWithPit(any(SearchPointInTimeRequest.class));
-verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class));
+verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class));

final List<SearchPointInTimeRequest> searchPointInTimeRequestList = searchPointInTimeRequestArgumentCaptor.getAllValues();
assertThat(searchPointInTimeRequestList.size(), equalTo(2));
@@ -378,7 +378,7 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create

verify(searchAccessor, never()).createPit(any(CreatePointInTimeRequest.class));
verify(searchAccessor, times(2)).searchWithPit(any(SearchPointInTimeRequest.class));
-verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), eq(openSearchIndexProgressState));
+verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), eq(openSearchIndexProgressState));
verify(sourceCoordinator, times(0)).updatePartitionForAcknowledgmentWait(anyString(), any(Duration.class));

verify(documentsProcessedCounter, times(3)).increment();
@@ -191,7 +191,7 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scro
assertThat(createScrollRequest.getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH));

verify(searchAccessor, times(2)).searchWithScroll(any(SearchScrollRequest.class));
-verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), eq(null));
+verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), eq(null));

final List<SearchScrollRequest> searchScrollRequests = searchScrollRequestArgumentCaptor.getAllValues();
assertThat(searchScrollRequests.size(), equalTo(2));
@@ -286,7 +286,7 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a
assertThat(createScrollRequest.getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH));

verify(searchAccessor, times(2)).searchWithScroll(any(SearchScrollRequest.class));
-verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), eq(null));
+verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), eq(null));

final List<SearchScrollRequest> searchScrollRequests = searchScrollRequestArgumentCaptor.getAllValues();
assertThat(searchScrollRequests.size(), equalTo(2));