diff --git a/build.gradle b/build.gradle index 812b19db6..7fdc16cea 100644 --- a/build.gradle +++ b/build.gradle @@ -35,8 +35,11 @@ buildscript { isSameMajorVersion = opensearch_version.split("\\.")[0] == bwcVersionShort.split("\\.")[0] swaggerVersion = "2.1.24" - jacksonVersion = "2.18.1" + jacksonVersion = "2.18.2" swaggerCoreVersion = "2.2.26" + apacheHttpVersion = "5.3.1" + apacheHttpClientVersion = "5.4.1" + log4jVersion = "2.24.2" } @@ -178,9 +181,9 @@ dependencies { implementation "org.dafny:DafnyRuntime:4.9.0" implementation "software.amazon.smithy.dafny:conversion:0.1.1" implementation 'org.bouncycastle:bcprov-jdk18on:1.79' - api "org.apache.httpcomponents.core5:httpcore5:5.3.1" + api "org.apache.httpcomponents.core5:httpcore5:${apacheHttpVersion}" implementation "jakarta.json.bind:jakarta.json.bind-api:3.0.1" - implementation "jakarta.json:jakarta.json-api:2.1.3" + implementation "org.glassfish:jakarta.json:2.0.1" implementation "org.eclipse:yasson:3.0.4" implementation "com.google.code.gson:gson:2.11.0" // Swagger-Parser dependencies for API consistency tests @@ -211,24 +214,12 @@ dependencies { configurations.all { resolutionStrategy { force "com.google.guava:guava:33.3.1-jre" // CVE for 31.1, keep to force transitive dependencies - force "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}" - force "com.fasterxml.jackson:jackson-bom:${jacksonVersion}" - force "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" - force "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}" - force "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}" - force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${jacksonVersion}" - force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${jacksonVersion}" - force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${jacksonVersion}" - force "org.apache.httpcomponents.core5:httpcore5:5.3.1" - force "org.apache.httpcomponents.core5:httpcore5-h2:5.3.1" - force "org.opensearch.client:opensearch-rest-client:${opensearch_version}" - force "jakarta.json.bind:jakarta.json.bind-api:3.0.1" - force "org.eclipse:yasson:3.0.4" - force "commons-codec:commons-codec:1.16.1" - force "org.slf4j:slf4j-api:1.7.36" - force "org.apache.logging.log4j:log4j-api:2.24.2" - force "org.apache.logging.log4j:log4j-core:2.24.2" - force "org.eclipse.parsson:parsson:1.1.7" + // Force to prevent Jar Hell + force("org.apache.httpcomponents.core5:httpcore5:${apacheHttpVersion}") + force "org.apache.httpcomponents.core5:httpcore5-h2:${apacheHttpVersion}" + force("org.apache.httpcomponents.client5:httpclient5:${apacheHttpClientVersion}") + force "org.apache.logging.log4j:log4j-api:${log4jVersion}" + force "org.apache.logging.log4j:log4j-core:${log4jVersion}" } } } diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index fe431db74..46d8229dd 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -18,7 +18,6 @@ import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.opensearch.action.delete.DeleteResponse; -import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.WriteRequest; @@ -53,6 +52,7 @@ import org.opensearch.remote.metadata.client.GetDataObjectRequest; import org.opensearch.remote.metadata.client.PutDataObjectRequest; import org.opensearch.remote.metadata.client.SdkClient; +import org.opensearch.remote.metadata.client.UpdateDataObjectRequest; import org.opensearch.remote.metadata.common.SdkClientUtils; import java.io.IOException; @@ -835,35 +835,51 @@ public void deleteFlowFrameworkSystemIndexDoc(String documentId, String tenantId * @param workflowStepName the workflow step name that created the resource * @param resourceId the id of the newly created resource * @param listener the ActionListener for this step to handle completing the future after update + * @deprecated here temporarily until tenantID passed https://github.com/opensearch-project/flow-framework/issues/987 */ + @Deprecated public void addResourceToStateIndex( WorkflowData currentNodeInputs, String nodeId, String workflowStepName, String resourceId, ActionListener listener + ) { + addResourceToStateIndex(currentNodeInputs, nodeId, workflowStepName, resourceId, null, listener); + } + + /** + * Adds a resource to the state index, including common exception handling + * @param currentNodeInputs Inputs to the current node + * @param nodeId current process node (workflow step) id + * @param workflowStepName the workflow step name that created the resource + * @param resourceId the id of the newly created resource + * @param tenantId the tenant id + * @param listener the ActionListener for this step to handle completing the future after update + */ + public void addResourceToStateIndex( + WorkflowData currentNodeInputs, + String nodeId, + String workflowStepName, + String resourceId, + String tenantId, + ActionListener listener ) { String workflowId = currentNodeInputs.getWorkflowId(); + if (!validateStateIndexExists(workflowId, listener)) { + return; + } String resourceName = getResourceByWorkflowStep(workflowStepName); ResourceCreated newResource = new ResourceCreated(workflowStepName, nodeId, resourceName, resourceId); - if (!doesIndexExist(WORKFLOW_STATE_INDEX)) { - String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage( - "Failed to update state for {} due to missing {} index", + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + getAndUpdateResourceInStateDocumentWithRetries( workflowId, - WORKFLOW_STATE_INDEX - ).getFormattedMessage(); - logger.error(errorMessage); - listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND)); - } else { - try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - getAndUpdateResourceInStateDocumentWithRetries( - workflowId, - newResource, - OpType.INDEX, - RETRIES, - ActionListener.runBefore(listener, context::restore) - ); - } + tenantId, + newResource, + OpType.INDEX, + RETRIES, + ActionListener.runBefore(listener, context::restore) + ); } } @@ -872,8 +888,42 @@ public void addResourceToStateIndex( * @param workflowId The workflow document id in the state index * @param resourceToDelete The resource to delete * @param listener the ActionListener for this step to handle completing the future after update + * @deprecated here temporarily until tenantID passed https://github.com/opensearch-project/flow-framework/issues/987 */ + @Deprecated public void deleteResourceFromStateIndex(String workflowId, ResourceCreated resourceToDelete, ActionListener listener) { + deleteResourceFromStateIndex(workflowId, null, resourceToDelete, listener); + } + + /** + * Removes a resource from the state index, including common exception handling + * @param workflowId The workflow document id in the state index + * @param tenantId The tenant id + * @param resourceToDelete The resource to delete + * @param listener the ActionListener for this step to handle completing the future after update + */ + public void deleteResourceFromStateIndex( + String workflowId, + String tenantId, + ResourceCreated resourceToDelete, + ActionListener listener + ) { + if (!validateStateIndexExists(workflowId, listener)) { + return; + } + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + getAndUpdateResourceInStateDocumentWithRetries( + workflowId, + tenantId, + resourceToDelete, + OpType.DELETE, + RETRIES, + ActionListener.runBefore(listener, context::restore) + ); + } + } + + private boolean validateStateIndexExists(String workflowId, ActionListener listener) { if (!doesIndexExist(WORKFLOW_STATE_INDEX)) { String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage( "Failed to update state for {} due to missing {} index", @@ -882,22 +932,15 @@ public void deleteResourceFromStateIndex(String workflowId, ResourceCreated reso ).getFormattedMessage(); logger.error(errorMessage); listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND)); - } else { - try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - getAndUpdateResourceInStateDocumentWithRetries( - workflowId, - resourceToDelete, - OpType.DELETE, - RETRIES, - ActionListener.runBefore(listener, context::restore) - ); - } + return false; } + return true; } /** * Performs a get and update of a State Index document adding or removing a resource with strong consistency and retries * @param workflowId The document id to update + * @param tenantId * @param resource The resource to add or remove from the resources created list * @param operation The operation to perform on the resource (INDEX to append to the list or DELETE to remove) * @param retries The number of retries on update version conflicts @@ -905,39 +948,86 @@ public void deleteResourceFromStateIndex(String workflowId, ResourceCreated reso */ private void getAndUpdateResourceInStateDocumentWithRetries( String workflowId, + String tenantId, ResourceCreated resource, OpType operation, int retries, ActionListener listener ) { - GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, workflowId); - client.get(getRequest, ActionListener.wrap(getResponse -> { - if (!getResponse.isExists()) { - listener.onFailure(new FlowFrameworkException("Workflow state not found for " + workflowId, RestStatus.NOT_FOUND)); - return; + GetDataObjectRequest getRequest = GetDataObjectRequest.builder() + .index(WORKFLOW_STATE_INDEX) + .id(workflowId) + .tenantId(tenantId) + .build(); + sdkClient.getDataObjectAsync( + getRequest, + client.threadPool().executor(operation == OpType.DELETE ? DEPROVISION_WORKFLOW_THREAD_POOL : PROVISION_WORKFLOW_THREAD_POOL) + ).whenComplete((r, throwable) -> { + if (throwable == null) { + try { + GetResponse getResponse = GetResponse.fromXContent(r.parser()); + handleStateGetResponse(workflowId, tenantId, resource, operation, retries, listener, getResponse); + } catch (IOException e) { + logger.error("Failed to parse get response", e); + listener.onFailure(new FlowFrameworkException("Failed to parse get response", INTERNAL_SERVER_ERROR)); + } + } else { + Exception ex = SdkClientUtils.unwrapAndConvertToException(throwable); + handleStateUpdateException(workflowId, tenantId, resource, operation, 0, listener, ex); } - WorkflowState currentState = WorkflowState.parse(getResponse.getSourceAsString()); + }); + } + + private void handleStateGetResponse( + String workflowId, + String tenantId, + ResourceCreated resource, + OpType operation, + int retries, + ActionListener listener, + GetResponse getResponse + ) { + if (!getResponse.isExists()) { + listener.onFailure(new FlowFrameworkException("Workflow state not found for " + workflowId, RestStatus.NOT_FOUND)); + return; + } + WorkflowState currentState; + try { + currentState = WorkflowState.parse(getResponse.getSourceAsString()); List resourcesCreated = new ArrayList<>(currentState.resourcesCreated()); if (operation == OpType.DELETE) { resourcesCreated.removeIf(r -> r.resourceMap().equals(resource.resourceMap())); } else { resourcesCreated.add(resource); } - XContentBuilder builder = XContentFactory.jsonBuilder(); WorkflowState newState = WorkflowState.builder(currentState).resourcesCreated(resourcesCreated).build(); - newState.toXContent(builder, null); - UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, workflowId).doc(builder) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .setIfSeqNo(getResponse.getSeqNo()) - .setIfPrimaryTerm(getResponse.getPrimaryTerm()); - client.update( - updateRequest, - ActionListener.wrap( - r -> handleStateUpdateSuccess(workflowId, resource, operation, listener), - e -> handleStateUpdateException(workflowId, resource, operation, retries, listener, e) - ) - ); - }, ex -> handleStateUpdateException(workflowId, resource, operation, 0, listener, ex))); + UpdateDataObjectRequest updateRequest2 = UpdateDataObjectRequest.builder() + .index(WORKFLOW_STATE_INDEX) + .id(workflowId) + .tenantId(tenantId) + .dataObject(newState) + .ifSeqNo(getResponse.getSeqNo()) + .ifPrimaryTerm(getResponse.getPrimaryTerm()) + .build(); + sdkClient.updateDataObjectAsync( + updateRequest2, + client.threadPool().executor(operation == OpType.DELETE ? DEPROVISION_WORKFLOW_THREAD_POOL : PROVISION_WORKFLOW_THREAD_POOL) + ).whenComplete((r, throwable) -> { + if (throwable == null) { + handleStateUpdateSuccess(workflowId, resource, operation, listener); + } else { + Exception e = SdkClientUtils.unwrapAndConvertToException(throwable); + handleStateUpdateException(workflowId, tenantId, resource, operation, retries, listener, e); + } + }); + } catch (Exception e) { + String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage( + "Failed to parse workflow state response for {}", + workflowId + ).getFormattedMessage(); + logger.error(errorMessage, e); + listener.onFailure(new FlowFrameworkException(errorMessage, INTERNAL_SERVER_ERROR)); + } } private void handleStateUpdateSuccess( @@ -962,6 +1052,7 @@ private void handleStateUpdateSuccess( private void handleStateUpdateException( String workflowId, + String tenantId, ResourceCreated newResource, OpType operation, int retries, @@ -970,7 +1061,7 @@ private void handleStateUpdateException( ) { if (e instanceof VersionConflictEngineException && retries > 0) { // Retry if we haven't exhausted retries - getAndUpdateResourceInStateDocumentWithRetries(workflowId, newResource, operation, retries - 1, listener); + getAndUpdateResourceInStateDocumentWithRetries(workflowId, tenantId, newResource, operation, retries - 1, listener); return; } String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(