Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
skumawat2025 committed May 4, 2024
1 parent 155a898 commit 990ff21
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public interface BlobContainer {
*/
@ExperimentalApi
default InputStreamWithMetadata readBlobWithMetadata(String blobName) throws IOException {
return null;
throw new UnsupportedOperationException("readBlobWithMetadata is not implemented yet");
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.transfer.BaseTranslogTransferManager;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.index.translog.transfer.FileTransferTracker;
import org.opensearch.index.translog.transfer.TransferSnapshot;
import org.opensearch.index.translog.transfer.TranslogSyncSnapshot;
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferManagerFactory;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
Expand Down Expand Up @@ -70,7 +70,7 @@
public class RemoteFsTranslog extends Translog {

private final Logger logger;
private final BaseTranslogTransferManager translogTransferManager;
private final TranslogTransferManager translogTransferManager;
private final FileTransferTracker fileTransferTracker;
private final BooleanSupplier startedPrimarySupplier;
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;
Expand Down Expand Up @@ -169,8 +169,7 @@ RemoteTranslogTransferTracker getRemoteTranslogTracker() {
}

private boolean isCkpAsTranslogMetadata(Supplier<Version> minNodeVersionSupplier, BlobStoreRepository blobStoreRepository) {
boolean isBlobMetadataSupported = blobStoreRepository.blobStore().isBlobMetadataSupported();
return isBlobMetadataSupported && Version.CURRENT.compareTo(minNodeVersionSupplier.get()) <= 0;
return blobStoreRepository.blobStore().isBlobMetadataSupported() && Version.CURRENT.compareTo(minNodeVersionSupplier.get()) <= 0;
}

public static void download(
Expand All @@ -193,7 +192,7 @@ public static void download(
// TODO: To be revisited as part of https://github.com/opensearch-project/OpenSearch/issues/7567
RemoteTranslogTransferTracker remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 1000);
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker, false);
BaseTranslogTransferManager translogTransferManager = buildTranslogTransferManager(
TranslogTransferManager translogTransferManager = buildTranslogTransferManager(
blobStoreRepository,
threadPool,
shardId,
Expand All @@ -207,7 +206,7 @@ public static void download(
logger.trace(remoteTranslogTransferTracker.toString());
}

static void download(BaseTranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote)
static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote)
throws IOException {
/*
In Primary to Primary relocation , there can be concurrent upload and download of translog.
Expand Down Expand Up @@ -235,7 +234,7 @@ static void download(BaseTranslogTransferManager translogTransferManager, Path l
throw ex;
}

private static void downloadOnce(BaseTranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote)
private static void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote)
throws IOException {
logger.debug("Downloading translog files from remote");
RemoteTranslogTransferTracker statsTracker = translogTransferManager.getRemoteTranslogTransferTracker();
Expand Down Expand Up @@ -300,15 +299,15 @@ private static boolean isEmptyTranslog(Checkpoint checkpoint) {
&& checkpoint.numOps == 0;
}

public static BaseTranslogTransferManager buildTranslogTransferManager(
public static TranslogTransferManager buildTranslogTransferManager(
BlobStoreRepository blobStoreRepository,
ThreadPool threadPool,
ShardId shardId,
FileTransferTracker fileTransferTracker,
RemoteTranslogTransferTracker tracker,
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings,
boolean shouldUploadTranslogCkpAsMetadata
boolean ckpAsTranslogMetadata
) {
assert Objects.nonNull(pathStrategy);
String indexUUID = shardId.getIndex().getUUID();
Expand Down Expand Up @@ -338,7 +337,7 @@ public static BaseTranslogTransferManager buildTranslogTransferManager(
fileTransferTracker,
tracker,
remoteStoreSettings,
shouldUploadTranslogCkpAsMetadata
ckpAsTranslogMetadata
);
}

Expand Down Expand Up @@ -423,7 +422,7 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws IOException {
logger.trace("uploading translog for primary term {} generation {}", primaryTerm, generation);
try {
TranslogSyncSnapshot transferSnapshotProvider = new TranslogSyncSnapshot.Builder(
TranslogSyncSnapshot translogSyncSnapshot = new TranslogSyncSnapshot.Builder(
primaryTerm,
generation,
location,
Expand All @@ -432,7 +431,7 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws
config.getNodeId()
).build();
return translogTransferManager.transferSnapshot(
transferSnapshotProvider,
translogSyncSnapshot,
new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo)
);
} finally {
Expand Down Expand Up @@ -623,7 +622,7 @@ public static void cleanup(
// TODO: To be revisited as part of https://github.com/opensearch-project/OpenSearch/issues/7567
RemoteTranslogTransferTracker remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 1000);
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker, false);

Check warning on line 624 in server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java#L624

Added line #L624 was not covered by tests
BaseTranslogTransferManager translogTransferManager = buildTranslogTransferManager(
TranslogTransferManager translogTransferManager = buildTranslogTransferManager(
blobStoreRepository,
threadPool,
shardId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,6 @@ public class FileSnapshot implements Closeable {
@Nullable
private Map<String, String> metadata;

private FileSnapshot(Path path) throws IOException {
Objects.requireNonNull(path);
this.name = path.getFileName().toString();
this.path = path;
this.fileChannel = FileChannel.open(path, StandardOpenOption.READ);
}

private FileSnapshot(Path path, Map<String, String> metadata) throws IOException {
Objects.requireNonNull(path);
this.name = path.getFileName().toString();
Expand Down Expand Up @@ -125,9 +118,7 @@ public static class TransferFileSnapshot extends FileSnapshot {
private Long checksum;

public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws IOException {
super(path);
this.primaryTerm = primaryTerm;
this.checksum = checksum;
this(path, primaryTerm, checksum, null);
}

private TransferFileSnapshot(Path path, long primaryTerm, Long checksum, Map<String, String> metadata) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,22 @@
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.index.translog.transfer.listener.FileTransferListener;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

/**
* FileTransferTracker keeps track of generational translog files uploaded to the remote translog store
*/
public class FileTransferTracker implements FileTransferListener {

private final ConcurrentHashMap<String, TransferState> fileTransferTracker;
private final ConcurrentHashMap<Long, TransferState> generationTransferTracker;
private final Map<String, TransferState> fileTransferTracker;
private final Map<Long, TransferState> generationTransferTracker;
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;
private Map<String, Long> bytesForTlogCkpFileToUpload;
private long fileTransferStartTime = -1;
Expand Down Expand Up @@ -64,26 +63,17 @@ void recordBytesForFiles(Set<TranslogCheckpointSnapshot> toUpload) {
});
}

private void recordFileContentLength(String fileName, ThrowingSupplier<Long, IOException> contentLengthSupplier) {
try {
if (!uploaded(fileName)) {
bytesForTlogCkpFileToUpload.put(fileName, contentLengthSupplier.get());
}
} catch (IOException ignored) {
bytesForTlogCkpFileToUpload.put(fileName, 0L);
private void recordFileContentLength(String fileName, LongSupplier contentLengthSupplier) {
if (!uploaded(fileName)) {
bytesForTlogCkpFileToUpload.put(fileName, contentLengthSupplier.getAsLong());
}
}

@FunctionalInterface
private interface ThrowingSupplier<T, E extends Exception> {
T get() throws E;
}

long getTotalBytesToUpload() {
return bytesForTlogCkpFileToUpload.values().stream().reduce(0L, Long::sum);
}

private <K> void updateTransferTracker(ConcurrentHashMap<K, TransferState> tracker, K key, TransferState targetState) {
private <K> void updateTransferState(Map<K, TransferState> tracker, K key, TransferState targetState) {
tracker.compute(key, (k, v) -> {
if (v == null || v.validateNextState(targetState)) {
return targetState;
Expand All @@ -98,7 +88,7 @@ void addGeneration(long generation, boolean success) {
}

private void addGeneration(long generation, TransferState targetState) {
updateTransferTracker(generationTransferTracker, generation, targetState);
updateTransferState(generationTransferTracker, generation, targetState);
}

void add(String file, boolean success) {
Expand All @@ -107,7 +97,7 @@ void add(String file, boolean success) {
}

private void add(String file, TransferState targetState) {
updateTransferTracker(fileTransferTracker, file, targetState);
updateTransferState(fileTransferTracker, file, targetState);
}

@Override
Expand All @@ -132,6 +122,16 @@ public void onFailure(TranslogCheckpointSnapshot fileSnapshot, Exception e) {
remoteTranslogTransferTracker.addUploadTimeInMillis(durationInMillis);
updateTransferStats(fileSnapshot, false);
addGeneration(fileSnapshot.getGeneration(), TransferState.FAILED);

if (!ckpAsTranslogMetadata) {
assert e instanceof TranslogTransferException;
TranslogTransferException exception = (TranslogTransferException) e;
Set<FileSnapshot.TransferFileSnapshot> failedFiles = exception.getFailedFiles();
Set<FileSnapshot.TransferFileSnapshot> successFiles = exception.getSuccessFiles();
assert failedFiles.isEmpty() == false;
failedFiles.forEach(failedFile -> add(failedFile.getName(), false));
successFiles.forEach(successFile -> add(successFile.getName(), true));
}
}

private void updateTransferStats(TranslogCheckpointSnapshot fileSnapshot, boolean isSuccess) {
Expand Down Expand Up @@ -181,23 +181,19 @@ Set<TranslogCheckpointSnapshot> exclusionFilter(Set<TranslogCheckpointSnapshot>
}

Set<Long> allUploadedGeneration() {
Set<Long> successGenFileTransferTracker = new HashSet<>();
generationTransferTracker.forEach((k, v) -> {
if (v == TransferState.SUCCESS) {
successGenFileTransferTracker.add(k);
}
});
return successGenFileTransferTracker;
return getSuccessfulKeys(generationTransferTracker);
}

public Set<String> allUploaded() {
Set<String> successFileTransferTracker = new HashSet<>();
fileTransferTracker.forEach((k, v) -> {
if (v == TransferState.SUCCESS) {
successFileTransferTracker.add(k);
}
});
return successFileTransferTracker;
return getSuccessfulKeys(fileTransferTracker);
}

private <K> Set<K> getSuccessfulKeys(Map<K, TransferState> tracker) {
return tracker.entrySet()
.stream()
.filter(entry -> entry.getValue() == TransferState.SUCCESS)
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.index.translog.transfer.BaseTranslogTransferManager.CHECKPOINT_FILE_DATA_KEY;
import static org.opensearch.index.translog.transfer.TranslogTransferManager.CHECKPOINT_FILE_DATA_KEY;

/**
* Snapshot of a single translog generational files that gets transferred
Expand Down Expand Up @@ -70,15 +70,19 @@ String getCheckpointFileName() {
return checkpointPath.getFileName().toString();
}

long getTranslogFileContentLength() throws IOException {
long getTranslogFileContentLength() {
try (FileChannel fileChannel = FileChannel.open(translogPath, StandardOpenOption.READ)) {
return fileChannel.size();
} catch (IOException ignore) {
return 0L;

Check warning on line 77 in server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointSnapshot.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointSnapshot.java#L76-L77

Added lines #L76 - L77 were not covered by tests
}
}

long getCheckpointFileContentLength() throws IOException {
long getCheckpointFileContentLength() {
try (FileChannel fileChannel = FileChannel.open(checkpointPath, StandardOpenOption.READ)) {
return fileChannel.size();
} catch (IOException ignore) {
return 0L;

Check warning on line 85 in server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointSnapshot.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointSnapshot.java#L84-L85

Added lines #L84 - L85 were not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*
* @opensearch.internal
*/
public class TranslogCkpAsMetadataFileTransferManager extends BaseTranslogTransferManager {
public class TranslogCkpAsMetadataFileTransferManager extends TranslogTransferManager {

TransferService transferService;

Expand All @@ -55,14 +55,13 @@ public TranslogCkpAsMetadataFileTransferManager(

@Override
public void transferTranslogCheckpointSnapshot(
Set<TranslogCheckpointSnapshot> generationalSnapshotList,
Set<TranslogCheckpointSnapshot> toUpload,
Map<Long, BlobPath> blobPathMap,
LatchedActionListener<TranslogCheckpointSnapshot> latchedActionListener,
WritePriority writePriority
LatchedActionListener<TranslogCheckpointSnapshot> latchedActionListener
) throws Exception {
Set<TransferFileSnapshot> filesToUpload = new HashSet<>();
Map<TransferFileSnapshot, TranslogCheckpointSnapshot> fileToGenerationSnapshotMap = new HashMap<>();

Check warning on line 63 in server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManager.java#L62-L63

Added lines #L62 - L63 were not covered by tests
for (TranslogCheckpointSnapshot translogCheckpointSnapshot : generationalSnapshotList) {
for (TranslogCheckpointSnapshot translogCheckpointSnapshot : toUpload) {
TransferFileSnapshot transferFileSnapshot = translogCheckpointSnapshot.getTranslogFileSnapshotWithMetadata();
fileToGenerationSnapshotMap.put(transferFileSnapshot, translogCheckpointSnapshot);
filesToUpload.add(transferFileSnapshot);
Expand All @@ -72,16 +71,12 @@ public void transferTranslogCheckpointSnapshot(
ex -> {
assert ex instanceof FileTransferException;
FileTransferException e = (FileTransferException) ex;
TransferFileSnapshot failedSnapshot = e.getFileSnapshot();
latchedActionListener.onFailure(
new TranslogTransferException(fileToGenerationSnapshotMap.get(e.getFileSnapshot()), ex, null, null)
new TranslogTransferException(fileToGenerationSnapshotMap.get(failedSnapshot), ex, Set.of(failedSnapshot), null)

Check warning on line 76 in server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManager.java#L73-L76

Added lines #L73 - L76 were not covered by tests
);
}

Check warning on line 78 in server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManager.java#L78

Added line #L78 was not covered by tests
);
transferService.uploadBlobs(filesToUpload, blobPathMap, actionListener, WritePriority.HIGH);
}

Check warning on line 81 in server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManager.java#L80-L81

Added lines #L80 - L81 were not covered by tests

@Override
public boolean updateFileNameTransferTracker() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*
* @opensearch.internal
*/
public class TranslogCkpFilesTransferManager extends BaseTranslogTransferManager {
public class TranslogCkpFilesTransferManager extends TranslogTransferManager {

TransferService transferService;
FileTransferTracker fileTransferTracker;
Expand Down Expand Up @@ -60,12 +60,11 @@ public TranslogCkpFilesTransferManager(

@Override
public void transferTranslogCheckpointSnapshot(
Set<TranslogCheckpointSnapshot> generationalSnapshotList,
Set<TranslogCheckpointSnapshot> toUpload,
Map<Long, BlobPath> blobPathMap,
LatchedActionListener<TranslogCheckpointSnapshot> latchedActionListener,
WritePriority writePriority
LatchedActionListener<TranslogCheckpointSnapshot> latchedActionListener
) throws Exception {
for (TranslogCheckpointSnapshot tlogAndCkpTransferFileSnapshot : generationalSnapshotList) {
for (TranslogCheckpointSnapshot tlogAndCkpTransferFileSnapshot : toUpload) {
Set<TransferFileSnapshot> filesToUpload = new HashSet<>();
Set<Exception> exceptionList = ConcurrentCollections.newConcurrentSet();

Expand Down Expand Up @@ -111,12 +110,7 @@ public void transferTranslogCheckpointSnapshot(
}
}
});
transferService.uploadBlobs(filesToUpload, blobPathMap, actionListener, writePriority);
transferService.uploadBlobs(filesToUpload, blobPathMap, actionListener, WritePriority.HIGH);
}
}

@Override
public boolean updateFileNameTransferTracker() {
return true;
}
}
Loading

0 comments on commit 990ff21

Please sign in to comment.