Skip to content

Commit

Permalink
add action listener to update resource created
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Dec 1, 2023
1 parent e7186e1 commit 72122e1
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public enum WorkflowResources {
private final String workflowStep;
private final String resourceCreated;
private static final Logger logger = LogManager.getLogger(WorkflowResources.class);
private static final Set<String> allResources = Stream.of(values())
.map(WorkflowResources::getResourceCreated)
.collect(Collectors.toSet());

WorkflowResources(String workflowStep, String resourceCreated) {
this.workflowStep = workflowStep;
Expand Down Expand Up @@ -82,6 +85,6 @@ public static String getResourceByWorkflowStep(String workflowStep) throws IOExc
* @return a set of all the resource created types
*/
public static Set<String> getAllResourcesCreated() {
return Stream.of(values()).map(WorkflowResources::getResourceCreated).collect(Collectors.toSet());
return allResources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;

Expand All @@ -50,7 +49,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
Expand Down Expand Up @@ -492,15 +490,15 @@ public void updateFlowFrameworkSystemIndexDocWithScript(
* @param nodeId WorkflowData object with relevent step information
* @param workflowStepName the workflowstep name that created the resource
* @param resourceId the id of the newly created resource
* @param completableFuture the CompletableFuture used for this step
* @param listener the ActionListener for this step to handle completing the future after update
* @throws IOException if parsing fails on new resource
*/
public void updateResourceInStateIndex(
String workflowId,
String nodeId,
String workflowStepName,
String resourceId,
CompletableFuture<WorkflowData> completableFuture
ActionListener<UpdateResponse> listener
) throws IOException {
ResourceCreated newResource = new ResourceCreated(workflowStepName, nodeId, resourceId);
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
Expand All @@ -516,10 +514,7 @@ public void updateResourceInStateIndex(

updateFlowFrameworkSystemIndexDocWithScript(WORKFLOW_STATE_INDEX, workflowId, script, ActionListener.wrap(updateResponse -> {
logger.info("updated resources created of {}", workflowId);
}, exception -> {
completableFuture.completeExceptionally(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
logger.error("Failed to update workflow state with newly created resource", exception);
}));

listener.onResponse(updateResponse);
}, exception -> { listener.onFailure(exception); }));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,27 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) {

try {
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
createConnectorFuture.complete(
new WorkflowData(
Map.ofEntries(Map.entry(resourceName, mlCreateConnectorResponse.getConnectorId())),
currentNodeInputs.getWorkflowId(),
currentNodeId
)
);
logger.info("Created connector successfully");
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
mlCreateConnectorResponse.getConnectorId(),
createConnectorFuture
ActionListener.wrap(response -> {
logger.info("successfully updated resources created in state index: {}", response.getIndex());
createConnectorFuture.complete(
new WorkflowData(
Map.ofEntries(Map.entry(resourceName, mlCreateConnectorResponse.getConnectorId())),
currentNodeInputs.getWorkflowId(),
currentNodeId
)
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
createConnectorFuture.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,26 @@ public void onResponse(CreateIndexResponse createIndexResponse) {
try {
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
logger.info("created index: {}", createIndexResponse.index());
createIndexFuture.complete(
new WorkflowData(
Map.of(resourceName, createIndexResponse.index()),
currentNodeInputs.getWorkflowId(),
currentNodeId
)
);
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
createIndexResponse.index(),
createIndexFuture
ActionListener.wrap(response -> {
logger.info("successfully updated resource created in state index: {}", response.getIndex());
createIndexFuture.complete(
new WorkflowData(
Map.of(resourceName, createIndexResponse.index()),
currentNodeInputs.getWorkflowId(),
currentNodeId
)
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
createIndexFuture.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);
} catch (Exception e) {
logger.error("Failed to parse and update new created resource", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,21 +144,28 @@ public CompletableFuture<WorkflowData> execute(

try {
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
// PutPipelineRequest returns only an AcknowledgeResponse, returning pipelineId instead
// TODO: revisit this concept of pipeline_id to be consistent with what makes most sense to end user here
createIngestPipelineFuture.complete(
new WorkflowData(
Map.of(resourceName, putPipelineRequest.getId()),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
putPipelineRequest.getId(),
createIngestPipelineFuture
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
// PutPipelineRequest returns only an AcknowledgeResponse, returning pipelineId instead
// TODO: revisit this concept of pipeline_id to be consistent with what makes most sense to end user here
createIngestPipelineFuture.complete(
new WorkflowData(
Map.of(resourceName, putPipelineRequest.getId()),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
createIngestPipelineFuture.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,29 @@ public void onResponse(MLRegisterModelGroupResponse mlRegisterModelGroupResponse
try {
logger.info("Remote Model registration successful");
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
registerModelGroupFuture.complete(
new WorkflowData(
Map.ofEntries(
Map.entry(resourceName, mlRegisterModelGroupResponse.getModelGroupId()),
Map.entry(MODEL_GROUP_STATUS, mlRegisterModelGroupResponse.getStatus())
),
currentNodeInputs.getWorkflowId(),
currentNodeId
)
);
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
mlRegisterModelGroupResponse.getModelGroupId(),
registerModelGroupFuture
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
registerModelGroupFuture.complete(
new WorkflowData(
Map.ofEntries(
Map.entry(resourceName, mlRegisterModelGroupResponse.getModelGroupId()),
Map.entry(MODEL_GROUP_STATUS, mlRegisterModelGroupResponse.getStatus())
),
currentNodeInputs.getWorkflowId(),
currentNodeId
)
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
registerModelGroupFuture.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,22 +255,29 @@ void retryableGetMlTask(
try {
logger.info("Local Model registration successful");
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
registerLocalModelFuture.complete(
new WorkflowData(
Map.ofEntries(
Map.entry(resourceName, response.getModelId()),
Map.entry(REGISTER_MODEL_STATUS, response.getState().name())
),
workflowId,
nodeId
)
);
flowFrameworkIndicesHandler.updateResourceInStateIndex(
workflowId,
nodeId,
getName(),
response.getTaskId(),
registerLocalModelFuture
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
registerLocalModelFuture.complete(
new WorkflowData(
Map.ofEntries(
Map.entry(resourceName, response.getModelId()),
Map.entry(REGISTER_MODEL_STATUS, response.getState().name())
),
workflowId,
nodeId
)
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
registerLocalModelFuture.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,29 @@ public void onResponse(MLRegisterModelResponse mlRegisterModelResponse) {
try {
logger.info("Remote Model registration successful");
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
registerRemoteModelFuture.complete(
new WorkflowData(
Map.ofEntries(
Map.entry(resourceName, mlRegisterModelResponse.getModelId()),
Map.entry(REGISTER_MODEL_STATUS, mlRegisterModelResponse.getStatus())
),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
mlRegisterModelResponse.getModelId(),
registerRemoteModelFuture
ActionListener.wrap(response -> {
logger.info("successfully updated resources created in state index: {}", response.getIndex());
registerRemoteModelFuture.complete(
new WorkflowData(
Map.ofEntries(
Map.entry(resourceName, mlRegisterModelResponse.getModelId()),
Map.entry(REGISTER_MODEL_STATUS, mlRegisterModelResponse.getStatus())
),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
registerRemoteModelFuture.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
Expand Down

0 comments on commit 72122e1

Please sign in to comment.