Skip to content

Commit

Permalink
Add support to skip pinned timestamp in remote segment garbage collector
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Aug 25, 2024
1 parent ed65482 commit a763217
Show file tree
Hide file tree
Showing 7 changed files with 858 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,7 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
} else {
// As delete is async, it's possible that the file gets created before the deletion or after
// deletion.
MatcherAssert.assertThat(
actualFileCount,
is(oneOf(lastNMetadataFilesToKeep - 1, lastNMetadataFilesToKeep, lastNMetadataFilesToKeep + 1))
);
assertTrue(actualFileCount >= lastNMetadataFilesToKeep);
}
}, 30, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -249,7 +246,7 @@ public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(4)));
assertTrue(actualFileCount >= 4);
}

public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Exception {
Expand Down
159 changes: 159 additions & 0 deletions server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,27 @@
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -373,4 +380,156 @@ public static boolean isSwitchToStrictCompatibilityMode(ClusterUpdateSettingsReq
incomingSettings
) == RemoteStoreNodeService.CompatibilityMode.STRICT;
}

/**
 * Resolves the metadata files that are implicitly locked by the given pinned timestamps, without
 * consulting or populating any long-lived cache.
 *
 * This overload delegates to the cache-aware variant of getPinnedTimestampLockedFiles, handing it a
 * brand-new empty map, so every pinned timestamp is resolved from scratch on each call.
 *
 * @param metadataFiles A list of metadata file names. Expected to be sorted in descending order of timestamp.
 * @param pinnedTimestampSet A set of timestamps representing pinned points in time.
 * @param getTimestampFunction A function that extracts the timestamp from a metadata file name.
 * @param prefixFunction A function that extracts a tuple of prefix information from a metadata file name.
 * @return A set of metadata file names that are implicitly locked based on the pinned timestamps.
 */
public static Set<String> getPinnedTimestampLockedFiles(
    List<String> metadataFiles,
    Set<Long> pinnedTimestampSet,
    Function<String, Long> getTimestampFunction,
    Function<String, Tuple<String, String>> prefixFunction
) {
    // Throwaway cache: it starts empty and is discarded when this call returns.
    final Map<Long, String> throwawayCache = new HashMap<>();
    return getPinnedTimestampLockedFiles(
        metadataFiles,
        pinnedTimestampSet,
        throwawayCache,
        getTimestampFunction,
        prefixFunction
    );
}

/**
 * Determines and returns a set of metadata files that match provided pinned timestamps.
 *
 * A metadata file is considered implicitly locked by a pinned timestamp when its timestamp equals the
 * pinned timestamp or is the closest one preceding it. A caller-supplied map is used as a cache so that
 * pinned timestamps resolved in an earlier call do not need to be resolved again.
 *
 * The method performs the following steps:
 * 1. Returns an empty set if {@code metadataFiles} is null/empty or {@code pinnedTimestampSet} is null.
 * 2. Drops cache entries (metadataFilePinnedTimestampMap) whose timestamp is no longer pinned.
 * 3. Adds cached files to the result and collects the pinned timestamps that still need resolving.
 * 4. For those timestamps, walks a sorted copy of the metadata files to find the matching or closest
 *    preceding file.
 * 5. Stores newly resolved timestamps in the cache, except when the match is the first file considered
 *    (a later upload could still match that pinned timestamp).
 *
 * The caller's {@code metadataFiles} list is not modified: sorting is done on a private copy, so
 * immutable lists may be passed safely.
 *
 * @param metadataFiles A list of metadata file names. Expected to be sorted in descending order of timestamp.
 * @param pinnedTimestampSet A set of timestamps representing pinned points in time.
 * @param metadataFilePinnedTimestampMap A map used for caching processed timestamps and their corresponding metadata files.
 *                                       It is mutated by this method and must not be null.
 * @param getTimestampFunction A function that extracts the timestamp from a metadata file name.
 * @param prefixFunction A function that extracts a tuple of prefix information from a metadata file name.
 * @return A set of metadata file names that are implicitly locked based on the pinned timestamps.
 *
 */
