Skip to content

Commit

Permalink
Fixing bugs, changed GC index mapping so that template/compatibility …
Browse files Browse the repository at this point in the history
…versions are of type text, added GC template document readers/writers, modified tests. Still need to add test cases for the new readers/writers

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Oct 12, 2023
1 parent 1dac1ee commit afeb2b6
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void putTemplateToGlobalContext(Template template, ActionListener<IndexRe
XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()
) {
request.source(template.toXContent(builder, ToXContent.EMPTY_PARAMS))
request.source(template.toDocumentSource(builder, ToXContent.EMPTY_PARAMS))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
Expand Down
239 changes: 239 additions & 0 deletions src/main/java/org/opensearch/flowframework/model/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,228 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return xContentBuilder.endObject();
}

/**
* Converts a template object into a Global Context document
* @param builder the XContentBuilder
* @param params the params
* @return the XContentBuilder
* @throws IOException
*/
public XContentBuilder toDocumentSource(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
xContentBuilder.field(NAME_FIELD, this.name);
xContentBuilder.field(DESCRIPTION_FIELD, this.description);
xContentBuilder.field(USE_CASE_FIELD, this.useCase);
xContentBuilder.startArray(OPERATIONS_FIELD);
for (String op : this.operations) {
xContentBuilder.value(op);
}
xContentBuilder.endArray();

if (this.templateVersion != null || !this.compatibilityVersion.isEmpty()) {
xContentBuilder.startObject(VERSION_FIELD);
if (this.templateVersion != null) {
xContentBuilder.field(TEMPLATE_FIELD, this.templateVersion);
}
if (!this.compatibilityVersion.isEmpty()) {
xContentBuilder.startArray(COMPATIBILITY_FIELD);
for (Version v : this.compatibilityVersion) {
xContentBuilder.value(v);
}
xContentBuilder.endArray();
}
xContentBuilder.endObject();
}

if (!this.userInputs.isEmpty()) {
xContentBuilder.startObject(USER_INPUTS_FIELD);
for (Entry<String, Object> e : userInputs.entrySet()) {
xContentBuilder.field(e.getKey(), e.getValue());
}
xContentBuilder.endObject();
}

try (XContentBuilder workflowBuilder = JsonXContent.contentBuilder()) {
workflowBuilder.startObject();
for (Entry<String, Workflow> e : workflows.entrySet()) {
workflowBuilder.field(e.getKey(), e.getValue());
}
workflowBuilder.endObject();
xContentBuilder.field(WORKFLOWS_FIELD, workflowBuilder.toString());
}

try (XContentBuilder userOutputsBuilder = JsonXContent.contentBuilder()) {
userOutputsBuilder.startObject();
for (Entry<String, Object> e : userOutputs.entrySet()) {
userOutputsBuilder.field(e.getKey(), e.getValue());
}
userOutputsBuilder.endObject();
xContentBuilder.field(USER_OUTPUTS_FIELD, userOutputsBuilder.toString());
}

try (XContentBuilder resourcesCreatedBuilder = JsonXContent.contentBuilder()) {
resourcesCreatedBuilder.startObject();
for (Entry<String, Object> e : resourcesCreated.entrySet()) {
resourcesCreatedBuilder.field(e.getKey(), e.getValue());
}
resourcesCreatedBuilder.endObject();
xContentBuilder.field(RESOURCES_CREATED_FIELD, resourcesCreatedBuilder.toString());
}

xContentBuilder.endObject();

return xContentBuilder;

}

/**
* Parse global context document source into a Template instance
*
* @param documentSource the document source string
* @return an instance of the template
* @throws IOException if content can't be parsed correctly
*/
public static Template parseFromDocumentSource(String documentSource) throws IOException {
XContentParser parser = jsonToParser(documentSource);

String name = null;
String description = "";
String useCase = "";
List<String> operations = new ArrayList<>();
Version templateVersion = null;
List<Version> compatibilityVersion = new ArrayList<>();
Map<String, Object> userInputs = new HashMap<>();
Map<String, Workflow> workflows = new HashMap<>();
Map<String, Object> userOutputs = new HashMap<>();
Map<String, Object> resourcesCreated = new HashMap<>();

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case NAME_FIELD:
name = parser.text();
break;
case DESCRIPTION_FIELD:
description = parser.text();
break;
case USE_CASE_FIELD:
useCase = parser.text();
break;
case OPERATIONS_FIELD:
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
operations.add(parser.text());
}
break;
case VERSION_FIELD:
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String versionFieldName = parser.currentName();
parser.nextToken();
switch (versionFieldName) {
case TEMPLATE_FIELD:
templateVersion = Version.fromString(parser.text());
break;
case COMPATIBILITY_FIELD:
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
compatibilityVersion.add(Version.fromString(parser.text()));
}
break;
default:
throw new IOException("Unable to parse field [" + fieldName + "] in a version object.");
}
}
break;
case USER_INPUTS_FIELD:
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String inputFieldName = parser.currentName();
switch (parser.nextToken()) {
case VALUE_STRING:
userInputs.put(inputFieldName, parser.text());
break;
case START_OBJECT:
userInputs.put(inputFieldName, parseStringToStringMap(parser));
break;
default:
throw new IOException("Unable to parse field [" + inputFieldName + "] in a user inputs object.");
}
}
break;
case WORKFLOWS_FIELD:
String workflowsJson = parser.text();
XContentParser workflowsParser = jsonToParser(workflowsJson);
while (workflowsParser.nextToken() != XContentParser.Token.END_OBJECT) {
String workflowFieldName = workflowsParser.currentName();
workflowsParser.nextToken();
workflows.put(workflowFieldName, Workflow.parse(workflowsParser));
}
break;
case USER_OUTPUTS_FIELD:

