Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature/agent_framework] Changing resources created format #231

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ private CommonValue() {}
public static final String INDEX_NAME = "index_name";
/** Type field */
public static final String TYPE = "type";
/** default_mapping_option filed */
public static final String DEFAULT_MAPPING_OPTION = "default_mapping_option";
/** ID Field */
public static final String ID = "id";
/** Pipeline Id field */
Expand All @@ -103,6 +105,8 @@ private CommonValue() {}
public static final String MODEL_VERSION = "model_version";
/** Model Group Id field */
public static final String MODEL_GROUP_ID = "model_group_id";
/** Model Group Id field */
public static final String MODEL_GROUP_STATUS = "model_group_status";
/** Description field */
public static final String DESCRIPTION_FIELD = "description";
/** Connector Id field */
Expand Down Expand Up @@ -158,10 +162,10 @@ private CommonValue() {}
public static final String USER_OUTPUTS_FIELD = "user_outputs";
/** The template field name for template resources created */
public static final String RESOURCES_CREATED_FIELD = "resources_created";
/** The field name for the ResourceCreated's resource ID */
public static final String RESOURCE_ID_FIELD = "resource_id";
/** The field name for the ResourceCreated's resource name */
/** The field name for the step name where a resource is created */
public static final String WORKFLOW_STEP_NAME = "workflow_step_name";
/** The field name for the step ID where a resource is created */
public static final String WORKFLOW_STEP_ID = "workflow_step_id";
/** LLM Name for registering an agent */
public static final String LLM_FIELD = "llm";
/** The tools' field for an agent */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.common;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Enum encapsulating the different step names and the resources they create
*/
public enum WorkflowResources {

/** official workflow step name for creating a connector and associated created resource */
CREATE_CONNECTOR("create_connector", "connector_id"),
/** official workflow step name for registering a remote model and associated created resource */
REGISTER_REMOTE_MODEL("register_remote_model", "model_id"),
/** official workflow step name for registering a local model and associated created resource */
REGISTER_LOCAL_MODEL("register_local_model", "model_id"),
/** official workflow step name for registering a model group and associated created resource */
REGISTER_MODEL_GROUP("register_model_group", "model_group_id"),
/** official workflow step name for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE("create_ingest_pipeline", "pipeline_id"),
/** official workflow step name for creating an index and associated created resource */
CREATE_INDEX("create_index", "index_name");

private final String workflowStep;
private final String resourceCreated;
private static final Logger logger = LogManager.getLogger(WorkflowResources.class);
private static final Set<String> allResources = Stream.of(values())
.map(WorkflowResources::getResourceCreated)
.collect(Collectors.toSet());

WorkflowResources(String workflowStep, String resourceCreated) {
this.workflowStep = workflowStep;
this.resourceCreated = resourceCreated;
}

/**
* Returns the workflowStep for the given enum Constant
* @return the workflowStep of this data.
*/
public String getWorkflowStep() {
return workflowStep;
}

/**
* Returns the resourceCreated for the given enum Constant
* @return the resourceCreated of this data.
*/
public String getResourceCreated() {
return resourceCreated;
}

/**
* gets the resources created type based on the workflowStep
* @param workflowStep workflow step name
* @return the resource that will be created
* @throws FlowFrameworkException if workflow step doesn't exist in enum
*/
public static String getResourceByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getWorkflowStep().equals(workflowStep)) {
return mapping.getResourceCreated();
}
}
}
logger.error("Unable to find resource type for step: " + workflowStep);
throw new FlowFrameworkException("Unable to find resource type for step: " + workflowStep, RestStatus.BAD_REQUEST);
}

