From 731de123b297998cfd158f37be0034ddc43ea237 Mon Sep 17 00:00:00 2001 From: Hai Yan <8153134+oeyh@users.noreply.github.com> Date: Mon, 15 Jul 2024 09:57:24 -0500 Subject: [PATCH] Load exported S3 files in RDS source (#4718) * Add s3 file loader Signed-off-by: Hai Yan * Make checkExportStatus a callable Signed-off-by: Hai Yan * Fix unit tests Signed-off-by: Hai Yan * Add load status and record converter Signed-off-by: Hai Yan * Update unit tests Signed-off-by: Hai Yan * Restore changes for test Signed-off-by: Hai Yan * Address review comments Signed-off-by: Hai Yan --------- Signed-off-by: Hai Yan --- data-prepper-plugins/rds-source/build.gradle | 2 + .../plugins/source/rds/ClientFactory.java | 8 + .../plugins/source/rds/RdsService.java | 29 +++- .../plugins/source/rds/RdsSource.java | 6 +- .../rds/converter/ExportRecordConverter.java | 36 ++++ .../rds/converter/MetadataKeyAttributes.java | 20 +++ .../rds/coordination/PartitionFactory.java | 5 +- .../partition/DataFilePartition.java | 77 +++++++++ .../state/DataFileProgressState.java | 44 +++++ .../source/rds/export/DataFileLoader.java | 83 +++++++++ .../source/rds/export/DataFileScheduler.java | 163 ++++++++++++++++++ .../source/rds/export/ExportScheduler.java | 130 +++++++++++--- .../source/rds/export/S3ObjectReader.java | 36 ++++ .../source/rds/model/ExportObjectKey.java | 68 ++++++++ .../plugins/source/rds/model/LoadStatus.java | 53 ++++++ .../plugins/source/rds/RdsServiceTest.java | 11 +- .../plugins/source/rds/RdsSourceTest.java | 6 +- .../converter/ExportRecordConverterTest.java | 51 ++++++ .../source/rds/export/DataFileLoaderTest.java | 67 +++++++ .../rds/export/DataFileSchedulerTest.java | 137 +++++++++++++++ .../rds/export/ExportSchedulerTest.java | 40 ++++- .../source/rds/export/S3ObjectReaderTest.java | 56 ++++++ .../source/rds/model/ExportObjectKeyTest.java | 37 ++++ 23 files changed, 1133 insertions(+), 32 deletions(-) create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverter.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/MetadataKeyAttributes.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/DataFilePartition.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/DataFileProgressState.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/S3ObjectReader.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKey.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/LoadStatus.java create mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverterTest.java create mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java create mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java create mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/S3ObjectReaderTest.java create mode 100644 data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKeyTest.java diff --git a/data-prepper-plugins/rds-source/build.gradle b/data-prepper-plugins/rds-source/build.gradle index 580a312be0..f83b1332eb 100644 --- a/data-prepper-plugins/rds-source/build.gradle +++ b/data-prepper-plugins/rds-source/build.gradle @@ -8,6 +8,7 @@ dependencies { implementation project(path: ':data-prepper-plugins:buffer-common') implementation project(path: ':data-prepper-plugins:http-common') implementation project(path: ':data-prepper-plugins:common') + implementation project(path: ':data-prepper-plugins:parquet-codecs') implementation 'io.micrometer:micrometer-core' @@ -22,4 +23,5 @@ dependencies { testImplementation project(path: ':data-prepper-test-common') testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' + testImplementation project(path: ':data-prepper-test-event') } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/ClientFactory.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/ClientFactory.java index 9cdb2bfa50..7831754f0f 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/ClientFactory.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/ClientFactory.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.plugins.source.rds.configuration.AwsAuthenticationConfig; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.services.rds.RdsClient; +import software.amazon.awssdk.services.s3.S3Client; public class ClientFactory { private final AwsCredentialsProvider awsCredentialsProvider; @@ -32,4 +33,11 @@ public RdsClient buildRdsClient() { .credentialsProvider(awsCredentialsProvider) .build(); } + + public S3Client buildS3Client() { + return S3Client.builder() + .region(awsAuthenticationConfig.getAwsRegion()) + .credentialsProvider(awsCredentialsProvider) + .build(); + } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java index f059dd52bf..77956e6b0e 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java @@ -8,13 +8,16 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler; import org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler; import org.opensearch.dataprepper.plugins.source.rds.leader.LeaderScheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.rds.RdsClient; +import software.amazon.awssdk.services.s3.S3Client; import java.util.ArrayList; import java.util.List; @@ -24,23 +27,34 @@ public class RdsService { private static final Logger LOG = LoggerFactory.getLogger(RdsService.class); + /** + * Maximum concurrent data loader per node + */ + public static final int DATA_LOADER_MAX_JOB_COUNT = 1; + private final RdsClient rdsClient; + private final S3Client s3Client; private final EnhancedSourceCoordinator sourceCoordinator; + private final EventFactory eventFactory; private final PluginMetrics pluginMetrics; private final RdsSourceConfig sourceConfig; private ExecutorService executor; private LeaderScheduler leaderScheduler; private ExportScheduler exportScheduler; + private DataFileScheduler dataFileScheduler; public RdsService(final EnhancedSourceCoordinator sourceCoordinator, final RdsSourceConfig sourceConfig, + final EventFactory eventFactory, final ClientFactory clientFactory, final PluginMetrics pluginMetrics) { this.sourceCoordinator = sourceCoordinator; + this.eventFactory = eventFactory; this.pluginMetrics = pluginMetrics; this.sourceConfig = sourceConfig; rdsClient = clientFactory.buildRdsClient(); + s3Client = clientFactory.buildS3Client(); } /** @@ -54,9 +68,15 @@ public void start(Buffer> buffer) { LOG.info("Start running RDS service"); final List runnableList = new ArrayList<>(); leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig); - exportScheduler = new ExportScheduler(sourceCoordinator, rdsClient, pluginMetrics); runnableList.add(leaderScheduler); - runnableList.add(exportScheduler); + + if (sourceConfig.isExportEnabled()) { + exportScheduler = new ExportScheduler(sourceCoordinator, rdsClient, s3Client, pluginMetrics); + dataFileScheduler = new DataFileScheduler( + sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer); + runnableList.add(exportScheduler); + runnableList.add(dataFileScheduler); + } executor = Executors.newFixedThreadPool(runnableList.size()); runnableList.forEach(executor::submit); @@ -69,7 +89,10 @@ public void start(Buffer> buffer) { public void shutdown() { if (executor != null) { LOG.info("shutdown RDS schedulers"); - exportScheduler.shutdown(); + if (sourceConfig.isExportEnabled()) { + exportScheduler.shutdown(); + dataFileScheduler.shutdown(); + } leaderScheduler.shutdown(); executor.shutdownNow(); } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java index a9fe983572..43806c0475 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; @@ -33,15 +34,18 @@ public class RdsSource implements Source>, UsesEnhancedSourceCoord private final ClientFactory clientFactory; private final PluginMetrics pluginMetrics; private final RdsSourceConfig sourceConfig; + private final EventFactory eventFactory; private EnhancedSourceCoordinator sourceCoordinator; private RdsService rdsService; @DataPrepperPluginConstructor public RdsSource(final PluginMetrics pluginMetrics, final RdsSourceConfig sourceConfig, + final EventFactory eventFactory, final AwsCredentialsSupplier awsCredentialsSupplier) { this.pluginMetrics = pluginMetrics; this.sourceConfig = sourceConfig; + this.eventFactory = eventFactory; clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig()); } @@ -51,7 +55,7 @@ public void start(Buffer> buffer) { Objects.requireNonNull(sourceCoordinator); sourceCoordinator.createPartition(new LeaderPartition()); - rdsService = new RdsService(sourceCoordinator, sourceConfig, clientFactory, pluginMetrics); + rdsService = new RdsService(sourceCoordinator, sourceConfig, eventFactory, clientFactory, pluginMetrics); LOG.info("Start RDS service"); rdsService.start(buffer); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverter.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverter.java new file mode 100644 index 0000000000..11932cd512 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverter.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.converter; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventMetadata; +import org.opensearch.dataprepper.model.record.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE; + +public class ExportRecordConverter { + + private static final Logger LOG = LoggerFactory.getLogger(ExportRecordConverter.class); + + static final String EXPORT_EVENT_TYPE = "EXPORT"; + + public Event convert(Record record, String tableName, String primaryKeyName) { + Event event = record.getData(); + + EventMetadata eventMetadata = event.getMetadata(); + eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableName); + eventMetadata.setAttribute(INGESTION_EVENT_TYPE_ATTRIBUTE, EXPORT_EVENT_TYPE); + + final Object primaryKeyValue = record.getData().get(primaryKeyName, Object.class); + eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, primaryKeyValue); + + return event; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/MetadataKeyAttributes.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/MetadataKeyAttributes.java new file mode 100644 index 0000000000..91eecdf07b --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/MetadataKeyAttributes.java @@ -0,0 +1,20 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.converter; + +public class MetadataKeyAttributes { + static final String PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE = "primary_key"; + + static final String EVENT_VERSION_FROM_TIMESTAMP = "document_version"; + + static final String EVENT_TIMESTAMP_METADATA_ATTRIBUTE = "event_timestamp"; + + static final String EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE = "opensearch_action"; + + static final String EVENT_TABLE_NAME_METADATA_ATTRIBUTE = "table_name"; + + static final String INGESTION_EVENT_TYPE_ATTRIBUTE = "ingestion_type"; +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/PartitionFactory.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/PartitionFactory.java index db35f5076b..6213263b09 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/PartitionFactory.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/PartitionFactory.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.LeaderPartition; @@ -25,8 +26,10 @@ public EnhancedSourcePartition apply(SourcePartitionStoreItem partitionStoreItem if (LeaderPartition.PARTITION_TYPE.equals(partitionType)) { return new LeaderPartition(partitionStoreItem); - } if (ExportPartition.PARTITION_TYPE.equals(partitionType)) { + } else if (ExportPartition.PARTITION_TYPE.equals(partitionType)) { return new ExportPartition(partitionStoreItem); + } else if (DataFilePartition.PARTITION_TYPE.equals(partitionType)) { + return new DataFilePartition(partitionStoreItem); } else { // Unable to acquire other partitions. return new GlobalState(partitionStoreItem); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/DataFilePartition.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/DataFilePartition.java new file mode 100644 index 0000000000..985f48b652 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/DataFilePartition.java @@ -0,0 +1,77 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.coordination.partition; + +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState; + +import java.util.Optional; + +/** + * An DataFilePartition represents an export data file needs to be loaded. + * The source identifier contains keyword 'DATAFILE' + */ +public class DataFilePartition extends EnhancedSourcePartition { + + public static final String PARTITION_TYPE = "DATAFILE"; + + private final String exportTaskId; + private final String bucket; + private final String key; + private final DataFileProgressState state; + + public DataFilePartition(final SourcePartitionStoreItem sourcePartitionStoreItem) { + + setSourcePartitionStoreItem(sourcePartitionStoreItem); + String[] keySplits = sourcePartitionStoreItem.getSourcePartitionKey().split("\\|"); + exportTaskId = keySplits[0]; + bucket = keySplits[1]; + key = keySplits[2]; + state = convertStringToPartitionProgressState(DataFileProgressState.class, sourcePartitionStoreItem.getPartitionProgressState()); + + } + + public DataFilePartition(final String exportTaskId, + final String bucket, + final String key, + final Optional state) { + this.exportTaskId = exportTaskId; + this.bucket = bucket; + this.key = key; + this.state = state.orElse(null); + } + + @Override + public String getPartitionType() { + return PARTITION_TYPE; + } + + @Override + public String getPartitionKey() { + return exportTaskId + "|" + bucket + "|" + key; + } + + @Override + public Optional getProgressState() { + if (state != null) { + return Optional.of(state); + } + return Optional.empty(); + } + + public String getExportTaskId() { + return exportTaskId; + } + + public String getBucket() { + return bucket; + } + + public String getKey() { + return key; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/DataFileProgressState.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/DataFileProgressState.java new file mode 100644 index 0000000000..c65c0bbe01 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/DataFileProgressState.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.coordination.state; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class DataFileProgressState { + + @JsonProperty("isLoaded") + private boolean isLoaded = false; + + @JsonProperty("totalRecords") + private int totalRecords; + + @JsonProperty("sourceTable") + private String sourceTable; + + public int getTotalRecords() { + return totalRecords; + } + + public void setTotalRecords(int totalRecords) { + this.totalRecords = totalRecords; + } + + public boolean getLoaded() { + return isLoaded; + } + + public void setLoaded(boolean loaded) { + this.isLoaded = loaded; + } + + public String getSourceTable() { + return sourceTable; + } + + public void setSourceTable(String sourceTable) { + this.sourceTable = sourceTable; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java new file mode 100644 index 0000000000..e76a04e99d --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java @@ -0,0 +1,83 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.export; + +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; + +public class DataFileLoader implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(DataFileLoader.class); + + private final DataFilePartition dataFilePartition; + private final String bucket; + private final String objectKey; + private final S3ObjectReader objectReader; + private final InputCodec codec; + private final BufferAccumulator> bufferAccumulator; + private final ExportRecordConverter recordConverter; + + private DataFileLoader(final DataFilePartition dataFilePartition, + final InputCodec codec, + final BufferAccumulator> bufferAccumulator, + final S3ObjectReader objectReader, + final ExportRecordConverter recordConverter) { + this.dataFilePartition = dataFilePartition; + bucket = dataFilePartition.getBucket(); + objectKey = dataFilePartition.getKey(); + this.objectReader = objectReader; + this.codec = codec; + this.bufferAccumulator = bufferAccumulator; + this.recordConverter = recordConverter; + } + + public static DataFileLoader create(final DataFilePartition dataFilePartition, + final InputCodec codec, + final BufferAccumulator> bufferAccumulator, + final S3ObjectReader objectReader, + final ExportRecordConverter recordConverter) { + return new DataFileLoader(dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter); + } + + @Override + public void run() { + LOG.info("Start loading s3://{}/{}", bucket, objectKey); + + try (InputStream inputStream = objectReader.readFile(bucket, objectKey)) { + + codec.parse(inputStream, record -> { + try { + final String tableName = dataFilePartition.getProgressState().get().getSourceTable(); + // TODO: primary key to be obtained by querying database schema + final String primaryKeyName = "id"; + Record transformedRecord = new Record<>(recordConverter.convert(record, tableName, primaryKeyName)); + bufferAccumulator.add(transformedRecord); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + LOG.info("Completed loading object s3://{}/{} to buffer", bucket, objectKey); + } catch (Exception e) { + LOG.error("Failed to load object s3://{}/{} to buffer", bucket, objectKey, e); + throw new RuntimeException(e); + } + + try { + bufferAccumulator.flush(); + } catch (Exception e) { + LOG.error("Failed to write events to buffer", e); + } + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java new file mode 100644 index 0000000000..d465d55076 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileScheduler.java @@ -0,0 +1,163 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.export; + +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.codec.parquet.ParquetInputCodec; +import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; +import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.source.rds.model.LoadStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.s3.S3Client; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.opensearch.dataprepper.plugins.source.rds.RdsService.DATA_LOADER_MAX_JOB_COUNT; + +public class DataFileScheduler implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(DataFileScheduler.class); + + private final AtomicInteger numOfWorkers = new AtomicInteger(0); + + /** + * Default interval to acquire a lease from coordination store + */ + private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 2_000; + + private static final Duration DEFAULT_UPDATE_LOAD_STATUS_TIMEOUT = Duration.ofMinutes(30); + + static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60); + static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000; + + + private final EnhancedSourceCoordinator sourceCoordinator; + private final ExecutorService executor; + private final RdsSourceConfig sourceConfig; + private final S3ObjectReader objectReader; + private final InputCodec codec; + private final BufferAccumulator> bufferAccumulator; + private final ExportRecordConverter recordConverter; + + private volatile boolean shutdownRequested = false; + + public DataFileScheduler(final EnhancedSourceCoordinator sourceCoordinator, + final RdsSourceConfig sourceConfig, + final S3Client s3Client, + final EventFactory eventFactory, + final Buffer> buffer) { + this.sourceCoordinator = sourceCoordinator; + this.sourceConfig = sourceConfig; + codec = new ParquetInputCodec(eventFactory); + bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); + objectReader = new S3ObjectReader(s3Client); + recordConverter = new ExportRecordConverter(); + executor = Executors.newFixedThreadPool(DATA_LOADER_MAX_JOB_COUNT); + } + + @Override + public void run() { + LOG.debug("Starting Data File Scheduler to process S3 data files for export"); + + while (!shutdownRequested && !Thread.currentThread().isInterrupted()) { + try { + if (numOfWorkers.get() < DATA_LOADER_MAX_JOB_COUNT) { + final Optional sourcePartition = sourceCoordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE); + + if (sourcePartition.isPresent()) { + LOG.debug("Acquired data file partition"); + DataFilePartition dataFilePartition = (DataFilePartition) sourcePartition.get(); + LOG.debug("Start processing data file partition"); + processDataFilePartition(dataFilePartition); + } + } + try { + Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS); + } catch (final InterruptedException e) { + LOG.info("The DataFileScheduler was interrupted while waiting to retry, stopping processing"); + break; + } + } catch (final Exception e) { + LOG.error("Received an exception while processing an S3 data file, backing off and retrying", e); + try { + Thread.sleep(DEFAULT_LEASE_INTERVAL_MILLIS); + } catch (final InterruptedException ex) { + LOG.info("The DataFileScheduler was interrupted while waiting to retry, stopping processing"); + break; + } + } + } + LOG.warn("Data file scheduler is interrupted, stopping all data file loaders..."); + + executor.shutdown(); + } + + public void shutdown() { + shutdownRequested = true; + } + + private void processDataFilePartition(DataFilePartition dataFilePartition) { + Runnable loader = DataFileLoader.create(dataFilePartition, codec, bufferAccumulator, objectReader, recordConverter); + CompletableFuture runLoader = CompletableFuture.runAsync(loader, executor); + + runLoader.whenComplete((v, ex) -> { + if (ex == null) { + // Update global state so we know if all s3 files have been loaded + updateLoadStatus(dataFilePartition.getExportTaskId(), DEFAULT_UPDATE_LOAD_STATUS_TIMEOUT); + sourceCoordinator.completePartition(dataFilePartition); + } else { + LOG.error("There was an exception while processing an S3 data file", (Throwable) ex); + sourceCoordinator.giveUpPartition(dataFilePartition); + } + numOfWorkers.decrementAndGet(); + }); + numOfWorkers.incrementAndGet(); + } + + private void updateLoadStatus(String exportTaskId, Duration timeout) { + + Instant endTime = Instant.now().plus(timeout); + // Keep retrying in case update fails due to conflicts until timed out + while (Instant.now().isBefore(endTime)) { + Optional globalStatePartition = sourceCoordinator.getPartition(exportTaskId); + if (globalStatePartition.isEmpty()) { + LOG.error("Failed to get data file load status for {}", exportTaskId); + return; + } + + GlobalState globalState = (GlobalState) globalStatePartition.get(); + LoadStatus loadStatus = LoadStatus.fromMap(globalState.getProgressState().get()); + loadStatus.setLoadedFiles(loadStatus.getLoadedFiles() + 1); + LOG.info("Current data file load status: total {} loaded {}", loadStatus.getTotalFiles(), loadStatus.getLoadedFiles()); + + globalState.setProgressState(loadStatus.toMap()); + + try { + sourceCoordinator.saveProgressStateForPartition(globalState, null); + // TODO: Stream is enabled and loadStatus.getLoadedFiles() == loadStatus.getTotalFiles(), create global state to indicate that stream can start + break; + } catch (Exception e) { + LOG.error("Failed to update the global status, looks like the status was out of date, will retry.."); + } + } + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java index 51db82248b..abcbd2c1f4 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportScheduler.java @@ -8,22 +8,36 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ExportProgressState; +import org.opensearch.dataprepper.plugins.source.rds.model.ExportObjectKey; import org.opensearch.dataprepper.plugins.source.rds.model.ExportStatus; +import org.opensearch.dataprepper.plugins.source.rds.model.LoadStatus; import org.opensearch.dataprepper.plugins.source.rds.model.SnapshotInfo; import org.opensearch.dataprepper.plugins.source.rds.model.SnapshotStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.rds.RdsClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Object; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; +import java.util.stream.Collectors; public class ExportScheduler implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ExportScheduler.class); @@ -34,8 +48,10 @@ public class ExportScheduler implements Runnable { private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 5 * 60_000; private static final int DEFAULT_CHECK_STATUS_INTERVAL_MILLS = 30 * 1000; private static final Duration DEFAULT_SNAPSHOT_STATUS_CHECK_TIMEOUT = Duration.ofMinutes(60); + static final String PARQUET_SUFFIX = ".parquet"; private final RdsClient rdsClient; + private final S3Client s3Client; private final PluginMetrics pluginMetrics; private final EnhancedSourceCoordinator sourceCoordinator; private final ExecutorService executor; @@ -46,10 +62,12 @@ public class ExportScheduler implements Runnable { public ExportScheduler(final EnhancedSourceCoordinator sourceCoordinator, final RdsClient rdsClient, + final S3Client s3Client, final PluginMetrics pluginMetrics) { this.pluginMetrics = pluginMetrics; this.sourceCoordinator = sourceCoordinator; this.rdsClient = rdsClient; + this.s3Client = s3Client; this.executor = Executors.newCachedThreadPool(); this.exportTaskManager = new ExportTaskManager(rdsClient); this.snapshotManager = new SnapshotManager(rdsClient); @@ -72,7 +90,8 @@ public void run() { LOG.error("The export to S3 failed, it will be retried"); closeExportPartitionWithError(exportPartition); } else { - CompletableFuture checkStatus = CompletableFuture.supplyAsync(() -> checkExportStatus(exportPartition), executor); + CheckExportStatusRunner checkExportStatusRunner = new CheckExportStatusRunner(sourceCoordinator, exportTaskManager, exportPartition); + CompletableFuture checkStatus = CompletableFuture.supplyAsync(checkExportStatusRunner::call, executor); checkStatus.whenComplete(completeExport(exportPartition)); } } @@ -179,29 +198,46 @@ private String checkSnapshotStatus(String snapshotId, Duration timeout) { throw new RuntimeException("Snapshot status check timed out."); } - private String checkExportStatus(ExportPartition exportPartition) { - long lastCheckpointTime = System.currentTimeMillis(); - String exportTaskId = exportPartition.getProgressState().get().getExportTaskId(); + static class CheckExportStatusRunner implements Callable { + private final EnhancedSourceCoordinator sourceCoordinator; + private final ExportTaskManager exportTaskManager; + private final ExportPartition exportPartition; - LOG.debug("Start checking the status of export {}", exportTaskId); - while (true) { - if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { - sourceCoordinator.saveProgressStateForPartition(exportPartition, null); - lastCheckpointTime = System.currentTimeMillis(); - } + CheckExportStatusRunner(EnhancedSourceCoordinator sourceCoordinator, ExportTaskManager exportTaskManager, ExportPartition exportPartition) { + this.sourceCoordinator = sourceCoordinator; + this.exportTaskManager = exportTaskManager; + this.exportPartition = exportPartition; + } - // Valid statuses are: CANCELED, CANCELING, COMPLETE, FAILED, IN_PROGRESS, STARTING - String status = exportTaskManager.checkExportStatus(exportTaskId); - LOG.debug("Current export status is {}.", status); - if (ExportStatus.isTerminal(status)) { - LOG.info("Export {} is completed with final status {}", exportTaskId, status); - return status; - } - LOG.debug("Export {} is still running in progress. Wait and check later", exportTaskId); - try { - Thread.sleep(DEFAULT_CHECK_STATUS_INTERVAL_MILLS); - } catch (InterruptedException e) { - throw new RuntimeException(e); + @Override + public String call() { + return checkExportStatus(exportPartition); + } + + private String checkExportStatus(ExportPartition exportPartition) { + long lastCheckpointTime = System.currentTimeMillis(); + String exportTaskId = exportPartition.getProgressState().get().getExportTaskId(); + + LOG.debug("Start checking the status of export {}", exportTaskId); + while (true) { + if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { + sourceCoordinator.saveProgressStateForPartition(exportPartition, null); + lastCheckpointTime = System.currentTimeMillis(); + } + + // Valid statuses are: CANCELED, CANCELING, COMPLETE, FAILED, IN_PROGRESS, STARTING + String status = exportTaskManager.checkExportStatus(exportTaskId); + LOG.debug("Current export status is {}.", status); + if (ExportStatus.isTerminal(status)) { + LOG.info("Export {} is completed with final status {}", exportTaskId, status); + return status; + } + LOG.debug("Export {} is still running in progress. Wait and check later", exportTaskId); + try { + Thread.sleep(DEFAULT_CHECK_STATUS_INTERVAL_MILLS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } } @@ -219,11 +255,61 @@ private BiConsumer completeExport(ExportPartition exportParti } LOG.info("Export for {} completed successfully", exportPartition.getPartitionKey()); + ExportProgressState state = exportPartition.getProgressState().get(); + String bucket = state.getBucket(); + String prefix = state.getPrefix(); + String exportTaskId = state.getExportTaskId(); + + // Create data file partitions for processing S3 files + List dataFileObjectKeys = getDataFileObjectKeys(bucket, prefix, exportTaskId); + createDataFilePartitions(bucket, exportTaskId, dataFileObjectKeys); + completeExportPartition(exportPartition); } }; } + private List getDataFileObjectKeys(String bucket, String prefix, String exportTaskId) { + LOG.debug("Fetching object keys for export data files."); + ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder() + .bucket(bucket) + .prefix(prefix + "/" + exportTaskId); + + List objectKeys = new ArrayList<>(); + ListObjectsV2Response response = null; + do { + String nextToken = response == null ? null : response.nextContinuationToken(); + response = s3Client.listObjectsV2(requestBuilder + .continuationToken(nextToken) + .build()); + objectKeys.addAll(response.contents().stream() + .map(S3Object::key) + .filter(key -> key.endsWith(PARQUET_SUFFIX)) + .collect(Collectors.toList())); + + } while (response.isTruncated()); + return objectKeys; + } + + private void createDataFilePartitions(String bucket, String exportTaskId, List dataFileObjectKeys) { + LOG.info("Total of {} data files generated for export {}", dataFileObjectKeys.size(), exportTaskId); + AtomicInteger totalFiles = new AtomicInteger(); + for (final String objectKey : dataFileObjectKeys) { + DataFileProgressState progressState = new DataFileProgressState(); + ExportObjectKey exportObjectKey = ExportObjectKey.fromString(objectKey); + String table = exportObjectKey.getTableName(); + progressState.setSourceTable(table); + + DataFilePartition dataFilePartition = new DataFilePartition(exportTaskId, bucket, objectKey, Optional.of(progressState)); + sourceCoordinator.createPartition(dataFilePartition); + totalFiles.getAndIncrement(); + } + + // Create a global state to track overall progress for data file processing + LoadStatus loadStatus = new LoadStatus(totalFiles.get(), 0); + sourceCoordinator.createPartition(new GlobalState(exportTaskId, loadStatus.toMap())); + } + private void completeExportPartition(ExportPartition exportPartition) { ExportProgressState progressState = exportPartition.getProgressState().get(); progressState.setStatus("Completed"); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/S3ObjectReader.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/S3ObjectReader.java new file mode 100644 index 0000000000..39c0079198 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/S3ObjectReader.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.export; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; + +import java.io.InputStream; + +public class S3ObjectReader { + + private static final Logger LOG = LoggerFactory.getLogger(S3ObjectReader.class); + + private final S3Client s3Client; + + public S3ObjectReader(S3Client s3Client) { + this.s3Client = s3Client; + } + + public InputStream readFile(String bucketName, String s3Key) { + LOG.debug("Read file from s3://{}/{}", bucketName, s3Key); + + GetObjectRequest objectRequest = GetObjectRequest.builder() + .bucket(bucketName) + .key(s3Key) + .build(); + + return s3Client.getObject(objectRequest); + } + +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKey.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKey.java new file mode 100644 index 0000000000..c69dcc7651 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKey.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.model; + +/** + * Represents the object key for an object exported to S3 by RDS. + * The object key has this structure: "{prefix}/{export task ID}/{database name}/{table name}/{numbered folder}/{file name}" + */ +public class ExportObjectKey { + + private final String prefix; + private final String exportTaskId; + private final String databaseName; + private final String tableName; + private final String numberedFolder; + private final String fileName; + + ExportObjectKey(final String prefix, final String exportTaskId, final String databaseName, final String tableName, final String numberedFolder, final String fileName) { + this.prefix = prefix; + this.exportTaskId = exportTaskId; + this.databaseName = databaseName; + this.tableName = tableName; + this.numberedFolder = numberedFolder; + this.fileName = fileName; + } + + public static ExportObjectKey fromString(final String objectKeyString) { + + final String[] parts = objectKeyString.split("/"); + if (parts.length != 6) { + throw new IllegalArgumentException("Export object key is not valid: " + objectKeyString); + } + final String prefix = parts[0]; + final String exportTaskId = parts[1]; + final String databaseName = parts[2]; + final String tableName = parts[3]; + final String numberedFolder = parts[4]; + final String fileName = parts[5]; + return new ExportObjectKey(prefix, exportTaskId, databaseName, tableName, numberedFolder, fileName); + } + + public String getPrefix() { + return prefix; + } + + public String getExportTaskId() { + return exportTaskId; + } + + public String getDatabaseName() { + return databaseName; + } + + public String getTableName() { + return tableName; + } + + public String getNumberedFolder() { + return numberedFolder; + } + + public String getFileName() { + return fileName; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/LoadStatus.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/LoadStatus.java new file mode 100644 index 0000000000..a2762c1b38 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/model/LoadStatus.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.model; + +import java.util.Map; + +public class LoadStatus { + + private static final String TOTAL_FILES = "totalFiles"; + private static final String LOADED_FILES = "loadedFiles"; + + private int totalFiles; + + private int loadedFiles; + + public LoadStatus(int totalFiles, int loadedFiles) { + this.totalFiles = totalFiles; + this.loadedFiles = loadedFiles; + } + + public int getTotalFiles() { + return totalFiles; + } + + public void setTotalFiles(int totalFiles) { + this.totalFiles = totalFiles; + } + + public int getLoadedFiles() { + return loadedFiles; + } + + public void setLoadedFiles(int loadedFiles) { + this.loadedFiles = loadedFiles; + } + + public Map toMap() { + return Map.of( + TOTAL_FILES, totalFiles, + LOADED_FILES, loadedFiles + ); + } + + public static LoadStatus fromMap(Map map) { + return new LoadStatus( + ((Number) map.get(TOTAL_FILES)).intValue(), + ((Number) map.get(LOADED_FILES)).intValue() + ); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java index 6aaa0b0bd5..7a18dd6159 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsServiceTest.java @@ -14,8 +14,10 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler; import org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler; import org.opensearch.dataprepper.plugins.source.rds.leader.LeaderScheduler; import software.amazon.awssdk.services.rds.RdsClient; @@ -47,6 +49,9 @@ class RdsServiceTest { @Mock private ExecutorService executor; + @Mock + private EventFactory eventFactory; + @Mock private ClientFactory clientFactory; @@ -59,8 +64,9 @@ void setUp() { } @Test - void test_normal_service_start() { + void test_normal_service_start_when_export_is_enabled() { RdsService rdsService = createObjectUnderTest(); + when(sourceConfig.isExportEnabled()).thenReturn(true); try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class)) { executorsMockedStatic.when(() -> Executors.newFixedThreadPool(anyInt())).thenReturn(executor); rdsService.start(buffer); @@ -68,6 +74,7 @@ void test_normal_service_start() { verify(executor).submit(any(LeaderScheduler.class)); verify(executor).submit(any(ExportScheduler.class)); + verify(executor).submit(any(DataFileScheduler.class)); } @Test @@ -83,6 +90,6 @@ void test_service_shutdown_calls_executor_shutdownNow() { } private RdsService createObjectUnderTest() { - return new RdsService(sourceCoordinator, sourceConfig, clientFactory, pluginMetrics); + return new RdsService(sourceCoordinator, sourceConfig, eventFactory, clientFactory, pluginMetrics); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceTest.java index edd409e5e4..682f16ed51 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/RdsSourceTest.java @@ -12,6 +12,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.plugins.source.rds.configuration.AwsAuthenticationConfig; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -27,6 +28,9 @@ class RdsSourceTest { @Mock private RdsSourceConfig sourceConfig; + @Mock + private EventFactory eventFactory; + @Mock AwsCredentialsSupplier awsCredentialsSupplier; @@ -45,6 +49,6 @@ void test_when_buffer_is_null_then_start_throws_exception() { } private RdsSource createObjectUnderTest() { - return new RdsSource(pluginMetrics, sourceConfig, awsCredentialsSupplier); + return new RdsSource(pluginMetrics, sourceConfig, eventFactory, awsCredentialsSupplier); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverterTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverterTest.java new file mode 100644 index 0000000000..79c5597c3b --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverterTest.java @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.converter; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.event.TestEventFactory; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventBuilder; +import org.opensearch.dataprepper.model.record.Record; + +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; +import static org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter.EXPORT_EVENT_TYPE; +import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE; + +@ExtendWith(MockitoExtension.class) +class ExportRecordConverterTest { + + @Test + void test_convert() { + final String tableName = UUID.randomUUID().toString(); + final String primaryKeyName = UUID.randomUUID().toString(); + final String primaryKeyValue = UUID.randomUUID().toString(); + final Event testEvent = TestEventFactory.getTestEventFactory().eventBuilder(EventBuilder.class) + .withEventType("EVENT") + .withData(Map.of(primaryKeyName, primaryKeyValue)) + .build(); + + Record testRecord = new Record<>(testEvent); + + ExportRecordConverter exportRecordConverter = new ExportRecordConverter(); + Event actualEvent = exportRecordConverter.convert(testRecord, tableName, primaryKeyName); + + // Assert + assertThat(actualEvent.getMetadata().getAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE), equalTo(tableName)); + assertThat(actualEvent.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(primaryKeyValue)); + assertThat(actualEvent.getMetadata().getAttribute(INGESTION_EVENT_TYPE_ATTRIBUTE), equalTo(EXPORT_EVENT_TYPE)); + assertThat(actualEvent, sameInstance(testRecord.getData())); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java new file mode 100644 index 0000000000..1ed91bc031 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoaderTest.java @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.export; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; + +import java.io.InputStream; +import java.util.UUID; +import java.util.function.Consumer; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class DataFileLoaderTest { + + @Mock + private DataFilePartition dataFilePartition; + + @Mock + private BufferAccumulator> bufferAccumulator; + + @Mock + private InputCodec codec; + + @Mock + private S3ObjectReader s3ObjectReader; + + @Mock + private ExportRecordConverter recordConverter; + + @Test + void test_run() throws Exception { + final String bucket = UUID.randomUUID().toString(); + final String key = UUID.randomUUID().toString(); + when(dataFilePartition.getBucket()).thenReturn(bucket); + when(dataFilePartition.getKey()).thenReturn(key); + + InputStream inputStream = mock(InputStream.class); + when(s3ObjectReader.readFile(bucket, key)).thenReturn(inputStream); + + DataFileLoader objectUnderTest = createObjectUnderTest(); + objectUnderTest.run(); + + verify(codec).parse(eq(inputStream), any(Consumer.class)); + verify(bufferAccumulator).flush(); + } + + private DataFileLoader createObjectUnderTest() { + return DataFileLoader.create(dataFilePartition, codec, bufferAccumulator, s3ObjectReader, recordConverter); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java new file mode 100644 index 0000000000..ee0d0e2852 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileSchedulerTest.java @@ -0,0 +1,137 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.export; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; +import org.opensearch.dataprepper.plugins.source.rds.converter.ExportRecordConverter; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.source.rds.model.LoadStatus; +import software.amazon.awssdk.services.s3.S3Client; + +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class DataFileSchedulerTest { + + @Mock + private EnhancedSourceCoordinator sourceCoordinator; + + @Mock + private RdsSourceConfig sourceConfig; + + @Mock + private S3Client s3Client; + + @Mock + private EventFactory eventFactory; + + @Mock + private Buffer> buffer; + + @Mock + private DataFilePartition dataFilePartition; + + private Random random; + + @BeforeEach + void setUp() { + random = new Random(); + } + + @Test + void test_given_no_datafile_partition_then_no_export() throws InterruptedException { + when(sourceCoordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE)).thenReturn(Optional.empty()); + + final DataFileScheduler objectUnderTest = createObjectUnderTest(); + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(objectUnderTest); + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> verify(sourceCoordinator).acquireAvailablePartition(DataFilePartition.PARTITION_TYPE)); + Thread.sleep(100); + executorService.shutdownNow(); + + verifyNoInteractions(s3Client, buffer); + } + + @Test + void test_given_available_datafile_partition_then_load_datafile() { + DataFileScheduler objectUnderTest = createObjectUnderTest(); + final String exportTaskId = UUID.randomUUID().toString(); + when(dataFilePartition.getExportTaskId()).thenReturn(exportTaskId); + + when(sourceCoordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE)).thenReturn(Optional.of(dataFilePartition)); + final GlobalState globalStatePartition = mock(GlobalState.class); + final int totalFiles = random.nextInt() + 1; + final Map loadStatusMap = new LoadStatus(totalFiles, totalFiles - 1).toMap(); + when(globalStatePartition.getProgressState()).thenReturn(Optional.of(loadStatusMap)); + when(sourceCoordinator.getPartition(exportTaskId)).thenReturn(Optional.of(globalStatePartition)); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(() -> { + // MockedStatic needs to be created on the same thread it's used + try (MockedStatic dataFileLoaderMockedStatic = mockStatic(DataFileLoader.class)) { + DataFileLoader dataFileLoader = mock(DataFileLoader.class); + dataFileLoaderMockedStatic.when(() -> DataFileLoader.create( + eq(dataFilePartition), any(InputCodec.class), any(BufferAccumulator.class), any(S3ObjectReader.class), any(ExportRecordConverter.class))) + .thenReturn(dataFileLoader); + doNothing().when(dataFileLoader).run(); + objectUnderTest.run(); + } + }); + await().atMost(Duration.ofSeconds(1)) + .untilAsserted(() -> verify(sourceCoordinator).completePartition(dataFilePartition)); + executorService.shutdownNow(); + + verify(sourceCoordinator).completePartition(dataFilePartition); + } + + @Test + void test_shutdown() { + DataFileScheduler objectUnderTest = createObjectUnderTest(); + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(objectUnderTest); + + objectUnderTest.shutdown(); + + verifyNoMoreInteractions(sourceCoordinator); + executorService.shutdownNow(); + } + + private DataFileScheduler createObjectUnderTest() { + return new DataFileScheduler(sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java index d0560ab30d..32aff02a57 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportSchedulerTest.java @@ -15,6 +15,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ExportProgressState; import software.amazon.awssdk.services.rds.RdsClient; @@ -27,9 +28,14 @@ import software.amazon.awssdk.services.rds.model.DescribeExportTasksResponse; import software.amazon.awssdk.services.rds.model.StartExportTaskRequest; import software.amazon.awssdk.services.rds.model.StartExportTaskResponse; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Object; import java.time.Duration; import java.time.Instant; +import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -44,6 +50,7 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler.PARQUET_SUFFIX; @ExtendWith(MockitoExtension.class) @@ -55,6 +62,9 @@ class ExportSchedulerTest { @Mock private RdsClient rdsClient; + @Mock + private S3Client s3Client; + @Mock private PluginMetrics pluginMetrics; @@ -96,6 +106,18 @@ void test_given_export_partition_and_task_id_then_complete_export() throws Inter when(describeExportTasksResponse.exportTasks().get(0).status()).thenReturn("COMPLETE"); when(rdsClient.describeExportTasks(any(DescribeExportTasksRequest.class))).thenReturn(describeExportTasksResponse); + // Mock list s3 objects response + ListObjectsV2Response listObjectsV2Response = mock(ListObjectsV2Response.class); + String exportTaskId = UUID.randomUUID().toString(); + String tableName = UUID.randomUUID().toString(); + // objectKey needs to have this structure: "{prefix}/{export task ID}/{database name}/{table name}/{numbered folder}/{file name}" + S3Object s3Object = S3Object.builder() + .key("prefix/" + exportTaskId + "/my_db/" + tableName + "/1/file1" + PARQUET_SUFFIX) + .build(); + when(listObjectsV2Response.contents()).thenReturn(List.of(s3Object)); + when(listObjectsV2Response.isTruncated()).thenReturn(false); + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Response); + final ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(exportScheduler); await().atMost(Duration.ofSeconds(1)) @@ -103,6 +125,7 @@ void test_given_export_partition_and_task_id_then_complete_export() throws Inter Thread.sleep(100); executorService.shutdownNow(); + verify(sourceCoordinator).createPartition(any(DataFilePartition.class)); verify(sourceCoordinator).completePartition(exportPartition); verify(rdsClient, never()).startExportTask(any(StartExportTaskRequest.class)); verify(rdsClient, never()).createDBSnapshot(any(CreateDbSnapshotRequest.class)); @@ -110,7 +133,7 @@ void test_given_export_partition_and_task_id_then_complete_export() throws Inter @Test - void test_given_export_partition_and_no_task_id_then_start_and_complete_export() throws InterruptedException { + void test_given_export_partition_without_task_id_then_start_and_complete_export() throws InterruptedException { when(sourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).thenReturn(Optional.of(exportPartition)); when(exportPartition.getPartitionKey()).thenReturn(UUID.randomUUID().toString()); when(exportProgressState.getExportTaskId()).thenReturn(null).thenReturn(UUID.randomUUID().toString()); @@ -142,6 +165,18 @@ void test_given_export_partition_and_no_task_id_then_start_and_complete_export() when(describeExportTasksResponse.exportTasks().get(0).status()).thenReturn("COMPLETE"); when(rdsClient.describeExportTasks(any(DescribeExportTasksRequest.class))).thenReturn(describeExportTasksResponse); + // Mock list s3 objects response + ListObjectsV2Response listObjectsV2Response = mock(ListObjectsV2Response.class); + String exportTaskId = UUID.randomUUID().toString(); + String tableName = UUID.randomUUID().toString(); + // objectKey needs to have this structure: "{prefix}/{export task ID}/{database name}/{table name}/{numbered folder}/{file name}" + S3Object s3Object = S3Object.builder() + .key("prefix/" + exportTaskId + "/my_db/" + tableName + "/1/file1" + PARQUET_SUFFIX) + .build(); + when(listObjectsV2Response.contents()).thenReturn(List.of(s3Object)); + when(listObjectsV2Response.isTruncated()).thenReturn(false); + when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Response); + final ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(exportScheduler); await().atMost(Duration.ofSeconds(1)) @@ -151,6 +186,7 @@ void test_given_export_partition_and_no_task_id_then_start_and_complete_export() verify(rdsClient).createDBSnapshot(any(CreateDbSnapshotRequest.class)); verify(rdsClient).startExportTask(any(StartExportTaskRequest.class)); + verify(sourceCoordinator).createPartition(any(DataFilePartition.class)); verify(sourceCoordinator).completePartition(exportPartition); } @@ -166,6 +202,6 @@ void test_shutDown() { } private ExportScheduler createObjectUnderTest() { - return new ExportScheduler(sourceCoordinator, rdsClient, pluginMetrics); + return new ExportScheduler(sourceCoordinator, rdsClient, s3Client, pluginMetrics); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/S3ObjectReaderTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/S3ObjectReaderTest.java new file mode 100644 index 0000000000..44aa22f6ad --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/S3ObjectReaderTest.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.export; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; + +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +class S3ObjectReaderTest { + + @Mock + private S3Client s3Client; + + private S3ObjectReader s3ObjectReader; + + + @BeforeEach + void setUp() { + s3ObjectReader = createObjectUnderTest(); + } + + @Test + void test_readFile() { + final String bucketName = UUID.randomUUID().toString(); + final String key = UUID.randomUUID().toString(); + + + s3ObjectReader.readFile(bucketName, key); + + ArgumentCaptor getObjectRequestArgumentCaptor = ArgumentCaptor.forClass(GetObjectRequest.class); + verify(s3Client).getObject(getObjectRequestArgumentCaptor.capture()); + + GetObjectRequest request = getObjectRequestArgumentCaptor.getValue(); + assertThat(request.bucket(), equalTo(bucketName)); + assertThat(request.key(), equalTo(key)); + } + + private S3ObjectReader createObjectUnderTest() { + return new S3ObjectReader(s3Client); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKeyTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKeyTest.java new file mode 100644 index 0000000000..7056114572 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/model/ExportObjectKeyTest.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.rds.model; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class ExportObjectKeyTest { + + @Test + void test_fromString_with_valid_input_string() { + final String objectKeyString = "prefix/export-task-id/db-name/table-name/1/file-name.parquet"; + final ExportObjectKey exportObjectKey = ExportObjectKey.fromString(objectKeyString); + + assertThat(exportObjectKey.getPrefix(), equalTo("prefix")); + assertThat(exportObjectKey.getExportTaskId(), equalTo("export-task-id")); + assertThat(exportObjectKey.getDatabaseName(), equalTo("db-name")); + assertThat(exportObjectKey.getTableName(), equalTo("table-name")); + assertThat(exportObjectKey.getNumberedFolder(), equalTo("1")); + assertThat(exportObjectKey.getFileName(), equalTo("file-name.parquet")); + } + + @Test + void test_fromString_with_invalid_input_string() { + final String objectKeyString = "prefix/export-task-id/db-name/table-name/1/"; + + Throwable exception = assertThrows(IllegalArgumentException.class, () -> ExportObjectKey.fromString(objectKeyString)); + assertThat(exception.getMessage(), containsString("Export object key is not valid: " + objectKeyString)); + } +} \ No newline at end of file