Skip to content

Commit

Permalink
fix test failures and clean up
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
skumawat2025 committed May 12, 2024
1 parent 2d6a296 commit bd58d92
Show file tree
Hide file tree
Showing 12 changed files with 32 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws

// Visible for testing
public Set<String> allUploaded() {
return fileTransferTracker.allUploaded();
return fileTransferTracker.allUploadedGeneration();
}

private boolean syncToDisk() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ public FileTransferTracker(ShardId shardId, RemoteTranslogTransferTracker remote

public abstract boolean isUploaded(String key);

public abstract Set<String> allUploaded();

abstract void deleteGenerations(Set<Long> generations);

abstract void recordBytesForFiles(Set<TranslogCheckpointSnapshot> toUpload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,9 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca
generation,
location
);

// Download translog.tlog file with object metadata from remote to local FS
String translogFilename = Translog.getFilename(Long.parseLong(generation));
Map<String, String> metadata = downloadTranslogToFSAndGetMetadata(translogFilename, location, primaryTerm, generation);

try {
assert metadata != null && !metadata.isEmpty() && metadata.containsKey(CHECKPOINT_FILE_DATA_KEY);
recoverCkpFileFromMetadata(metadata, location, generation, translogFilename);
Expand All @@ -110,21 +108,6 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca
return true;
}

/**
* Retrieves and processes translog files from a remote store.
*
* <p>This method first attempts to download the "translog.tlog" file from the remote store
* and copy it to the FS. If the downloaded file does not contain specific
* metadata, it then tries to download the "translog.ckp" file from the remote store and
* copy it to the local FS. If the "translog.tlog" file contains the required
* metadata, its content is used to write the "translog.ckp" file to FS.
*
* @param fileName The name of the translog file (e.g., "translog.tlog").
* @param location The local file system path where the translog files will be stored.
* @param primaryTerm The primary term associated with the translog files.
* @param generation The generation associated with the translog files.
* @throws IOException If an I/O error occurs during the file operations.
*/
private Map<String, String> downloadTranslogToFSAndGetMetadata(String fileName, Path location, String primaryTerm, String generation)
throws IOException {
Path filePath = location.resolve(fileName);
Expand Down Expand Up @@ -161,15 +144,6 @@ private Map<String, String> downloadTranslogToFSAndGetMetadata(String fileName,

/**
* Process the provided metadata and tries to write the content of the checkpoint (ckp) file to the FS.
*
* <p>This method takes the metadata from the translog file download and uses it to generate the content
* of the checkpoint file. The checkpoint file is then written to the specified local file system path
* with the given file name and generation.
*
* @param metadata A map containing the metadata extracted from the translog file.
* @param location The local file system path where the checkpoint file will be written.
* @param generation The generation associated with the checkpoint file.
* @param fileName The name of the checkpoint file (e.g., "translog.ckp").
*/
private void recoverCkpFileFromMetadata(Map<String, String> metadata, Path location, String generation, String fileName)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ public boolean isUploaded(String generation) {
return super.isGenerationUploaded(Long.parseLong(generation));

Check warning on line 49 in server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferTracker.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferTracker.java#L49

Added line #L49 was not covered by tests
}

@Override
public Set<String> allUploaded() {
return super.allUploadedGeneration();
}

@Override
void recordBytesForFiles(Set<TranslogCheckpointSnapshot> toUpload) {
bytesForTlogCkpFileToUpload = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,9 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca
generation,
location
);

// Download translog.tlog and translog.ckp files from remote to local FS
String translogFilename = Translog.getFilename(Long.parseLong(generation));
String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation));

downloadFileToFS(translogFilename, location, primaryTerm);
downloadFileToFS(ckpFileName, location, primaryTerm);
fileTransferTracker.addGeneration(Long.parseLong(generation), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,6 @@ public boolean isUploaded(String file) {
return fileTransferTracker.get(file) == TransferState.SUCCESS;
}

@Override
public Set<String> allUploaded() {
return getSuccessfulKeys(fileTransferTracker);
}

// here along with generation we also mark status of files in the tracker.
@Override
void addGeneration(long generation, boolean success) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,14 @@ public static Metadata createIndexMetadataWithRemoteStoreSettings(String indexNa
)
.putCustom(
REMOTE_STORE_CUSTOM_KEY,
Map.of(RemoteStoreEnums.PathType.NAME, "dummy", RemoteStoreEnums.PathHashAlgorithm.NAME, "dummy")
Map.of(
RemoteStoreEnums.PathType.NAME,
"dummy",
RemoteStoreEnums.PathHashAlgorithm.NAME,
"dummy",
RemoteStoreEnums.CKP_AS_METADATA,
"dummy"
)
)
.build();
return Metadata.builder().put(indexMetadata).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,21 @@ public void testGetStrategyWithDynamicUpdate() {
}

public void testTranslogCkpAsMetadataAllowedMinVersionNewer() {
Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING.getKey(), randomBoolean()).build();
Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING.getKey(), true).build();
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings);
RemoteStoreCustomDataResolver resolver = new RemoteStoreCustomDataResolver(remoteStoreSettings, () -> Version.CURRENT);
assertTrue(resolver.getRemoteStoreTranslogCkpAsMetadataAllowed());
}

