Skip to content

Commit

Permalink
Refactor state index update method using multitenant client
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jan 7, 2025
1 parent 1a9f856 commit 99d273b
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 70 deletions.
33 changes: 12 additions & 21 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ buildscript {

isSameMajorVersion = opensearch_version.split("\\.")[0] == bwcVersionShort.split("\\.")[0]
swaggerVersion = "2.1.24"
jacksonVersion = "2.18.1"
jacksonVersion = "2.18.2"
swaggerCoreVersion = "2.2.26"
apacheHttpVersion = "5.3.1"
apacheHttpClientVersion = "5.4.1"
log4jVersion = "2.24.2"

}

Expand Down Expand Up @@ -178,9 +181,9 @@ dependencies {
implementation "org.dafny:DafnyRuntime:4.9.0"
implementation "software.amazon.smithy.dafny:conversion:0.1.1"
implementation 'org.bouncycastle:bcprov-jdk18on:1.79'
api "org.apache.httpcomponents.core5:httpcore5:5.3.1"
api "org.apache.httpcomponents.core5:httpcore5:${apacheHttpVersion}"
implementation "jakarta.json.bind:jakarta.json.bind-api:3.0.1"
implementation "jakarta.json:jakarta.json-api:2.1.3"
implementation "org.glassfish:jakarta.json:2.0.1"
implementation "org.eclipse:yasson:3.0.4"
implementation "com.google.code.gson:gson:2.11.0"
// Swagger-Parser dependencies for API consistency tests
Expand Down Expand Up @@ -211,24 +214,12 @@ dependencies {
configurations.all {
resolutionStrategy {
force "com.google.guava:guava:33.3.1-jre" // CVE for 31.1, keep to force transitive dependencies
force "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
force "com.fasterxml.jackson:jackson-bom:${jacksonVersion}"
force "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
force "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}"
force "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}"
force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${jacksonVersion}"
force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${jacksonVersion}"
force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${jacksonVersion}"
force "org.apache.httpcomponents.core5:httpcore5:5.3.1"
force "org.apache.httpcomponents.core5:httpcore5-h2:5.3.1"
force "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
force "jakarta.json.bind:jakarta.json.bind-api:3.0.1"
force "org.eclipse:yasson:3.0.4"
force "commons-codec:commons-codec:1.16.1"
force "org.slf4j:slf4j-api:1.7.36"
force "org.apache.logging.log4j:log4j-api:2.24.2"
force "org.apache.logging.log4j:log4j-core:2.24.2"
force "org.eclipse.parsson:parsson:1.1.7"
// Force to prevent Jar Hell
force("org.apache.httpcomponents.core5:httpcore5:${apacheHttpVersion}")
force "org.apache.httpcomponents.core5:httpcore5-h2:${apacheHttpVersion}"
force("org.apache.httpcomponents.client5:httpclient5:${apacheHttpClientVersion}")
force "org.apache.logging.log4j:log4j-api:${log4jVersion}"
force "org.apache.logging.log4j:log4j-core:${log4jVersion}"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
Expand Down Expand Up @@ -53,6 +52,7 @@
import org.opensearch.remote.metadata.client.GetDataObjectRequest;
import org.opensearch.remote.metadata.client.PutDataObjectRequest;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.remote.metadata.client.UpdateDataObjectRequest;
import org.opensearch.remote.metadata.common.SdkClientUtils;

import java.io.IOException;
Expand Down Expand Up @@ -835,35 +835,51 @@ public void deleteFlowFrameworkSystemIndexDoc(String documentId, String tenantId
* @param workflowStepName the workflow step name that created the resource
* @param resourceId the id of the newly created resource
* @param listener the ActionListener for this step to handle completing the future after update
* @deprecated here temporarily until tenantID passed https://github.com/opensearch-project/flow-framework/issues/987
*/
@Deprecated
public void addResourceToStateIndex(
WorkflowData currentNodeInputs,
String nodeId,
String workflowStepName,
String resourceId,
ActionListener<WorkflowData> listener
) {
addResourceToStateIndex(currentNodeInputs, nodeId, workflowStepName, resourceId, null, listener);
}

/**
* Adds a resource to the state index, including common exception handling
* @param currentNodeInputs Inputs to the current node
* @param nodeId current process node (workflow step) id
* @param workflowStepName the workflow step name that created the resource
* @param resourceId the id of the newly created resource
* @param tenantId the tenant id
* @param listener the ActionListener for this step to handle completing the future after update
*/
public void addResourceToStateIndex(
WorkflowData currentNodeInputs,
String nodeId,
String workflowStepName,
String resourceId,
String tenantId,
ActionListener<WorkflowData> listener
) {
String workflowId = currentNodeInputs.getWorkflowId();
if (!validateStateIndexExists(workflowId, listener)) {
return;
}
String resourceName = getResourceByWorkflowStep(workflowStepName);
ResourceCreated newResource = new ResourceCreated(workflowStepName, nodeId, resourceName, resourceId);
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Failed to update state for {} due to missing {} index",
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
getAndUpdateResourceInStateDocumentWithRetries(
workflowId,
WORKFLOW_STATE_INDEX
).getFormattedMessage();
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
getAndUpdateResourceInStateDocumentWithRetries(
workflowId,
newResource,
OpType.INDEX,
RETRIES,
ActionListener.runBefore(listener, context::restore)
);
}
tenantId,
newResource,
OpType.INDEX,
RETRIES,
ActionListener.runBefore(listener, context::restore)
);
}
}

Expand All @@ -872,8 +888,42 @@ public void addResourceToStateIndex(
* @param workflowId The workflow document id in the state index
* @param resourceToDelete The resource to delete
* @param listener the ActionListener for this step to handle completing the future after update
* @deprecated here temporarily until tenantID passed https://github.com/opensearch-project/flow-framework/issues/987
*/
@Deprecated
public void deleteResourceFromStateIndex(String workflowId, ResourceCreated resourceToDelete, ActionListener<WorkflowData> listener) {
deleteResourceFromStateIndex(workflowId, null, resourceToDelete, listener);
}

/**
* Removes a resource from the state index, including common exception handling
* @param workflowId The workflow document id in the state index
* @param tenantId The tenant id
* @param resourceToDelete The resource to delete
* @param listener the ActionListener for this step to handle completing the future after update
*/
public void deleteResourceFromStateIndex(
String workflowId,
String tenantId,
ResourceCreated resourceToDelete,
ActionListener<WorkflowData> listener
) {
if (!validateStateIndexExists(workflowId, listener)) {
return;
}
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
getAndUpdateResourceInStateDocumentWithRetries(
workflowId,
tenantId,
resourceToDelete,
OpType.DELETE,
RETRIES,
ActionListener.runBefore(listener, context::restore)
);
}
}

private boolean validateStateIndexExists(String workflowId, ActionListener<WorkflowData> listener) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Failed to update state for {} due to missing {} index",
Expand All @@ -882,62 +932,102 @@ public void deleteResourceFromStateIndex(String workflowId, ResourceCreated reso
).getFormattedMessage();
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
getAndUpdateResourceInStateDocumentWithRetries(
workflowId,
resourceToDelete,
OpType.DELETE,
RETRIES,
ActionListener.runBefore(listener, context::restore)
);
}
return false;
}
return true;
}

