Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature/agent_framework] Changing resources created format #231

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ private CommonValue() {}
public static final String INDEX_NAME = "index_name";
/** Type field */
public static final String TYPE = "type";
/** default_mapping_option filed */
public static final String DEFAULT_MAPPING_OPTION = "default_mapping_option";
/** ID Field */
public static final String ID = "id";
/** Pipeline Id field */
Expand All @@ -103,6 +105,8 @@ private CommonValue() {}
public static final String MODEL_VERSION = "model_version";
/** Model Group Id field */
public static final String MODEL_GROUP_ID = "model_group_id";
/** Model Group Id field */
public static final String MODEL_GROUP_STATUS = "model_group_status";
/** Description field */
public static final String DESCRIPTION_FIELD = "description";
/** Connector Id field */
Expand Down Expand Up @@ -158,10 +162,10 @@ private CommonValue() {}
public static final String USER_OUTPUTS_FIELD = "user_outputs";
/** The template field name for template resources created */
public static final String RESOURCES_CREATED_FIELD = "resources_created";
/** The field name for the ResourceCreated's resource ID */
public static final String RESOURCE_ID_FIELD = "resource_id";
/** The field name for the ResourceCreated's resource name */
/** The field name for the step name where a resource is created */
public static final String WORKFLOW_STEP_NAME = "workflow_step_name";
/** The field name for the step ID where a resource is created */
public static final String WORKFLOW_STEP_ID = "workflow_step_id";
/** LLM Name for registering an agent */
public static final String LLM_FIELD = "llm";
/** The tools' field for an agent */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.common;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Enum encapsulating the different step names and the resources they create
*/
public enum WorkflowResources {

/** official workflow step name for creating a connector and associated created resource */
CREATE_CONNECTOR("create_connector", "connector_id"),
/** official workflow step name for registering a remote model and associated created resource */
REGISTER_REMOTE_MODEL("register_remote_model", "model_id"),
/** official workflow step name for registering a local model and associated created resource */
REGISTER_LOCAL_MODEL("register_local_model", "model_id"),
/** official workflow step name for registering a model group and associated created resource */
REGISTER_MODEL_GROUP("register_model_group", "model_group_id"),
/** official workflow step name for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE("create_ingest_pipeline", "pipeline_id"),
/** official workflow step name for creating an index and associated created resource */
CREATE_INDEX("create_index", "index_name");

private final String workflowStep;
private final String resourceCreated;
private static final Logger logger = LogManager.getLogger(WorkflowResources.class);
private static final Set<String> allResources = Stream.of(values())
.map(WorkflowResources::getResourceCreated)
.collect(Collectors.toSet());

WorkflowResources(String workflowStep, String resourceCreated) {
this.workflowStep = workflowStep;
this.resourceCreated = resourceCreated;
}

/**
* Returns the workflowStep for the given enum Constant
* @return the workflowStep of this data.
*/
public String getWorkflowStep() {
return workflowStep;
}

/**
* Returns the resourceCreated for the given enum Constant
* @return the resourceCreated of this data.
*/
public String getResourceCreated() {
return resourceCreated;
}

/**
* gets the resources created type based on the workflowStep
* @param workflowStep workflow step name
* @return the resource that will be created
* @throws IOException if workflow step doesn't exist in enum
*/
public static String getResourceByWorkflowStep(String workflowStep) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure we will only ever create a single resource in a given step? Why not return a list here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now I haven't seen a step that returns multiple resources. It makes it a little more complicated if we have to do a mapping of step 1 creates two fields "connector_id" and "model_id" for example and then we map each ID to the correct one when updating. If you think its likely we will have a step like this I can change the logic

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could add this with my PR for converting the workflow-step.json to an enum if that is okay.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tried this out and there are some things to still decide on and I can expand in an issue. Basically making sure we map the right resource key to the right resource value could be a little tricky. For example if we have a list of resourcesCreatedTypes like (“connector_id”, “model_id”, “detector_id”). We need to create a map with each one of those as the key and to know which resource to get but that is arbitrary on how the resource looks like in the response.

