Skip to content

Commit

Permalink
Abstract out remote segment garbage collection logic
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Apr 16, 2024
1 parent 3d1d5e7 commit cc805f9
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,15 @@

package org.opensearch.index.remote;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.collect.Tuple;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;

import java.nio.ByteBuffer;
import java.util.Arrays;
Expand All @@ -27,6 +35,8 @@
public class RemoteStoreUtils {
public static final int LONG_MAX_LENGTH = String.valueOf(Long.MAX_VALUE).length();

private static final Logger logger = LogManager.getLogger(RemoteStoreUtils.class);

/**
* URL safe base 64 character set. This must not be changed as this is used in deriving the base64 equivalent of binary.
*/
Expand Down Expand Up @@ -146,4 +156,26 @@ static String longToCompositeBase64AndBinaryEncoding(long value, int len) {
assert base64DecimalValue >= 0 && base64DecimalValue < 64;
return URL_BASE64_CHARSET[base64DecimalValue] + binaryPart;
}

/**
 * Checks whether the most recent local commit point (the latest segments_N file) is already present in the
 * remote store. The latest segments_N file name is looked up in the local directory, its checksum is read from
 * the local file, and the remote directory is asked whether it contains a file with that name and checksum.
 *
 * @param storeDirectory instance of local directory
 * @param remoteDirectory instance of RemoteSegmentStoreDirectory
 * @return true if the latest segments_N from local is present in remote. Returns false if it is not present, if no
 * local segments_N file name is found, or if any exception occurs during the check.
 */
public static boolean isLatestSegmentInfosUploadedToRemote(Directory storeDirectory, RemoteSegmentStoreDirectory remoteDirectory) {
    try {
        final String latestCommitFile = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
        if (latestCommitFile == null) {
            // No segments_N file name was found locally, so there is nothing that could have been uploaded.
            return false;
        }
        try (IndexInput commitInput = storeDirectory.openInput(latestCommitFile, IOContext.DEFAULT)) {
            final long localChecksum = CodecUtil.retrieveChecksum(commitInput);
            // Returning from inside the try-with-resources keeps a failure on close() on the "false" path below.
            return remoteDirectory.containsFile(latestCommitFile, Long.toString(localChecksum));
        }
    } catch (Exception e) {
        // Best-effort check: any failure is logged and reported to the caller as "not uploaded".
        logger.info("Exception occurred while checking isLatestSegmentInfosUploadedToRemote", e);
        return false;
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.Directory;
import org.opensearch.common.logging.Loggers;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.indices.RemoteStoreSettings;

/**
 * {@link RemoteSegmentGarbageCollector} that asks the remote directory to delete stale segments only when the
 * latest local segments_N file is not reported as uploaded to the remote store, and otherwise skips.
 */
public class CommitTriggeredRemoteSegmentGarbageCollector implements RemoteSegmentGarbageCollector {
    private final Logger logger;
    private final Directory storeDirectory;
    private final RemoteSegmentStoreDirectory remoteDirectory;
    private final RemoteStoreSettings remoteStoreSettings;

    /**
     * @param shardId             shard id used to build the logger for this collector
     * @param storeDirectory      local directory that is inspected for the latest segments_N file
     * @param remoteDirectory     remote directory that is checked for the commit file and asked to delete stale segments
     * @param remoteStoreSettings settings that supply the minimum number of remote segment metadata files
     */
    public CommitTriggeredRemoteSegmentGarbageCollector(
        ShardId shardId,
        Directory storeDirectory,
        RemoteSegmentStoreDirectory remoteDirectory,
        RemoteStoreSettings remoteStoreSettings
    ) {
        this.logger = Loggers.getLogger(getClass(), shardId);
        this.storeDirectory = storeDirectory;
        this.remoteDirectory = remoteDirectory;
        this.remoteStoreSettings = remoteStoreSettings;
    }

    /**
     * Triggers an asynchronous deletion of stale segments when the latest local segments_N file is not reported as
     * uploaded to remote; the outcome of that deletion is forwarded to {@code listener}. When the latest
     * segments_N file is already in remote, nothing is deleted and {@code listener.onSkip} is called instead.
     *
     * @param listener callback that is notified of success, failure or skip
     */
    @Override
    public void deleteStaleSegments(Listener listener) {
        // A local segments_N file that is missing from remote marks the first refresh after a commit. Deletion is
        // limited to that case so that it is not triggered after every refresh.
        if (RemoteStoreUtils.isLatestSegmentInfosUploadedToRemote(storeDirectory, remoteDirectory)) {
            final String skipReason =
                "Skipping garbage collection, CommitTriggeredRemoteSegmentGarbageCollector only triggers deletion on commit";
            logger.debug(skipReason);
            listener.onSkip(skipReason);
            return;
        }
        remoteDirectory.deleteStaleSegmentsAsync(remoteStoreSettings.getMinRemoteSegmentMetadataFiles(), forwardingTo(listener));
    }

    /**
     * Adapts a garbage collector {@link Listener} to the {@link ActionListener} expected by the remote directory.
     */
    private static ActionListener<Void> forwardingTo(Listener listener) {
        return new ActionListener<Void>() {
            @Override
            public void onResponse(Void response) {
                listener.onSuccess(response);
            }

            @Override
            public void onFailure(Exception e) {
                listener.onFailure(e);
            }
        };
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

/**
 * Contract for components that delete stale segments from the remote store and report the outcome
 * through a {@link Listener}.
 */
public interface RemoteSegmentGarbageCollector {

    /**
     * Deletes stale segments from remote store and notifies the listener.
     *
     * @param listener callback that receives the outcome of the garbage collection
     */
    void deleteStaleSegments(Listener listener);

    /**
     * Callback used to report the outcome of a garbage collection request.
     */
    interface Listener {

        /**
         * Called after the successful deletion of stale segments from remote store
         */
        void onSuccess(Void response);

        /**
         * Called on exception while deleting stale segments from remote store
         */
        void onFailure(Exception e);

        /**
         * Called on skipping the garbage collection
         */
        void onSkip(String reason);
    }

    /**
     * Listener that ignores every notification.
     */
    Listener DEFAULT_NOOP_LISTENER = new Listener() {
        @Override
        public void onSuccess(Void response) {
            // intentionally empty
        }

        @Override
        public void onFailure(Exception e) {
            // intentionally empty
        }

        @Override
        public void onSkip(String reason) {
            // intentionally empty
        }
    };
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
Expand Down Expand Up @@ -89,6 +90,7 @@ public final class RemoteStoreRefreshListener extends ReleasableRetryableRefresh
private volatile long primaryTerm;
private volatile Iterator<TimeValue> backoffDelayIterator;
private final SegmentReplicationCheckpointPublisher checkpointPublisher;
private final RemoteSegmentGarbageCollector remoteSegmentGarbageCollector;

public RemoteStoreRefreshListener(
IndexShard indexShard,
Expand Down Expand Up @@ -116,6 +118,12 @@ public RemoteStoreRefreshListener(
this.segmentTracker = segmentTracker;
resetBackOffDelayIterator();
this.checkpointPublisher = checkpointPublisher;
this.remoteSegmentGarbageCollector = new CommitTriggeredRemoteSegmentGarbageCollector(
indexShard.shardId(),
storeDirectory,
remoteDirectory,
indexShard.getRemoteStoreSettings()
);
}

@Override
Expand Down Expand Up @@ -172,7 +180,7 @@ private boolean shouldSync(boolean didRefresh, boolean skipPrimaryTermCheck) {
// When the shouldSync is called the first time, then 1st condition on primary term is true. But after that
// we update the primary term and the same condition would not evaluate to true again in syncSegments.
// Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call.
|| isRefreshAfterCommitSafe()
|| isRefreshAfterCommit()
|| isRemoteSegmentStoreInSync() == false;
if (shouldSync || skipPrimaryTermCheck) {
return shouldSync;
Expand Down Expand Up @@ -220,12 +228,10 @@ private boolean syncSegments() {
try {
try {
initializeRemoteDirectoryOnTermUpdate();
// if a new segments_N file is present in local that is not uploaded to remote store yet, it
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
// This is done to avoid delete post each refresh.
if (isRefreshAfterCommit()) {
remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRemoteStoreSettings().getMinRemoteSegmentMetadataFiles());
}

// The garbage collector is notified on every sync; its implementation decides when and how often
// stale segments are actually deleted.
remoteSegmentGarbageCollector.deleteStaleSegments(RemoteSegmentGarbageCollector.DEFAULT_NOOP_LISTENER);

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
Expand Down Expand Up @@ -362,23 +368,12 @@ protected String getRetryThreadPoolName() {
return ThreadPool.Names.REMOTE_REFRESH_RETRY;
}

private boolean isRefreshAfterCommit() throws IOException {
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
return (lastCommittedLocalSegmentFileName != null
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
}

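/**
* Returns whether the current refresh has happened after a commit, i.e. whether the latest local segments_N file is
* not reported as uploaded by {@link RemoteStoreUtils#isLatestSegmentInfosUploadedToRemote}.
* @return true if this refresh has happened on account of a commit, false otherwise. Note that the underlying check
* returns false when it hits an exception, so a failed check also makes this method return true.
*/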
private boolean isRefreshAfterCommitSafe() {
try {
return isRefreshAfterCommit();
} catch (Exception e) {
logger.info("Exception occurred in isRefreshAfterCommitSafe", e);
}
return false;
private boolean isRefreshAfterCommit() {
    // The refresh is treated as "after commit" when the latest local segments_N is not reported as present in remote.
    final boolean latestCommitInRemote = RemoteStoreUtils.isLatestSegmentInfosUploadedToRemote(storeDirectory, remoteDirectory);
    return latestCommitInRemote == false;
}

void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint)
Expand Down

0 comments on commit cc805f9

Please sign in to comment.