Skip to content

Commit

Permalink
Fix 1.x compatibility bug with stored Tasks (#5412) (#5440)
Browse files Browse the repository at this point in the history
When the new 'cancelled' field was introduced it was a miss not to
increment the version number on the mapping definitions for the .tasks
index. This commit fixes that oversight, as well as modifies the
existing backward compatiblity test to ensure that it will catch future
mistakes like this one.

Closes #5376

Signed-off-by: Andrew Ross <[email protected]>
(cherry picked from commit 4616dfa)
Signed-off-by: Andrew Ross <[email protected]>

Signed-off-by: Andrew Ross <[email protected]>
  • Loading branch information
andrross authored Dec 3, 2022
1 parent b7005c7 commit f2f809e
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Deprecated
### Removed
### Fixed
- Fix 1.x compatibility bug with stored Tasks ([#5412](https://github.com/opensearch-project/OpenSearch/pull/5412))
### Security

## [2.4]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,17 @@

import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.hamcrest.MatcherAssert;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.test.XContentTestUtils.JsonMapView;

import java.io.IOException;
import java.util.Map;

import static org.opensearch.cluster.metadata.IndexNameExpressionResolver.SYSTEM_INDEX_ENFORCEMENT_VERSION;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -68,25 +72,7 @@ public void testSystemIndicesUpgrades() throws Exception {
}
client().performRequest(bulk);

// start a async reindex job
Request reindex = new Request("POST", "/_reindex");
reindex.setJsonEntity(
"{\n" +
" \"source\":{\n" +
" \"index\":\"test_index_old\"\n" +
" },\n" +
" \"dest\":{\n" +
" \"index\":\"test_index_reindex\"\n" +
" }\n" +
"}");
reindex.addParameter("wait_for_completion", "false");
Map<String, Object> response = entityAsMap(client().performRequest(reindex));
String taskId = (String) response.get("task");

// wait for task
Request getTask = new Request("GET", "/_tasks/" + taskId);
getTask.addParameter("wait_for_completion", "true");
client().performRequest(getTask);
createAndVerifyStoredTask();

// make sure .tasks index exists
Request getTasksIndex = new Request("GET", "/.tasks");
Expand Down Expand Up @@ -121,6 +107,8 @@ public void testSystemIndicesUpgrades() throws Exception {
assertThat(client().performRequest(putAliasRequest).getStatusLine().getStatusCode(), is(200));
}
} else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
createAndVerifyStoredTask();

assertBusy(() -> {
Request clusterStateRequest = new Request("GET", "/_cluster/state/metadata");
Map<String, Object> indices = new JsonMapView(entityAsMap(client().performRequest(clusterStateRequest)))
Expand Down Expand Up @@ -152,4 +140,29 @@ public void testSystemIndicesUpgrades() throws Exception {
});
}
}

/**
* Completed tasks get persisted into the .tasks index, so this method waits
* until the task is completed in order to verify that it has been successfully
* written to the index and can be retrieved.
*/
private static void createAndVerifyStoredTask() throws Exception {
// Use update by query to create an async task
final Request updateByQueryRequest = new Request("POST", "/test_index_old/_update_by_query");
updateByQueryRequest.addParameter("wait_for_completion", "false");
final Response updateByQueryResponse = client().performRequest(updateByQueryRequest);
MatcherAssert.assertThat(updateByQueryResponse.getStatusLine().getStatusCode(), equalTo(200));
final String taskId = (String) entityAsMap(updateByQueryResponse).get("task");

// wait for task to complete
waitUntil(() -> {
try {
final Response getTaskResponse = client().performRequest(new Request("GET", "/_tasks/" + taskId));
MatcherAssert.assertThat(getTaskResponse.getStatusLine().getStatusCode(), equalTo(200));
return (Boolean) entityAsMap(getTaskResponse).get("completed");
} catch (IOException e) {
throw new AssertionError(e);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public class TaskResultsService {

public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version";

public static final int TASK_RESULT_MAPPING_VERSION = 3; // must match version in task-index-mapping.json
public static final int TASK_RESULT_MAPPING_VERSION = 4; // must match version in task-index-mapping.json

/**
* The backoff policy to use when saving a task result fails. The total wait
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"_doc" : {
"_meta": {
"version": 3
"version": 4
},
"dynamic" : "strict",
"properties" : {
Expand Down

0 comments on commit f2f809e

Please sign in to comment.