Skip to content

Commit

Permalink
AWS DDB SDK client support for remote data store
Browse files Browse the repository at this point in the history
Signed-off-by: Arjun kumar Giri <[email protected]>
  • Loading branch information
arjunkumargiri committed Jun 13, 2024
1 parent 0e350f8 commit 1a2f244
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 184 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.ml.sdkclient;

import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;

import org.opensearch.OpenSearchException;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentFactory;
Expand All @@ -17,23 +32,20 @@
import org.opensearch.sdk.PutDataObjectRequest;
import org.opensearch.sdk.PutDataObjectResponse;
import org.opensearch.sdk.SdkClient;

import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;

import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;

/**
* DDB implementation of {@link SdkClient}. DDB table name will be mapped to index name.
*
*/
@AllArgsConstructor
@Log4j2
public class DDBOpenSearchClient implements SdkClient {
Expand All @@ -45,6 +57,13 @@ public class DDBOpenSearchClient implements SdkClient {
private static final String SOURCE = "source";

private DynamoDbClient dynamoDbClient;

/**
* DDB implementation to write data objects to DDB table. Tenant ID will be used as hash key and document ID will
* be used as range key. If tenant ID is not defined a default tenant ID will be used. If document ID is not defined
* a random UUID will be generated. Data object will be written as a nested DDB attribute.
*
*/
@Override
public CompletionStage<PutDataObjectResponse> putDataObjectAsync(PutDataObjectRequest request, Executor executor) {
final String id = request.id() != null ? request.id() : UUID.randomUUID().toString();
Expand All @@ -55,35 +74,41 @@ public CompletionStage<PutDataObjectResponse> putDataObjectAsync(PutDataObjectRe
XContentBuilder builder = request.dataObject().toXContent(sourceBuilder, ToXContent.EMPTY_PARAMS);
String source = builder.toString();

final Map<String, AttributeValue> item = Map.ofEntries(
final Map<String, AttributeValue> item = Map
.ofEntries(
Map.entry(HASH_KEY, AttributeValue.builder().s(tenantId).build()),
Map.entry(RANGE_KEY, AttributeValue.builder().s(id).build()),
Map.entry(SOURCE, AttributeValue.builder().s(source).build())
);
final PutItemRequest putItemRequest = PutItemRequest.builder()
.tableName(tableName)
.item(item)
.build();
);
final PutItemRequest putItemRequest = PutItemRequest.builder().tableName(tableName).item(item).build();

dynamoDbClient.putItem(putItemRequest);
return new PutDataObjectResponse.Builder().id(id).created(true).build();
} catch (Exception e){
dynamoDbClient.putItem(putItemRequest);
return new PutDataObjectResponse.Builder().id(id).created(true).build();
} catch (Exception e) {
log.error("Exception while inserting data into DDB: " + e.getMessage(), e);
throw new OpenSearchException(e);
}
}
}), executor);
}

/**
* Fetches data document from DDB. Default tenant ID will be used if tenant ID is not specified.
*
*/
@Override
public CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRequest request, Executor executor) {
final String tenantId = request.tenantId() != null ? request.tenantId() : DEFAULT_TENANT;
final GetItemRequest getItemRequest = GetItemRequest.builder()
.tableName(getTableName(request.index()))
.key(Map.ofEntries(
final GetItemRequest getItemRequest = GetItemRequest
.builder()
.tableName(getTableName(request.index()))
.key(
Map
.ofEntries(
Map.entry(HASH_KEY, AttributeValue.builder().s(tenantId).build()),
Map.entry(RANGE_KEY, AttributeValue.builder().s(request.id()).build())
))
.build();
)
)
.build();
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<GetDataObjectResponse>) () -> {
try {
final GetItemResponse getItemResponse = dynamoDbClient.getItem(getItemRequest);
Expand All @@ -93,7 +118,7 @@ public CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRe

String source = getItemResponse.item().get(SOURCE).s();
XContentParser parser = JsonXContent.jsonXContent
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source);
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source);
return new GetDataObjectResponse.Builder().id(request.id()).parser(Optional.of(parser)).build();
} catch (Exception e) {
log.error("Exception while fetching data from DDB: " + e.getMessage(), e);
Expand All @@ -102,15 +127,24 @@ public CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRe
}), executor);
}

