diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index c9fcd19a..4d698ae5 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -577,6 +577,29 @@ public void canDeleteWorkflowStateDoc( } } + /** + * Deletes a document in the workflow state index + * @param documentId the document ID + * @param listener action listener + */ + public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener listener) { + if (!doesIndexExist(WORKFLOW_STATE_INDEX)) { + String errorMessage = "Failed to delete document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index"; + logger.error(errorMessage); + listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); + } else { + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + DeleteRequest deleteRequest = new DeleteRequest(WORKFLOW_STATE_INDEX, documentId); + deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore)); + } catch (Exception e) { + String errorMessage = "Failed to delete " + WORKFLOW_STATE_INDEX + " entry : " + documentId; + logger.error(errorMessage, e); + listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + } + } + } + /** * Updates a complete document in the workflow state index * @param documentId the document ID @@ -641,29 +664,6 @@ public void updateFlowFrameworkSystemIndexDoc( } } - /** - * Deletes a document in the workflow state index - * @param documentId the document ID - * @param listener action listener - */ - public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener listener) { - if (!doesIndexExist(WORKFLOW_STATE_INDEX)) { - String errorMessage = "Failed to delete document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index"; - logger.error(errorMessage); - listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); - } else { - try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - DeleteRequest deleteRequest = new DeleteRequest(WORKFLOW_STATE_INDEX, documentId); - deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore)); - } catch (Exception e) { - String errorMessage = "Failed to delete " + WORKFLOW_STATE_INDEX + " entry : " + documentId; - logger.error(errorMessage, e); - listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); - } - } - } - /** * Adds a resource to the state index, including common exception handling * @param currentNodeInputs Inputs to the current node