Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FetchData changes for primaries and replicas #8865

Closed
wants to merge 2 commits into from

Conversation

Gaurav614
Copy link
Contributor

Description

This pull request is part of the improvement #5098
It is mainly focussed around fetching the Data for PSA and RSA for eligible shards

The PR is dependent on following PRs:
#8742
#8218
#8356
#8746

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
    • All tests pass
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed per the DCO using --signoff
  • Commit changes are listed out in CHANGELOG.md file (See: Changelog)

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions
Copy link
Contributor

Gradle Check (Jenkins) Run Completed with:

shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId));
}
AsyncBatchShardFetch<? extends BaseNodeResponse> asyncFetcher = shardsBatch.getAsyncFetcher();
AsyncBatchShardFetch.FetchResult<? extends BaseNodeResponse> shardBatchState = asyncFetcher.fetchData(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you rename this variable to shardBatchStore to represent that this contains shard store address of primary shard?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Store suffix is used in conjuction with replicas in code base

return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap());
}

String batchId = startedShardBatchLookup.getOrDefault(shardRouting.shardId(), null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is shard was started or failed in between, we may get null here. So should we iterate on all eligible shards to get the batchId ? relying on first one may be incorrect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not possible single threaded system

@@ -335,10 +396,54 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeS
}
return shardStores;
}
}

class InternalReplicaBatchShardAllocator extends ReplicaShardBatchAllocator {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you implement hasInitiatedFetching function in this class or should I pick this whole Internal class in my PR?

@github-actions
Copy link
Contributor

github-actions bot commented Sep 6, 2023

Compatibility status:

Checks if related components are compatible with change b7e2119

Incompatible components

Skipped components

Compatible components

@github-actions
Copy link
Contributor

github-actions bot commented Sep 6, 2023

Gradle Check (Jenkins) Run Completed with:

@opensearch-trigger-bot
Copy link
Contributor

This PR is stalled because it has been open for 30 days with no activity.

@opensearch-trigger-bot opensearch-trigger-bot bot added the stalled Issues that have stalled label Oct 6, 2023
Copy link

@khushbr khushbr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add UTs with the next revision of this PR.

@@ -55,9 +55,15 @@
import org.opensearch.common.util.set.Sets;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;
+import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+ typo at the beginning of line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Comment on lines +87 to +88
private final PrimaryShardBatchAllocator primaryBatchShardAllocator;
private final ReplicaShardBatchAllocator replicaBatchShardAllocator;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us stay consistent in our naming. The 'Batch' and 'Shard' in class name and variable name are inverted.
I prefer ShardBatch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack, will update this later once PRs/tasks for Allocators are merged/approved to avoid any back and forth


private final ConcurrentMap<
ShardId,
AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>> asyncFetchStarted = ConcurrentCollections
.newConcurrentMap();
.newConcurrentMap();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Fix the syntax. Add back the tab spacing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

@@ -303,6 +313,59 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod
}
}


class InternalPrimaryBatchShardAllocator extends PrimaryShardBatchAllocator {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us fix the naming here as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack, same comment as above


@Override
@SuppressWarnings("unchecked")
protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch> fetchData(Set<ShardRouting> shardsEligibleForFetch,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to eligibleShards and inEligibleShards ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack


if (shardsBatch.getBatchedShards().isEmpty() && shardsEligibleForFetch.isEmpty()) {
logger.debug("Batch {} is empty", batchId);
return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, use DiscoveryNodes.EMPTY_NODES instead of null value for DiscoveryNodes param.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replied same as above

Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();

for (ShardId shardId : shardsBatch.asyncBatch.shardToCustomDataPath.keySet()) {
shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the shardToIgnoreNodes map have empty (set) values ? Can we ignore adding the entry in such cases?

Ref:

public Set<String> getIgnoreNodes(ShardId shardId) {
if (ignoredShardToNodes == null) {
return emptySet();
}
Set<String> ignore = ignoredShardToNodes.get(shardId);
if (ignore == null) {
return emptySet();
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if we ignore it will be later created by AsyncShardFetch object for completeness sake.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you link the code where we are adding the entries with empty set.

Is there scope to optimize here - Avoid creating the empty sets that server no purpose ?

shardToIgnoreNodes
);

if (shardBatchState.hasData()) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what scenario will shardBatchState not have data ? Should we add a log statement for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While the fetching is still in progress/failure.

What you want to log?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a log statement for it?

It'll start creating too many logs, let's avoid that.


@Override
@SuppressWarnings("unchecked")
protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch> fetchData(Set<ShardRouting> shardsEligibleForFetch,
Copy link

@khushbr khushbr Dec 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying to understand, the method fetchData(Set<ShardRouting> shardsEligibleForFetch, Set<ShardRouting> inEligibleShards, RoutingAllocation allocation) is trying to fetch the Response for a single batch or across all the batches ?

I am assuming it is former and the whole convulated logic of shardId -> shardRouting -> batchID -> shardsBatch -> shardBatchState is for the method override. If this is true, then can we:

  1. Rename shardsEligibleForFetch to eligibleShardsInBatch , inEligibleShards to ineligibleShardsInBatch
  2. Split this into 2 method, fetchDataShardsBatch and fetchData :
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch> fetchDataShardsBatch (
    Set<ShardRouting> eligibleShardsInBatch,
    Set<ShardRouting> ineligibleShardsInBatch,
    RoutingAllocation allocation) {
  ...
  ShardsBatch shardsBatch = ...
  return fetchData(ShardsBatch, allocation);
}

AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch> fetchData (
    ShardsBatch shardsBatch,
    RoutingAllocation allocation) {
...
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Renaming part if fine.
  2. On second point, that can be a good suggestion only if we have two such functions that cater to fetchData of both primaries and replicas, since both of the fetchData in original code is doing exact same things. Otherwise if follow the above approach as you suggested then dont you think it will overkill since it will lead to 4 different methods(2 each for replicas and primaries)that will be used at single place and have wont be having any reuse.

So extending on your though process, if we can do this then we can avoid some code duplication-

    @Override
        @SuppressWarnings("unchecked")
        protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedBatchShards.NodeGatewayStartedShardsBatch> fetchData(
            Set<ShardRouting> shardsEligibleForFetch,
            Set<ShardRouting> inEligibleShards,
            RoutingAllocation allocation
        )
     ShardsBatch shardsbatch=fetchDataShardBatch(shardsEligibleForFetch, inEligibleShards)
     fetchDataForShardBatch(shardsbatch, shardsbatch.primary())

Same as above for replicas

And based on that we will implement two more methods fetchDataShardBatch to get batch of set of shards
and then a generic response from this method fetchDataForShardBatch, which will be later type casted by repected fetchData() call of primaries/replicas

Comment on lines +452 to +453
String batchId = getBatchId(shard, shard.primary());
return batchId!=null;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge into getBatchId(shard, shard.primary()) != null; ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

@ticheng-aws ticheng-aws added the enhancement Enhancement or improvement to existing feature or request label Jan 5, 2024
@ticheng-aws
Copy link
Contributor

Hi @Gaurav614, Is this being worked upon? Pls free to reach out to maintainers for further reviews.

@opensearch-trigger-bot opensearch-trigger-bot bot removed the stalled Issues that have stalled label Jan 12, 2024
@Gaurav614
Copy link
Contributor Author

Changes in this PR are not needed since we have refactored the changes into this PR:https://github.com/opensearch-project/OpenSearch/pull/8746/files

@Gaurav614 Gaurav614 closed this Feb 12, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement Enhancement or improvement to existing feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants