Skip to content

Commit

Permalink
Fix process node zero timeout and handle not found exceptions
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jan 17, 2024
1 parent b9d295f commit 9562827
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.model.ProvisioningProgress;
Expand Down Expand Up @@ -53,14 +54,13 @@
*/
public class DeprovisionWorkflowTransportAction extends HandledTransportAction<WorkflowRequest, WorkflowResponse> {

private static final String DEPROVISION_SUFFIX = "_deprovision";

private final Logger logger = LogManager.getLogger(DeprovisionWorkflowTransportAction.class);

private final ThreadPool threadPool;
private final Client client;
private final WorkflowStepFactory workflowStepFactory;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final FlowFrameworkSettings flowFrameworkSettings;

/**
* Instantiates a new ProvisionWorkflowTransportAction
Expand All @@ -70,6 +70,7 @@ public class DeprovisionWorkflowTransportAction extends HandledTransportAction<W
* @param client The node client to retrieve a stored use case template
* @param workflowStepFactory The factory instantiating workflow steps
* @param flowFrameworkIndicesHandler Class to handle all internal system indices actions
* @param flowFrameworkSettings The plugin settings
*/
@Inject
public DeprovisionWorkflowTransportAction(
Expand All @@ -78,13 +79,15 @@ public DeprovisionWorkflowTransportAction(
ThreadPool threadPool,
Client client,
WorkflowStepFactory workflowStepFactory,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
FlowFrameworkSettings flowFrameworkSettings
) {
super(DeprovisionWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.threadPool = threadPool;
this.client = client;
this.workflowStepFactory = workflowStepFactory;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.flowFrameworkSettings = flowFrameworkSettings;
}

@Override
Expand Down Expand Up @@ -128,8 +131,8 @@ private void executeDeprovisionSequence(
if (deprovisionStep == null) {
continue;
}
// New ID is old ID with deprovision added
String deprovisionStepId = workflowStepId + DEPROVISION_SUFFIX;
// New ID is old ID with (deprovision step type) prepended
String deprovisionStepId = "(deprovision_" + stepName + ") " + workflowStepId;
deprovisionProcessSequence.add(
new ProcessNode(
deprovisionStepId,
Expand All @@ -138,7 +141,7 @@ private void executeDeprovisionSequence(
new WorkflowData(Map.of(getResourceByWorkflowStep(stepName), resource.resourceId()), workflowId, deprovisionStepId),
Collections.emptyList(),
this.threadPool,
TimeValue.ZERO
flowFrameworkSettings.getRequestTimeout()
)
);
}
Expand All @@ -164,12 +167,20 @@ private void executeDeprovisionSequence(
// Pause briefly before next step
Thread.sleep(100);
} catch (Throwable t) {
logger.info(
"Failed {} for {}: {}",
deprovisionNode.id(),
resourceNameAndId,
t.getCause() == null ? t.getMessage() : t.getCause().getMessage()
);
// If any deprovision fails due to not found, it's a success
if (t.getCause() instanceof OpenSearchStatusException
&& ((OpenSearchStatusException) t.getCause()).status() == RestStatus.NOT_FOUND) {
logger.info("Successful (not found) {} for {}", deprovisionNode.id(), resourceNameAndId);
// Remove from list so we don't try again
iter.remove();
} else {
logger.info(
"Failed {} for {}: {}",
deprovisionNode.id(),
resourceNameAndId,
t.getCause() == null ? t.getMessage() : t.getCause().getMessage()
);
}
}
}
if (deprovisionProcessSequence.size() < resourceCount) {
Expand Down Expand Up @@ -257,17 +268,10 @@ private void updateWorkflowState(
}

private static ResourceCreated getResourceFromDeprovisionNode(ProcessNode deprovisionNode, List<ResourceCreated> resourcesCreated) {
String deprovisionId = deprovisionNode.id();
int pos = deprovisionId.indexOf(DEPROVISION_SUFFIX);
ResourceCreated resource = null;
if (pos > 0) {
for (ResourceCreated resourceCreated : resourcesCreated) {
if (resourceCreated.workflowStepId().equals(deprovisionId.substring(0, pos))) {
resource = resourceCreated;
}
}
}
return resource;
return resourcesCreated.stream()
.filter(r -> deprovisionNode.id().equals("(deprovision_" + r.workflowStepName() + ") " + r.workflowStepId()))
.findFirst()
.orElse(null);
}

private static String getResourceNameAndId(ResourceCreated resource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.model.ResourceCreated;
import org.opensearch.flowframework.model.WorkflowState;
Expand Down Expand Up @@ -70,21 +71,25 @@ public class DeprovisionWorkflowTransportActionTests extends OpenSearchTestCase
private DeleteConnectorStep deleteConnectorStep;
private DeprovisionWorkflowTransportAction deprovisionWorkflowTransportAction;
private FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private FlowFrameworkSettings flowFrameworkSettings;

@Override
public void setUp() throws Exception {
super.setUp();
this.client = mock(Client.class);
this.workflowStepFactory = mock(WorkflowStepFactory.class);
this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class);
flowFrameworkSettings = mock(FlowFrameworkSettings.class);
when(flowFrameworkSettings.getRequestTimeout()).thenReturn(TimeValue.timeValueSeconds(10));

this.deprovisionWorkflowTransportAction = new DeprovisionWorkflowTransportAction(
mock(TransportService.class),
mock(ActionFilters.class),
threadPool,
client,
workflowStepFactory,
flowFrameworkIndicesHandler
flowFrameworkIndicesHandler,
flowFrameworkSettings
);

MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
Expand Down

0 comments on commit 9562827

Please sign in to comment.