Skip to content

Commit

Permalink
Migrate template deletion to sdkClient
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Dec 21, 2024
1 parent 38a8b70 commit 95446e8
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Booleans;
import org.opensearch.action.delete.DeleteRequest;
import org.apache.logging.log4j.message.ParameterizedMessageFactory;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
Expand All @@ -28,12 +28,15 @@
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.TenantAwareHelper;
import org.opensearch.remote.metadata.client.DeleteDataObjectRequest;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.remote.metadata.common.SdkClientUtils;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import static org.opensearch.flowframework.common.CommonValue.CLEAR_STATUS;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
import static org.opensearch.flowframework.util.ParseUtils.resolveUserAndExecute;
Expand Down Expand Up @@ -106,7 +109,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Dele
filterByEnabled,
flowFrameworkSettings.isMultiTenancyEnabled(),
listener,
() -> executeDeleteRequest(request, listener, context),
() -> executeDeleteRequest(request, tenantId, listener, context),
client,
sdkClient,
clusterService,
Expand All @@ -123,18 +126,44 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Dele
/**
* Executes the delete request
* @param request the workflow request
* @param tenantId
* @param listener the action listener
* @param context the thread context
*/
private void executeDeleteRequest(
WorkflowRequest request,
String tenantId,
ActionListener<DeleteResponse> listener,
ThreadContext.StoredContext context
) {
System.err.println("A");
String workflowId = request.getWorkflowId();
DeleteRequest deleteRequest = new DeleteRequest(GLOBAL_CONTEXT_INDEX, workflowId);
logger.info("Deleting workflow doc: {}", workflowId);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));
DeleteDataObjectRequest deleteRequest = DeleteDataObjectRequest.builder()
.index(GLOBAL_CONTEXT_INDEX)
.id(workflowId)
.tenantId(tenantId)
.build();
sdkClient.deleteDataObjectAsync(deleteRequest, client.threadPool().executor(WORKFLOW_THREAD_POOL)).whenComplete((r, throwable) -> {
context.restore();
System.err.println("B");
if (throwable == null) {
System.err.println("C");
try {
DeleteResponse response = DeleteResponse.fromXContent(r.parser());
System.err.println("D " + response);
listener.onResponse(response);
} catch (Exception e) {
logger.error("Failed to parse delete response", e);
listener.onFailure(new FlowFrameworkException("Failed to parse delete response", RestStatus.INTERNAL_SERVER_ERROR));
}
} else {
Exception exception = SdkClientUtils.unwrapAndConvertToException(throwable);
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage("Failed to delete template {}", workflowId)
.getFormattedMessage();
logger.error(errorMessage, exception);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR));
}
});

// Whether to force deletion of corresponding state
final boolean clearStatus = Booleans.parseBoolean(request.getParams().get(CLEAR_STATUS), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,8 @@ private CompletableFuture<Void> cacheMasterKeyFromConfigIndex(String tenantId) {
).whenComplete((r, throwable) -> {
context.restore();
if (throwable == null) {
GetResponse response;
try {
response = r.parser() == null ? null : GetResponse.fromXContent(r.parser());
GetResponse response = r.parser() == null ? null : GetResponse.fromXContent(r.parser());
if (response.isExists()) {
System.err.println("B: EXISTS");
try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void testFailedUpdateTemplateInGlobalContext() throws IOException {
verify(listener, times(1)).onFailure(exceptionCaptor.capture());

assertEquals(
"Failed to update template for workflow_id : 1, global_context index does not exist.",
"Failed to update template for workflow_id : 1, global context index does not exist.",
exceptionCaptor.getValue().getMessage()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@
package org.opensearch.flowframework.transport;

import org.opensearch.action.DocWriteResponse.Result;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
Expand All @@ -27,25 +30,42 @@
import org.opensearch.remote.metadata.client.impl.SdkClientFactory;
import org.opensearch.tasks.Task;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.junit.AfterClass;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.mockito.ArgumentCaptor;

import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class DeleteWorkflowTransportActionTests extends OpenSearchTestCase {

private static final TestThreadPool testThreadPool = new TestThreadPool(
DeleteWorkflowTransportActionTests.class.getName(),
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
Math.max(2, OpenSearchExecutors.allocatedProcessors(Settings.EMPTY) - 1),
TimeValue.timeValueMinutes(1),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
)
);

private Client client;
private SdkClient sdkClient;
private DeleteWorkflowTransportAction deleteWorkflowTransportAction;
Expand All @@ -56,6 +76,7 @@ public class DeleteWorkflowTransportActionTests extends OpenSearchTestCase {
public void setUp() throws Exception {
super.setUp();
this.client = mock(Client.class);
when(client.threadPool()).thenReturn(testThreadPool);
this.sdkClient = SdkClientFactory.createSdkClient(client, NamedXContentRegistry.EMPTY, Collections.emptyMap());
this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class);
this.flowFrameworkSettings = mock(FlowFrameworkSettings.class);
Expand All @@ -78,13 +99,11 @@ public void setUp() throws Exception {
xContentRegistry(),
Settings.EMPTY
);
}

ThreadPool clientThreadPool = mock(ThreadPool.class);
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);

when(client.threadPool()).thenReturn(clientThreadPool);
when(clientThreadPool.getThreadContext()).thenReturn(threadContext);

@AfterClass
public static void cleanup() {
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
}

public void testDeleteWorkflowNoGlobalContext() {
Expand All @@ -100,48 +119,46 @@ public void testDeleteWorkflowNoGlobalContext() {
assertTrue(exceptionCaptor.getValue().getMessage().contains("There are no templates in the global context"));
}

public void testDeleteWorkflowSuccess() {
public void testDeleteWorkflowSuccess() throws InterruptedException {
String workflowId = "12345";
@SuppressWarnings("unchecked")
ActionListener<DeleteResponse> listener = mock(ActionListener.class);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);

when(flowFrameworkIndicesHandler.doesIndexExist(anyString())).thenReturn(true);

// Stub client.delete to force on response
doAnswer(invocation -> {
ActionListener<DeleteResponse> responseListener = invocation.getArgument(1);
ShardId shardId = new ShardId(new Index("indexName", "uuid"), 1);
PlainActionFuture<DeleteResponse> future = PlainActionFuture.newFuture();
future.onResponse(new DeleteResponse(shardId, workflowId, 1, 1, 1, true));
when(client.delete(any(DeleteRequest.class))).thenReturn(future);

ShardId shardId = new ShardId(new Index("indexName", "uuid"), 1);
responseListener.onResponse(new DeleteResponse(shardId, workflowId, 1, 1, 1, true));
return null;
}).when(client).delete(any(DeleteRequest.class), any());

deleteWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener);
CountDownLatch latch = new CountDownLatch(1);
LatchedActionListener<DeleteResponse> latchedActionListener = new LatchedActionListener<>(listener, latch);
deleteWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, latchedActionListener);
latch.await(1, TimeUnit.SECONDS);

ArgumentCaptor<DeleteResponse> responseCaptor = ArgumentCaptor.forClass(DeleteResponse.class);
verify(listener, times(1)).onResponse(responseCaptor.capture());
assertEquals(Result.DELETED, responseCaptor.getValue().getResult());
}

public void testDeleteWorkflowNotFound() {
public void testDeleteWorkflowNotFound() throws InterruptedException {
String workflowId = "12345";
@SuppressWarnings("unchecked")
ActionListener<DeleteResponse> listener = mock(ActionListener.class);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);

when(flowFrameworkIndicesHandler.doesIndexExist(anyString())).thenReturn(true);

// Stub client.delete to force on response
doAnswer(invocation -> {
ActionListener<DeleteResponse> responseListener = invocation.getArgument(1);
ShardId shardId = new ShardId(new Index("indexName", "uuid"), 1);
PlainActionFuture<DeleteResponse> future = PlainActionFuture.newFuture();
future.onResponse(new DeleteResponse(shardId, workflowId, 1, 1, 1, false));
when(client.delete(any(DeleteRequest.class))).thenReturn(future);

ShardId shardId = new ShardId(new Index("indexName", "uuid"), 1);
responseListener.onResponse(new DeleteResponse(shardId, workflowId, 1, 1, 1, false));
return null;
}).when(client).delete(any(DeleteRequest.class), any());

deleteWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener);
CountDownLatch latch = new CountDownLatch(1);
LatchedActionListener<DeleteResponse> latchedActionListener = new LatchedActionListener<>(listener, latch);
deleteWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, latchedActionListener);
latch.await(1, TimeUnit.SECONDS);

ArgumentCaptor<DeleteResponse> responseCaptor = ArgumentCaptor.forClass(DeleteResponse.class);
verify(listener, times(1)).onResponse(responseCaptor.capture());
Expand Down

0 comments on commit 95446e8

Please sign in to comment.