diff --git a/.github/workflows/dco.yml b/.github/workflows/dco.yml new file mode 100644 index 0000000000000..9580d510fd108 --- /dev/null +++ b/.github/workflows/dco.yml @@ -0,0 +1,19 @@ +name: Developer Certificate of Origin Check + +on: [pull_request] + +jobs: + dco-check: + runs-on: ubuntu-latest + + steps: + - name: Get PR Commits + id: 'get-pr-commits' + uses: tim-actions/get-pr-commits@v1.1.0 + with: + token: ${{ secrets.GITHUB_TOKEN }} + - name: DCO Check + uses: tim-actions/dco@v1.1.0 + with: + commits: ${{ steps.get-pr-commits.outputs.commits }} + diff --git a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java b/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheIT.java similarity index 99% rename from modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java rename to modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheIT.java index bfc184cff0566..02be0990eb136 100644 --- a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java +++ b/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheIT.java @@ -65,7 +65,7 @@ protected Collection> nodePlugins() { return Arrays.asList(TieredSpilloverCachePlugin.class, MockDiskCachePlugin.class); } - private Settings defaultSettings(String onHeapCacheSizeInBytesOrPecentage) { + static Settings defaultSettings(String onHeapCacheSizeInBytesOrPercentage) { return Settings.builder() .put(FeatureFlags.PLUGGABLE_CACHE, "true") .put( @@ -88,7 +88,7 @@ private Settings defaultSettings(String onHeapCacheSizeInBytesOrPecentage) { OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) .get(MAXIMUM_SIZE_IN_BYTES_KEY) .getKey(), - onHeapCacheSizeInBytesOrPecentage + onHeapCacheSizeInBytesOrPercentage ) .build(); } diff --git a/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsIT.java b/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsIT.java new file mode 100644 index 0000000000000..537caccbac652 --- /dev/null +++ b/modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsIT.java @@ -0,0 +1,501 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cache.common.tier; + +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.service.NodeCacheStats; +import org.opensearch.common.cache.stats.ImmutableCacheStats; +import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.cache.request.RequestCacheStats; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.indices.IndicesRequestCache; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.hamcrest.OpenSearchAssertions; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_NAME; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_DISK; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_ON_HEAP; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; + +// Use a single data node to simplify accessing cache stats across different shards. +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class TieredSpilloverCacheStatsIT extends OpenSearchIntegTestCase { + @Override + protected Collection> nodePlugins() { + return Arrays.asList(TieredSpilloverCachePlugin.class, TieredSpilloverCacheIT.MockDiskCachePlugin.class); + } + + private final String HEAP_CACHE_SIZE_STRING = "10000B"; + private final int HEAP_CACHE_SIZE = 10_000; + private final String index1Name = "index1"; + private final String index2Name = "index2"; + + /** + * Test aggregating by indices + */ + public void testIndicesLevelAggregation() throws Exception { + internalCluster().startNodes( + 1, + Settings.builder() + .put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING)) + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(0, TimeUnit.SECONDS) + ) + .build() + ); + Client client = client(); + Map values = setupCacheForAggregationTests(client); + + ImmutableCacheStatsHolder allLevelsStatsHolder = getNodeCacheStatsResult( + client, + List.of(IndicesRequestCache.INDEX_DIMENSION_NAME, TIER_DIMENSION_NAME) + ); + ImmutableCacheStatsHolder indicesOnlyStatsHolder = getNodeCacheStatsResult( + client, + List.of(IndicesRequestCache.INDEX_DIMENSION_NAME) + ); + + // Get values for indices alone, assert these match for statsHolders that have additional dimensions vs. a statsHolder that only has + // the indices dimension + ImmutableCacheStats index1ExpectedStats = returnNullIfAllZero( + new ImmutableCacheStats( + values.get("hitsOnHeapIndex1") + values.get("hitsOnDiskIndex1"), + values.get("itemsOnDiskIndex1AfterTest") + values.get("itemsOnHeapIndex1AfterTest"), + 0, + (values.get("itemsOnDiskIndex1AfterTest") + values.get("itemsOnHeapIndex1AfterTest")) * values.get("singleSearchSize"), + values.get("itemsOnDiskIndex1AfterTest") + values.get("itemsOnHeapIndex1AfterTest") + ) + ); + ImmutableCacheStats index2ExpectedStats = returnNullIfAllZero( + new ImmutableCacheStats( + values.get("hitsOnHeapIndex2") + values.get("hitsOnDiskIndex2"), + values.get("itemsOnDiskIndex2AfterTest") + values.get("itemsOnHeapIndex2AfterTest"), + 0, + (values.get("itemsOnDiskIndex2AfterTest") + values.get("itemsOnHeapIndex2AfterTest")) * values.get("singleSearchSize"), + values.get("itemsOnDiskIndex2AfterTest") + values.get("itemsOnHeapIndex2AfterTest") + ) + ); + + for (ImmutableCacheStatsHolder statsHolder : List.of(allLevelsStatsHolder, indicesOnlyStatsHolder)) { + assertEquals(index1ExpectedStats, statsHolder.getStatsForDimensionValues(List.of(index1Name))); + assertEquals(index2ExpectedStats, statsHolder.getStatsForDimensionValues(List.of(index2Name))); + } + } + + /** + * Test aggregating by indices and tier + */ + public void testIndicesAndTierLevelAggregation() throws Exception { + internalCluster().startNodes( + 1, + Settings.builder() + .put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING)) + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(0, TimeUnit.SECONDS) + ) + .build() + ); + Client client = client(); + Map values = setupCacheForAggregationTests(client); + + ImmutableCacheStatsHolder allLevelsStatsHolder = getNodeCacheStatsResult( + client, + List.of(IndicesRequestCache.INDEX_DIMENSION_NAME, TIER_DIMENSION_NAME) + ); + + // Get values broken down by indices+tiers + ImmutableCacheStats index1HeapExpectedStats = returnNullIfAllZero( + new ImmutableCacheStats( + values.get("hitsOnHeapIndex1"), + values.get("itemsOnHeapIndex1AfterTest") + values.get("itemsOnDiskIndex1AfterTest") + values.get("hitsOnDiskIndex1"), + values.get("itemsOnDiskIndex1AfterTest"), + values.get("itemsOnHeapIndex1AfterTest") * values.get("singleSearchSize"), + values.get("itemsOnHeapIndex1AfterTest") + ) + ); + assertEquals( + index1HeapExpectedStats, + allLevelsStatsHolder.getStatsForDimensionValues(List.of(index1Name, TIER_DIMENSION_VALUE_ON_HEAP)) + ); + + ImmutableCacheStats index2HeapExpectedStats = returnNullIfAllZero( + new ImmutableCacheStats( + values.get("hitsOnHeapIndex2"), + values.get("itemsOnHeapIndex2AfterTest") + values.get("itemsOnDiskIndex2AfterTest") + values.get("hitsOnDiskIndex2"), + values.get("itemsOnDiskIndex2AfterTest"), + values.get("itemsOnHeapIndex2AfterTest") * values.get("singleSearchSize"), + values.get("itemsOnHeapIndex2AfterTest") + ) + ); + assertEquals( + index2HeapExpectedStats, + allLevelsStatsHolder.getStatsForDimensionValues(List.of(index2Name, TIER_DIMENSION_VALUE_ON_HEAP)) + ); + + ImmutableCacheStats index1DiskExpectedStats = returnNullIfAllZero( + new ImmutableCacheStats( + values.get("hitsOnDiskIndex1"), + values.get("itemsOnHeapIndex1AfterTest") + values.get("itemsOnDiskIndex1AfterTest"), + 0, + values.get("itemsOnDiskIndex1AfterTest") * values.get("singleSearchSize"), + values.get("itemsOnDiskIndex1AfterTest") + ) + ); + assertEquals( + index1DiskExpectedStats, + allLevelsStatsHolder.getStatsForDimensionValues(List.of(index1Name, TIER_DIMENSION_VALUE_DISK)) + ); + + ImmutableCacheStats index2DiskExpectedStats = returnNullIfAllZero( + new ImmutableCacheStats( + values.get("hitsOnDiskIndex2"), + values.get("itemsOnHeapIndex2AfterTest") + values.get("itemsOnDiskIndex2AfterTest"), + 0, + values.get("itemsOnDiskIndex2AfterTest") * values.get("singleSearchSize"), + values.get("itemsOnDiskIndex2AfterTest") + ) + ); + assertEquals( + index2DiskExpectedStats, + allLevelsStatsHolder.getStatsForDimensionValues(List.of(index2Name, TIER_DIMENSION_VALUE_DISK)) + ); + } + + /** + * Test aggregating by tier only + */ + public void testTierLevelAggregation() throws Exception { + internalCluster().startNodes( + 1, + Settings.builder() + .put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING)) + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(0, TimeUnit.SECONDS) + ) + .build() + ); + Client client = client(); + Map values = setupCacheForAggregationTests(client); + + // Get values for tiers alone and check they add correctly across indices + ImmutableCacheStatsHolder tiersOnlyStatsHolder = getNodeCacheStatsResult(client, List.of(TIER_DIMENSION_NAME)); + ImmutableCacheStats totalHeapExpectedStats = returnNullIfAllZero( + new ImmutableCacheStats( + values.get("hitsOnHeapIndex1") + values.get("hitsOnHeapIndex2"), + values.get("itemsOnHeapAfterTest") + values.get("itemsOnDiskAfterTest") + values.get("hitsOnDiskIndex1") + values.get( + "hitsOnDiskIndex2" + ), + values.get("itemsOnDiskAfterTest"), + values.get("itemsOnHeapAfterTest") * values.get("singleSearchSize"), + values.get("itemsOnHeapAfterTest") + ) + ); + ImmutableCacheStats heapStats = tiersOnlyStatsHolder.getStatsForDimensionValues(List.of(TIER_DIMENSION_VALUE_ON_HEAP)); + assertEquals(totalHeapExpectedStats, heapStats); + ImmutableCacheStats totalDiskExpectedStats = returnNullIfAllZero( + new ImmutableCacheStats( + values.get("hitsOnDiskIndex1") + values.get("hitsOnDiskIndex2"), + values.get("itemsOnHeapAfterTest") + values.get("itemsOnDiskAfterTest"), + 0, + values.get("itemsOnDiskAfterTest") * values.get("singleSearchSize"), + values.get("itemsOnDiskAfterTest") + ) + ); + ImmutableCacheStats diskStats = tiersOnlyStatsHolder.getStatsForDimensionValues(List.of(TIER_DIMENSION_VALUE_DISK)); + assertEquals(totalDiskExpectedStats, diskStats); + } + + public void testInvalidLevelsAreIgnored() throws Exception { + internalCluster().startNodes( + 1, + Settings.builder() + .put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING)) + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(0, TimeUnit.SECONDS) + ) + .build() + ); + Client client = client(); + Map values = setupCacheForAggregationTests(client); + + ImmutableCacheStatsHolder allLevelsStatsHolder = getNodeCacheStatsResult( + client, + List.of(IndicesRequestCache.INDEX_DIMENSION_NAME, TIER_DIMENSION_NAME) + ); + ImmutableCacheStatsHolder indicesOnlyStatsHolder = getNodeCacheStatsResult( + client, + List.of(IndicesRequestCache.INDEX_DIMENSION_NAME) + ); + + // Test invalid levels are ignored and permuting the order of levels in the request doesn't matter + + // This should be equivalent to just "indices" + ImmutableCacheStatsHolder indicesEquivalentStatsHolder = getNodeCacheStatsResult( + client, + List.of(IndicesRequestCache.INDEX_DIMENSION_NAME, "unrecognized_dimension") + ); + assertEquals(indicesOnlyStatsHolder, indicesEquivalentStatsHolder); + + // This should be equivalent to "indices", "tier" + ImmutableCacheStatsHolder indicesAndTierEquivalentStatsHolder = getNodeCacheStatsResult( + client, + List.of(TIER_DIMENSION_NAME, "unrecognized_dimension_1", IndicesRequestCache.INDEX_DIMENSION_NAME, "unrecognized_dimension_2") + ); + assertEquals(allLevelsStatsHolder, indicesAndTierEquivalentStatsHolder); + + // This should be equivalent to no levels passed in + ImmutableCacheStatsHolder noLevelsEquivalentStatsHolder = getNodeCacheStatsResult( + client, + List.of("unrecognized_dimension_1", "unrecognized_dimension_2") + ); + ImmutableCacheStatsHolder noLevelsStatsHolder = getNodeCacheStatsResult(client, List.of()); + assertEquals(noLevelsStatsHolder, noLevelsEquivalentStatsHolder); + } + + /** + * Check the new stats API returns the same values as the old stats API. + */ + public void testStatsMatchOldApi() throws Exception { + internalCluster().startNodes( + 1, + Settings.builder() + .put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING)) + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(0, TimeUnit.SECONDS) + ) + .build() + ); + String index = "index"; + Client client = client(); + startIndex(client, index); + + // First search one time to see how big a single value will be + searchIndex(client, index, 0); + // get total stats + long singleSearchSize = getTotalStats(client).getSizeInBytes(); + // Select numbers so we get some values on both heap and disk + int itemsOnHeap = HEAP_CACHE_SIZE / (int) singleSearchSize; + int itemsOnDisk = 1 + randomInt(30); // The first one we search (to get the size) always goes to disk + int expectedEntries = itemsOnHeap + itemsOnDisk; + + for (int i = 1; i < expectedEntries; i++) { + // Cause misses + searchIndex(client, index, i); + } + int expectedMisses = itemsOnHeap + itemsOnDisk; + + // Cause some hits + int expectedHits = randomIntBetween(itemsOnHeap, expectedEntries); // Select it so some hits come from both tiers + for (int i = 0; i < expectedHits; i++) { + searchIndex(client, index, i); + } + + ImmutableCacheStats totalStats = getNodeCacheStatsResult(client, List.of()).getTotalStats(); + + // Check the new stats API values are as expected + assertEquals( + new ImmutableCacheStats(expectedHits, expectedMisses, 0, expectedEntries * singleSearchSize, expectedEntries), + totalStats + ); + // Now check the new stats API values for the cache as a whole match the old stats API values + RequestCacheStats oldAPIStats = client.admin() + .indices() + .prepareStats(index) + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + assertEquals(oldAPIStats.getHitCount(), totalStats.getHits()); + assertEquals(oldAPIStats.getMissCount(), totalStats.getMisses()); + assertEquals(oldAPIStats.getEvictions(), totalStats.getEvictions()); + assertEquals(oldAPIStats.getMemorySizeInBytes(), totalStats.getSizeInBytes()); + } + + private void startIndex(Client client, String indexName) throws InterruptedException { + assertAcked( + client.admin() + .indices() + .prepareCreate(indexName) + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ) + .get() + ); + indexRandom(true, client.prepareIndex(indexName).setSource("k", "hello")); + ensureSearchable(indexName); + } + + private Map setupCacheForAggregationTests(Client client) throws Exception { + startIndex(client, index1Name); + startIndex(client, index2Name); + + // First search one time to see how big a single value will be + searchIndex(client, index1Name, 0); + // get total stats + long singleSearchSize = getTotalStats(client).getSizeInBytes(); + int itemsOnHeapAfterTest = HEAP_CACHE_SIZE / (int) singleSearchSize; // As the heap tier evicts, the items on it after the test will + // be the same as its max capacity + int itemsOnDiskAfterTest = 1 + randomInt(30); // The first one we search (to get the size) always goes to disk + + // Put some values on heap and disk for each index + int itemsOnHeapIndex1AfterTest = randomInt(itemsOnHeapAfterTest); + int itemsOnHeapIndex2AfterTest = itemsOnHeapAfterTest - itemsOnHeapIndex1AfterTest; + int itemsOnDiskIndex1AfterTest = 1 + randomInt(itemsOnDiskAfterTest - 1); + // The first one we search (to get the size) always goes to disk + int itemsOnDiskIndex2AfterTest = itemsOnDiskAfterTest - itemsOnDiskIndex1AfterTest; + int hitsOnHeapIndex1 = randomInt(itemsOnHeapIndex1AfterTest); + int hitsOnDiskIndex1 = randomInt(itemsOnDiskIndex1AfterTest); + int hitsOnHeapIndex2 = randomInt(itemsOnHeapIndex2AfterTest); + int hitsOnDiskIndex2 = randomInt(itemsOnDiskIndex2AfterTest); + + // Put these values into a map so tests can know what to expect in stats responses + Map expectedValues = new HashMap<>(); + expectedValues.put("itemsOnHeapIndex1AfterTest", itemsOnHeapIndex1AfterTest); + expectedValues.put("itemsOnHeapIndex2AfterTest", itemsOnHeapIndex2AfterTest); + expectedValues.put("itemsOnDiskIndex1AfterTest", itemsOnDiskIndex1AfterTest); + expectedValues.put("itemsOnDiskIndex2AfterTest", itemsOnDiskIndex2AfterTest); + expectedValues.put("hitsOnHeapIndex1", hitsOnHeapIndex1); + expectedValues.put("hitsOnDiskIndex1", hitsOnDiskIndex1); + expectedValues.put("hitsOnHeapIndex2", hitsOnHeapIndex2); + expectedValues.put("hitsOnDiskIndex2", hitsOnDiskIndex2); + expectedValues.put("singleSearchSize", (int) singleSearchSize); + expectedValues.put("itemsOnDiskAfterTest", itemsOnDiskAfterTest); + expectedValues.put("itemsOnHeapAfterTest", itemsOnHeapAfterTest); // Can only pass 10 keys in Map.of() constructor + + // The earliest items (0 - itemsOnDiskAfterTest) are the ones which get evicted to disk + for (int i = 1; i < itemsOnDiskIndex1AfterTest; i++) { // Start at 1 as 0 has already been searched + searchIndex(client, index1Name, i); + } + for (int i = itemsOnDiskIndex1AfterTest; i < itemsOnDiskIndex1AfterTest + itemsOnDiskIndex2AfterTest; i++) { + searchIndex(client, index2Name, i); + } + // The remaining items stay on heap + for (int i = itemsOnDiskAfterTest; i < itemsOnDiskAfterTest + itemsOnHeapIndex1AfterTest; i++) { + searchIndex(client, index1Name, i); + } + for (int i = itemsOnDiskAfterTest + itemsOnHeapIndex1AfterTest; i < itemsOnDiskAfterTest + itemsOnHeapAfterTest; i++) { + searchIndex(client, index2Name, i); + } + + // Get some hits on all combinations of indices and tiers + for (int i = itemsOnDiskAfterTest; i < itemsOnDiskAfterTest + hitsOnHeapIndex1; i++) { + // heap hits for index 1 + searchIndex(client, index1Name, i); + } + for (int i = itemsOnDiskAfterTest + itemsOnHeapIndex1AfterTest; i < itemsOnDiskAfterTest + itemsOnHeapIndex1AfterTest + + hitsOnHeapIndex2; i++) { + // heap hits for index 2 + searchIndex(client, index2Name, i); + } + for (int i = 0; i < hitsOnDiskIndex1; i++) { + // disk hits for index 1 + searchIndex(client, index1Name, i); + } + for (int i = itemsOnDiskIndex1AfterTest; i < itemsOnDiskIndex1AfterTest + hitsOnDiskIndex2; i++) { + // disk hits for index 2 + searchIndex(client, index2Name, i); + } + return expectedValues; + } + + private ImmutableCacheStats returnNullIfAllZero(ImmutableCacheStats expectedStats) { + // If the randomly chosen numbers are such that the expected stats would be 0, we actually have not interacted with the cache for + // this index. + // In this case, we expect the stats holder to have no stats for this node, and therefore we should get null from + // statsHolder.getStatsForDimensionValues(). + // We will not see it in the XContent response. + if (expectedStats.equals(new ImmutableCacheStats(0, 0, 0, 0, 0))) { + return null; + } + return expectedStats; + } + + // Duplicated from CacheStatsAPIIndicesRequestCacheIT.java, as we can't add a dependency on server.internalClusterTest + + private SearchResponse searchIndex(Client client, String index, int searchSuffix) { + SearchResponse resp = client.prepareSearch(index) + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k", "hello" + padWithZeros(4, searchSuffix))) + // pad with zeros so request 0 and request 10 have the same size ("0000" and "0010" instead of "0" and "10") + .get(); + assertSearchResponse(resp); + OpenSearchAssertions.assertAllSuccessful(resp); + return resp; + } + + private String padWithZeros(int finalLength, int inputValue) { + // Avoid forbidden API String.format() + String input = String.valueOf(inputValue); + if (input.length() >= finalLength) { + return input; + } + StringBuilder sb = new StringBuilder(); + while (sb.length() < finalLength - input.length()) { + sb.append('0'); + } + sb.append(input); + return sb.toString(); + } + + private ImmutableCacheStats getTotalStats(Client client) throws IOException { + ImmutableCacheStatsHolder statsHolder = getNodeCacheStatsResult(client, List.of()); + return statsHolder.getStatsForDimensionValues(List.of()); + } + + private static ImmutableCacheStatsHolder getNodeCacheStatsResult(Client client, List aggregationLevels) throws IOException { + CommonStatsFlags statsFlags = new CommonStatsFlags(); + statsFlags.includeAllCacheTypes(); + String[] flagsLevels; + if (aggregationLevels == null) { + flagsLevels = null; + } else { + flagsLevels = aggregationLevels.toArray(new String[0]); + } + statsFlags.setLevels(flagsLevels); + + NodesStatsResponse nodeStatsResponse = client.admin() + .cluster() + .prepareNodesStats("data:true") + .addMetric(NodesStatsRequest.Metric.CACHE_STATS.metricName()) + .setIndices(statsFlags) + .get(); + // Can always get the first data node as there's only one in this test suite + assertEquals(1, nodeStatsResponse.getNodes().size()); + NodeCacheStats ncs = nodeStatsResponse.getNodes().get(0).getNodeCacheStats(); + return ncs.getStatsByCache(CacheType.INDICES_REQUEST_CACHE); + } +} diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index f40c35dde83de..63cdbca101f2a 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -327,6 +327,7 @@ private Function, Tuple> getValueFromTieredCache(boolean void handleRemovalFromHeapTier(RemovalNotification, V> notification) { ICacheKey key = notification.getKey(); boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason()); + boolean countEvictionTowardsTotal = false; // Don't count this eviction towards the cache's total if it ends up in the disk tier if (caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue())) { try (ReleasableLock ignore = writeLock.acquire()) { diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats @@ -336,21 +337,28 @@ void handleRemovalFromHeapTier(RemovalNotification, V> notification // If the value is not going to the disk cache, send this notification to the TSC's removal listener // as the value is leaving the TSC entirely removalListener.onRemoval(notification); + countEvictionTowardsTotal = true; } - updateStatsOnRemoval(TIER_DIMENSION_VALUE_ON_HEAP, wasEvicted, key, notification.getValue()); + updateStatsOnRemoval(TIER_DIMENSION_VALUE_ON_HEAP, wasEvicted, key, notification.getValue(), countEvictionTowardsTotal); } void handleRemovalFromDiskTier(RemovalNotification, V> notification) { // Values removed from the disk tier leave the TSC entirely removalListener.onRemoval(notification); boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason()); - updateStatsOnRemoval(TIER_DIMENSION_VALUE_DISK, wasEvicted, notification.getKey(), notification.getValue()); + updateStatsOnRemoval(TIER_DIMENSION_VALUE_DISK, wasEvicted, notification.getKey(), notification.getValue(), true); } - void updateStatsOnRemoval(String removedFromTierValue, boolean wasEvicted, ICacheKey key, V value) { + void updateStatsOnRemoval( + String removedFromTierValue, + boolean wasEvicted, + ICacheKey key, + V value, + boolean countEvictionTowardsTotal + ) { List dimensionValues = statsHolder.getDimensionsWithTierValue(key.dimensions, removedFromTierValue); if (wasEvicted) { - statsHolder.incrementEvictions(dimensionValues); + statsHolder.incrementEvictions(dimensionValues, countEvictionTowardsTotal); } statsHolder.decrementItems(dimensionValues); statsHolder.decrementSizeInBytes(dimensionValues, weigher.applyAsLong(key, value)); diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsHolder.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsHolder.java index d17059e8dee94..b40724430454b 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsHolder.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsHolder.java @@ -105,20 +105,29 @@ public void incrementMisses(List dimensionValues) { internalIncrement(dimensionValues, missIncrementer, true); } + /** + * This method shouldn't be used in this class. Instead, use incrementEvictions(dimensionValues, includeInTotal) + * which specifies whether the eviction should be included in the cache's total evictions, or if it should + * just count towards that tier's evictions. + * @param dimensionValues The dimension values + */ @Override public void incrementEvictions(List dimensionValues) { - final String tierValue = validateTierDimensionValue(dimensionValues); + throw new UnsupportedOperationException( + "TieredSpilloverCacheHolder must specify whether to include an eviction in the total cache stats. Use incrementEvictions(List dimensionValues, boolean includeInTotal)" + ); + } - // If the disk tier is present, only evictions from the disk tier should be included in total values. + /** + * Increment evictions for this set of dimension values. + * @param dimensionValues The dimension values + * @param includeInTotal Whether to include this eviction in the total for the whole cache's evictions + */ + public void incrementEvictions(List dimensionValues, boolean includeInTotal) { + validateTierDimensionValue(dimensionValues); + // If we count this eviction towards the total, we should increment all ancestor nodes. If not, only increment the leaf node. Consumer evictionsIncrementer = (node) -> { - if (tierValue.equals(TIER_DIMENSION_VALUE_ON_HEAP) && diskCacheEnabled) { - // If on-heap tier, increment only the leaf node corresponding to the on heap values; not the total values in its parent - // nodes - if (node.isAtLowestLevel()) { - node.incrementEvictions(); - } - } else { - // If disk tier, or on-heap tier with a disabled disk tier, increment the leaf node and its parents + if (includeInTotal || node.isAtLowestLevel()) { node.incrementEvictions(); } }; diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 6c49341591589..54b15f236a418 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -926,14 +926,14 @@ public void testDiskTierPolicies() throws Exception { MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( keyValueSize, - 100, + keyValueSize * 100, removalListener, Settings.builder() .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) .get(MAXIMUM_SIZE_IN_BYTES_KEY) .getKey(), - onHeapCacheSize * 50 + "b" + onHeapCacheSize * keyValueSize + "b" ) .build(), 0, @@ -955,6 +955,7 @@ public void testDiskTierPolicies() throws Exception { LoadAwareCacheLoader, String> loader = getLoadAwareCacheLoader(keyValuePairs); + int expectedEvictions = 0; for (String key : keyValuePairs.keySet()) { ICacheKey iCacheKey = getICacheKey(key); Boolean expectedOutput = expectedOutputs.get(key); @@ -967,8 +968,15 @@ public void testDiskTierPolicies() throws Exception { } else { // Should miss as heap tier size = 0 and the policy rejected it assertNull(result); + expectedEvictions++; } } + + // We expect values that were evicted from the heap tier and not allowed into the disk tier by the policy + // to count towards total evictions + assertEquals(keyValuePairs.size(), getEvictionsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); + assertEquals(0, getEvictionsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_DISK)); // Disk tier is large enough for no evictions + assertEquals(expectedEvictions, getTotalStatsSnapshot(tieredSpilloverCache).getEvictions()); } public void testTookTimePolicyFromFactory() throws Exception { @@ -1493,6 +1501,11 @@ private ImmutableCacheStats getStatsSnapshotForTier(TieredSpilloverCache t return snapshot; } + private ImmutableCacheStats getTotalStatsSnapshot(TieredSpilloverCache tsc) throws IOException { + ImmutableCacheStatsHolder cacheStats = tsc.stats(new String[0]); + return cacheStats.getStatsForDimensionValues(List.of()); + } + // Duplicated here from EhcacheDiskCacheTests.java, we can't add a dependency on that plugin static class StringSerializer implements Serializer { private final Charset charset = StandardCharsets.UTF_8; diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java deleted file mode 100644 index 0f441fe01a368..0000000000000 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerIT.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway.remote; - -import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; -import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.settings.Settings; -import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.test.OpenSearchIntegTestCase; -import org.junit.Before; - -import java.nio.charset.StandardCharsets; -import java.util.Base64; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT; -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING; -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS; -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; -import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING; - -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemoteClusterStateCleanupManagerIT extends RemoteStoreBaseIntegTestCase { - - private static final String INDEX_NAME = "test-index"; - - @Before - public void setup() { - asyncUploadMockFsRepo = false; - } - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build(); - } - - private Map initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) { - prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount); - Map indexStats = indexData(1, false, INDEX_NAME); - assertEquals(shardCount * (replicaCount + 1), getNumShards(INDEX_NAME).totalNumShards); - ensureGreen(INDEX_NAME); - return indexStats; - } - - public void testRemoteCleanupTaskUpdated() { - int shardCount = randomIntBetween(1, 2); - int replicaCount = 1; - int dataNodeCount = shardCount * (replicaCount + 1); - int clusterManagerNodeCount = 1; - - initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); - RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster().getClusterManagerNodeInstance( - RemoteClusterStateCleanupManager.class - ); - - assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval()); - assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled()); - - // now disable - client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), -1)) - .get(); - - assertEquals(-1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMillis()); - assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled()); - - // now set Clean up interval to 1 min - client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m")) - .get(); - assertEquals(1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMinutes()); - } - - public void testRemoteCleanupDeleteStale() throws Exception { - int shardCount = randomIntBetween(1, 2); - int replicaCount = 1; - int dataNodeCount = shardCount * (replicaCount + 1); - int clusterManagerNodeCount = 1; - - initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); - - // set cleanup interval to 100 ms to make the test faster - ClusterUpdateSettingsResponse response = client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "100ms")) - .get(); - - assertTrue(response.isAcknowledged()); - - // update cluster state 21 times to ensure that clean up has run after this will upload 42 manifest files - // to repository, if manifest files are less than that it means clean up has run - updateClusterStateNTimes(RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES + 1); - - RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); - BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME); - BlobPath baseMetadataPath = repository.basePath() - .add( - Base64.getUrlEncoder() - .withoutPadding() - .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8)) - ) - .add("cluster-state") - .add(getClusterState().metadata().clusterUUID()); - BlobPath manifestContainerPath = baseMetadataPath.add("manifest"); - - assertBusy(() -> { - int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size(); - logger.info("number of current manifest file: {}", manifestFiles); - // we can't guarantee that we have same number of manifest as Retained manifest in our repo as there can be other queued task - // other than replica count change which can upload new manifest files, that's why we check that number of manifests is between - // Retained manifests and Retained manifests + 2 * Skip cleanup state changes (each cluster state update uploads 2 manifests) - assertTrue( - "Current number of manifest files: " + manifestFiles, - manifestFiles >= RETAINED_MANIFESTS && manifestFiles < RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES - ); - }, 500, TimeUnit.MILLISECONDS); - } - - private void updateClusterStateNTimes(int n) { - int newReplicaCount = randomIntBetween(0, 3); - for (int i = n; i > 0; i--) { - ClusterUpdateSettingsResponse response = client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(), i, TimeUnit.SECONDS)) - .get(); - assertTrue(response.isAcknowledged()); - } - } -} diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java index ab2f0f0080566..42120aa32eb47 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java @@ -10,6 +10,7 @@ import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; import org.opensearch.discovery.DiscoveryStats; @@ -26,6 +27,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA; import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA; import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; @@ -49,6 +51,16 @@ protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build(); } + private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { + internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes); + internalCluster().startDataOnlyNodes(numDataOnlyNodes); + for (String index : indices.split(",")) { + createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); + ensureYellowAndNoInitializingShards(index); + ensureGreen(index); + } + } + private Map initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) { prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount); Map indexStats = indexData(1, false, INDEX_NAME); @@ -57,6 +69,49 @@ private Map initialTestSetup(int shardCount, int replicaCount, int return indexStats; } + public void testFullClusterRestoreStaleDelete() throws Exception { + int shardCount = randomIntBetween(1, 2); + int replicaCount = 1; + int dataNodeCount = shardCount * (replicaCount + 1); + int clusterManagerNodeCount = 1; + + initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); + setReplicaCount(0); + setReplicaCount(2); + setReplicaCount(0); + setReplicaCount(1); + setReplicaCount(0); + setReplicaCount(1); + setReplicaCount(0); + setReplicaCount(2); + setReplicaCount(0); + + RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance( + RemoteClusterStateService.class + ); + + RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); + + BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME); + BlobPath baseMetadataPath = repository.basePath() + .add( + Base64.getUrlEncoder() + .withoutPadding() + .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8)) + ) + .add("cluster-state") + .add(getClusterState().metadata().clusterUUID()); + + assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size()); + + Map indexMetadataMap = remoteClusterStateService.getLatestClusterState( + cluster().getClusterName(), + getClusterState().metadata().clusterUUID() + ).getMetadata().getIndices(); + assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas()); + assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards()); + } + public void testRemoteStateStats() { int shardCount = randomIntBetween(1, 2); int replicaCount = 1; @@ -186,4 +241,12 @@ private void validateNodesStatsResponse(NodesStatsResponse nodesStatsResponse) { assertNotNull(nodesStatsResponse.getNodes().get(0)); assertNotNull(nodesStatsResponse.getNodes().get(0).getDiscoveryStats()); } + + private void setReplicaCount(int replicaCount) { + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount)) + .get(); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/CacheStatsAPIIndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/CacheStatsAPIIndicesRequestCacheIT.java index de7a52761c77c..0539f96e429c1 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/CacheStatsAPIIndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/CacheStatsAPIIndicesRequestCacheIT.java @@ -12,6 +12,7 @@ import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; @@ -20,7 +21,7 @@ import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.service.NodeCacheStats; import org.opensearch.common.cache.stats.ImmutableCacheStats; -import org.opensearch.common.cache.stats.ImmutableCacheStatsHolderTests; +import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.xcontent.XContentFactory; @@ -56,6 +57,10 @@ public static Collection parameters() { return Arrays.asList(new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build() }); } + /** + * Test aggregating by indices, indices+shards, shards, or no levels, and check the resulting stats + * are as we expect. + */ public void testCacheStatsAPIWIthOnHeapCache() throws Exception { String index1Name = "index1"; String index2Name = "index2"; @@ -73,84 +78,60 @@ public void testCacheStatsAPIWIthOnHeapCache() throws Exception { searchIndex(client, index2Name, ""); // First, aggregate by indices only - Map xContentMap = getNodeCacheStatsXContentMap(client, List.of(IndicesRequestCache.INDEX_DIMENSION_NAME)); + ImmutableCacheStatsHolder indicesStats = getNodeCacheStatsResult(client, List.of(IndicesRequestCache.INDEX_DIMENSION_NAME)); - List index1Keys = List.of(CacheType.INDICES_REQUEST_CACHE.getValue(), IndicesRequestCache.INDEX_DIMENSION_NAME, index1Name); + List index1Dimensions = List.of(index1Name); // Since we searched twice, we expect to see 1 hit, 1 miss and 1 entry for index 1 ImmutableCacheStats expectedStats = new ImmutableCacheStats(1, 1, 0, 0, 1); - checkCacheStatsAPIResponse(xContentMap, index1Keys, expectedStats, false, true); + checkCacheStatsAPIResponse(indicesStats, index1Dimensions, expectedStats, false, true); // Get the request size for one request, so we can reuse it for next index - int requestSize = (int) ((Map) ImmutableCacheStatsHolderTests.getValueFromNestedXContentMap( - xContentMap, - index1Keys - )).get(ImmutableCacheStats.Fields.SIZE_IN_BYTES); + long requestSize = indicesStats.getStatsForDimensionValues(List.of(index1Name)).getSizeInBytes(); assertTrue(requestSize > 0); - List index2Keys = List.of(CacheType.INDICES_REQUEST_CACHE.getValue(), IndicesRequestCache.INDEX_DIMENSION_NAME, index2Name); + List index2Dimensions = List.of(index2Name); // We searched once in index 2, we expect 1 miss + 1 entry expectedStats = new ImmutableCacheStats(0, 1, 0, requestSize, 1); - checkCacheStatsAPIResponse(xContentMap, index2Keys, expectedStats, true, true); + checkCacheStatsAPIResponse(indicesStats, index2Dimensions, expectedStats, true, true); // The total stats for the node should be 1 hit, 2 misses, and 2 entries expectedStats = new ImmutableCacheStats(1, 2, 0, 2 * requestSize, 2); - List totalStatsKeys = List.of(CacheType.INDICES_REQUEST_CACHE.getValue()); - checkCacheStatsAPIResponse(xContentMap, totalStatsKeys, expectedStats, true, true); + List totalStatsKeys = List.of(); + checkCacheStatsAPIResponse(indicesStats, totalStatsKeys, expectedStats, true, true); // Aggregate by shards only - xContentMap = getNodeCacheStatsXContentMap(client, List.of(IndicesRequestCache.SHARD_ID_DIMENSION_NAME)); + ImmutableCacheStatsHolder shardsStats = getNodeCacheStatsResult(client, List.of(IndicesRequestCache.SHARD_ID_DIMENSION_NAME)); - List index1Shard0Keys = List.of( - CacheType.INDICES_REQUEST_CACHE.getValue(), - IndicesRequestCache.SHARD_ID_DIMENSION_NAME, - "[" + index1Name + "][0]" - ); + List index1Shard0Dimensions = List.of("[" + index1Name + "][0]"); expectedStats = new ImmutableCacheStats(1, 1, 0, requestSize, 1); - checkCacheStatsAPIResponse(xContentMap, index1Shard0Keys, expectedStats, true, true); + checkCacheStatsAPIResponse(shardsStats, index1Shard0Dimensions, expectedStats, true, true); - List index2Shard0Keys = List.of( - CacheType.INDICES_REQUEST_CACHE.getValue(), - IndicesRequestCache.SHARD_ID_DIMENSION_NAME, - "[" + index2Name + "][0]" - ); + List index2Shard0Dimensions = List.of("[" + index2Name + "][0]"); expectedStats = new ImmutableCacheStats(0, 1, 0, requestSize, 1); - checkCacheStatsAPIResponse(xContentMap, index2Shard0Keys, expectedStats, true, true); + checkCacheStatsAPIResponse(shardsStats, index2Shard0Dimensions, expectedStats, true, true); // Aggregate by indices and shards - xContentMap = getNodeCacheStatsXContentMap( + ImmutableCacheStatsHolder indicesAndShardsStats = getNodeCacheStatsResult( client, List.of(IndicesRequestCache.INDEX_DIMENSION_NAME, IndicesRequestCache.SHARD_ID_DIMENSION_NAME) ); - index1Keys = List.of( - CacheType.INDICES_REQUEST_CACHE.getValue(), - IndicesRequestCache.INDEX_DIMENSION_NAME, - index1Name, - IndicesRequestCache.SHARD_ID_DIMENSION_NAME, - "[" + index1Name + "][0]" - ); + index1Dimensions = List.of(index1Name, "[" + index1Name + "][0]"); expectedStats = new ImmutableCacheStats(1, 1, 0, requestSize, 1); - checkCacheStatsAPIResponse(xContentMap, index1Keys, expectedStats, true, true); - - index2Keys = List.of( - CacheType.INDICES_REQUEST_CACHE.getValue(), - IndicesRequestCache.INDEX_DIMENSION_NAME, - index2Name, - IndicesRequestCache.SHARD_ID_DIMENSION_NAME, - "[" + index2Name + "][0]" - ); + checkCacheStatsAPIResponse(indicesAndShardsStats, index1Dimensions, expectedStats, true, true); + index2Dimensions = List.of(index2Name, "[" + index2Name + "][0]"); expectedStats = new ImmutableCacheStats(0, 1, 0, requestSize, 1); - checkCacheStatsAPIResponse(xContentMap, index2Keys, expectedStats, true, true); - + checkCacheStatsAPIResponse(indicesAndShardsStats, index2Dimensions, expectedStats, true, true); } - // TODO: Add testCacheStatsAPIWithTieredCache when TSC stats implementation PR is merged - + /** + * Check the new stats API returns the same values as the old stats API. In particular, + * check that the new and old APIs are both correctly estimating memory size, + * using the logic that includes the overhead memory in ICacheKey. + */ public void testStatsMatchOldApi() throws Exception { - // The main purpose of this test is to check that the new and old APIs are both correctly estimating memory size, - // using the logic that includes the overhead memory in ICacheKey. String index = "index"; Client client = client(); startIndex(client, index); @@ -173,8 +154,7 @@ public void testStatsMatchOldApi() throws Exception { .getRequestCache(); assertNotEquals(0, oldApiStats.getMemorySizeInBytes()); - List xContentMapKeys = List.of(CacheType.INDICES_REQUEST_CACHE.getValue()); - Map xContentMap = getNodeCacheStatsXContentMap(client, List.of()); + ImmutableCacheStatsHolder statsHolder = getNodeCacheStatsResult(client, List.of()); ImmutableCacheStats expected = new ImmutableCacheStats( oldApiStats.getHitCount(), oldApiStats.getMissCount(), @@ -183,9 +163,13 @@ public void testStatsMatchOldApi() throws Exception { 0 ); // Don't check entries, as the old API doesn't track this - checkCacheStatsAPIResponse(xContentMap, xContentMapKeys, expected, true, false); + checkCacheStatsAPIResponse(statsHolder, List.of(), expected, true, false); } + /** + * Test the XContent in the response behaves correctly when we pass null levels. + * Only the total cache stats should be returned. + */ public void testNullLevels() throws Exception { String index = "index"; Client client = client(); @@ -194,9 +178,81 @@ public void testNullLevels() throws Exception { for (int i = 0; i < numKeys; i++) { searchIndex(client, index, String.valueOf(i)); } - Map xContentMap = getNodeCacheStatsXContentMap(client, null); + Map xContentMap = getStatsXContent(getNodeCacheStatsResult(client, null)); // Null levels should result in only the total cache stats being returned -> 6 fields inside the response. - assertEquals(6, ((Map) xContentMap.get("request_cache")).size()); + assertEquals(6, xContentMap.size()); + } + + /** + * Test clearing the cache using API sets memory size and number of items to 0, but leaves other stats + * unaffected. + */ + public void testCacheClear() throws Exception { + String index = "index"; + Client client = client(); + + startIndex(client, index); + + int expectedHits = 2; + int expectedMisses = 7; + // Search for the same doc to give hits + for (int i = 0; i < expectedHits + 1; i++) { + searchIndex(client, index, ""); + } + // Search for new docs + for (int i = 0; i < expectedMisses - 1; i++) { + searchIndex(client, index, String.valueOf(i)); + } + + ImmutableCacheStats expectedTotal = new ImmutableCacheStats(expectedHits, expectedMisses, 0, 0, expectedMisses); + ImmutableCacheStatsHolder statsHolder = getNodeCacheStatsResult(client, List.of()); + // Don't check the memory size, just assert it's nonzero + checkCacheStatsAPIResponse(statsHolder, List.of(), expectedTotal, false, true); + long originalMemorySize = statsHolder.getTotalSizeInBytes(); + assertNotEquals(0, originalMemorySize); + + // Clear cache + ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(index); + client.admin().indices().clearCache(clearIndicesCacheRequest).actionGet(); + + // Now size and items should be 0 + expectedTotal = new ImmutableCacheStats(expectedHits, expectedMisses, 0, 0, 0); + statsHolder = getNodeCacheStatsResult(client, List.of()); + checkCacheStatsAPIResponse(statsHolder, List.of(), expectedTotal, true, true); + } + + /** + * Test the cache stats responses are in the expected place in XContent when we call the overall API + * GET /_nodes/stats. They should be at nodes.[node_id].caches.request_cache. + */ + public void testNodesStatsResponse() throws Exception { + String index = "index"; + Client client = client(); + + startIndex(client, index); + + NodesStatsResponse nodeStatsResponse = client.admin() + .cluster() + .prepareNodesStats("data:true") + .all() // This mimics /_nodes/stats + .get(); + XContentBuilder builder = XContentFactory.jsonBuilder(); + Map paramMap = new HashMap<>(); + ToXContent.Params params = new ToXContent.MapParams(paramMap); + + builder.startObject(); + nodeStatsResponse.toXContent(builder, params); + builder.endObject(); + Map xContentMap = XContentHelper.convertToMap(MediaTypeRegistry.JSON.xContent(), builder.toString(), true); + // Values should be at nodes.[node_id].caches.request_cache + // Get the node id + Map nodesResponse = (Map) xContentMap.get("nodes"); + assertEquals(1, nodesResponse.size()); + String nodeId = nodesResponse.keySet().toArray(String[]::new)[0]; + Map cachesResponse = (Map) ((Map) nodesResponse.get(nodeId)).get("caches"); + assertNotNull(cachesResponse); + // Request cache should be present in the response + assertTrue(cachesResponse.containsKey("request_cache")); } private void startIndex(Client client, String indexName) throws InterruptedException { @@ -227,8 +283,7 @@ private SearchResponse searchIndex(Client client, String index, String searchSuf return resp; } - private static Map getNodeCacheStatsXContentMap(Client client, List aggregationLevels) throws IOException { - + private static ImmutableCacheStatsHolder getNodeCacheStatsResult(Client client, List aggregationLevels) throws IOException { CommonStatsFlags statsFlags = new CommonStatsFlags(); statsFlags.includeAllCacheTypes(); String[] flagsLevels; @@ -248,16 +303,16 @@ private static Map getNodeCacheStatsXContentMap(Client client, L // Can always get the first data node as there's only one in this test suite assertEquals(1, nodeStatsResponse.getNodes().size()); NodeCacheStats ncs = nodeStatsResponse.getNodes().get(0).getNodeCacheStats(); + return ncs.getStatsByCache(CacheType.INDICES_REQUEST_CACHE); + } + private static Map getStatsXContent(ImmutableCacheStatsHolder statsHolder) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder(); Map paramMap = new HashMap<>(); - if (aggregationLevels != null && !aggregationLevels.isEmpty()) { - paramMap.put("level", String.join(",", aggregationLevels)); - } ToXContent.Params params = new ToXContent.MapParams(paramMap); builder.startObject(); - ncs.toXContent(builder, params); + statsHolder.toXContent(builder, params); builder.endObject(); String resultString = builder.toString(); @@ -265,27 +320,22 @@ private static Map getNodeCacheStatsXContentMap(Client client, L } private static void checkCacheStatsAPIResponse( - Map xContentMap, - List xContentMapKeys, + ImmutableCacheStatsHolder statsHolder, + List dimensionValues, ImmutableCacheStats expectedStats, boolean checkMemorySize, boolean checkEntries ) { - // Assumes the keys point to a level whose keys are the field values ("size_in_bytes", "evictions", etc) and whose values store - // those stats - Map aggregatedStatsResponse = (Map) ImmutableCacheStatsHolderTests.getValueFromNestedXContentMap( - xContentMap, - xContentMapKeys - ); + ImmutableCacheStats aggregatedStatsResponse = statsHolder.getStatsForDimensionValues(dimensionValues); assertNotNull(aggregatedStatsResponse); - assertEquals(expectedStats.getHits(), (int) aggregatedStatsResponse.get(ImmutableCacheStats.Fields.HIT_COUNT)); - assertEquals(expectedStats.getMisses(), (int) aggregatedStatsResponse.get(ImmutableCacheStats.Fields.MISS_COUNT)); - assertEquals(expectedStats.getEvictions(), (int) aggregatedStatsResponse.get(ImmutableCacheStats.Fields.EVICTIONS)); + assertEquals(expectedStats.getHits(), (int) aggregatedStatsResponse.getHits()); + assertEquals(expectedStats.getMisses(), (int) aggregatedStatsResponse.getMisses()); + assertEquals(expectedStats.getEvictions(), (int) aggregatedStatsResponse.getEvictions()); if (checkMemorySize) { - assertEquals(expectedStats.getSizeInBytes(), (int) aggregatedStatsResponse.get(ImmutableCacheStats.Fields.SIZE_IN_BYTES)); + assertEquals(expectedStats.getSizeInBytes(), (int) aggregatedStatsResponse.getSizeInBytes()); } if (checkEntries) { - assertEquals(expectedStats.getItems(), (int) aggregatedStatsResponse.get(ImmutableCacheStats.Fields.ITEM_COUNT)); + assertEquals(expectedStats.getItems(), (int) aggregatedStatsResponse.getItems()); } } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 64efcee6ef1b5..740aee69f7d80 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -350,14 +350,4 @@ protected void restore(boolean restoreAllShards, String... indices) { PlainActionFuture.newFuture() ); } - - protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { - internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes); - internalCluster().startDataOnlyNodes(numDataOnlyNodes); - for (String index : indices.split(",")) { - createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); - ensureYellowAndNoInitializingShards(index); - ensureGreen(index); - } - } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java index 4d108f8d78a69..ca2685e093d3f 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java @@ -138,7 +138,7 @@ public CommonStatsFlags all() { includeUnloadedSegments = false; includeAllShardIndexingPressureTrackers = false; includeOnlyTopIndexingPressureMetrics = false; - includeCaches = EnumSet.noneOf(CacheType.class); + includeCaches = EnumSet.allOf(CacheType.class); levels = new String[0]; return this; } diff --git a/server/src/main/java/org/opensearch/common/cache/service/NodeCacheStats.java b/server/src/main/java/org/opensearch/common/cache/service/NodeCacheStats.java index 07c75eab34194..dd94dbf61debb 100644 --- a/server/src/main/java/org/opensearch/common/cache/service/NodeCacheStats.java +++ b/server/src/main/java/org/opensearch/common/cache/service/NodeCacheStats.java @@ -8,6 +8,7 @@ package org.opensearch.common.cache.service; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.CacheType; @@ -51,6 +52,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(NodesStatsRequest.Metric.CACHE_STATS.metricName()); for (CacheType type : statsByCache.keySet()) { if (flags.getIncludeCaches().contains(type)) { builder.startObject(type.getValue()); @@ -58,6 +60,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.endObject(); } } + builder.endObject(); return builder; } @@ -77,4 +80,10 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(statsByCache, flags); } + + // Get the immutable cache stats for a given cache, used to avoid having to process XContent in tests. + // Safe to expose publicly as the ImmutableCacheStatsHolder can't be modified after its creation. + public ImmutableCacheStatsHolder getStatsByCache(CacheType cacheType) { + return statsByCache.get(cacheType); + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 297fc98764d07..7814518af471b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -104,7 +104,6 @@ import org.opensearch.gateway.GatewayService; import org.opensearch.gateway.PersistedClusterStateService; import org.opensearch.gateway.ShardsBatchGatewayAllocator; -import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.http.HttpTransportSettings; import org.opensearch.index.IndexModule; @@ -712,7 +711,6 @@ public void apply(Settings value, Settings current, Settings previous) { SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL, // Remote cluster state settings - RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java deleted file mode 100644 index 8a106a25e5630..0000000000000 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java +++ /dev/null @@ -1,408 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway.remote; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Strings; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.service.ClusterApplierService; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.blobstore.BlobMetadata; -import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.concurrent.AbstractAsyncTask; -import org.opensearch.core.action.ActionListener; -import org.opensearch.index.translog.transfer.BlobStoreTransferService; -import org.opensearch.threadpool.ThreadPool; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_FORMAT; -import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_FORMAT; -import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; -import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN; - -/** - * A Manager which provides APIs to clean up stale cluster state files and runs an async stale cleanup task - * - * @opensearch.internal - */ -public class RemoteClusterStateCleanupManager implements Closeable { - - public static final int RETAINED_MANIFESTS = 10; - public static final int SKIP_CLEANUP_STATE_CHANGES = 10; - public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT = TimeValue.timeValueMinutes(5); - public static final TimeValue CLUSTER_STATE_CLEANUP_INTERVAL_MINIMUM = TimeValue.MINUS_ONE; - - /** - * Setting to specify the interval to do run stale file cleanup job - * Min value -1 indicates that the stale file cleanup job should be disabled - */ - public static final Setting REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING = Setting.timeSetting( - "cluster.remote_store.state.cleanup_interval", - CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, - CLUSTER_STATE_CLEANUP_INTERVAL_MINIMUM, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - private static final Logger logger = LogManager.getLogger(RemoteClusterStateCleanupManager.class); - private final RemoteClusterStateService remoteClusterStateService; - private final RemotePersistenceStats remoteStateStats; - private BlobStoreTransferService blobStoreTransferService; - private TimeValue staleFileCleanupInterval; - private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); - private AsyncStaleFileDeletion staleFileDeletionTask; - private long lastCleanupAttemptStateVersion; - private final ThreadPool threadpool; - private final ClusterApplierService clusterApplierService; - - public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) { - this.remoteClusterStateService = remoteClusterStateService; - this.remoteStateStats = remoteClusterStateService.getStats(); - ClusterSettings clusterSettings = clusterService.getClusterSettings(); - this.clusterApplierService = clusterService.getClusterApplierService(); - this.staleFileCleanupInterval = clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING); - this.threadpool = remoteClusterStateService.getThreadpool(); - // initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold - this.lastCleanupAttemptStateVersion = 0; - clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval); - } - - void start() { - staleFileDeletionTask = new AsyncStaleFileDeletion(this); - } - - @Override - public void close() throws IOException { - if (staleFileDeletionTask != null) { - staleFileDeletionTask.close(); - } - } - - private BlobStoreTransferService getBlobStoreTransferService() { - if (blobStoreTransferService == null) { - blobStoreTransferService = new BlobStoreTransferService(remoteClusterStateService.getBlobStore(), threadpool); - } - return blobStoreTransferService; - } - - private void updateCleanupInterval(TimeValue updatedInterval) { - this.staleFileCleanupInterval = updatedInterval; - logger.info("updated remote state cleanup interval to {}", updatedInterval); - // After updating the interval, we need to close the current task and create a new one which will run with updated interval - if (staleFileDeletionTask != null && !staleFileDeletionTask.getInterval().equals(updatedInterval)) { - staleFileDeletionTask.setInterval(updatedInterval); - } - } - - // visible for testing - void cleanUpStaleFiles() { - ClusterState currentAppliedState = clusterApplierService.state(); - if (currentAppliedState.nodes().isLocalNodeElectedClusterManager()) { - long cleanUpAttemptStateVersion = currentAppliedState.version(); - assert Strings.isNotEmpty(currentAppliedState.getClusterName().value()) : "cluster name is not set"; - assert Strings.isNotEmpty(currentAppliedState.metadata().clusterUUID()) : "cluster uuid is not set"; - if (cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion > SKIP_CLEANUP_STATE_CHANGES) { - logger.info( - "Cleaning up stale remote state files for cluster [{}] with uuid [{}]. Last clean was done before {} updates", - currentAppliedState.getClusterName().value(), - currentAppliedState.metadata().clusterUUID(), - cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion - ); - this.deleteStaleClusterMetadata( - currentAppliedState.getClusterName().value(), - currentAppliedState.metadata().clusterUUID(), - RETAINED_MANIFESTS - ); - lastCleanupAttemptStateVersion = cleanUpAttemptStateVersion; - } else { - logger.debug( - "Skipping cleanup of stale remote state files for cluster [{}] with uuid [{}]. Last clean was done before {} updates, which is less than threshold {}", - currentAppliedState.getClusterName().value(), - currentAppliedState.metadata().clusterUUID(), - cleanUpAttemptStateVersion - lastCleanupAttemptStateVersion, - SKIP_CLEANUP_STATE_CHANGES - ); - } - } else { - logger.debug("Skipping cleanup task as local node is not elected Cluster Manager"); - } - } - - private void addStaleGlobalMetadataPath(String fileName, Set filesToKeep, Set staleGlobalMetadataPaths) { - if (!filesToKeep.contains(fileName)) { - String[] splitPath = fileName.split("/"); - staleGlobalMetadataPaths.add( - new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( - splitPath[splitPath.length - 1] - ) - ); - } - } - - // visible for testing - void deleteClusterMetadata( - String clusterName, - String clusterUUID, - List activeManifestBlobMetadata, - List staleManifestBlobMetadata - ) { - try { - Set filesToKeep = new HashSet<>(); - Set staleManifestPaths = new HashSet<>(); - Set staleIndexMetadataPaths = new HashSet<>(); - Set staleGlobalMetadataPaths = new HashSet<>(); - activeManifestBlobMetadata.forEach(blobMetadata -> { - ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest( - clusterName, - clusterUUID, - blobMetadata.name() - ); - clusterMetadataManifest.getIndices() - .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); - if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) { - filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName()); - } else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) { - filesToKeep.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()); - filesToKeep.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()); - filesToKeep.add(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()); - clusterMetadataManifest.getCustomMetadataMap() - .values() - .forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename())); - } - }); - staleManifestBlobMetadata.forEach(blobMetadata -> { - ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest( - clusterName, - clusterUUID, - blobMetadata.name() - ); - staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name()); - if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) { - addStaleGlobalMetadataPath(clusterMetadataManifest.getGlobalMetadataFileName(), filesToKeep, staleGlobalMetadataPaths); - } else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) { - addStaleGlobalMetadataPath( - clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename(), - filesToKeep, - staleGlobalMetadataPaths - ); - addStaleGlobalMetadataPath( - clusterMetadataManifest.getSettingsMetadata().getUploadedFilename(), - filesToKeep, - staleGlobalMetadataPaths - ); - addStaleGlobalMetadataPath( - clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename(), - filesToKeep, - staleGlobalMetadataPaths - ); - clusterMetadataManifest.getCustomMetadataMap() - .values() - .forEach( - attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths) - ); - } - - clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { - if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { - staleIndexMetadataPaths.add( - new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString() - + INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename()) - ); - } - }); - }); - - if (staleManifestPaths.isEmpty()) { - logger.debug("No stale Remote Cluster Metadata files found"); - return; - } - - deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths)); - deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths)); - deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths)); - } catch (IllegalStateException e) { - logger.error("Error while fetching Remote Cluster Metadata manifests", e); - } catch (IOException e) { - logger.error("Error while deleting stale Remote Cluster Metadata files", e); - remoteStateStats.cleanUpAttemptFailed(); - } catch (Exception e) { - logger.error("Unexpected error while deleting stale Remote Cluster Metadata files", e); - remoteStateStats.cleanUpAttemptFailed(); - } - } - - /** - * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests - * - * @param clusterName name of the cluster - * @param clusterUUID uuid of cluster state to refer to in remote - * @param manifestsToRetain no of latest manifest files to keep in remote - */ - // package private for testing - void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { - if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) { - logger.info("Delete stale cluster metadata task is already in progress."); - return; - } - try { - getBlobStoreTransferService().listAllInSortedOrderAsync( - ThreadPool.Names.REMOTE_PURGE, - remoteClusterStateService.getManifestFolderPath(clusterName, clusterUUID), - MANIFEST_FILE_PREFIX, - Integer.MAX_VALUE, - new ActionListener<>() { - @Override - public void onResponse(List blobMetadata) { - if (blobMetadata.size() > manifestsToRetain) { - deleteClusterMetadata( - clusterName, - clusterUUID, - blobMetadata.subList(0, manifestsToRetain), - blobMetadata.subList(manifestsToRetain, blobMetadata.size()) - ); - } - deleteStaleMetadataRunning.set(false); - } - - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", - clusterUUID - ) - ); - deleteStaleMetadataRunning.set(false); - } - } - ); - } catch (Exception e) { - deleteStaleMetadataRunning.set(false); - throw e; - } - } - - /** - * Purges all remote cluster state against provided cluster UUIDs - * - * @param clusterName name of the cluster - * @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged - */ - void deleteStaleUUIDsClusterMetadata(String clusterName, List clusterUUIDs) { - clusterUUIDs.forEach( - clusterUUID -> getBlobStoreTransferService().deleteAsync( - ThreadPool.Names.REMOTE_PURGE, - remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID), - new ActionListener<>() { - @Override - public void onResponse(Void unused) { - logger.info("Deleted all remote cluster metadata for cluster UUID - {}", clusterUUID); - } - - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "Exception occurred while deleting all remote cluster metadata for cluster UUID {}", - clusterUUID - ), - e - ); - remoteStateStats.cleanUpAttemptFailed(); - } - } - ) - ); - } - - // package private for testing - void deleteStalePaths(String clusterName, String clusterUUID, List stalePaths) throws IOException { - logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths)); - getBlobStoreTransferService().deleteBlobs( - remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID), - stalePaths - ); - } - - /** - * Purges all remote cluster state against provided cluster UUIDs - * @param clusterState current state of the cluster - * @param committedManifest last committed ClusterMetadataManifest - */ - public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) { - threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { - String clusterName = clusterState.getClusterName().value(); - logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName); - Set allClustersUUIDsInRemote; - try { - allClustersUUIDsInRemote = new HashSet<>( - remoteClusterStateService.getAllClusterUUIDs(clusterState.getClusterName().value()) - ); - } catch (IOException e) { - logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName)); - return; - } - // Retain last 2 cluster uuids data - allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID()); - allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID()); - deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote)); - }); - } - - public TimeValue getStaleFileCleanupInterval() { - return this.staleFileCleanupInterval; - } - - AsyncStaleFileDeletion getStaleFileDeletionTask() { // for testing - return this.staleFileDeletionTask; - } - - RemotePersistenceStats getStats() { - return this.remoteStateStats; - } - - static final class AsyncStaleFileDeletion extends AbstractAsyncTask { - private final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; - - AsyncStaleFileDeletion(RemoteClusterStateCleanupManager remoteClusterStateCleanupManager) { - super( - logger, - remoteClusterStateCleanupManager.threadpool, - remoteClusterStateCleanupManager.getStaleFileCleanupInterval(), - true - ); - this.remoteClusterStateCleanupManager = remoteClusterStateCleanupManager; - rescheduleIfNecessary(); - } - - @Override - protected boolean mustReschedule() { - return true; - } - - @Override - protected void runInternal() { - remoteClusterStateCleanupManager.cleanUpStaleFiles(); - } - } -} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 01ffd8f1cca46..0f862d1b68820 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -18,13 +18,11 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.TemplatesMetadata; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -61,6 +59,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongSupplier; @@ -82,6 +81,8 @@ public class RemoteClusterStateService implements Closeable { public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; + public static final int RETAINED_MANIFESTS = 10; + public static final String DELIMITER = "__"; public static final String CUSTOM_DELIMITER = "--"; @@ -206,7 +207,8 @@ public class RemoteClusterStateService implements Closeable { private volatile TimeValue indexMetadataUploadTimeout; private volatile TimeValue globalMetadataUploadTimeout; private volatile TimeValue metadataManifestUploadTimeout; - private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; + + private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); private final RemotePersistenceStats remoteStateStats; private final String CLUSTER_STATE_UPLOAD_TIME_LOG_STRING = "writing cluster state for version [{}] took [{}ms]"; private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " @@ -230,7 +232,7 @@ public RemoteClusterStateService( String nodeId, Supplier repositoriesService, Settings settings, - ClusterService clusterService, + ClusterSettings clusterSettings, LongSupplier relativeTimeNanosSupplier, ThreadPool threadPool, List indexMetadataUploadListeners @@ -241,7 +243,6 @@ public RemoteClusterStateService( this.settings = settings; this.relativeTimeNanosSupplier = relativeTimeNanosSupplier; this.threadpool = threadPool; - ClusterSettings clusterSettings = clusterService.getClusterSettings(); this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING); this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING); @@ -251,10 +252,16 @@ public RemoteClusterStateService( clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout); this.remoteStateStats = new RemotePersistenceStats(); - this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService); this.indexMetadataUploadListeners = indexMetadataUploadListeners; } + private BlobStoreTransferService getBlobStoreTransferService() { + if (blobStoreTransferService == null) { + blobStoreTransferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool); + } + return blobStoreTransferService; + } + /** * This method uploads entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be * invoked by the elected cluster manager when the remote cluster state is enabled. @@ -410,6 +417,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( : previousManifest.getCustomMetadataMap(), false ); + deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); remoteStateStats.stateSucceeded(); @@ -713,10 +721,6 @@ private CheckedRunnable getAsyncMetadataWriteAction( ); } - public RemoteClusterStateCleanupManager getCleanupManager() { - return remoteClusterStateCleanupManager; - } - @Nullable public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) throws IOException { @@ -736,16 +740,12 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat previousManifest.getCustomMetadataMap(), true ); - if (!previousManifest.isClusterUUIDCommitted() && committedManifest.isClusterUUIDCommitted()) { - remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, committedManifest); - } - + deleteStaleClusterUUIDs(clusterState, committedManifest); return committedManifest; } @Override public void close() throws IOException { - remoteClusterStateCleanupManager.close(); if (blobStoreRepository != null) { IOUtils.close(blobStoreRepository); } @@ -760,7 +760,6 @@ public void start() { final Repository repository = repositoriesService.get().repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; - remoteClusterStateCleanupManager.start(); } private ClusterMetadataManifest uploadManifest( @@ -851,14 +850,6 @@ private void writeMetadataManifest(String clusterName, String clusterUUID, Clust ); } - ThreadPool getThreadpool() { - return threadpool; - } - - BlobStore getBlobStore() { - return blobStoreRepository.blobStore(); - } - private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX return blobStoreRepository.blobStore() @@ -876,7 +867,7 @@ private BlobContainer manifestContainer(String clusterName, String clusterUUID) return blobStoreRepository.blobStore().blobContainer(getManifestFolderPath(clusterName, clusterUUID)); } - BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) { + private BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) { return blobStoreRepository.basePath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID); } @@ -991,7 +982,7 @@ private static String metadataAttributeFileName(String componentPrefix, Long met ); } - BlobPath getManifestFolderPath(String clusterName, String clusterUUID) { + private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) { return getCusterMetadataBasePath(clusterName, clusterUUID).add(MANIFEST_PATH_TOKEN); } @@ -1244,7 +1235,7 @@ public String getLastKnownUUIDFromRemote(String clusterName) { } } - Set getAllClusterUUIDs(String clusterName) throws IOException { + private Set getAllClusterUUIDs(String clusterName) throws IOException { Map clusterUUIDMetadata = clusterUUIDContainer(clusterName).children(); if (clusterUUIDMetadata == null) { return Collections.emptySet(); @@ -1435,7 +1426,7 @@ private Optional getLatestManifestFileName(String clusterName, String cl * @param clusterName name of the cluster * @return ClusterMetadataManifest */ - ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename) + private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename) throws IllegalStateException { try { return getClusterMetadataManifestBlobStoreFormat(filename).read( @@ -1495,6 +1486,234 @@ public RemoteStateTransferException(String errorDesc, Throwable cause) { } } + /** + * Purges all remote cluster state against provided cluster UUIDs + * + * @param clusterName name of the cluster + * @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged + */ + void deleteStaleUUIDsClusterMetadata(String clusterName, List clusterUUIDs) { + clusterUUIDs.forEach(clusterUUID -> { + getBlobStoreTransferService().deleteAsync( + ThreadPool.Names.REMOTE_PURGE, + getCusterMetadataBasePath(clusterName, clusterUUID), + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info("Deleted all remote cluster metadata for cluster UUID - {}", clusterUUID); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting all remote cluster metadata for cluster UUID {}", + clusterUUID + ), + e + ); + remoteStateStats.cleanUpAttemptFailed(); + } + } + ); + }); + } + + /** + * Deletes older than last {@code versionsToRetain} manifests. Also cleans up unreferenced IndexMetadata associated with older manifests + * + * @param clusterName name of the cluster + * @param clusterUUID uuid of cluster state to refer to in remote + * @param manifestsToRetain no of latest manifest files to keep in remote + */ + // package private for testing + void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) { + if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) { + logger.info("Delete stale cluster metadata task is already in progress."); + return; + } + try { + getBlobStoreTransferService().listAllInSortedOrderAsync( + ThreadPool.Names.REMOTE_PURGE, + getManifestFolderPath(clusterName, clusterUUID), + "manifest", + Integer.MAX_VALUE, + new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + if (blobMetadata.size() > manifestsToRetain) { + deleteClusterMetadata( + clusterName, + clusterUUID, + blobMetadata.subList(0, manifestsToRetain - 1), + blobMetadata.subList(manifestsToRetain - 1, blobMetadata.size()) + ); + } + deleteStaleMetadataRunning.set(false); + } + + @Override + public void onFailure(Exception e) { + logger.error( + new ParameterizedMessage( + "Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}", + clusterUUID + ) + ); + deleteStaleMetadataRunning.set(false); + } + } + ); + } catch (Exception e) { + deleteStaleMetadataRunning.set(false); + throw e; + } + } + + private void deleteClusterMetadata( + String clusterName, + String clusterUUID, + List activeManifestBlobMetadata, + List staleManifestBlobMetadata + ) { + try { + Set filesToKeep = new HashSet<>(); + Set staleManifestPaths = new HashSet<>(); + Set staleIndexMetadataPaths = new HashSet<>(); + Set staleGlobalMetadataPaths = new HashSet<>(); + activeManifestBlobMetadata.forEach(blobMetadata -> { + ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( + clusterName, + clusterUUID, + blobMetadata.name() + ); + clusterMetadataManifest.getIndices() + .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); + if (clusterMetadataManifest.getGlobalMetadataFileName() != null) { + filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName()); + } else { + filesToKeep.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()); + filesToKeep.add(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()); + filesToKeep.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()); + clusterMetadataManifest.getCustomMetadataMap() + .forEach((key, value) -> { filesToKeep.add(value.getUploadedFilename()); }); + } + }); + staleManifestBlobMetadata.forEach(blobMetadata -> { + ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( + clusterName, + clusterUUID, + blobMetadata.name() + ); + staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name()); + if (clusterMetadataManifest.getGlobalMetadataFileName() != null) { + if (filesToKeep.contains(clusterMetadataManifest.getGlobalMetadataFileName()) == false) { + String[] globalMetadataSplitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/"); + staleGlobalMetadataPaths.add( + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( + globalMetadataSplitPath[globalMetadataSplitPath.length - 1] + ) + ); + } + } else { + if (filesToKeep.contains(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()) == false) { + String[] coordinationMetadataSplitPath = clusterMetadataManifest.getCoordinationMetadata() + .getUploadedFilename() + .split("/"); + staleGlobalMetadataPaths.add( + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( + coordinationMetadataSplitPath[coordinationMetadataSplitPath.length - 1] + ) + ); + } + if (filesToKeep.contains(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()) == false) { + String[] templatesMetadataSplitPath = clusterMetadataManifest.getTemplatesMetadata() + .getUploadedFilename() + .split("/"); + staleGlobalMetadataPaths.add( + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( + templatesMetadataSplitPath[templatesMetadataSplitPath.length - 1] + ) + ); + } + if (filesToKeep.contains(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()) == false) { + String[] settingsMetadataSplitPath = clusterMetadataManifest.getSettingsMetadata().getUploadedFilename().split("/"); + staleGlobalMetadataPaths.add( + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( + settingsMetadataSplitPath[settingsMetadataSplitPath.length - 1] + ) + ); + } + clusterMetadataManifest.getCustomMetadataMap().forEach((key, value) -> { + if (filesToKeep.contains(value.getUploadedFilename()) == false) { + String[] customMetadataSplitPath = value.getUploadedFilename().split("/"); + staleGlobalMetadataPaths.add( + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( + customMetadataSplitPath[customMetadataSplitPath.length - 1] + ) + ); + } + }); + } + + clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { + if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { + staleIndexMetadataPaths.add( + new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString() + + INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename()) + ); + } + }); + }); + + if (staleManifestPaths.isEmpty()) { + logger.debug("No stale Remote Cluster Metadata files found"); + return; + } + + deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths)); + deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths)); + deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths)); + } catch (IllegalStateException e) { + logger.error("Error while fetching Remote Cluster Metadata manifests", e); + } catch (IOException e) { + logger.error("Error while deleting stale Remote Cluster Metadata files", e); + remoteStateStats.cleanUpAttemptFailed(); + } catch (Exception e) { + logger.error("Unexpected error while deleting stale Remote Cluster Metadata files", e); + remoteStateStats.cleanUpAttemptFailed(); + } + } + + private void deleteStalePaths(String clusterName, String clusterUUID, List stalePaths) throws IOException { + logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths)); + getBlobStoreTransferService().deleteBlobs(getCusterMetadataBasePath(clusterName, clusterUUID), stalePaths); + } + + /** + * Purges all remote cluster state against provided cluster UUIDs + * + * @param clusterState current state of the cluster + * @param committedManifest last committed ClusterMetadataManifest + */ + public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) { + threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { + String clusterName = clusterState.getClusterName().value(); + logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName); + Set allClustersUUIDsInRemote; + try { + allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterState.getClusterName().value())); + } catch (IOException e) { + logger.info(String.format(Locale.ROOT, "Error while fetching all cluster UUIDs for [%s]", clusterName)); + return; + } + // Retain last 2 cluster uuids data + allClustersUUIDsInRemote.remove(committedManifest.getClusterUUID()); + allClustersUUIDsInRemote.remove(committedManifest.getPreviousClusterUUID()); + deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote)); + }); + } + public RemotePersistenceStats getStats() { return remoteStateStats; } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 44af83bb839c1..57f7e402536f2 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -715,23 +715,28 @@ private synchronized void cleanCache(double stalenessThreshold) { } // Contains CleanupKey objects with open shard but invalidated readerCacheKeyId. final Set cleanupKeysFromOutdatedReaders = new HashSet<>(); - // Contains CleanupKey objects of a closed shard. + // Contains CleanupKey objects for a full cache cleanup. + final Set> cleanupKeysFromFullClean = new HashSet<>(); + // Contains CleanupKey objects for a closed shard. final Set> cleanupKeysFromClosedShards = new HashSet<>(); for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); - if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) { - // null indicates full cleanup, as does a closed shard - IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); + final IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); + if (cleanupKey.readerCacheKeyId == null) { + // null indicates full cleanup // Add both shardId and indexShardHashCode to uniquely identify an indexShard. + cleanupKeysFromFullClean.add(new Tuple<>(indexShard.shardId(), indexShard.hashCode())); + } else if (!cleanupKey.entity.isOpen()) { + // The shard is closed cleanupKeysFromClosedShards.add(new Tuple<>(indexShard.shardId(), indexShard.hashCode())); } else { cleanupKeysFromOutdatedReaders.add(cleanupKey); } } - if (cleanupKeysFromOutdatedReaders.isEmpty() && cleanupKeysFromClosedShards.isEmpty()) { + if (cleanupKeysFromOutdatedReaders.isEmpty() && cleanupKeysFromFullClean.isEmpty() && cleanupKeysFromClosedShards.isEmpty()) { return; } @@ -740,15 +745,15 @@ private synchronized void cleanCache(double stalenessThreshold) { for (Iterator> iterator = cache.keys().iterator(); iterator.hasNext();) { ICacheKey key = iterator.next(); Key delegatingKey = key.key; - if (cleanupKeysFromClosedShards.contains(new Tuple<>(delegatingKey.shardId, delegatingKey.indexShardHashCode))) { - // Since the shard is closed, the cache should drop stats for this shard. - dimensionListsToDrop.add(key.dimensions); + Tuple shardIdInfo = new Tuple<>(delegatingKey.shardId, delegatingKey.indexShardHashCode); + if (cleanupKeysFromFullClean.contains(shardIdInfo) || cleanupKeysFromClosedShards.contains(shardIdInfo)) { iterator.remove(); } else { CacheEntity cacheEntity = cacheEntityLookup.apply(delegatingKey.shardId).orElse(null); if (cacheEntity == null) { // If cache entity is null, it means that index or shard got deleted/closed meanwhile. // So we will delete this key. + dimensionListsToDrop.add(key.dimensions); iterator.remove(); } else { CleanupKey cleanupKey = new CleanupKey(cacheEntity, delegatingKey.readerCacheKeyId); @@ -757,6 +762,12 @@ private synchronized void cleanCache(double stalenessThreshold) { } } } + + if (cleanupKeysFromClosedShards.contains(shardIdInfo)) { + // Since the shard is closed, the cache should drop stats for this shard. + // This should not happen on a full cache cleanup. + dimensionListsToDrop.add(key.dimensions); + } } for (List closedDimensions : dimensionListsToDrop) { // Invalidate a dummy key containing the dimensions we need to drop stats for diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 49545fa8a0c8b..76109ba10624a 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -138,7 +138,6 @@ import org.opensearch.gateway.MetaStateService; import org.opensearch.gateway.PersistedClusterStateService; import org.opensearch.gateway.ShardsBatchGatewayAllocator; -import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.http.HttpServerTransport; import org.opensearch.identity.IdentityService; @@ -753,7 +752,6 @@ protected Node( threadPool::relativeTimeInMillis ); final RemoteClusterStateService remoteClusterStateService; - final RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; final RemoteIndexPathUploader remoteIndexPathUploader; if (isRemoteStoreClusterStateEnabled(settings)) { remoteIndexPathUploader = new RemoteIndexPathUploader( @@ -766,16 +764,14 @@ protected Node( nodeEnvironment.nodeId(), repositoriesServiceReference::get, settings, - clusterService, + clusterService.getClusterSettings(), threadPool::preciseRelativeTimeInNanos, threadPool, List.of(remoteIndexPathUploader) ); - remoteClusterStateCleanupManager = remoteClusterStateService.getCleanupManager(); } else { remoteClusterStateService = null; remoteIndexPathUploader = null; - remoteClusterStateCleanupManager = null; } // collect engine factory providers from plugins @@ -1380,7 +1376,6 @@ protected Node( b.bind(MetricsRegistry.class).toInstance(metricsRegistry); b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(RemoteIndexPathUploader.class).toProvider(() -> remoteIndexPathUploader); - b.bind(RemoteClusterStateCleanupManager.class).toProvider(() -> remoteClusterStateCleanupManager); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker); b.bind(SearchRequestOperationsCompositeListenerFactory.class).toInstance(searchRequestOperationsCompositeListenerFactory); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 418e6d8de6adb..3ba98c44f8d3e 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -462,7 +462,9 @@ public void testDataOnlyNodePersistence() throws Exception { }); when(transportService.getThreadPool()).thenReturn(threadPool); ClusterService clusterService = mock(ClusterService.class); - when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + when(clusterService.getClusterSettings()).thenReturn( + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); final PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService( nodeEnvironment, xContentRegistry(), @@ -485,7 +487,7 @@ public void testDataOnlyNodePersistence() throws Exception { nodeEnvironment.nodeId(), repositoriesServiceSupplier, settings, - clusterService, + clusterSettings, () -> 0L, threadPool, List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)) diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java deleted file mode 100644 index 24fd1b164a4ff..0000000000000 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java +++ /dev/null @@ -1,446 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway.remote; - -import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.service.ClusterApplierService; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.blobstore.BlobContainer; -import org.opensearch.common.blobstore.BlobMetadata; -import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.blobstore.BlobStore; -import org.opensearch.common.blobstore.support.PlainBlobMetadata; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.concurrent.AbstractAsyncTask; -import org.opensearch.core.action.ActionListener; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.repositories.fs.FsRepository; -import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.test.VersionUtils; -import org.opensearch.threadpool.TestThreadPool; -import org.opensearch.threadpool.ThreadPool; -import org.junit.After; -import org.junit.Before; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; - -import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; -import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; -import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; -import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.AsyncStaleFileDeletion; -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT; -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING; -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS; -import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES; -import static org.opensearch.gateway.remote.RemoteClusterStateService.CLUSTER_STATE_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA; -import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; -import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; -import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA; -import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA; -import static org.opensearch.gateway.remote.RemoteClusterStateService.encodeString; -import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.generateClusterStateWithOneIndex; -import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.nodesWithLocalNodeClusterManager; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class RemoteClusterStateCleanupManagerTests extends OpenSearchTestCase { - private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; - private Supplier repositoriesServiceSupplier; - private RepositoriesService repositoriesService; - private BlobStoreRepository blobStoreRepository; - private BlobStore blobStore; - private ClusterSettings clusterSettings; - private ClusterApplierService clusterApplierService; - private ClusterState clusterState; - private Metadata metadata; - private RemoteClusterStateService remoteClusterStateService; - private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); - - @Before - public void setup() { - repositoriesServiceSupplier = mock(Supplier.class); - repositoriesService = mock(RepositoriesService.class); - when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); - - String stateRepoTypeAttributeKey = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, - "remote_store_repository" - ); - String stateRepoSettingsAttributeKeyPrefix = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, - "remote_store_repository" - ); - - Settings settings = Settings.builder() - .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository") - .put(stateRepoTypeAttributeKey, FsRepository.TYPE) - .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath") - .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) - .build(); - - clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - clusterApplierService = mock(ClusterApplierService.class); - clusterState = mock(ClusterState.class); - metadata = mock(Metadata.class); - ClusterService clusterService = mock(ClusterService.class); - when(clusterService.getClusterSettings()).thenReturn(clusterSettings); - when(clusterState.getClusterName()).thenReturn(new ClusterName("test")); - when(metadata.clusterUUID()).thenReturn("testUUID"); - when(clusterState.metadata()).thenReturn(metadata); - when(clusterApplierService.state()).thenReturn(clusterState); - when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService); - - blobStoreRepository = mock(BlobStoreRepository.class); - blobStore = mock(BlobStore.class); - when(blobStoreRepository.blobStore()).thenReturn(blobStore); - when(repositoriesService.repository("remote_store_repository")).thenReturn(blobStoreRepository); - - remoteClusterStateService = mock(RemoteClusterStateService.class); - when(remoteClusterStateService.getStats()).thenReturn(new RemotePersistenceStats()); - when(remoteClusterStateService.getThreadpool()).thenReturn(threadPool); - when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore); - remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService); - } - - @After - public void teardown() throws Exception { - super.tearDown(); - remoteClusterStateCleanupManager.close(); - threadPool.shutdown(); - } - - public void testDeleteClusterMetadata() throws IOException { - String clusterUUID = "clusterUUID"; - String clusterName = "test-cluster"; - List inactiveBlobs = Arrays.asList( - new PlainBlobMetadata("manifest1.dat", 1L), - new PlainBlobMetadata("manifest2.dat", 1L), - new PlainBlobMetadata("manifest3.dat", 1L) - ); - List activeBlobs = Arrays.asList( - new PlainBlobMetadata("manifest4.dat", 1L), - new PlainBlobMetadata("manifest5.dat", 1L) - ); - UploadedIndexMetadata index1Metadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1"); - UploadedIndexMetadata index2Metadata = new UploadedIndexMetadata("index2", "indexUUID2", "index_metadata2"); - UploadedIndexMetadata index1UpdatedMetadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1_updated"); - UploadedMetadataAttribute coordinationMetadata = new UploadedMetadataAttribute(COORDINATION_METADATA, "coordination_metadata"); - UploadedMetadataAttribute templateMetadata = new UploadedMetadataAttribute(TEMPLATES_METADATA, "template_metadata"); - UploadedMetadataAttribute settingMetadata = new UploadedMetadataAttribute(SETTING_METADATA, "settings_metadata"); - UploadedMetadataAttribute coordinationMetadataUpdated = new UploadedMetadataAttribute( - COORDINATION_METADATA, - "coordination_metadata_updated" - ); - UploadedMetadataAttribute templateMetadataUpdated = new UploadedMetadataAttribute(TEMPLATES_METADATA, "template_metadata_updated"); - UploadedMetadataAttribute settingMetadataUpdated = new UploadedMetadataAttribute(SETTING_METADATA, "settings_metadata_updated"); - ClusterMetadataManifest manifest1 = ClusterMetadataManifest.builder() - .indices(List.of(index1Metadata)) - .globalMetadataFileName("global_metadata") - .clusterTerm(1L) - .stateVersion(1L) - .codecVersion(CODEC_V1) - .stateUUID(randomAlphaOfLength(10)) - .clusterUUID(clusterUUID) - .nodeId("nodeA") - .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) - .previousClusterUUID(ClusterState.UNKNOWN_UUID) - .committed(true) - .build(); - ClusterMetadataManifest manifest2 = ClusterMetadataManifest.builder(manifest1) - .indices(List.of(index1Metadata, index2Metadata)) - .codecVersion(CODEC_V2) - .globalMetadataFileName(null) - .coordinationMetadata(coordinationMetadata) - .templatesMetadata(templateMetadata) - .settingMetadata(settingMetadata) - .build(); - ClusterMetadataManifest manifest3 = ClusterMetadataManifest.builder(manifest2) - .indices(List.of(index1UpdatedMetadata, index2Metadata)) - .settingMetadata(settingMetadataUpdated) - .build(); - - // active manifest have reference to index1Updated, index2, settingsUpdated, coordinationUpdated, templates, templatesUpdated - ClusterMetadataManifest manifest4 = ClusterMetadataManifest.builder(manifest3) - .coordinationMetadata(coordinationMetadataUpdated) - .build(); - ClusterMetadataManifest manifest5 = ClusterMetadataManifest.builder(manifest4).templatesMetadata(templateMetadataUpdated).build(); - - when(remoteClusterStateService.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn( - manifest4, - manifest5, - manifest1, - manifest2, - manifest3 - ); - BlobContainer container = mock(BlobContainer.class); - when(blobStore.blobContainer(any())).thenReturn(container); - doNothing().when(container).deleteBlobsIgnoringIfNotExists(any()); - - remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs); - verify(container).deleteBlobsIgnoringIfNotExists( - List.of( - new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + coordinationMetadata.getUploadedFilename() + ".dat", - new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + settingMetadata.getUploadedFilename() + ".dat", - new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + "global_metadata.dat" - ) - ); - verify(container).deleteBlobsIgnoringIfNotExists( - List.of( - new BlobPath().add(INDEX_PATH_TOKEN).add(index1Metadata.getIndexUUID()).buildAsString() - + index1Metadata.getUploadedFilePath() - + ".dat" - ) - ); - Set staleManifest = new HashSet<>(); - inactiveBlobs.forEach(blob -> staleManifest.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blob.name())); - verify(container).deleteBlobsIgnoringIfNotExists(new ArrayList<>(staleManifest)); - } - - public void testDeleteStaleClusterUUIDs() throws IOException { - final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); - ClusterMetadataManifest clusterMetadataManifest = ClusterMetadataManifest.builder() - .indices(List.of()) - .clusterTerm(1L) - .stateVersion(1L) - .stateUUID(randomAlphaOfLength(10)) - .clusterUUID("cluster-uuid1") - .nodeId("nodeA") - .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) - .previousClusterUUID(ClusterState.UNKNOWN_UUID) - .committed(true) - .build(); - - BlobPath blobPath = new BlobPath().add("random-path"); - BlobContainer uuidContainerContainer = mock(BlobContainer.class); - BlobContainer manifest2Container = mock(BlobContainer.class); - BlobContainer manifest3Container = mock(BlobContainer.class); - when(blobStore.blobContainer(any())).then(invocation -> { - BlobPath blobPath1 = invocation.getArgument(0); - if (blobPath1.buildAsString().endsWith("cluster-state/")) { - return uuidContainerContainer; - } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid2/")) { - return manifest2Container; - } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid3/")) { - return manifest3Container; - } else { - throw new IllegalArgumentException("Unexpected blob path " + blobPath1); - } - }); - when( - manifest2Container.listBlobsByPrefixInSortedOrder( - MANIFEST_FILE_PREFIX + DELIMITER, - Integer.MAX_VALUE, - BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC - ) - ).thenReturn(List.of(new PlainBlobMetadata("mainfest2", 1L))); - when( - manifest3Container.listBlobsByPrefixInSortedOrder( - MANIFEST_FILE_PREFIX + DELIMITER, - Integer.MAX_VALUE, - BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC - ) - ).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L))); - Set uuids = new HashSet<>(Arrays.asList("cluster-uuid1", "cluster-uuid2", "cluster-uuid3")); - when(remoteClusterStateService.getAllClusterUUIDs(any())).thenReturn(uuids); - when(remoteClusterStateService.getCusterMetadataBasePath(any(), any())).then( - invocationOnMock -> blobPath.add(encodeString(invocationOnMock.getArgument(0))) - .add(CLUSTER_STATE_PATH_TOKEN) - .add((String) invocationOnMock.getArgument(1)) - ); - remoteClusterStateCleanupManager.start(); - remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, clusterMetadataManifest); - try { - assertBusy(() -> { - verify(manifest2Container, times(1)).delete(); - verify(manifest3Container, times(1)).delete(); - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public void testRemoteStateCleanupFailureStats() throws IOException { - BlobContainer blobContainer = mock(BlobContainer.class); - doThrow(IOException.class).when(blobContainer).delete(); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - BlobPath blobPath = new BlobPath().add("random-path"); - when((blobStoreRepository.basePath())).thenReturn(blobPath); - remoteClusterStateCleanupManager.start(); - remoteClusterStateCleanupManager.deleteStaleUUIDsClusterMetadata("cluster1", List.of("cluster-uuid1")); - try { - assertBusy(() -> { - // wait for stats to get updated - assertNotNull(remoteClusterStateCleanupManager.getStats()); - assertEquals(0, remoteClusterStateCleanupManager.getStats().getSuccessCount()); - assertEquals(1, remoteClusterStateCleanupManager.getStats().getCleanupAttemptFailedCount()); - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception { - BlobContainer blobContainer = mock(BlobContainer.class); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - - CountDownLatch latch = new CountDownLatch(1); - AtomicInteger callCount = new AtomicInteger(0); - doAnswer(invocation -> { - callCount.incrementAndGet(); - if (latch.await(5000, TimeUnit.SECONDS) == false) { - throw new Exception("Timed out waiting for delete task queuing to complete"); - } - return null; - }).when(blobContainer) - .listBlobsByPrefixInSortedOrder( - any(String.class), - any(int.class), - any(BlobContainer.BlobNameSortOrder.class), - any(ActionListener.class) - ); - - remoteClusterStateCleanupManager.start(); - remoteClusterStateCleanupManager.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); - remoteClusterStateCleanupManager.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); - - latch.countDown(); - assertBusy(() -> assertEquals(1, callCount.get())); - } - - public void testRemoteClusterStateCleanupSetting() { - remoteClusterStateCleanupManager.start(); - // verify default value - assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileCleanupInterval()); - - // verify update interval - int cleanupInterval = randomIntBetween(1, 10); - Settings newSettings = Settings.builder().put("cluster.remote_store.state.cleanup_interval", cleanupInterval + "s").build(); - clusterSettings.applySettings(newSettings); - assertEquals(cleanupInterval, remoteClusterStateCleanupManager.getStaleFileCleanupInterval().seconds()); - } - - public void testRemoteCleanupTaskScheduled() { - AbstractAsyncTask cleanupTask = remoteClusterStateCleanupManager.getStaleFileDeletionTask(); - assertNull(cleanupTask); - // now the task should be initialized - remoteClusterStateCleanupManager.start(); - assertNotNull(remoteClusterStateCleanupManager.getStaleFileDeletionTask()); - assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().mustReschedule()); - assertEquals( - clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING), - remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval() - ); - assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled()); - assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isClosed()); - } - - public void testRemoteCleanupSkipsOnOnlyElectedClusterManager() { - DiscoveryNodes nodes = mock(DiscoveryNodes.class); - when(nodes.isLocalNodeElectedClusterManager()).thenReturn(false); - when(clusterState.nodes()).thenReturn(nodes); - RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager); - AtomicInteger callCount = new AtomicInteger(0); - doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt()); - spyManager.cleanUpStaleFiles(); - assertEquals(0, callCount.get()); - - when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true); - when(clusterState.version()).thenReturn(randomLongBetween(11, 20)); - spyManager.cleanUpStaleFiles(); - assertEquals(1, callCount.get()); - } - - public void testRemoteCleanupSkipsIfVersionIncrementLessThanThreshold() { - DiscoveryNodes nodes = mock(DiscoveryNodes.class); - long version = randomLongBetween(1, SKIP_CLEANUP_STATE_CHANGES); - when(clusterApplierService.state()).thenReturn(clusterState); - when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true); - when(clusterState.nodes()).thenReturn(nodes); - when(clusterState.version()).thenReturn(version); - - RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager); - AtomicInteger callCount = new AtomicInteger(0); - doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt()); - - remoteClusterStateCleanupManager.cleanUpStaleFiles(); - assertEquals(0, callCount.get()); - } - - public void testRemoteCleanupCallsDeleteIfVersionIncrementGreaterThanThreshold() { - DiscoveryNodes nodes = mock(DiscoveryNodes.class); - long version = randomLongBetween(SKIP_CLEANUP_STATE_CHANGES + 1, SKIP_CLEANUP_STATE_CHANGES + 10); - when(clusterApplierService.state()).thenReturn(clusterState); - when(nodes.isLocalNodeElectedClusterManager()).thenReturn(true); - when(clusterState.nodes()).thenReturn(nodes); - when(clusterState.version()).thenReturn(version); - - RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager); - AtomicInteger callCount = new AtomicInteger(0); - doAnswer(invocation -> callCount.incrementAndGet()).when(spyManager).deleteStaleClusterMetadata(any(), any(), anyInt()); - - // using spied cleanup manager so that stubbed deleteStaleClusterMetadata is called - spyManager.cleanUpStaleFiles(); - assertEquals(1, callCount.get()); - } - - public void testRemoteCleanupSchedulesEvenAfterFailure() { - remoteClusterStateCleanupManager.start(); - RemoteClusterStateCleanupManager spyManager = spy(remoteClusterStateCleanupManager); - AtomicInteger callCount = new AtomicInteger(0); - doAnswer(invocationOnMock -> { - callCount.incrementAndGet(); - throw new RuntimeException("Test exception"); - }).when(spyManager).cleanUpStaleFiles(); - AsyncStaleFileDeletion task = new AsyncStaleFileDeletion(spyManager); - assertTrue(task.isScheduled()); - task.run(); - // Task is still scheduled after the failure - assertTrue(task.isScheduled()); - assertEquals(1, callCount.get()); - - task.run(); - // Task is still scheduled after the failure - assertTrue(task.isScheduled()); - assertEquals(2, callCount.get()); - } -} diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 5f0c371a3137e..1b242b921c0d7 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -19,7 +19,6 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.TemplatesMetadata; import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; @@ -73,6 +72,9 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; @@ -91,6 +93,7 @@ import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX; +import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS; import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA; import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; @@ -106,12 +109,13 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class RemoteClusterStateServiceTests extends OpenSearchTestCase { private RemoteClusterStateService remoteClusterStateService; - private ClusterService clusterService; private ClusterSettings clusterSettings; private Supplier repositoriesServiceSupplier; private RepositoriesService repositoriesService; @@ -144,8 +148,6 @@ public void setup() { .build(); clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - clusterService = mock(ClusterService.class); - when(clusterService.getClusterSettings()).thenReturn(clusterSettings); NamedXContentRegistry xContentRegistry = new NamedXContentRegistry( Stream.of( NetworkModule.getNamedXContents().stream(), @@ -163,7 +165,7 @@ public void setup() { "test-node-id", repositoriesServiceSupplier, settings, - clusterService, + clusterSettings, () -> 0L, threadPool, List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)) @@ -185,14 +187,14 @@ public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException public void testFailInitializationWhenRemoteStateDisabled() { final Settings settings = Settings.builder().build(); - when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); assertThrows( AssertionError.class, () -> new RemoteClusterStateService( "test-node-id", repositoriesServiceSupplier, settings, - clusterService, + clusterSettings, () -> 0L, threadPool, List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)) @@ -1278,6 +1280,72 @@ public void testGetValidPreviousClusterUUIDWhenLastUUIDUncommitted() throws IOEx assertThat(previousClusterUUID, equalTo("cluster-uuid2")); } + public void testDeleteStaleClusterUUIDs() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + ClusterMetadataManifest clusterMetadataManifest = ClusterMetadataManifest.builder() + .indices(List.of()) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID(randomAlphaOfLength(10)) + .clusterUUID("cluster-uuid1") + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(ClusterState.UNKNOWN_UUID) + .committed(true) + .build(); + + BlobPath blobPath = new BlobPath().add("random-path"); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + BlobContainer uuidContainerContainer = mock(BlobContainer.class); + BlobContainer manifest2Container = mock(BlobContainer.class); + BlobContainer manifest3Container = mock(BlobContainer.class); + when(blobStore.blobContainer(any())).then(invocation -> { + BlobPath blobPath1 = invocation.getArgument(0); + if (blobPath1.buildAsString().endsWith("cluster-state/")) { + return uuidContainerContainer; + } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid2/")) { + return manifest2Container; + } else if (blobPath1.buildAsString().contains("cluster-state/cluster-uuid3/")) { + return manifest3Container; + } else { + throw new IllegalArgumentException("Unexpected blob path " + blobPath1); + } + }); + Map blobMetadataMap = Map.of( + "cluster-uuid1", + mock(BlobContainer.class), + "cluster-uuid2", + mock(BlobContainer.class), + "cluster-uuid3", + mock(BlobContainer.class) + ); + when(uuidContainerContainer.children()).thenReturn(blobMetadataMap); + when( + manifest2Container.listBlobsByPrefixInSortedOrder( + MANIFEST_FILE_PREFIX + DELIMITER, + Integer.MAX_VALUE, + BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC + ) + ).thenReturn(List.of(new PlainBlobMetadata("mainfest2", 1L))); + when( + manifest3Container.listBlobsByPrefixInSortedOrder( + MANIFEST_FILE_PREFIX + DELIMITER, + Integer.MAX_VALUE, + BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC + ) + ).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L))); + remoteClusterStateService.start(); + remoteClusterStateService.deleteStaleClusterUUIDs(clusterState, clusterMetadataManifest); + try { + assertBusy(() -> { + verify(manifest2Container, times(1)).delete(); + verify(manifest3Container, times(1)).delete(); + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + public void testRemoteStateStats() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); @@ -1290,6 +1358,26 @@ public void testRemoteStateStats() throws IOException { assertEquals(0, remoteClusterStateService.getStats().getFailedCount()); } + public void testRemoteStateCleanupFailureStats() throws IOException { + BlobContainer blobContainer = mock(BlobContainer.class); + doThrow(IOException.class).when(blobContainer).delete(); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + BlobPath blobPath = new BlobPath().add("random-path"); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + remoteClusterStateService.start(); + remoteClusterStateService.deleteStaleUUIDsClusterMetadata("cluster1", Arrays.asList("cluster-uuid1")); + try { + assertBusy(() -> { + // wait for stats to get updated + assertTrue(remoteClusterStateService.getStats() != null); + assertEquals(0, remoteClusterStateService.getStats().getSuccessCount()); + assertEquals(1, remoteClusterStateService.getStats().getCleanupAttemptFailedCount()); + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + public void testFileNames() { final Index index = new Index("test-index", "index-uuid"); final Settings idxSettings = Settings.builder() @@ -1330,6 +1418,36 @@ private void verifyManifestFileNameWithCodec(int codecVersion) { assertThat(splittedName[3], is("P")); } + public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception { + BlobContainer blobContainer = mock(BlobContainer.class); + BlobPath blobPath = new BlobPath().add("random-path"); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger callCount = new AtomicInteger(0); + doAnswer(invocation -> { + callCount.incrementAndGet(); + if (latch.await(5000, TimeUnit.SECONDS) == false) { + throw new Exception("Timed out waiting for delete task queuing to complete"); + } + return null; + }).when(blobContainer) + .listBlobsByPrefixInSortedOrder( + any(String.class), + any(int.class), + any(BlobContainer.BlobNameSortOrder.class), + any(ActionListener.class) + ); + + remoteClusterStateService.start(); + remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); + remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS); + + latch.countDown(); + assertBusy(() -> assertEquals(1, callCount.get())); + } + public void testIndexMetadataUploadWaitTimeSetting() { // verify default value assertEquals( @@ -1773,7 +1891,7 @@ private static ClusterState.Builder generateClusterStateWithGlobalMetadata() { ); } - static ClusterState.Builder generateClusterStateWithOneIndex() { + private static ClusterState.Builder generateClusterStateWithOneIndex() { final Index index = new Index("test-index", "index-uuid"); final Settings idxSettings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) @@ -1803,7 +1921,7 @@ static ClusterState.Builder generateClusterStateWithOneIndex() { ); } - static DiscoveryNodes nodesWithLocalNodeClusterManager() { + private static DiscoveryNodes nodesWithLocalNodeClusterManager() { return DiscoveryNodes.builder().clusterManagerNodeId("cluster-manager-id").localNodeId("cluster-manager-id").build(); }