Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDK DDB client update and search interface implementation #2590

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.ml.sdkclient.util.JsonTransformer;
import org.opensearch.sdk.DeleteDataObjectRequest;
import org.opensearch.sdk.DeleteDataObjectResponse;
import org.opensearch.sdk.GetDataObjectRequest;
Expand All @@ -38,30 +39,46 @@
import org.opensearch.sdk.UpdateDataObjectRequest;
import org.opensearch.sdk.UpdateDataObjectResponse;

import lombok.AllArgsConstructor;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import lombok.extern.log4j.Log4j2;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

/**
* DDB implementation of {@link SdkClient}. DDB table name will be mapped to index name.
*
*/
@AllArgsConstructor
@Log4j2
public class DDBOpenSearchClient implements SdkClient {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String DEFAULT_TENANT = "DEFAULT_TENANT";

private static final String HASH_KEY = "tenant_id";
private static final String RANGE_KEY = "id";
private static final String SOURCE = "source";

private DynamoDbClient dynamoDbClient;
private RemoteClusterIndicesClient remoteClusterIndicesClient;

/**
* Default constructor
*
* @param dynamoDbClient AWS DDB client to perform CRUD operations on a DDB table.
* @param remoteClusterIndicesClient Remote opensearch client to perform search operations. Documents written to DDB
* needs to be synced offline with remote opensearch.
*/
public DDBOpenSearchClient(DynamoDbClient dynamoDbClient, RemoteClusterIndicesClient remoteClusterIndicesClient) {
arjunkumargiri marked this conversation as resolved.
Show resolved Hide resolved
this.dynamoDbClient = dynamoDbClient;
this.remoteClusterIndicesClient = remoteClusterIndicesClient;
}

/**
* DDB implementation to write data objects to DDB table. Tenant ID will be used as hash key and document ID will
Expand All @@ -76,16 +93,18 @@ public CompletionStage<PutDataObjectResponse> putDataObjectAsync(PutDataObjectRe
final String tableName = getTableName(request.index());
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<PutDataObjectResponse>) () -> {
String source = Strings.toString(MediaTypeRegistry.JSON, request.dataObject());
final Map<String, AttributeValue> item = Map
.ofEntries(
Map.entry(HASH_KEY, AttributeValue.builder().s(tenantId).build()),
Map.entry(RANGE_KEY, AttributeValue.builder().s(id).build()),
Map.entry(SOURCE, AttributeValue.builder().s(source).build())
);
final PutItemRequest putItemRequest = PutItemRequest.builder().tableName(tableName).item(item).build();
try {
JsonNode jsonNode = OBJECT_MAPPER.readTree(source);
Map<String, AttributeValue> item = JsonTransformer.convertJsonObjectToItem(jsonNode);
item.put(HASH_KEY, AttributeValue.builder().s(tenantId).build());
item.put(RANGE_KEY, AttributeValue.builder().s(id).build());
final PutItemRequest putItemRequest = PutItemRequest.builder().tableName(tableName).item(item).build();

dynamoDbClient.putItem(putItemRequest);
return new PutDataObjectResponse.Builder().id(id).created(true).build();
dynamoDbClient.putItem(putItemRequest);
return new PutDataObjectResponse.Builder().id(id).created(true).build();
} catch (IOException e) {
throw new OpenSearchStatusException("Failed to parse data object " + request.id(), RestStatus.BAD_REQUEST);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if parsing data object is fine, but indexing failed for some other reason may be DDB was not reachable, or throttled or some other reason?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DDB would through runtime exception which will not be wrapped.

}
}), executor);
}

Expand All @@ -110,15 +129,16 @@ public CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRe
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<GetDataObjectResponse>) () -> {
try {
final GetItemResponse getItemResponse = dynamoDbClient.getItem(getItemRequest);
String source;
ObjectNode sourceObject;
boolean found;
if (getItemResponse == null || getItemResponse.item() == null || getItemResponse.item().isEmpty()) {
found = false;
source = null;
sourceObject = null;
} else {
found = true;
source = getItemResponse.item().get(SOURCE).s();
sourceObject = JsonTransformer.convertToObjectNode((getItemResponse.item()));
}
final String source = OBJECT_MAPPER.writeValueAsString(sourceObject);
String simulatedGetResponse = "{\"_index\":\""
+ request.index()
+ "\",\"_id\":\""
Expand All @@ -128,6 +148,7 @@ public CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRe
+ ",\"_source\":"
+ source
+ "}";

XContentParser parser = JsonXContent.jsonXContent
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, simulatedGetResponse);
// This would consume parser content so we need to create a new parser for the map
Expand All @@ -145,10 +166,37 @@ public CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRe
}), executor);
}

/**
* Makes use of DDB update request to update data object.
*
*/
@Override
public CompletionStage<UpdateDataObjectResponse> updateDataObjectAsync(UpdateDataObjectRequest request, Executor executor) {
// TODO: Implement update
return null;
final String tenantId = request.tenantId() != null ? request.tenantId() : DEFAULT_TENANT;
return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction<UpdateDataObjectResponse>) () -> {
try {
String source = Strings.toString(MediaTypeRegistry.JSON, request.dataObject());
JsonNode jsonNode = OBJECT_MAPPER.readTree(source);
Map<String, AttributeValue> updateItem = JsonTransformer.convertJsonObjectToItem(jsonNode);
updateItem.put(HASH_KEY, AttributeValue.builder().s(tenantId).build());
updateItem.put(RANGE_KEY, AttributeValue.builder().s(request.id()).build());
UpdateItemRequest updateItemRequest = UpdateItemRequest
.builder()
.tableName(getTableName(request.index()))
.key(updateItem)
.build();
dynamoDbClient.updateItem(updateItemRequest);

return new UpdateDataObjectResponse.Builder().id(request.id()).shardId(request.index()).updated(true).build();
} catch (IOException e) {
log.error("Error updating {} in {}: {}", request.id(), request.index(), e.getMessage(), e);
// Rethrow unchecked exception on update IOException
throw new OpenSearchStatusException(
"Parsing error updating data object " + request.id() + " in index " + request.index(),
RestStatus.BAD_REQUEST
);
}
}), executor);
}

