Skip to content

Commit

Permalink
add unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
skumawat2025 committed May 5, 2024
1 parent 990ff21 commit c36cecf
Show file tree
Hide file tree
Showing 5 changed files with 889 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws I
this(path, primaryTerm, checksum, null);
}

private TransferFileSnapshot(Path path, long primaryTerm, Long checksum, Map<String, String> metadata) throws IOException {
public TransferFileSnapshot(Path path, long primaryTerm, Long checksum, Map<String, String> metadata) throws IOException {
super(path, metadata);
this.primaryTerm = primaryTerm;
this.checksum = checksum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import org.opensearch.common.logging.Loggers;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import org.opensearch.index.translog.transfer.listener.FileTransferListener;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -120,37 +122,52 @@ public void onSuccess(TranslogCheckpointSnapshot fileSnapshot) {
public void onFailure(TranslogCheckpointSnapshot fileSnapshot, Exception e) {
long durationInMillis = (System.nanoTime() - fileTransferStartTime) / 1_000_000L;
remoteTranslogTransferTracker.addUploadTimeInMillis(durationInMillis);
updateTransferStats(fileSnapshot, false);
addGeneration(fileSnapshot.getGeneration(), TransferState.FAILED);

if (!ckpAsTranslogMetadata) {
if (ckpAsTranslogMetadata) {
updateTransferStats(fileSnapshot, false);
} else {
assert e instanceof TranslogTransferException;
TranslogTransferException exception = (TranslogTransferException) e;
Set<FileSnapshot.TransferFileSnapshot> failedFiles = exception.getFailedFiles();
Set<FileSnapshot.TransferFileSnapshot> successFiles = exception.getSuccessFiles();
Set<TransferFileSnapshot> failedFiles = exception.getFailedFiles();
Set<TransferFileSnapshot> successFiles = exception.getSuccessFiles();
assert failedFiles.isEmpty() == false;
failedFiles.forEach(failedFile -> add(failedFile.getName(), false));
successFiles.forEach(successFile -> add(successFile.getName(), true));
failedFiles.forEach(failedFile -> {
add(failedFile.getName(), false);
long failedBytes = 0;
try {
failedBytes = failedFile.getContentLength();
} catch (IOException ignore) {}
updateBytesInRemoteTranslogTransferTracker(failedBytes, false);
});
successFiles.forEach(successFile -> {
add(successFile.getName(), true);
long succededBytes = 0;

Check warning on line 145 in server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java#L144-L145

Added lines #L144 - L145 were not covered by tests
try {
succededBytes = successFile.getContentLength();
} catch (IOException ignore) {}
updateBytesInRemoteTranslogTransferTracker(succededBytes, true);
});

Check warning on line 150 in server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java#L147-L150

Added lines #L147 - L150 were not covered by tests
}
}

private void updateTransferStats(TranslogCheckpointSnapshot fileSnapshot, boolean isSuccess) {
Long translogFileBytes = bytesForTlogCkpFileToUpload.get(fileSnapshot.getTranslogFileName());
if (translogFileBytes != null) {
if (isSuccess) {
remoteTranslogTransferTracker.addUploadBytesSucceeded(translogFileBytes);
} else {
remoteTranslogTransferTracker.addUploadBytesFailed(translogFileBytes);
}
updateBytesInRemoteTranslogTransferTracker(translogFileBytes, isSuccess);
}

Long checkpointFileBytes = bytesForTlogCkpFileToUpload.get(fileSnapshot.getCheckpointFileName());
if (checkpointFileBytes != null) {
if (isSuccess) {
remoteTranslogTransferTracker.addUploadBytesSucceeded(checkpointFileBytes);
} else {
remoteTranslogTransferTracker.addUploadBytesFailed(checkpointFileBytes);
}
updateBytesInRemoteTranslogTransferTracker(checkpointFileBytes, isSuccess);
}
}

private void updateBytesInRemoteTranslogTransferTracker(long bytes, boolean isSuccess) {
if (isSuccess) {
remoteTranslogTransferTracker.addUploadBytesSucceeded(bytes);
} else {
remoteTranslogTransferTracker.addUploadBytesFailed(bytes);
}
}

Expand Down
Loading

0 comments on commit c36cecf

Please sign in to comment.