From 131908a740174831b72f1edd78ce74882f21919a Mon Sep 17 00:00:00 2001 From: sudarshan baliga Date: Fri, 14 Jul 2023 18:28:32 +0530 Subject: [PATCH] Remove the helper class for the RSA transport actions and write the logic in the class files. Signed-off-by: sudarshan baliga --- .../gateway/ReplicaShardAllocator.java | 27 +- ...sportNodesBatchListShardStoreMetadata.java | 328 ----------- .../TransportNodesListShardStoreMetadata.java | 196 ++++++- ...sportNodesListShardStoreMetadataBatch.java | 525 ++++++++++++++++++ ...portNodesListShardStoreMetadataHelper.java | 230 -------- .../gateway/ReplicaShardAllocatorTests.java | 7 +- .../opensearch/index/store/StoreTests.java | 10 +- 7 files changed, 725 insertions(+), 598 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/indices/store/TransportNodesBatchListShardStoreMetadata.java create mode 100644 server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java delete mode 100644 server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java index 8026415915491..5216dd2fcb4b5 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java @@ -51,8 +51,8 @@ import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata; -import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper; import java.util.ArrayList; import java.util.Collections; @@ -106,7 +106,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) { assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary"; assert primaryShard.currentNodeId() != null; final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); - final TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores); + final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores); if (primaryStore == null) { // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) // just let the recovery find it out, no need to do anything about it for the initializing shard @@ -223,7 +223,7 @@ public AllocateUnassignedDecision makeAllocationDecision( } assert primaryShard.currentNodeId() != null; final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); - final TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores); + final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores); if (primaryStore == null) { // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) // we want to let the replica be allocated in order to expose the actual problem with the primary that the replica @@ -357,7 +357,7 @@ private static List<NodeAllocationResult> augmentExplanationsWithStoreInfo( /** * Finds the store for the assigned shard
in the fetched data, returns null if none is found. */ - private static TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata findStore( + private static TransportNodesListShardStoreMetadata.StoreFilesMetadata findStore( DiscoveryNode node, AsyncShardFetch.FetchResult data ) { @@ -373,7 +373,7 @@ private MatchingNodes findMatchingNodes( RoutingAllocation allocation, boolean noMatchFailedNodes, DiscoveryNode primaryNode, - TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore, AsyncShardFetch.FetchResult data, boolean explain ) { @@ -386,8 +386,7 @@ private MatchingNodes findMatchingNodes( && shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) { continue; } - TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue() - .storeFilesMetadata(); + TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata(); // we don't have any files at all, it is an empty index if (storeFilesMetadata.isEmpty()) { continue; @@ -443,8 +442,8 @@ private MatchingNodes findMatchingNodes( } private static long computeMatchingBytes( - TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore, - TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata + TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata ) { long sizeMatched = 0; for (StoreFileMetadata storeFileMetadata : storeFilesMetadata) { @@ -457,8 +456,8 @@ private static long computeMatchingBytes( } private static boolean hasMatchingSyncId( - TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore, - TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata replicaStore + TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore ) { String primarySyncId = primaryStore.syncId(); return primarySyncId != null && primarySyncId.equals(replicaStore.syncId()); @@ -466,9 +465,9 @@ private static boolean hasMatchingSyncId( private static MatchingNode computeMatchingNode( DiscoveryNode primaryNode, - TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore, DiscoveryNode replicaNode, - TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata replicaStore + TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore ) { final long retainingSeqNoForPrimary = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(primaryNode); final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(replicaNode); @@ -479,7 +478,7 @@ private static MatchingNode computeMatchingNode( } private static boolean canPerformOperationBasedRecovery( - TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore, AsyncShardFetch.FetchResult shardStores, DiscoveryNode targetNode ) { diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesBatchListShardStoreMetadata.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesBatchListShardStoreMetadata.java deleted file mode 100644 index c34758099b114..0000000000000 --- 
a/server/src/main/java/org/opensearch/indices/store/TransportNodesBatchListShardStoreMetadata.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.indices.store; - -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.OpenSearchException; -import org.opensearch.action.ActionListener; -import org.opensearch.action.ActionType; -import org.opensearch.action.FailedNodeException; -import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.nodes.BaseNodeResponse; -import org.opensearch.action.support.nodes.BaseNodesRequest; -import org.opensearch.action.support.nodes.BaseNodesResponse; -import org.opensearch.action.support.nodes.TransportNodesAction; -import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.settings.Settings; -import org.opensearch.env.NodeEnvironment; -import org.opensearch.gateway.AsyncShardsFetchPerNode; -import org.opensearch.index.shard.ShardId; -import org.opensearch.indices.IndicesService; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportRequest; -import org.opensearch.transport.TransportService; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -/** - * Transport action for fetching the batch of shard stores Metadata from a list of transport nodes - * - * @opensearch.internal - */ -public class TransportNodesBatchListShardStoreMetadata extends TransportNodesAction< - TransportNodesBatchListShardStoreMetadata.Request, - TransportNodesBatchListShardStoreMetadata.NodesStoreFilesMetadata, - TransportNodesBatchListShardStoreMetadata.NodeRequest, - TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> - implements - AsyncShardsFetchPerNode.Lister< - TransportNodesBatchListShardStoreMetadata.NodesStoreFilesMetadata, - TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> { - - public static final String ACTION_NAME = "internal:cluster/nodes/indices/shard/store/batch"; - public static final ActionType TYPE = new ActionType<>( - ACTION_NAME, - TransportNodesListShardStoreMetadata.NodesStoreFilesMetadata::new - ); - - private final Settings settings; - private final IndicesService indicesService; - private final NodeEnvironment nodeEnv; - - @Inject - public TransportNodesBatchListShardStoreMetadata( - Settings settings, - ThreadPool threadPool, - ClusterService clusterService, - TransportService transportService, - IndicesService indicesService, - NodeEnvironment nodeEnv, - ActionFilters actionFilters - ) { - super( - ACTION_NAME, - threadPool, - clusterService, - transportService, - actionFilters, - Request::new, - NodeRequest::new, - ThreadPool.Names.FETCH_SHARD_STORE, - NodeStoreFilesMetadataBatch.class - ); - this.settings = settings; - this.indicesService = indicesService; - this.nodeEnv = nodeEnv; - } - - @Override - public void list( - DiscoveryNode[] nodes, - Map shardIdsWithCustomDataPath, - ActionListener listener - ) { - execute(new 
TransportNodesBatchListShardStoreMetadata.Request(shardIdsWithCustomDataPath, nodes), listener); - } - - @Override - protected NodeRequest newNodeRequest(Request request) { - return new NodeRequest(request); - } - - @Override - protected NodeStoreFilesMetadataBatch newNodeResponse(StreamInput in) throws IOException { - return new NodeStoreFilesMetadataBatch(in); - } - - @Override - protected NodesStoreFilesMetadata newResponse( - Request request, - List responses, - List failures - ) { - return new NodesStoreFilesMetadata(clusterService.getClusterName(), responses, failures); - } - - @Override - protected NodeStoreFilesMetadataBatch nodeOperation(NodeRequest request) { - try { - return new NodeStoreFilesMetadataBatch(clusterService.localNode(), listStoreMetadata(request)); - } catch (IOException e) { - throw new OpenSearchException("Failed to list store metadata for shards [" + request.getShardIdsWithCustomDataPath() + "]", e); - } - } - - /** - * This method is similar to listStoreMetadata method of {@link TransportNodesListShardStoreMetadata} - * In this case we fetch the shard store files for batch of shards instead of one shard. - */ - private Map listStoreMetadata(NodeRequest request) throws IOException { - Map shardStoreMetadataMap = new HashMap(); - for (Map.Entry shardToCustomDataPathEntry : request.getShardIdsWithCustomDataPath().entrySet()) { - final ShardId shardId = shardToCustomDataPathEntry.getKey(); - try { - logger.debug("Listing store meta data for {}", shardId); - TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata nodeStoreFilesMetadata = - TransportNodesListShardStoreMetadataHelper.getStoreFilesMetadata( - logger, - indicesService, - clusterService, - shardId, - shardToCustomDataPathEntry.getValue(), - settings, - nodeEnv - ); - shardStoreMetadataMap.put(shardId, new NodeStoreFilesMetadata(nodeStoreFilesMetadata, null)); - } catch (Exception storeFileFetchException) { - logger.trace(new ParameterizedMessage("Unable to fetch store files for shard [{}]", shardId), storeFileFetchException); - shardStoreMetadataMap.put(shardId, new NodeStoreFilesMetadata(null, storeFileFetchException)); - } - } - logger.debug("Loaded store meta data for {} shards", shardStoreMetadataMap); - return shardStoreMetadataMap; - } - - /** - * Request is used in constructing the request for making the transport request to set of other node. - * Refer {@link TransportNodesAction} class start method. 
- * - * @opensearch.internal - */ - public static class Request extends BaseNodesRequest { - - private final Map shardIdsWithCustomDataPath; - - public Request(StreamInput in) throws IOException { - super(in); - shardIdsWithCustomDataPath = in.readMap(ShardId::new, StreamInput::readString); - } - - public Request(Map shardIdsWithCustomDataPath, DiscoveryNode[] nodes) { - super(nodes); - this.shardIdsWithCustomDataPath = Objects.requireNonNull(shardIdsWithCustomDataPath); - } - - public Map getShardIdsWithCustomDataPath() { - return shardIdsWithCustomDataPath; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeMap(shardIdsWithCustomDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString); - } - } - - /** - * Metadata for the nodes store files - * - * @opensearch.internal - */ - public static class NodesStoreFilesMetadata extends BaseNodesResponse { - - public NodesStoreFilesMetadata(StreamInput in) throws IOException { - super(in); - } - - public NodesStoreFilesMetadata( - ClusterName clusterName, - List nodes, - List failures - ) { - super(clusterName, nodes, failures); - } - - @Override - protected List readNodesFrom(StreamInput in) throws IOException { - return in.readList(NodeStoreFilesMetadataBatch::new); - } - - @Override - protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeList(nodes); - } - } - - /** - * The metadata for the node store files - * - * @opensearch.internal - */ - public static class NodeStoreFilesMetadata { - - private TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata; - private Exception storeFileFetchException; - - public NodeStoreFilesMetadata(TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata) { - this.storeFilesMetadata = storeFilesMetadata; - this.storeFileFetchException = null; - } - - public NodeStoreFilesMetadata(StreamInput in) throws IOException { - storeFilesMetadata = new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata(in); - this.storeFileFetchException = null; - } - - public NodeStoreFilesMetadata( - TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata, - Exception storeFileFetchException - ) { - this.storeFilesMetadata = storeFilesMetadata; - this.storeFileFetchException = storeFileFetchException; - } - - public TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata() { - return storeFilesMetadata; - } - - public static NodeStoreFilesMetadata readListShardStoreNodeOperationResponse(StreamInput in) throws IOException { - return new NodeStoreFilesMetadata(in); - } - - public void writeTo(StreamOutput out) throws IOException { - storeFilesMetadata.writeTo(out); - } - - public Exception getStoreFileFetchException() { - return storeFileFetchException; - } - - @Override - public String toString() { - return "[[" + storeFilesMetadata + "]]"; - } - } - - /** - * NodeRequest class is for deserializing the request received by this node from other node for this transport action. 
- * This is used in {@link TransportNodesAction} - * @opensearch.internal - */ - public static class NodeRequest extends TransportRequest { - - private final Map shardIdsWithCustomDataPath; - - public NodeRequest(StreamInput in) throws IOException { - super(in); - shardIdsWithCustomDataPath = in.readMap(ShardId::new, StreamInput::readString); - } - - public NodeRequest(Request request) { - this.shardIdsWithCustomDataPath = Objects.requireNonNull(request.getShardIdsWithCustomDataPath()); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeMap(shardIdsWithCustomDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString); - } - - public Map getShardIdsWithCustomDataPath() { - return shardIdsWithCustomDataPath; - } - } - - /** - * NodeStoreFilesMetadataBatch Response received by the node from other node for this transport action. - * Refer {@link TransportNodesAction} - */ - public static class NodeStoreFilesMetadataBatch extends BaseNodeResponse { - private final Map nodeStoreFilesMetadataBatch; - - protected NodeStoreFilesMetadataBatch(StreamInput in) throws IOException { - super(in); - this.nodeStoreFilesMetadataBatch = in.readMap(ShardId::new, NodeStoreFilesMetadata::new); - } - - public NodeStoreFilesMetadataBatch(DiscoveryNode node, Map nodeStoreFilesMetadataBatch) { - super(node); - this.nodeStoreFilesMetadataBatch = nodeStoreFilesMetadataBatch; - } - - public Map getNodeStoreFilesMetadataBatch() { - return this.nodeStoreFilesMetadataBatch; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeMap(nodeStoreFilesMetadataBatch, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o)); - } - } - -} diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java index 6436b288aea4e..bdb0d99fa93b0 100644 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java @@ -32,6 +32,7 @@ package org.opensearch.indices.store; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionType; @@ -42,24 +43,38 @@ import org.opensearch.action.support.nodes.BaseNodesResponse; import org.opensearch.action.support.nodes.TransportNodesAction; import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.env.NodeEnvironment; import org.opensearch.gateway.AsyncShardFetch; +import org.opensearch.index.IndexService; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.seqno.ReplicationTracker; +import org.opensearch.index.seqno.RetentionLease; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.shard.ShardPath; +import 
org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.concurrent.TimeUnit; /** * Metadata for shard stores from a list of transport nodes @@ -142,18 +157,168 @@ protected NodeStoreFilesMetadata nodeOperation(NodeRequest request) { } } - private TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOException { + private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOException { final ShardId shardId = request.getShardId(); logger.trace("listing store meta data for {}", shardId); - return TransportNodesListShardStoreMetadataHelper.getStoreFilesMetadata( - logger, - indicesService, - clusterService, - shardId, - request.getCustomDataPath(), - settings, - nodeEnv - ); + long startTimeNS = System.nanoTime(); + boolean exists = false; + try { + IndexService indexService = indicesService.indexService(shardId.getIndex()); + if (indexService != null) { + IndexShard indexShard = indexService.getShardOrNull(shardId.id()); + if (indexShard != null) { + try { + final StoreFilesMetadata storeFilesMetadata = new StoreFilesMetadata( + shardId, + indexShard.snapshotStoreMetadata(), + indexShard.getPeerRecoveryRetentionLeases() + ); + exists = true; + return storeFilesMetadata; + } catch (org.apache.lucene.index.IndexNotFoundException e) { + logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e); + return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); + } catch (IOException e) { + logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e); + return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); + } + } + } + final String customDataPath; + if (request.getCustomDataPath() != null) { + customDataPath = request.getCustomDataPath(); + } else { + // TODO: Fallback for BWC with older predecessor (ES) versions. + // Remove this once request.getCustomDataPath() always returns non-null + if (indexService != null) { + customDataPath = indexService.getIndexSettings().customDataPath(); + } else { + IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); + if (metadata != null) { + customDataPath = new IndexSettings(metadata, settings).customDataPath(); + } else { + logger.trace("{} node doesn't have metadata for the requested index", shardId); + throw new OpenSearchException("node doesn't have metadata for index " + shardId.getIndex()); + } + } + } + final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); + if (shardPath == null) { + return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); + } + // note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means: + // 1) a shard is being constructed, which means the cluster-manager will not use a copy of this replica + // 2) A shard is shutting down and has not cleared its content within lock timeout. In this case the cluster-manager may not + // reuse local resources.
+ final Store.MetadataSnapshot metadataSnapshot = Store.readMetadataSnapshot( + shardPath.resolveIndex(), + shardId, + nodeEnv::shardLock, + logger + ); + // We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when + // we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard. + return new StoreFilesMetadata(shardId, metadataSnapshot, Collections.emptyList()); + } finally { + TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); + if (exists) { + logger.debug("{} loaded store meta data (took [{}])", shardId, took); + } else { + logger.trace("{} didn't find any store meta data to load (took [{}])", shardId, took); + } + } + } + + /** + * Metadata for store files + * + * @opensearch.internal + */ + public static class StoreFilesMetadata implements Iterable<StoreFileMetadata>, Writeable { + private final ShardId shardId; + private final Store.MetadataSnapshot metadataSnapshot; + private final List<RetentionLease> peerRecoveryRetentionLeases; + + public StoreFilesMetadata( + ShardId shardId, + Store.MetadataSnapshot metadataSnapshot, + List<RetentionLease> peerRecoveryRetentionLeases + ) { + this.shardId = shardId; + this.metadataSnapshot = metadataSnapshot; + this.peerRecoveryRetentionLeases = peerRecoveryRetentionLeases; + } + + public StoreFilesMetadata(StreamInput in) throws IOException { + this.shardId = new ShardId(in); + this.metadataSnapshot = new Store.MetadataSnapshot(in); + this.peerRecoveryRetentionLeases = in.readList(RetentionLease::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + metadataSnapshot.writeTo(out); + out.writeList(peerRecoveryRetentionLeases); + } + + public ShardId shardId() { + return this.shardId; + } + + public boolean isEmpty() { + return metadataSnapshot.size() == 0; + } + + @Override + public Iterator<StoreFileMetadata> iterator() { + return metadataSnapshot.iterator(); + } + + public boolean fileExists(String name) { + return metadataSnapshot.asMap().containsKey(name); + } + + public StoreFileMetadata file(String name) { + return metadataSnapshot.asMap().get(name); + } + + /** + * Returns the retaining sequence number of the peer recovery retention lease for a given node if exists; otherwise, returns -1. */ + public long getPeerRecoveryRetentionLeaseRetainingSeqNo(DiscoveryNode node) { + assert node != null; + final String retentionLeaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId()); + return peerRecoveryRetentionLeases.stream() + .filter(lease -> lease.id().equals(retentionLeaseId)) + .mapToLong(RetentionLease::retainingSequenceNumber) + .findFirst() + .orElse(-1L); + } + + public List<RetentionLease> peerRecoveryRetentionLeases() { + return peerRecoveryRetentionLeases; + } + + /** + * @return commit sync id if exists, else null + */ + public String syncId() { + return metadataSnapshot.getSyncId(); + } + + @Override + public String toString() { + return "StoreFilesMetadata{" + + ", shardId=" + + shardId + + ", metadataSnapshot{size=" + + metadataSnapshot.size() + + ", syncId=" + + metadataSnapshot.getSyncId() + + "}" + + '}'; + } } /** @@ -279,22 +444,19 @@ public String getCustomDataPath() { */ public static class NodeStoreFilesMetadata extends BaseNodeResponse { - private TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata; + private StoreFilesMetadata storeFilesMetadata; public NodeStoreFilesMetadata(StreamInput in) throws IOException { super(in); - storeFilesMetadata = new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata(in); + storeFilesMetadata = new StoreFilesMetadata(in); } - public NodeStoreFilesMetadata( - DiscoveryNode node, - TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata - ) { + public NodeStoreFilesMetadata(DiscoveryNode node, StoreFilesMetadata storeFilesMetadata) { super(node); this.storeFilesMetadata = storeFilesMetadata; } - public TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata() { + public StoreFilesMetadata storeFilesMetadata() { return storeFilesMetadata; } diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java new file mode 100644 index 0000000000000..36b2d44193e5b --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java @@ -0,0 +1,525 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license.
+ */ + +package org.opensearch.indices.store; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionType; +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.gateway.AsyncShardsFetchPerNode; +import org.opensearch.index.IndexService; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.seqno.ReplicationTracker; +import org.opensearch.index.seqno.RetentionLease; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.IndicesService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +/** + * Transport action for fetching a batch of shard store metadata from a list of transport nodes + * + * @opensearch.internal + */ +public class TransportNodesListShardStoreMetadataBatch extends TransportNodesAction< + TransportNodesListShardStoreMetadataBatch.Request, + TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch, + TransportNodesListShardStoreMetadataBatch.NodeRequest, + TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch> + implements + AsyncShardsFetchPerNode.Lister< + TransportNodesListShardStoreMetadataBatch.NodesStoreFilesMetadataBatch, + TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch> { + + public static final String ACTION_NAME = "internal:cluster/nodes/indices/shard/store/batch"; + public static final ActionType<NodesStoreFilesMetadataBatch> TYPE = new ActionType<>( + ACTION_NAME, + NodesStoreFilesMetadataBatch::new + ); + + private final Settings settings; + private final IndicesService indicesService; + private final NodeEnvironment nodeEnv; + + @Inject + public TransportNodesListShardStoreMetadataBatch( + Settings settings, + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + IndicesService indicesService, + NodeEnvironment nodeEnv, + ActionFilters actionFilters + ) { + super( + ACTION_NAME, + threadPool, + clusterService, + transportService, + actionFilters, + Request::new, + NodeRequest::new, + ThreadPool.Names.FETCH_SHARD_STORE, + NodeStoreFilesMetadataBatch.class + ); + this.settings = settings; + this.indicesService = indicesService; + this.nodeEnv = nodeEnv; + } + + @Override + public void list( + DiscoveryNode[] nodes, + Map<ShardId, String> shardIdsWithCustomDataPath, + ActionListener<NodesStoreFilesMetadataBatch> listener + ) { + execute(new TransportNodesListShardStoreMetadataBatch.Request(shardIdsWithCustomDataPath, nodes), listener); + } + + @Override + protected NodeRequest newNodeRequest(Request request) { + return new NodeRequest(request); + } + + @Override + protected NodeStoreFilesMetadataBatch newNodeResponse(StreamInput in) throws IOException { + return new NodeStoreFilesMetadataBatch(in); + } + + @Override + protected NodesStoreFilesMetadataBatch newResponse( + Request request, + List<NodeStoreFilesMetadataBatch> responses, + List<FailedNodeException> failures + ) { + return new NodesStoreFilesMetadataBatch(clusterService.getClusterName(), responses, failures); + } + + @Override + protected NodeStoreFilesMetadataBatch nodeOperation(NodeRequest request) { + try { + return new NodeStoreFilesMetadataBatch(clusterService.localNode(), listStoreMetadata(request)); + } catch (IOException e) { + throw new OpenSearchException("Failed to list store metadata for shards [" + request.getShardIdsWithCustomDataPath() + "]", e); + } + } + + /** + * This method is similar to the listStoreMetadata method of {@link TransportNodesListShardStoreMetadata}. + * In this case we fetch the shard store files for a batch of shards instead of a single shard. + */ + private Map<ShardId, NodeStoreFilesMetadata> listStoreMetadata(NodeRequest request) throws IOException { + Map<ShardId, NodeStoreFilesMetadata> shardStoreMetadataMap = new HashMap<>(); + for (Map.Entry<ShardId, String> shardToCustomDataPathEntry : request.getShardIdsWithCustomDataPath().entrySet()) { + final ShardId shardId = shardToCustomDataPathEntry.getKey(); + logger.trace("listing store meta data for {}", shardId); + long startTimeNS = System.nanoTime(); + boolean exists = false; + try { + IndexService indexService = indicesService.indexService(shardId.getIndex()); + if (indexService != null) { + IndexShard indexShard = indexService.getShardOrNull(shardId.id()); + if (indexShard != null) { + try { + final StoreFilesMetadata storeFilesMetadata = new StoreFilesMetadata( + shardId, + indexShard.snapshotStoreMetadata(), + indexShard.getPeerRecoveryRetentionLeases() + ); + exists = true; + shardStoreMetadataMap.put(shardId, new NodeStoreFilesMetadata(storeFilesMetadata, null)); + continue; + } catch (org.apache.lucene.index.IndexNotFoundException e) { + logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e); + shardStoreMetadataMap.put( + shardId, + new NodeStoreFilesMetadata( + new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()), + e + ) + ); + continue; + } catch (IOException e) { + logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e); + shardStoreMetadataMap.put( + shardId, + new NodeStoreFilesMetadata( + new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()), + e + ) + ); + continue; + } + } + } + final String customDataPath; + if (shardToCustomDataPathEntry.getValue() != null) { + customDataPath = shardToCustomDataPathEntry.getValue(); + } else { + // TODO: Fallback for BWC with older predecessor (ES) versions.
+ // Remove this once request.getCustomDataPath() always returns non-null + if (indexService != null) { + customDataPath = indexService.getIndexSettings().customDataPath(); + } else { + IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); + if (metadata != null) { + customDataPath = new IndexSettings(metadata, settings).customDataPath(); + } else { + logger.trace("{} node doesn't have metadata for the requested index", shardId); + shardStoreMetadataMap.put( + shardId, + new NodeStoreFilesMetadata( + new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()), + new OpenSearchException("node doesn't have metadata for index " + shardId.getIndex()) + ) + ); + continue; + } + } + } + final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); + if (shardPath == null) { + shardStoreMetadataMap.put( + shardId, + new NodeStoreFilesMetadata( + new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()), + null + ) + ); + continue; + } + // note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means: + // 1) a shard is being constructed, which means the cluster-manager will not use a copy of this replica + // 2) A shard is shutting down and has not cleared its content within lock timeout. In this case the cluster-manager may not + // reuse local resources. + final Store.MetadataSnapshot metadataSnapshot = Store.readMetadataSnapshot( + shardPath.resolveIndex(), + shardId, + nodeEnv::shardLock, + logger + ); + // We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when + // we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard.
+ shardStoreMetadataMap.put( + shardId, + new NodeStoreFilesMetadata(new StoreFilesMetadata(shardId, metadataSnapshot, Collections.emptyList()), null) + ); + } catch (Exception e) { + logger.trace(new ParameterizedMessage("{} failed to load store metadata", shardId), e); + shardStoreMetadataMap.put( + shardId, + new NodeStoreFilesMetadata( + new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()), + new OpenSearchException("failed to load store metadata", e) + ) + ); + } finally { + TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); + if (exists) { + logger.debug("{} loaded store meta data (took [{}])", shardId, took); + } else { + logger.trace("{} didn't find any store meta data to load (took [{}])", shardId, took); + } + } + } + return shardStoreMetadataMap; + } + + /** + * Metadata for store files + * + * @opensearch.internal + */ + public static class StoreFilesMetadata implements Iterable<StoreFileMetadata>, Writeable { + private final ShardId shardId; + private final Store.MetadataSnapshot metadataSnapshot; + private final List<RetentionLease> peerRecoveryRetentionLeases; + + public StoreFilesMetadata( + ShardId shardId, + Store.MetadataSnapshot metadataSnapshot, + List<RetentionLease> peerRecoveryRetentionLeases + ) { + this.shardId = shardId; + this.metadataSnapshot = metadataSnapshot; + this.peerRecoveryRetentionLeases = peerRecoveryRetentionLeases; + } + + public StoreFilesMetadata(StreamInput in) throws IOException { + this.shardId = new ShardId(in); + this.metadataSnapshot = new Store.MetadataSnapshot(in); + this.peerRecoveryRetentionLeases = in.readList(RetentionLease::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + metadataSnapshot.writeTo(out); + out.writeList(peerRecoveryRetentionLeases); + } + + public ShardId shardId() { + return this.shardId; + } + + public boolean isEmpty() { + return metadataSnapshot.size() == 0; + } + + @Override + public Iterator<StoreFileMetadata> iterator() { + return metadataSnapshot.iterator(); + } + + public boolean fileExists(String name) { + return metadataSnapshot.asMap().containsKey(name); + } + + public StoreFileMetadata file(String name) { + return metadataSnapshot.asMap().get(name); + } + + /** + * Returns the retaining sequence number of the peer recovery retention lease for a given node if exists; otherwise, returns -1. + */ + public long getPeerRecoveryRetentionLeaseRetainingSeqNo(DiscoveryNode node) { + assert node != null; + final String retentionLeaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId()); + return peerRecoveryRetentionLeases.stream() + .filter(lease -> lease.id().equals(retentionLeaseId)) + .mapToLong(RetentionLease::retainingSequenceNumber) + .findFirst() + .orElse(-1L); + } + + public List<RetentionLease> peerRecoveryRetentionLeases() { + return peerRecoveryRetentionLeases; + } + + /** + * @return commit sync id if exists, else null + */ + public String syncId() { + return metadataSnapshot.getSyncId(); + } + + @Override + public String toString() { + return "StoreFilesMetadata{" + + ", shardId=" + + shardId + + ", metadataSnapshot{size=" + + metadataSnapshot.size() + + ", syncId=" + + metadataSnapshot.getSyncId() + + "}" + + '}'; + } + } + + /** + * Request is used to construct the transport request sent to a set of other nodes. + * Refer {@link TransportNodesAction} class start method.
+ * + * @opensearch.internal + */ + public static class Request extends BaseNodesRequest<Request> { + + private final Map<ShardId, String> shardIdsWithCustomDataPath; + + public Request(StreamInput in) throws IOException { + super(in); + shardIdsWithCustomDataPath = in.readMap(ShardId::new, StreamInput::readString); + } + + public Request(Map<ShardId, String> shardIdsWithCustomDataPath, DiscoveryNode[] nodes) { + super(nodes); + this.shardIdsWithCustomDataPath = Objects.requireNonNull(shardIdsWithCustomDataPath); + } + + public Map<ShardId, String> getShardIdsWithCustomDataPath() { + return shardIdsWithCustomDataPath; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(shardIdsWithCustomDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString); + } + } + + /** + * Metadata for the nodes store files + * + * @opensearch.internal + */ + public static class NodesStoreFilesMetadataBatch extends BaseNodesResponse<NodeStoreFilesMetadataBatch> { + + public NodesStoreFilesMetadataBatch(StreamInput in) throws IOException { + super(in); + } + + public NodesStoreFilesMetadataBatch( + ClusterName clusterName, + List<NodeStoreFilesMetadataBatch> nodes, + List<FailedNodeException> failures + ) { + super(clusterName, nodes, failures); + } + + @Override + protected List<NodeStoreFilesMetadataBatch> readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeStoreFilesMetadataBatch::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List<NodeStoreFilesMetadataBatch> nodes) throws IOException { + out.writeList(nodes); + } + } + + /** + * The metadata for the node store files + * + * @opensearch.internal + */ + public static class NodeStoreFilesMetadata { + + private StoreFilesMetadata storeFilesMetadata; + private Exception storeFileFetchException; + + public NodeStoreFilesMetadata(StoreFilesMetadata storeFilesMetadata) { + this.storeFilesMetadata = storeFilesMetadata; + this.storeFileFetchException = null; + } + + public NodeStoreFilesMetadata(StreamInput in) throws IOException { + storeFilesMetadata = new StoreFilesMetadata(in); + this.storeFileFetchException = null; + } + + public NodeStoreFilesMetadata(StoreFilesMetadata storeFilesMetadata, Exception storeFileFetchException) { + this.storeFilesMetadata = storeFilesMetadata; + this.storeFileFetchException = storeFileFetchException; + } + + public StoreFilesMetadata storeFilesMetadata() { + return storeFilesMetadata; + } + + public static NodeStoreFilesMetadata readListShardStoreNodeOperationResponse(StreamInput in) throws IOException { + return new NodeStoreFilesMetadata(in); + } + + public void writeTo(StreamOutput out) throws IOException { + storeFilesMetadata.writeTo(out); + } + + public Exception getStoreFileFetchException() { + return storeFileFetchException; + } + + @Override + public String toString() { + return "[[" + storeFilesMetadata + "]]"; + } + } + + /** + * NodeRequest class is for deserializing the request received by this node from another node for this transport action. + * This is used in {@link TransportNodesAction} + * @opensearch.internal + */ + public static class NodeRequest extends TransportRequest { + + private final Map<ShardId, String> shardIdsWithCustomDataPath; + + public NodeRequest(StreamInput in) throws IOException { + super(in); + shardIdsWithCustomDataPath = in.readMap(ShardId::new, StreamInput::readString); + } + + public NodeRequest(Request request) { + this.shardIdsWithCustomDataPath = Objects.requireNonNull(request.getShardIdsWithCustomDataPath()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(shardIdsWithCustomDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString); + } + + public Map<ShardId, String> getShardIdsWithCustomDataPath() { + return shardIdsWithCustomDataPath; + } + } + + /** + * NodeStoreFilesMetadataBatch Response received by the node from another node for this transport action. + * Refer {@link TransportNodesAction} + */ + public static class NodeStoreFilesMetadataBatch extends BaseNodeResponse { + private final Map<ShardId, NodeStoreFilesMetadata> nodeStoreFilesMetadataBatch; + + protected NodeStoreFilesMetadataBatch(StreamInput in) throws IOException { + super(in); + this.nodeStoreFilesMetadataBatch = in.readMap(ShardId::new, NodeStoreFilesMetadata::new); + } + + public NodeStoreFilesMetadataBatch(DiscoveryNode node, Map<ShardId, NodeStoreFilesMetadata> nodeStoreFilesMetadataBatch) { + super(node); + this.nodeStoreFilesMetadataBatch = nodeStoreFilesMetadataBatch; + } + + public Map<ShardId, NodeStoreFilesMetadata> getNodeStoreFilesMetadataBatch() { + return this.nodeStoreFilesMetadataBatch; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(nodeStoreFilesMetadataBatch, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o)); + } + } + +} diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java deleted file mode 100644 index 4404a728a8f1c..0000000000000 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license.
- */ - -package org.opensearch.indices.store; - -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.OpenSearchException; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.io.stream.Writeable; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.env.NodeEnvironment; -import org.opensearch.index.IndexService; -import org.opensearch.index.IndexSettings; -import org.opensearch.index.seqno.ReplicationTracker; -import org.opensearch.index.seqno.RetentionLease; -import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.shard.ShardId; -import org.opensearch.index.shard.ShardPath; -import org.opensearch.index.store.Store; -import org.opensearch.index.store.StoreFileMetadata; -import org.opensearch.indices.IndicesService; - -import java.io.IOException; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * Helper class for common code used in {@link TransportNodesListShardStoreMetadata} and - * {@link TransportNodesBatchListShardStoreMetadata} which are used to list the store metadata of a shard - * on a set of nodes. - * - * @opensearch.internal - */ -public class TransportNodesListShardStoreMetadataHelper { - /** - * Metadata for store files - * - * @opensearch.internal - */ - public static class StoreFilesMetadata implements Iterable, Writeable { - private final ShardId shardId; - private final Store.MetadataSnapshot metadataSnapshot; - private final List peerRecoveryRetentionLeases; - - public StoreFilesMetadata( - ShardId shardId, - Store.MetadataSnapshot metadataSnapshot, - List peerRecoveryRetentionLeases - ) { - this.shardId = shardId; - this.metadataSnapshot = metadataSnapshot; - this.peerRecoveryRetentionLeases = peerRecoveryRetentionLeases; - } - - public StoreFilesMetadata(StreamInput in) throws IOException { - this.shardId = new ShardId(in); - this.metadataSnapshot = new Store.MetadataSnapshot(in); - this.peerRecoveryRetentionLeases = in.readList(RetentionLease::new); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - shardId.writeTo(out); - metadataSnapshot.writeTo(out); - out.writeList(peerRecoveryRetentionLeases); - } - - public ShardId shardId() { - return this.shardId; - } - - public boolean isEmpty() { - return metadataSnapshot.size() == 0; - } - - @Override - public Iterator iterator() { - return metadataSnapshot.iterator(); - } - - public boolean fileExists(String name) { - return metadataSnapshot.asMap().containsKey(name); - } - - public StoreFileMetadata file(String name) { - return metadataSnapshot.asMap().get(name); - } - - /** - * Returns the retaining sequence number of the peer recovery retention lease for a given node if exists; otherwise, returns -1. 
- */ - public long getPeerRecoveryRetentionLeaseRetainingSeqNo(DiscoveryNode node) { - assert node != null; - final String retentionLeaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId()); - return peerRecoveryRetentionLeases.stream() - .filter(lease -> lease.id().equals(retentionLeaseId)) - .mapToLong(RetentionLease::retainingSequenceNumber) - .findFirst() - .orElse(-1L); - } - - public List peerRecoveryRetentionLeases() { - return peerRecoveryRetentionLeases; - } - - /** - * @return commit sync id if exists, else null - */ - public String syncId() { - return metadataSnapshot.getSyncId(); - } - - @Override - public String toString() { - return "StoreFilesMetadata{" - + ", shardId=" - + shardId - + ", metadataSnapshot{size=" - + metadataSnapshot.size() - + ", syncId=" - + metadataSnapshot.getSyncId() - + "}" - + '}'; - } - } - - /** - * Function to fetch metadata of the store files on the local node. - * @param logger - * @param indicesService - * @param clusterService - * @param shardId - * @param shardDataPathInRequest - * @param settings - * @param nodeEnv - * @return - * @throws IOException - */ - public static StoreFilesMetadata getStoreFilesMetadata( - Logger logger, - IndicesService indicesService, - ClusterService clusterService, - ShardId shardId, - String shardDataPathInRequest, - Settings settings, - NodeEnvironment nodeEnv - ) throws IOException { - boolean exists = false; - long startTimeNS = System.nanoTime(); - try { - IndexService indexService = indicesService.indexService(shardId.getIndex()); - if (indexService != null) { - IndexShard indexShard = indexService.getShardOrNull(shardId.id()); - if (indexShard != null) { - try { - final StoreFilesMetadata storeFilesMetadata = new StoreFilesMetadata( - shardId, - indexShard.snapshotStoreMetadata(), - indexShard.getPeerRecoveryRetentionLeases() - ); - exists = true; - return storeFilesMetadata; - } catch (org.apache.lucene.index.IndexNotFoundException e) { - logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e); - return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); - } catch (IOException e) { - logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e); - return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); - } - } - } - final String customDataPath; - if (shardDataPathInRequest != null) { - customDataPath = shardDataPathInRequest; - } else { - // TODO: Fallback for BWC with older predecessor (ES) versions. - // Remove this once request.getCustomDataPath() always returns non-null - if (indexService != null) { - customDataPath = indexService.getIndexSettings().customDataPath(); - } else { - IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); - if (metadata != null) { - customDataPath = new IndexSettings(metadata, settings).customDataPath(); - } else { - logger.trace("{} node doesn't have meta data for the requests index", shardId); - throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex()); - } - } - } - final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); - if (shardPath == null) { - return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); - } - // note that this may fail if it can't get access to the shard lock. 
Since we check above there is an active shard, this means: - // 1) a shard is being constructed, which means the cluster-manager will not use a copy of this replica - // 2) A shard is shutting down and has not cleared it's content within lock timeout. In this case the cluster-manager may not - // reuse local resources. - final Store.MetadataSnapshot metadataSnapshot = Store.readMetadataSnapshot( - shardPath.resolveIndex(), - shardId, - nodeEnv::shardLock, - logger - ); - // We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when - // we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard. - return new StoreFilesMetadata(shardId, metadataSnapshot, Collections.emptyList()); - } finally { - TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); - if (exists) { - logger.debug("{} loaded store meta data (took [{}])", shardId, took); - } else { - logger.trace("{} didn't find any store meta data to load (took [{}])", shardId, took); - } - } - } - -} diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java index 1e33eca425e87..cda8e0b1c5953 100644 --- a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java @@ -68,7 +68,6 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; -import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper; import org.opensearch.snapshots.SnapshotShardSizeInfo; import java.util.ArrayList; @@ -665,7 +664,7 @@ static String randomSyncId() { class TestAllocator extends ReplicaShardAllocator { - private Map data = null; + private Map data = null; private AtomicBoolean fetchDataCalled = new AtomicBoolean(false); public void clean() { @@ -703,7 +702,7 @@ TestAllocator addData( } data.put( node, - new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata( + new TransportNodesListShardStoreMetadata.StoreFilesMetadata( shardId, new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()), peerRecoveryRetentionLeases @@ -721,7 +720,7 @@ protected AsyncShardFetch.FetchResult tData = null; if (data != null) { tData = new HashMap<>(); - for (Map.Entry entry : data.entrySet()) { + for (Map.Entry entry : data.entrySet()) { tData.put( entry.getKey(), new TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata(entry.getKey(), entry.getValue()) diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index 0e9403255fa66..b11e8554027b1 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -86,7 +86,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.test.DummyShardLock; import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.IndexSettingsModule; @@ -958,8 
+958,8 @@ public void testStreamStoreFilesMetadata() throws Exception { ) ); } - TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata outStoreFileMetadata = - new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata( + TransportNodesListShardStoreMetadata.StoreFilesMetadata outStoreFileMetadata = + new TransportNodesListShardStoreMetadata.StoreFilesMetadata( new ShardId("test", "_na_", 0), metadataSnapshot, peerRecoveryRetentionLeases @@ -972,8 +972,8 @@ public void testStreamStoreFilesMetadata() throws Exception { ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); InputStreamStreamInput in = new InputStreamStreamInput(inBuffer); in.setVersion(targetNodeVersion); - TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata inStoreFileMetadata = - new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata(in); + TransportNodesListShardStoreMetadata.StoreFilesMetadata inStoreFileMetadata = + new TransportNodesListShardStoreMetadata.StoreFilesMetadata(in); Iterator outFiles = outStoreFileMetadata.iterator(); for (StoreFileMetadata inFile : inStoreFileMetadata) { assertThat(inFile.name(), equalTo(outFiles.next().name()));
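
A minimal usage sketch (hypothetical, not part of the patch): the batch action's key design choice is that a store-listing failure for one shard is recorded per shard via NodeStoreFilesMetadata#getStoreFileFetchException instead of failing the whole per-node response, so one corrupted shard cannot poison the metadata of the other shards in the batch. The BatchStoreMetadataInspector class below is invented for illustration and only exercises accessors introduced by this patch.

import java.util.Map;

import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata;

/**
 * Hypothetical consumer of one node's batch response, for illustration only.
 */
public final class BatchStoreMetadataInspector {

    private BatchStoreMetadataInspector() {}

    public static void inspect(NodeStoreFilesMetadataBatch nodeResponse) {
        for (Map.Entry<ShardId, NodeStoreFilesMetadata> entry : nodeResponse.getNodeStoreFilesMetadataBatch().entrySet()) {
            final ShardId shardId = entry.getKey();
            final NodeStoreFilesMetadata result = entry.getValue();
            if (result.getStoreFileFetchException() != null) {
                // The listing failed for this shard only; the other shards in the
                // batch still carry usable store metadata.
                System.out.println(shardId + " store listing failed: " + result.getStoreFileFetchException().getMessage());
                continue;
            }
            final StoreFilesMetadata store = result.storeFilesMetadata();
            // An empty snapshot means this node holds no files for the shard.
            System.out.println(shardId + " empty=" + store.isEmpty() + " syncId=" + store.syncId());
        }
    }
}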