Skip to content

Commit

Permalink
Remove the helper function for rsa transport and write the logic in t…
Browse files Browse the repository at this point in the history
…he class file.

Signed-off-by: sudarshan baliga <[email protected]>
  • Loading branch information
sudarshan-baliga committed Jul 14, 2023
1 parent 13c1229 commit 9e1f0ac
Show file tree
Hide file tree
Showing 7 changed files with 724 additions and 600 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -106,7 +106,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
final TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// just let the recovery find it out, no need to do anything about it for the initializing shard
Expand Down Expand Up @@ -223,7 +223,7 @@ public AllocateUnassignedDecision makeAllocationDecision(
}
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
final TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores);
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// we want to let the replica be allocated in order to expose the actual problem with the primary that the replica
Expand Down Expand Up @@ -357,7 +357,7 @@ private static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(
/**
* Finds the store for the assigned shard in the fetched data, returns null if none is found.
*/
private static TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata findStore(
private static TransportNodesListShardStoreMetadata.StoreFilesMetadata findStore(
DiscoveryNode node,
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data
) {
Expand All @@ -373,7 +373,7 @@ private MatchingNodes findMatchingNodes(
RoutingAllocation allocation,
boolean noMatchFailedNodes,
DiscoveryNode primaryNode,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> data,
boolean explain
) {
Expand All @@ -386,8 +386,7 @@ private MatchingNodes findMatchingNodes(
&& shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) {
continue;
}
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue()
.storeFilesMetadata();
TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata();
// we don't have any files at all, it is an empty index
if (storeFilesMetadata.isEmpty()) {
continue;
Expand Down Expand Up @@ -443,8 +442,8 @@ private MatchingNodes findMatchingNodes(
}

private static long computeMatchingBytes(
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata
) {
long sizeMatched = 0;
for (StoreFileMetadata storeFileMetadata : storeFilesMetadata) {
Expand All @@ -457,18 +456,18 @@ private static long computeMatchingBytes(
}

private static boolean hasMatchingSyncId(
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata replicaStore
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore
) {
String primarySyncId = primaryStore.syncId();
return primarySyncId != null && primarySyncId.equals(replicaStore.syncId());
}

private static MatchingNode computeMatchingNode(
DiscoveryNode primaryNode,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
DiscoveryNode replicaNode,
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata replicaStore
TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore
) {
final long retainingSeqNoForPrimary = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(primaryNode);
final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(replicaNode);
Expand All @@ -479,7 +478,7 @@ private static MatchingNode computeMatchingNode(
}

private static boolean canPerformOperationBasedRecovery(
TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore,
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> shardStores,
DiscoveryNode targetNode
) {
Expand Down
Loading

0 comments on commit 9e1f0ac

Please sign in to comment.