 List<String> resourceNames = WorkflowResources.getResourcesByWorkflowStep(getName());
 Map<String, Object> map = resourceNames.stream()
            .collect(Collectors.toMap(
                    key -> key,                       // Key Mapper: Use the list item as the key.
                    value -> {//getting correct corresponding value might not be straightforward cause we would}
                        // a mapping of each possible resource to the correct method for example"
                        // "connector_id" -> mlCreateConnectorResponse.getConnectorId()
                        // "detector" -> however that client defines it as and etc. 
            ));

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @dbwiddis here. We should return a list rather a single resource. Follow up PR should be fine.

Copy link
Member

@owaiskazi19 owaiskazi19 Dec 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amitgalitz why not create a global map here like we had List<WorkflowData> in the execute method, name it as Map<String, ResourcesCreated> and store the resources of a step in it as a new entry to the list.

This way with status API, we need to just iterate on the map which would have key as workflow_step_name and values as the resources created. It will be easy with the new Util method @dbwiddis has here #234

if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getWorkflowStep().equals(workflowStep)) {
return mapping.getResourceCreated();
}
}
}
logger.error("Unable to find resource type for step: " + workflowStep);
throw new IOException("Unable to find resource type for step: " + workflowStep);
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Returns all the possible resource created types in enum
* @return a set of all the resource created types
*/
public static Set<String> getAllResourcesCreated() {
return allResources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.ResourceCreated;
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;

import java.io.IOException;
import java.net.URL;
Expand Down Expand Up @@ -435,6 +438,7 @@ public void updateFlowFrameworkSystemIndexDoc(
updatedContent.putAll(updatedFields);
updateRequest.doc(updatedContent);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.retryOnConflict(3);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
Expand Down Expand Up @@ -468,7 +472,8 @@ public void updateFlowFrameworkSystemIndexDocWithScript(
// TODO: Also add ability to change other fields at the same time when adding detailed provision progress
updateRequest.script(script);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
updateRequest.retryOnConflict(5);
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
// TODO: Implement our own concurrency control to improve on retry mechanism
client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to update {} entry : {}. {}", indexName, documentId, e.getMessage());
Expand All @@ -478,4 +483,38 @@ public void updateFlowFrameworkSystemIndexDocWithScript(
}
}
}

/**
* Creates a new ResourceCreated object and a script to update the state index
* @param workflowId workflowId for the relevant step
* @param nodeId WorkflowData object with relevent step information
* @param workflowStepName the workflowstep name that created the resource
* @param resourceId the id of the newly created resource
* @param listener the ActionListener for this step to handle completing the future after update
* @throws IOException if parsing fails on new resource
*/
public void updateResourceInStateIndex(
String workflowId,
String nodeId,
String workflowStepName,
String resourceId,
ActionListener<UpdateResponse> listener
) throws IOException {
ResourceCreated newResource = new ResourceCreated(workflowStepName, nodeId, resourceId);
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
newResource.toXContent(builder, ToXContentObject.EMPTY_PARAMS);

// The script to append a new object to the resources_created array
Script script = new Script(
ScriptType.INLINE,
"painless",
"ctx._source.resources_created.add(params.newResource)",
Collections.singletonMap("newResource", newResource)
);

updateFlowFrameworkSystemIndexDocWithScript(WORKFLOW_STATE_INDEX, workflowId, script, ActionListener.wrap(updateResponse -> {
logger.info("updated resources created of {}", workflowId);
listener.onResponse(updateResponse);
}, exception -> { listener.onFailure(exception); }));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@
*/
package org.opensearch.flowframework.model;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.common.WorkflowResources;

import java.io.IOException;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.RESOURCE_ID_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STEP_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STEP_NAME;

/**
Expand All @@ -27,16 +30,21 @@
// TODO: create an enum to add the resource name itself for each step example (create_connector_step -> connector)
public class ResourceCreated implements ToXContentObject, Writeable {

private static final Logger logger = LogManager.getLogger(ResourceCreated.class);

private final String workflowStepName;
private final String workflowStepId;
private final String resourceId;

/**
* Create this resources created object with given resource name and ID.
* Create this resources created object with given workflow step name, ID and resource ID.
* @param workflowStepName The workflow step name associating to the step where it was created
* @param workflowStepId The workflow step ID associating to the step where it was created
* @param resourceId The resources ID for relating to the created resource
*/
public ResourceCreated(String workflowStepName, String resourceId) {
public ResourceCreated(String workflowStepName, String workflowStepId, String resourceId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we keep the resource type together with its ID? In order to know the meaning of the resource ID here I need to know its step. This way forces me to look it up from the enum (assuming a 1:1 mapping which I'm not sure of).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by resource type here? the field of connector_id will be the key here for resourceId. Since we wanted to have a dynamic key it means it could be anything so I label it as resourceId here

this.workflowStepName = workflowStepName;
this.workflowStepId = workflowStepId;
this.resourceId = resourceId;
}

Expand All @@ -47,20 +55,23 @@ public ResourceCreated(String workflowStepName, String resourceId) {
*/
public ResourceCreated(StreamInput input) throws IOException {
this.workflowStepName = input.readString();
this.workflowStepId = input.readString();
this.resourceId = input.readString();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject()
.field(WORKFLOW_STEP_NAME, workflowStepName)
.field(RESOURCE_ID_FIELD, resourceId);
.field(WORKFLOW_STEP_ID, workflowStepId)
.field(WorkflowResources.getResourceByWorkflowStep(workflowStepName), resourceId);
return xContentBuilder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(workflowStepName);
out.writeString(workflowStepId);
out.writeString(resourceId);
}

Expand All @@ -82,6 +93,15 @@ public String workflowStepName() {
return workflowStepName;
}

/**
* Gets the workflow step id associated to the created resource
*
* @return the workflowStepId.
*/
public String workflowStepId() {
return workflowStepId;
}

/**
* Parse raw JSON content into a ResourceCreated instance.
*
Expand All @@ -91,6 +111,7 @@ public String workflowStepName() {
*/
public static ResourceCreated parse(XContentParser parser) throws IOException {
String workflowStepName = null;
String workflowStepId = null;
String resourceId = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
Expand All @@ -102,22 +123,50 @@ public static ResourceCreated parse(XContentParser parser) throws IOException {
case WORKFLOW_STEP_NAME:
workflowStepName = parser.text();
break;
case RESOURCE_ID_FIELD:
resourceId = parser.text();
case WORKFLOW_STEP_ID:
workflowStepId = parser.text();
break;
default:
throw new IOException("Unable to parse field [" + fieldName + "] in a resources_created object.");
if (!isValidFieldName(fieldName)) {
throw new IOException("Unable to parse field [" + fieldName + "] in a resources_created object.");
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved
} else {
if (fieldName.equals(WorkflowResources.getResourceByWorkflowStep(workflowStepName))) {
resourceId = parser.text();
}
break;
}
}
}
if (workflowStepName == null || resourceId == null) {
throw new IOException("A ResourceCreated object requires both a workflowStepName and resourceId.");
if (workflowStepName == null) {
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
logger.error("Resource created object failed parsing: workflowStepName: {}", workflowStepName);
throw new IOException("A ResourceCreated object requires workflowStepName");
}
if (workflowStepId == null) {
logger.error("Resource created object failed parsing: workflowStepId: {}", workflowStepId);
throw new IOException("A ResourceCreated object requires workflowStepId");
}
if (resourceId == null) {
logger.error("Resource created object failed parsing: resourceId: {}", resourceId);
throw new IOException("A ResourceCreated object requires resourceId");
}
return new ResourceCreated(workflowStepName, resourceId);
return new ResourceCreated(workflowStepName, workflowStepId, resourceId);
}

private static boolean isValidFieldName(String fieldName) {
return (WORKFLOW_STEP_NAME.equals(fieldName)
|| WORKFLOW_STEP_ID.equals(fieldName)
|| WorkflowResources.getAllResourcesCreated().contains(fieldName));
}

@Override
public String toString() {
return "resources_Created [resource_name=" + workflowStepName + ", id=" + resourceId + "]";
return "resources_Created [workflow_step_name= "
+ workflowStepName
+ ", workflow_step_id= "
+ workflowStepName
+ ", resource_id= "
+ resourceId
+ "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

/**
* This represents the an object of workflow steps json which maps each step to expected inputs and outputs
* This represents an object of workflow steps json which maps each step to expected inputs and outputs
*/
public class WorkflowStepValidator {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,10 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", workflowId, State.COMPLETED);
}, exception -> { logger.error("Failed to update workflow state : {}", exception.getMessage()); })
}, exception -> { logger.error("Failed to update workflow state : {}", exception.getMessage(), exception); })
);
} catch (Exception ex) {
logger.error("Provisioning failed for workflow {} : {}", workflowId, ex);
logger.error("Provisioning failed for workflow: {}", workflowId, ex);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
ImmutableMap.of(
Expand All @@ -235,7 +235,7 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", workflowId, State.COMPLETED);
}, exceptionState -> { logger.error("Failed to update workflow state : {}", exceptionState.getMessage()); })
}, exceptionState -> { logger.error("Failed to update workflow state : {}", exceptionState.getMessage(), ex); })
);
}
}
Expand Down
Loading