diff --git a/CHANGELOG.md b/CHANGELOG.md index e8dd188709cd2..515cf0ce93157 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Make IndexStoreListener a pluggable interface ([#16583](https://github.com/opensearch-project/OpenSearch/pull/16583)) - Support for keyword fields in star-tree index ([#16233](https://github.com/opensearch-project/OpenSearch/pull/16233)) - Add a flag in QueryShardContext to differentiate inner hit query ([#16600](https://github.com/opensearch-project/OpenSearch/pull/16600)) +- Add vertical scaling and SoftReference for snapshot repository data cache ([#16489](https://github.com/opensearch-project/OpenSearch/pull/16489)) ### Dependencies - Bump `com.azure:azure-storage-common` from 12.25.1 to 12.27.1 ([#16521](https://github.com/opensearch-project/OpenSearch/pull/16521)) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index cac4b3914df5a..c836984655ad1 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -786,6 +786,7 @@ public void apply(Settings value, Settings current, Settings previous) { // Snapshot related Settings BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING, BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, + BlobStoreRepository.SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD, SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING, diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 243d0021fac2e..c1305fa563b16 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -142,6 +142,7 @@ import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.monitor.jvm.JvmInfo; import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.IndexMetaDataGenerations; @@ -167,6 +168,7 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; +import java.lang.ref.SoftReference; import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.Arrays; @@ -196,6 +198,7 @@ import java.util.stream.LongStream; import java.util.stream.Stream; +import static org.opensearch.common.unit.MemorySizeValue.parseBytesSizeValueOrHeapRatio; import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1; import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS; @@ -253,6 +256,23 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp */ public static final String VIRTUAL_DATA_BLOB_PREFIX = "v__"; + public static final String SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME = "snapshot.repository_data.cache.threshold"; + + public static final double SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD_DEFAULT_PERCENTAGE = 0.01; + + public static final long CACHE_MIN_THRESHOLD = ByteSizeUnit.KB.toBytes(500); + + public static final long CACHE_MAX_THRESHOLD = calculateMaxSnapshotRepositoryDataCacheThreshold(); + + public static final long CACHE_DEFAULT_THRESHOLD = calculateDefaultSnapshotRepositoryDataCacheThreshold(); + + /** + * Set to Integer.MAX_VALUE - 8 to prevent OutOfMemoryError due to array header requirements, following the limit used in certain JDK versions. + * This ensures compatibility across various JDK versions. For a practical usage example, + * see this link: https://github.com/openjdk/jdk11u/blob/cee8535a9d3de8558b4b5028d68e397e508bef71/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ByteArrayChannel.java#L226 + */ + private static final int MAX_SAFE_ARRAY_SIZE = Integer.MAX_VALUE - 8; + /** * When set to {@code true}, {@link #bestEffortConsistency} will be set to {@code true} and concurrent modifications of the repository * contents will not result in the repository being marked as corrupted. @@ -275,6 +295,58 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp Setting.Property.Deprecated ); + /** + * Sets the cache size for snapshot repository data: the valid range is within 500Kb ... 1% of the node heap memory. + */ + public static final Setting SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD = new Setting<>( + SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME, + CACHE_DEFAULT_THRESHOLD + "b", + (s) -> { + ByteSizeValue userDefinedLimit = parseBytesSizeValueOrHeapRatio(s, SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME); + long userDefinedLimitBytes = userDefinedLimit.getBytes(); + + if (userDefinedLimitBytes > CACHE_MAX_THRESHOLD) { + throw new IllegalArgumentException( + "[" + + SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME + + "] cannot be larger than [" + + CACHE_MAX_THRESHOLD + + "] bytes." + ); + } + + if (userDefinedLimitBytes < CACHE_MIN_THRESHOLD) { + throw new IllegalArgumentException( + "[" + + SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME + + "] cannot be smaller than [" + + CACHE_MIN_THRESHOLD + + "] bytes." + ); + } + + return userDefinedLimit; + }, + Setting.Property.NodeScope + ); + + public static long calculateDefaultSnapshotRepositoryDataCacheThreshold() { + return Math.max(ByteSizeUnit.KB.toBytes(500), CACHE_MAX_THRESHOLD / 2); + } + + public static long calculateMaxSnapshotRepositoryDataCacheThreshold() { + long jvmHeapSize = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes(); + long defaultThresholdOfHeap = (long) (jvmHeapSize * SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD_DEFAULT_PERCENTAGE); + long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500); + long maxThreshold = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold); + + return maxThreshold; + } + + protected static long calculateMaxWithinIntLimit(long defaultThresholdOfHeap, long defaultAbsoluteThreshold) { + return Math.min(Math.max(defaultThresholdOfHeap, defaultAbsoluteThreshold), MAX_SAFE_ARRAY_SIZE); + } + /** * Size hint for the IO buffer size to use when reading from and writing to the repository. */ @@ -461,6 +533,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private volatile boolean enableAsyncDeletion; + protected final long repositoryDataCacheThreshold; + /** * Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for * {@link RepositoryMetadata#pendingGeneration()} than for {@link RepositoryMetadata#generation()} indicating a full cluster restart @@ -515,6 +589,7 @@ protected BlobStoreRepository( this.snapshotShardPathPrefix = SNAPSHOT_SHARD_PATH_PREFIX_SETTING.get(clusterService.getSettings()); this.enableAsyncDeletion = SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.get(clusterService.getSettings()); clusterService.getClusterSettings().addSettingsUpdateConsumer(SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, this::setEnableAsyncDeletion); + this.repositoryDataCacheThreshold = SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD.get(clusterService.getSettings()).getBytes(); } @Override @@ -1132,7 +1207,8 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map> softRef = latestKnownRepositoryData.get(); + cached = (softRef != null) ? softRef.get() : null; } if (genToLoad > generation) { // It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just @@ -2926,7 +3002,9 @@ public void endVerification(String seed) { private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN); // Best effort cache of the latest known repository data and its generation, cached serialized as compressed json - private final AtomicReference> latestKnownRepositoryData = new AtomicReference<>(); + private final AtomicReference>> latestKnownRepositoryData = new AtomicReference<>( + new SoftReference<>(null) + ); @Override public void getRepositoryData(ActionListener listener) { @@ -2934,7 +3012,9 @@ public void getRepositoryData(ActionListener listener) { listener.onFailure(corruptedStateException(null)); return; } - final Tuple cached = latestKnownRepositoryData.get(); + final SoftReference> softRef = latestKnownRepositoryData.get(); + final Tuple cached = (softRef != null) ? softRef.get() : null; + // Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with // the latest known repository generation if (bestEffortConsistency == false && cached != null && cached.v1() == latestKnownRepoGen.get()) { @@ -2983,7 +3063,8 @@ private void doGetRepositoryData(ActionListener listener) { genToLoad = latestKnownRepoGen.get(); } try { - final Tuple cached = latestKnownRepositoryData.get(); + final SoftReference> softRef = latestKnownRepositoryData.get(); + final Tuple cached = (softRef != null) ? softRef.get() : null; final RepositoryData loaded; // Caching is not used with #bestEffortConsistency see docs on #cacheRepositoryData for details if (bestEffortConsistency == false && cached != null && cached.v1() == genToLoad) { @@ -3050,19 +3131,22 @@ private void cacheRepositoryData(BytesReference updated, long generation) { try { serialized = CompressorRegistry.defaultCompressor().compress(updated); final int len = serialized.length(); - if (len > ByteSizeUnit.KB.toBytes(500)) { + long cacheWarningThreshold = Math.min(repositoryDataCacheThreshold * 10, MAX_SAFE_ARRAY_SIZE); + if (len > repositoryDataCacheThreshold) { logger.debug( - "Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in" + "Not caching repository data of size [{}] for repository [{}] because it is larger than [{}] bytes in" + " serialized size", len, - metadata.name() + metadata.name(), + repositoryDataCacheThreshold ); - if (len > ByteSizeUnit.MB.toBytes(5)) { + if (len > cacheWarningThreshold) { logger.warn( - "Your repository metadata blob for repository [{}] is larger than 5MB. Consider moving to a fresh" + "Your repository metadata blob for repository [{}] is larger than [{}] bytes. Consider moving to a fresh" + " repository for new snapshots or deleting unneeded snapshots from your repository to ensure stable" + " repository behavior going forward.", - metadata.name() + metadata.name(), + cacheWarningThreshold ); } // Set empty repository data to not waste heap for an outdated cached value @@ -3074,11 +3158,12 @@ private void cacheRepositoryData(BytesReference updated, long generation) { logger.warn("Failed to serialize repository data", e); return; } - latestKnownRepositoryData.updateAndGet(known -> { + latestKnownRepositoryData.updateAndGet(knownRef -> { + Tuple known = (knownRef != null) ? knownRef.get() : null; if (known != null && known.v1() > generation) { - return known; + return knownRef; } - return new Tuple<>(generation, serialized); + return new SoftReference<>(new Tuple<>(generation, serialized)); }); } } diff --git a/server/src/test/java/org/opensearch/common/settings/MemorySizeSettingsTests.java b/server/src/test/java/org/opensearch/common/settings/MemorySizeSettingsTests.java index 95db7c2cfacaa..78782112be844 100644 --- a/server/src/test/java/org/opensearch/common/settings/MemorySizeSettingsTests.java +++ b/server/src/test/java/org/opensearch/common/settings/MemorySizeSettingsTests.java @@ -34,6 +34,7 @@ import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.util.PageCacheRecycler; +import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.indices.IndexingMemoryController; import org.opensearch.indices.IndicesQueryCache; @@ -41,6 +42,7 @@ import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.monitor.jvm.JvmInfo; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchTestCase; import static org.hamcrest.Matchers.equalTo; @@ -127,22 +129,75 @@ public void testIndicesFieldDataCacheSetting() { ); } + public void testSnapshotRepositoryDataCacheSizeSetting() { + assertMemorySizeSettingInRange( + BlobStoreRepository.SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD, + "snapshot.repository_data.cache.threshold", + new ByteSizeValue(BlobStoreRepository.calculateDefaultSnapshotRepositoryDataCacheThreshold()), + ByteSizeUnit.KB.toBytes(500), + 1.0 + ); + } + private void assertMemorySizeSetting(Setting setting, String settingKey, ByteSizeValue defaultValue) { assertMemorySizeSetting(setting, settingKey, defaultValue, Settings.EMPTY); } private void assertMemorySizeSetting(Setting setting, String settingKey, ByteSizeValue defaultValue, Settings settings) { + assertMemorySizeSetting(setting, settingKey, defaultValue, 25.0, 1024, settings); + } + + private void assertMemorySizeSetting( + Setting setting, + String settingKey, + ByteSizeValue defaultValue, + double availablePercentage, + long availableBytes, + Settings settings + ) { assertThat(setting, notNullValue()); assertThat(setting.getKey(), equalTo(settingKey)); assertThat(setting.getProperties(), hasItem(Property.NodeScope)); assertThat(setting.getDefault(settings), equalTo(defaultValue)); - Settings settingWithPercentage = Settings.builder().put(settingKey, "25%").build(); + Settings settingWithPercentage = Settings.builder().put(settingKey, percentageAsString(availablePercentage)).build(); assertThat( setting.get(settingWithPercentage), - equalTo(new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.25))) + equalTo( + new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * percentageAsFraction(availablePercentage))) + ) ); - Settings settingWithBytesValue = Settings.builder().put(settingKey, "1024b").build(); - assertThat(setting.get(settingWithBytesValue), equalTo(new ByteSizeValue(1024))); + Settings settingWithBytesValue = Settings.builder().put(settingKey, availableBytes + "b").build(); + assertThat(setting.get(settingWithBytesValue), equalTo(new ByteSizeValue(availableBytes))); } + private void assertMemorySizeSettingInRange( + Setting setting, + String settingKey, + ByteSizeValue defaultValue, + long minBytes, + double maxPercentage + ) { + assertMemorySizeSetting(setting, settingKey, defaultValue, maxPercentage, minBytes, Settings.EMPTY); + + assertThrows(IllegalArgumentException.class, () -> { + Settings settingWithTooSmallValue = Settings.builder().put(settingKey, minBytes - 1).build(); + setting.get(settingWithTooSmallValue); + }); + + assertThrows(IllegalArgumentException.class, () -> { + double unavailablePercentage = maxPercentage + 0.1; + Settings settingWithPercentageExceedingLimit = Settings.builder() + .put(settingKey, percentageAsString(unavailablePercentage)) + .build(); + setting.get(settingWithPercentageExceedingLimit); + }); + } + + private double percentageAsFraction(double availablePercentage) { + return availablePercentage / 100.0; + } + + private String percentageAsString(double availablePercentage) { + return availablePercentage + "%"; + } } diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java index aa10b7dc18381..620b18ad9d7cf 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -92,6 +92,7 @@ import java.util.stream.Collectors; import static org.opensearch.repositories.RepositoryDataTests.generateRandomRepoData; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.calculateMaxWithinIntLimit; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; @@ -653,4 +654,53 @@ public void testGetRestrictedSystemRepositorySettings() { assertTrue(settings.contains(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY)); repository.close(); } + + public void testSnapshotRepositoryDataCacheDefaultSetting() { + // given + BlobStoreRepository repository = setupRepo(); + long maxThreshold = BlobStoreRepository.calculateMaxSnapshotRepositoryDataCacheThreshold(); + + // when + long expectedThreshold = Math.max(ByteSizeUnit.KB.toBytes(500), maxThreshold / 2); + + // then + assertEquals(repository.repositoryDataCacheThreshold, expectedThreshold); + } + + public void testHeapThresholdUsed() { + // given + long defaultThresholdOfHeap = ByteSizeUnit.GB.toBytes(1); + long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500); + + // when + long expectedThreshold = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold); + + // then + assertEquals(defaultThresholdOfHeap, expectedThreshold); + } + + public void testAbsoluteThresholdUsed() { + // given + long defaultThresholdOfHeap = ByteSizeUnit.KB.toBytes(499); + long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500); + + // when + long result = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold); + + // then + assertEquals(defaultAbsoluteThreshold, result); + } + + public void testThresholdCappedAtIntMax() { + // given + int maxSafeArraySize = Integer.MAX_VALUE - 8; + long defaultThresholdOfHeap = (long) maxSafeArraySize + 1; + long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500); + + // when + long expectedThreshold = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold); + + // then + assertEquals(maxSafeArraySize, expectedThreshold); + } }