diff --git a/CHANGELOG.md b/CHANGELOG.md index 97aad69d80b46..44742bef28c0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added - Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724)) +- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/)) ### Dependencies - Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442)) @@ -15,11 +16,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `reactor` from 3.5.17 to 3.5.18 ([#14395](https://github.com/opensearch-project/OpenSearch/pull/14395)) - Bump `reactor-netty` from 1.1.19 to 1.1.20 ([#14395](https://github.com/opensearch-project/OpenSearch/pull/14395)) - Bump `commons-net:commons-net` from 3.10.0 to 3.11.1 ([#14396](https://github.com/opensearch-project/OpenSearch/pull/14396)) +- Bump `com.nimbusds:nimbus-jose-jwt` from 9.37.3 to 9.40 ([#14398](https://github.com/opensearch-project/OpenSearch/pull/14398)) - Bump `org.apache.commons:commons-configuration2` from 2.10.1 to 2.11.0 ([#14399](https://github.com/opensearch-project/OpenSearch/pull/14399)) - Bump `com.gradle.develocity` from 3.17.4 to 3.17.5 ([#14397](https://github.com/opensearch-project/OpenSearch/pull/14397)) ### Changed - unsignedLongRangeQuery now returns MatchNoDocsQuery if the lower bounds are greater than the upper bounds ([#14416](https://github.com/opensearch-project/OpenSearch/pull/14416)) +- Updated the `indices.query.bool.max_clause_count` setting from being static to dynamically updateable ([#13568](https://github.com/opensearch-project/OpenSearch/pull/13568)) +- Make the class CommunityIdProcessor final ([#14448](https://github.com/opensearch-project/OpenSearch/pull/14448)) ### Deprecated @@ -27,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Fixed - Fix handling of Short and Byte data types in ScriptProcessor ingest pipeline ([#14379](https://github.com/opensearch-project/OpenSearch/issues/14379)) +- Switch to iterative version of WKT format parser ([#14086](https://github.com/opensearch-project/OpenSearch/pull/14086)) ### Security diff --git a/DEVELOPER_GUIDE.md b/DEVELOPER_GUIDE.md index bc11e7335af49..03cd189aa911e 100644 --- a/DEVELOPER_GUIDE.md +++ b/DEVELOPER_GUIDE.md @@ -651,16 +651,18 @@ Note that these snapshots do not follow the Maven [naming convention](https://ma ### Flaky Tests -OpenSearch has a very large test suite with long running, often failing (flaky), integration tests. Such individual tests are labelled as [Flaky Random Test Failure](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aopen+is%3Aissue+label%3A%22flaky-test%22). Your help is wanted fixing these! +If you encounter a test failure locally or in CI that is seemingly unrelated to the change in your pull request, it may be a known flaky test or a new test failure. OpenSearch has a very large test suite with long running, often failing (flaky), integration tests. Such individual tests are labelled as [Flaky Random Test Failure](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aopen+is%3Aissue+label%3A%22flaky-test%22). Your help is wanted fixing these! -If you encounter a build/test failure in CI that is unrelated to the change in your pull request, it may be a known flaky test, or a new test failure. +The automation [gradle-check-flaky-test-detector](https://build.ci.opensearch.org/job/gradle-check-flaky-test-detector/), which runs in OpenSearch public Jenkins, identifies failing flaky issues that are part of post-merge actions. Once a flaky test is identified, the automation creates an issue with detailed report that includes links to all relevant commits, the Gradle check build log, the test report, and pull requests that are impacted with the flaky test failures. This automation leverages data from the [OpenSearch Metrics Project](https://github.com/opensearch-project/opensearch-metrics) to establish a baseline for creating the issue and updating the flaky test report. For all flaky test issues created by automation, visit this [link](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aissue+is%3Aopen+label%3A%3Etest-failure+author%3Aopensearch-ci-bot). + +If you still see a failing test that is not part of the post merge actions, please do: + +* Follow failed CI links, and locate the failing test(s) or use the [Gradle Check Metrics Dashboard](#gradle-check-metrics-dashboard). +* Copy-paste the failure into a comment of your PR. +* Search through issues using the name of the failed test for whether this is a known flaky test. +* If no existing issue is found, open one. +* Retry CI via the GitHub UX or by pushing an update to your PR. -1. Follow failed CI links, and locate the failing test(s). -2. Copy-paste the failure into a comment of your PR. -3. Search through [issues](https://github.com/opensearch-project/OpenSearch/issues?q=is%3Aopen+is%3Aissue+label%3A%22flaky-test%22) using the name of the failed test for whether this is a known flaky test. -4. If an existing issue is found, paste a link to the known issue in a comment to your PR. -5. If no existing issue is found, open one. -6. Retry CI via the GitHub UX or by pushing an update to your PR. ### Gradle Check Metrics Dashboard diff --git a/buildSrc/src/main/java/org/opensearch/gradle/testfixtures/TestFixturesPlugin.java b/buildSrc/src/main/java/org/opensearch/gradle/testfixtures/TestFixturesPlugin.java index c9e18426966f9..e8772522b19a4 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/testfixtures/TestFixturesPlugin.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/testfixtures/TestFixturesPlugin.java @@ -34,6 +34,7 @@ import com.avast.gradle.dockercompose.ComposeExtension; import com.avast.gradle.dockercompose.DockerComposePlugin; import com.avast.gradle.dockercompose.ServiceInfo; +import com.avast.gradle.dockercompose.tasks.ComposeBuild; import com.avast.gradle.dockercompose.tasks.ComposeDown; import com.avast.gradle.dockercompose.tasks.ComposePull; import com.avast.gradle.dockercompose.tasks.ComposeUp; @@ -200,6 +201,7 @@ public void execute(Task task) { maybeSkipTasks(tasks, dockerSupport, getTaskClass("org.opensearch.gradle.test.RestIntegTestTask")); maybeSkipTasks(tasks, dockerSupport, TestingConventionsTasks.class); maybeSkipTasks(tasks, dockerSupport, getTaskClass("org.opensearch.gradle.test.AntFixture")); + maybeSkipTasks(tasks, dockerSupport, ComposeBuild.class); maybeSkipTasks(tasks, dockerSupport, ComposeUp.class); maybeSkipTasks(tasks, dockerSupport, ComposePull.class); maybeSkipTasks(tasks, dockerSupport, ComposeDown.class); diff --git a/buildSrc/version.properties b/buildSrc/version.properties index eb96261b056e3..af421b97e12e4 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -1,5 +1,5 @@ opensearch = 3.0.0 -lucene = 9.11.0 +lucene = 9.12.0-snapshot-c896995 bundled_jdk_vendor = adoptium bundled_jdk = 21.0.3+9 diff --git a/libs/core/licenses/lucene-core-9.11.0.jar.sha1 b/libs/core/licenses/lucene-core-9.11.0.jar.sha1 deleted file mode 100644 index b0d38c4165581..0000000000000 --- a/libs/core/licenses/lucene-core-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -2e487755a6814b2a1bc770c26569dcba86873dcf \ No newline at end of file diff --git a/libs/core/licenses/lucene-core-9.12.0-snapshot-c896995.jar.sha1 b/libs/core/licenses/lucene-core-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..299283562fddc --- /dev/null +++ b/libs/core/licenses/lucene-core-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +826b328c37ea7f27c05d685db03bf8d2b00457ff \ No newline at end of file diff --git a/libs/core/src/main/java/org/opensearch/Version.java b/libs/core/src/main/java/org/opensearch/Version.java index d99dae2a5e64b..0cb2d4f867c12 100644 --- a/libs/core/src/main/java/org/opensearch/Version.java +++ b/libs/core/src/main/java/org/opensearch/Version.java @@ -106,7 +106,7 @@ public class Version implements Comparable, ToXContentFragment { public static final Version V_2_14_1 = new Version(2140199, org.apache.lucene.util.Version.LUCENE_9_10_0); public static final Version V_2_15_0 = new Version(2150099, org.apache.lucene.util.Version.LUCENE_9_10_0); public static final Version V_2_16_0 = new Version(2160099, org.apache.lucene.util.Version.LUCENE_9_11_0); - public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_11_0); + public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_12_0); public static final Version CURRENT = V_3_0_0; public static Version fromId(int id) { diff --git a/libs/geo/src/main/java/org/opensearch/geometry/utils/WellKnownText.java b/libs/geo/src/main/java/org/opensearch/geometry/utils/WellKnownText.java index ed1d63e6d4fef..8ad135b8bc1ca 100644 --- a/libs/geo/src/main/java/org/opensearch/geometry/utils/WellKnownText.java +++ b/libs/geo/src/main/java/org/opensearch/geometry/utils/WellKnownText.java @@ -49,8 +49,10 @@ import java.io.StreamTokenizer; import java.io.StringReader; import java.text.ParseException; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; +import java.util.Deque; import java.util.List; import java.util.Locale; @@ -67,6 +69,7 @@ public class WellKnownText { public static final String RPAREN = ")"; public static final String COMMA = ","; public static final String NAN = "NaN"; + public static final int MAX_DEPTH_OF_GEO_COLLECTION = 1000; private final String NUMBER = ""; private final String EOF = "END-OF-STREAM"; @@ -278,6 +281,16 @@ public Geometry fromWKT(String wkt) throws IOException, ParseException { */ private Geometry parseGeometry(StreamTokenizer stream) throws IOException, ParseException { final String type = nextWord(stream).toLowerCase(Locale.ROOT); + switch (type) { + case "geometrycollection": + return parseGeometryCollection(stream); + default: + return parseSimpleGeometry(stream, type); + } + } + + private Geometry parseSimpleGeometry(StreamTokenizer stream, String type) throws IOException, ParseException { + assert "geometrycollection".equals(type) == false; switch (type) { case "point": return parsePoint(stream); @@ -294,7 +307,7 @@ private Geometry parseGeometry(StreamTokenizer stream) throws IOException, Parse case "bbox": return parseBBox(stream); case "geometrycollection": - return parseGeometryCollection(stream); + throw new IllegalStateException("Unexpected type: geometrycollection"); case "circle": // Not part of the standard, but we need it for internal serialization return parseCircle(stream); } @@ -305,12 +318,56 @@ private GeometryCollection parseGeometryCollection(StreamTokenizer str if (nextEmptyOrOpen(stream).equals(EMPTY)) { return GeometryCollection.EMPTY; } - List shapes = new ArrayList<>(); - shapes.add(parseGeometry(stream)); - while (nextCloserOrComma(stream).equals(COMMA)) { - shapes.add(parseGeometry(stream)); + + List topLevelShapes = new ArrayList<>(); + Deque> deque = new ArrayDeque<>(); + deque.push(topLevelShapes); + boolean isFirstIteration = true; + List currentLevelShapes = null; + while (!deque.isEmpty()) { + List previousShapes = deque.pop(); + if (currentLevelShapes != null) { + previousShapes.add(new GeometryCollection<>(currentLevelShapes)); + } + currentLevelShapes = previousShapes; + + if (isFirstIteration == true) { + isFirstIteration = false; + } else { + if (nextCloserOrComma(stream).equals(COMMA) == false) { + // Done with current level, continue with parent level + continue; + } + } + while (true) { + final String type = nextWord(stream).toLowerCase(Locale.ROOT); + if (type.equals("geometrycollection")) { + if (nextEmptyOrOpen(stream).equals(EMPTY) == false) { + // GEOMETRYCOLLECTION() -> 1 depth, GEOMETRYCOLLECTION(GEOMETRYCOLLECTION()) -> 2 depth + // When parsing the top level geometry collection, the queue size is zero. + // When max depth is 1, we don't want to push any sub geometry collection in the queue. + // Therefore, we subtract 2 from max depth. + if (deque.size() >= MAX_DEPTH_OF_GEO_COLLECTION - 2) { + throw new IllegalArgumentException( + "a geometry collection with a depth greater than " + MAX_DEPTH_OF_GEO_COLLECTION + " is not supported" + ); + } + deque.push(currentLevelShapes); + currentLevelShapes = new ArrayList<>(); + continue; + } + currentLevelShapes.add(GeometryCollection.EMPTY); + } else { + currentLevelShapes.add(parseSimpleGeometry(stream, type)); + } + + if (nextCloserOrComma(stream).equals(COMMA) == false) { + break; + } + } } - return new GeometryCollection<>(shapes); + + return new GeometryCollection<>(topLevelShapes); } private Point parsePoint(StreamTokenizer stream) throws IOException, ParseException { diff --git a/libs/geo/src/test/java/org/opensearch/geometry/GeometryCollectionTests.java b/libs/geo/src/test/java/org/opensearch/geometry/GeometryCollectionTests.java index 631b6456a77da..cd8bb8f585966 100644 --- a/libs/geo/src/test/java/org/opensearch/geometry/GeometryCollectionTests.java +++ b/libs/geo/src/test/java/org/opensearch/geometry/GeometryCollectionTests.java @@ -62,6 +62,11 @@ public void testBasicSerialization() throws IOException, ParseException { assertEquals("GEOMETRYCOLLECTION EMPTY", wkt.toWKT(GeometryCollection.EMPTY)); assertEquals(GeometryCollection.EMPTY, wkt.fromWKT("GEOMETRYCOLLECTION EMPTY)")); + + assertEquals( + new GeometryCollection(Arrays.asList(GeometryCollection.EMPTY)), + wkt.fromWKT("GEOMETRYCOLLECTION (GEOMETRYCOLLECTION EMPTY)") + ); } @SuppressWarnings("ConstantConditions") @@ -86,4 +91,29 @@ public void testInitValidation() { new StandardValidator(true).validate(new GeometryCollection(Collections.singletonList(new Point(20, 10, 30)))); } + + public void testDeeplyNestedGeometryCollection() throws IOException, ParseException { + WellKnownText wkt = new WellKnownText(true, new GeographyValidator(true)); + StringBuilder validGeometryCollectionHead = new StringBuilder("GEOMETRYCOLLECTION"); + StringBuilder validGeometryCollectionTail = new StringBuilder(" EMPTY"); + for (int i = 0; i < WellKnownText.MAX_DEPTH_OF_GEO_COLLECTION - 1; i++) { + validGeometryCollectionHead.append(" (GEOMETRYCOLLECTION"); + validGeometryCollectionTail.append(")"); + } + // Expect no exception + wkt.fromWKT(validGeometryCollectionHead.append(validGeometryCollectionTail).toString()); + + StringBuilder invalidGeometryCollectionHead = new StringBuilder("GEOMETRYCOLLECTION"); + StringBuilder invalidGeometryCollectionTail = new StringBuilder(" EMPTY"); + for (int i = 0; i < WellKnownText.MAX_DEPTH_OF_GEO_COLLECTION; i++) { + invalidGeometryCollectionHead.append(" (GEOMETRYCOLLECTION"); + invalidGeometryCollectionTail.append(")"); + } + + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> wkt.fromWKT(invalidGeometryCollectionHead.append(invalidGeometryCollectionTail).toString()) + ); + assertEquals("a geometry collection with a depth greater than 1000 is not supported", ex.getMessage()); + } } diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java index c968fb2f6c2da..c84892971c87e 100644 --- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/CommunityIdProcessor.java @@ -29,7 +29,7 @@ * Processor that generating community id flow hash for the network flow tuples, the algorithm is defined in * Community ID Flow Hashing. */ -public class CommunityIdProcessor extends AbstractProcessor { +public final class CommunityIdProcessor extends AbstractProcessor { public static final String TYPE = "community_id"; // the version of the community id flow hashing algorithm private static final String COMMUNITY_ID_HASH_VERSION = "1"; diff --git a/modules/lang-expression/licenses/lucene-expressions-9.11.0.jar.sha1 b/modules/lang-expression/licenses/lucene-expressions-9.11.0.jar.sha1 deleted file mode 100644 index 29aade3ad4298..0000000000000 --- a/modules/lang-expression/licenses/lucene-expressions-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -5e21d20edee0712472e7c6f605c9d97aeecf16c0 \ No newline at end of file diff --git a/modules/lang-expression/licenses/lucene-expressions-9.12.0-snapshot-c896995.jar.sha1 b/modules/lang-expression/licenses/lucene-expressions-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..6d8d3be59f945 --- /dev/null +++ b/modules/lang-expression/licenses/lucene-expressions-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +9f0321cf2d34fca3f1f9334fdfee2b79d9d27444 \ No newline at end of file diff --git a/plugins/analysis-icu/licenses/lucene-analysis-icu-9.11.0.jar.sha1 b/plugins/analysis-icu/licenses/lucene-analysis-icu-9.11.0.jar.sha1 deleted file mode 100644 index 6f0501d3312ae..0000000000000 --- a/plugins/analysis-icu/licenses/lucene-analysis-icu-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -5c7f2d8eab0fca3fdc3d3e57a7f48a335dc7ac33 \ No newline at end of file diff --git a/plugins/analysis-icu/licenses/lucene-analysis-icu-9.12.0-snapshot-c896995.jar.sha1 b/plugins/analysis-icu/licenses/lucene-analysis-icu-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..696803bf63b46 --- /dev/null +++ b/plugins/analysis-icu/licenses/lucene-analysis-icu-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +e6314f36fb29e208d58c0470f14269c9c36996ba \ No newline at end of file diff --git a/plugins/analysis-kuromoji/licenses/lucene-analysis-kuromoji-9.11.0.jar.sha1 b/plugins/analysis-kuromoji/licenses/lucene-analysis-kuromoji-9.11.0.jar.sha1 deleted file mode 100644 index 25031381c9cb3..0000000000000 --- a/plugins/analysis-kuromoji/licenses/lucene-analysis-kuromoji-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -efcf65dda1b4e9d7e83926fd5895a47e491cbf29 \ No newline at end of file diff --git a/plugins/analysis-kuromoji/licenses/lucene-analysis-kuromoji-9.12.0-snapshot-c896995.jar.sha1 b/plugins/analysis-kuromoji/licenses/lucene-analysis-kuromoji-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..7a12077d7fc62 --- /dev/null +++ b/plugins/analysis-kuromoji/licenses/lucene-analysis-kuromoji-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +77fbf1e37af79715f28f66d8cc5b50af2982fc54 \ No newline at end of file diff --git a/plugins/analysis-nori/licenses/lucene-analysis-nori-9.11.0.jar.sha1 b/plugins/analysis-nori/licenses/lucene-analysis-nori-9.11.0.jar.sha1 deleted file mode 100644 index e27d45b217dad..0000000000000 --- a/plugins/analysis-nori/licenses/lucene-analysis-nori-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -59599d7b8bed2e6bd27d0dad7935c078b98c39cc \ No newline at end of file diff --git a/plugins/analysis-nori/licenses/lucene-analysis-nori-9.12.0-snapshot-c896995.jar.sha1 b/plugins/analysis-nori/licenses/lucene-analysis-nori-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..efed62c7e5e5b --- /dev/null +++ b/plugins/analysis-nori/licenses/lucene-analysis-nori-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +a7a4e9c6004c72782e1002e1dcfaf4fbab7887d8 \ No newline at end of file diff --git a/plugins/analysis-phonetic/licenses/lucene-analysis-phonetic-9.11.0.jar.sha1 b/plugins/analysis-phonetic/licenses/lucene-analysis-phonetic-9.11.0.jar.sha1 deleted file mode 100644 index ad5473865537d..0000000000000 --- a/plugins/analysis-phonetic/licenses/lucene-analysis-phonetic-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -e55f83bb373ac139e313f64e80afe1eb0a75b8c0 \ No newline at end of file diff --git a/plugins/analysis-phonetic/licenses/lucene-analysis-phonetic-9.12.0-snapshot-c896995.jar.sha1 b/plugins/analysis-phonetic/licenses/lucene-analysis-phonetic-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..f2020abcb8ef7 --- /dev/null +++ b/plugins/analysis-phonetic/licenses/lucene-analysis-phonetic-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +42ac148a3769d6eb880d7f184d1917bad48ca303 \ No newline at end of file diff --git a/plugins/analysis-smartcn/licenses/lucene-analysis-smartcn-9.11.0.jar.sha1 b/plugins/analysis-smartcn/licenses/lucene-analysis-smartcn-9.11.0.jar.sha1 deleted file mode 100644 index 68abd162e7266..0000000000000 --- a/plugins/analysis-smartcn/licenses/lucene-analysis-smartcn-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -1be59d91c45a4de069611fb7f8aa3e8fd26020ec \ No newline at end of file diff --git a/plugins/analysis-smartcn/licenses/lucene-analysis-smartcn-9.12.0-snapshot-c896995.jar.sha1 b/plugins/analysis-smartcn/licenses/lucene-analysis-smartcn-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..b64e4061311e5 --- /dev/null +++ b/plugins/analysis-smartcn/licenses/lucene-analysis-smartcn-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +adf2a25339ac8722647f8196288c1f5056bbf0de \ No newline at end of file diff --git a/plugins/analysis-stempel/licenses/lucene-analysis-stempel-9.11.0.jar.sha1 b/plugins/analysis-stempel/licenses/lucene-analysis-stempel-9.11.0.jar.sha1 deleted file mode 100644 index c5f1521ec3769..0000000000000 --- a/plugins/analysis-stempel/licenses/lucene-analysis-stempel-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -d5b5922acf3743b5a0c542959dd93fca8be333a7 \ No newline at end of file diff --git a/plugins/analysis-stempel/licenses/lucene-analysis-stempel-9.12.0-snapshot-c896995.jar.sha1 b/plugins/analysis-stempel/licenses/lucene-analysis-stempel-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..f56e7fc5df766 --- /dev/null +++ b/plugins/analysis-stempel/licenses/lucene-analysis-stempel-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +a689e3af2015b21b7b4f41a1206b50c44519b6f7 \ No newline at end of file diff --git a/plugins/analysis-ukrainian/licenses/lucene-analysis-morfologik-9.11.0.jar.sha1 b/plugins/analysis-ukrainian/licenses/lucene-analysis-morfologik-9.11.0.jar.sha1 deleted file mode 100644 index b676ca507467a..0000000000000 --- a/plugins/analysis-ukrainian/licenses/lucene-analysis-morfologik-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -50fd7b471cbdd6648c4972169f3fc67fae9db7f6 \ No newline at end of file diff --git a/plugins/analysis-ukrainian/licenses/lucene-analysis-morfologik-9.12.0-snapshot-c896995.jar.sha1 b/plugins/analysis-ukrainian/licenses/lucene-analysis-morfologik-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..30732e3c4a688 --- /dev/null +++ b/plugins/analysis-ukrainian/licenses/lucene-analysis-morfologik-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +c875f7706ee81b1fb0b3443767a8c9c52f30abc5 \ No newline at end of file diff --git a/plugins/repository-azure/build.gradle b/plugins/repository-azure/build.gradle index 61e9f71712eaf..0fd30af71dd0a 100644 --- a/plugins/repository-azure/build.gradle +++ b/plugins/repository-azure/build.gradle @@ -63,7 +63,7 @@ dependencies { api "net.java.dev.jna:jna-platform:${versions.jna}" api 'com.microsoft.azure:msal4j:1.14.3' api 'com.nimbusds:oauth2-oidc-sdk:11.9.1' - api 'com.nimbusds:nimbus-jose-jwt:9.37.3' + api 'com.nimbusds:nimbus-jose-jwt:9.40' api 'com.nimbusds:content-type:2.3' api 'com.nimbusds:lang-tag:1.7' // Both msal4j:1.14.3 and oauth2-oidc-sdk:11.9.1 has compile dependency on different versions of json-smart, @@ -219,11 +219,6 @@ thirdPartyAudit { 'org.bouncycastle.cert.X509CertificateHolder', 'org.bouncycastle.cert.jcajce.JcaX509CertificateHolder', 'org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder', - 'org.bouncycastle.crypto.InvalidCipherTextException', - 'org.bouncycastle.crypto.engines.AESEngine', - 'org.bouncycastle.crypto.modes.GCMBlockCipher', - 'org.bouncycastle.jcajce.provider.BouncyCastleFipsProvider', - 'org.bouncycastle.jce.provider.BouncyCastleProvider', 'org.bouncycastle.openssl.PEMKeyPair', 'org.bouncycastle.openssl.PEMParser', 'org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter', diff --git a/plugins/repository-azure/licenses/nimbus-jose-jwt-9.37.3.jar.sha1 b/plugins/repository-azure/licenses/nimbus-jose-jwt-9.37.3.jar.sha1 deleted file mode 100644 index 7278cd8994f71..0000000000000 --- a/plugins/repository-azure/licenses/nimbus-jose-jwt-9.37.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -700f71ffefd60c16bd8ce711a956967ea9071cec \ No newline at end of file diff --git a/plugins/repository-azure/licenses/nimbus-jose-jwt-9.40.jar.sha1 b/plugins/repository-azure/licenses/nimbus-jose-jwt-9.40.jar.sha1 new file mode 100644 index 0000000000000..83228caf233cc --- /dev/null +++ b/plugins/repository-azure/licenses/nimbus-jose-jwt-9.40.jar.sha1 @@ -0,0 +1 @@ +42b1dfa0360e4062951b070bac52dd8d96fd7b38 \ No newline at end of file diff --git a/server/licenses/lucene-analysis-common-9.11.0.jar.sha1 b/server/licenses/lucene-analysis-common-9.11.0.jar.sha1 deleted file mode 100644 index 7139f6a43a15a..0000000000000 --- a/server/licenses/lucene-analysis-common-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -75a0a333cf1e043102743066c929e65fe51cbcda \ No newline at end of file diff --git a/server/licenses/lucene-analysis-common-9.12.0-snapshot-c896995.jar.sha1 b/server/licenses/lucene-analysis-common-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..4b545e061c52f --- /dev/null +++ b/server/licenses/lucene-analysis-common-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +73696492c6e59972974cd91e03ad9464e6b5bfcd \ No newline at end of file diff --git a/server/licenses/lucene-backward-codecs-9.11.0.jar.sha1 b/server/licenses/lucene-backward-codecs-9.11.0.jar.sha1 deleted file mode 100644 index 735e80b60b001..0000000000000 --- a/server/licenses/lucene-backward-codecs-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -db385446bc3fd70e7c6a744276c0a157bd60ee0a \ No newline at end of file diff --git a/server/licenses/lucene-backward-codecs-9.12.0-snapshot-c896995.jar.sha1 b/server/licenses/lucene-backward-codecs-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..ae4ffb2b1800b --- /dev/null +++ b/server/licenses/lucene-backward-codecs-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +3cbb29ecc873e8c880a6f32e739655551708dbcf \ No newline at end of file diff --git a/server/licenses/lucene-core-9.11.0.jar.sha1 b/server/licenses/lucene-core-9.11.0.jar.sha1 deleted file mode 100644 index b0d38c4165581..0000000000000 --- a/server/licenses/lucene-core-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -2e487755a6814b2a1bc770c26569dcba86873dcf \ No newline at end of file diff --git a/server/licenses/lucene-core-9.12.0-snapshot-c896995.jar.sha1 b/server/licenses/lucene-core-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..299283562fddc --- /dev/null +++ b/server/licenses/lucene-core-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +826b328c37ea7f27c05d685db03bf8d2b00457ff \ No newline at end of file diff --git a/server/licenses/lucene-grouping-9.11.0.jar.sha1 b/server/licenses/lucene-grouping-9.11.0.jar.sha1 deleted file mode 100644 index 562de95605b60..0000000000000 --- a/server/licenses/lucene-grouping-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -882bdaf209b0acb332aa34836616424bcbecf462 \ No newline at end of file diff --git a/server/licenses/lucene-grouping-9.12.0-snapshot-c896995.jar.sha1 b/server/licenses/lucene-grouping-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..b0268c98167d3 --- /dev/null +++ b/server/licenses/lucene-grouping-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +a3a7003dc83197523e830f058a3748dbea96cab7 \ No newline at end of file diff --git a/server/licenses/lucene-highlighter-9.11.0.jar.sha1 b/server/licenses/lucene-highlighter-9.11.0.jar.sha1 deleted file mode 100644 index e0ef36d321c9d..0000000000000 --- a/server/licenses/lucene-highlighter-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -44accdc03c5482e602718f7bf91e5940ba4e4870 \ No newline at end of file diff --git a/server/licenses/lucene-highlighter-9.12.0-snapshot-c896995.jar.sha1 b/server/licenses/lucene-highlighter-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..d87927364b5a8 --- /dev/null +++ b/server/licenses/lucene-highlighter-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +00eb386915c3cffa9efcef2dc4c406f8a6776afe \ No newline at end of file diff --git a/server/licenses/lucene-join-9.11.0.jar.sha1 b/server/licenses/lucene-join-9.11.0.jar.sha1 deleted file mode 100644 index 34c618ccfbcc7..0000000000000 --- a/server/licenses/lucene-join-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -32a30ee03ed4f3e43bf63250270b2d4d53050045 \ No newline at end of file diff --git a/server/licenses/lucene-join-9.12.0-snapshot-c896995.jar.sha1 b/server/licenses/lucene-join-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..25a95546ab544 --- /dev/null +++ b/server/licenses/lucene-join-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +bb1fc572da7d473bf39672fd8ac323b15a1ffff0 \ No newline at end of file diff --git a/server/licenses/lucene-memory-9.11.0.jar.sha1 b/server/licenses/lucene-memory-9.11.0.jar.sha1 deleted file mode 100644 index d730cfb4b7660..0000000000000 --- a/server/licenses/lucene-memory-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -b3e80aa6aa3299118e76a23edc23b58f3ba5a515 \ No newline at end of file diff --git a/server/licenses/lucene-memory-9.12.0-snapshot-c896995.jar.sha1 b/server/licenses/lucene-memory-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..a0b3fd812561c --- /dev/null +++ b/server/licenses/lucene-memory-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +05ebfcef0435f4870859a19c93020e24398bb939 \ No newline at end of file diff --git a/server/licenses/lucene-misc-9.11.0.jar.sha1 b/server/licenses/lucene-misc-9.11.0.jar.sha1 deleted file mode 100644 index 9be27f004435b..0000000000000 --- a/server/licenses/lucene-misc-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -54fe308908194e1b0697a1157a45c5998c9e1083 \ No newline at end of file diff --git a/server/licenses/lucene-misc-9.12.0-snapshot-c896995.jar.sha1 b/server/licenses/lucene-misc-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..1e2cc97c37257 --- /dev/null +++ b/server/licenses/lucene-misc-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +d5747ed1be242b59aa36b0c32b0d3bd26b1d8fb8 \ No newline at end of file diff --git a/server/licenses/lucene-queries-9.11.0.jar.sha1 b/server/licenses/lucene-queries-9.11.0.jar.sha1 deleted file mode 100644 index b445610c25858..0000000000000 --- a/server/licenses/lucene-queries-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -987d1286949ddf514b8405fd453ed47bebdfb12d \ No newline at end of file diff --git a/server/licenses/lucene-queries-9.12.0-snapshot-c896995.jar.sha1 b/server/licenses/lucene-queries-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..31d4fe2886fc1 --- /dev/null +++ b/server/licenses/lucene-queries-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +fb6678d7fe035e55c545450682b67be49457ef1b \ No newline at end of file diff --git a/server/licenses/lucene-queryparser-9.11.0.jar.sha1 b/server/licenses/lucene-queryparser-9.11.0.jar.sha1 deleted file mode 100644 index a1620ba9c7708..0000000000000 --- a/server/licenses/lucene-queryparser-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -e97fe1c0d102edb8d6e1c01454992fd2b8d80ae0 \ No newline at end of file diff --git a/server/licenses/lucene-queryparser-9.12.0-snapshot-c896995.jar.sha1 b/server/licenses/lucene-queryparser-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..754e4ea20765f --- /dev/null +++ b/server/licenses/lucene-queryparser-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +a11d7f56a9e78dc8e61f85b9b54ad94d73583bb3 \ No newline at end of file diff --git a/server/licenses/lucene-sandbox-9.11.0.jar.sha1 b/server/licenses/lucene-sandbox-9.11.0.jar.sha1 deleted file mode 100644 index 0dc193f054973..0000000000000 --- a/server/licenses/lucene-sandbox-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -5e46b790744bd9118ccc053f70235364213312a5 \ No newline at end of file diff --git a/server/licenses/lucene-sandbox-9.12.0-snapshot-c896995.jar.sha1 b/server/licenses/lucene-sandbox-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..08c2bc48ae85b --- /dev/null +++ b/server/licenses/lucene-sandbox-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +75352855bcc052abfba821f878a27fd2b328fb1c \ No newline at end of file diff --git a/server/licenses/lucene-spatial-extras-9.11.0.jar.sha1 b/server/licenses/lucene-spatial-extras-9.11.0.jar.sha1 deleted file mode 100644 index 9d3a8d2857db6..0000000000000 --- a/server/licenses/lucene-spatial-extras-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -079ca5aaf544a3acde84b8b88423ace6dedc23eb \ No newline at end of file diff --git a/server/licenses/lucene-spatial-extras-9.12.0-snapshot-c896995.jar.sha1 b/server/licenses/lucene-spatial-extras-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..5e0b7196f48c2 --- /dev/null +++ b/server/licenses/lucene-spatial-extras-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +299be103216d67ca092bef177642b275224e77a6 \ No newline at end of file diff --git a/server/licenses/lucene-spatial3d-9.11.0.jar.sha1 b/server/licenses/lucene-spatial3d-9.11.0.jar.sha1 deleted file mode 100644 index fd5ff875a0113..0000000000000 --- a/server/licenses/lucene-spatial3d-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -564558818d70fc384db5b36fbc8a0ab27b107609 \ No newline at end of file diff --git a/server/licenses/lucene-spatial3d-9.12.0-snapshot-c896995.jar.sha1 b/server/licenses/lucene-spatial3d-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..c79b34adea5e2 --- /dev/null +++ b/server/licenses/lucene-spatial3d-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +29b4a76cd0bdabe0e067063831e661dedac6e503 \ No newline at end of file diff --git a/server/licenses/lucene-suggest-9.11.0.jar.sha1 b/server/licenses/lucene-suggest-9.11.0.jar.sha1 deleted file mode 100644 index 2fa96e97f307a..0000000000000 --- a/server/licenses/lucene-suggest-9.11.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -aa345db9b6caaf881e7890ea5b8911357d592167 \ No newline at end of file diff --git a/server/licenses/lucene-suggest-9.12.0-snapshot-c896995.jar.sha1 b/server/licenses/lucene-suggest-9.12.0-snapshot-c896995.jar.sha1 new file mode 100644 index 0000000000000..8d5334f0c4619 --- /dev/null +++ b/server/licenses/lucene-suggest-9.12.0-snapshot-c896995.jar.sha1 @@ -0,0 +1 @@ +597edb659e9ea93398a816e6837da7d47ef53873 \ No newline at end of file diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java index a1122f279c7e4..acbd68fff6dd0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java @@ -42,24 +42,32 @@ import org.opensearch.Version; import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; +import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; import org.opensearch.action.admin.indices.shrink.ResizeType; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.client.Requests; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.index.query.TermsQueryBuilder; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.VersionUtils; +import org.junit.Before; import java.util.concurrent.ExecutionException; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteCloneIndexIT extends RemoteStoreBaseIntegTestCase { @@ -69,6 +77,11 @@ protected boolean forbidPrivateIndexSettings() { return false; } + @Before + public void setup() { + asyncUploadMockFsRepo = true; + } + public void testCreateCloneIndex() { Version version = VersionUtils.randomIndexCompatibleVersion(random()); int numPrimaryShards = randomIntBetween(1, 5); @@ -140,6 +153,79 @@ public void testCreateCloneIndex() { } + public void testCreateCloneIndexLowPriorityRateLimit() { + Version version = VersionUtils.randomIndexCompatibleVersion(random()); + int numPrimaryShards = 1; + prepareCreate("source").setSettings( + Settings.builder().put(indexSettings()).put("number_of_shards", numPrimaryShards).put("index.version.created", version) + ).get(); + final int docs = randomIntBetween(0, 128); + for (int i = 0; i < docs; i++) { + client().prepareIndex("source").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get(); + } + ByteSizeValue shardSize = client().admin().indices().prepareStats("source").execute().actionGet().getShards()[0].getStats() + .getStore() + .size(); + logger.info("Shard size is {}", shardSize); + internalCluster().ensureAtLeastNumDataNodes(2); + // ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node + // if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due + // to the require._name below. + ensureGreen(); + // relocate all shards to one node such that we can merge it. + client().admin().indices().prepareUpdateSettings("source").setSettings(Settings.builder().put("index.blocks.write", true)).get(); + ensureGreen(); + + // disable rebalancing to be able to capture the right stats. balancing can move the target primary + // making it hard to pin point the source shards. + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none")) + .get(); + try { + // apply rate limiter + setLowPriorityUploadRate(REPOSITORY_NAME, "1kb"); + assertAcked( + client().admin() + .indices() + .prepareResizeIndex("source", "target") + .setResizeType(ResizeType.CLONE) + .setSettings(Settings.builder().put("index.number_of_replicas", 0).putNull("index.blocks.write").build()) + .get() + ); + ensureGreen(); + long uploadPauseTime = 0L; + for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { + uploadPauseTime += repositoriesService.repository(REPOSITORY_NAME).getLowPriorityRemoteUploadThrottleTimeInNanos(); + } + assertThat(uploadPauseTime, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(5, 10)).nanos())); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + // clean up + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String) null) + .put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null) + ) + .get(); + } + } + + protected void setLowPriorityUploadRate(String repoName, String value) throws ExecutionException, InterruptedException { + GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName }); + GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get(); + RepositoryMetadata rmd = res.repositories().get(0); + Settings.Builder settings = Settings.builder() + .put("location", rmd.settings().get("location")) + .put("max_remote_low_priority_upload_bytes_per_sec", value); + assertAcked(client().admin().cluster().preparePutRepository(repoName).setType(rmd.type()).setSettings(settings).get()); + } + public void testCreateCloneIndexFailure() throws ExecutionException, InterruptedException { asyncUploadMockFsRepo = false; Version version = VersionUtils.randomIndexCompatibleVersion(random()); diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index fc0a574c191b1..6296608c64d37 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -34,7 +34,6 @@ import org.apache.lucene.index.CorruptIndexException; import org.opensearch.Version; -import org.opensearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse; import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction; import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; @@ -101,6 +100,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.BooleanSupplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -883,17 +884,20 @@ public void testMultipleReplicaShardAssignmentWithDelayedAllocationAndDifferentN assertEquals(YELLOW, health.getStatus()); assertEquals(2, health.getUnassignedShards()); // shard should be unassigned because of Allocation_Delayed - ClusterAllocationExplainResponse allocationExplainResponse = client().admin() - .cluster() - .prepareAllocationExplain() - .setIndex("test") - .setShard(0) - .setPrimary(false) - .get(); - assertEquals( - AllocationDecision.ALLOCATION_DELAYED, - allocationExplainResponse.getExplanation().getShardAllocationDecision().getAllocateDecision().getAllocationDecision() + BooleanSupplier delayedShardAllocationStatusVerificationSupplier = () -> AllocationDecision.ALLOCATION_DELAYED.equals( + client().admin() + .cluster() + .prepareAllocationExplain() + .setIndex("test") + .setShard(0) + .setPrimary(false) + .get() + .getExplanation() + .getShardAllocationDecision() + .getAllocateDecision() + .getAllocationDecision() ); + waitUntil(delayedShardAllocationStatusVerificationSupplier, 2, TimeUnit.MINUTES); logger.info("--> restarting the node 1"); internalCluster().startDataOnlyNode( @@ -903,26 +907,16 @@ public void testMultipleReplicaShardAssignmentWithDelayedAllocationAndDifferentN assertTrue(clusterRerouteResponse.isAcknowledged()); ensureStableCluster(6); waitUntil( - () -> client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet().getInitializingShards() == 0 + () -> client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet().getActiveShards() == 3, + 2, + TimeUnit.MINUTES ); - health = client().admin().cluster().health(Requests.clusterHealthRequest().timeout("5m")).actionGet(); assertFalse(health.isTimedOut()); assertEquals(YELLOW, health.getStatus()); assertEquals(1, health.getUnassignedShards()); assertEquals(1, health.getDelayedUnassignedShards()); - allocationExplainResponse = client().admin() - .cluster() - .prepareAllocationExplain() - .setIndex("test") - .setShard(0) - .setPrimary(false) - .get(); - assertEquals( - AllocationDecision.ALLOCATION_DELAYED, - allocationExplainResponse.getExplanation().getShardAllocationDecision().getAllocateDecision().getAllocationDecision() - ); - + waitUntil(delayedShardAllocationStatusVerificationSupplier, 2, TimeUnit.MINUTES); logger.info("--> restarting the node 0"); internalCluster().startDataOnlyNode( Settings.builder().put("node.name", nodesWithReplicaShards.get(1)).put(replicaNode1DataPathSettings).build() diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 299652e4f07a9..0383aca2de33f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -1168,7 +1168,7 @@ public void testCacheCleanupAfterIndexDeletion() throws Exception { }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); } - // when staleness threshold is lower than staleness, it should clean the cache from all indices having stale keys + // when staleness threshold is lower than staleness, it should clean cache from all indices having stale keys public void testStaleKeysCleanupWithMultipleIndices() throws Exception { int cacheCleanIntervalInMillis = 10; String node = internalCluster().startNode( diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java index 793adef0594fc..6885d37c4aab0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java @@ -8,6 +8,8 @@ package org.opensearch.remotemigration; +import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction; +import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexMetadata; @@ -471,7 +473,6 @@ public void testRemotePathMetadataAddedWithFirstPrimaryMovingToRemote() throws E * exclude docrep nodes, assert that remote index path file exists * when shards start relocating to the remote nodes. */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13939") public void testRemoteIndexPathFileExistsAfterMigration() throws Exception { String docrepClusterManager = internalCluster().startClusterManagerOnlyNode(); @@ -518,7 +519,11 @@ public void testRemoteIndexPathFileExistsAfterMigration() throws Exception { .isAcknowledged() ); - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(docrepClusterManager)); + // elect cluster manager with remote-cluster state enabled + internalCluster().client() + .execute(AddVotingConfigExclusionsAction.INSTANCE, new AddVotingConfigExclusionsRequest(docrepClusterManager)) + .get(); + internalCluster().validateClusterFormed(); logger.info("---> Excluding docrep nodes from allocation"); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepository.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepository.java index 333fba413ce4e..1abacbe5091dd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepository.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepository.java @@ -16,9 +16,9 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.repositories.fs.FsRepository; +import org.opensearch.repositories.fs.ReloadableFsRepository; -public class MockFsMetadataSupportedRepository extends FsRepository { +public class MockFsMetadataSupportedRepository extends ReloadableFsRepository { public static Setting TRIGGER_DATA_INTEGRITY_FAILURE = Setting.boolSetting( "mock_fs_repository.trigger_data_integrity_failure", diff --git a/server/src/internalClusterTest/java/org/opensearch/search/query/QueryStringIT.java b/server/src/internalClusterTest/java/org/opensearch/search/query/QueryStringIT.java index c43a9c23661ea..8841638328ea4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/query/QueryStringIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/query/QueryStringIT.java @@ -45,7 +45,7 @@ import org.opensearch.index.query.QueryStringQueryBuilder; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; -import org.opensearch.search.SearchModule; +import org.opensearch.search.SearchService; import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; import org.junit.Before; import org.junit.BeforeClass; @@ -101,7 +101,7 @@ public void setup() throws Exception { protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - .put(SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT) + .put(SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT) .build(); } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/query/SimpleQueryStringIT.java b/server/src/internalClusterTest/java/org/opensearch/search/query/SimpleQueryStringIT.java index cae543506f919..f9ccdbd62de1c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/query/SimpleQueryStringIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/query/SimpleQueryStringIT.java @@ -57,7 +57,7 @@ import org.opensearch.plugins.Plugin; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; -import org.opensearch.search.SearchModule; +import org.opensearch.search.SearchService; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; import org.junit.BeforeClass; @@ -79,6 +79,7 @@ import static org.opensearch.index.query.QueryBuilders.simpleQueryStringQuery; import static org.opensearch.index.query.QueryBuilders.termQuery; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.search.SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING; import static org.opensearch.test.StreamsUtils.copyToStringFromClasspath; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertFailures; @@ -122,7 +123,7 @@ public static void createRandomClusterSetting() { protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - .put(SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT) + .put(SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT) .build(); } @@ -720,6 +721,52 @@ public void testFieldAliasOnDisallowedFieldType() throws Exception { assertHits(response.getHits(), "1"); } + public void testDynamicClauseCountUpdate() throws Exception { + client().prepareIndex("testdynamic").setId("1").setSource("field", "foo bar baz").get(); + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT - 1)) + ); + refresh(); + StringBuilder sb = new StringBuilder("foo"); + + // create clause_count + 1 clauses to hit error + for (int i = 0; i <= CLUSTER_MAX_CLAUSE_COUNT; i++) { + sb.append(" OR foo" + i); + } + + QueryStringQueryBuilder qb = queryStringQuery(sb.toString()).field("field"); + + SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class, () -> { + client().prepareSearch("testdynamic").setQuery(qb).get(); + }); + + assert (e.getDetailedMessage().contains("maxClauseCount is set to " + (CLUSTER_MAX_CLAUSE_COUNT - 1))); + + // increase clause count by 2 + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(INDICES_MAX_CLAUSE_COUNT_SETTING.getKey(), CLUSTER_MAX_CLAUSE_COUNT + 2)) + ); + + Thread.sleep(1); + + SearchResponse response = client().prepareSearch("testdynamic").setQuery(qb).get(); + assertHitCount(response, 1); + assertHits(response.getHits(), "1"); + + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().putNull(INDICES_MAX_CLAUSE_COUNT_SETTING.getKey())) + ); + } + private void assertHits(SearchHits hits, String... ids) { assertThat(hits.getTotalHits().value, equalTo((long) ids.length)); Set hitIds = new HashSet<>(); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 7ea04acf00415..233a8d732d178 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -150,7 +150,6 @@ import org.opensearch.repositories.fs.FsRepository; import org.opensearch.rest.BaseRestHandler; import org.opensearch.script.ScriptService; -import org.opensearch.search.SearchModule; import org.opensearch.search.SearchService; import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.backpressure.settings.NodeDuressSettings; @@ -540,6 +539,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.MAX_OPEN_PIT_CONTEXT, SearchService.MAX_PIT_KEEPALIVE_SETTING, SearchService.MAX_AGGREGATION_REWRITE_FILTERS, + SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING, SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD, CreatePitController.PIT_INIT_KEEP_ALIVE, Node.WRITE_PORTS_FILE_SETTING, @@ -590,7 +590,6 @@ public void apply(Settings value, Settings current, Settings previous) { ResourceWatcherService.RELOAD_INTERVAL_HIGH, ResourceWatcherService.RELOAD_INTERVAL_MEDIUM, ResourceWatcherService.RELOAD_INTERVAL_LOW, - SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING, ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING, FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE, Node.BREAKER_TYPE_KEY, diff --git a/server/src/main/java/org/opensearch/index/search/QueryParserHelper.java b/server/src/main/java/org/opensearch/index/search/QueryParserHelper.java index bae58c0ce1ebf..06f450f090e63 100644 --- a/server/src/main/java/org/opensearch/index/search/QueryParserHelper.java +++ b/server/src/main/java/org/opensearch/index/search/QueryParserHelper.java @@ -38,7 +38,7 @@ import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.query.QueryShardContext; import org.opensearch.index.query.QueryShardException; -import org.opensearch.search.SearchModule; +import org.opensearch.search.SearchService; import java.util.Collection; import java.util.HashMap; @@ -180,7 +180,7 @@ static Map resolveMappingField( } static void checkForTooManyFields(int numberOfFields, QueryShardContext context, @Nullable String inputPattern) { - Integer limit = SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING.get(context.getIndexSettings().getSettings()); + int limit = SearchService.INDICES_MAX_CLAUSE_COUNT_SETTING.get(context.getIndexSettings().getSettings()); if (numberOfFields > limit) { StringBuilder errorMsg = new StringBuilder("field expansion "); if (inputPattern != null) { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index ab76150f8f83d..99f78130ad3ef 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -64,6 +64,8 @@ public class RemoteDirectory extends Directory { private final UnaryOperator uploadRateLimiter; + private final UnaryOperator lowPriorityUploadRateLimiter; + private final UnaryOperator downloadRateLimiter; /** @@ -76,15 +78,17 @@ public BlobContainer getBlobContainer() { } public RemoteDirectory(BlobContainer blobContainer) { - this(blobContainer, UnaryOperator.identity(), UnaryOperator.identity()); + this(blobContainer, UnaryOperator.identity(), UnaryOperator.identity(), UnaryOperator.identity()); } public RemoteDirectory( BlobContainer blobContainer, UnaryOperator uploadRateLimiter, + UnaryOperator lowPriorityUploadRateLimiter, UnaryOperator downloadRateLimiter ) { this.blobContainer = blobContainer; + this.lowPriorityUploadRateLimiter = lowPriorityUploadRateLimiter; this.uploadRateLimiter = uploadRateLimiter; this.downloadRateLimiter = downloadRateLimiter; } @@ -357,13 +361,23 @@ private void uploadBlob( remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) getBlobContainer()).remoteIntegrityCheckSupported(); } lowPriorityUpload = lowPriorityUpload || contentLength > ByteSizeUnit.GB.toBytes(15); + RemoteTransferContainer.OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier; + if (lowPriorityUpload) { + offsetRangeInputStreamSupplier = (size, position) -> lowPriorityUploadRateLimiter.apply( + new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position) + ); + } else { + offsetRangeInputStreamSupplier = (size, position) -> uploadRateLimiter.apply( + new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position) + ); + } RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( src, remoteFileName, contentLength, true, lowPriorityUpload ? WritePriority.LOW : WritePriority.NORMAL, - (size, position) -> uploadRateLimiter.apply(new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position)), + offsetRangeInputStreamSupplier, expectedChecksum, remoteIntegrityEnabled ); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index b965d7ce73ae6..3f6f4eeeef87b 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -77,6 +77,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s RemoteDirectory dataDirectory = new RemoteDirectory( blobStoreRepository.blobStore().blobContainer(dataPath), blobStoreRepository::maybeRateLimitRemoteUploadTransfers, + blobStoreRepository::maybeRateLimitLowPriorityRemoteUploadTransfers, blobStoreRepository::maybeRateLimitRemoteDownloadTransfers ); diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 06cd77a34fe0b..93946fa11de13 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -81,6 +81,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -506,7 +507,7 @@ public int hashCode() { * */ class IndicesRequestCacheCleanupManager implements Closeable { private final Set keysToClean; - private final ConcurrentMap> cleanupKeyToCountMap; + private final ConcurrentHashMap> cleanupKeyToCountMap; private final AtomicInteger staleKeysCount; private volatile double stalenessThreshold; private final IndicesRequestCacheCleaner cacheCleaner; @@ -514,7 +515,7 @@ class IndicesRequestCacheCleanupManager implements Closeable { IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) { this.stalenessThreshold = stalenessThreshold; this.keysToClean = ConcurrentCollections.newConcurrentSet(); - this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap(); + this.cleanupKeyToCountMap = new ConcurrentHashMap<>(); this.staleKeysCount = new AtomicInteger(0); this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval); threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME); @@ -572,8 +573,7 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) { // pkg-private for testing void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) { - cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap()) - .merge(readerCacheKeyId, 1, Integer::sum); + cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new ConcurrentHashMap<>()).merge(readerCacheKeyId, 1, Integer::sum); } /** @@ -831,7 +831,7 @@ public void close() { } // for testing - ConcurrentMap> getCleanupKeyToCountMap() { + ConcurrentHashMap> getCleanupKeyToCountMap() { return cleanupKeyToCountMap; } diff --git a/server/src/main/java/org/opensearch/repositories/FilterRepository.java b/server/src/main/java/org/opensearch/repositories/FilterRepository.java index 697ac37c4a175..d700a92ed4bad 100644 --- a/server/src/main/java/org/opensearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/opensearch/repositories/FilterRepository.java @@ -142,6 +142,11 @@ public long getRemoteUploadThrottleTimeInNanos() { return in.getRemoteUploadThrottleTimeInNanos(); } + @Override + public long getLowPriorityRemoteUploadThrottleTimeInNanos() { + return in.getRemoteUploadThrottleTimeInNanos(); + } + @Override public long getRemoteDownloadThrottleTimeInNanos() { return in.getRemoteDownloadThrottleTimeInNanos(); diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index b3f1e9ce2eed9..ed30aad7b4dd2 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -207,10 +207,17 @@ default void deleteSnapshotsAndReleaseLockFiles( long getRestoreThrottleTimeInNanos(); /** - * Returns restore throttle time in nanoseconds + * Returns upload throttle time in nanoseconds */ long getRemoteUploadThrottleTimeInNanos(); + /** + * Returns low priority upload throttle time in nanoseconds + */ + default long getLowPriorityRemoteUploadThrottleTimeInNanos() { + return 0; + } + /** * Returns restore throttle time in nanoseconds */ diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 1a5701d9204ef..c41e97d278dd5 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -316,6 +316,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private volatile RateLimiter remoteUploadRateLimiter; + private volatile RateLimiter remoteUploadLowPriorityRateLimiter; + private volatile RateLimiter remoteDownloadRateLimiter; private final CounterMetric snapshotRateLimitingTimeInNanos = new CounterMetric(); @@ -326,6 +328,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final CounterMetric remoteUploadRateLimitingTimeInNanos = new CounterMetric(); + private final CounterMetric remoteUploadLowPriorityRateLimitingTimeInNanos = new CounterMetric(); + public static final ChecksumBlobStoreFormat GLOBAL_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( "metadata", METADATA_NAME_FORMAT, @@ -445,6 +449,11 @@ private void readRepositoryMetadata(RepositoryMetadata repositoryMetadata) { snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO); remoteUploadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_upload_bytes_per_sec", ByteSizeValue.ZERO); + remoteUploadLowPriorityRateLimiter = getRateLimiter( + metadata.settings(), + "max_remote_low_priority_upload_bytes_per_sec", + ByteSizeValue.ZERO + ); remoteDownloadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_download_bytes_per_sec", ByteSizeValue.ZERO); readOnly = READONLY_SETTING.get(metadata.settings()); cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings()); @@ -1882,6 +1891,11 @@ public long getRemoteUploadThrottleTimeInNanos() { return remoteUploadRateLimitingTimeInNanos.count(); } + @Override + public long getLowPriorityRemoteUploadThrottleTimeInNanos() { + return remoteUploadLowPriorityRateLimitingTimeInNanos.count(); + } + @Override public long getRemoteDownloadThrottleTimeInNanos() { return remoteDownloadRateLimitingTimeInNanos.count(); @@ -3177,6 +3191,20 @@ public OffsetRangeInputStream maybeRateLimitRemoteUploadTransfers(OffsetRangeInp ); } + public OffsetRangeInputStream maybeRateLimitLowPriorityRemoteUploadTransfers(OffsetRangeInputStream offsetRangeInputStream) { + return maybeRateLimitRemoteTransfers( + maybeRateLimitRemoteTransfers( + offsetRangeInputStream, + () -> remoteUploadRateLimiter, + remoteUploadRateLimitingTimeInNanos, + BlobStoreTransferContext.REMOTE_UPLOAD + ), + () -> remoteUploadLowPriorityRateLimiter, + remoteUploadLowPriorityRateLimitingTimeInNanos, + BlobStoreTransferContext.REMOTE_UPLOAD + ); + } + public InputStream maybeRateLimitRemoteDownloadTransfers(InputStream inputStream) { return maybeRateLimit( maybeRateLimit( diff --git a/server/src/main/java/org/opensearch/search/SearchModule.java b/server/src/main/java/org/opensearch/search/SearchModule.java index 88218896dceae..b463458847a88 100644 --- a/server/src/main/java/org/opensearch/search/SearchModule.java +++ b/server/src/main/java/org/opensearch/search/SearchModule.java @@ -37,7 +37,6 @@ import org.opensearch.common.Nullable; import org.opensearch.common.geo.GeoShapeType; import org.opensearch.common.geo.ShapesAvailability; -import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.ParseFieldRegistry; import org.opensearch.core.ParseField; @@ -302,13 +301,6 @@ * @opensearch.internal */ public class SearchModule { - public static final Setting INDICES_MAX_CLAUSE_COUNT_SETTING = Setting.intSetting( - "indices.query.bool.max_clause_count", - 1024, - 1, - Integer.MAX_VALUE, - Setting.Property.NodeScope - ); private final Map highlighters; private final ParseFieldRegistry movingAverageModelParserRegistry = new ParseFieldRegistry<>( @@ -1094,7 +1086,6 @@ private void registerQueryParsers(List plugins) { registerQuery(new QuerySpec<>(MatchAllQueryBuilder.NAME, MatchAllQueryBuilder::new, MatchAllQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(QueryStringQueryBuilder.NAME, QueryStringQueryBuilder::new, QueryStringQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(BoostingQueryBuilder.NAME, BoostingQueryBuilder::new, BoostingQueryBuilder::fromXContent)); - BooleanQuery.setMaxClauseCount(INDICES_MAX_CLAUSE_COUNT_SETTING.get(settings)); registerQuery(new QuerySpec<>(BoolQueryBuilder.NAME, BoolQueryBuilder::new, BoolQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(TermQueryBuilder.NAME, TermQueryBuilder::new, TermQueryBuilder::fromXContent)); registerQuery(new QuerySpec<>(TermsQueryBuilder.NAME, TermsQueryBuilder::new, TermsQueryBuilder::fromXContent)); diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 135af91912e5d..a53a7198c366f 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TopDocs; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionRunnable; @@ -281,6 +282,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.NodeScope ); + public static final Setting INDICES_MAX_CLAUSE_COUNT_SETTING = Setting.intSetting( + "indices.query.bool.max_clause_count", + 1024, + 1, + Integer.MAX_VALUE, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + public static final Setting CLUSTER_ALLOW_DERIVED_FIELD_SETTING = Setting.boolSetting( "search.derived_field.enabled", true, @@ -411,6 +421,9 @@ public SearchService( lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation); + IndexSearcher.setMaxClauseCount(INDICES_MAX_CLAUSE_COUNT_SETTING.get(settings)); + clusterService.getClusterSettings().addSettingsUpdateConsumer(INDICES_MAX_CLAUSE_COUNT_SETTING, IndexSearcher::setMaxClauseCount); + allowDerivedField = CLUSTER_ALLOW_DERIVED_FIELD_SETTING.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_ALLOW_DERIVED_FIELD_SETTING, this::setAllowDerivedField); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java index bd01bc1ab0cdb..c543f986b3e86 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java @@ -9,19 +9,36 @@ package org.opensearch.gateway.remote; import org.opensearch.Version; +import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.metadata.DiffableStringMap; import org.opensearch.cluster.metadata.IndexGraveyard; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.Metadata.XContentContext; +import org.opensearch.cluster.metadata.TemplatesMetadata; +import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.TestCapturingListener; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.compress.Compressor; import org.opensearch.core.compress.NoneCompressor; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.model.RemoteCoordinationMetadata; +import org.opensearch.gateway.remote.model.RemoteCustomMetadata; +import org.opensearch.gateway.remote.model.RemoteGlobalMetadata; +import org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettings; +import org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata; +import org.opensearch.gateway.remote.model.RemoteReadResult; +import org.opensearch.gateway.remote.model.RemoteTemplatesMetadata; +import org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata; +import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.indices.IndicesModule; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -32,14 +49,48 @@ import org.junit.After; import org.junit.Before; +import java.io.IOException; +import java.io.InputStream; import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.function.Function; import java.util.stream.Stream; import static java.util.stream.Collectors.toList; +import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals; +import static org.opensearch.common.blobstore.stream.write.WritePriority.URGENT; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER; +import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA; +import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA_FORMAT; +import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadataTests.getCoordinationMetadata; +import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA; +import static org.opensearch.gateway.remote.model.RemoteCustomMetadataTests.getCustomMetadata; +import static org.opensearch.gateway.remote.model.RemoteGlobalMetadata.GLOBAL_METADATA; +import static org.opensearch.gateway.remote.model.RemoteGlobalMetadata.GLOBAL_METADATA_FORMAT; +import static org.opensearch.gateway.remote.model.RemoteGlobalMetadataTests.getGlobalMetadata; +import static org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettings.HASHES_OF_CONSISTENT_SETTINGS; +import static org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettings.HASHES_OF_CONSISTENT_SETTINGS_FORMAT; +import static org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettingsTests.getHashesOfConsistentSettings; +import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; +import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadataTests.getSettings; +import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; +import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA_FORMAT; +import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadataTests.getTemplatesMetadata; +import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyIterable; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -47,26 +98,36 @@ public class RemoteGlobalMetadataManagerTests extends OpenSearchTestCase { private RemoteGlobalMetadataManager remoteGlobalMetadataManager; private ClusterSettings clusterSettings; private BlobStoreRepository blobStoreRepository; + private BlobStoreTransferService blobStoreTransferService; + private Compressor compressor; + private NamedXContentRegistry xContentRegistry; + private NamedWriteableRegistry namedWriteableRegistry; private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + private final long METADATA_VERSION = 7331L; + private final String CLUSTER_NAME = "test-cluster"; + private final String CLUSTER_UUID = "test-cluster-uuid"; @Before public void setup() { clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); blobStoreRepository = mock(BlobStoreRepository.class); - BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class); - NamedXContentRegistry xContentRegistry = new NamedXContentRegistry( + blobStoreTransferService = mock(BlobStoreTransferService.class); + compressor = new NoneCompressor(); + xContentRegistry = new NamedXContentRegistry( Stream.of( NetworkModule.getNamedXContents().stream(), IndicesModule.getNamedXContents().stream(), ClusterModule.getNamedXWriteables().stream() ).flatMap(Function.identity()).collect(toList()) ); - Compressor compressor = new NoneCompressor(); + namedWriteableRegistry = writableRegistry(); + BlobPath blobPath = new BlobPath(); when(blobStoreRepository.getCompressor()).thenReturn(compressor); when(blobStoreRepository.getNamedXContentRegistry()).thenReturn(xContentRegistry); + when(blobStoreRepository.basePath()).thenReturn(blobPath); remoteGlobalMetadataManager = new RemoteGlobalMetadataManager( clusterSettings, - "test-cluster", + CLUSTER_NAME, blobStoreRepository, blobStoreTransferService, writableRegistry(), @@ -96,6 +157,469 @@ public void testGlobalMetadataUploadWaitTimeSetting() { assertEquals(globalMetadataUploadTimeout, remoteGlobalMetadataManager.getGlobalMetadataUploadTimeout().seconds()); } + public void testGetReadMetadataAsyncAction_CoordinationMetadata() throws Exception { + CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); + String fileName = randomAlphaOfLength(10); + RemoteCoordinationMetadata coordinationMetadataForDownload = new RemoteCoordinationMetadata( + fileName, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + COORDINATION_METADATA_FORMAT.serialize(coordinationMetadata, fileName, compressor, FORMAT_PARAMS).streamInput() + ); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + coordinationMetadataForDownload, + COORDINATION_METADATA, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertEquals(coordinationMetadata, listener.getResult().getObj()); + assertEquals(COORDINATION_METADATA, listener.getResult().getComponent()); + assertEquals(COORDINATION_METADATA, listener.getResult().getComponentName()); + } + + public void testGetAsyncMetadataWriteAction_CoordinationMetadata() throws Exception { + CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); + RemoteCoordinationMetadata remoteCoordinationMetadata = new RemoteCoordinationMetadata( + coordinationMetadata, + METADATA_VERSION, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch)) + .run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); + assertEquals(COORDINATION_METADATA, uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(5, pathTokens.length); + assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); + assertEquals(CLUSTER_UUID, pathTokens[2]); + assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); + String[] splitFileName = pathTokens[4].split(DELIMITER); + assertEquals(4, splitFileName.length); + assertEquals(COORDINATION_METADATA, splitFileName[0]); + assertEquals(RemoteStoreUtils.invertLong(METADATA_VERSION), splitFileName[1]); + assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); + } + + public void testGetReadMetadataAsyncAction_PersistentSettings() throws Exception { + Settings settingsMetadata = getSettings(); + String fileName = randomAlphaOfLength(10); + RemotePersistentSettingsMetadata persistentSettings = new RemotePersistentSettingsMetadata( + fileName, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + RemotePersistentSettingsMetadata.SETTINGS_METADATA_FORMAT.serialize(settingsMetadata, fileName, compressor, FORMAT_PARAMS) + .streamInput() + ); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + persistentSettings, + SETTING_METADATA, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertEquals(settingsMetadata, listener.getResult().getObj()); + assertEquals(SETTING_METADATA, listener.getResult().getComponent()); + assertEquals(SETTING_METADATA, listener.getResult().getComponentName()); + } + + public void testGetAsyncMetadataWriteAction_PersistentSettings() throws Exception { + Settings settingsMetadata = getSettings(); + RemotePersistentSettingsMetadata persistentSettings = new RemotePersistentSettingsMetadata( + settingsMetadata, + METADATA_VERSION, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(persistentSettings, new LatchedActionListener<>(listener, latch)).run(); + + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); + assertEquals(SETTING_METADATA, uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(5, pathTokens.length); + assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); + assertEquals(CLUSTER_UUID, pathTokens[2]); + assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); + String[] splitFileName = pathTokens[4].split(DELIMITER); + assertEquals(4, splitFileName.length); + assertEquals(SETTING_METADATA, splitFileName[0]); + assertEquals(RemoteStoreUtils.invertLong(METADATA_VERSION), splitFileName[1]); + assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); + } + + public void testGetReadMetadataAsyncAction_TransientSettings() throws Exception { + Settings settingsMetadata = getSettings(); + String fileName = randomAlphaOfLength(10); + RemoteTransientSettingsMetadata transientSettings = new RemoteTransientSettingsMetadata( + fileName, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + RemoteTransientSettingsMetadata.SETTINGS_METADATA_FORMAT.serialize(settingsMetadata, fileName, compressor, FORMAT_PARAMS) + .streamInput() + ); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + transientSettings, + TRANSIENT_SETTING_METADATA, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertEquals(settingsMetadata, listener.getResult().getObj()); + assertEquals(TRANSIENT_SETTING_METADATA, listener.getResult().getComponent()); + assertEquals(TRANSIENT_SETTING_METADATA, listener.getResult().getComponentName()); + } + + public void testGetAsyncMetadataWriteAction_TransientSettings() throws Exception { + Settings settingsMetadata = getSettings(); + RemoteTransientSettingsMetadata transientSettings = new RemoteTransientSettingsMetadata( + settingsMetadata, + METADATA_VERSION, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(transientSettings, new LatchedActionListener<>(listener, latch)).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); + assertEquals(TRANSIENT_SETTING_METADATA, uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(5, pathTokens.length); + assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); + assertEquals(CLUSTER_UUID, pathTokens[2]); + assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); + String[] splitFileName = pathTokens[4].split(DELIMITER); + assertEquals(4, splitFileName.length); + assertEquals(TRANSIENT_SETTING_METADATA, splitFileName[0]); + assertEquals(RemoteStoreUtils.invertLong(METADATA_VERSION), splitFileName[1]); + assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); + } + + public void testGetReadMetadataAsyncAction_HashesOfConsistentSettings() throws Exception { + DiffableStringMap hashesOfConsistentSettings = getHashesOfConsistentSettings(); + String fileName = randomAlphaOfLength(10); + RemoteHashesOfConsistentSettings hashesOfConsistentSettingsForDownload = new RemoteHashesOfConsistentSettings( + fileName, + CLUSTER_UUID, + compressor + ); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, fileName, compressor).streamInput() + ); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + hashesOfConsistentSettingsForDownload, + HASHES_OF_CONSISTENT_SETTINGS, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertEquals(hashesOfConsistentSettings, listener.getResult().getObj()); + assertEquals(HASHES_OF_CONSISTENT_SETTINGS, listener.getResult().getComponent()); + assertEquals(HASHES_OF_CONSISTENT_SETTINGS, listener.getResult().getComponentName()); + } + + public void testGetAsyncMetadataWriteAction_HashesOfConsistentSettings() throws Exception { + DiffableStringMap hashesOfConsistentSettings = getHashesOfConsistentSettings(); + RemoteHashesOfConsistentSettings hashesOfConsistentSettingsForUpload = new RemoteHashesOfConsistentSettings( + hashesOfConsistentSettings, + METADATA_VERSION, + CLUSTER_UUID, + compressor + ); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction( + hashesOfConsistentSettingsForUpload, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); + assertEquals(HASHES_OF_CONSISTENT_SETTINGS, uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(5, pathTokens.length); + assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); + assertEquals(CLUSTER_UUID, pathTokens[2]); + assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); + String[] splitFileName = pathTokens[4].split(DELIMITER); + assertEquals(4, splitFileName.length); + assertEquals(HASHES_OF_CONSISTENT_SETTINGS, splitFileName[0]); + assertEquals(RemoteStoreUtils.invertLong(METADATA_VERSION), splitFileName[1]); + assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); + } + + public void testGetReadMetadataAsyncAction_TemplatesMetadata() throws Exception { + TemplatesMetadata templatesMetadata = getTemplatesMetadata(); + String fileName = randomAlphaOfLength(10); + RemoteTemplatesMetadata templatesMetadataForDownload = new RemoteTemplatesMetadata( + fileName, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + TEMPLATES_METADATA_FORMAT.serialize(templatesMetadata, fileName, compressor, FORMAT_PARAMS).streamInput() + ); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + templatesMetadataForDownload, + TEMPLATES_METADATA, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertEquals(templatesMetadata, listener.getResult().getObj()); + assertEquals(TEMPLATES_METADATA, listener.getResult().getComponent()); + assertEquals(TEMPLATES_METADATA, listener.getResult().getComponentName()); + } + + public void testGetAsyncMetadataWriteAction_TemplatesMetadata() throws Exception { + TemplatesMetadata templatesMetadata = getTemplatesMetadata(); + RemoteTemplatesMetadata templateMetadataForUpload = new RemoteTemplatesMetadata( + templatesMetadata, + METADATA_VERSION, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(templateMetadataForUpload, new LatchedActionListener<>(listener, latch)) + .run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); + assertEquals(TEMPLATES_METADATA, uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(5, pathTokens.length); + assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); + assertEquals(CLUSTER_UUID, pathTokens[2]); + assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); + String[] splitFileName = pathTokens[4].split(DELIMITER); + assertEquals(4, splitFileName.length); + assertEquals(TEMPLATES_METADATA, splitFileName[0]); + assertEquals(RemoteStoreUtils.invertLong(METADATA_VERSION), splitFileName[1]); + assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); + } + + public void testGetReadMetadataAsyncAction_CustomMetadata() throws Exception { + Metadata.Custom customMetadata = getCustomMetadata(); + String fileName = randomAlphaOfLength(10); + RemoteCustomMetadata customMetadataForDownload = new RemoteCustomMetadata( + fileName, + IndexGraveyard.TYPE, + CLUSTER_UUID, + compressor, + namedWriteableRegistry + ); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + customMetadataForDownload.customBlobStoreFormat.serialize(customMetadata, fileName, compressor).streamInput() + ); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + customMetadataForDownload, + IndexGraveyard.TYPE, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertEquals(customMetadata, listener.getResult().getObj()); + assertEquals(CUSTOM_METADATA, listener.getResult().getComponent()); + assertEquals(IndexGraveyard.TYPE, listener.getResult().getComponentName()); + } + + public void testGetAsyncMetadataWriteAction_CustomMetadata() throws Exception { + Metadata.Custom customMetadata = getCustomMetadata(); + RemoteCustomMetadata customMetadataForUpload = new RemoteCustomMetadata( + customMetadata, + IndexGraveyard.TYPE, + METADATA_VERSION, + CLUSTER_UUID, + compressor, + namedWriteableRegistry + ); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(customMetadataForUpload, new LatchedActionListener<>(listener, latch)) + .run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); + assertEquals(String.join(CUSTOM_DELIMITER, CUSTOM_METADATA, IndexGraveyard.TYPE), uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(5, pathTokens.length); + assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); + assertEquals(CLUSTER_UUID, pathTokens[2]); + assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); + String[] splitFileName = pathTokens[4].split(DELIMITER); + assertEquals(4, splitFileName.length); + assertEquals(String.join(CUSTOM_DELIMITER, CUSTOM_METADATA, IndexGraveyard.TYPE), splitFileName[0]); + assertEquals(RemoteStoreUtils.invertLong(METADATA_VERSION), splitFileName[1]); + assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); + } + + public void testGetReadMetadataAsyncAction_GlobalMetadata() throws Exception { + Metadata metadata = getGlobalMetadata(); + String fileName = randomAlphaOfLength(10); + RemoteGlobalMetadata globalMetadataForDownload = new RemoteGlobalMetadata(fileName, CLUSTER_UUID, compressor, xContentRegistry); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + GLOBAL_METADATA_FORMAT.serialize(metadata, fileName, compressor, FORMAT_PARAMS).streamInput() + ); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + globalMetadataForDownload, + GLOBAL_METADATA, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertTrue(isGlobalStateEquals(metadata, (Metadata) listener.getResult().getObj())); + assertEquals(GLOBAL_METADATA, listener.getResult().getComponent()); + assertEquals(GLOBAL_METADATA, listener.getResult().getComponentName()); + } + + public void testGetReadMetadataAsyncAction_IOException() throws Exception { + String fileName = randomAlphaOfLength(10); + RemoteCoordinationMetadata coordinationMetadataForDownload = new RemoteCoordinationMetadata( + fileName, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + IOException ioException = new IOException("mock test exception"); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + coordinationMetadataForDownload, + COORDINATION_METADATA, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getResult()); + assertNotNull(listener.getFailure()); + assertEquals(ioException, listener.getFailure()); + } + + public void testGetAsyncMetadataWriteAction_IOException() throws Exception { + CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); + RemoteCoordinationMetadata remoteCoordinationMetadata = new RemoteCoordinationMetadata( + coordinationMetadata, + METADATA_VERSION, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + IOException ioException = new IOException("mock test exception"); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onFailure(ioException); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); + + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch)) + .run(); + assertNull(listener.getResult()); + assertNotNull(listener.getFailure()); + assertTrue(listener.getFailure() instanceof RemoteStateTransferException); + assertEquals(ioException, listener.getFailure().getCause()); + } + public void testGetUpdatedCustoms() { Map previousCustoms = Map.of( CustomMetadata1.TYPE, diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadataTests.java index 9484afe6b7d6c..63d6de05a737c 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadataTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadataTests.java @@ -229,7 +229,7 @@ public void testSerDe() throws IOException { } } - private CoordinationMetadata getCoordinationMetadata() { + public static CoordinationMetadata getCoordinationMetadata() { return CoordinationMetadata.builder() .term(TERM) .lastAcceptedConfiguration(new VotingConfiguration(Set.of("node1"))) diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataTests.java index 1bce176273270..1e28817be79f2 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataTests.java @@ -232,7 +232,7 @@ public void testSerDe() throws IOException { } } - private Custom getCustomMetadata() { + public static Custom getCustomMetadata() { return IndexGraveyard.builder().addTombstone(new Index("test-index", "3q2423")).build(); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteGlobalMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteGlobalMetadataTests.java index 02ddc8ba93071..23de485357547 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteGlobalMetadataTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteGlobalMetadataTests.java @@ -180,7 +180,7 @@ public void testSerDe() throws IOException { } } - private Metadata getGlobalMetadata() { + public static Metadata getGlobalMetadata() { return Metadata.builder() .templates( TemplatesMetadata.builder() diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettingsTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettingsTests.java index d883eabf9fbc9..b931f24f98631 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettingsTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettingsTests.java @@ -185,7 +185,7 @@ public void testSerDe() throws IOException { } } - private DiffableStringMap getHashesOfConsistentSettings() { + public static DiffableStringMap getHashesOfConsistentSettings() { Map hashesOfConsistentSettings = new HashMap<>(); hashesOfConsistentSettings.put("secure-setting-key", "secure-setting-value"); return new DiffableStringMap(hashesOfConsistentSettings); diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadataTests.java index 850c18f03fa49..5e4d5d66ca1b7 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadataTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadataTests.java @@ -224,7 +224,7 @@ public void testSerDe() throws IOException { } } - private Settings getSettings() { + public static Settings getSettings() { return Settings.builder().put("random_index_setting_" + randomAlphaOfLength(3), randomAlphaOfLength(5)).build(); } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadataTests.java index b86044003aa55..d7ecd2ad3f44a 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadataTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadataTests.java @@ -227,7 +227,7 @@ public void testSerDe() throws IOException { } } - private TemplatesMetadata getTemplatesMetadata() { + public static TemplatesMetadata getTemplatesMetadata() { return TemplatesMetadata.builder() .put( IndexTemplateMetadata.builder("template" + randomAlphaOfLength(3)) diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index e5bfa8caee79a..3188de13bb00b 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2956,6 +2956,14 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep ) ); + // Make sure to drain refreshes from the shard. Otherwise, if the refresh is in-progress, it overlaps with + // deletion of segment files in the subsequent code block. + for (ReferenceManager.RefreshListener refreshListener : target.getEngine().config().getInternalRefreshListener()) { + if (refreshListener instanceof ReleasableRetryableRefreshListener) { + ((ReleasableRetryableRefreshListener) refreshListener).drainRefreshes(); + } + } + // Delete files in store directory to restore from remote directory Directory storeDirectory = target.store().directory(); diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 205712d388cd1..10688de3ab0ae 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -101,7 +101,6 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -491,7 +490,8 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount( indexShard.hashCode() ); // test the mapping - ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + ConcurrentHashMap> cleanupKeyToCountMap = cache.cacheCleanupManager + .getCleanupKeyToCountMap(); // shard id should exist assertTrue(cleanupKeyToCountMap.containsKey(shardId)); // reader CacheKeyId should NOT exist @@ -554,7 +554,8 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS ); // test the mapping - ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + ConcurrentHashMap> cleanupKeyToCountMap = cache.cacheCleanupManager + .getCleanupKeyToCountMap(); // shard id should exist assertTrue(cleanupKeyToCountMap.containsKey(shardId)); // reader CacheKeyId should NOT exist @@ -722,7 +723,8 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals(1, cache.count()); // test the mappings - ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + ConcurrentHashMap> cleanupKeyToCountMap = cache.cacheCleanupManager + .getCleanupKeyToCountMap(); assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader))); cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); @@ -796,7 +798,7 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { } // test adding to cleanupKeyToCountMap with multiple threads - public void testAddToCleanupKeyToCountMap() throws Exception { + public void testAddingToCleanupKeyToCountMapWorksAppropriatelyWithMultipleThreads() throws Exception { threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); cache = getIndicesRequestCache(settings); @@ -804,7 +806,7 @@ public void testAddToCleanupKeyToCountMap() throws Exception { int numberOfThreads = 10; int numberOfIterations = 1000; Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread - AtomicBoolean exceptionDetected = new AtomicBoolean(false); + AtomicBoolean concurrentModificationExceptionDetected = new AtomicBoolean(false); ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads); @@ -817,7 +819,7 @@ public void testAddToCleanupKeyToCountMap() throws Exception { } } catch (ConcurrentModificationException e) { logger.error("ConcurrentModificationException detected in thread : " + e.getMessage()); - exceptionDetected.set(true); // Set flag if exception is detected + concurrentModificationExceptionDetected.set(true); // Set flag if exception is detected } }); } @@ -836,13 +838,17 @@ public void testAddToCleanupKeyToCountMap() throws Exception { } } catch (ConcurrentModificationException e) { logger.error("ConcurrentModificationException detected in main thread : " + e.getMessage()); - exceptionDetected.set(true); // Set flag if exception is detected + concurrentModificationExceptionDetected.set(true); // Set flag if exception is detected } }); executorService.shutdown(); - executorService.awaitTermination(60, TimeUnit.SECONDS); - assertFalse(exceptionDetected.get()); + assertTrue(executorService.awaitTermination(60, TimeUnit.SECONDS)); + assertEquals( + numberOfThreads * numberOfIterations, + cache.cacheCleanupManager.getCleanupKeyToCountMap().get(indexShard.shardId()).size() + ); + assertFalse(concurrentModificationExceptionDetected.get()); } private IndicesRequestCache getIndicesRequestCache(Settings settings) { diff --git a/test/framework/src/main/java/org/opensearch/common/util/TestCapturingListener.java b/test/framework/src/main/java/org/opensearch/common/util/TestCapturingListener.java new file mode 100644 index 0000000000000..a3c8cc15de927 --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/common/util/TestCapturingListener.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util; + +import org.opensearch.core.action.ActionListener; + +/** + * A simple implementation of {@link ActionListener} that captures the response and failures used for testing purposes. + * + * @param the result type + */ +public class TestCapturingListener implements ActionListener { + private T result; + private Exception failure; + + @Override + public void onResponse(T result) { + this.result = result; + } + + @Override + public void onFailure(Exception e) { + this.failure = e; + } + + public T getResult() { + return result; + } + + public Exception getFailure() { + return failure; + } +}