Skip to content

Commit

Permalink
Merge branch 'main' into auto-restore-cluster-state-version-remote-state
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Oct 25, 2023
2 parents ca4e51f + b5299f1 commit 6028a28
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 18 deletions.
29 changes: 13 additions & 16 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -695,24 +695,21 @@ public void setLastAcceptedState(ClusterState clusterState) {
try {
final ClusterMetadataManifest manifest;
if (shouldWriteFullClusterState(clusterState)) {
if (clusterState.metadata().clusterUUIDCommitted() == true) {
final Optional<ClusterMetadataManifest> latestManifest = remoteClusterStateService.getLatestClusterMetadataManifest(
clusterState.getClusterName().value(),
final Optional<ClusterMetadataManifest> latestManifest = remoteClusterStateService.getLatestClusterMetadataManifest(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
);
if (latestManifest.isPresent()) {
// The previous UUID should not change for the current UUID. So fetching the latest manifest
// from remote store and getting the previous UUID.
previousClusterUUID = latestManifest.get().getPreviousClusterUUID();
} else {
// When the user starts the cluster with remote state disabled but later enables the remote state,
// there will not be any manifest for the current cluster UUID.
logger.error(
"Latest manifest is not present in remote store for cluster UUID: {}",
clusterState.metadata().clusterUUID()
);
if (latestManifest.isPresent()) {
// The previous UUID should not change for the current UUID. So fetching the latest manifest
// from remote store and getting the previous UUID.
previousClusterUUID = latestManifest.get().getPreviousClusterUUID();
} else {
// When the user starts the cluster with remote state disabled but later enables the remote state,
// there will not be any manifest for the current cluster UUID.
logger.error(
"Latest manifest is not present in remote store for cluster UUID: {}",
clusterState.metadata().clusterUUID()
);
previousClusterUUID = ClusterState.UNKNOWN_UUID;
}
}
manifest = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4962,7 +4962,8 @@ private String copySegmentFiles(
return segmentNFile;
}

private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) {
// Visible for testing
boolean localDirectoryContains(Directory localDirectory, String file, long checksum) throws IOException {
try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) {
if (checksum == CodecUtil.retrieveChecksum(indexInput)) {
return true;
Expand All @@ -4981,6 +4982,8 @@ private boolean localDirectoryContains(Directory localDirectory, String file, lo
logger.debug("File {} does not exist in local FS, downloading from remote store", file);
} catch (IOException e) {
logger.warn("Exception while reading checksum of file: {}, this can happen if file is corrupted", file);
// For any other exception on reading checksum, we delete the file to re-download again
localDirectory.deleteFile(file);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
Expand Down Expand Up @@ -765,6 +767,43 @@ public void testRemotePersistedState() throws IOException {
assertThat(remotePersistedState.getLastAcceptedState().metadata().clusterUUIDCommitted(), equalTo(true));
}

public void testRemotePersistedStateNotCommitted() throws IOException {
final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class);
final String previousClusterUUID = "prev-cluster-uuid";
final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder()
.previousClusterUUID(previousClusterUUID)
.clusterTerm(1L)
.stateVersion(5L)
.build();
Mockito.when(remoteClusterStateService.getLatestClusterMetadataManifest(Mockito.any(), Mockito.any()))
.thenReturn(Optional.of(manifest));
Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())).thenReturn(manifest);

Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(manifest);
CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(
remoteClusterStateService,
ClusterState.UNKNOWN_UUID
);

assertThat(remotePersistedState.getLastAcceptedState(), nullValue());
assertThat(remotePersistedState.getCurrentTerm(), equalTo(0L));

final long clusterTerm = randomNonNegativeLong();
ClusterState clusterState = createClusterState(
randomNonNegativeLong(),
Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build()
);
clusterState = ClusterState.builder(clusterState)
.metadata(Metadata.builder(clusterState.getMetadata()).clusterUUID(randomAlphaOfLength(10)).clusterUUIDCommitted(false).build())
.build();

remotePersistedState.setLastAcceptedState(clusterState);
ArgumentCaptor<String> previousClusterUUIDCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<ClusterState> clusterStateCaptor = ArgumentCaptor.forClass(ClusterState.class);
Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterStateCaptor.capture(), previousClusterUUIDCaptor.capture());
assertEquals(previousClusterUUID, previousClusterUUIDCaptor.getValue());
}

public void testRemotePersistedStateExceptionOnFullStateUpload() throws IOException {
final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class);
final String previousClusterUUID = "prev-cluster-uuid";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package org.opensearch.index.shard;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
Expand All @@ -45,6 +46,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.tests.mockfile.ExtrasFS;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.apache.lucene.util.BytesRef;
Expand Down Expand Up @@ -91,6 +93,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
Expand Down Expand Up @@ -163,11 +166,13 @@
import org.junit.Assert;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -4907,6 +4912,53 @@ public void testRecordsForceMerges() throws IOException {
closeShards(shard);
}

public void testLocalDirectoryContains() throws IOException {
IndexShard indexShard = newStartedShard(true);
int numDocs = between(1, 10);
for (int i = 0; i < numDocs; i++) {
indexDoc(indexShard, "_doc", Integer.toString(i));
}
flushShard(indexShard);
indexShard.store().incRef();
Directory localDirectory = indexShard.store().directory();
Path shardPath = indexShard.shardPath().getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
Path tempDir = createTempDir();
for (String file : localDirectory.listAll()) {
if (file.equals("write.lock") || file.startsWith("extra")) {
continue;
}
boolean corrupted = randomBoolean();
long checksum = 0;
try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) {
checksum = CodecUtil.retrieveChecksum(indexInput);
}
if (corrupted) {
Files.copy(shardPath.resolve(file), tempDir.resolve(file));
try (FileChannel raf = FileChannel.open(shardPath.resolve(file), StandardOpenOption.READ, StandardOpenOption.WRITE)) {
CorruptionUtils.corruptAt(shardPath.resolve(file), raf, (int) (raf.size() - 8));
}
}
if (corrupted == false) {
assertTrue(indexShard.localDirectoryContains(localDirectory, file, checksum));
} else {
assertFalse(indexShard.localDirectoryContains(localDirectory, file, checksum));
assertFalse(Files.exists(shardPath.resolve(file)));
}
}
try (Stream<Path> files = Files.list(tempDir)) {
files.forEach(p -> {
try {
Files.copy(p, shardPath.resolve(p.getFileName()));
} catch (IOException e) {
// Ignore
}
});
}
FileSystemUtils.deleteSubDirectories(tempDir);
indexShard.store().decRef();
closeShards(indexShard);
}

private void populateSampleRemoteSegmentStats(RemoteSegmentTransferTracker tracker) {
tracker.addUploadBytesStarted(30L);
tracker.addUploadBytesSucceeded(10L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public static void corruptFile(Random random, Path... files) throws IOException
}
}

static void corruptAt(Path path, FileChannel channel, int position) throws IOException {
public static void corruptAt(Path path, FileChannel channel, int position) throws IOException {
// read
channel.position(position);
long filePointer = channel.position();
Expand Down

0 comments on commit 6028a28

Please sign in to comment.