/**
Expand All @@ -175,16 +223,23 @@ public CompletionStage<DeleteDataObjectResponse> deleteDataObjectAsync(DeleteDat
}), executor);
}

/**
* DDB data needs to be synced with opensearch cluster. {@link RemoteClusterIndicesClient} will then be used to
* search data in opensearch cluster.
Comment on lines +227 to +228
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my education, what's happening here?

Are we writing to DDB and reading from OS for all "fetch" operations or are we only syncing up for search?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DDBOpenSearchClient will make use of DDB for CRUD operations. Data written to DDB will be synced with OpenSearch using offline integration between DDB and opensearch. SDKClient will then rely on opensearch only for search operations.

*
* @param request
* @param executor
* @return Search data object response
*/
@Override
public CompletionStage<SearchDataObjectResponse> searchDataObjectAsync(SearchDataObjectRequest request, Executor executor) {
// TODO will implement this later.

return null;
return this.remoteClusterIndicesClient.searchDataObjectAsync(request, executor);
}

private String getTableName(String index) {
// Table name will be same as index name. As DDB table name does not support dot(.)
// it will be removed from name.
return index.replaceAll("\\.", "");
}

arjunkumargiri marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ protected void configure() {
return;
case AWS_DYNAMO_DB:
log.info("Using dynamo DB as metadata store");
bind(SdkClient.class).toInstance(new DDBOpenSearchClient(createDynamoDbClient()));
bind(SdkClient.class)
.toInstance(new DDBOpenSearchClient(createDynamoDbClient(), new RemoteClusterIndicesClient(createOpenSearchClient())));
arjunkumargiri marked this conversation as resolved.
Show resolved Hide resolved
return;
default:
log.info("Using local opensearch cluster as metadata store");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ml.sdkclient.util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

public class JsonTransformer {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@VisibleForTesting
public static Map<String, AttributeValue> convertJsonObjectToItem(JsonNode jsonNode) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

convertJsonObjectToDDbAttributeMap? as this method is only taking care of Dynamodb? Same goes for the other method name.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename the class too? JsonTransformer sounds too generic in this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will update method name, however would like to keep the name generic to support other json transformation functions in future

Map<String, AttributeValue> item = new HashMap<>();
Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();

while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
if (field.getValue().isTextual()) {
item.put(field.getKey(), AttributeValue.builder().s(field.getValue().asText()).build());
} else if (field.getValue().isNumber()) {
item.put(field.getKey(), AttributeValue.builder().n(field.getValue().asText()).build());
} else if (field.getValue().isBoolean()) {
item.put(field.getKey(), AttributeValue.builder().bool(field.getValue().asBoolean()).build());
} else if (field.getValue().isNull()) {
item.put(field.getKey(), AttributeValue.builder().nul(true).build());
} else if (field.getValue().isObject()) {
item.put(field.getKey(), AttributeValue.builder().m(convertJsonObjectToItem(field.getValue())).build());
} else if (field.getValue().isArray()) {
item.put(field.getKey(), AttributeValue.builder().l(convertJsonArrayToAttributeValueList(field.getValue())).build());
} else {
throw new IllegalArgumentException("Unsupported field type: " + field.getValue());
}
}

return item;
}