public static Set<String> getPinnedTimestampLockedFiles(
    List<String> metadataFiles,
    Set<Long> pinnedTimestampSet,
    Map<Long, String> metadataFilePinnedTimestampMap,
    Function<String, Long> getTimestampFunction,
    Function<String, Tuple<String, String>> prefixFunction
) {
    Set<String> implicitLockedFiles = new HashSet<>();

    if (metadataFiles == null || metadataFiles.isEmpty() || pinnedTimestampSet == null) {
        return implicitLockedFiles;
    }

    // Remove entries for timestamps that are no longer pinned
    metadataFilePinnedTimestampMap.keySet().retainAll(pinnedTimestampSet);

    // Add cached entries and collect new timestamps (highest timestamp first)
    Set<Long> newPinnedTimestamps = new TreeSet<>(Collections.reverseOrder());
    for (Long pinnedTimestamp : pinnedTimestampSet) {
        String cachedFile = metadataFilePinnedTimestampMap.get(pinnedTimestamp);
        if (cachedFile != null) {
            implicitLockedFiles.add(cachedFile);
        } else {
            newPinnedTimestamps.add(pinnedTimestamp);
        }
    }

    if (newPinnedTimestamps.isEmpty()) {
        return implicitLockedFiles;
    }

    // Sort a private copy so the caller's list is never mutated (and immutable lists do not throw).
    // Lexicographic order of the file names is used as the "descending order of timestamp" ordering;
    // presumably this holds because the name components are stored inverted - verify against
    // the metadata file naming scheme.
    // ToDo: Do we really need this? Files fetched from remote store are already lexicographically sorted.
    List<String> sortedMetadataFiles = new ArrayList<>(metadataFiles);
    sortedMetadataFiles.sort(String::compareTo);

    // If we have metadata files from multiple writers, it can result in picking file generated by stale primary.
    // To avoid this, we fail fast.
    RemoteStoreUtils.verifyNoMultipleWriters(sortedMetadataFiles, prefixFunction);

    Iterator<Long> timestampIterator = newPinnedTimestamps.iterator();
    Long currentPinnedTimestamp = timestampIterator.next();
    // Long.MAX_VALUE marks "no metadata file seen yet"
    long prevMdTimestamp = Long.MAX_VALUE;
    for (String metadataFileName : sortedMetadataFiles) {
        long currentMdTimestamp = getTimestampFunction.apply(metadataFileName);
        // We always prefer md file with higher values of prefix like primary term, generation etc.
        if (currentMdTimestamp > prevMdTimestamp) {
            continue;
        }
        // The current file is at or before the pinned timestamp while the previous one was after it,
        // so the current file is the closest preceding match. One file may satisfy several pinned timestamps.
        while (currentMdTimestamp <= currentPinnedTimestamp && prevMdTimestamp > currentPinnedTimestamp) {
            implicitLockedFiles.add(metadataFileName);
            // Do not cache entry for latest metadata file as the next metadata can also match the same pinned timestamp
            if (prevMdTimestamp != Long.MAX_VALUE) {
                metadataFilePinnedTimestampMap.put(currentPinnedTimestamp, metadataFileName);
            }
            if (timestampIterator.hasNext() == false) {
                return implicitLockedFiles;
            }
            currentPinnedTimestamp = timestampIterator.next();
        }
        prevMdTimestamp = currentMdTimestamp;
    }

    return implicitLockedFiles;
}

/**
 * Keeps only the metadata files that are strictly older than a cut-off timestamp.
 *
 * @param metadataFiles A list of metadata file names.
 * @param getTimestampFunction A function that takes a metadata file name as input and returns its timestamp.
 * @param maximumAllowedTimestamp The exclusive upper bound on a file's timestamp. Files whose timestamp is
 *                                greater than or equal to this value are dropped.
 * @return A new list, in the input's iteration order, of the file names whose timestamp is less than
 *         {@code maximumAllowedTimestamp}.
 */
public static List<String> filterOutMetadataFilesBasedOnAge(
    List<String> metadataFiles,
    Function<String, Long> getTimestampFunction,
    long maximumAllowedTimestamp
) {
    return metadataFiles.stream()
        // strict comparison: a file stamped exactly at the cut-off is not old enough
        .filter(fileName -> getTimestampFunction.apply(fileName) < maximumAllowedTimestamp)
        .collect(Collectors.toCollection(ArrayList::new));
}

