Fix Shallow copy snapshot failures on closed index #16868

Status: Open. Wants to merge 3 commits into base: main. Changes from all commits.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -74,6 +74,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763))
- Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606))
- Fix remote shards balance ([#15335](https://github.com/opensearch-project/OpenSearch/pull/15335))
- Fix Shallow copy snapshot failures on closed index ([#16868](https://github.com/opensearch-project/OpenSearch/pull/16868))

### Security

@@ -39,6 +39,9 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
@@ -1078,4 +1081,67 @@ public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws Interrup
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}

public void testSuccessfulShallowV1SnapshotPostIndexClose() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
ensureGreen(INDEX_NAME);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "0ms"));

assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

logger.info("Create shallow snapshot setting enabled repo");
String shallowSnapshotRepoName = "shallow-snapshot-repo-name";
Path shallowSnapshotRepoPath = randomRepoPath();
Settings.Builder settings = Settings.builder()
.put("location", shallowSnapshotRepoPath)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), Boolean.TRUE);
createRepository(shallowSnapshotRepoName, "fs", settings);

for (int i = 0; i < 10; i++) {
indexBulk(INDEX_NAME, 1);
}
flushAndRefresh(INDEX_NAME);

logger.info("Verify shallow snapshot created before close");
final String snapshot1 = "snapshot1";
SnapshotInfo snapshotInfo1 = internalCluster().client()
.admin()
.cluster()
.prepareCreateSnapshot(shallowSnapshotRepoName, snapshot1)
.setIndices(INDEX_NAME)
.setWaitForCompletion(true)
.get()
.getSnapshotInfo();

assertEquals(SnapshotState.SUCCESS, snapshotInfo1.state());
assertTrue(snapshotInfo1.successfulShards() > 0);
assertEquals(0, snapshotInfo1.failedShards());

for (int i = 0; i < 10; i++) {
indexBulk(INDEX_NAME, 1);
}

// close index
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(1000);
logger.info("Verify shallow snapshot created after close");
final String snapshot2 = "snapshot2";

SnapshotInfo snapshotInfo2 = internalCluster().client()
.admin()
.cluster()
.prepareCreateSnapshot(shallowSnapshotRepoName, snapshot2)
.setIndices(INDEX_NAME)
.setWaitForCompletion(true)
.get()
.getSnapshotInfo();

assertEquals(SnapshotState.SUCCESS, snapshotInfo2.state());
assertTrue(snapshotInfo2.successfulShards() > 0);
assertEquals(0, snapshotInfo2.failedShards());
}
}
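The test above registers an `fs` repository with the shallow-copy flag enabled. As a rough standalone sketch of what those settings amount to — using a plain `Map` in place of OpenSearch's `Settings` builder, and assuming the setting key behind `BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY` is `remote_store_index_shallow_copy`:

```java
import java.util.Map;

public class ShallowRepoSettingsDemo {
    // Builds the settings the test would pass to createRepository() for an
    // "fs" repository with shallow-copy snapshots enabled. The setting key is
    // an assumption mirroring BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.
    static Map<String, String> shallowRepoSettings(String location) {
        return Map.of(
            "location", location,
            "remote_store_index_shallow_copy", Boolean.TRUE.toString()
        );
    }
}
```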
20 changes: 20 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1624,6 +1624,26 @@
return luceneVersion == null ? indexSettings.getIndexVersionCreated().luceneVersion : luceneVersion;
}

/**
 * Reads the last metadata file from the remote store and fetches the files present in the last commit along with their sizes.
 * @return Tuple(Tuple(primaryTerm, commitGeneration), indexFilesToFileLengthMap)
 * @throws IOException if the latest metadata file cannot be read from the remote store
 */
public Tuple<Tuple<Long, Long>, Map<String, Long>> acquireLastRemoteUploadedIndexCommit() throws IOException {
if (!indexSettings.isAssignedOnRemoteNode()) {
throw new IllegalStateException("Index is not assigned on Remote Node");

}
RemoteSegmentMetadata lastUploadedMetadata = getRemoteDirectory().readLatestMetadataFile();
final Map<String, Long> indexFilesToFileLengthMap = lastUploadedMetadata.getMetadata()
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getLength()));
long primaryTerm = lastUploadedMetadata.getPrimaryTerm();
long commitGeneration = lastUploadedMetadata.getGeneration();
return new Tuple<>(new Tuple<>(primaryTerm, commitGeneration), indexFilesToFileLengthMap);
}
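The nested `Tuple` returned by the new method packs the primary term, the commit generation, and the file-to-length map. A minimal, self-contained sketch of how a caller might unpack it — the `Tuple` record below is a stand-in for `org.opensearch.common.collect.Tuple`, not the real class:

```java
import java.util.Map;

public class RemoteCommitTupleDemo {
    // Minimal stand-in for org.opensearch.common.collect.Tuple (v1/v2 accessors).
    record Tuple<A, B>(A v1, B v2) {}

    // Unpacks ((primaryTerm, commitGeneration), fileLengthMap) as a caller of
    // acquireLastRemoteUploadedIndexCommit() would.
    static long[] unpackTermAndGeneration(Tuple<Tuple<Long, Long>, Map<String, Long>> t) {
        return new long[] { t.v1().v1(), t.v1().v2() };
    }

    // Total size of the uploaded commit, summed from the file-length map.
    static long totalCommitSize(Tuple<Tuple<Long, Long>, Map<String, Long>> t) {
        return t.v2().values().stream().mapToLong(Long::longValue).sum();
    }
}
```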

/**
* Creates a new {@link IndexCommit} snapshot from the currently running engine. All resources referenced by this
* commit won't be freed until the commit / snapshot is closed.
@@ -237,7 +237,9 @@ public void snapshotRemoteStoreIndexShard(
String shardStateIdentifier,
IndexShardSnapshotStatus snapshotStatus,
long primaryTerm,
long commitGeneration,
long startTime,
Map<String, Long> indexFilesToFileLengthMap,
ActionListener<String> listener
) {
in.snapshotRemoteStoreIndexShard(
@@ -248,7 +250,9 @@
shardStateIdentifier,
snapshotStatus,
primaryTerm,
commitGeneration,
startTime,
indexFilesToFileLengthMap,
listener
);
}
@@ -406,11 +406,13 @@ default void snapshotRemoteStoreIndexShard(
Store store,
SnapshotId snapshotId,
IndexId indexId,
IndexCommit snapshotIndexCommit,
@Nullable IndexCommit snapshotIndexCommit,
@Nullable String shardStateIdentifier,
IndexShardSnapshotStatus snapshotStatus,
long primaryTerm,
long commitGeneration,
long startTime,
@Nullable Map<String, Long> indexFilesToFileLengthMap,
ActionListener<String> listener
) {
throw new UnsupportedOperationException();
@@ -3753,27 +3753,42 @@
String shardStateIdentifier,
IndexShardSnapshotStatus snapshotStatus,
long primaryTerm,
long commitGeneration,
long startTime,
Map<String, Long> indexFilesToFileLengthMap,
ActionListener<String> listener
) {
if (isReadOnly()) {
listener.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository"));
return;
}
if (snapshotIndexCommit == null && indexFilesToFileLengthMap == null) {
listener.onFailure(new RepositoryException(metadata.name(), "snapshot index commit and index files map cannot both be null"));
return;

}

final ShardId shardId = store.shardId();
try {
final String generation = snapshotStatus.generation();
logger.info("[{}] [{}] shallow copy snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation);
final BlobContainer shardContainer = shardContainer(indexId, shardId);

long indexTotalFileSize = 0;
// local store is being used here to fetch the files metadata instead of remote store as currently
// remote store is mirroring the local store.
List<String> fileNames = new ArrayList<>(snapshotIndexCommit.getFileNames());
Store.MetadataSnapshot commitSnapshotMetadata = store.getMetadata(snapshotIndexCommit);
for (String fileName : fileNames) {
indexTotalFileSize += commitSnapshotMetadata.get(fileName).length();
List<String> fileNames;

if (snapshotIndexCommit != null) {
// the local store is used here to fetch the file metadata instead of the remote store,
// since the remote store currently mirrors the local store.
fileNames = new ArrayList<>(snapshotIndexCommit.getFileNames());
Store.MetadataSnapshot commitSnapshotMetadata = store.getMetadata(snapshotIndexCommit);
for (String fileName : fileNames) {
indexTotalFileSize += commitSnapshotMetadata.get(fileName).length();
}
} else {
fileNames = new ArrayList<>(indexFilesToFileLengthMap.keySet());
indexTotalFileSize = indexFilesToFileLengthMap.values().stream().mapToLong(Long::longValue).sum();

}
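The else-branch above derives the file list and total size purely from the remote-store metadata map, since no local `IndexCommit` exists for a closed index. Extracted as a standalone sketch (names are illustrative, not OpenSearch API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SnapshotSizeDemo {
    // Mirrors the closed-index branch: file names come straight from the map keys.
    static List<String> fileNamesFromMap(Map<String, Long> indexFilesToFileLengthMap) {
        return new ArrayList<>(indexFilesToFileLengthMap.keySet());
    }

    // Mirrors the closed-index branch: total size is the sum of the map values.
    static long totalSizeFromMap(Map<String, Long> indexFilesToFileLengthMap) {
        return indexFilesToFileLengthMap.values().stream().mapToLong(Long::longValue).sum();
    }
}
```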

int indexTotalNumberOfFiles = fileNames.size();

snapshotStatus.moveToStarted(
@@ -3784,7 +3799,7 @@
indexTotalFileSize
);

final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration());
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(commitGeneration);

// now create and write the commit point
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
@@ -3795,7 +3810,7 @@
snapshotId.getName(),
lastSnapshotStatus.getIndexVersion(),
primaryTerm,
snapshotIndexCommit.getGeneration(),
commitGeneration,
lastSnapshotStatus.getStartTime(),
threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
indexTotalNumberOfFiles,
@@ -44,9 +44,11 @@
import org.opensearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.opensearch.cluster.SnapshotsInProgress.ShardState;
import org.opensearch.cluster.SnapshotsInProgress.State;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
@@ -74,7 +76,6 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -371,7 +372,9 @@
ActionListener<String> listener
) {
try {
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShardOrNull(shardId.id());
final boolean closedIndex = indexService.getMetadata().getState() == IndexMetadata.State.CLOSE;
if (indexShard.routingEntry().primary() == false) {
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
}
@@ -398,36 +401,53 @@
if (remoteStoreIndexShallowCopy && indexShard.indexSettings().isRemoteStoreEnabled()) {
long startTime = threadPool.relativeTimeInMillis();
long primaryTerm = indexShard.getOperationPrimaryTerm();
// we flush first to make sure we get the latest writes snapshotted
wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true);
IndexCommit snapshotIndexCommit = wrappedSnapshot.get();
long commitGeneration = snapshotIndexCommit.getGeneration();
long commitGeneration = 0L;
Map<String, Long> indexFilesToFileLengthMap = null;
IndexCommit snapshotIndexCommit = null;

try {
if (closedIndex) {
final Tuple<Tuple<Long, Long>, Map<String, Long>> tuple = indexShard.acquireLastRemoteUploadedIndexCommit();
primaryTerm = tuple.v1().v1();
commitGeneration = tuple.v1().v2();
indexFilesToFileLengthMap = tuple.v2();
} else {
wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true);
snapshotIndexCommit = wrappedSnapshot.get();
commitGeneration = snapshotIndexCommit.getGeneration();
}
indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
} catch (NoSuchFileException e) {
wrappedSnapshot.close();
logger.warn(
"Exception while acquiring lock on primaryTerm = {} and generation = {}",
primaryTerm,
commitGeneration
);
indexShard.flush(new FlushRequest(shardId.getIndexName()).force(true));
wrappedSnapshot = indexShard.acquireLastIndexCommit(false);
snapshotIndexCommit = wrappedSnapshot.get();
commitGeneration = snapshotIndexCommit.getGeneration();
indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
} catch (IOException e) {

if (closedIndex) {
logger.warn("Exception while reading the latest metadata file from the remote store", e);
throw e;

} else {
wrappedSnapshot.close();
logger.warn(

"Exception while acquiring lock on primaryTerm = {} and generation = {}",
primaryTerm,
commitGeneration

);
indexShard.flush(new FlushRequest(shardId.getIndexName()).force(true));
wrappedSnapshot = indexShard.acquireLastIndexCommit(false);
snapshotIndexCommit = wrappedSnapshot.get();
commitGeneration = snapshotIndexCommit.getGeneration();
indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);

}
}
try {
repository.snapshotRemoteStoreIndexShard(
indexShard.store(),
snapshot.getSnapshotId(),
indexId,
snapshotIndexCommit,
getShardStateId(indexShard, snapshotIndexCommit),
null,
snapshotStatus,
primaryTerm,
commitGeneration,
startTime,
ActionListener.runBefore(listener, wrappedSnapshot::close)
indexFilesToFileLengthMap,
closedIndex ? listener : ActionListener.runBefore(listener, wrappedSnapshot::close)
);
} catch (IndexShardSnapshotFailedException e) {
logger.error(
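The branching introduced in SnapshotShardsService can be summarized as follows: a closed index has no live engine to flush or refresh, so the commit data must come from what was last uploaded to the remote store, while an open index still acquires its latest local commit. A hedged, standalone sketch of that decision (enum and method names are illustrative, not part of the PR):

```java
public class ShallowSnapshotSourceDemo {
    // The two sources of commit data the snapshot path chooses between.
    enum CommitSource { REMOTE_UPLOADED_METADATA, LOCAL_INDEX_COMMIT }

    // A closed index cannot provide a fresh local IndexCommit, so the snapshot
    // is built from the metadata already uploaded to the remote store.
    static CommitSource chooseCommitSource(boolean closedIndex) {
        return closedIndex ? CommitSource.REMOTE_UPLOADED_METADATA : CommitSource.LOCAL_INDEX_COMMIT;
    }
}
```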