From 8cfcb706f8dcb5bc1cc92b703604ca42a1d93815 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Witek?= Date: Thu, 8 Feb 2024 19:09:13 +0100 Subject: [PATCH] [Transform] Add test for cancelling transform persistent task. (#105285) --- .../integration/TransformRobustnessIT.java | 144 ++++++++++++------ 1 file changed, 98 insertions(+), 46 deletions(-) diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java index f67795a807046..105ac09e356fd 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java @@ -9,17 +9,23 @@ import org.elasticsearch.client.Request; import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.core.Strings; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; public class TransformRobustnessIT extends TransformRestTestCase { @@ -33,67 +39,36 @@ public void testTaskRemovalAfterInternalIndexGotDeleted() throws Exception { String transformId = "simple_continuous_pivot"; String transformIndex = "pivot_reviews_continuous"; final Request createTransformRequest = new Request("PUT", TransformField.REST_BASE_PATH_TRANSFORMS + transformId); - String config = Strings.format(""" - { - "source": { - "index": "%s" - }, - "dest": { - "index": "%s" - }, - "frequency": "1s", - "sync": { - "time": { - "field": "timestamp", - "delay": "1s" - } - }, - "pivot": { - "group_by": { - "reviewer": { - "terms": { - "field": "user_id" - } - } - }, - "aggregations": { - "avg_rating": { - "avg": { - "field": "stars" - } - } - } - } - }""", indexName, transformIndex); + String config = createConfig(indexName, transformIndex); createTransformRequest.setJsonEntity(config); Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); - assertEquals(1, getTransforms(null).size()); + assertThat(getTransforms(null), hasSize(1)); // there shouldn't be a task yet - assertEquals(0, getNumberOfTransformTasks()); + assertThat(getTransformTasks(), is(empty())); startAndWaitForContinuousTransform(transformId, transformIndex, null); assertTrue(indexExists(transformIndex)); // a task exists - assertEquals(1, getNumberOfTransformTasks()); + assertThat(getTransformTasks(), hasSize(1)); // get and check some users assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_0", 3.776978417); assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_5", 3.72); assertNotNull(getTransformState(transformId)); - assertEquals(1, getTransforms(null).size()); + assertThat(getTransforms(null), hasSize(1)); // delete the transform index beEvilAndDeleteTheTransformIndex(); // transform is gone - assertEquals(0, getTransforms(List.of(Map.of("type", "dangling_task", "reason", DANGLING_TASK_ERROR_MESSAGE))).size()); + assertThat(getTransforms(List.of(Map.of("type", "dangling_task", "reason", DANGLING_TASK_ERROR_MESSAGE))), is(empty())); // but the task is still there - assertEquals(1, getNumberOfTransformTasks()); + assertThat(getTransformTasks(), hasSize(1)); Request stopTransformRequest = new Request("POST", TransformField.REST_BASE_PATH_TRANSFORMS + transformId + "/_stop"); ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(stopTransformRequest)); - assertEquals(409, e.getResponse().getStatusLine().getStatusCode()); + assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(409))); assertThat( e.getMessage(), containsString("Detected transforms with no config [" + transformId + "]. Use force to stop/delete them.") @@ -107,7 +82,7 @@ public void testTaskRemovalAfterInternalIndexGotDeleted() throws Exception { assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); // the task is gone - assertEquals(0, getNumberOfTransformTasks()); + assertThat(getTransformTasks(), is(empty())); // delete the transform because the task might have written a state doc, cleanup fails if the index isn't empty deleteTransform(transformId); @@ -125,7 +100,7 @@ public void testCreateAndDeleteTransformInALoop() throws IOException { // Wait until the transform finishes startAndWaitForTransform(transformId, destIndex); // After the transform finishes, there should be no transform task left - assertEquals(0, getNumberOfTransformTasks()); + assertThat(getTransformTasks(), is(empty())); // Delete the transform deleteTransform(transformId); } catch (AssertionError | Exception e) { @@ -134,24 +109,66 @@ public void testCreateAndDeleteTransformInALoop() throws IOException { } } + public void testCancellingTransformTask() throws Exception { + createReviewsIndex(); + String transformId = "cancelling_transform_task"; + String transformIndex = transformId + "-dest"; + + // Create the transform. + Request createTransformRequest = new Request("PUT", TransformField.REST_BASE_PATH_TRANSFORMS + transformId); + createTransformRequest.setJsonEntity(createConfig(REVIEWS_INDEX_NAME, transformIndex)); + assertAcknowledged(client().performRequest(createTransformRequest)); + assertThat(getTransforms(null), hasSize(1)); + + // Verify that there are no transform tasks yet. + assertThat(getTransformTasks(), is(empty())); + + // Start the transform. + startTransform(transformId); + + // Verify that the transform task already exists. + List tasks = getTransformTasks(); + assertThat(tasks, hasSize(1)); + + // Sleep for a few seconds so that we cover the task being cancelled at various stages. + Thread.sleep(randomLongBetween(0, 5_000)); + + // Cancel the transform task. + Request cancelTaskRequest = new Request("POST", "/_tasks/" + tasks.get(0) + "/_cancel"); + assertOK(client().performRequest(cancelTaskRequest)); + + // Wait until the transform is stopped. + assertBusy(() -> { + Map transformStatsAsMap = getTransformStateAndStats(transformId); + assertEquals("Stats were: " + transformStatsAsMap, "stopped", XContentMapValues.extractValue("state", transformStatsAsMap)); + }, 30, TimeUnit.SECONDS); + + // Verify that the transform is still present. + assertThat(getTransforms(null), hasSize(1)); + + // Verify that there is no transform task left. + assertThat(getTransformTasks(), is(empty())); + } + @SuppressWarnings("unchecked") - private int getNumberOfTransformTasks() throws IOException { + private List getTransformTasks() throws IOException { final Request tasksRequest = new Request("GET", "/_tasks"); tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*"); Map tasksResponse = entityAsMap(client().performRequest(tasksRequest)); Map nodes = (Map) tasksResponse.get("nodes"); if (nodes == null) { - return 0; + return List.of(); } - int foundTasks = 0; + List foundTasks = new ArrayList<>(); for (Entry node : nodes.entrySet()) { Map nodeInfo = (Map) node.getValue(); Map tasks = (Map) nodeInfo.get("tasks"); - foundTasks += tasks != null ? tasks.size() : 0; + if (tasks != null) { + foundTasks.addAll(tasks.keySet()); + } } - return foundTasks; } @@ -167,4 +184,39 @@ private void beEvilAndDeleteTheTransformIndex() throws IOException { ); adminClient().performRequest(deleteRequest); } + + private static String createConfig(String sourceIndex, String destIndex) { + return Strings.format(""" + { + "source": { + "index": "%s" + }, + "dest": { + "index": "%s" + }, + "frequency": "1s", + "sync": { + "time": { + "field": "timestamp", + "delay": "1s" + } + }, + "pivot": { + "group_by": { + "reviewer": { + "terms": { + "field": "user_id" + } + } + }, + "aggregations": { + "avg_rating": { + "avg": { + "field": "stars" + } + } + } + } + }""", sourceIndex, destIndex); + } }