/**
 * Tells whether the locally known pinned timestamp state is too old to be trusted.
 *
 * The first element of the tuple returned by RemoteStorePinnedTimestampService.getPinnedTimestamps() is
 * treated as the time of the last successful fetch. The state is stale when that time is earlier than
 * the current time minus a buffer of three pinned-timestamp scheduler intervals plus one lookback interval.
 *
 * @return true if the pinned timestamp state is considered stale, false otherwise.
 */
public static boolean isPinnedTimestampStateStale() {
    final long lastFetchMillis = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
    final long schedulerIntervalMillis = RemoteStoreSettings.getPinnedTimestampsSchedulerInterval().millis();
    final long lookbackIntervalMillis = RemoteStoreSettings.getPinnedTimestampsLookbackInterval().millis();
    // Tolerate up to three scheduler runs, plus the lookback window, before declaring staleness.
    final long toleratedAgeMillis = (schedulerIntervalMillis * 3) + lookbackIntervalMillis;
    final long oldestAcceptableFetchMillis = System.currentTimeMillis() - toleratedAgeMillis;
    return lastFetchMillis < oldestAcceptableFetchMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.threadpool.ThreadPool;

import java.io.FileNotFoundException;
Expand Down Expand Up @@ -91,6 +93,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement

private final RemoteStoreLockManager mdLockManager;

private final Map<Long, String> metadataFilePinnedTimestampMap;

private final ThreadPool threadPool;

/**
Expand Down Expand Up @@ -132,6 +136,7 @@ public RemoteSegmentStoreDirectory(
this.remoteMetadataDirectory = remoteMetadataDirectory;
this.mdLockManager = mdLockManager;
this.threadPool = threadPool;
this.metadataFilePinnedTimestampMap = new HashMap<>();
this.logger = Loggers.getLogger(getClass(), shardId);
init();
}
Expand Down Expand Up @@ -176,6 +181,38 @@ public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long c
return remoteSegmentMetadata;
}

/**
 * Points this directory's view of uploaded segments at the metadata file that corresponds to a timestamp.
 *
 * All metadata files are listed and the single file matching the timestamp is resolved through
 * RemoteStoreUtils.getPinnedTimestampLockedFiles. When a file is found it is read and
 * {@code segmentsUploadedToRemoteStore} is replaced with its contents, or with an empty map if reading
 * yields null. When no file matches, nothing is changed.
 *
 * @param timestamp The timestamp to initialize the remote segment metadata to.
 * @return The RemoteSegmentMetadata object corresponding to the specified timestamp, or null if no metadata file is found for that timestamp.
 * @throws IOException If an I/O error occurs while reading the metadata file.
 */
public RemoteSegmentMetadata initializeToSpecificTimestamp(long timestamp) throws IOException {
    final List<String> allMetadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
        MetadataFilenameUtils.METADATA_PREFIX,
        Integer.MAX_VALUE
    );
    final Set<String> matchingFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles(
        allMetadataFiles,
        Set.of(timestamp),
        MetadataFilenameUtils::getTimestamp,
        MetadataFilenameUtils::getNodeIdByPrimaryTermAndGen
    );
    if (matchingFiles.isEmpty()) {
        return null;
    }
    // A single timestamp can resolve to at most one metadata file.
    assert matchingFiles.size() == 1 : "Expected exactly one metadata file but got " + matchingFiles;
    final RemoteSegmentMetadata metadataAtTimestamp = readMetadataFile(matchingFiles.iterator().next());
    if (metadataAtTimestamp == null) {
        this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>();
    } else {
        this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(metadataAtTimestamp.getMetadata());
    }
    return metadataAtTimestamp;
}

/**
* Read the latest metadata file to get the list of segments uploaded to the remote segment store.
* We upload a metadata file per refresh, but it is not unique per refresh. Refresh metadata file is unique for a given commit.
Expand Down Expand Up @@ -324,7 +361,8 @@ public static String getMetadataFilename(
long translogGeneration,
long uploadCounter,
int metadataVersion,
String nodeId
String nodeId,
long creationTimestamp
) {
return String.join(
SEPARATOR,
Expand All @@ -334,11 +372,30 @@ public static String getMetadataFilename(
RemoteStoreUtils.invertLong(translogGeneration),
RemoteStoreUtils.invertLong(uploadCounter),
String.valueOf(Objects.hash(nodeId)),
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
RemoteStoreUtils.invertLong(creationTimestamp),
String.valueOf(metadataVersion)
);
}

/**
 * Builds a metadata file name stamped with the current wall-clock time.
 *
 * Convenience overload that forwards to the variant taking an explicit creation timestamp, passing
 * {@code System.currentTimeMillis()} for it.
 *
 * @param primaryTerm primary term to encode in the file name
 * @param generation generation to encode in the file name
 * @param translogGeneration translog generation to encode in the file name
 * @param uploadCounter upload counter to encode in the file name
 * @param metadataVersion metadata version to encode in the file name
 * @param nodeId id of the node to encode in the file name
 * @return the generated metadata file name
 */
