Addressing comments
Signed-off-by: Joshua Palis <[email protected]>
joshpalis committed Aug 2, 2024
1 parent 839d41c commit 578e3f2
Showing 12 changed files with 217 additions and 80 deletions.
build.gradle: 2 changes (1 addition, 1 deletion)
@@ -175,7 +175,7 @@ dependencies {
implementation "jakarta.json.bind:jakarta.json.bind-api:3.0.1"
implementation "org.glassfish:jakarta.json:2.0.1"
implementation "org.eclipse:yasson:3.0.3"
implementation group: 'com.google.code.gson', name: 'gson', version: '2.10.1'
implementation "com.google.code.gson:gson:2.10.1"

// ZipArchive dependencies used for integration tests
zipArchive group: 'org.opensearch.plugin', name:'opensearch-ml-plugin', version: "${opensearch_build}"
@@ -46,39 +46,39 @@
public enum WorkflowResources {

/** Workflow steps for creating/deleting a connector and associated created resource */
CREATE_CONNECTOR(CreateConnectorStep.NAME, WorkflowResources.CONNECTOR_ID, DeleteConnectorStep.NAME, null),
CREATE_CONNECTOR(CreateConnectorStep.NAME, null, DeleteConnectorStep.NAME, WorkflowResources.CONNECTOR_ID),
/** Workflow steps for registering/deleting a remote model and associated created resource */
REGISTER_REMOTE_MODEL(RegisterRemoteModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME, null),
REGISTER_REMOTE_MODEL(RegisterRemoteModelStep.NAME, null, DeleteModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for registering/deleting a local model and associated created resource */
REGISTER_LOCAL_MODEL(RegisterLocalCustomModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME, null),
REGISTER_LOCAL_MODEL(RegisterLocalCustomModelStep.NAME, null, DeleteModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for registering/deleting a local sparse encoding model and associated created resource */
REGISTER_LOCAL_SPARSE_ENCODING_MODEL(RegisterLocalSparseEncodingModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME, null),
REGISTER_LOCAL_SPARSE_ENCODING_MODEL(RegisterLocalSparseEncodingModelStep.NAME, null, DeleteModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for registering/deleting a local OpenSearch provided pretrained model and associated created resource */
REGISTER_LOCAL_PRETRAINED_MODEL(RegisterLocalPretrainedModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME, null),
REGISTER_LOCAL_PRETRAINED_MODEL(RegisterLocalPretrainedModelStep.NAME, null, DeleteModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for registering/deleting a model group and associated created resource */
REGISTER_MODEL_GROUP(RegisterModelGroupStep.NAME, WorkflowResources.MODEL_GROUP_ID, NoOpStep.NAME, null),
REGISTER_MODEL_GROUP(RegisterModelGroupStep.NAME, null, NoOpStep.NAME, WorkflowResources.MODEL_GROUP_ID),
/** Workflow steps for deploying/undeploying a model and associated created resource */
DEPLOY_MODEL(DeployModelStep.NAME, WorkflowResources.MODEL_ID, UndeployModelStep.NAME, null),
DEPLOY_MODEL(DeployModelStep.NAME, null, UndeployModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE(
CreateIngestPipelineStep.NAME,
WorkflowResources.PIPELINE_ID,
UpdateIngestPipelineStep.NAME,
DeleteIngestPipelineStep.NAME,
UpdateIngestPipelineStep.NAME
WorkflowResources.PIPELINE_ID
),
/** Workflow steps for creating a search pipeline and associated created resource */
CREATE_SEARCH_PIPELINE(
CreateSearchPipelineStep.NAME,
WorkflowResources.PIPELINE_ID,
UpdateSearchPipelineStep.NAME,
DeleteSearchPipelineStep.NAME,
UpdateSearchPipelineStep.NAME
WorkflowResources.PIPELINE_ID
),
/** Workflow steps for creating an index and associated created resource */
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, DeleteIndexStep.NAME, UpdateIndexStep.NAME),
CREATE_INDEX(CreateIndexStep.NAME, UpdateIndexStep.NAME, DeleteIndexStep.NAME, WorkflowResources.INDEX_NAME),
/** Workflow steps for reindexing a source index to a destination index and associated created resource */
REINDEX(ReindexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME, null),
REINDEX(ReindexStep.NAME, null, NoOpStep.NAME, WorkflowResources.INDEX_NAME),
/** Workflow steps for registering/deleting an agent and the associated created resource */
REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME, null);
REGISTER_AGENT(RegisterAgentStep.NAME, null, DeleteAgentStep.NAME, WorkflowResources.AGENT_ID);

/** Connector Id for a remote model connector */
public static final String CONNECTOR_ID = "connector_id";
@@ -93,36 +93,37 @@ public enum WorkflowResources {
/** Agent Id */
public static final String AGENT_ID = "agent_id";

private final String workflowStep;
private final String resourceCreated;
private final String deprovisionStep;
private final String createStep;
private final String updateStep;
private final String deprovisionStep;
private final String resourceCreated;

private static final Logger logger = LogManager.getLogger(WorkflowResources.class);
private static final Set<String> allResources = Stream.of(values())
.map(WorkflowResources::getResourceCreated)
.collect(Collectors.toSet());

WorkflowResources(String workflowStep, String resourceCreated, String deprovisionStep, String updateStep) {
this.workflowStep = workflowStep;
this.resourceCreated = resourceCreated;
this.deprovisionStep = deprovisionStep;
WorkflowResources(String createStep, String updateStep, String deprovisionStep, String resourceCreated) {
this.createStep = createStep;
this.updateStep = updateStep;
this.deprovisionStep = deprovisionStep;
this.resourceCreated = resourceCreated;
}

/**
* Returns the workflowStep for the given enum Constant
* @return the workflowStep of this data.
* Returns the create step for the given enum Constant
* @return the create step of this data.
*/
public String getWorkflowStep() {
return workflowStep;
public String getCreateStep() {
return createStep;
}

/**
* Returns the resourceCreated for the given enum Constant
* @return the resourceCreated of this data.
* Returns the updateStep for the given enum Constant
* @return the updateStep of this data.
*/
public String getResourceCreated() {
return resourceCreated;
public String getUpdateStep() {
return updateStep;
}

/**
@@ -134,11 +135,11 @@ public String getDeprovisionStep() {
}

/**
* Returns the updateStep for the given enum Constant
* @return the updateStep of this data.
* Returns the resourceCreated for the given enum Constant
* @return the resourceCreated of this data.
*/
public String getUpdateStep() {
return updateStep;
public String getResourceCreated() {
return resourceCreated;
}

/**
@@ -150,7 +151,7 @@ public String getUpdateStep() {
public static String getResourceByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (workflowStep.equals(mapping.getWorkflowStep())
if (workflowStep.equals(mapping.getCreateStep())
|| workflowStep.equals(mapping.getDeprovisionStep())
|| workflowStep.equals(mapping.getUpdateStep())) {
return mapping.getResourceCreated();
@@ -170,7 +171,7 @@ public static String getResourceByWorkflowStep(String workflowStep) throws FlowF
public static String getDeprovisionStepByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getWorkflowStep().equals(workflowStep)) {
if (mapping.getCreateStep().equals(workflowStep)) {
return mapping.getDeprovisionStep();
}
}
@@ -188,7 +189,7 @@ public static String getDeprovisionStepByWorkflowStep(String workflowStep) throw
public static String getUpdateStepByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getWorkflowStep().equals(workflowStep)) {
if (mapping.getCreateStep().equals(workflowStep)) {
return mapping.getUpdateStep();
}
}
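For orientation, here is a minimal usage sketch of the reordered enum and renamed accessors shown above. It is not part of this commit; the import paths are assumed from the plugin's package layout and may differ.

import org.opensearch.flowframework.common.WorkflowResources;           // assumed package
import org.opensearch.flowframework.exception.FlowFrameworkException;   // assumed package

public class WorkflowResourcesExample {
    public static void main(String[] args) throws FlowFrameworkException {
        // Constructor order is now (createStep, updateStep, deprovisionStep, resourceCreated),
        // and getWorkflowStep() has been renamed to getCreateStep().
        String createStep = WorkflowResources.CREATE_CONNECTOR.getCreateStep();

        // The static lookups resolve the related steps and created resource from any step name.
        String deprovisionStep = WorkflowResources.getDeprovisionStepByWorkflowStep(createStep);
        String resourceType = WorkflowResources.getResourceByWorkflowStep(createStep); // "connector_id"

        System.out.println(createStep + " is deprovisioned by " + deprovisionStep + " and records a " + resourceType);
    }
}

The test change at the bottom of this diff exercises the same rename via CREATE_CONNECTOR.getCreateStep().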
@@ -146,6 +146,18 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
if (reprovision && useCase != null) {
// Consume params and content so custom exception is processed
params.keySet().stream().forEach(request::param);
request.content();
FlowFrameworkException ffe = new FlowFrameworkException(
"You cannot use the " + REPROVISION_WORKFLOW + " and " + USE_CASE + " parameters in the same request.",
RestStatus.BAD_REQUEST
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
try {
Template template;
Map<String, String> useCaseDefaultsMap = Collections.emptyMap();
@@ -265,7 +265,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
ActionListener.wrap(reprovisionResponse -> {
listener.onResponse(new WorkflowResponse(reprovisionResponse.getWorkflowId()));
}, exception -> {
String errorMessage = "Reprovisioning failed.";
String errorMessage = "Reprovisioning failed for workflow " + workflowId;
logger.error(errorMessage, exception);
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
@@ -111,34 +111,40 @@ protected void doExecute(Task task, ReprovisionWorkflowRequest request, ActionLi

String workflowId = request.getWorkflowId();

// Original template is retrieved from index, attempt to decrypt any existing credentials before processing
Template originalTemplate = encryptorUtils.decryptTemplateCredentials(request.getOriginalTemplate());
Template updatedTemplate = request.getUpdatedTemplate();

// Validate updated template prior to execution
Workflow provisionWorkflow = updatedTemplate.workflows().get(PROVISION_WORKFLOW);
List<ProcessNode> updatedProcessSequence = workflowProcessSorter.sortProcessNodes(
provisionWorkflow,
request.getWorkflowId(),
Collections.emptyMap() // TODO : Add support to reprovision substitution templates
);

try {
workflowProcessSorter.validate(updatedProcessSequence, pluginsService);
} catch (Exception e) {
String errormessage = "Workflow validation failed for workflow " + request.getWorkflowId();
listener.onFailure(new FlowFrameworkException(errormessage, RestStatus.BAD_REQUEST));
}

// Retrieve resources created
// Retrieve state and resources created
GetWorkflowStateRequest getStateRequest = new GetWorkflowStateRequest(workflowId, true);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
logger.info("Querying state for workflow: {}", workflowId);
client.execute(GetWorkflowStateAction.INSTANCE, getStateRequest, ActionListener.wrap(response -> {
context.restore();

if (!ProvisioningProgress.DONE.equals(ProvisioningProgress.valueOf(response.getWorkflowState().getState()))) {
String errorMessage = "The template can not be reprovisioned unless its provisioning state is DONE: " + workflowId;
throw new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST);
}

// Generate reprovision sequence
List<ResourceCreated> resourceCreated = response.getWorkflowState().resourcesCreated();

// Original template is retrieved from index, attempt to decrypt any existing credentials before processing
Template originalTemplate = encryptorUtils.decryptTemplateCredentials(request.getOriginalTemplate());
Template updatedTemplate = request.getUpdatedTemplate();

// Validate updated template prior to execution
Workflow provisionWorkflow = updatedTemplate.workflows().get(PROVISION_WORKFLOW);
List<ProcessNode> updatedProcessSequence = workflowProcessSorter.sortProcessNodes(
provisionWorkflow,
request.getWorkflowId(),
Collections.emptyMap() // TODO : Add support to reprovision substitution templates
);

try {
workflowProcessSorter.validate(updatedProcessSequence, pluginsService);
} catch (Exception e) {
String errormessage = "Workflow validation failed for workflow " + request.getWorkflowId();
logger.error(errormessage, e);
listener.onFailure(new FlowFrameworkException(errormessage, RestStatus.BAD_REQUEST));
}
List<ProcessNode> reprovisionProcessSequence = workflowProcessSorter.createReprovisionSequence(
workflowId,
originalTemplate,
@@ -11,13 +11,10 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ingest.PutPipelineRequest;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
@@ -52,6 +49,20 @@ protected AbstractUpdatePipelineStep(Client client) {
this.clusterAdminClient = client.admin().cluster();
}

/**
* Executes a put search or ingest pipeline request
* @param pipelineId the pipeline id
* @param configuration the pipeline configuration bytes
* @param clusterAdminClient the cluster admin client
* @param listener listener
*/
public abstract void executePutPipelineRequest(
String pipelineId,
BytesReference configuration,
ClusterAdminClient clusterAdminClient,
ActionListener<AcknowledgedResponse> listener
);

@Override
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
@@ -111,17 +122,7 @@ public void onFailure(Exception ex) {

};

if (pipelineToBeCreated.equals(UpdateSearchPipelineStep.NAME)) {
PutSearchPipelineRequest putSearchPipelineRequest = new PutSearchPipelineRequest(
pipelineId,
configurationsBytes,
XContentType.JSON
);
clusterAdminClient.putSearchPipeline(putSearchPipelineRequest, putPipelineActionListener);
} else {
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configurationsBytes, XContentType.JSON);
clusterAdminClient.putPipeline(putPipelineRequest, putPipelineActionListener);
}
executePutPipelineRequest(pipelineId, configurationsBytes, clusterAdminClient, putPipelineActionListener);

} catch (FlowFrameworkException e) {
createPipelineFuture.onFailure(e);
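To illustrate the refactor above: each concrete step now supplies only the transport call through executePutPipelineRequest, while the shared configuration handling stays in AbstractUpdatePipelineStep.execute(). The hypothetical subclass below is a sketch only; the class name and step name are made up, and it assumes a subclass needs to provide just a constructor, executePutPipelineRequest, and getName(), mirroring the two real subclasses later in this diff.

import org.opensearch.action.ingest.PutPipelineRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;

/** Hypothetical pipeline update step, for illustration only */
public class ExampleUpdatePipelineStep extends AbstractUpdatePipelineStep {

    /** Illustrative step name */
    public static final String NAME = "example_update_pipeline";

    /**
     * Instantiate this class
     * @param client Client to update a pipeline
     */
    public ExampleUpdatePipelineStep(Client client) {
        super(client);
    }

    @Override
    public void executePutPipelineRequest(
        String pipelineId,
        BytesReference configuration,
        ClusterAdminClient clusterAdminClient,
        ActionListener<AcknowledgedResponse> listener
    ) {
        // Only this transport call differs between subclasses; the ingest variant is reused here for illustration.
        PutPipelineRequest request = new PutPipelineRequest(pipelineId, configuration, XContentType.JSON);
        clusterAdminClient.putPipeline(request, listener);
    }

    @Override
    public String getName() {
        return NAME;
    }
}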
@@ -51,7 +51,7 @@ public class UpdateIndexStep implements WorkflowStep {
/**
* Instantiate this class
*
* @param client Client to create an index
* @param client Client to update an index
*/
public UpdateIndexStep(Client client) {
this.client = client;
@@ -10,7 +10,13 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ingest.PutPipelineRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;

/**
* Step to update an ingest pipeline
@@ -29,6 +35,17 @@ public UpdateIngestPipelineStep(Client client) {
super(client);
}

@Override
public void executePutPipelineRequest(
String pipelineId,
BytesReference configuration,
ClusterAdminClient clusterAdminClient,
ActionListener<AcknowledgedResponse> listener
) {
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configuration, XContentType.JSON);
clusterAdminClient.putPipeline(putPipelineRequest, listener);
}

@Override
public String getName() {
return NAME;
@@ -10,7 +10,13 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;

/**
* Step to update a search pipeline
@@ -29,6 +35,17 @@ public UpdateSearchPipelineStep(Client client) {
super(client);
}

@Override
public void executePutPipelineRequest(
String pipelineId,
BytesReference configuration,
ClusterAdminClient clusterAdminClient,
ActionListener<AcknowledgedResponse> listener
) {
PutSearchPipelineRequest putSearchPipelineRequest = new PutSearchPipelineRequest(pipelineId, configuration, XContentType.JSON);
clusterAdminClient.putSearchPipeline(putSearchPipelineRequest, listener);
}

@Override
public String getName() {
return NAME;
@@ -23,7 +23,7 @@ public void setUp() throws Exception {
}

public void testParseFeature() throws IOException {
String workflowStepName = CREATE_CONNECTOR.getWorkflowStep();
String workflowStepName = CREATE_CONNECTOR.getCreateStep();
String resourceType = getResourceByWorkflowStep(workflowStepName);
ResourceCreated resourceCreated = new ResourceCreated(workflowStepName, "workflow_step_1", resourceType, "L85p1IsBbfF");
assertEquals(workflowStepName, resourceCreated.workflowStepName());