Load exported S3 files in RDS source (opensearch-project#4718)
* Add s3 file loader

Signed-off-by: Hai Yan <[email protected]>

* Make checkExportStatus a callable

Signed-off-by: Hai Yan <[email protected]>

* Fix unit tests

Signed-off-by: Hai Yan <[email protected]>

* Add load status and record converter

Signed-off-by: Hai Yan <[email protected]>

* Update unit tests

Signed-off-by: Hai Yan <[email protected]>

* Restore changes for test

Signed-off-by: Hai Yan <[email protected]>

* Address review comments

Signed-off-by: Hai Yan <[email protected]>

---------

Signed-off-by: Hai Yan <[email protected]>
oeyh authored Jul 15, 2024
1 parent 1ea308b commit 731de12
Showing 23 changed files with 1,133 additions and 32 deletions.
2 changes: 2 additions & 0 deletions data-prepper-plugins/rds-source/build.gradle
@@ -8,6 +8,7 @@ dependencies {
implementation project(path: ':data-prepper-plugins:buffer-common')
implementation project(path: ':data-prepper-plugins:http-common')
implementation project(path: ':data-prepper-plugins:common')
implementation project(path: ':data-prepper-plugins:parquet-codecs')

implementation 'io.micrometer:micrometer-core'

@@ -22,4 +23,5 @@ dependencies {

testImplementation project(path: ':data-prepper-test-common')
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(path: ':data-prepper-test-event')
}
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/ClientFactory.java
@@ -10,6 +10,7 @@
import org.opensearch.dataprepper.plugins.source.rds.configuration.AwsAuthenticationConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.services.rds.RdsClient;
import software.amazon.awssdk.services.s3.S3Client;

public class ClientFactory {
private final AwsCredentialsProvider awsCredentialsProvider;
@@ -32,4 +33,11 @@ public RdsClient buildRdsClient() {
.credentialsProvider(awsCredentialsProvider)
.build();
}

public S3Client buildS3Client() {
return S3Client.builder()
.region(awsAuthenticationConfig.getAwsRegion())
.credentialsProvider(awsCredentialsProvider)
.build();
}
}
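
The new buildS3Client() mirrors buildRdsClient(): both clients share the source's AWS region and credentials provider. Below is a minimal sketch of how a caller might use such a client to stream one exported object, assuming the AWS SDK for Java v2; the bucket and object key are illustrative only, not values from this commit.

    import software.amazon.awssdk.core.ResponseInputStream;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.GetObjectRequest;
    import software.amazon.awssdk.services.s3.model.GetObjectResponse;

    public class ExportFileDownloadSketch {
        public static void main(String[] args) throws Exception {
            // Stands in for clientFactory.buildS3Client(); here the region and
            // credentials come from the default provider chain instead.
            S3Client s3Client = S3Client.create();

            GetObjectRequest request = GetObjectRequest.builder()
                    .bucket("my-export-bucket")              // hypothetical bucket
                    .key("export/task-1/part-00000.parquet") // hypothetical key
                    .build();

            try (ResponseInputStream<GetObjectResponse> object = s3Client.getObject(request)) {
                System.out.println("Content length: " + object.response().contentLength());
                // The RDS source would hand a stream like this to a codec for decoding.
            }
        }
    }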
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java
@@ -8,13 +8,16 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.rds.export.DataFileScheduler;
import org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler;
import org.opensearch.dataprepper.plugins.source.rds.leader.LeaderScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.rds.RdsClient;
import software.amazon.awssdk.services.s3.S3Client;

import java.util.ArrayList;
import java.util.List;
@@ -24,23 +27,34 @@
public class RdsService {
private static final Logger LOG = LoggerFactory.getLogger(RdsService.class);

/**
* Maximum number of concurrent data loaders per node
*/
public static final int DATA_LOADER_MAX_JOB_COUNT = 1;

private final RdsClient rdsClient;
private final S3Client s3Client;
private final EnhancedSourceCoordinator sourceCoordinator;
private final EventFactory eventFactory;
private final PluginMetrics pluginMetrics;
private final RdsSourceConfig sourceConfig;
private ExecutorService executor;
private LeaderScheduler leaderScheduler;
private ExportScheduler exportScheduler;
private DataFileScheduler dataFileScheduler;

public RdsService(final EnhancedSourceCoordinator sourceCoordinator,
final RdsSourceConfig sourceConfig,
final EventFactory eventFactory,
final ClientFactory clientFactory,
final PluginMetrics pluginMetrics) {
this.sourceCoordinator = sourceCoordinator;
this.eventFactory = eventFactory;
this.pluginMetrics = pluginMetrics;
this.sourceConfig = sourceConfig;

rdsClient = clientFactory.buildRdsClient();
s3Client = clientFactory.buildS3Client();
}

/**
@@ -54,9 +68,15 @@ public void start(Buffer<Record<Event>> buffer) {
LOG.info("Start running RDS service");
final List<Runnable> runnableList = new ArrayList<>();
leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig);
-        exportScheduler = new ExportScheduler(sourceCoordinator, rdsClient, pluginMetrics);
         runnableList.add(leaderScheduler);
-        runnableList.add(exportScheduler);

+        if (sourceConfig.isExportEnabled()) {
+            exportScheduler = new ExportScheduler(sourceCoordinator, rdsClient, s3Client, pluginMetrics);
+            dataFileScheduler = new DataFileScheduler(
+                    sourceCoordinator, sourceConfig, s3Client, eventFactory, buffer);
+            runnableList.add(exportScheduler);
+            runnableList.add(dataFileScheduler);
+        }

executor = Executors.newFixedThreadPool(runnableList.size());
runnableList.forEach(executor::submit);
@@ -69,7 +89,10 @@ public void start(Buffer<Record<Event>> buffer) {
public void shutdown() {
if (executor != null) {
LOG.info("shutdown RDS schedulers");
-            exportScheduler.shutdown();
+            if (sourceConfig.isExportEnabled()) {
+                exportScheduler.shutdown();
+                dataFileScheduler.shutdown();
+            }
leaderScheduler.shutdown();
executor.shutdownNow();
}
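
The start/shutdown changes above gate the export-path schedulers behind sourceConfig.isExportEnabled() and size the thread pool to however many schedulers were actually registered. A standalone sketch of that lifecycle, with trivial runnables standing in for the real schedulers (the print statements are illustrative, not the plugin's log output):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class SchedulerLifecycleSketch {
        public static void main(String[] args) throws InterruptedException {
            List<Runnable> runnableList = new ArrayList<>();
            runnableList.add(() -> System.out.println("leader scheduler loop"));

            boolean exportEnabled = true; // stands in for sourceConfig.isExportEnabled()
            if (exportEnabled) {
                runnableList.add(() -> System.out.println("export scheduler loop"));
                runnableList.add(() -> System.out.println("data file scheduler loop"));
            }

            // One thread per registered scheduler, exactly how RdsService sizes its pool.
            ExecutorService executor = Executors.newFixedThreadPool(runnableList.size());
            runnableList.forEach(executor::submit);

            Thread.sleep(100);
            executor.shutdownNow(); // mirrors RdsService.shutdown()
        }
    }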
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsSource.java
@@ -11,6 +11,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
@@ -33,15 +34,18 @@ public class RdsSource implements Source<Record<Event>>, UsesEnhancedSourceCoord
private final ClientFactory clientFactory;
private final PluginMetrics pluginMetrics;
private final RdsSourceConfig sourceConfig;
private final EventFactory eventFactory;
private EnhancedSourceCoordinator sourceCoordinator;
private RdsService rdsService;

@DataPrepperPluginConstructor
public RdsSource(final PluginMetrics pluginMetrics,
final RdsSourceConfig sourceConfig,
final EventFactory eventFactory,
final AwsCredentialsSupplier awsCredentialsSupplier) {
this.pluginMetrics = pluginMetrics;
this.sourceConfig = sourceConfig;
this.eventFactory = eventFactory;

clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig());
}
@@ -51,7 +55,7 @@ public void start(Buffer<Record<Event>> buffer) {
Objects.requireNonNull(sourceCoordinator);
sourceCoordinator.createPartition(new LeaderPartition());

-        rdsService = new RdsService(sourceCoordinator, sourceConfig, clientFactory, pluginMetrics);
+        rdsService = new RdsService(sourceCoordinator, sourceConfig, eventFactory, clientFactory, pluginMetrics);

LOG.info("Start RDS service");
rdsService.start(buffer);
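
RdsSource now accepts an EventFactory through its @DataPrepperPluginConstructor and threads it into RdsService, which hands it to the DataFileScheduler. A hedged sketch of why the factory is needed downstream, turning one decoded export row into an Event; EventBuilder comes from the data-prepper-api event model, and rowToEvent is an illustrative helper, not a method from this commit:

    import java.util.Map;

    import org.opensearch.dataprepper.model.event.Event;
    import org.opensearch.dataprepper.model.event.EventBuilder;
    import org.opensearch.dataprepper.model.event.EventFactory;

    public class EventFactorySketch {
        // Illustrative helper: build an Event from one decoded export row.
        static Event rowToEvent(final EventFactory eventFactory, final Map<String, Object> row) {
            return eventFactory.eventBuilder(EventBuilder.class)
                    .withEventType("EVENT")
                    .withData(row)
                    .build();
        }
    }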
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/ExportRecordConverter.java
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds.converter;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.INGESTION_EVENT_TYPE_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.rds.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;

public class ExportRecordConverter {

private static final Logger LOG = LoggerFactory.getLogger(ExportRecordConverter.class);

static final String EXPORT_EVENT_TYPE = "EXPORT";

public Event convert(Record<Event> record, String tableName, String primaryKeyName) {
Event event = record.getData();

EventMetadata eventMetadata = event.getMetadata();
eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableName);
eventMetadata.setAttribute(INGESTION_EVENT_TYPE_ATTRIBUTE, EXPORT_EVENT_TYPE);

final Object primaryKeyValue = record.getData().get(primaryKeyName, Object.class);
eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, primaryKeyValue);

return event;
}
}
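
A usage sketch for the converter, assuming JacksonEvent from data-prepper-api as the concrete Event type (the production path builds events through the injected EventFactory instead); the table name, column names, and values are illustrative:

    import java.util.Map;

    import org.opensearch.dataprepper.model.event.Event;
    import org.opensearch.dataprepper.model.event.JacksonEvent;
    import org.opensearch.dataprepper.model.record.Record;

    public class ExportRecordConverterSketch {
        public static void main(String[] args) {
            // One decoded row from an export data file (illustrative values).
            Event row = JacksonEvent.builder()
                    .withEventType("EVENT")
                    .withData(Map.of("id", 42, "name", "alice"))
                    .build();

            Event converted = new ExportRecordConverter()
                    .convert(new Record<>(row), "customers", "id");

            // Expected attributes: table_name=customers, ingestion_type=EXPORT, primary_key=42
            System.out.println(converted.getMetadata().getAttributes());
        }
    }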
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/MetadataKeyAttributes.java
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds.converter;

public class MetadataKeyAttributes {
static final String PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE = "primary_key";

static final String EVENT_VERSION_FROM_TIMESTAMP = "document_version";

static final String EVENT_TIMESTAMP_METADATA_ATTRIBUTE = "event_timestamp";

static final String EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE = "opensearch_action";

static final String EVENT_TABLE_NAME_METADATA_ATTRIBUTE = "table_name";

static final String INGESTION_EVENT_TYPE_ATTRIBUTE = "ingestion_type";
}
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/PartitionFactory.java
@@ -7,6 +7,7 @@

import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.LeaderPartition;
@@ -25,8 +26,10 @@ public EnhancedSourcePartition apply(SourcePartitionStoreItem partitionStoreItem

if (LeaderPartition.PARTITION_TYPE.equals(partitionType)) {
return new LeaderPartition(partitionStoreItem);
-        } if (ExportPartition.PARTITION_TYPE.equals(partitionType)) {
+        } else if (ExportPartition.PARTITION_TYPE.equals(partitionType)) {
return new ExportPartition(partitionStoreItem);
} else if (DataFilePartition.PARTITION_TYPE.equals(partitionType)) {
return new DataFilePartition(partitionStoreItem);
} else {
// Unable to acquire other partitions.
return new GlobalState(partitionStoreItem);
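
With the new branch in place, a store item whose type resolves to DATAFILE is rehydrated into a DataFilePartition. A small sketch of that rehydration path in the style of the plugin's unit tests, using Mockito to stub the store item; the partition key and progress-state JSON are illustrative values, and the sketch shows the '|'-delimited key being split back into its parts:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;

    public class DataFilePartitionRehydrationSketch {
        public static void main(String[] args) {
            SourcePartitionStoreItem item = mock(SourcePartitionStoreItem.class);
            when(item.getSourcePartitionKey())
                    .thenReturn("export-task-1|my-export-bucket|export/part-00000.parquet");
            when(item.getPartitionProgressState())
                    .thenReturn("{\"isLoaded\":false,\"totalRecords\":0,\"sourceTable\":\"customers\"}");

            DataFilePartition partition = new DataFilePartition(item);
            System.out.println(partition.getExportTaskId()); // export-task-1
            System.out.println(partition.getBucket());       // my-export-bucket
            System.out.println(partition.getKey());          // export/part-00000.parquet
        }
    }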
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/partition/DataFilePartition.java
@@ -0,0 +1,77 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds.coordination.partition;

import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.DataFileProgressState;

import java.util.Optional;

/**
* A DataFilePartition represents an export data file that needs to be loaded.
* The source identifier contains the keyword 'DATAFILE'.
*/
public class DataFilePartition extends EnhancedSourcePartition<DataFileProgressState> {

public static final String PARTITION_TYPE = "DATAFILE";

private final String exportTaskId;
private final String bucket;
private final String key;
private final DataFileProgressState state;

public DataFilePartition(final SourcePartitionStoreItem sourcePartitionStoreItem) {

setSourcePartitionStoreItem(sourcePartitionStoreItem);
String[] keySplits = sourcePartitionStoreItem.getSourcePartitionKey().split("\\|");
exportTaskId = keySplits[0];
bucket = keySplits[1];
key = keySplits[2];
state = convertStringToPartitionProgressState(DataFileProgressState.class, sourcePartitionStoreItem.getPartitionProgressState());

}

public DataFilePartition(final String exportTaskId,
final String bucket,
final String key,
final Optional<DataFileProgressState> state) {
this.exportTaskId = exportTaskId;
this.bucket = bucket;
this.key = key;
this.state = state.orElse(null);
}

@Override
public String getPartitionType() {
return PARTITION_TYPE;
}

@Override
public String getPartitionKey() {
return exportTaskId + "|" + bucket + "|" + key;
}

@Override
public Optional<DataFileProgressState> getProgressState() {
if (state != null) {
return Optional.of(state);
}
return Optional.empty();
}

public String getExportTaskId() {
return exportTaskId;
}

public String getBucket() {
return bucket;
}

public String getKey() {
return key;
}
}
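
The partition key concatenates the export task id, bucket, and object key with '|', which the store-item constructor above splits back apart; this relies on the first two parts never containing a '|', which holds since S3 bucket names and RDS export task identifiers cannot include that character. A quick round-trip sketch with illustrative values:

    import java.util.Optional;

    public class DataFilePartitionKeySketch {
        public static void main(String[] args) {
            DataFilePartition partition = new DataFilePartition(
                    "export-task-1",                      // hypothetical export task id
                    "my-export-bucket",                   // hypothetical bucket
                    "export/task-1/part-00000.parquet",   // hypothetical object key
                    Optional.empty());

            // Prints: export-task-1|my-export-bucket|export/task-1/part-00000.parquet
            System.out.println(partition.getPartitionKey());
        }
    }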
data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/DataFileProgressState.java
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;

public class DataFileProgressState {

@JsonProperty("isLoaded")
private boolean isLoaded = false;

@JsonProperty("totalRecords")
private int totalRecords;

@JsonProperty("sourceTable")
private String sourceTable;

public int getTotalRecords() {
return totalRecords;
}

public void setTotalRecords(int totalRecords) {
this.totalRecords = totalRecords;
}

public boolean getLoaded() {
return isLoaded;
}

public void setLoaded(boolean loaded) {
this.isLoaded = loaded;
}

public String getSourceTable() {
return sourceTable;
}

public void setSourceTable(String sourceTable) {
this.sourceTable = sourceTable;
}
}
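
This progress state is serialized to and from a string by the coordination layer, as the convertStringToPartitionProgressState call in DataFilePartition suggests, with the @JsonProperty names above as the wire format. A deserialization sketch with Jackson, assuming JSON as the stored representation and using illustrative values:

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ProgressStateJsonSketch {
        public static void main(String[] args) throws Exception {
            String json = "{\"isLoaded\":true,\"totalRecords\":1000,\"sourceTable\":\"customers\"}";
            DataFileProgressState state =
                    new ObjectMapper().readValue(json, DataFileProgressState.class);

            // Prints: true 1000 customers
            System.out.println(state.getLoaded() + " "
                    + state.getTotalRecords() + " " + state.getSourceTable());
        }
    }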
(Diffs for the remaining changed files are not shown here.)
