
[Batch Fetch] Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs #14972

Merged

Conversation


@rahulkarajgikar rahulkarajgikar commented Jul 25, 2024

Description

This change fixes the bug in hasInitiatedFetching() where it always returned true in batch mode, causing allocation explain to show AWAITING_INFO even for shards for which the deciders return NO.

The idea here is that we want to populate shard store information (number of matching bytes per node) in addition to the decision made by the deciders.
This function is meant to check whether that information can be obtained without triggering a new asyncFetch.

The intended behaviour is for this function to return true if a fetch has ever happened before, or is ongoing, for a batch.
It should return false if there has never been a fetch for this batch.
This way it does not trigger a new asyncFetch call, but still uses the data from existing fetches, or waits for ongoing ones, to populate shard store info.

Details of the above points are discussed in #14903.

We start by comparing batch mode vs non-batch mode implementations.

Non-batch mode logic for deciding if a shard is eligible for fetching:

public AllocateUnassignedDecision makeAllocationDecision(
    final ShardRouting unassignedShard,
    final RoutingAllocation allocation,
    final Logger logger
) {
    if (isResponsibleFor(unassignedShard) == false) {
        // this allocator is not responsible for deciding on this shard
        return AllocateUnassignedDecision.NOT_TAKEN;
    }
    // pre-check if it can be allocated to any node that currently exists, so we won't list the store for it for nothing
    Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(unassignedShard, allocation);
    Decision allocateDecision = result.v1();
    if (allocateDecision.type() != Decision.Type.YES
        && (allocation.debugDecision() == false || hasInitiatedFetching(unassignedShard) == false)) {
        // only return early if we are not in explain mode, or we are in explain mode but we have not
        // yet attempted to fetch any shard data
        logger.trace("{}: ignoring allocation, can't be allocated on any node", unassignedShard);
        return AllocateUnassignedDecision.no(
            UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.type()),
            result.v2() != null ? new ArrayList<>(result.v2().values()) : null
        );
    }
    AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> shardStores = fetchData(unassignedShard, allocation);
    Map<DiscoveryNode, StoreFilesMetadata> nodeShardStores = convertToNodeStoreFilesMetadataMap(shardStores);
    return getAllocationDecision(unassignedShard, allocation, nodeShardStores, result, logger);
}

Batch mode logic for deciding if a shard is eligible for fetching - the logic is the same:

private AllocateUnassignedDecision getUnassignedShardAllocationDecision(
    ShardRouting shardRouting,
    RoutingAllocation allocation,
    Supplier<Map<DiscoveryNode, StoreFilesMetadata>> nodeStoreFileMetaDataMapSupplier
) {
    if (!isResponsibleFor(shardRouting)) {
        return AllocateUnassignedDecision.NOT_TAKEN;
    }
    Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(shardRouting, allocation);
    final boolean explain = allocation.debugDecision();
    Decision allocationDecision = result.v1();
    if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shardRouting))) {
        // only return early if we are not in explain mode, or we are in explain mode but we have not
        // yet attempted to fetch any shard data
        logger.trace("{}: ignoring allocation, can't be allocated on any node", shardRouting);
        return AllocateUnassignedDecision.no(
            UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
            result.v2() != null ? new ArrayList<>(result.v2().values()) : null
        );
    }
    if (nodeStoreFileMetaDataMapSupplier != null) {
        Map<DiscoveryNode, StoreFilesMetadata> discoveryNodeStoreFilesMetadataMap = nodeStoreFileMetaDataMapSupplier.get();
        return getAllocationDecision(shardRouting, allocation, discoveryNodeStoreFilesMetadataMap, result, logger);
    }
    return null;
}

hasInitiatedFetching() has a different implementation though - this is the key difference. In non-batch mode it works as intended, but in batch mode it always returns true.
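To see why an always-true hasInitiatedFetching() breaks allocation explain, it helps to isolate the shared early-return condition as a pure boolean function. The following is a hypothetical, self-contained illustration (the function name and class are ours, not from the PR): the allocator returns an early NO when the decision is not YES and we are either not in explain mode or have never initiated a fetch.

```java
public class EarlyReturnDemo {
    // return early (surface decision NO) when decision != YES
    // AND (not in explain mode OR no fetch has been initiated)
    static boolean returnsEarly(boolean decisionYes, boolean explain, boolean hasInitiatedFetching) {
        return !decisionYes && (!explain || !hasInitiatedFetching);
    }

    public static void main(String[] args) {
        // Explain mode, no fetch ever happened: early return -> explain shows NO
        if (!returnsEarly(false, true, false)) throw new AssertionError();
        // Buggy batch mode: hasInitiatedFetching is always true, so the explain
        // path never returns early, falls through to fetchData(), and surfaces
        // AWAITING_INFO instead of NO.
        if (returnsEarly(false, true, true)) throw new AssertionError();
        // Normal reroute (not explain mode): early return regardless of fetching
        if (!returnsEarly(false, false, true)) throw new AssertionError();
        System.out.println("ok");
    }
}
```

With hasInitiatedFetching() pinned to true, the second case above is the one every batch-mode explain call hits.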

Details of the fix for hasInitiatedFetching()

Implementation in non-batch mode:

protected boolean hasInitiatedFetching(ShardRouting shard) {
    return asyncFetchStore.get(shard.shardId()) != null;
}

Here, we check if asyncFetchStore has an entry for this shardId.
The intention is to check whether a fetchData() call has ever happened for this shard before.

asyncFetchStore entries are populated as part of GatewayAllocator$InternalReplicaShardAllocator.fetchData().
We are essentially using the fact that if GatewayAllocator$InternalReplicaShardAllocator.fetchData() was ever called for this shard, then asyncFetchStore would have an entry.
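The computeIfAbsent pattern (shown in the allocator code below) is what makes this check work: the map entry doubles as a "fetch was started" marker. A minimal, self-contained sketch of that pattern, with simplified stand-in names of our own:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FetchMarkerDemo {
    // stand-in for asyncFetchStore: shardId -> fetcher object
    static final Map<String, Object> asyncFetchStore = new ConcurrentHashMap<>();

    static Object fetchData(String shardId) {
        // the entry is created on the first fetch, so map membership
        // records "a fetch was initiated for this shard"
        return asyncFetchStore.computeIfAbsent(shardId, id -> new Object());
    }

    static boolean hasInitiatedFetching(String shardId) {
        return asyncFetchStore.get(shardId) != null;
    }

    public static void main(String[] args) {
        if (hasInitiatedFetching("shard-0")) throw new AssertionError("no fetch yet");
        fetchData("shard-0");
        if (!hasInitiatedFetching("shard-0")) throw new AssertionError("fetch recorded");
        System.out.println("ok");
    }
}
```

The key property is that the map is only ever written from the fetch path, so presence of an entry is a faithful signal.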

class InternalReplicaShardAllocator extends ReplicaShardAllocator {
    private final TransportNodesListShardStoreMetadata storeAction;

    InternalReplicaShardAllocator(TransportNodesListShardStoreMetadata storeAction) {
        this.storeAction = storeAction;
    }

    @Override
    protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> fetchData(
        ShardRouting shard,
        RoutingAllocation allocation
    ) {
        AsyncShardFetch<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> fetch = asyncFetchStore.computeIfAbsent(
            shard.shardId(),
            shardId -> new InternalAsyncFetch<>(
                logger,
                "shard_store",
                shard.shardId(),
                IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()),
                storeAction
            )
        );
        AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> shardStores = fetch.fetchData(
            allocation.nodes(),
            new HashMap<>() {
                {
                    put(shard.shardId(), allocation.getIgnoreNodes(shard.shardId()));
                }
            }
        );
        if (shardStores.hasData()) {
            shardStores.processAllocation(allocation);
        }
        return shardStores;
    }

    @Override
    protected boolean hasInitiatedFetching(ShardRouting shard) {
        return asyncFetchStore.get(shard.shardId()) != null;
    }
}

For batch mode, we need a similar idea - we need to know if a fetchData() call has ever happened for this batch before.
Let's check the corresponding fetchData code in batch mode and see if there is any property we can check to establish this fact.

Implementation in batch mode:

Corresponding to asyncFetchStore in non-batch mode, we have batchIdToStoreShardBatch.
However, the logic in batch mode is different: batchIdToStoreShardBatch is populated when we create the batches, which happens before any fetching/assignment logic is called.

Reroute flow:

// create batches for unassigned shards
Set<String> batchesToAssign = createAndUpdateBatches(allocation, primary);
if (batchesToAssign.isEmpty()) {
    return null;
}

Allocation explain flow:

public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation routingAllocation) {
    assert unassignedShard.unassigned();
    assert routingAllocation.debugDecision();
    if (getBatchId(unassignedShard, unassignedShard.primary()) == null) {
        createAndUpdateBatches(routingAllocation, unassignedShard.primary());
    }
    assert getBatchId(unassignedShard, unassignedShard.primary()) != null;
    if (unassignedShard.primary()) {
        assert primaryShardBatchAllocator != null;
        return primaryShardBatchAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);
    } else {
        assert replicaShardBatchAllocator != null;
        return replicaShardBatchAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);
    }
}

Batch creation logic:

addBatch() (line 349) updates batchIdToStoreShardBatch with the entry.

while (iterator.hasNext()) {
    ShardRouting currentShard = iterator.next();
    ShardEntry shardEntry = new ShardEntry(
        new ShardAttributes(
            IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(currentShard.index()).getSettings())
        ),
        currentShard
    );
    perBatchShards.put(currentShard.shardId(), shardEntry);
    batchSize--;
    iterator.remove();
    // add to batch if batch size full or last shard in unassigned list
    if (batchSize == 0 || iterator.hasNext() == false) {
        String batchUUId = UUIDs.base64UUID();
        ShardsBatch shardsBatch = new ShardsBatch(batchUUId, perBatchShards, primary);
        // add the batch to list of current batches
        addBatch(shardsBatch, primary);
        batchesToBeAssigned.add(batchUUId);
        perBatchShards.clear();
        batchSize = maxBatchSize;
    }
}
return batchesToBeAssigned;

private void addBatch(ShardsBatch shardsBatch, boolean primary) {
    ConcurrentMap<String, ShardsBatch> batches = primary ? batchIdToStartedShardBatch : batchIdToStoreShardBatch;
    if (batches.containsKey(shardsBatch.getBatchId())) {
        throw new IllegalStateException("Batch already exists. BatchId = " + shardsBatch.getBatchId());
    }
    batches.put(shardsBatch.getBatchId(), shardsBatch);
}

Since batchIdToStoreShardBatch is updated before fetchData() is ever called, we cannot use it to determine whether fetching has happened before.
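This timing problem can be shown with a small self-contained sketch (our own simplified names, not the PR code): the batch-map entry is inserted at batch-creation time, so map membership looks like "fetched" even when no fetch has happened.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BatchMapTimingDemo {
    // stand-in for batchIdToStoreShardBatch: batchId -> batch object
    static final Map<String, Object> batchIdToStoreShardBatch = new ConcurrentHashMap<>();

    static void createAndUpdateBatches(String batchId) {
        // runs during batch creation, before any fetching/assignment logic
        batchIdToStoreShardBatch.put(batchId, new Object());
    }

    public static void main(String[] args) {
        createAndUpdateBatches("batch-1");
        // No fetch has happened yet, but the map already has the entry.
        // A membership check is therefore always true once the batch exists,
        // mirroring the bug where batch-mode hasInitiatedFetching() always
        // returned true.
        boolean looksFetched = batchIdToStoreShardBatch.containsKey("batch-1");
        if (!looksFetched) throw new AssertionError();
        System.out.println("entry exists before any fetch: " + looksFetched);
    }
}
```

Contrast this with the non-batch asyncFetchStore, which is only written from the fetch path.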

Corresponding to GatewayAllocator$InternalReplicaShardAllocator.fetchData() in non-batch mode, we have ShardsBatchGatewayAllocator$InternalReplicaBatchShardAllocator.fetchData() in batch mode.
Let's check the batch-mode implementation of this function to see if there is anything else we can use.

class InternalReplicaBatchShardAllocator extends ReplicaShardBatchAllocator {
    @Override
    @SuppressWarnings("unchecked")
    protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch> fetchData(
        List<ShardRouting> eligibleShards,
        List<ShardRouting> inEligibleShards,
        RoutingAllocation allocation
    ) {
        return (AsyncShardFetch.FetchResult<
            TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch>) fetchDataAndCleanIneligibleShards(
                eligibleShards,
                inEligibleShards,
                allocation
            );
    }
}

Nothing we can use here either:

AsyncShardFetch.FetchResult<? extends BaseNodeResponse> fetchDataAndCleanIneligibleShards(
    List<ShardRouting> eligibleShards,
    List<ShardRouting> inEligibleShards,
    RoutingAllocation allocation
) {
    // get batch id for anyone given shard. We are assuming all shards will have same batchId
    ShardRouting shardRouting = eligibleShards.iterator().hasNext() ? eligibleShards.iterator().next() : null;
    shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting;
    if (shardRouting == null) {
        return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap());
    }
    String batchId = getBatchId(shardRouting, shardRouting.primary());
    if (batchId == null) {
        logger.debug("Shard {} has no batch id", shardRouting);
        throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching");
    }
    ConcurrentMap<String, ShardsBatch> batches = shardRouting.primary() ? batchIdToStartedShardBatch : batchIdToStoreShardBatch;
    if (batches.containsKey(batchId) == false) {
        logger.debug("Batch {} has no shards batch", batchId);
        throw new IllegalStateException("Batch " + batchId + " has no shards batch");
    }
    ShardsBatch shardsBatch = batches.get(batchId);
    // remove in eligible shards which allocator is not responsible for
    inEligibleShards.forEach(sr -> safelyRemoveShardFromBatch(sr, sr.primary()));
    if (shardsBatch.getBatchedShards().isEmpty() && eligibleShards.isEmpty()) {
        logger.debug("Batch {} is empty", batchId);
        return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap());
    }
    Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();
    for (ShardId shardId : shardsBatch.asyncBatch.shardAttributesMap.keySet()) {
        shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId));
    }
    AsyncShardBatchFetch<? extends BaseNodeResponse, ?> asyncFetcher = shardsBatch.getAsyncFetcher();
    AsyncShardFetch.FetchResult<? extends BaseNodeResponse> fetchResult = asyncFetcher.fetchData(
        allocation.nodes(),
        shardToIgnoreNodes
    );
    if (fetchResult.hasData()) {
        fetchResult.processAllocation(allocation);
    }
    return fetchResult;
}

In the case where the batch is empty or all nodes are ignored, asyncFetcher.fetchData() will not get called.
If the batch is non-empty, then asyncFetcher.fetchData() will get called (line 619).

asyncFetcher is of type AsyncShardBatchFetch, which extends the AsyncShardFetch class but does not override fetchData(). So we can check AsyncShardFetch.fetchData() for the exact implementation.

public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId, Set<String>> ignoreNodes) {
    cache.fillShardCacheWithDataNodes(nodes);
    List<String> nodeIds = cache.findNodesToFetch();
    if (nodeIds.isEmpty() == false) {
        // mark all node as fetching and go ahead and async fetch them
        // use a unique round id to detect stale responses in processAsyncFetch
        final long fetchingRound = round.incrementAndGet();
        cache.markAsFetching(nodeIds, fetchingRound);
        DiscoveryNode[] discoNodesToFetch = nodeIds.stream().map(nodes::get).toArray(DiscoveryNode[]::new);
        asyncFetch(discoNodesToFetch, fetchingRound);
    }
    // ... (rest of method omitted)
}

We can see at line 172 that batch cache entries are created for any missing nodes.
At line 173, we check if any node still needs to be fetched.
If so, we mark those nodes as fetching and trigger asyncFetch().

private final Map<String, NodeEntry<V>> cache;

Structure of batch cache ^

void fillShardCacheWithDataNodes(DiscoveryNodes nodes) {
    // verify that all current data nodes are there
    for (final DiscoveryNode node : nodes.getDataNodes().values()) {
        if (getCache().containsKey(node.getId()) == false) {
            initData(node);
        }
    }
    // remove nodes that are not longer part of the data nodes set
    getCache().keySet().removeIf(nodeId -> !nodes.nodeExists(nodeId));
}

initData() has custom implementation for batch cache:

@Override
public void initData(DiscoveryNode node) {
    cache.put(node.getId(), new NodeEntry<>(node.getId(), shardResponseClass, batchSize, emptyShardResponsePredicate));
}

So if this code has run and triggered the asyncFetch, we can conclude that the following statements will be true:

  1. The batch cache will be non-empty and have at least 1 entry.
  2. All nodes currently part of the cluster will either have data in the cache already, or be marked as fetching with a fetch ongoing.

So we can use these 2 facts as an invariant to determine whether any async fetching has actually happened before for a batch.
We add checks for these 2 facts in hasInitiatedFetching() to validate whether a fetch has happened before or not.

To check 1, we use:

if (shardsBatch.getAsyncFetcher().hasEmptyCache()) {
    logger.trace("Batch cache is empty for batch {} ", batchId);
    return false;
}

To check 2, we use the findNodesToFetch() function.

This function returns all nodes that have no data and also have no fetches initiated. So if this function returns even 1 node, hasInitiatedFetching() should return false, because we don't have info from all nodes to populate all the shard store information.

List<String> findNodesToFetch() {
    List<String> nodesToFetch = new ArrayList<>();
    for (BaseNodeEntry nodeEntry : getCache().values()) {
        if (nodeEntry.hasData() == false && nodeEntry.isFetching() == false) {
            nodesToFetch.add(nodeEntry.getNodeId());
        }
    }
    return nodesToFetch;
}
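Putting the two checks together, the fixed batch-mode check can be sketched as follows. This is a hypothetical, self-contained sketch under simplified assumptions (a plain map standing in for the per-node cache, with `[hasData, isFetching]` flags), not the exact PR code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HasInitiatedFetchingSketch {
    // stand-in for the batch cache: nodeId -> [hasData, isFetching]
    static final Map<String, boolean[]> cache = new HashMap<>();

    static boolean hasEmptyCache() {
        return cache.isEmpty();
    }

    static List<String> findNodesToFetch() {
        List<String> nodesToFetch = new ArrayList<>();
        for (Map.Entry<String, boolean[]> e : cache.entrySet()) {
            boolean hasData = e.getValue()[0], isFetching = e.getValue()[1];
            // nodes with neither data nor an ongoing fetch still need fetching
            if (!hasData && !isFetching) nodesToFetch.add(e.getKey());
        }
        return nodesToFetch;
    }

    // Check 1: an empty cache means fetchData() never ran for this batch.
    // Check 2: any node with neither data nor an ongoing fetch would require
    // a new asyncFetch, so report "not initiated".
    static boolean hasInitiatedFetching() {
        if (hasEmptyCache()) return false;
        return findNodesToFetch().isEmpty();
    }

    public static void main(String[] args) {
        if (hasInitiatedFetching()) throw new AssertionError("empty cache must return false");
        cache.put("node-1", new boolean[] { false, true });  // fetch in flight
        if (!hasInitiatedFetching()) throw new AssertionError("ongoing fetch should count");
        cache.put("node-2", new boolean[] { false, false }); // new node, never fetched
        if (hasInitiatedFetching()) throw new AssertionError("unfetched node must return false");
        System.out.println("ok");
    }
}
```

Note how this matches the intended contract from the description: true if a fetch has ever happened or is ongoing for the batch, false if one has never happened, without ever triggering a new asyncFetch.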

Related Issues

Resolves #14903

Check List

  • [Y] Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Testing

Added a new UT for batch mode and non-batch mode to simulate the allocation explain case.
Verified that allocation explain returns a NO decision and does not show AWAITING_INFO.

Also manually tested manual reroutes on a 15-data-node, 3-master-node setup, and verified that shards with decision NO are no longer getting added to the batches.

To repro this:

  • Create a cluster with 15 data nodes, 3 master nodes
  • Create an index with 1 primary and 14 replicas
  • After cluster goes green, shut down 1 node
  • Cluster should go yellow now. Run allocation explain - it should show decision NO.
  • Run manual reroute, no asyncFetch() should get triggered. (verified with inFlightFetches metric and arthas profiling)


❌ Gradle check result for 2a43b36: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?


❌ Gradle check result for f565b94: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@rahulkarajgikar rahulkarajgikar changed the title [Batch Fetch] Fix for hasInitiatedFetching() to fix allocation explain and manual reroute APIs [Batch Fetch] Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs Jul 25, 2024

❌ Gradle check result for a49b8a4: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?


codecov bot commented Jul 26, 2024

Codecov Report

Attention: Patch coverage is 9.09091% with 10 lines in your changes missing coverage. Please review.

Project coverage is 71.76%. Comparing base (1fe58b5) to head (f1bcdc8).
Report is 6 commits behind head on main.

Files Patch % Lines
...pensearch/gateway/ShardsBatchGatewayAllocator.java 0.00% 8 Missing ⚠️
...a/org/opensearch/gateway/AsyncShardBatchFetch.java 0.00% 2 Missing ⚠️
@@             Coverage Diff              @@
##               main   #14972      +/-   ##
============================================
- Coverage     71.78%   71.76%   -0.03%     
- Complexity    62694    62706      +12     
============================================
  Files          5160     5161       +1     
  Lines        294211   294370     +159     
  Branches      42553    42579      +26     
============================================
+ Hits         211212   211263      +51     
- Misses        65599    65728     +129     
+ Partials      17400    17379      -21     



❌ Gradle check result for b29502b: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@rahulkarajgikar

Failing tests in gradle check are flaky:

#14311
#12408

org.opensearch.http.SearchRestCancellationIT.testAutomaticCancellationDuringFetchPhase
org.opensearch.indexing.IndexActionIT.testAutoGenerateIdNoDuplicates {p0={"cluster.indices.replication.strategy":"SEGMENT"}}


❕ Gradle check result for f1bcdc8: UNSTABLE

Please review all flaky tests that succeeded after retry and create an issue if one does not already exist to track the flaky failure.

@rahulkarajgikar

Issue already exists for flaky test:

#12802

@shwetathareja shwetathareja added the backport 2.x Backport to 2.x branch label Jul 29, 2024
@shwetathareja shwetathareja merged commit d08c425 into opensearch-project:main Jul 29, 2024
35 of 42 checks passed
opensearch-trigger-bot bot pushed a commit that referenced this pull request Jul 29, 2024
…and manual reroute APIs (#14972)

* Fix for hasInitiatedFetching() in batch mode

Signed-off-by: Rahul Karajgikar <[email protected]>
(cherry picked from commit d08c425)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
shwetathareja pushed a commit that referenced this pull request Jul 29, 2024
…and manual reroute APIs (#14972) (#14994)

* Fix for hasInitiatedFetching() in batch mode


(cherry picked from commit d08c425)

Signed-off-by: Rahul Karajgikar <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
@rahulkarajgikar rahulkarajgikar deleted the asf_batch_bug_fix branch July 30, 2024 05:43
harshavamsi pushed a commit to harshavamsi/OpenSearch that referenced this pull request Aug 20, 2024
…and manual reroute APIs (opensearch-project#14972)

* Fix for hasInitiatedFetching() in batch mode

Signed-off-by: Rahul Karajgikar <[email protected]>
wdongyu pushed a commit to wdongyu/OpenSearch that referenced this pull request Aug 22, 2024
…and manual reroute APIs (opensearch-project#14972)

* Fix for hasInitiatedFetching() in batch mode

Signed-off-by: Rahul Karajgikar <[email protected]>
Successfully merging this pull request may close these issues.

[BUG] [Batch Mode] Allocation explain API is stuck in AWAITING_INFO even though deciders are returning NO