Skip to content

Commit

Permalink
Replace all CompletableFutures with PlainActionFutures (#419)
Browse files Browse the repository at this point in the history
* Intial commit to remove CompletableFuture

Signed-off-by: Owais Kazi <[email protected]>

* Removed CompletableFuture from ProcessNode and tests

Signed-off-by: Owais Kazi <[email protected]>

* Removed CompletableFuture from create index and pipeline workflow steps

Signed-off-by: Owais Kazi <[email protected]>

* Passing tests

Signed-off-by: Owais Kazi <[email protected]>

* Addressed initial comments

Signed-off-by: Owais Kazi <[email protected]>

* Move log line

Signed-off-by: Daniel Widdis <[email protected]>

* Reenable multi-node tests

Signed-off-by: Daniel Widdis <[email protected]>

* Disable fail-fast

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Owais Kazi <[email protected]>
Signed-off-by: Daniel Widdis <[email protected]>
Co-authored-by: Daniel Widdis <[email protected]>
(cherry picked from commit 92d9108)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] and dbwiddis committed Jan 22, 2024
1 parent 3d5d49e commit f06b525
Show file tree
Hide file tree
Showing 38 changed files with 245 additions and 264 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ jobs:
integTest:
needs: [spotless, javadoc]
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
# Don't use 21.0.2 https://github.com/opensearch-project/flow-framework/issues/426
Expand All @@ -81,6 +82,6 @@ jobs:
run: |
./gradlew integTest yamlRestTest
- name: Multi Nodes Integration Testing
if: matrix.java == 21
if: matrix.java == '21.0.1'
run: |
./gradlew integTest -PnumNodes=3
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ThreadContext;
Expand All @@ -39,7 +40,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
Expand Down Expand Up @@ -160,9 +160,9 @@ private void executeDeprovisionSequence(
ProcessNode deprovisionNode = iter.next();
ResourceCreated resource = getResourceFromDeprovisionNode(deprovisionNode, resourcesCreated);
String resourceNameAndId = getResourceNameAndId(resource);
CompletableFuture<WorkflowData> deprovisionFuture = deprovisionNode.execute();
PlainActionFuture<WorkflowData> deprovisionFuture = deprovisionNode.execute();
try {
deprovisionFuture.join();
deprovisionFuture.get();
logger.info("Successful {} for {}", deprovisionNode.id(), resourceNameAndId);
// Remove from list so we don't try again
iter.remove();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ThreadContext;
Expand All @@ -40,7 +41,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.ERROR_FIELD;
Expand Down Expand Up @@ -183,7 +183,7 @@ private void executeWorkflowAsync(String workflowId, List<ProcessNode> workflowS
private void executeWorkflow(List<ProcessNode> workflowSequence, String workflowId) {
try {

List<CompletableFuture<?>> workflowFutureList = new ArrayList<>();
List<PlainActionFuture<?>> workflowFutureList = new ArrayList<>();

Check warning on line 186 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L186

Added line #L186 was not covered by tests
for (ProcessNode processNode : workflowSequence) {
List<ProcessNode> predecessors = processNode.predecessors();

Expand All @@ -202,8 +202,8 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
workflowFutureList.add(processNode.execute());
}

// Attempt to join each workflow step future, may throw a CompletionException if any step completes exceptionally
workflowFutureList.forEach(CompletableFuture::join);
// Attempt to join each workflow step future, may throw a ExecutionException if any step completes exceptionally
workflowFutureList.forEach(PlainActionFuture::actionGet);

Check warning on line 206 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L206

Added line #L206 was not covered by tests

logger.info("Provisioning completed successfully for workflow {}", workflowId);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
Expand All @@ -28,7 +29,6 @@

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import static org.opensearch.flowframework.common.CommonValue.ALL_CONFIG;
Expand Down Expand Up @@ -75,14 +75,14 @@ protected AbstractRegisterLocalModelStep(
}

@Override
public CompletableFuture<WorkflowData> execute(
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs
) {

CompletableFuture<WorkflowData> registerLocalModelFuture = new CompletableFuture<>();
PlainActionFuture<WorkflowData> registerLocalModelFuture = PlainActionFuture.newFuture();

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
Expand Down Expand Up @@ -180,7 +180,7 @@ public CompletableFuture<WorkflowData> execute(
"successfully updated resources created in state index: {}",
deployUpdateResponse.getIndex()
);
registerLocalModelFuture.complete(
registerLocalModelFuture.onResponse(

Check warning on line 183 in src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java#L183

Added line #L183 was not covered by tests
new WorkflowData(
Map.ofEntries(
Map.entry(resourceName, id),
Expand All @@ -192,7 +192,7 @@ public CompletableFuture<WorkflowData> execute(
);
}, deployUpdateException -> {
logger.error("Failed to update simulated deploy step resource", deployUpdateException);
registerLocalModelFuture.completeExceptionally(
registerLocalModelFuture.onFailure(

Check warning on line 195 in src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java#L195

Added line #L195 was not covered by tests
new FlowFrameworkException(
deployUpdateException.getMessage(),
ExceptionsHelper.status(deployUpdateException)
Expand All @@ -201,24 +201,22 @@ public CompletableFuture<WorkflowData> execute(
})
);
} else {
registerLocalModelFuture.complete(
registerLocalModelFuture.onResponse(
new WorkflowData(
Map.ofEntries(Map.entry(resourceName, id), Map.entry(REGISTER_MODEL_STATUS, mlTask.getState().name())),
currentNodeInputs.getWorkflowId(),
currentNodeId
)
);
}
}, exception -> { registerLocalModelFuture.completeExceptionally(exception); })
}, exception -> { registerLocalModelFuture.onFailure(exception); })
);
}, exception -> {
logger.error("Failed to register local model");
registerLocalModelFuture.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
registerLocalModelFuture.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}));
} catch (FlowFrameworkException e) {
registerLocalModelFuture.completeExceptionally(e);
registerLocalModelFuture.onFailure(e);
}
return registerLocalModelFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -68,7 +69,7 @@ protected AbstractRetryableWorkflowStep(
protected void retryableGetMlTask(
String workflowId,
String nodeId,
CompletableFuture<WorkflowData> future,
PlainActionFuture<WorkflowData> future,
String taskId,
String workflowStep,
ActionListener<MLTask> mlTaskListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;
Expand All @@ -33,7 +34,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static org.opensearch.flowframework.common.CommonValue.ACTIONS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD;
Expand Down Expand Up @@ -70,13 +70,13 @@ public CreateConnectorStep(MachineLearningNodeClient mlClient, FlowFrameworkIndi

// TODO: need to add retry conflicts here
@Override
public CompletableFuture<WorkflowData> execute(
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs
) {
CompletableFuture<WorkflowData> createConnectorFuture = new CompletableFuture<>();
PlainActionFuture<WorkflowData> createConnectorFuture = PlainActionFuture.newFuture();

ActionListener<MLCreateConnectorResponse> actionListener = new ActionListener<>() {

Expand All @@ -93,7 +93,7 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) {
mlCreateConnectorResponse.getConnectorId(),
ActionListener.wrap(response -> {
logger.info("successfully updated resources created in state index: {}", response.getIndex());
createConnectorFuture.complete(
createConnectorFuture.onResponse(
new WorkflowData(
Map.ofEntries(Map.entry(resourceName, mlCreateConnectorResponse.getConnectorId())),
currentNodeInputs.getWorkflowId(),
Expand All @@ -102,22 +102,22 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) {
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
createConnectorFuture.completeExceptionally(
createConnectorFuture.onFailure(

Check warning on line 105 in src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java#L105

Added line #L105 was not covered by tests
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
logger.error("Failed to parse and update new created resource", e);
createConnectorFuture.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
createConnectorFuture.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 113 in src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java#L113

Added line #L113 was not covered by tests
}
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to create connector");
createConnectorFuture.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
createConnectorFuture.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
}
};

Expand Down Expand Up @@ -171,7 +171,7 @@ public void onFailure(Exception e) {

mlClient.createConnector(mlInput, actionListener);
} catch (FlowFrameworkException e) {
createConnectorFuture.completeExceptionally(e);
createConnectorFuture.onFailure(e);

Check warning on line 174 in src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java#L174

Added line #L174 was not covered by tests
}
return createConnectorFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
Expand All @@ -25,7 +26,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.flowframework.common.CommonValue.DEFAULT_MAPPING_OPTION;
Expand Down Expand Up @@ -59,13 +59,13 @@ public CreateIndexStep(ClusterService clusterService, Client client, FlowFramewo
}

@Override
public CompletableFuture<WorkflowData> execute(
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs
) {
CompletableFuture<WorkflowData> createIndexFuture = new CompletableFuture<>();
PlainActionFuture<WorkflowData> createIndexFuture = PlainActionFuture.newFuture();
ActionListener<CreateIndexResponse> actionListener = new ActionListener<>() {

@Override
Expand All @@ -80,7 +80,7 @@ public void onResponse(CreateIndexResponse createIndexResponse) {
createIndexResponse.index(),
ActionListener.wrap(response -> {
logger.info("successfully updated resource created in state index: {}", response.getIndex());
createIndexFuture.complete(
createIndexFuture.onResponse(
new WorkflowData(
Map.of(resourceName, createIndexResponse.index()),
currentNodeInputs.getWorkflowId(),
Expand All @@ -89,21 +89,21 @@ public void onResponse(CreateIndexResponse createIndexResponse) {
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
createIndexFuture.completeExceptionally(
createIndexFuture.onFailure(

Check warning on line 92 in src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java#L92

Added line #L92 was not covered by tests
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);
} catch (Exception e) {
logger.error("Failed to parse and update new created resource", e);
createIndexFuture.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
createIndexFuture.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 99 in src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java#L99

Added line #L99 was not covered by tests
}
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to create an index", e);
createIndexFuture.completeExceptionally(e);
createIndexFuture.onFailure(e);
}
};

Expand All @@ -128,7 +128,7 @@ public void onFailure(Exception e) {
}
} catch (Exception e) {
logger.error("Failed to find the correct resource for the workflow step", e);
createIndexFuture.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
createIndexFuture.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 131 in src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java#L131

Added line #L131 was not covered by tests
}

// TODO:
Expand Down
Loading

0 comments on commit f06b525

Please sign in to comment.