Skip to content

Commit

Permalink
AWS DDB SDK client support for remote data store
Browse files Browse the repository at this point in the history
Signed-off-by: Arjun kumar Giri <[email protected]>
  • Loading branch information
arjunkumargiri committed Jun 13, 2024
1 parent c26c11d commit 0e350f8
Show file tree
Hide file tree
Showing 8 changed files with 569 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ public class DeleteDataObjectRequest {
private final String index;
private final String id;

private final String tenantId;

/**
* Instantiate this request with an index and id.
* <p>
* For data storage implementations other than OpenSearch, an index may be referred to as a table and the id may be referred to as a primary key.
* @param index the index location to delete the object
* @param id the document id
*/
public DeleteDataObjectRequest(String index, String id) {
public DeleteDataObjectRequest(String index, String id, String tenantId) {
this.index = index;
this.id = id;
this.tenantId = tenantId;
}

/**
Expand All @@ -41,12 +44,17 @@ public String id() {
return this.id;
}

public String tenantId() {
return this.tenantId;
}

/**
* Class for constructing a Builder for this Request Object
*/
public static class Builder {
private String index = null;
private String id = null;
private String tenantId = null;

/**
* Empty Constructor for the Builder object
Expand All @@ -73,12 +81,17 @@ public Builder id(String id) {
return this;
}

public Builder tenantId(String tenantId) {
this.tenantId = tenantId;
return this;
}

/**
* Builds the object
* @return A {@link DeleteDataObjectRequest}
*/
public DeleteDataObjectRequest build() {
return new DeleteDataObjectRequest(this.index, this.id);
return new DeleteDataObjectRequest(this.index, this.id, this.tenantId);
}
}
}
16 changes: 14 additions & 2 deletions common/src/main/java/org/opensearch/sdk/GetDataObjectRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class GetDataObjectRequest {

private final String index;
private final String id;
private final String tenantId;
private final FetchSourceContext fetchSourceContext;

/**
Expand All @@ -24,9 +25,10 @@ public class GetDataObjectRequest {
* @param id the document id
* @param fetchSourceContext the context to use when fetching _source
*/
public GetDataObjectRequest(String index, String id, FetchSourceContext fetchSourceContext) {
public GetDataObjectRequest(String index, String id, String tenantId, FetchSourceContext fetchSourceContext) {
this.index = index;
this.id = id;
this.tenantId = tenantId;
this.fetchSourceContext = fetchSourceContext;
}

Expand All @@ -46,6 +48,10 @@ public String id() {
return this.id;
}

public String tenantId() {
return this.tenantId;
}

/**
* Returns the context for fetching _source
* @return the fetchSourceContext
Expand All @@ -60,6 +66,7 @@ public FetchSourceContext fetchSourceContext() {
public static class Builder {
private String index = null;
private String id = null;
private String tenantId = null;
private FetchSourceContext fetchSourceContext;

/**
Expand Down Expand Up @@ -87,6 +94,11 @@ public Builder id(String id) {
return this;
}

public Builder tenantId(String tenantId) {
this.tenantId = tenantId;
return this;
}

/**
* Add a fetchSourceContext to this builder
* @param fetchSourceContext the fetchSourceContext
Expand All @@ -102,7 +114,7 @@ public Builder fetchSourceContext(FetchSourceContext fetchSourceContext) {
* @return A {@link GetDataObjectRequest}
*/
public GetDataObjectRequest build() {
return new GetDataObjectRequest(this.index, this.id, this.fetchSourceContext);
return new GetDataObjectRequest(this.index, this.id, this.tenantId, this.fetchSourceContext);
}
}
}
28 changes: 26 additions & 2 deletions common/src/main/java/org/opensearch/sdk/PutDataObjectRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
public class PutDataObjectRequest {

private final String index;
private final String id;
private final String tenantId;
private final ToXContentObject dataObject;

/**
Expand All @@ -22,8 +24,10 @@ public class PutDataObjectRequest {
* @param index the index location to put the object
* @param dataObject the data object
*/
public PutDataObjectRequest(String index, ToXContentObject dataObject) {
public PutDataObjectRequest(String index, String id, String tenantId, ToXContentObject dataObject) {
this.index = index;
this.id = id;
this.tenantId = tenantId;
this.dataObject = dataObject;
}

Expand All @@ -35,6 +39,14 @@ public String index() {
return this.index;
}

public String id() {
return this.id;
}

public String tenantId() {
return this.tenantId;
}

/**
* Returns the data object
* @return the data object
Expand All @@ -48,6 +60,8 @@ public ToXContentObject dataObject() {
*/
public static class Builder {
private String index = null;
private String id = null;
private String tenantId = null;
private ToXContentObject dataObject = null;

/**
Expand All @@ -65,6 +79,16 @@ public Builder index(String index) {
return this;
}

public Builder id(String id) {
this.id = id;
return this;
}

public Builder tenantId(String tenantId) {
this.tenantId = tenantId;
return this;
}

/**
* Add a data object to this builder
* @param dataObject the data object
Expand All @@ -80,7 +104,7 @@ public Builder dataObject(ToXContentObject dataObject) {
* @return A {@link PutDataObjectRequest}
*/
public PutDataObjectRequest build() {
return new PutDataObjectRequest(this.index, this.dataObject);
return new PutDataObjectRequest(this.index, this.id, this.tenantId, this.dataObject);
}
}
}
22 changes: 22 additions & 0 deletions plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,28 @@ dependencies {
implementation 'com.jayway.jsonpath:json-path:2.9.0'

implementation "org.opensearch.client:opensearch-java:2.10.2"
// Dynamo dependencies
implementation("software.amazon.awssdk:sdk-core:2.25.40")
implementation("software.amazon.awssdk:aws-core:2.25.40")
implementation "software.amazon.awssdk:aws-json-protocol:2.25.40"
implementation("software.amazon.awssdk:auth:2.25.40")
implementation("software.amazon.awssdk:checksums:2.25.40")
implementation("software.amazon.awssdk:checksums-spi:2.25.40")
implementation("software.amazon.awssdk:dynamodb:2.25.40")
implementation("software.amazon.awssdk:endpoints-spi:2.25.40")
implementation("software.amazon.awssdk:http-auth-aws:2.25.40")
implementation("software.amazon.awssdk:http-auth-spi:2.25.40")
implementation("software.amazon.awssdk:http-client-spi:2.25.40")
implementation("software.amazon.awssdk:identity-spi:2.25.40")
implementation "software.amazon.awssdk:json-utils:2.25.40"
implementation "software.amazon.awssdk:metrics-spi:2.25.40"
implementation("software.amazon.awssdk:profiles:2.25.40")
implementation "software.amazon.awssdk:protocol-core:2.25.40"
implementation("software.amazon.awssdk:regions:2.25.40")
implementation "software.amazon.awssdk:third-party-jackson-core:2.25.40"
implementation("software.amazon.awssdk:url-connection-client:2.25.40")
implementation("software.amazon.awssdk:utils:2.25.40")


configurations.all {
resolutionStrategy.force 'org.apache.httpcomponents.core5:httpcore5:5.2.4'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package org.opensearch.ml.sdkclient;

import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.opensearch.OpenSearchException;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.sdk.DeleteDataObjectRequest;
import org.opensearch.sdk.DeleteDataObjectResponse;
import org.opensearch.sdk.GetDataObjectRequest;
import org.opensearch.sdk.GetDataObjectResponse;
import org.opensearch.sdk.PutDataObjectRequest;
import org.opensearch.sdk.PutDataObjectResponse;
import org.opensearch.sdk.SdkClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;

import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;

@AllArgsConstructor
@Log4j2
public class DDBOpenSearchClient implements SdkClient {

private static final String DEFAULT_TENANT = "DEFAULT_TENANT";

private static final String HASH_KEY = "tenant_id";
private static final String RANGE_KEY = "id";
private static final String SOURCE = "source";

private DynamoDbClient dynamoDbClient;
@Override
public CompletionStage<PutDataObjectResponse> putDataObjectAsync(PutDataObjectRequest request, Executor executor) {
final String id = request.id() != null ? request.id() : UUID.randomUUID().toString();
final String tenantId = request.tenantId() != null ? request.tenantId() : DEFAULT_TENANT;
final String tableName = getTableName(request.index());
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<PutDataObjectResponse>) () -> {
try (XContentBuilder sourceBuilder = XContentFactory.jsonBuilder()) {
XContentBuilder builder = request.dataObject().toXContent(sourceBuilder, ToXContent.EMPTY_PARAMS);
String source = builder.toString();

final Map<String, AttributeValue> item = Map.ofEntries(
Map.entry(HASH_KEY, AttributeValue.builder().s(tenantId).build()),
Map.entry(RANGE_KEY, AttributeValue.builder().s(id).build()),
Map.entry(SOURCE, AttributeValue.builder().s(source).build())
);
final PutItemRequest putItemRequest = PutItemRequest.builder()
.tableName(tableName)
.item(item)
.build();

dynamoDbClient.putItem(putItemRequest);
return new PutDataObjectResponse.Builder().id(id).created(true).build();
} catch (Exception e){
log.error("Exception while inserting data into DDB: " + e.getMessage(), e);
throw new OpenSearchException(e);
}
}), executor);
}

@Override
public CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRequest request, Executor executor) {
final String tenantId = request.tenantId() != null ? request.tenantId() : DEFAULT_TENANT;
final GetItemRequest getItemRequest = GetItemRequest.builder()
.tableName(getTableName(request.index()))
.key(Map.ofEntries(
Map.entry(HASH_KEY, AttributeValue.builder().s(tenantId).build()),
Map.entry(RANGE_KEY, AttributeValue.builder().s(request.id()).build())
))
.build();
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<GetDataObjectResponse>) () -> {
try {
final GetItemResponse getItemResponse = dynamoDbClient.getItem(getItemRequest);
if (getItemResponse == null || getItemResponse.item() == null || getItemResponse.item().isEmpty()) {
return new GetDataObjectResponse.Builder().id(request.id()).parser(Optional.empty()).build();
}

String source = getItemResponse.item().get(SOURCE).s();
XContentParser parser = JsonXContent.jsonXContent
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source);
return new GetDataObjectResponse.Builder().id(request.id()).parser(Optional.of(parser)).build();
} catch (Exception e) {
log.error("Exception while fetching data from DDB: " + e.getMessage(), e);
throw new OpenSearchException(e);
}
}), executor);
}

@Override
public CompletionStage<DeleteDataObjectResponse> deleteDataObjectAsync(DeleteDataObjectRequest request, Executor executor) {
final String tenantId = request.tenantId() != null ? request.tenantId() : DEFAULT_TENANT;
final DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
.tableName(getTableName(request.index()))
.key(Map.ofEntries(
Map.entry(HASH_KEY, AttributeValue.builder().s(tenantId).build()),
Map.entry(RANGE_KEY, AttributeValue.builder().s(request.id()).build())
)).build();
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<DeleteDataObjectResponse>) () -> {
dynamoDbClient.deleteItem(deleteItemRequest);
return new DeleteDataObjectResponse.Builder().id(request.id()).deleted(true).build();
}), executor);
}

private String getTableName(String index) {
// Table name will be same as index name. As DDB table name does not support dot(.)
// it will be removed form name.
return index.replaceAll("\\.", "");
}
}
Loading

0 comments on commit 0e350f8

Please sign in to comment.