Skip to content

Commit

Permalink
addressed comments and added more fields to state index
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Oct 27, 2023
1 parent 818405e commit b334ca3
Show file tree
Hide file tree
Showing 18 changed files with 556 additions and 463 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
Expand All @@ -30,34 +29,28 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.transport.WorkflowResponse;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX_MAPPING;
import static org.opensearch.flowframework.common.CommonValue.META;
import static org.opensearch.flowframework.common.CommonValue.NO_SCHEMA_VERSION;
import static org.opensearch.flowframework.common.CommonValue.SCHEMA_VERSION_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.model.WorkflowState.WORKFLOW_ID_FIELD;

/**
* A handler for global context related operations
Expand Down Expand Up @@ -94,10 +87,18 @@ public static String getGlobalContextMappings() throws IOException {
return getIndexMappings(GLOBAL_CONTEXT_INDEX_MAPPING);
}

/**
* Create global context index if it's absent
* @param listener The action listener
*/
public void initGlobalContextIndexIfAbsent(ActionListener<Boolean> listener) {
initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex.GLOBAL_CONTEXT, listener);
}

/**
* Create workflow state index if it's absent
* @param listener The action listener
*/
public void initWorkflowStateIndexIfAbsent(ActionListener<Boolean> listener) {
initFlowFrameworkIndexIfAbsent(FlowFrameworkIndex.WORKFLOW_STATE, listener);
}
Expand Down Expand Up @@ -253,7 +254,7 @@ public void putTemplateToGlobalContext(Template template, ActionListener<IndexRe
XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()
) {
request.source(template.toDocumentSource(builder, ToXContent.EMPTY_PARAMS))
request.source(template.toXContent(builder, ToXContent.EMPTY_PARAMS))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
Expand All @@ -269,13 +270,16 @@ public void putTemplateToGlobalContext(Template template, ActionListener<IndexRe
/**
* add document insert into global context index
* @param workflowId the workflowId, corresponds to document ID of
* @param user passes the user that created the workflow
* @param listener action listener
*/
public void putInitialStateToWorkflowState(String workflowId, User user, ActionListener<IndexResponse> listener) {
WorkflowState state = new WorkflowState.Builder().workflowId(workflowId)
.state(State.NOT_STARTED.name())
.provisioningProgress(ProvisioningProgress.NOT_STARTED.name())
.user(user)
.resourcesCreated(Collections.emptyMap())
.userOutputs(Collections.emptyMap())
.build();
initWorkflowStateIndexIfAbsent(ActionListener.wrap(indexCreated -> {
if (!indexCreated) {
Expand All @@ -289,6 +293,7 @@ public void putInitialStateToWorkflowState(String workflowId, User user, ActionL

) {
request.source(state.toXContent(builder, ToXContent.EMPTY_PARAMS)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
request.id(workflowId);
client.index(request, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to put state index document", e);
Expand Down Expand Up @@ -320,7 +325,7 @@ public void updateTemplateInGlobalContext(String documentId, Template template,
XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()
) {
request.source(template.toDocumentSource(builder, ToXContent.EMPTY_PARAMS))
request.source(template.toXContent(builder, ToXContent.EMPTY_PARAMS))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
Expand All @@ -336,72 +341,24 @@ public void updateTemplateInGlobalContext(String documentId, Template template,
* @param updatedFields the fields to update the global state index with
* @param listener action listener
*/
public void updateWorkflowState(
String workflowStateDocId,
ThreadContext.StoredContext context,
Map<String, Object> updatedFields,
ActionListener<UpdateResponse> listener
) {
public void updateWorkflowState(String workflowStateDocId, Map<String, Object> updatedFields, ActionListener<UpdateResponse> listener) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String exceptionMessage = "Failed to update state for given workflow due to missing workflow_state index";
logger.error(exceptionMessage);
listener.onFailure(new Exception(exceptionMessage));
} else {
UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, workflowStateDocId);
Map<String, Object> updatedContent = new HashMap<>();
updatedContent.putAll(updatedFields);
updateRequest.doc(updatedContent);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
}
}

public void getWorkflowStateID(String workflowId, ActionListener<String> listener) {
BoolQueryBuilder query = new BoolQueryBuilder();
query.filter(new TermQueryBuilder(WORKFLOW_ID_FIELD, workflowId));
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(query).size(1); // we are making the assumption there is only one document with this workflowID
searchRequest.source(sourceBuilder).indices(WORKFLOW_STATE_INDEX);
client.search(searchRequest, ActionListener.wrap(searchResponse -> {
if (searchResponse == null
|| searchResponse.getHits().getTotalHits() == null
|| !(searchResponse.getHits().getTotalHits().value == 1)) {
logger.error("There are either one or no workflow state documents with the same workflowID: " + workflowId);
listener.onFailure(new FlowFrameworkException("Workflow state cannot be updated", INTERNAL_SERVER_ERROR));
return;
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, workflowStateDocId);
Map<String, Object> updatedContent = new HashMap<>();
updatedContent.putAll(updatedFields);
updateRequest.doc(updatedContent);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to update workflow_state entry : {}. {}", workflowStateDocId, e.getMessage());
listener.onFailure(e);
}
String stateWorkflowDocID = searchResponse.getHits().getHits()[0].getId();
listener.onResponse(stateWorkflowDocID);
}, exception -> {
logger.error("Failed to find workflow state for workflowID : {}. {}", workflowId, exception.getMessage());
listener.onFailure(new FlowFrameworkException("Failed to find workflow state for workflowID: " + workflowId, BAD_REQUEST));
}));
}

public void getAndUpdateWorkflowStateDoc(
String workflowId,
Map<String, Object> updatedFields,
ActionListener<WorkflowResponse> workflowResponseListener
) {
try {
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext();
getWorkflowStateID(workflowId, ActionListener.wrap(stateWorkflowId -> {
updateWorkflowState(stateWorkflowId, context, updatedFields, ActionListener.wrap(r -> {}, e -> {
logger.error("Failed to update workflow state : {}", e.getMessage());
workflowResponseListener.onFailure(
new FlowFrameworkException("Failed to update workflow state", RestStatus.BAD_REQUEST)
);
}));
}, exception -> {
logger.error("Failed to save workflow state : {}", exception.getMessage());
workflowResponseListener.onFailure(new FlowFrameworkException("couldn't find workflow state", RestStatus.BAD_REQUEST));
}));
} catch (Exception e) {
logger.error("Failed to update workflow state : {}", e.getMessage());
workflowResponseListener.onFailure(new FlowFrameworkException("Failed to update workflow state", RestStatus.BAD_REQUEST));
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
*/
package org.opensearch.flowframework.model;

/**
* Enum relating to the provisioning progress
*/
// TODO: transfer this to more detailed array for each step
public enum ProvisioningProgress {
IN_PROGRESS,
DONE,
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/org/opensearch/flowframework/model/State.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
*/
package org.opensearch.flowframework.model;

/**
* Enum relating to the state of a workflow
*/
public enum State {
NOT_STARTED,
PROVISIONING,
Expand Down
28 changes: 26 additions & 2 deletions src/main/java/org/opensearch/flowframework/model/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.common.xcontent.yaml.YamlXContent;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
Expand Down Expand Up @@ -45,13 +46,16 @@ public class Template implements ToXContentObject {
public static final String COMPATIBILITY_FIELD = "compatibility";
/** The template field name for template workflows */
public static final String WORKFLOWS_FIELD = "workflows";
/** The template field name for the user who created the workflow **/
public static final String USER_FIELD = "user";

private final String name;
private final String description;
private final String useCase; // probably an ENUM actually
private final Version templateVersion;
private final List<Version> compatibilityVersion;
private final Map<String, Workflow> workflows;
private final User user;

/**
* Instantiate the object representing a use case template
Expand All @@ -62,21 +66,24 @@ public class Template implements ToXContentObject {
* @param templateVersion The version of this template
* @param compatibilityVersion OpenSearch version compatibility of this template
* @param workflows Workflow graph definitions corresponding to the defined operations.
* @param user The user extracted from the thread context from the request
*/
public Template(
String name,
String description,
String useCase,
Version templateVersion,
List<Version> compatibilityVersion,
Map<String, Workflow> workflows
Map<String, Workflow> workflows,
User user
) {
this.name = name;
this.description = description;
this.useCase = useCase;
this.templateVersion = templateVersion;
this.compatibilityVersion = List.copyOf(compatibilityVersion);
this.workflows = Map.copyOf(workflows);
this.user = user;
}

@Override
Expand Down Expand Up @@ -106,6 +113,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
xContentBuilder.field(e.getKey(), e.getValue(), params);
}
xContentBuilder.endObject();
if (user != null) {
xContentBuilder.field(USER_FIELD, user);
}

return xContentBuilder.endObject();
}
Expand All @@ -124,6 +134,7 @@ public static Template parse(XContentParser parser) throws IOException {
Version templateVersion = null;
List<Version> compatibilityVersion = new ArrayList<>();
Map<String, Workflow> workflows = new HashMap<>();
User user = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -167,6 +178,9 @@ public static Template parse(XContentParser parser) throws IOException {
workflows.put(workflowFieldName, Workflow.parse(parser));
}
break;
case USER_FIELD:
user = User.parse(parser);
break;
default:
throw new IOException("Unable to parse field [" + fieldName + "] in a template object.");
}
Expand All @@ -175,7 +189,7 @@ public static Template parse(XContentParser parser) throws IOException {
throw new IOException("An template object requires a name.");
}

return new Template(name, description, useCase, templateVersion, compatibilityVersion, workflows);
return new Template(name, description, useCase, templateVersion, compatibilityVersion, workflows, user);
}

/**
Expand Down Expand Up @@ -271,6 +285,14 @@ public Map<String, Workflow> workflows() {
return workflows;
}

/**
* User that created and owns this template
* @return the user
*/
public User getUser() {
return user;
}

@Override
public String toString() {
return "Template [name="
Expand All @@ -285,6 +307,8 @@ public String toString() {
+ compatibilityVersion
+ ", workflows="
+ workflows
+ ", user="
+ user
+ "]";
}
}
Loading

0 comments on commit b334ca3

Please sign in to comment.