public static String getMetadataFilename(
    long primaryTerm,
    long generation,
    long translogGeneration,
    long uploadCounter,
    int metadataVersion,
    String nodeId
) {
    final long creationTimestamp = System.currentTimeMillis();
    return getMetadataFilename(
        primaryTerm,
        generation,
        translogGeneration,
        uploadCounter,
        metadataVersion,
        nodeId,
        creationTimestamp
    );
}

// Visible for testing
static long getPrimaryTerm(String[] filenameTokens) {
return RemoteStoreUtils.invertLong(filenameTokens[1]);
Expand All @@ -349,6 +406,11 @@ static long getGeneration(String[] filenameTokens) {
return RemoteStoreUtils.invertLong(filenameTokens[2]);
}

/**
 * Extracts the creation timestamp encoded in a metadata file name.
 *
 * The name is split on {@code SEPARATOR} and the token at index 6 is decoded with
 * RemoteStoreUtils.invertLong.
 * NOTE(review): assumes the name has at least seven tokens with the timestamp at index 6; a shorter
 * name fails with an ArrayIndexOutOfBoundsException - confirm all callers pass names in that layout.
 *
 * @param filename metadata file name
 * @return the decoded timestamp
 */
public static long getTimestamp(String filename) {
    return RemoteStoreUtils.invertLong(filename.split(SEPARATOR)[6]);
}

public static Tuple<String, String> getNodeIdByPrimaryTermAndGen(String filename) {
String[] tokens = filename.split(SEPARATOR);
if (tokens.length < 8) {
Expand Down Expand Up @@ -773,6 +835,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
);
return;
}

List<String> sortedMetadataFileList = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
Expand All @@ -786,16 +849,46 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
return;
}

List<String> metadataFilesEligibleToDelete = new ArrayList<>(
sortedMetadataFileList.subList(lastNMetadataFilesToKeep, sortedMetadataFileList.size())
// Check last fetch status of pinned timestamps. If stale, return.
if (RemoteStoreUtils.isPinnedTimestampStateStale()) {
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
return;
}

Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();

Set<String> implicitLockedFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles(
sortedMetadataFileList,
pinnedTimestampsState.v2(),
metadataFilePinnedTimestampMap,
MetadataFilenameUtils::getTimestamp,
MetadataFilenameUtils::getNodeIdByPrimaryTermAndGen
);
Set<String> allLockFiles;
final Set<String> allLockFiles = new HashSet<>(implicitLockedFiles);

try {
allLockFiles = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(MetadataFilenameUtils.METADATA_PREFIX);
allLockFiles.addAll(
((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(MetadataFilenameUtils.METADATA_PREFIX)
);
} catch (Exception e) {
logger.error("Exception while fetching segment metadata lock files, skipping deleteStaleSegments", e);
return;
}

List<String> metadataFilesEligibleToDelete = new ArrayList<>(
sortedMetadataFileList.subList(lastNMetadataFilesToKeep, sortedMetadataFileList.size())
);

// Along with last N files, we need to keep files since last successful run of scheduler
long lastSuccessfulFetchOfPinnedTimestamps = pinnedTimestampsState.v1();
long maximumAllowedTimestamp = lastSuccessfulFetchOfPinnedTimestamps - RemoteStoreSettings.getPinnedTimestampsLookbackInterval()
.getMillis();
metadataFilesEligibleToDelete = RemoteStoreUtils.filterOutMetadataFilesBasedOnAge(
metadataFilesEligibleToDelete,
MetadataFilenameUtils::getTimestamp,
maximumAllowedTimestamp
);

List<String> metadataFilesToBeDeleted = metadataFilesEligibleToDelete.stream()
.filter(metadataFile -> allLockFiles.contains(metadataFile) == false)
.collect(Collectors.toList());
Expand Down
Loading

0 comments on commit a763217

Please sign in to comment.