Skip to content

Commit

Permalink
Update tests for new update async code
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jan 8, 2025
1 parent 13d1b83 commit b0b2d1d
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessageFactory;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.DocWriteRequest.OpType;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
Expand Down Expand Up @@ -47,7 +48,6 @@
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.remote.metadata.client.DeleteDataObjectRequest;
import org.opensearch.remote.metadata.client.GetDataObjectRequest;
import org.opensearch.remote.metadata.client.PutDataObjectRequest;
Expand Down Expand Up @@ -991,17 +991,16 @@ private void handleStateGetResponse(
listener.onFailure(new FlowFrameworkException("Workflow state not found for " + workflowId, RestStatus.NOT_FOUND));
return;
}
WorkflowState currentState;
try {
currentState = WorkflowState.parse(getResponse.getSourceAsString());
WorkflowState currentState = WorkflowState.parse(getResponse.getSourceAsString());
List<ResourceCreated> resourcesCreated = new ArrayList<>(currentState.resourcesCreated());
if (operation == OpType.DELETE) {
resourcesCreated.removeIf(r -> r.resourceMap().equals(resource.resourceMap()));
} else {
resourcesCreated.add(resource);
}
WorkflowState newState = WorkflowState.builder(currentState).resourcesCreated(resourcesCreated).build();
UpdateDataObjectRequest updateRequest2 = UpdateDataObjectRequest.builder()
UpdateDataObjectRequest updateRequest = UpdateDataObjectRequest.builder()
.index(WORKFLOW_STATE_INDEX)
.id(workflowId)
.tenantId(tenantId)
Expand All @@ -1010,7 +1009,7 @@ private void handleStateGetResponse(
.ifPrimaryTerm(getResponse.getPrimaryTerm())
.build();
sdkClient.updateDataObjectAsync(
updateRequest2,
updateRequest,
client.threadPool().executor(operation == OpType.DELETE ? DEPROVISION_WORKFLOW_THREAD_POOL : PROVISION_WORKFLOW_THREAD_POOL)
).whenComplete((r, throwable) -> {
if (throwable == null) {
Expand Down Expand Up @@ -1059,7 +1058,7 @@ private void handleStateUpdateException(
ActionListener<WorkflowData> listener,
Exception e
) {
if (e instanceof VersionConflictEngineException && retries > 0) {
if (e instanceof OpenSearchStatusException && ((OpenSearchStatusException) e).status() == RestStatus.CONFLICT && retries > 0) {
// Retry if we haven't exhausted retries
getAndUpdateResourceInStateDocumentWithRetries(workflowId, tenantId, newResource, operation, retries - 1, listener);
return;
Expand Down
Loading

0 comments on commit b0b2d1d

Please sign in to comment.