public void testTranslogCkpAsMetadataAllowed2MinVersionNewer() {
Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING.getKey(), false).build();
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings);
RemoteStoreCustomDataResolver resolver = new RemoteStoreCustomDataResolver(remoteStoreSettings, () -> Version.CURRENT);
assertFalse(resolver.getRemoteStoreTranslogCkpAsMetadataAllowed());
}

public void testTranslogCkpAsMetadataAllowedMinVersionOlder() {
Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING.getKey(), randomBoolean()).build();
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,13 +671,13 @@ public void testSimpleOperationsUpload() throws Exception {
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
}

assertEquals(4, translog.allUploaded().size());
assertEquals(2, translog.allUploaded().size());

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 }));
assertEquals(6, translog.allUploaded().size());
assertEquals(3, translog.allUploaded().size());

translog.rollGeneration();
assertEquals(6, translog.allUploaded().size());
assertEquals(3, translog.allUploaded().size());

Set<String> mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR));
assertEquals(2, mdFiles.size());
Expand Down Expand Up @@ -722,7 +722,7 @@ public void testSimpleOperationsUpload() throws Exception {
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
assertEquals(2, translog.readers.size());
assertBusy(() -> {
assertEquals(4, translog.allUploaded().size());
assertEquals(2, translog.allUploaded().size());
assertEquals(
4,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
Expand All @@ -736,7 +736,7 @@ public void testSimpleOperationsUpload() throws Exception {
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
assertEquals(1, translog.readers.size());
assertBusy(() -> {
assertEquals(4, translog.allUploaded().size());
assertEquals(2, translog.allUploaded().size());
assertEquals(
4,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
Expand All @@ -755,7 +755,7 @@ public void testSimpleOperationsUpload() throws Exception {
assertEquals(1, translog.readers.size());
assertEquals(1, translog.stats().estimatedNumberOfOperations());
assertBusy(() -> {
assertEquals(4, translog.allUploaded().size());
assertEquals(2, translog.allUploaded().size());
assertEquals(
4,
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
Expand All @@ -775,7 +775,7 @@ public void testMetadataFileDeletion() throws Exception {
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
assertEquals(1, translog.readers.size());
}
assertBusy(() -> assertEquals(4, translog.allUploaded().size()));
assertBusy(() -> assertEquals(2, translog.allUploaded().size()));
assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));
int moreDocs = randomIntBetween(3, 10);
logger.info("numDocs={} moreDocs={}", numDocs, moreDocs);
Expand All @@ -785,7 +785,7 @@ public void testMetadataFileDeletion() throws Exception {
translog.trimUnreferencedReaders();
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
assertEquals(1 + moreDocs, translog.readers.size());
assertBusy(() -> assertEquals(2 + 2L * moreDocs, translog.allUploaded().size()));
assertBusy(() -> assertEquals(1 + (long) moreDocs, translog.allUploaded().size()));
assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));

int totalDocs = numDocs + moreDocs;
Expand Down Expand Up @@ -845,7 +845,7 @@ public void testDrainSync() throws Exception {
assertEquals(1, translog.readers.size());

addToTranslogAndListAndUpload(translog, ops, new Translog.Index(String.valueOf(0), 0, primaryTerm.get(), new byte[] { 1 }));
assertEquals(4, translog.allUploaded().size());
assertEquals(2, translog.allUploaded().size());
assertEquals(2, translog.readers.size());
assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));

Expand Down Expand Up @@ -873,7 +873,7 @@ public void testDrainSync() throws Exception {
assertBusy(() -> assertEquals(0, latch.getCount()));
assertEquals(0, translog.availablePermits());
slowDown.setSleepSeconds(0);
assertEquals(6, translog.allUploaded().size());
assertEquals(3, translog.allUploaded().size());
assertEquals(2, translog.readers.size());
Set<String> mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR));

Expand All @@ -882,7 +882,7 @@ public void testDrainSync() throws Exception {
translog.trimUnreferencedReaders();
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
assertEquals(1, translog.readers.size());
assertEquals(6, translog.allUploaded().size());
assertEquals(3, translog.allUploaded().size());
assertEquals(mdFiles, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)));

// Case 4 - After drainSync, if an upload is an attempted, we do not upload to remote store.
Expand All @@ -892,21 +892,21 @@ public void testDrainSync() throws Exception {
new Translog.Index(String.valueOf(2), 2, primaryTerm.get(), new byte[] { 1 })
);
assertEquals(1, translog.readers.size());
assertEquals(6, translog.allUploaded().size());
assertEquals(3, translog.allUploaded().size());
assertEquals(mdFiles, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)));

// Refill the permits back
Releasables.close(releasable);
addToTranslogAndListAndUpload(translog, ops, new Translog.Index(String.valueOf(3), 3, primaryTerm.get(), new byte[] { 1 }));
assertEquals(2, translog.readers.size());
assertEquals(8, translog.allUploaded().size());
assertEquals(4, translog.allUploaded().size());
assertEquals(3, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size());

translog.setMinSeqNoToKeep(3);
translog.trimUnreferencedReaders();
assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
assertEquals(1, translog.readers.size());
assertBusy(() -> assertEquals(4, translog.allUploaded().size()));
assertBusy(() -> assertEquals(2, translog.allUploaded().size()));
assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,14 @@ public void testOnSuccessStatsFailure_WhenCkpAsMetadata() throws IOException {
RemoteTranslogTransferTracker localRemoteTranslogTransferTracker = spy(remoteTranslogTransferTracker);
doAnswer((count) -> { throw new NullPointerException("Error while updating stats"); }).when(localRemoteTranslogTransferTracker)
.addUploadBytesSucceeded(anyLong());

FileTransferTracker localFileTransferTracker = FileTransferTrackerFactory.getFileTransferTracker(
shardId,
localRemoteTranslogTransferTracker,
true
);

Path testFile = createTempFile();
int fileSize = 128;
Files.write(testFile, randomByteArrayOfLength(fileSize), StandardOpenOption.APPEND);

TranslogCheckpointSnapshot transferFileSnapshot = new TranslogCheckpointSnapshot(
primaryTerm,
generation,
Expand All @@ -161,7 +158,6 @@ public void testOnSuccessStatsFailure_WhenCkpAsMetadata() throws IOException {
null,
generation
);

Set<TranslogCheckpointSnapshot> toUpload = new HashSet<>();
toUpload.add(transferFileSnapshot);
localFileTransferTracker.recordBytesForFiles(toUpload);
Expand Down Expand Up @@ -198,5 +194,4 @@ public void testUploaded_WhenCkpAsMetadata() throws IOException {
fileTransferTrackerCkpAsMetadata.deleteGenerations(Set.of(generation));
assertFalse(fileTransferTrackerCkpAsMetadata.isGenerationUploaded(generation));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -516,12 +516,10 @@ public void testDeleteTranslogSuccess() throws Exception {
tracker.addFile(translogFile, true);
tracker.addFile(checkpointFile, true);
assertEquals(1, tracker.allUploadedGeneration().size());
assertEquals(2, tracker.allUploaded().size());

List<String> files = List.of(checkpointFile, translogFile);
translogTransferManager.deleteGenerationAsync(primaryTerm, Set.of(19L), () -> {});
assertBusy(() -> assertEquals(0, tracker.allUploadedGeneration().size()));
assertBusy(() -> assertEquals(0, tracker.allUploaded().size()));
verify(blobContainer).deleteBlobsIgnoringIfNotExists(eq(files));
}

Expand Down Expand Up @@ -587,12 +585,10 @@ public void testDeleteTranslogFailure() throws Exception {
tracker.addFile(translogFile, true);
tracker.addFile(checkpointFile, true);
tracker.addGeneration(19, true);
assertEquals(2, tracker.allUploaded().size());
assertEquals(1, tracker.allUploadedGeneration().size());

translogTransferManager.deleteGenerationAsync(primaryTerm, Set.of(19L), () -> {});
assertEquals(1, tracker.allUploadedGeneration().size());
assertEquals(2, tracker.allUploaded().size());
}

private void assertNoDownloadStats(boolean nonZeroUploadTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public void testOnSuccess() throws IOException {
remoteTranslogTransferTracker.addUploadBytesStarted(fileSize + ckpFileSize);
fileTransferTracker.onSuccess(transferFileSnapshot);
assertEquals(fileTransferTracker.allUploadedGeneration().size(), 1);
assertEquals(fileTransferTracker.allUploaded().size(), 2);
try {
remoteTranslogTransferTracker.addUploadBytesStarted(fileSize + ckpFileSize);
fileTransferTracker.onFailure(transferFileSnapshot, new IOException("random exception"));
Expand Down Expand Up @@ -132,12 +131,10 @@ public void testOnFailure() throws IOException {
);
fileTransferTracker.onSuccess(translogCheckpointSnapshot2);
assertEquals(fileTransferTracker.allUploadedGeneration().size(), 1);
assertEquals(fileTransferTracker.allUploaded().size(), 2);

remoteTranslogTransferTracker.addUploadBytesStarted(fileSize * 2);
fileTransferTracker.onSuccess(translogCheckpointSnapshot1);
assertEquals(fileTransferTracker.allUploadedGeneration().size(), 2);
assertEquals(fileTransferTracker.allUploaded().size(), 4);

checkpointFileSnapshot1.close();
translogFileSnapshot1.close();
Expand Down

0 comments on commit bd58d92

Please sign in to comment.