Skip to content

Commit

Permalink
Added Json transformer utility
Browse files Browse the repository at this point in the history
  • Loading branch information
arjunkumargiri committed Jun 26, 2024
1 parent 27e0e49 commit 6bc3621
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand All @@ -30,6 +26,7 @@
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.ml.sdkclient.util.JsonTransformer;
import org.opensearch.sdk.DeleteDataObjectRequest;
import org.opensearch.sdk.DeleteDataObjectResponse;
import org.opensearch.sdk.GetDataObjectRequest;
Expand All @@ -44,9 +41,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;

import lombok.extern.log4j.Log4j2;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
Expand All @@ -73,6 +68,13 @@ public class DDBOpenSearchClient implements SdkClient {
private DynamoDbClient dynamoDbClient;
private RemoteClusterIndicesClient remoteClusterIndicesClient;

/**
* Default constructor
*
* @param dynamoDbClient AWS DDB client to perform CRUD operations on a DDB table.
* @param remoteClusterIndicesClient Remote opensearch client to perform search operations. Documents written to DDB
* needs to be synced offline with remote opensearch.
*/
public DDBOpenSearchClient(DynamoDbClient dynamoDbClient, RemoteClusterIndicesClient remoteClusterIndicesClient) {
this.dynamoDbClient = dynamoDbClient;
this.remoteClusterIndicesClient = remoteClusterIndicesClient;
Expand All @@ -93,7 +95,7 @@ public CompletionStage<PutDataObjectResponse> putDataObjectAsync(PutDataObjectRe
String source = Strings.toString(MediaTypeRegistry.JSON, request.dataObject());
try {
JsonNode jsonNode = OBJECT_MAPPER.readTree(source);
Map<String, AttributeValue> item = convertJsonObjectToItem(jsonNode);
Map<String, AttributeValue> item = JsonTransformer.convertJsonObjectToItem(jsonNode);
item.put(HASH_KEY, AttributeValue.builder().s(tenantId).build());
item.put(RANGE_KEY, AttributeValue.builder().s(id).build());
final PutItemRequest putItemRequest = PutItemRequest.builder().tableName(tableName).item(item).build();
Expand Down Expand Up @@ -134,9 +136,8 @@ public CompletionStage<GetDataObjectResponse> getDataObjectAsync(GetDataObjectRe
sourceObject = null;
} else {
found = true;
sourceObject = convertToObjectNode((getItemResponse.item()));
sourceObject = JsonTransformer.convertToObjectNode((getItemResponse.item()));
}

final String source = OBJECT_MAPPER.writeValueAsString(sourceObject);
String simulatedGetResponse = "{\"_index\":\""
+ request.index()
Expand Down Expand Up @@ -176,7 +177,7 @@ public CompletionStage<UpdateDataObjectResponse> updateDataObjectAsync(UpdateDat
try {
String source = Strings.toString(MediaTypeRegistry.JSON, request.dataObject());
JsonNode jsonNode = OBJECT_MAPPER.readTree(source);
Map<String, AttributeValue> updateItem = convertJsonObjectToItem(jsonNode);
Map<String, AttributeValue> updateItem = JsonTransformer.convertJsonObjectToItem(jsonNode);
updateItem.put(HASH_KEY, AttributeValue.builder().s(tenantId).build());
updateItem.put(RANGE_KEY, AttributeValue.builder().s(request.id()).build());
UpdateItemRequest updateItemRequest = UpdateItemRequest
Expand Down Expand Up @@ -241,121 +242,4 @@ private String getTableName(String index) {
return index.replaceAll("\\.", "");
}

@VisibleForTesting
static Map<String, AttributeValue> convertJsonObjectToItem(JsonNode jsonNode) {
Map<String, AttributeValue> item = new HashMap<>();
Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();

while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();

if (field.getValue().isTextual()) {
item.put(field.getKey(), AttributeValue.builder().s(field.getValue().asText()).build());
} else if (field.getValue().isNumber()) {
item.put(field.getKey(), AttributeValue.builder().n(field.getValue().asText()).build());
} else if (field.getValue().isBoolean()) {
item.put(field.getKey(), AttributeValue.builder().bool(field.getValue().asBoolean()).build());
} else if (field.getValue().isNull()) {
item.put(field.getKey(), AttributeValue.builder().nul(true).build());
} else if (field.getValue().isObject()) {
item.put(field.getKey(), AttributeValue.builder().m(convertJsonObjectToItem(field.getValue())).build());
} else if (field.getValue().isArray()) {
item.put(field.getKey(), AttributeValue.builder().l(convertJsonArrayToAttributeValueList(field.getValue())).build());
} else {
throw new IllegalArgumentException("Unsupported field type: " + field.getValue());
}
}

return item;
}

@VisibleForTesting
static List<AttributeValue> convertJsonArrayToAttributeValueList(JsonNode jsonArray) {
List<AttributeValue> attributeValues = new ArrayList<>();

for (JsonNode element : jsonArray) {
if (element.isTextual()) {
attributeValues.add(AttributeValue.builder().s(element.asText()).build());
} else if (element.isNumber()) {
attributeValues.add(AttributeValue.builder().n(element.asText()).build());
} else if (element.isBoolean()) {
attributeValues.add(AttributeValue.builder().bool(element.asBoolean()).build());
} else if (element.isNull()) {
attributeValues.add(AttributeValue.builder().nul(true).build());
} else if (element.isObject()) {
attributeValues.add(AttributeValue.builder().m(convertJsonObjectToItem(element)).build());
} else if (element.isArray()) {
attributeValues.add(AttributeValue.builder().l(convertJsonArrayToAttributeValueList(element)).build());
} else {
throw new IllegalArgumentException("Unsupported field type: " + element);
}

}

return attributeValues;
}

@VisibleForTesting
static ObjectNode convertToObjectNode(Map<String, AttributeValue> item) {
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();

item.forEach((key, value) -> {
switch (value.type()) {
case S:
objectNode.put(key, value.s());
break;
case N:
objectNode.put(key, value.n());
break;
case BOOL:
objectNode.put(key, value.bool());
break;
case L:
objectNode.put(key, convertToArrayNode(value.l()));
break;
case M:
objectNode.set(key, convertToObjectNode(value.m()));
break;
case NUL:
objectNode.putNull(key);
break;
default:
throw new IllegalArgumentException("Unsupported AttributeValue type: " + value.type());
}
});

return objectNode;

}

@VisibleForTesting
static ArrayNode convertToArrayNode(final List<AttributeValue> attributeValueList) {
ArrayNode arrayNode = OBJECT_MAPPER.createArrayNode();
attributeValueList.forEach(attribute -> {
switch (attribute.type()) {
case S:
arrayNode.add(attribute.s());
break;
case N:
arrayNode.add(attribute.n());
break;
case BOOL:
arrayNode.add(attribute.bool());
break;
case L:
arrayNode.add(convertToArrayNode(attribute.l()));
break;
case M:
arrayNode.add(convertToObjectNode(attribute.m()));
break;
case NUL:
arrayNode.add((JsonNode) null);
break;
default:
throw new IllegalArgumentException("Unsupported AttributeValue type: " + attribute.type());
}
});
return arrayNode;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public SdkClientModule() {
SdkClientModule(String remoteMetadataType, String remoteMetadataEndpoint, String region) {
this.remoteMetadataType = remoteMetadataType;
this.remoteMetadataEndpoint = remoteMetadataEndpoint;
this.region = region == null ? "us-west-2" : region;
this.region = region;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ml.sdkclient.util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

public class JsonTransformer {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@VisibleForTesting
public static Map<String, AttributeValue> convertJsonObjectToItem(JsonNode jsonNode) {
Map<String, AttributeValue> item = new HashMap<>();
Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields();

while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
if (field.getValue().isTextual()) {
item.put(field.getKey(), AttributeValue.builder().s(field.getValue().asText()).build());
} else if (field.getValue().isNumber()) {
item.put(field.getKey(), AttributeValue.builder().n(field.getValue().asText()).build());
} else if (field.getValue().isBoolean()) {
item.put(field.getKey(), AttributeValue.builder().bool(field.getValue().asBoolean()).build());
} else if (field.getValue().isNull()) {
item.put(field.getKey(), AttributeValue.builder().nul(true).build());
} else if (field.getValue().isObject()) {
item.put(field.getKey(), AttributeValue.builder().m(convertJsonObjectToItem(field.getValue())).build());
} else if (field.getValue().isArray()) {
item.put(field.getKey(), AttributeValue.builder().l(convertJsonArrayToAttributeValueList(field.getValue())).build());
} else {
throw new IllegalArgumentException("Unsupported field type: " + field.getValue());
}
}

return item;
}

@VisibleForTesting
public static List<AttributeValue> convertJsonArrayToAttributeValueList(JsonNode jsonArray) {
List<AttributeValue> attributeValues = new ArrayList<>();

for (JsonNode element : jsonArray) {
if (element.isTextual()) {
attributeValues.add(AttributeValue.builder().s(element.asText()).build());
} else if (element.isNumber()) {
attributeValues.add(AttributeValue.builder().n(element.asText()).build());
} else if (element.isBoolean()) {
attributeValues.add(AttributeValue.builder().bool(element.asBoolean()).build());
} else if (element.isNull()) {
attributeValues.add(AttributeValue.builder().nul(true).build());
} else if (element.isObject()) {
attributeValues.add(AttributeValue.builder().m(convertJsonObjectToItem(element)).build());
} else if (element.isArray()) {
attributeValues.add(AttributeValue.builder().l(convertJsonArrayToAttributeValueList(element)).build());
} else {
throw new IllegalArgumentException("Unsupported field type: " + element);
}

}

return attributeValues;
}

public static ObjectNode convertToObjectNode(Map<String, AttributeValue> item) {
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();

item.forEach((key, value) -> {
switch (value.type()) {
case S:
objectNode.put(key, value.s());
break;
case N:
objectNode.put(key, value.n());
break;
case BOOL:
objectNode.put(key, value.bool());
break;
case L:
objectNode.put(key, convertToArrayNode(value.l()));
break;
case M:
objectNode.set(key, convertToObjectNode(value.m()));
break;
case NUL:
objectNode.putNull(key);
break;
default:
throw new IllegalArgumentException("Unsupported AttributeValue type: " + value.type());
}
});

return objectNode;

}

public static ArrayNode convertToArrayNode(final List<AttributeValue> attributeValueList) {
ArrayNode arrayNode = OBJECT_MAPPER.createArrayNode();
attributeValueList.forEach(attribute -> {
switch (attribute.type()) {
case S:
arrayNode.add(attribute.s());
break;
case N:
arrayNode.add(attribute.n());
break;
case BOOL:
arrayNode.add(attribute.bool());
break;
case L:
arrayNode.add(convertToArrayNode(attribute.l()));
break;
case M:
arrayNode.add(convertToObjectNode(attribute.m()));
break;
case NUL:
arrayNode.add((JsonNode) null);
break;
default:
throw new IllegalArgumentException("Unsupported AttributeValue type: " + attribute.type());
}
});
return arrayNode;

}
}
Loading

0 comments on commit 6bc3621

Please sign in to comment.