Miscellaneous fixes/improvements to the DynamoDb source (opensearch-project#3489)

Signed-off-by: Taylor Gray <[email protected]>
graytaylor0 authored Oct 13, 2023
1 parent b4b4a98 commit ccbe50c
Showing 17 changed files with 148 additions and 98 deletions.
@@ -3,31 +3,31 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;
package org.opensearch.dataprepper.model.opensearch;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public enum BulkAction {
public enum OpenSearchBulkActions {

CREATE("create"),
UPSERT("upsert"),
UPDATE("update"),
DELETE("delete"),
INDEX("index");

private static final Map<String, BulkAction> ACTIONS_MAP = Arrays.stream(BulkAction.values())
.collect(Collectors.toMap(
value -> value.action,
value -> value
));
private static final Map<String, OpenSearchBulkActions> ACTIONS_MAP = Arrays.stream(OpenSearchBulkActions.values())
.collect(Collectors.toMap(
value -> value.action,
value -> value
));

private final String action;

BulkAction(String action) {
OpenSearchBulkActions(String action) {
this.action = action.toLowerCase();
}

@@ -37,7 +37,7 @@ public String toString() {
}

@JsonCreator
public static BulkAction fromOptionValue(final String option) {
public static OpenSearchBulkActions fromOptionValue(final String option) {
return ACTIONS_MAP.get(option.toLowerCase());
}
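
For reference, a minimal usage sketch of the relocated enum (this snippet is not part of the commit; it only illustrates the lookup behavior of `fromOptionValue`):

```java
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;

public class BulkActionLookupExample {
    public static void main(String[] args) {
        // fromOptionValue lower-cases its argument, so the lookup is case-insensitive.
        OpenSearchBulkActions create = OpenSearchBulkActions.fromOptionValue("CREATE");
        System.out.println(create == OpenSearchBulkActions.CREATE); // true

        // Values not defined in the enum resolve to null (Map.get semantics) rather than throwing.
        System.out.println(OpenSearchBulkActions.fromOptionValue("merge")); // null
    }
}
```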

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;
package org.opensearch.dataprepper.model.opensearch;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -12,13 +12,11 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

public class BulkActionTest {

public class OpenSearchBulkActionsTest {
@ParameterizedTest
@EnumSource(BulkAction.class)
void fromOptionValue(final BulkAction action) {
assertThat(BulkAction.fromOptionValue(action.name()), is(action));
assertThat(action, instanceOf(BulkAction.class));
@EnumSource(OpenSearchBulkActions.class)
void fromOptionValue(final OpenSearchBulkActions action) {
assertThat(OpenSearchBulkActions.fromOptionValue(action.name()), is(action));
assertThat(action, instanceOf(OpenSearchBulkActions.class));
}

}
2 changes: 1 addition & 1 deletion data-prepper-plugins/dynamodb-source/README.md
@@ -55,7 +55,7 @@ source:

### Stream Configurations

* start_position (Optional): start position of the stream, can be either BEGINNING or LATEST. If export is required,
* start_position (Optional): start position of the stream, can be either TRIM_HORIZON or LATEST. If export is required,
this value will be ignored and set to LATEST by default. This is useful if customer don’t want to run initial export,
so they can
choose either from the beginning of the stream (up to 24 hours) or from latest (from the time point when pipeline is
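
Because `start_position` is now bound to the `StreamStartPosition` enum (see the `StreamConfig` change below), an unknown value fails at configuration-parse time instead of being carried around as a raw string. A rough, hypothetical sketch of how the YAML fragment maps onto the config class, assuming Jackson's YAML data format (which the existing config tests already use):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig;

public class StreamConfigParseExample {
    public static void main(String[] args) throws Exception {
        ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());

        // "TRIM_HORIZON" and "LATEST" must match the StreamStartPosition constant names.
        StreamConfig config = yamlMapper.readValue("start_position: \"TRIM_HORIZON\"\n", StreamConfig.class);
        System.out.println(config.getStartPosition()); // TRIM_HORIZON

        // Omitting start_position falls back to the field default, LATEST.
        StreamConfig defaulted = yamlMapper.readValue("{}", StreamConfig.class);
        System.out.println(defaulted.getStartPosition()); // LATEST
    }
}
```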
@@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamStartPosition;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.ExportPartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState;
@@ -161,8 +162,8 @@ public void init() {

if (tableInfo.getMetadata().isStreamRequired()) {
List<String> shardIds;
// start position by default is beginning if not provided.
if (tableInfo.getMetadata().isExportRequired() || "LATEST".equals(tableInfo.getMetadata().getStreamStartPosition())) {
// start position by default is TRIM_HORIZON if not provided.
if (tableInfo.getMetadata().isExportRequired() || String.valueOf(StreamStartPosition.LATEST).equals(tableInfo.getMetadata().getStreamStartPosition())) {
// For a continued data extraction process that involves both export and stream
// The export must be completed and loaded before stream can start.
// Moreover, there should not be any gaps between the export time and the time start reading the stream
@@ -263,7 +264,7 @@ private TableInfo getTableInfo(TableConfig tableConfig) {
}
}

String streamStartPosition = null;
StreamStartPosition streamStartPosition = null;

if (tableConfig.getStreamConfig() != null) {
// Validate if DynamoDB Stream is turn on or not
@@ -10,9 +10,9 @@
public class StreamConfig {

@JsonProperty(value = "start_position")
private String startPosition;
private StreamStartPosition startPosition = StreamStartPosition.LATEST;

public String getStartPosition() {
public StreamStartPosition getStartPosition() {
return startPosition;
}

@@ -0,0 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.dynamodb.configuration;

public enum StreamStartPosition {
TRIM_HORIZON,
LATEST
}
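
The two constants mirror DynamoDB Streams iterator semantics: TRIM_HORIZON reads from the oldest retained record (up to 24 hours back), while LATEST reads only records written after the iterator is obtained. A hedged sketch of how a consumer might translate the config enum into the AWS SDK's ShardIteratorType (this mapping helper is hypothetical and not part of the commit):

```java
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamStartPosition;
import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;

public class ShardIteratorTypeMapper {
    // Hypothetical helper: translate the source's start position into the SDK iterator type.
    static ShardIteratorType toShardIteratorType(StreamStartPosition startPosition) {
        switch (startPosition) {
            case TRIM_HORIZON:
                return ShardIteratorType.TRIM_HORIZON; // oldest record still retained in the shard
            case LATEST:
            default:
                return ShardIteratorType.LATEST;       // only records written from now on
        }
    }
}
```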
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.dynamodb.converter;

public class MetadataKeyAttributes {
static final String COMPOSITE_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE = "_id";

static final String PARTITION_KEY_METADATA_ATTRIBUTE = "partition_key";

static final String SORT_KEY_METADATA_ATTRIBUTE = "sort_key";

static final String EVENT_TIMESTAMP_METADATA_ATTRIBUTE = "ts";

static final String STREAM_EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE = "op";

static final String EVENT_TABLE_NAME_METADATA_ATTRIBUTE = "table_name";
}
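
These constants are the metadata keys the converter stamps onto every event; downstream processors and sinks can read them back from the event metadata. A small illustrative sketch (the event and attribute values here are built by hand, not by the DynamoDB source):

```java
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;

import java.util.Map;

public class MetadataReadExample {
    public static void main(String[] args) {
        Event event = JacksonEvent.builder()
                .withEventType("DOCUMENT")
                .withData(Map.of("id", 1))
                .build();

        // The DynamoDB converter sets these attributes; here they are set by hand for illustration.
        event.getMetadata().setAttribute("op", "create");
        event.getMetadata().setAttribute("table_name", "my-table");

        System.out.println(event.getMetadata().getAttributes().get("op"));         // create
        System.out.println(event.getMetadata().getAttributes().get("table_name")); // my-table
    }
}
```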
@@ -9,30 +9,29 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;

import java.time.Instant;
import java.util.List;
import java.util.Map;

import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.COMPOSITE_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.SORT_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.STREAM_EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE;

/**
* Base Record Processor definition.
* The record processor is to transform the source data into a JacksonEvent,
* and then write to buffer.
*/
public abstract class RecordConverter {


private static final String KEYS_TAG_NAME = "_id";

private static final String EVENT_TIMESTAMP_TAG_NAME = "ts";

private static final String EVENT_OP_TAG_NAME = "op";

private static final String EVENT_SOURCE_TAG_NAME = "source";

private static final String DEFAULT_ACTION = "index";
private static final String DEFAULT_ACTION = OpenSearchBulkActions.INDEX.toString();

private static final int DEFAULT_WRITE_TIMEOUT_MILLIS = 60_000;

@@ -61,32 +60,58 @@ String getId(Map<String, Object> data) {
return partitionKey + "_" + sortKey;
}

String getPartitionKey(final Map<String, Object> data) {
return String.valueOf(data.get(tableInfo.getMetadata().getPartitionKeyAttributeName()));
}

String getSortKey(final Map<String, Object> data) {
return String.valueOf(data.get(tableInfo.getMetadata().getSortKeyAttributeName()));
}

void writeEventsToBuffer(List<Record<Event>> events) throws Exception {
buffer.writeAll(events, DEFAULT_WRITE_TIMEOUT_MILLIS);
}

public Record<Event> convertToEvent(Map<String, Object> data, Instant eventCreationTime, String action) {
public Record<Event> convertToEvent(Map<String, Object> data, Instant eventCreationTime, String streamEventName) {
Event event;
event = JacksonEvent.builder()
.withEventType(getEventType())
.withData(data)
.build();
EventMetadata eventMetadata = event.getMetadata();

eventMetadata.setAttribute(EVENT_SOURCE_TAG_NAME, tableInfo.getTableArn());
eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableInfo.getTableName());
if (eventCreationTime != null) {
eventMetadata.setAttribute(EVENT_TIMESTAMP_TAG_NAME, eventCreationTime.toEpochMilli());
eventMetadata.setAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE, eventCreationTime.toEpochMilli());
}

eventMetadata.setAttribute(EVENT_OP_TAG_NAME, action);
eventMetadata.setAttribute(KEYS_TAG_NAME, getId(data));
eventMetadata.setAttribute(STREAM_EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE, mapStreamEventNameToBulkAction(streamEventName));
eventMetadata.setAttribute(COMPOSITE_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, getId(data));
eventMetadata.setAttribute(PARTITION_KEY_METADATA_ATTRIBUTE, getPartitionKey(data));
eventMetadata.setAttribute(SORT_KEY_METADATA_ATTRIBUTE, getSortKey(data));

return new Record<>(event);
}

public Record<Event> convertToEvent(Map<String, Object> data) {
return convertToEvent(data, null, DEFAULT_ACTION);
return convertToEvent(data, null, null);
}

private String mapStreamEventNameToBulkAction(final String streamEventName) {
if (streamEventName == null) {
return DEFAULT_ACTION;
}

switch (streamEventName) {
case "INSERT":
return OpenSearchBulkActions.CREATE.toString();
case "MODIFY":
return OpenSearchBulkActions.UPSERT.toString();
case "REMOVE":
return OpenSearchBulkActions.DELETE.toString();
default:
return DEFAULT_ACTION;
}
}

}
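
Taken together, a stream record from a table with hypothetical keys customer_id (partition) and order_id (sort) ends up with metadata such as op=create for an INSERT, an _id of the two key values joined with an underscore, plus partition_key, sort_key, table_name, and ts. A rough sketch of those two derivations (the key names and standalone helpers are illustrative only; the real logic is the private methods above):

```java
import java.util.Map;

import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;

public class DynamoDbMetadataExample {
    // Mirrors RecordConverter.getId for a table that has both a partition key and a sort key.
    static String compositeId(Map<String, Object> item, String partitionKeyName, String sortKeyName) {
        return String.valueOf(item.get(partitionKeyName)) + "_" + String.valueOf(item.get(sortKeyName));
    }

    // Mirrors mapStreamEventNameToBulkAction; null covers export records, which carry no stream event name.
    static String bulkAction(String streamEventName) {
        if (streamEventName == null) {
            return OpenSearchBulkActions.INDEX.toString();
        }
        switch (streamEventName) {
            case "INSERT": return OpenSearchBulkActions.CREATE.toString();
            case "MODIFY": return OpenSearchBulkActions.UPSERT.toString();
            case "REMOVE": return OpenSearchBulkActions.DELETE.toString();
            default:       return OpenSearchBulkActions.INDEX.toString();
        }
    }

    public static void main(String[] args) {
        Map<String, Object> item = Map.of("customer_id", "c1", "order_id", "o42", "total", 31.5);
        System.out.println(compositeId(item, "customer_id", "order_id")); // c1_o42
        System.out.println(bulkAction("INSERT"));                         // create
        System.out.println(bulkAction(null));                             // index
    }
}
```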
@@ -5,21 +5,27 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.model;

import software.amazon.awssdk.arns.Arn;

public class TableInfo {

private final String tableArn;
private final String tableName;

private final TableMetadata metadata;

public TableInfo(String tableArn, TableMetadata metadata) {
this.tableArn = tableArn;
this.metadata = metadata;
this.tableName = Arn.fromString(tableArn).resource().resource();
}

public String getTableArn() {
return tableArn;
}

public String getTableName() { return tableName; }

public TableMetadata getMetadata() {
return metadata;
}
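
TableInfo now derives the plain table name from the table ARN via the AWS SDK's Arn helper, so callers no longer need to split the ARN string themselves. A minimal standalone sketch of that parsing, using the same software.amazon.awssdk.arns.Arn API as the constructor above:

```java
import software.amazon.awssdk.arns.Arn;

public class TableNameFromArnExample {
    public static void main(String[] args) {
        String tableArn = "arn:aws:dynamodb:us-west-2:123456789012:table/my-table";

        Arn arn = Arn.fromString(tableArn);
        // For a resource of "table/my-table", resourceType() is "table" and resource() is "my-table".
        System.out.println(arn.resource().resourceType().orElse("")); // table
        System.out.println(arn.resource().resource());                // my-table
    }
}
```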
@@ -5,6 +5,8 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.model;

import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamStartPosition;

import java.util.HashMap;
import java.util.Map;

@@ -22,7 +24,7 @@ public class TableMetadata {

private final String sortKeyAttributeName;

private final String streamStartPosition;
private final StreamStartPosition streamStartPosition;

private final String streamArn;

@@ -68,7 +70,7 @@ public static class Builder {

private String exportPrefix;

private String streamStartPosition;
private StreamStartPosition streamStartPosition;


public Builder partitionKeyAttributeName(String partitionKeyAttributeName) {
@@ -106,7 +108,7 @@ public Builder exportPrefix(String exportPrefix) {
return this;
}

public Builder streamStartPosition(String streamStartPosition) {
public Builder streamStartPosition(StreamStartPosition streamStartPosition) {
this.streamStartPosition = streamStartPosition;
return this;
}
@@ -160,7 +162,7 @@ public boolean isExportRequired() {
return exportRequired;
}

public String getStreamStartPosition() {
public StreamStartPosition getStreamStartPosition() {
return streamStartPosition;
}

@@ -19,6 +19,7 @@
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.ExportConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamStartPosition;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.InitPartition;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@@ -143,7 +144,7 @@ void setup() {
// Mock configurations
lenient().when(exportConfig.getS3Bucket()).thenReturn(bucketName);
lenient().when(exportConfig.getS3Prefix()).thenReturn(prefix);
lenient().when(streamConfig.getStartPosition()).thenReturn("LATEST");
lenient().when(streamConfig.getStartPosition()).thenReturn(StreamStartPosition.LATEST);
lenient().when(tableConfig.getTableArn()).thenReturn(tableArn);
lenient().when(tableConfig.getExportConfig()).thenReturn(exportConfig);
lenient().when(tableConfig.getStreamConfig()).thenReturn(streamConfig);
@@ -11,6 +11,7 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.AwsAuthenticationConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamStartPosition;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig;
import software.amazon.awssdk.regions.Region;

@@ -39,7 +40,7 @@ void test_general_config() throws JsonProcessingException {
" s3_prefix: \"xxx/\"\n" +
" - table_arn: \"arn:aws:dynamodb:us-west-2:123456789012:table/table-c\"\n" +
" stream:\n" +
" start_position: \"BEGINNING\" \n" +
" start_position: \"TRIM_HORIZON\" \n" +
"aws:\n" +
" region: \"us-west-2\"\n" +
" sts_role_arn: \"arn:aws:iam::123456789012:role/DataPrepperRole\"";
@@ -66,7 +67,7 @@

TableConfig streamOnlyConfig = sourceConfiguration.getTableConfigs().get(2);
assertThat(streamOnlyConfig.getStreamConfig(), notNullValue());
assertThat(streamOnlyConfig.getStreamConfig().getStartPosition(), equalTo("BEGINNING"));
assertThat(streamOnlyConfig.getStreamConfig().getStartPosition(), equalTo(StreamStartPosition.TRIM_HORIZON));
assertNull(streamOnlyConfig.getExportConfig());

AwsAuthenticationConfig awsAuthenticationConfig = sourceConfiguration.getAwsAuthenticationConfig();