/**
* Returns all the possible resource created types in enum
* @return a set of all the resource created types
*/
public static Set<String> getAllResourcesCreated() {
return allResources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.ResourceCreated;
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;

import java.io.IOException;
import java.net.URL;
Expand Down Expand Up @@ -435,6 +438,7 @@ public void updateFlowFrameworkSystemIndexDoc(
updatedContent.putAll(updatedFields);
updateRequest.doc(updatedContent);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.retryOnConflict(3);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
Expand Down Expand Up @@ -468,7 +472,8 @@ public void updateFlowFrameworkSystemIndexDocWithScript(
// TODO: Also add ability to change other fields at the same time when adding detailed provision progress
updateRequest.script(script);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
updateRequest.retryOnConflict(3);
// TODO: Implement our own concurrency control to improve on retry mechanism
client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to update {} entry : {}. {}", indexName, documentId, e.getMessage());
Expand All @@ -478,4 +483,38 @@ public void updateFlowFrameworkSystemIndexDocWithScript(
}
}
}

/**
* Creates a new ResourceCreated object and a script to update the state index
* @param workflowId workflowId for the relevant step
* @param nodeId WorkflowData object with relevent step information
* @param workflowStepName the workflowstep name that created the resource
* @param resourceId the id of the newly created resource
* @param listener the ActionListener for this step to handle completing the future after update
* @throws IOException if parsing fails on new resource
*/
public void updateResourceInStateIndex(
String workflowId,
String nodeId,
String workflowStepName,
String resourceId,
ActionListener<UpdateResponse> listener
) throws IOException {
ResourceCreated newResource = new ResourceCreated(workflowStepName, nodeId, resourceId);
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
newResource.toXContent(builder, ToXContentObject.EMPTY_PARAMS);

// The script to append a new object to the resources_created array
Script script = new Script(
ScriptType.INLINE,
"painless",
"ctx._source.resources_created.add(params.newResource)",
Collections.singletonMap("newResource", newResource)
);

updateFlowFrameworkSystemIndexDocWithScript(WORKFLOW_STATE_INDEX, workflowId, script, ActionListener.wrap(updateResponse -> {
logger.info("updated resources created of {}", workflowId);
listener.onResponse(updateResponse);
}, exception -> { listener.onFailure(exception); }));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@
*/
package org.opensearch.flowframework.model;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.common.WorkflowResources;
import org.opensearch.flowframework.exception.FlowFrameworkException;

import java.io.IOException;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.RESOURCE_ID_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STEP_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STEP_NAME;

/**
Expand All @@ -27,16 +32,21 @@
// TODO: create an enum to add the resource name itself for each step example (create_connector_step -> connector)
public class ResourceCreated implements ToXContentObject, Writeable {

private static final Logger logger = LogManager.getLogger(ResourceCreated.class);

private final String workflowStepName;
private final String workflowStepId;
private final String resourceId;

/**
* Create this resources created object with given resource name and ID.
* Create this resources created object with given workflow step name, ID and resource ID.
* @param workflowStepName The workflow step name associating to the step where it was created
* @param workflowStepId The workflow step ID associating to the step where it was created
* @param resourceId The resources ID for relating to the created resource
*/
public ResourceCreated(String workflowStepName, String resourceId) {
public ResourceCreated(String workflowStepName, String workflowStepId, String resourceId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we keep the resource type together with its ID? In order to know the meaning of the resource ID here I need to know its step. This way forces me to look it up from the enum (assuming a 1:1 mapping which I'm not sure of).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by resource type here? the field of connector_id will be the key here for resourceId. Since we wanted to have a dynamic key it means it could be anything so I label it as resourceId here

this.workflowStepName = workflowStepName;
this.workflowStepId = workflowStepId;
this.resourceId = resourceId;
}

Expand All @@ -47,20 +57,23 @@ public ResourceCreated(String workflowStepName, String resourceId) {
*/
public ResourceCreated(StreamInput input) throws IOException {
this.workflowStepName = input.readString();
this.workflowStepId = input.readString();
this.resourceId = input.readString();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject()
.field(WORKFLOW_STEP_NAME, workflowStepName)
.field(RESOURCE_ID_FIELD, resourceId);
.field(WORKFLOW_STEP_ID, workflowStepId)
.field(WorkflowResources.getResourceByWorkflowStep(workflowStepName), resourceId);
return xContentBuilder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(workflowStepName);
out.writeString(workflowStepId);
out.writeString(resourceId);
}

Expand All @@ -82,6 +95,15 @@ public String workflowStepName() {
return workflowStepName;
}

/**
* Gets the workflow step id associated to the created resource
*
* @return the workflowStepId.
*/
public String workflowStepId() {
return workflowStepId;
}

/**
* Parse raw JSON content into a ResourceCreated instance.
*
Expand All @@ -91,6 +113,7 @@ public String workflowStepName() {
*/
public static ResourceCreated parse(XContentParser parser) throws IOException {
String workflowStepName = null;
String workflowStepId = null;
String resourceId = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
Expand All @@ -102,22 +125,50 @@ public static ResourceCreated parse(XContentParser parser) throws IOException {
case WORKFLOW_STEP_NAME:
workflowStepName = parser.text();
break;
case RESOURCE_ID_FIELD:
resourceId = parser.text();
case WORKFLOW_STEP_ID:
workflowStepId = parser.text();
break;
default:
throw new IOException("Unable to parse field [" + fieldName + "] in a resources_created object.");
if (!isValidFieldName(fieldName)) {
throw new IOException("Unable to parse field [" + fieldName + "] in a resources_created object.");
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved
} else {
if (fieldName.equals(WorkflowResources.getResourceByWorkflowStep(workflowStepName))) {
resourceId = parser.text();
}
break;
}
}
}
if (workflowStepName == null || resourceId == null) {
throw new IOException("A ResourceCreated object requires both a workflowStepName and resourceId.");
if (workflowStepName == null) {
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
logger.error("Resource created object failed parsing: workflowStepName: {}", workflowStepName);
throw new FlowFrameworkException("A ResourceCreated object requires workflowStepName", RestStatus.BAD_REQUEST);
}
if (workflowStepId == null) {
logger.error("Resource created object failed parsing: workflowStepId: {}", workflowStepId);
throw new FlowFrameworkException("A ResourceCreated object requires workflowStepId", RestStatus.BAD_REQUEST);
}
if (resourceId == null) {
logger.error("Resource created object failed parsing: resourceId: {}", resourceId);
throw new FlowFrameworkException("A ResourceCreated object requires resourceId", RestStatus.BAD_REQUEST);
}
return new ResourceCreated(workflowStepName, resourceId);
return new ResourceCreated(workflowStepName, workflowStepId, resourceId);
}

private static boolean isValidFieldName(String fieldName) {
return (WORKFLOW_STEP_NAME.equals(fieldName)
|| WORKFLOW_STEP_ID.equals(fieldName)
|| WorkflowResources.getAllResourcesCreated().contains(fieldName));
}

@Override
public String toString() {
return "resources_Created [resource_name=" + workflowStepName + ", id=" + resourceId + "]";
return "resources_Created [workflow_step_name= "
+ workflowStepName
+ ", workflow_step_id= "
+ workflowStepName
+ ", resource_id= "
+ resourceId
+ "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

/**
* This represents the an object of workflow steps json which maps each step to expected inputs and outputs
* This represents an object of workflow steps json which maps each step to expected inputs and outputs
*/
public class WorkflowStepValidator {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,10 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", workflowId, State.COMPLETED);
}, exception -> { logger.error("Failed to update workflow state : {}", exception.getMessage()); })
}, exception -> { logger.error("Failed to update workflow state : {}", exception.getMessage(), exception); })
);
} catch (Exception ex) {
logger.error("Provisioning failed for workflow {} : {}", workflowId, ex);
logger.error("Provisioning failed for workflow: {}", workflowId, ex);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
ImmutableMap.of(
Expand All @@ -235,7 +235,7 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", workflowId, State.COMPLETED);
}, exceptionState -> { logger.error("Failed to update workflow state : {}", exceptionState.getMessage()); })
}, exceptionState -> { logger.error("Failed to update workflow state : {}", exceptionState.getMessage(), ex); })
);
}
}
Expand Down
Loading
Loading