String userOutputsJson = parser.text();
XContentParser userOuputsParser = jsonToParser(userOutputsJson);
while (userOuputsParser.nextToken() != XContentParser.Token.END_OBJECT) {
String userOutputsFieldName = userOuputsParser.currentName();
switch (userOuputsParser.nextToken()) {
case VALUE_STRING:
userOutputs.put(userOutputsFieldName, userOuputsParser.text());
break;
case START_OBJECT:
userOutputs.put(userOutputsFieldName, parseStringToStringMap(userOuputsParser));
break;
default:
throw new IOException("Unable to parse field [" + userOutputsFieldName + "] in a user_outputs object.");
}
}
break;

case RESOURCES_CREATED_FIELD:

String resourcesCreatedJson = parser.text();
XContentParser resourcesCreatedParser = jsonToParser(resourcesCreatedJson);
while (resourcesCreatedParser.nextToken() != XContentParser.Token.END_OBJECT) {
String resourcesCreatedField = resourcesCreatedParser.currentName();
switch (resourcesCreatedParser.nextToken()) {
case VALUE_STRING:
resourcesCreated.put(resourcesCreatedField, resourcesCreatedParser.text());
break;
case START_OBJECT:
resourcesCreated.put(resourcesCreatedField, parseStringToStringMap(resourcesCreatedParser));
break;
default:
throw new IOException(
"Unable to parse field [" + resourcesCreatedField + "] in a resources_created object."
);
}
}
break;

default:
throw new IOException("Unable to parse field [" + fieldName + "] in a template object.");
}
}
if (name == null) {
throw new IOException("An template object requires a name.");
}

return new Template(
name,
description,
useCase,
operations,
templateVersion,
compatibilityVersion,
userInputs,
workflows,
userOutputs,
resourcesCreated
);
}

/**
* Parse raw json content into a Template instance.
*
Expand Down Expand Up @@ -300,6 +522,23 @@ public static Template parse(XContentParser parser) throws IOException {
);
}

/**
* Converts a JSON string into an XContentParser
*
* @param json the json string
* @return The XContent parser for the json string
* @throws IOException on failure to create the parser
*/
public static XContentParser jsonToParser(String json) throws IOException {
XContentParser parser = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
json
);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
return parser;
}

/**
* Parse a JSON use case template
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
context.restore();

// Parse template from document source
Template template = Template.parse(response.getSourceAsString());
Template template = Template.parseFromDocumentSource(response.getSourceAsString());

// TODO : Update state index entry to PROVISIONING, given workflowId

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void setUp() throws Exception {

public void testPutTemplateToGlobalContext() throws IOException {
Template template = mock(Template.class);
when(template.toXContent(any(XContentBuilder.class), eq(ToXContent.EMPTY_PARAMS))).thenAnswer(invocation -> {
when(template.toDocumentSource(any(XContentBuilder.class), eq(ToXContent.EMPTY_PARAMS))).thenAnswer(invocation -> {
XContentBuilder builder = invocation.getArgument(0);
return builder;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testProvisionWorkflow() {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);

XContentBuilder builder = XContentFactory.jsonBuilder();
this.template.toXContent(builder, ToXContent.EMPTY_PARAMS);
this.template.toDocumentSource(builder, ToXContent.EMPTY_PARAMS);
BytesReference templateBytesRef = BytesReference.bytes(builder);
GetResult getResult = new GetResult(GLOBAL_CONTEXT_INDEX, workflowId, 1, 1, 1, true, templateBytesRef, null, null);
responseListener.onResponse(new GetResponse(getResult));
Expand Down

0 comments on commit afeb2b6

Please sign in to comment.