Skip to content

Commit

Permalink
Cleaned up fix for hasInitiatedFetching
Browse files Browse the repository at this point in the history
  • Loading branch information
Rahul Karajgikar committed Jul 22, 2024
1 parent 8ae728c commit 2a43b36
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,210 @@ public void testMultipleReplicaShardAssignmentWithDelayedAllocationAndDifferentN
ensureGreen("test");
}

/**
 * Non-batch mode variant: checks that the allocation explain API reports {@link AllocationDecision#NO}
 * (and not AWAITING_INFO) for a replica that the deciders refuse to place on any node.
 */
public void testAllocationExplainReturnsNoWhenExtraReplicaShardInNonBatchMode() throws Exception {
    // batch mode is explicitly switched off on the cluster manager for this test
    final Settings clusterManagerSettings = Settings.builder()
        .put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), false)
        .build();
    internalCluster().startClusterManagerOnlyNodes(1, clusterManagerSettings);
    internalCluster().startDataOnlyNodes(5);

    // 1 primary + 4 replicas: exactly one shard copy per data node
    final Settings firstIndexSettings = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 4)
        .build();
    createIndex("test", firstIndexSettings);
    ensureGreen("test");
    ensureStableCluster(6);

    // Take down a node that holds a replica so the index turns yellow.
    // Creating an index with replica count == data node count directly would not work here, because the
    // whole flow under test is skipped for shards that are unassigned due to INDEX_CREATED.
    final List<String> nodesWithReplicaShards = findNodesWithShard(false);
    final String stoppedNodeName = nodesWithReplicaShards.get(0);
    final Settings replicaNodeDataPathSettings = internalCluster().dataPathSettings(stoppedNodeName);
    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(stoppedNodeName));

    ensureStableCluster(5);
    ensureYellow("test");

    logger.info("--> calling allocation explain API");
    // the leftover replica has no valid node to go to, so the decision must be NO
    final AllocationDecision decisionAfterNodeStop = client().admin()
        .cluster()
        .prepareAllocationExplain()
        .setIndex("test")
        .setShard(0)
        .setPrimary(false)
        .get()
        .getExplanation()
        .getShardAllocationDecision()
        .getAllocateDecision()
        .getAllocationDecision();
    assertEquals(AllocationDecision.NO, decisionAfterNodeStop);

    // Second index asks for more shard copies (1 primary + 5 replicas) than there are data nodes
    final Settings secondIndexSettings = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 5)
        .build();
    createIndex("test2", secondIndexSettings);

    ensureYellowAndNoInitializingShards("test2");

    logger.info("--> calling allocation explain API again");
    // 6 shard copies requested, only 4 data nodes currently running: decision must be NO
    final AllocationDecision decisionForOversizedIndex = client().admin()
        .cluster()
        .prepareAllocationExplain()
        .setIndex("test2")
        .setShard(0)
        .setPrimary(false)
        .get()
        .getExplanation()
        .getShardAllocationDecision()
        .getAllocateDecision()
        .getAllocationDecision();
    assertEquals(AllocationDecision.NO, decisionForOversizedIndex);

    logger.info("--> restarting the stopped node");
    final Settings restartedNodeSettings = Settings.builder()
        .put("node.name", stoppedNodeName)
        .put(replicaNodeDataPathSettings)
        .build();
    internalCluster().startDataOnlyNode(restartedNodeSettings);

    ensureStableCluster(6);
    ensureGreen("test");

    logger.info("--> calling allocation explain API 3rd time");
    // 6 shard copies requested, 5 data nodes running: still one copy too many, so still NO
    final AllocationDecision decisionAfterNodeRestart = client().admin()
        .cluster()
        .prepareAllocationExplain()
        .setIndex("test2")
        .setShard(0)
        .setPrimary(false)
        .get()
        .getExplanation()
        .getShardAllocationDecision()
        .getAllocateDecision()
        .getAllocationDecision();
    assertEquals(AllocationDecision.NO, decisionAfterNodeRestart);

    // a sixth data node finally gives every copy of test2 a home
    internalCluster().startDataOnlyNodes(1);

    ensureStableCluster(7);
    ensureGreen("test2");
}

/**
 * Batch mode variant: checks that the allocation explain API reports {@link AllocationDecision#NO}
 * (and not AWAITING_INFO) for a replica that the deciders refuse to place on any node.
 */
