Skip to content

Commit

Permalink
Merge branch 'main' into whitesource-remediate/org.eclipse-yasson-3.x
Browse files Browse the repository at this point in the history
  • Loading branch information
amitgalitz authored Aug 19, 2024
2 parents 5c73ca2 + 10cddd7 commit fea736b
Show file tree
Hide file tree
Showing 46 changed files with 1,411 additions and 529 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Documentation
### Maintenance
### Refactoring
- Refactor workflow step resource updates to eliminate duplication ([#796](https://github.com/opensearch-project/flow-framework/pull/796))
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ configurations {

dependencies {
implementation "org.opensearch:opensearch:${opensearch_version}"
implementation 'org.junit.jupiter:junit-jupiter:5.10.3'
implementation 'org.junit.jupiter:junit-jupiter:5.11.0'
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
api group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}"
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.16.0'
Expand All @@ -185,7 +185,7 @@ dependencies {

configurations.all {
resolutionStrategy {
force("com.google.guava:guava:33.2.1-jre") // CVE for 31.1, keep to force transitive dependencies
force("com.google.guava:guava:33.3.0-jre") // CVE for 31.1, keep to force transitive dependencies
force("com.fasterxml.jackson.core:jackson-core:2.17.2") // Dependency Jar Hell
force("org.apache.httpcomponents.core5:httpcore5:5.2.5") // Dependency Jar Hell
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;

Expand Down Expand Up @@ -666,13 +667,13 @@ public void updateFlowFrameworkSystemIndexDocWithScript(
/**
* Creates a new ResourceCreated object and a script to update the state index
* @param workflowId workflowId for the relevant step
* @param nodeId WorkflowData object with relevent step information
* @param nodeId current process node (workflow step) id
* @param workflowStepName the workflowstep name that created the resource
* @param resourceId the id of the newly created resource
* @param listener the ActionListener for this step to handle completing the future after update
* @throws IOException if parsing fails on new resource
*/
public void updateResourceInStateIndex(
private void updateResourceInStateIndex(
String workflowId,
String nodeId,
String workflowStepName,
Expand All @@ -697,6 +698,44 @@ public void updateResourceInStateIndex(
updateFlowFrameworkSystemIndexDocWithScript(WORKFLOW_STATE_INDEX, workflowId, script, ActionListener.wrap(updateResponse -> {
logger.info("updated resources created of {}", workflowId);
listener.onResponse(updateResponse);
}, exception -> { listener.onFailure(exception); }));
}, listener::onFailure));
}

/**
* Adds a resource to the state index, including common exception handling
* @param currentNodeInputs Inputs to the current node
* @param nodeId current process node (workflow step) id
* @param workflowStepName the workflow step name that created the resource
* @param resourceId the id of the newly created resource
* @param listener the ActionListener for this step to handle completing the future after update
*/
public void addResourceToStateIndex(
WorkflowData currentNodeInputs,
String nodeId,
String workflowStepName,
String resourceId,
ActionListener<WorkflowData> listener
) {
String resourceName = getResourceByWorkflowStep(workflowStepName);
try {
updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
nodeId,
workflowStepName,
resourceId,
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
listener.onResponse(new WorkflowData(Map.of(resourceName, resourceId), currentNodeInputs.getWorkflowId(), nodeId));
}, exception -> {
String errorMessage = "Failed to update new created " + nodeId + " resource " + workflowStepName + " id " + resourceId;
logger.error(errorMessage, exception);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
})
);
} catch (Exception e) {
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
*/
package org.opensearch.flowframework.model;

import org.apache.logging.log4j.util.Strings;
import org.opensearch.Version;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.common.xcontent.yaml.YamlXContent;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.common.Strings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContentObject;
Expand Down Expand Up @@ -372,10 +372,10 @@ public static Template updateExistingTemplate(Template existingTemplate, Templat
if (templateWithNewFields.name() != null) {
builder.name(templateWithNewFields.name());
}
if (!Strings.isBlank(templateWithNewFields.description())) {
if (Strings.hasText(templateWithNewFields.description())) {
builder.description(templateWithNewFields.description());
}
if (!Strings.isBlank(templateWithNewFields.useCase())) {
if (Strings.hasText(templateWithNewFields.useCase())) {
builder.useCase(templateWithNewFields.useCase());
}
if (templateWithNewFields.templateVersion() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
);
return processError(ffe, params, request);
}
if (reprovision && !params.isEmpty()) {
FlowFrameworkException ffe = new FlowFrameworkException(
"Only the parameters " + request.consumedParams() + " are permitted unless the provision parameter is set to true.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}
try {
Template template;
Map<String, String> useCaseDefaultsMap = Collections.emptyMap();
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/org/opensearch/flowframework/util/ParseUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -533,4 +533,21 @@ public static void flattenSettings(String prefix, Map<String, Object> settings,
}
}
}

/**
* Ensures index is prepended to flattened setting keys
* @param originalSettings the original settings map
* @return new map with keys prepended with index
*/
public static Map<String, Object> prependIndexToSettings(Map<String, Object> originalSettings) {
Map<String, Object> newSettings = new HashMap<>();
originalSettings.entrySet().stream().forEach(x -> {
if (!x.getKey().startsWith("index.")) {
newSettings.put("index." + x.getKey(), x.getValue());
} else {
newSettings.put(x.getKey(), x.getValue());
}
});
return newSettings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
Expand Down Expand Up @@ -98,43 +97,14 @@ public PlainActionFuture<WorkflowData> execute(

@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
String resourceName = getResourceByWorkflowStep(getName());
try {
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
pipelineId,
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
// PutPipelineRequest returns only an AcknowledgeResponse, saving pipelineId instead
// TODO: revisit this concept of pipeline_id to be consistent with what makes most sense to end user here
createPipelineFuture.onResponse(
new WorkflowData(
Map.of(resourceName, pipelineId),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
}, exception -> {
String errorMessage = "Failed to update new created "
+ currentNodeId
+ " resource "
+ getName()
+ " id "
+ pipelineId;
logger.error(errorMessage, exception);
createPipelineFuture.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
createPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
// PutPipelineRequest returns only an AcknowledgeResponse, saving pipelineId instead
flowFrameworkIndicesHandler.addResourceToStateIndex(
currentNodeInputs,
currentNodeId,
getName(),
pipelineId,
createPipelineFuture
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import static org.opensearch.flowframework.common.CommonValue.MODEL_FORMAT;
import static org.opensearch.flowframework.common.CommonValue.MODEL_TYPE;
import static org.opensearch.flowframework.common.CommonValue.NAME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.CommonValue.URL;
import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID;
Expand Down Expand Up @@ -189,58 +188,38 @@ public PlainActionFuture<WorkflowData> execute(

// Attempt to retrieve the model ID
retryableGetMlTask(
currentNodeInputs.getWorkflowId(),
currentNodeInputs,
currentNodeId,
registerLocalModelFuture,
taskId,
"Local model registration",
ActionListener.wrap(mlTask -> {

ActionListener.wrap(mlTaskWorkflowData -> {
// Registered Model Resource has been updated
String resourceName = getResourceByWorkflowStep(getName());
String id = getResourceId(mlTask);

if (Boolean.TRUE.equals(deploy)) {

// Simulate Model deployment step and update resources created
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
DeployModelStep.NAME,
id,
ActionListener.wrap(deployUpdateResponse -> {
logger.info(
"successfully updated resources created in state index: {}",
deployUpdateResponse.getIndex()
);
registerLocalModelFuture.onResponse(
new WorkflowData(
Map.ofEntries(
Map.entry(resourceName, id),
Map.entry(REGISTER_MODEL_STATUS, mlTask.getState().name())
),
currentNodeInputs.getWorkflowId(),
currentNodeId
)
);
}, deployUpdateException -> {
String id = (String) mlTaskWorkflowData.getContent().get(resourceName);
ActionListener<WorkflowData> deployUpdateListener = ActionListener.wrap(
deployUpdateResponse -> registerLocalModelFuture.onResponse(mlTaskWorkflowData),
deployUpdateException -> {
String errorMessage = "Failed to update simulated deploy step resource " + id;
logger.error(errorMessage, deployUpdateException);
registerLocalModelFuture.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(deployUpdateException))
);
})
}
);
} else {
registerLocalModelFuture.onResponse(
new WorkflowData(
Map.ofEntries(Map.entry(resourceName, id), Map.entry(REGISTER_MODEL_STATUS, mlTask.getState().name())),
currentNodeInputs.getWorkflowId(),
currentNodeId
)
// Simulate Model deployment step and update resources created
flowFrameworkIndicesHandler.addResourceToStateIndex(
currentNodeInputs,
currentNodeId,
DeployModelStep.NAME,
id,
deployUpdateListener
);
} else {
registerLocalModelFuture.onResponse(mlTaskWorkflowData);
}
}, exception -> { registerLocalModelFuture.onFailure(exception); })
}, registerLocalModelFuture::onFailure)
);
}, exception -> {
Exception e = getSafeException(exception);
Expand Down
Loading

0 comments on commit fea736b

Please sign in to comment.