/**
* Deletes data document from DDB. Default tenant ID will be used if tenant ID is not specified.
*
*/
@Override
public CompletionStage<DeleteDataObjectResponse> deleteDataObjectAsync(DeleteDataObjectRequest request, Executor executor) {
final String tenantId = request.tenantId() != null ? request.tenantId() : DEFAULT_TENANT;
final DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
.tableName(getTableName(request.index()))
.key(Map.ofEntries(
final DeleteItemRequest deleteItemRequest = DeleteItemRequest
.builder()
.tableName(getTableName(request.index()))
.key(
Map
.ofEntries(
Map.entry(HASH_KEY, AttributeValue.builder().s(tenantId).build()),
Map.entry(RANGE_KEY, AttributeValue.builder().s(request.id()).build())
)).build();
)
)
.build();
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<DeleteDataObjectResponse>) () -> {
dynamoDbClient.deleteItem(deleteItemRequest);
return new DeleteDataObjectResponse.Builder().id(request.id()).deleted(true).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,26 @@
*/
package org.opensearch.ml.sdkclient;

import lombok.extern.log4j.Log4j2;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.opensearch.OpenSearchException;
import org.opensearch.SpecialPermission;
import org.opensearch.client.RestClient;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.opensearch.common.inject.AbstractModule;
import org.opensearch.sdk.PutDataObjectResponse;
import org.opensearch.sdk.SdkClient;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.profiles.ProfileFileSystemSetting;

import java.security.AccessController;
import java.security.PrivilegedAction;
import lombok.extern.log4j.Log4j2;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.ContainerCredentialsProvider;
import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;

/**
* A module for binding this plugin's desired implementation of {@link SdkClient}.
Expand All @@ -49,33 +45,6 @@ public class SdkClientModule extends AbstractModule {
private final String remoteMetadataEndpoint;
private final String region; // not using with RestClient

static {
// Aws v2 sdk tries to load a default profile from home path which is restricted. Hence, setting these to random valid paths.
// @SuppressForbidden(reason = "Need to provide this override to v2 SDK so that path does not default to home path")
if (ProfileFileSystemSetting.AWS_SHARED_CREDENTIALS_FILE.getStringValue().isEmpty()) {
SocketAccess.doPrivileged(
() -> System.setProperty(
ProfileFileSystemSetting.AWS_SHARED_CREDENTIALS_FILE.property(),
System.getProperty("opensearch.path.conf")
)
);
}
if (ProfileFileSystemSetting.AWS_CONFIG_FILE.getStringValue().isEmpty()) {
SocketAccess.doPrivileged(
() -> System.setProperty(ProfileFileSystemSetting.AWS_CONFIG_FILE.property(), System.getProperty("opensearch.path.conf"))
);
}
}

private static final class SocketAccess {
private SocketAccess() {}

public static <T> T doPrivileged(PrivilegedAction<T> operation) {
SpecialPermission.check();
return AccessController.doPrivileged(operation);
}
}

/**
* Instantiate this module using environment variables
*/
Expand All @@ -91,11 +60,11 @@ public SdkClientModule() {
SdkClientModule(String remoteStoreType, String remoteMetadataEndpoint, String region) {
this.remoteStoreType = remoteStoreType;
this.remoteMetadataEndpoint = remoteMetadataEndpoint;
this.region = region == null ? "us-west-2" : region;
this.region = region;
}

@Override
protected void configure() {/*
protected void configure() {
if (this.remoteStoreType == null) {
log.info("Using local opensearch cluster as metadata store");
bind(SdkClient.class).to(LocalClusterIndicesClient.class);
Expand All @@ -114,18 +83,22 @@ protected void configure() {/*
default:
log.info("Using local opensearch cluster as metadata store");
bind(SdkClient.class).to(LocalClusterIndicesClient.class);
}*/
bind(SdkClient.class).toInstance(new DDBOpenSearchClient(createDynamoDbClient()));
}
}

private DynamoDbClient createDynamoDbClient() {
if (this.region == null) {
throw new IllegalStateException("REGION environment variable needs to be set!");
}

return DynamoDbClient.builder()
.region(Region.of(this.region))
.build();
AwsCredentialsProviderChain credentialsProviderChain = AwsCredentialsProviderChain
.builder()
.addCredentialsProvider(EnvironmentVariableCredentialsProvider.create())
.addCredentialsProvider(ContainerCredentialsProvider.builder().build())
.addCredentialsProvider(InstanceProfileCredentialsProvider.create())
.build();

return DynamoDbClient.builder().region(Region.of(this.region)).credentialsProvider(credentialsProviderChain).build();
}

private OpenSearchClient createOpenSearchClient() {
Expand Down
Loading

0 comments on commit 1a2f244

Please sign in to comment.