Purge stale translog metadata files for remote-backed indexes (#6238) (#6313)

(cherry picked from commit 49d9a90)

Signed-off-by: Ashish Singh <[email protected]>
opensearch-trigger-bot[bot] authored Feb 14, 2023
1 parent 1966a59 commit 4ddae92
Showing 7 changed files with 240 additions and 55 deletions.
@@ -351,27 +351,36 @@ public void trimUnreferencedReaders() throws IOException {
generationsToDelete.add(generation);
}
if (generationsToDelete.isEmpty() == false) {
deleteRemoteGenerationAsync(generationsToDelete);
deleteOlderPrimaryTranslogFiles();
deleteRemoteGeneration(generationsToDelete);
deleteStaleRemotePrimaryTermsAndMetadataFiles();
}
}

private void deleteRemoteGenerationAsync(Set<Long> generations) {
translogTransferManager.deleteTranslogAsync(primaryTermSupplier.getAsLong(), generations);
/**
* Asynchronously deletes the remote translog and metadata files corresponding to the given generations.
* @param generations generations to be deleted.
*/
private void deleteRemoteGeneration(Set<Long> generations) {
translogTransferManager.deleteGenerationAsync(primaryTermSupplier.getAsLong(), generations);
}

/**
* This method must be called only after there are valid generations to delete in trimUnreferencedReaders, as that
* implicitly ensures that the minimum primary term in the latest translog metadata in the remote store is the current primary term.
* <br>
* This will also delete all stale translog metadata files from the remote store, except the latest one as determined by the metadata file comparator.
*/
private void deleteOlderPrimaryTranslogFiles() {
private void deleteStaleRemotePrimaryTermsAndMetadataFiles() {
// The deletion of older translog files in the remote store is on a best-effort basis; there is a possibility that
// older files which are no longer needed are still present and should be cleaned up. Here, we delete all files that
// belong to older primary terms.
if (olderPrimaryCleaned.trySet(Boolean.TRUE)) {
// First we delete all stale primary terms folders from remote store
assert readers.isEmpty() == false : "Expected non-empty readers";
long minimumReferencedPrimaryTerm = readers.stream().map(BaseTranslogReader::getPrimaryTerm).min(Long::compare).get();
translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm);
// Second we delete all stale metadata files from remote store
translogTransferManager.deleteStaleTranslogMetadataFilesAsync();
}
}
}
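
For readers skimming the diff, the sketch below distills the one-shot gating used by deleteStaleRemotePrimaryTermsAndMetadataFiles above: a cleanup that runs at most once per translog instance and computes the minimum primary term still referenced by the readers. CleanupSketch, SimpleSetOnce and Reader are hypothetical stand-ins (the real code uses Lucene's SetOnce and BaseTranslogReader, and delegates the actual deletions to TranslogTransferManager).

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class CleanupSketch {
    // Stand-in for Lucene's SetOnce#trySet semantics: succeeds exactly once.
    static final class SimpleSetOnce {
        private final AtomicBoolean set = new AtomicBoolean(false);

        boolean trySet() {
            return set.compareAndSet(false, true);
        }
    }

    // Stand-in for BaseTranslogReader; only the primary term matters here.
    record Reader(long primaryTerm) {}

    private final SimpleSetOnce olderPrimaryCleaned = new SimpleSetOnce();

    void deleteStaleRemotePrimaryTermsAndMetadataFiles(List<Reader> readers) {
        // Best-effort cleanup, attempted at most once per translog instance.
        if (olderPrimaryCleaned.trySet()) {
            assert readers.isEmpty() == false : "Expected non-empty readers";
            long minimumReferencedPrimaryTerm = readers.stream().mapToLong(Reader::primaryTerm).min().getAsLong();
            // In the real code these two calls go to TranslogTransferManager; here they are just logged.
            System.out.println("delete primary term folders below " + minimumReferencedPrimaryTerm);
            System.out.println("delete stale translog metadata files");
        }
    }

    public static void main(String[] args) {
        CleanupSketch sketch = new CleanupSketch();
        List<Reader> readers = List.of(new Reader(3), new Reader(4), new Reader(4));
        sketch.deleteStaleRemotePrimaryTermsAndMetadataFiles(readers); // performs the cleanup
        sketch.deleteStaleRemotePrimaryTermsAndMetadataFiles(readers); // no-op: already cleaned
    }
}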
@@ -114,6 +114,17 @@ public Set<String> listAll(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).listBlobs().keySet();
}

@Override
public void listAllAsync(String threadpoolName, Iterable<String> path, ActionListener<Set<String>> listener) {
threadPool.executor(threadpoolName).execute(() -> {
try {
listener.onResponse(listAll(path));
} catch (IOException e) {
listener.onFailure(e);
}
});
}

@Override
public Set<String> listFolders(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).children().keySet();
@@ -80,6 +80,14 @@ void uploadBlobAsync(
*/
Set<String> listAll(Iterable<String> path) throws IOException;

/**
* Lists the files asynchronously and invokes the listener on success or failure.
* @param threadpoolName threadpool name which will be used to list all files asynchronously.
* @param path the path to list
* @param listener the callback to be invoked once the list operation completes or fails.
*/
void listAllAsync(String threadpoolName, Iterable<String> path, ActionListener<Set<String>> listener);

/**
* Lists the folders inside the path.
* @param path : the path
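
The listAllAsync method added above (the TransferService interface method and its BlobStoreTransferService implementation) follows a simple pattern: dispatch the blocking listAll onto a named executor and report the outcome through a callback. Below is a minimal, self-contained sketch of that pattern; AsyncListingSketch, Listener and the single-thread executor are stand-ins for TransferService, ActionListener and the REMOTE_PURGE threadpool.

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncListingSketch {
    // Stand-in for ActionListener.
    interface Listener<T> {
        void onResponse(T result);

        void onFailure(Exception e);
    }

    // Stand-in for the REMOTE_PURGE threadpool.
    private final ExecutorService purgeExecutor = Executors.newSingleThreadExecutor();

    // Blocking listing, analogous to TransferService#listAll.
    Set<String> listAll(String path) throws IOException {
        return Set.of(path + "/1__5", path + "/1__6");
    }

    // Asynchronous wrapper, analogous to listAllAsync: never throws, always notifies the listener.
    void listAllAsync(String path, Listener<Set<String>> listener) {
        purgeExecutor.execute(() -> {
            try {
                listener.onResponse(listAll(path));
            } catch (IOException e) {
                listener.onFailure(e);
            }
        });
    }

    public static void main(String[] args) {
        AsyncListingSketch sketch = new AsyncListingSketch();
        sketch.listAllAsync("metadata", new Listener<>() {
            @Override
            public void onResponse(Set<String> files) {
                System.out.println("listed " + files);
            }

            @Override
            public void onFailure(Exception e) {
                e.printStackTrace();
            }
        });
        sketch.purgeExecutor.shutdown();
    }
}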
@@ -37,6 +37,8 @@

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR;
import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.getFileName;

/**
* The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService}
@@ -162,19 +164,15 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th
}

public TranslogTransferMetadata readMetadata() throws IOException {
return transferService.listAll(remoteMetadataTransferPath)
.stream()
.max(TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR)
.map(filename -> {
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename);) {
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
return new TranslogTransferMetadata(indexInput);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
return null;
}
})
.orElse(null);
return transferService.listAll(remoteMetadataTransferPath).stream().max(METADATA_FILENAME_COMPARATOR).map(filename -> {
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename);) {
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
return new TranslogTransferMetadata(indexInput);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
return null;
}
}).orElse(null);
}

private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException {
@@ -191,53 +189,38 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot)
TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata();
translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap));
return new TransferFileSnapshot(
translogTransferMetadata.getFileName(),
getFileName(translogTransferMetadata.getPrimaryTerm(), translogTransferMetadata.getGeneration()),
translogTransferMetadata.createMetadataBytes(),
translogTransferMetadata.getPrimaryTerm()
);
}

/**
* This method handles deletion of multiple generations for a single primary term.
* TODO: Take care of metadata file cleanup. <a href="https://github.com/opensearch-project/OpenSearch/issues/5677">Github Issue #5677</a>
* This method handles deletion of multiple generations for a single primary term. The deletion happens for translog
* and metadata files.
*
* @param primaryTerm primary term for which the generations will be deleted.
* @param generations set of generations to delete.
*/
public void deleteTranslogAsync(long primaryTerm, Set<Long> generations) {
public void deleteGenerationAsync(long primaryTerm, Set<Long> generations) {
if (generations.isEmpty()) {
return;
}
List<String> files = new ArrayList<>();
List<String> translogFiles = new ArrayList<>();
List<String> metadataFiles = new ArrayList<>();
generations.forEach(generation -> {
// Add the .ckp and .tlog files to the translog file list, which is located in basePath/<primaryTerm>
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFilename = Translog.getFilename(generation);
files.addAll(List.of(ckpFileName, translogFilename));
String translogFileName = Translog.getFilename(generation);
translogFiles.addAll(List.of(ckpFileName, translogFileName));
// Add the metadata file to the metadata file list, which is located in basePath/metadata
String metadataFileName = TranslogTransferMetadata.getFileName(primaryTerm, generation);
metadataFiles.add(metadataFileName);
});
transferService.deleteBlobsAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteBaseTransferPath.add(String.valueOf(primaryTerm)),
files,
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
fileTransferTracker.delete(files);
logger.trace("Deleted translogs for primaryTerm {} generations {}", primaryTerm, generations);
}

@Override
public void onFailure(Exception e) {
logger.error(
() -> new ParameterizedMessage(
"Exception occurred while deleting translog for primary_term={} generations={}",
primaryTerm,
generations
),
e
);
}
}
);
// Delete the translog and checkpoint files asynchronously
deleteTranslogFilesAsync(primaryTerm, translogFiles);
// Delete the metadata files asynchronously
deleteMetadataFilesAsync(metadataFiles);
}

/**
@@ -296,4 +279,76 @@ public void onFailure(Exception e) {
}
);
}

public void deleteStaleTranslogMetadataFilesAsync() {
transferService.listAllAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, new ActionListener<>() {
@Override
public void onResponse(Set<String> metadataFiles) {
List<String> sortedMetadataFiles = metadataFiles.stream().sorted(METADATA_FILENAME_COMPARATOR).collect(Collectors.toList());
if (sortedMetadataFiles.size() <= 1) {
logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size());
return;
}
List<String> metadataFilesToDelete = sortedMetadataFiles.subList(0, sortedMetadataFiles.size() - 1);
deleteMetadataFilesAsync(metadataFilesToDelete);
}

@Override
public void onFailure(Exception e) {
logger.error("Exception occurred while listing translog metadata files from remote store", e);
}
});
}

/**
* Deletes the given list of translog files asynchronously using the {@code REMOTE_PURGE} threadpool.
*
* @param primaryTerm primary term of translog files.
* @param files list of translog files to be deleted.
*/
private void deleteTranslogFilesAsync(long primaryTerm, List<String> files) {
transferService.deleteBlobsAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteBaseTransferPath.add(String.valueOf(primaryTerm)),
files,
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
fileTransferTracker.delete(files);
logger.trace("Deleted translogs for primaryTerm={} files={}", primaryTerm, files);
}

@Override
public void onFailure(Exception e) {
logger.error(
() -> new ParameterizedMessage(
"Exception occurred while deleting translog for primaryTerm={} files={}",
primaryTerm,
files
),
e
);
}
}
);
}

/**
* Deletes metadata files asynchronously using the {@code REMOTE_PURGE} threadpool.
*
* @param files list of metadata files to be deleted.
*/
private void deleteMetadataFilesAsync(List<String> files) {
transferService.deleteBlobsAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, files, new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.trace("Deleted remote translog metadata files {}", files);
}

@Override
public void onFailure(Exception e) {
logger.error(new ParameterizedMessage("Exception occurred while deleting remote translog metadata files {}", files), e);
}
});
}
}
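
The core of the new deleteStaleTranslogMetadataFilesAsync is the selection step: sort the remote metadata file names and purge everything except the newest one. The sketch below shows that selection in isolation. The comparator here (numeric primary term, then generation) is an assumption standing in for TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR, whose exact definition is not part of this diff.

import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class StaleMetadataSelectionSketch {
    // Assumed ordering: ascending by primary term, then by generation, so the newest file sorts last.
    static final Comparator<String> METADATA_NAME_COMPARATOR = Comparator
        .comparingLong((String name) -> Long.parseLong(name.split("__")[0]))
        .thenComparingLong(name -> Long.parseLong(name.split("__")[1]));

    static List<String> staleMetadataFiles(Set<String> metadataFiles) {
        List<String> sorted = metadataFiles.stream().sorted(METADATA_NAME_COMPARATOR).collect(Collectors.toList());
        if (sorted.size() <= 1) {
            return List.of(); // zero or one metadata file: nothing is stale
        }
        // Everything except the last (newest) entry is stale and can be purged.
        return sorted.subList(0, sorted.size() - 1);
    }

    public static void main(String[] args) {
        Set<String> remote = Set.of("1__3", "1__4", "2__5");
        System.out.println(staleMetadataFiles(remote)); // prints [1__3, 1__4]; 2__5 is kept
    }
}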
@@ -42,7 +42,7 @@ public class TranslogTransferMetadata {

private final SetOnce<Map<String, String>> generationToPrimaryTermMapper = new SetOnce<>();

private static final String METADATA_SEPARATOR = "__";
public static final String METADATA_SEPARATOR = "__";

private static final int BUFFER_SIZE = 4096;

@@ -92,7 +92,7 @@ public Map<String, String> getGenerationToPrimaryTermMapper() {
return generationToPrimaryTermMapper.get();
}

public String getFileName() {
public static String getFileName(long primaryTerm, long generation) {
return String.join(METADATA_SEPARATOR, Arrays.asList(String.valueOf(primaryTerm), String.valueOf(generation)));
}

@@ -101,7 +101,7 @@ public byte[] createMetadataBytes() throws IOException {
try (
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
"translog transfer metadata " + primaryTerm,
getFileName(),
getFileName(primaryTerm, generation),
output,
BUFFER_SIZE
)
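
Making getFileName(primaryTerm, generation) static lets TranslogTransferManager derive a metadata object name from the bare numbers it has at deletion time, without holding a TranslogTransferMetadata instance. Below is a tiny sketch of the naming, mirroring the implementation shown above; MetadataNames is a hypothetical helper class used only for illustration.

import java.util.Arrays;

public class MetadataNames {
    public static final String METADATA_SEPARATOR = "__";

    public static String getFileName(long primaryTerm, long generation) {
        return String.join(METADATA_SEPARATOR, Arrays.asList(String.valueOf(primaryTerm), String.valueOf(generation)));
    }

    public static void main(String[] args) {
        // e.g. the metadata object for generation 42 of primary term 7 is named "7__42"
        System.out.println(getFileName(7, 42));
    }
}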