public void testAllocationExplainReturnsNoWhenExtraReplicaShardInBatchMode() throws Exception {
    // batch mode is explicitly switched on at the cluster manager for this test
    final Settings clusterManagerSettings = Settings.builder()
        .put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
        .build();
    internalCluster().startClusterManagerOnlyNodes(1, clusterManagerSettings);
    internalCluster().startDataOnlyNodes(5);

    // 1 primary + 4 replicas: exactly one shard copy per data node
    final Settings firstIndexSettings = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 4)
        .build();
    createIndex("test", firstIndexSettings);
    ensureGreen("test");
    ensureStableCluster(6);

    // Take down a node that holds a replica so the index turns yellow.
    // Creating an index with replica count == data node count directly would not work here, because the
    // whole flow under test is skipped for shards that are unassigned due to INDEX_CREATED.
    final List<String> nodesWithReplicaShards = findNodesWithShard(false);
    final String stoppedNodeName = nodesWithReplicaShards.get(0);
    final Settings replicaNodeDataPathSettings = internalCluster().dataPathSettings(stoppedNodeName);
    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(stoppedNodeName));

    ensureStableCluster(5);
    ensureYellow("test");

    logger.info("--> calling allocation explain API");
    // the leftover replica has no valid node to go to, so the decision must be NO
    final AllocationDecision decisionAfterNodeStop = client().admin()
        .cluster()
        .prepareAllocationExplain()
        .setIndex("test")
        .setShard(0)
        .setPrimary(false)
        .get()
        .getExplanation()
        .getShardAllocationDecision()
        .getAllocateDecision()
        .getAllocationDecision();
    assertEquals(AllocationDecision.NO, decisionAfterNodeStop);

    // Second index asks for more shard copies (1 primary + 5 replicas) than there are data nodes
    final Settings secondIndexSettings = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 5)
        .build();
    createIndex("test2", secondIndexSettings);

    ensureYellowAndNoInitializingShards("test2");

    logger.info("--> calling allocation explain API again");
    // 6 shard copies requested, only 4 data nodes currently running: decision must be NO
    final AllocationDecision decisionForOversizedIndex = client().admin()
        .cluster()
        .prepareAllocationExplain()
        .setIndex("test2")
        .setShard(0)
        .setPrimary(false)
        .get()
        .getExplanation()
        .getShardAllocationDecision()
        .getAllocateDecision()
        .getAllocationDecision();
    assertEquals(AllocationDecision.NO, decisionForOversizedIndex);

    logger.info("--> restarting the stopped node");
    final Settings restartedNodeSettings = Settings.builder()
        .put("node.name", stoppedNodeName)
        .put(replicaNodeDataPathSettings)
        .build();
    internalCluster().startDataOnlyNode(restartedNodeSettings);

    ensureStableCluster(6);
    ensureGreen("test");

    logger.info("--> calling allocation explain API 3rd time");
    // 6 shard copies requested, 5 data nodes running: still one copy too many, so still NO
    final AllocationDecision decisionAfterNodeRestart = client().admin()
        .cluster()
        .prepareAllocationExplain()
        .setIndex("test2")
        .setShard(0)
        .setPrimary(false)
        .get()
        .getExplanation()
        .getShardAllocationDecision()
        .getAllocateDecision()
        .getAllocationDecision();
    assertEquals(AllocationDecision.NO, decisionAfterNodeRestart);

    // a sixth data node finally gives every copy of test2 a home
    internalCluster().startDataOnlyNodes(1);

    ensureStableCluster(7);
    ensureGreen("test2");
}

public void testNBatchesCreationAndAssignment() throws Exception {
// we will reduce batch size to 5 to make sure we have enough batches to test assignment
// Total number of primary shards = 50 (50 indices*1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ public synchronized void clearShard(ShardId shardId) {
this.cache.deleteShard(shardId);
}

/**
 * Tells whether the collection exposed by the underlying fetch cache currently holds no entries.
 *
 * @return {@code true} if {@code cache.getCache()} is empty, {@code false} otherwise
 */
public boolean hasEmptyCache() {
    return cache.getCache().isEmpty();
}

/**
 * Exposes the fetch cache backing this instance.
 *
 * @return the {@link AsyncShardFetchCache} held in the {@code cache} field (the same instance, not a copy)
 */
public AsyncShardFetchCache<T> getCache() {
    return cache;
}

/**
* Cache implementation of transport actions returning batch of shards related data in the response.
* Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShardsBatch} or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,29 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadataBatch.

/**
 * Reports whether shard data for the batch that contains {@code shard} is already fully available in the
 * batch cache, so that it can be used without starting a new async fetch.
 * <p>
 * This function is currently only used in the case of replica shards when all deciders returned NO/THROTTLE,
 * and explain mode is ON. Since we already know that the decision is not YES, we have 2 options:
 * <ol>
 *   <li>Return early decision NO/THROTTLE without shard store information. Shard store information shows the
 *       number of matching bytes per node.</li>
 *   <li>In case the shard store information is present, we can get this shard store information along with
 *       the NO/THROTTLE decision.</li>
 * </ol>
 * This function checks for option 2 - we check the shard batch cache to see if we already have data from all
 * the nodes. If the shard batch cache is empty, we know that fetch has never happened so we return false.
 * If we see that the list of nodes to fetch from is empty, we know that all nodes have data in the cache, and
 * we can use their shard store info, so we return true.
 * This function should never actually trigger a new async fetch, it should only use existing fetched data if
 * it exists.
 *
 * @param shard the shard routing whose batch is inspected
 * @return {@code true} only if a batch exists for the shard, its cache is non-empty and there are no nodes
 *         left to fetch from; {@code false} otherwise
 * @see AsyncShardFetchCache#findNodesToFetch()
 */
@Override
protected boolean hasInitiatedFetching(ShardRouting shard) {
    String batchId = getBatchId(shard, shard.primary());
    if (batchId == null) {
        // no batch id is known for this shard, so no fetch can have happened for it
        return false;
    }
    logger.trace("Checking if fetching done for batch id {}", batchId);
    ShardsBatch shardsBatch = shard.primary() ? batchIdToStartedShardBatch.get(batchId) : batchIdToStoreShardBatch.get(batchId);

    // if fetchData has never been called, the per node cache will be empty and have no nodes
    // this is because cache.fillShardCacheWithDataNodes(nodes) initialises this map and is called in AsyncShardFetch.fetchData
    // a missing batch entry is treated the same way: there is nothing fetched to reuse
    if (shardsBatch == null || shardsBatch.getAsyncFetcher().hasEmptyCache()) {
        logger.trace("Batch cache is empty for batch {} ", batchId);
        return false;
    }
    // this check below is to make sure we already have all the data and that we wouldn't create a new async fetchData call
    return shardsBatch.getAsyncFetcher().getCache().findNodesToFetch().isEmpty();
}
}

Expand Down

0 comments on commit 2a43b36

Please sign in to comment.