Skip to content

Commit

Permalink
Migrate initial state document creation to metadata client
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Dec 26, 2024
1 parent ae6539b commit 2d79e13
Showing 1 changed file with 26 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
Expand All @@ -37,7 +36,6 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
Expand Down Expand Up @@ -74,6 +72,7 @@
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX_MAPPING;
import static org.opensearch.flowframework.common.CommonValue.META;
import static org.opensearch.flowframework.common.CommonValue.NO_SCHEMA_VERSION;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.SCHEMA_VERSION_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX_MAPPING;
Expand Down Expand Up @@ -419,21 +418,32 @@ public void putInitialStateToWorkflowState(String workflowId, String tenantId, U
listener.onFailure(new FlowFrameworkException("No response to create workflow_state index", INTERNAL_SERVER_ERROR));
return;
}
IndexRequest request = new IndexRequest(WORKFLOW_STATE_INDEX);
try (
XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext();

) {
request.source(state.toXContent(builder, ToXContent.EMPTY_PARAMS)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
request.id(workflowId);
client.index(request, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
String errorMessage = "Failed to put state index document";
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
PutDataObjectRequest putRequest = PutDataObjectRequest.builder()
.index(WORKFLOW_STATE_INDEX)
.id(workflowId)
.tenantId(tenantId)
.dataObject(state)
.build();
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
sdkClient.putDataObjectAsync(putRequest, client.threadPool().executor(PROVISION_WORKFLOW_THREAD_POOL))
.whenComplete((r, throwable) -> {
context.restore();
if (throwable == null) {
try {
IndexResponse indexResponse = IndexResponse.fromXContent(r.parser());
listener.onResponse(indexResponse);
} catch (IOException e) {
logger.error("Failed to parse index response", e);
listener.onFailure(new FlowFrameworkException("Failed to parse index response", INTERNAL_SERVER_ERROR));
}
} else {
Exception exception = SdkClientUtils.unwrapAndConvertToException(throwable);
String errorMessage = "Failed to put state index document";
logger.error(errorMessage, exception);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
}
});
}

}, e -> {
String errorMessage = "Failed to create workflow_state index";
logger.error(errorMessage, e);
Expand Down

0 comments on commit 2d79e13

Please sign in to comment.