[Backport] [2.x] feat: add vertical scaling and SoftReference for snapshot repository data cache (#16489) #16624

Merged
feat: add vertical scaling and SoftReference for snapshot repository data cache (#16489)

- Applies `SoftReference` to cached repository data for efficient memory management under heap pressure (see the standalone sketch after the commit details below).
- Enables cache size configuration in `opensearch.yml`, adjustable within a range of 500KB to 1% of heap memory.
- Sets the default cache size to `Math.max(ByteSizeUnit.KB.toBytes(500), CACHE_MAX_THRESHOLD / 2)` so it’s generally proportional to heap size. In cases where 1% of the heap is less than 1000KB, indicating a low-memory environment, the default reverts to 500KB as before.
- Since `BytesReference` internally uses `byte[]`, the compressed array size is capped at `Integer.MAX_VALUE - 8` to ensure compatibility with JDK limitations on array sizes. Therefore, the maximum cache size cannot exceed this limit.

Signed-off-by: inpink <[email protected]>
(cherry picked from commit 53d41d3)
Signed-off-by: Andriy Redko <[email protected]>
inpink authored and reta committed Nov 12, 2024
commit 91fc7ab70918100bef4a5797d1df36e1975f5c73
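
For readers unfamiliar with the idiom, here is a minimal standalone sketch (hypothetical names; not part of this diff) of the `SoftReference` cache-and-null-check pattern the change applies. The real code stores a `Tuple<Long, BytesReference>` of repository generation and compressed repository data in place of the `String` used here:

```java
import java.lang.ref.SoftReference;
import java.util.concurrent.atomic.AtomicReference;

public class SoftCacheSketch {
    // The cache holds a SoftReference, so the GC may reclaim the cached
    // value under heap pressure instead of letting it contribute to an
    // OutOfMemoryError.
    private static final AtomicReference<SoftReference<String>> CACHE =
        new AtomicReference<>(new SoftReference<>(null));

    static void put(String value) {
        CACHE.set(new SoftReference<>(value));
    }

    static String get() {
        SoftReference<String> ref = CACHE.get();
        // Every reader must tolerate a cleared referent: ref.get()
        // returns null once the GC has reclaimed the value.
        return (ref != null) ? ref.get() : null;
    }

    public static void main(String[] args) {
        put("serialized repository data");
        String cached = get();
        if (cached == null) {
            // Cache miss (never populated, or cleared by the GC): the
            // repository falls back to reloading index-N from the blob store.
            System.out.println("cache miss, reload from blob store");
        } else {
            System.out.println("cache hit: " + cached);
        }
    }
}
```

The choice of `SoftReference` over `WeakReference` matters for a cache: soft references are kept as long as memory allows and are guaranteed to be cleared before the JVM throws an `OutOfMemoryError`, while weak references can be cleared on any collection cycle.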
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Increase segrep pressure checkpoint default limit to 30 ([#16577](https://github.com/opensearch-project/OpenSearch/pull/16577/files))
- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483))
- Make IndexStoreListener a pluggable interface ([#16583](https://github.com/opensearch-project/OpenSearch/pull/16583))
- Add vertical scaling and SoftReference for snapshot repository data cache ([#16489](https://github.com/opensearch-project/OpenSearch/pull/16489))

### Dependencies
- Bump `com.google.apis:google-api-services-compute` from v1-rev20240407-2.0.0 to v1-rev20241021-2.0.0 ([#16502](https://github.com/opensearch-project/OpenSearch/pull/16502), [#16548](https://github.com/opensearch-project/OpenSearch/pull/16548))
server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -787,6 +787,7 @@ public void apply(Settings value, Settings current, Settings previous) {
// Snapshot related Settings
BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING,
BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING,
BlobStoreRepository.SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD,

// Composite index settings
CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING,
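
A brief usage sketch of the node-scoped setting registered above (the `600kb` value is illustrative; `Settings`, `Setting`, and `ByteSizeValue` are the OpenSearch types already used in this diff):

```java
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

public class CacheThresholdSettingSketch {
    public static void main(String[] args) {
        // Equivalent to `snapshot.repository_data.cache.threshold: 600kb`
        // in opensearch.yml.
        Settings nodeSettings = Settings.builder()
            .put("snapshot.repository_data.cache.threshold", "600kb")
            .build();

        // The setting's parser (shown in the BlobStoreRepository diff below)
        // validates the 500 KB .. min(1% heap, Integer.MAX_VALUE - 8) range
        // and throws IllegalArgumentException for out-of-range values.
        ByteSizeValue threshold =
            BlobStoreRepository.SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD.get(nodeSettings);
        System.out.println(threshold.getBytes());
    }
}
```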
server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
@@ -142,6 +142,7 @@
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.IndexMetaDataGenerations;
@@ -168,6 +169,7 @@
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -197,6 +199,7 @@
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static org.opensearch.common.unit.MemorySizeValue.parseBytesSizeValueOrHeapRatio;
import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1;
import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS;
@@ -254,6 +257,23 @@
*/
public static final String VIRTUAL_DATA_BLOB_PREFIX = "v__";

public static final String SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME = "snapshot.repository_data.cache.threshold";

public static final double SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD_DEFAULT_PERCENTAGE = 0.01;

public static final long CACHE_MIN_THRESHOLD = ByteSizeUnit.KB.toBytes(500);

public static final long CACHE_MAX_THRESHOLD = calculateMaxSnapshotRepositoryDataCacheThreshold();

public static final long CACHE_DEFAULT_THRESHOLD = calculateDefaultSnapshotRepositoryDataCacheThreshold();

/**
* Set to Integer.MAX_VALUE - 8 to prevent OutOfMemoryError due to array header requirements, following the limit used in certain JDK versions.
* This ensures compatibility across various JDK versions. For a practical usage example,
* see this link: https://github.com/openjdk/jdk11u/blob/cee8535a9d3de8558b4b5028d68e397e508bef71/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ByteArrayChannel.java#L226
*/
private static final int MAX_SAFE_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
* When set to {@code true}, {@link #bestEffortConsistency} will be set to {@code true} and concurrent modifications of the repository
* contents will not result in the repository being marked as corrupted.
@@ -276,6 +296,58 @@
Setting.Property.Deprecated
);

/**
* Sets the cache size for snapshot repository data: the valid range is within 500Kb ... 1% of the node heap memory.
*/
public static final Setting<ByteSizeValue> SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD = new Setting<>(
SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME,
CACHE_DEFAULT_THRESHOLD + "b",
(s) -> {
ByteSizeValue userDefinedLimit = parseBytesSizeValueOrHeapRatio(s, SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME);
long userDefinedLimitBytes = userDefinedLimit.getBytes();

if (userDefinedLimitBytes > CACHE_MAX_THRESHOLD) {
throw new IllegalArgumentException(
"["
+ SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME
+ "] cannot be larger than ["
+ CACHE_MAX_THRESHOLD
+ "] bytes."
);
}

if (userDefinedLimitBytes < CACHE_MIN_THRESHOLD) {
throw new IllegalArgumentException(
"["
+ SNAPSHOT_REPOSITORY_DATA_CACHET_THRESHOLD_SETTING_NAME
+ "] cannot be smaller than ["
+ CACHE_MIN_THRESHOLD
+ "] bytes."
);
}

return userDefinedLimit;
},
Setting.Property.NodeScope
);

public static long calculateDefaultSnapshotRepositoryDataCacheThreshold() {
return Math.max(ByteSizeUnit.KB.toBytes(500), CACHE_MAX_THRESHOLD / 2);
}

public static long calculateMaxSnapshotRepositoryDataCacheThreshold() {
long jvmHeapSize = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
long defaultThresholdOfHeap = (long) (jvmHeapSize * SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD_DEFAULT_PERCENTAGE);
long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500);
long maxThreshold = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold);

return maxThreshold;
}

protected static long calculateMaxWithinIntLimit(long defaultThresholdOfHeap, long defaultAbsoluteThreshold) {
return Math.min(Math.max(defaultThresholdOfHeap, defaultAbsoluteThreshold), MAX_SAFE_ARRAY_SIZE);
}

/**
* Size hint for the IO buffer size to use when reading from and writing to the repository.
*/
@@ -462,6 +534,8 @@

private volatile boolean enableAsyncDeletion;

protected final long repositoryDataCacheThreshold;

/**
* Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for
* {@link RepositoryMetadata#pendingGeneration()} than for {@link RepositoryMetadata#generation()} indicating a full cluster restart
@@ -519,6 +593,7 @@
this.snapshotShardPathPrefix = SNAPSHOT_SHARD_PATH_PREFIX_SETTING.get(clusterService.getSettings());
this.enableAsyncDeletion = SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, this::setEnableAsyncDeletion);
this.repositoryDataCacheThreshold = SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD.get(clusterService.getSettings()).getBytes();
}

@Override
@@ -1157,7 +1232,8 @@
cached = null;
} else {
genToLoad = latestKnownRepoGen.get();
-            cached = latestKnownRepositoryData.get();
+            SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
+            cached = (softRef != null) ? softRef.get() : null;
}
if (genToLoad > generation) {
// It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
@@ -3025,15 +3101,19 @@
private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN);

// Best effort cache of the latest known repository data and its generation, cached serialized as compressed json
-    private final AtomicReference<Tuple<Long, BytesReference>> latestKnownRepositoryData = new AtomicReference<>();
+    private final AtomicReference<SoftReference<Tuple<Long, BytesReference>>> latestKnownRepositoryData = new AtomicReference<>(
+        new SoftReference<>(null)
+    );

@Override
public void getRepositoryData(ActionListener<RepositoryData> listener) {
if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
listener.onFailure(corruptedStateException(null));
return;
}
-        final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
+        final SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
+        final Tuple<Long, BytesReference> cached = (softRef != null) ? softRef.get() : null;

// Fast path loading repository data directly from cache if we're in fully consistent mode and the cache matches up with
// the latest known repository generation
if (bestEffortConsistency == false && cached != null && cached.v1() == latestKnownRepoGen.get()) {
@@ -3082,7 +3162,8 @@
genToLoad = latestKnownRepoGen.get();
}
try {
-            final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
+            final SoftReference<Tuple<Long, BytesReference>> softRef = latestKnownRepositoryData.get();
+            final Tuple<Long, BytesReference> cached = (softRef != null) ? softRef.get() : null;
final RepositoryData loaded;
// Caching is not used with #bestEffortConsistency see docs on #cacheRepositoryData for details
if (bestEffortConsistency == false && cached != null && cached.v1() == genToLoad) {
@@ -3149,19 +3230,22 @@
try {
serialized = CompressorRegistry.defaultCompressor().compress(updated);
final int len = serialized.length();
-            if (len > ByteSizeUnit.KB.toBytes(500)) {
+            long cacheWarningThreshold = Math.min(repositoryDataCacheThreshold * 10, MAX_SAFE_ARRAY_SIZE);
+            if (len > repositoryDataCacheThreshold) {
                 logger.debug(
-                    "Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in"
+                    "Not caching repository data of size [{}] for repository [{}] because it is larger than [{}] bytes in"
                         + " serialized size",
                     len,
-                    metadata.name()
+                    metadata.name(),
+                    repositoryDataCacheThreshold
                 );
-                if (len > ByteSizeUnit.MB.toBytes(5)) {
+                if (len > cacheWarningThreshold) {
                     logger.warn(
-                        "Your repository metadata blob for repository [{}] is larger than 5MB. Consider moving to a fresh"
+                        "Your repository metadata blob for repository [{}] is larger than [{}] bytes. Consider moving to a fresh"
                             + " repository for new snapshots or deleting unneeded snapshots from your repository to ensure stable"
                             + " repository behavior going forward.",
-                        metadata.name()
+                        metadata.name(),
+                        cacheWarningThreshold
                     );
}
// Set empty repository data to not waste heap for an outdated cached value
@@ -3173,11 +3257,12 @@
logger.warn("Failed to serialize repository data", e);
return;
}
-        latestKnownRepositoryData.updateAndGet(known -> {
+        latestKnownRepositoryData.updateAndGet(knownRef -> {
+            Tuple<Long, BytesReference> known = (knownRef != null) ? knownRef.get() : null;
             if (known != null && known.v1() > generation) {
-                return known;
+                return knownRef;
             }
-            return new Tuple<>(generation, serialized);
+            return new SoftReference<>(new Tuple<>(generation, serialized));
         });
}
}
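
To make the threshold arithmetic above concrete, here is a standalone sketch (not OpenSearch code; the heap sizes are hypothetical) that mirrors `calculateMaxSnapshotRepositoryDataCacheThreshold`, `calculateDefaultSnapshotRepositoryDataCacheThreshold`, and `calculateMaxWithinIntLimit`:

```java
public class ThresholdMathSketch {
    static final long KB = 1024, MB = 1024 * KB, GB = 1024 * MB;
    static final long MAX_SAFE_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    // Mirrors calculateMaxSnapshotRepositoryDataCacheThreshold: 1% of heap,
    // floored at 500 KB and capped at the safe byte[] limit.
    static long maxThreshold(long heapBytes) {
        long onePercentOfHeap = (long) (heapBytes * 0.01);
        return Math.min(Math.max(onePercentOfHeap, 500 * KB), MAX_SAFE_ARRAY_SIZE);
    }

    // Mirrors calculateDefaultSnapshotRepositoryDataCacheThreshold.
    static long defaultThreshold(long heapBytes) {
        return Math.max(500 * KB, maxThreshold(heapBytes) / 2);
    }

    public static void main(String[] args) {
        // 4 GB heap: 1% is ~41 MB, so the default is ~20.5 MB.
        System.out.println(defaultThreshold(4 * GB));    // 21474836
        // 64 MB heap: half of 1% (~328 KB) falls below 500 KB, so the
        // 500 KB floor applies, as described in the commit message.
        System.out.println(defaultThreshold(64 * MB));   // 512000
        // 1 TB heap: 1% (~10.2 GB) is capped at Integer.MAX_VALUE - 8.
        System.out.println(defaultThreshold(1024 * GB)); // 1073741819
    }
}
```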
server/src/test/java/org/opensearch/common/settings/MemorySizeSettingsTests.java
@@ -34,13 +34,15 @@

import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.indices.breaker.HierarchyCircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchTestCase;

import static org.hamcrest.Matchers.equalTo;
@@ -127,22 +129,75 @@ public void testIndicesFieldDataCacheSetting() {
);
}

public void testSnapshotRepositoryDataCacheSizeSetting() {
assertMemorySizeSettingInRange(
BlobStoreRepository.SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD,
"snapshot.repository_data.cache.threshold",
new ByteSizeValue(BlobStoreRepository.calculateDefaultSnapshotRepositoryDataCacheThreshold()),
ByteSizeUnit.KB.toBytes(500),
1.0
);
}

private void assertMemorySizeSetting(Setting<ByteSizeValue> setting, String settingKey, ByteSizeValue defaultValue) {
assertMemorySizeSetting(setting, settingKey, defaultValue, Settings.EMPTY);
}

private void assertMemorySizeSetting(Setting<ByteSizeValue> setting, String settingKey, ByteSizeValue defaultValue, Settings settings) {
assertMemorySizeSetting(setting, settingKey, defaultValue, 25.0, 1024, settings);
}

private void assertMemorySizeSetting(
Setting<ByteSizeValue> setting,
String settingKey,
ByteSizeValue defaultValue,
double availablePercentage,
long availableBytes,
Settings settings
) {
assertThat(setting, notNullValue());
assertThat(setting.getKey(), equalTo(settingKey));
assertThat(setting.getProperties(), hasItem(Property.NodeScope));
assertThat(setting.getDefault(settings), equalTo(defaultValue));
-        Settings settingWithPercentage = Settings.builder().put(settingKey, "25%").build();
+        Settings settingWithPercentage = Settings.builder().put(settingKey, percentageAsString(availablePercentage)).build();
         assertThat(
             setting.get(settingWithPercentage),
-            equalTo(new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.25)))
+            equalTo(
+                new ByteSizeValue((long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * percentageAsFraction(availablePercentage)))
+            )
         );
-        Settings settingWithBytesValue = Settings.builder().put(settingKey, "1024b").build();
-        assertThat(setting.get(settingWithBytesValue), equalTo(new ByteSizeValue(1024)));
+        Settings settingWithBytesValue = Settings.builder().put(settingKey, availableBytes + "b").build();
+        assertThat(setting.get(settingWithBytesValue), equalTo(new ByteSizeValue(availableBytes)));
}

private void assertMemorySizeSettingInRange(
Setting<ByteSizeValue> setting,
String settingKey,
ByteSizeValue defaultValue,
long minBytes,
double maxPercentage
) {
assertMemorySizeSetting(setting, settingKey, defaultValue, maxPercentage, minBytes, Settings.EMPTY);

assertThrows(IllegalArgumentException.class, () -> {
Settings settingWithTooSmallValue = Settings.builder().put(settingKey, minBytes - 1).build();
setting.get(settingWithTooSmallValue);
});

assertThrows(IllegalArgumentException.class, () -> {
double unavailablePercentage = maxPercentage + 0.1;
Settings settingWithPercentageExceedingLimit = Settings.builder()
.put(settingKey, percentageAsString(unavailablePercentage))
.build();
setting.get(settingWithPercentageExceedingLimit);
});
}

private double percentageAsFraction(double availablePercentage) {
return availablePercentage / 100.0;
}

private String percentageAsString(double availablePercentage) {
return availablePercentage + "%";
}
}
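
The percentage handling exercised by these tests delegates to `parseBytesSizeValueOrHeapRatio` (statically imported in the BlobStoreRepository diff above). A small sketch of the assumed behavior; the setting name string is the one introduced by this PR:

```java
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.monitor.jvm.JvmInfo;

import static org.opensearch.common.unit.MemorySizeValue.parseBytesSizeValueOrHeapRatio;

public class HeapRatioParseSketch {
    public static void main(String[] args) {
        long heapBytes = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
        // "1%" resolves against this node's max heap, so the same
        // opensearch.yml line yields different byte values on different nodes.
        ByteSizeValue resolved =
            parseBytesSizeValueOrHeapRatio("1%", "snapshot.repository_data.cache.threshold");
        System.out.println(resolved.getBytes() == (long) (heapBytes * 0.01)); // true
    }
}
```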
server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java
@@ -92,6 +92,7 @@
import java.util.stream.Collectors;

import static org.opensearch.repositories.RepositoryDataTests.generateRandomRepoData;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.calculateMaxWithinIntLimit;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
@@ -653,4 +654,53 @@ public void testGetRestrictedSystemRepositorySettings() {
assertTrue(settings.contains(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY));
repository.close();
}

public void testSnapshotRepositoryDataCacheDefaultSetting() {
// given
BlobStoreRepository repository = setupRepo();
long maxThreshold = BlobStoreRepository.calculateMaxSnapshotRepositoryDataCacheThreshold();

// when
long expectedThreshold = Math.max(ByteSizeUnit.KB.toBytes(500), maxThreshold / 2);

// then
assertEquals(repository.repositoryDataCacheThreshold, expectedThreshold);
}

public void testHeapThresholdUsed() {
// given
long defaultThresholdOfHeap = ByteSizeUnit.GB.toBytes(1);
long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500);

// when
long expectedThreshold = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold);

// then
assertEquals(defaultThresholdOfHeap, expectedThreshold);
}

public void testAbsoluteThresholdUsed() {
// given
long defaultThresholdOfHeap = ByteSizeUnit.KB.toBytes(499);
long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500);

// when
long result = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold);

// then
assertEquals(defaultAbsoluteThreshold, result);
}

public void testThresholdCappedAtIntMax() {
// given
int maxSafeArraySize = Integer.MAX_VALUE - 8;
long defaultThresholdOfHeap = (long) maxSafeArraySize + 1;
long defaultAbsoluteThreshold = ByteSizeUnit.KB.toBytes(500);

// when
long expectedThreshold = calculateMaxWithinIntLimit(defaultThresholdOfHeap, defaultAbsoluteThreshold);

// then
assertEquals(maxSafeArraySize, expectedThreshold);
}
}
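
One arithmetic consequence of the new `cacheWarningThreshold` (ten times the cache threshold, capped at the safe array size) worth noting: at the 500 KB minimum threshold the warning fires at 5,120,000 bytes, close to the previous hard-coded 5 MB (5,242,880 bytes), and the warning level scales proportionally for larger configured caches. A standalone check (illustrative, not part of the PR):

```java
long minThreshold = 500L * 1024;                                     // 512000
long warnAtMin = Math.min(minThreshold * 10, Integer.MAX_VALUE - 8L);
System.out.println(warnAtMin);                                       // 5120000
```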