@VisibleForTesting
public static List<AttributeValue> convertJsonArrayToAttributeValueList(JsonNode jsonArray) {
List<AttributeValue> attributeValues = new ArrayList<>();

for (JsonNode element : jsonArray) {
if (element.isTextual()) {
attributeValues.add(AttributeValue.builder().s(element.asText()).build());
} else if (element.isNumber()) {
attributeValues.add(AttributeValue.builder().n(element.asText()).build());
} else if (element.isBoolean()) {
attributeValues.add(AttributeValue.builder().bool(element.asBoolean()).build());
} else if (element.isNull()) {
attributeValues.add(AttributeValue.builder().nul(true).build());
} else if (element.isObject()) {
attributeValues.add(AttributeValue.builder().m(convertJsonObjectToItem(element)).build());
} else if (element.isArray()) {
attributeValues.add(AttributeValue.builder().l(convertJsonArrayToAttributeValueList(element)).build());
} else {
throw new IllegalArgumentException("Unsupported field type: " + element);
}

}

return attributeValues;
}

public static ObjectNode convertToObjectNode(Map<String, AttributeValue> item) {
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();

item.forEach((key, value) -> {
switch (value.type()) {
case S:
objectNode.put(key, value.s());
break;
case N:
objectNode.put(key, value.n());
break;
case BOOL:
objectNode.put(key, value.bool());
break;
case L:
objectNode.put(key, convertToArrayNode(value.l()));
break;
case M:
objectNode.set(key, convertToObjectNode(value.m()));
break;
case NUL:
objectNode.putNull(key);
break;
default:
throw new IllegalArgumentException("Unsupported AttributeValue type: " + value.type());
}
});

return objectNode;

}

public static ArrayNode convertToArrayNode(final List<AttributeValue> attributeValueList) {
ArrayNode arrayNode = OBJECT_MAPPER.createArrayNode();
attributeValueList.forEach(attribute -> {
switch (attribute.type()) {
case S:
arrayNode.add(attribute.s());
break;
case N:
arrayNode.add(attribute.n());
break;
case BOOL:
arrayNode.add(attribute.bool());
break;
case L:
arrayNode.add(convertToArrayNode(attribute.l()));
break;
case M:
arrayNode.add(convertToObjectNode(attribute.m()));
break;
case NUL:
arrayNode.add((JsonNode) null);
break;
default:
throw new IllegalArgumentException("Unsupported AttributeValue type: " + attribute.type());
}
});
return arrayNode;

}
}
Loading
Loading