Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace all CompletableFutures with PlainActionFutures #419

Merged
merged 8 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ jobs:
integTest:
needs: [spotless, javadoc]
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
# Don't use 21.0.2 https://github.com/opensearch-project/flow-framework/issues/426
Expand All @@ -81,6 +82,6 @@ jobs:
run: |
./gradlew integTest yamlRestTest
- name: Multi Nodes Integration Testing
if: matrix.java == 21
if: matrix.java == '21.0.1'
run: |
./gradlew integTest -PnumNodes=3
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ThreadContext;
Expand All @@ -39,7 +40,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
Expand Down Expand Up @@ -160,9 +160,9 @@ private void executeDeprovisionSequence(
ProcessNode deprovisionNode = iter.next();
ResourceCreated resource = getResourceFromDeprovisionNode(deprovisionNode, resourcesCreated);
String resourceNameAndId = getResourceNameAndId(resource);
CompletableFuture<WorkflowData> deprovisionFuture = deprovisionNode.execute();
PlainActionFuture<WorkflowData> deprovisionFuture = deprovisionNode.execute();
try {
deprovisionFuture.join();
deprovisionFuture.get();
logger.info("Successful {} for {}", deprovisionNode.id(), resourceNameAndId);
// Remove from list so we don't try again
iter.remove();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ThreadContext;
Expand All @@ -40,7 +41,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.ERROR_FIELD;
Expand Down Expand Up @@ -183,7 +183,7 @@
private void executeWorkflow(List<ProcessNode> workflowSequence, String workflowId) {
try {

List<CompletableFuture<?>> workflowFutureList = new ArrayList<>();
List<PlainActionFuture<?>> workflowFutureList = new ArrayList<>();

Check warning on line 186 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L186

Added line #L186 was not covered by tests
for (ProcessNode processNode : workflowSequence) {
List<ProcessNode> predecessors = processNode.predecessors();

Expand All @@ -202,8 +202,8 @@
workflowFutureList.add(processNode.execute());
}

// Attempt to join each workflow step future, may throw a CompletionException if any step completes exceptionally
workflowFutureList.forEach(CompletableFuture::join);
// Attempt to join each workflow step future, may throw a ExecutionException if any step completes exceptionally
workflowFutureList.forEach(PlainActionFuture::actionGet);

Check warning on line 206 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L206

Added line #L206 was not covered by tests

logger.info("Provisioning completed successfully for workflow {}", workflowId);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
Expand All @@ -28,7 +29,6 @@

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import static org.opensearch.flowframework.common.CommonValue.ALL_CONFIG;
Expand Down Expand Up @@ -75,14 +75,14 @@
}

@Override
public CompletableFuture<WorkflowData> execute(
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs
) {

CompletableFuture<WorkflowData> registerLocalModelFuture = new CompletableFuture<>();
PlainActionFuture<WorkflowData> registerLocalModelFuture = PlainActionFuture.newFuture();

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
Expand Down Expand Up @@ -180,7 +180,7 @@
"successfully updated resources created in state index: {}",
deployUpdateResponse.getIndex()
);
registerLocalModelFuture.complete(
registerLocalModelFuture.onResponse(

Check warning on line 183 in src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java#L183

Added line #L183 was not covered by tests
new WorkflowData(
Map.ofEntries(
Map.entry(resourceName, id),
Expand All @@ -192,7 +192,7 @@
);
}, deployUpdateException -> {
logger.error("Failed to update simulated deploy step resource", deployUpdateException);
registerLocalModelFuture.completeExceptionally(
registerLocalModelFuture.onFailure(

Check warning on line 195 in src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/AbstractRegisterLocalModelStep.java#L195

Added line #L195 was not covered by tests
new FlowFrameworkException(
deployUpdateException.getMessage(),
ExceptionsHelper.status(deployUpdateException)
Expand All @@ -201,24 +201,22 @@
})
);
} else {
registerLocalModelFuture.complete(
registerLocalModelFuture.onResponse(
new WorkflowData(
Map.ofEntries(Map.entry(resourceName, id), Map.entry(REGISTER_MODEL_STATUS, mlTask.getState().name())),
currentNodeInputs.getWorkflowId(),
currentNodeId
)
);
}
}, exception -> { registerLocalModelFuture.completeExceptionally(exception); })
}, exception -> { registerLocalModelFuture.onFailure(exception); })
);
}, exception -> {
logger.error("Failed to register local model");
registerLocalModelFuture.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
registerLocalModelFuture.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}));
} catch (FlowFrameworkException e) {
registerLocalModelFuture.completeExceptionally(e);
registerLocalModelFuture.onFailure(e);
}
return registerLocalModelFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -68,7 +69,7 @@ protected AbstractRetryableWorkflowStep(
protected void retryableGetMlTask(
String workflowId,
String nodeId,
CompletableFuture<WorkflowData> future,
PlainActionFuture<WorkflowData> future,
String taskId,
String workflowStep,
ActionListener<MLTask> mlTaskListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;
Expand All @@ -33,7 +34,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static org.opensearch.flowframework.common.CommonValue.ACTIONS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD;
Expand Down Expand Up @@ -70,13 +70,13 @@

// TODO: need to add retry conflicts here
@Override
public CompletableFuture<WorkflowData> execute(
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs
) {
CompletableFuture<WorkflowData> createConnectorFuture = new CompletableFuture<>();
PlainActionFuture<WorkflowData> createConnectorFuture = PlainActionFuture.newFuture();

ActionListener<MLCreateConnectorResponse> actionListener = new ActionListener<>() {

Expand All @@ -93,7 +93,7 @@
mlCreateConnectorResponse.getConnectorId(),
ActionListener.wrap(response -> {
logger.info("successfully updated resources created in state index: {}", response.getIndex());
createConnectorFuture.complete(
createConnectorFuture.onResponse(
new WorkflowData(
Map.ofEntries(Map.entry(resourceName, mlCreateConnectorResponse.getConnectorId())),
currentNodeInputs.getWorkflowId(),
Expand All @@ -102,22 +102,22 @@
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
createConnectorFuture.completeExceptionally(
createConnectorFuture.onFailure(

Check warning on line 105 in src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java#L105

Added line #L105 was not covered by tests
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
logger.error("Failed to parse and update new created resource", e);
createConnectorFuture.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
createConnectorFuture.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 113 in src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java#L113

Added line #L113 was not covered by tests
}
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to create connector");
createConnectorFuture.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
createConnectorFuture.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
}
};

Expand Down Expand Up @@ -171,7 +171,7 @@

mlClient.createConnector(mlInput, actionListener);
} catch (FlowFrameworkException e) {
createConnectorFuture.completeExceptionally(e);
createConnectorFuture.onFailure(e);

Check warning on line 174 in src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java#L174

Added line #L174 was not covered by tests
}
return createConnectorFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
Expand All @@ -25,7 +26,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.flowframework.common.CommonValue.DEFAULT_MAPPING_OPTION;
Expand Down Expand Up @@ -59,13 +59,13 @@
}

@Override
public CompletableFuture<WorkflowData> execute(
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs
) {
CompletableFuture<WorkflowData> createIndexFuture = new CompletableFuture<>();
PlainActionFuture<WorkflowData> createIndexFuture = PlainActionFuture.newFuture();
ActionListener<CreateIndexResponse> actionListener = new ActionListener<>() {

@Override
Expand All @@ -80,7 +80,7 @@
createIndexResponse.index(),
ActionListener.wrap(response -> {
logger.info("successfully updated resource created in state index: {}", response.getIndex());
createIndexFuture.complete(
createIndexFuture.onResponse(
new WorkflowData(
Map.of(resourceName, createIndexResponse.index()),
currentNodeInputs.getWorkflowId(),
Expand All @@ -89,21 +89,21 @@
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
createIndexFuture.completeExceptionally(
createIndexFuture.onFailure(

Check warning on line 92 in src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java#L92

Added line #L92 was not covered by tests
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);
} catch (Exception e) {
logger.error("Failed to parse and update new created resource", e);
createIndexFuture.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
createIndexFuture.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 99 in src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java#L99

Added line #L99 was not covered by tests
}
}

@Override
public void onFailure(Exception e) {
logger.error("Failed to create an index", e);
createIndexFuture.completeExceptionally(e);
createIndexFuture.onFailure(e);
}
};

Expand All @@ -128,7 +128,7 @@
}
} catch (Exception e) {
logger.error("Failed to find the correct resource for the workflow step", e);
createIndexFuture.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
createIndexFuture.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 131 in src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java#L131

Added line #L131 was not covered by tests
}

// TODO:
Expand Down
Loading
Loading