/**
* Performs a get and update of a State Index document adding or removing a resource with strong consistency and retries
* @param workflowId The document id to update
* @param tenantId
* @param resource The resource to add or remove from the resources created list
* @param operation The operation to perform on the resource (INDEX to append to the list or DELETE to remove)
* @param retries The number of retries on update version conflicts
* @param listener The listener to complete on success or failure
*/
private void getAndUpdateResourceInStateDocumentWithRetries(
String workflowId,
String tenantId,
ResourceCreated resource,
OpType operation,
int retries,
ActionListener<WorkflowData> listener
) {
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, workflowId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (!getResponse.isExists()) {
listener.onFailure(new FlowFrameworkException("Workflow state not found for " + workflowId, RestStatus.NOT_FOUND));
return;
GetDataObjectRequest getRequest = GetDataObjectRequest.builder()
.index(WORKFLOW_STATE_INDEX)
.id(workflowId)
.tenantId(tenantId)
.build();
sdkClient.getDataObjectAsync(
getRequest,
client.threadPool().executor(operation == OpType.DELETE ? DEPROVISION_WORKFLOW_THREAD_POOL : PROVISION_WORKFLOW_THREAD_POOL)
).whenComplete((r, throwable) -> {
if (throwable == null) {
try {
GetResponse getResponse = GetResponse.fromXContent(r.parser());
handleStateGetResponse(workflowId, tenantId, resource, operation, retries, listener, getResponse);
} catch (IOException e) {
logger.error("Failed to parse get response", e);
listener.onFailure(new FlowFrameworkException("Failed to parse get response", INTERNAL_SERVER_ERROR));
}
} else {
Exception ex = SdkClientUtils.unwrapAndConvertToException(throwable);
handleStateUpdateException(workflowId, tenantId, resource, operation, 0, listener, ex);
}
WorkflowState currentState = WorkflowState.parse(getResponse.getSourceAsString());
});
}

private void handleStateGetResponse(
String workflowId,
String tenantId,
ResourceCreated resource,
OpType operation,
int retries,
ActionListener<WorkflowData> listener,
GetResponse getResponse
) {
if (!getResponse.isExists()) {
listener.onFailure(new FlowFrameworkException("Workflow state not found for " + workflowId, RestStatus.NOT_FOUND));
return;
}
WorkflowState currentState;
try {
currentState = WorkflowState.parse(getResponse.getSourceAsString());
List<ResourceCreated> resourcesCreated = new ArrayList<>(currentState.resourcesCreated());
if (operation == OpType.DELETE) {
resourcesCreated.removeIf(r -> r.resourceMap().equals(resource.resourceMap()));
} else {
resourcesCreated.add(resource);
}
XContentBuilder builder = XContentFactory.jsonBuilder();
WorkflowState newState = WorkflowState.builder(currentState).resourcesCreated(resourcesCreated).build();
newState.toXContent(builder, null);
UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, workflowId).doc(builder)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setIfSeqNo(getResponse.getSeqNo())
.setIfPrimaryTerm(getResponse.getPrimaryTerm());
client.update(
updateRequest,
ActionListener.wrap(
r -> handleStateUpdateSuccess(workflowId, resource, operation, listener),
e -> handleStateUpdateException(workflowId, resource, operation, retries, listener, e)
)
);
}, ex -> handleStateUpdateException(workflowId, resource, operation, 0, listener, ex)));
UpdateDataObjectRequest updateRequest2 = UpdateDataObjectRequest.builder()
.index(WORKFLOW_STATE_INDEX)
.id(workflowId)
.tenantId(tenantId)
.dataObject(newState)
.ifSeqNo(getResponse.getSeqNo())
.ifPrimaryTerm(getResponse.getPrimaryTerm())
.build();
sdkClient.updateDataObjectAsync(
updateRequest2,
client.threadPool().executor(operation == OpType.DELETE ? DEPROVISION_WORKFLOW_THREAD_POOL : PROVISION_WORKFLOW_THREAD_POOL)
).whenComplete((r, throwable) -> {
if (throwable == null) {
handleStateUpdateSuccess(workflowId, resource, operation, listener);
} else {
Exception e = SdkClientUtils.unwrapAndConvertToException(throwable);
handleStateUpdateException(workflowId, tenantId, resource, operation, retries, listener, e);
}
});
} catch (Exception e) {
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Failed to parse workflow state response for {}",
workflowId
).getFormattedMessage();
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, INTERNAL_SERVER_ERROR));
}
}

private void handleStateUpdateSuccess(
Expand All @@ -962,6 +1052,7 @@ private void handleStateUpdateSuccess(

private void handleStateUpdateException(
String workflowId,
String tenantId,
ResourceCreated newResource,
OpType operation,
int retries,
Expand All @@ -970,7 +1061,7 @@ private void handleStateUpdateException(
) {
if (e instanceof VersionConflictEngineException && retries > 0) {
// Retry if we haven't exhausted retries
getAndUpdateResourceInStateDocumentWithRetries(workflowId, newResource, operation, retries - 1, listener);
getAndUpdateResourceInStateDocumentWithRetries(workflowId, tenantId, newResource, operation, retries - 1, listener);
return;
}
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
Expand Down

0 comments on commit 99d273b

Please sign in to comment.