Skip to content

Commit

Permalink
[Transform] Add test for cancelling transform persistent task. (#105285)
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek authored Feb 8, 2024
1 parent 15ded61 commit 8cfcb70
Showing 1 changed file with 98 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,23 @@

import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.Strings;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

public class TransformRobustnessIT extends TransformRestTestCase {

Expand All @@ -33,67 +39,36 @@ public void testTaskRemovalAfterInternalIndexGotDeleted() throws Exception {
String transformId = "simple_continuous_pivot";
String transformIndex = "pivot_reviews_continuous";
final Request createTransformRequest = new Request("PUT", TransformField.REST_BASE_PATH_TRANSFORMS + transformId);
String config = Strings.format("""
{
"source": {
"index": "%s"
},
"dest": {
"index": "%s"
},
"frequency": "1s",
"sync": {
"time": {
"field": "timestamp",
"delay": "1s"
}
},
"pivot": {
"group_by": {
"reviewer": {
"terms": {
"field": "user_id"
}
}
},
"aggregations": {
"avg_rating": {
"avg": {
"field": "stars"
}
}
}
}
}""", indexName, transformIndex);
String config = createConfig(indexName, transformIndex);
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
assertEquals(1, getTransforms(null).size());
assertThat(getTransforms(null), hasSize(1));
// there shouldn't be a task yet
assertEquals(0, getNumberOfTransformTasks());
assertThat(getTransformTasks(), is(empty()));
startAndWaitForContinuousTransform(transformId, transformIndex, null);
assertTrue(indexExists(transformIndex));

// a task exists
assertEquals(1, getNumberOfTransformTasks());
assertThat(getTransformTasks(), hasSize(1));
// get and check some users
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_0", 3.776978417);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_5", 3.72);
assertNotNull(getTransformState(transformId));

assertEquals(1, getTransforms(null).size());
assertThat(getTransforms(null), hasSize(1));

// delete the transform index
beEvilAndDeleteTheTransformIndex();
// transform is gone
assertEquals(0, getTransforms(List.of(Map.of("type", "dangling_task", "reason", DANGLING_TASK_ERROR_MESSAGE))).size());
assertThat(getTransforms(List.of(Map.of("type", "dangling_task", "reason", DANGLING_TASK_ERROR_MESSAGE))), is(empty()));
// but the task is still there
assertEquals(1, getNumberOfTransformTasks());
assertThat(getTransformTasks(), hasSize(1));

Request stopTransformRequest = new Request("POST", TransformField.REST_BASE_PATH_TRANSFORMS + transformId + "/_stop");
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(stopTransformRequest));

assertEquals(409, e.getResponse().getStatusLine().getStatusCode());
assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(409)));
assertThat(
e.getMessage(),
containsString("Detected transforms with no config [" + transformId + "]. Use force to stop/delete them.")
Expand All @@ -107,7 +82,7 @@ public void testTaskRemovalAfterInternalIndexGotDeleted() throws Exception {
assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));

// the task is gone
assertEquals(0, getNumberOfTransformTasks());
assertThat(getTransformTasks(), is(empty()));

// delete the transform because the task might have written a state doc, cleanup fails if the index isn't empty
deleteTransform(transformId);
Expand All @@ -125,7 +100,7 @@ public void testCreateAndDeleteTransformInALoop() throws IOException {
// Wait until the transform finishes
startAndWaitForTransform(transformId, destIndex);
// After the transform finishes, there should be no transform task left
assertEquals(0, getNumberOfTransformTasks());
assertThat(getTransformTasks(), is(empty()));
// Delete the transform
deleteTransform(transformId);
} catch (AssertionError | Exception e) {
Expand All @@ -134,24 +109,66 @@ public void testCreateAndDeleteTransformInALoop() throws IOException {
}
}

public void testCancellingTransformTask() throws Exception {
createReviewsIndex();
String transformId = "cancelling_transform_task";
String transformIndex = transformId + "-dest";

// Create the transform.
Request createTransformRequest = new Request("PUT", TransformField.REST_BASE_PATH_TRANSFORMS + transformId);
createTransformRequest.setJsonEntity(createConfig(REVIEWS_INDEX_NAME, transformIndex));
assertAcknowledged(client().performRequest(createTransformRequest));
assertThat(getTransforms(null), hasSize(1));

// Verify that there are no transform tasks yet.
assertThat(getTransformTasks(), is(empty()));

// Start the transform.
startTransform(transformId);

// Verify that the transform task already exists.
List<String> tasks = getTransformTasks();
assertThat(tasks, hasSize(1));

// Sleep for a few seconds so that we cover the task being cancelled at various stages.
Thread.sleep(randomLongBetween(0, 5_000));

// Cancel the transform task.
Request cancelTaskRequest = new Request("POST", "/_tasks/" + tasks.get(0) + "/_cancel");
assertOK(client().performRequest(cancelTaskRequest));

// Wait until the transform is stopped.
assertBusy(() -> {
Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformId);
assertEquals("Stats were: " + transformStatsAsMap, "stopped", XContentMapValues.extractValue("state", transformStatsAsMap));
}, 30, TimeUnit.SECONDS);

// Verify that the transform is still present.
assertThat(getTransforms(null), hasSize(1));

// Verify that there is no transform task left.
assertThat(getTransformTasks(), is(empty()));
}

@SuppressWarnings("unchecked")
private int getNumberOfTransformTasks() throws IOException {
private List<String> getTransformTasks() throws IOException {
final Request tasksRequest = new Request("GET", "/_tasks");
tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*");
Map<String, Object> tasksResponse = entityAsMap(client().performRequest(tasksRequest));

Map<String, Object> nodes = (Map<String, Object>) tasksResponse.get("nodes");
if (nodes == null) {
return 0;
return List.of();
}

int foundTasks = 0;
List<String> foundTasks = new ArrayList<>();
for (Entry<String, Object> node : nodes.entrySet()) {
Map<String, Object> nodeInfo = (Map<String, Object>) node.getValue();
Map<String, Object> tasks = (Map<String, Object>) nodeInfo.get("tasks");
foundTasks += tasks != null ? tasks.size() : 0;
if (tasks != null) {
foundTasks.addAll(tasks.keySet());
}
}

return foundTasks;
}

Expand All @@ -167,4 +184,39 @@ private void beEvilAndDeleteTheTransformIndex() throws IOException {
);
adminClient().performRequest(deleteRequest);
}

private static String createConfig(String sourceIndex, String destIndex) {
return Strings.format("""
{
"source": {
"index": "%s"
},
"dest": {
"index": "%s"
},
"frequency": "1s",
"sync": {
"time": {
"field": "timestamp",
"delay": "1s"
}
},
"pivot": {
"group_by": {
"reviewer": {
"terms": {
"field": "user_id"
}
}
},
"aggregations": {
"avg_rating": {
"avg": {
"field": "stars"
}
}
}
}
}""", sourceIndex, destIndex);
}
}

0 comments on commit 8cfcb70

Please sign in to comment.