Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Design] Splitting Shards In-Place #13923

Open
vikasvb90 opened this issue Jun 1, 2024 · 6 comments
Open

[Design] Splitting Shards In-Place #13923

vikasvb90 opened this issue Jun 1, 2024 · 6 comments
Assignees
Labels
Indexing:Replication Issues and PRs related to core replication framework eg segrep Indexing Indexing, Bulk Indexing and anything related to indexing RFC Issues requesting major changes Roadmap:Cost/Performance/Scale Project-wide roadmap label Scale ShardManagement:Sizing

Comments

@vikasvb90
Copy link
Contributor

vikasvb90 commented Jun 1, 2024

Introduction

In RFC #12918, we requested feedback on the proposal of splitting shards in-place without any data loss or without the need to stop write traffic. We mentioned two approaches and expressed bias on the physical shard split approach with supporting data points taken from various benchmarks. In this document we will dive deep in the design of physically splitting shards in-place.

OpenSearch provides capability to split an index into a new index with more number of shards. This is achieved by first blocking write traffic on the index by putting write block on it and then all shards are recovered locally by executing a split query on each shard of the source index. High level steps of splitting of a shard in this process are as follows:

  1. Local snapshot is created on the source shard in which the latest commit is acquired .
  2. Segments files from this commit are hardlinked into child shard directories.
  3. A new IndexWriter is created and opened in CREATE mode.
  4. ShardSplittingQuery is executed which essentially executes a delete by query and deletes documents which do not belong to the respective child shard. This is done by passing the predicate from OperationRouting which provides the shard ID for a given doc ID. If this shard Id doesn’t match with the shard ID of child shard then the document is discarded.
  5. IndexWriters are committed and child shards are then bootstrapped by creating new translog UUIDs, updating user data on IndexWriter with max seq no., checkpoint, max auto id timestamp, new history UUID, etc.
  6. Engine is opened, translog ops are replayed, shard is set to POST_RECOVERY and shard started action is published to master.
  7. Assigned shards node receives the update from master and puts the shard in STARTED state.

High Level Approach

Since shard split is already implemented, we can reuse shard splitting components like ShardSplittingQuery and hardlinking to build in-place shard split. Major areas which need to be covered to build this are:

1. Recovering shards while handling write traffic i.e. online recovery of child shards.
2. Filtering out non-belonging operations on child shards during recovery.
3. Changes in operation routing to route docs to right child shards.
4. Handling shard routing changes of splitting primary on split start, during split and post recovery.

Online Recovery of Child Shards

Note: We will leaving out details of some of the actions performed during recovery to avoid overloading this doc with too much information. Goal is to provide a general high level sense of how recovery of in-place shard would look like.

This part of the design is inspired by the peer recovery flow which ensures safe relocation of primary or replica while handling read and write traffic. We will first outline different stages of existing peer recovery flow and then talk about how we can leverage these to build in-place shard split. Peer recovery starts when a new shard routing is received in the cluster state update event on the target node. A new instance of IndexShard is created on the target node and after setting the shard to the right state a recovery request is sent to the source node where source shard is present. PeerRecoverySourceService receives this transport request and creates a new RecoverySourceHandler which drives the entire recovery process. RecoverySourceHandler consists of certain abstractions which are implemented by respective replication type source handlers and also provides some default implementations common to all source handlers. Similarly, on the target node a target service handles events published by the source handler which are delegated to respective recovery target implementations. Recovery process which is driven by the source handler is divided into following stages and depending on the replication type - DocRep, SegRep or SegRep + Remote Store, these stages may or may not be applicable to the recovery.

  1. Decision between sequence number based recovery and file based recovery: In this step, if a retention lease of the target recovering shard is already present and if source shard contains operations with sequence number greater than or equal to the retaining sequence number found in the retention lease of target shard, then decision is taken to perform sequence number based recovery i.e. copying segments files are skipped and only operations above retaining sequence number are replayed. Otherwise, it fall-backs to file based recovery. This stage is not applicable to SegRep+Remote Store replication because retention leases are not functional on replicas in this replication type.
  2. Acquire history retention lock: A retention lock is obtained to retain all operations on shard’s Lucene index from this point onwards. This stage is not applicable to SegRep+Remote Store replication type because in case of failover when replica is promoted to primary, translogs are downloaded from remote.
  3. Acquire last safe commit: Safe commit is the commit whose max sequence no. is at most the persisted global checkpoint. Last safe commit is acquired in this step and released once all files get copied to target node. This step is executed if retention lease of target shard is not found.
  4. Phase 1: This step involves copying over segment files to target shard directory from safe commit acquired in the previous step. On completion of this step, in DocRep and SegRep cases a new retention lease for the target shard is cloned from retention lease of source shard and added in replication tracker. The retention lease of target which was checked in Step 1 is nothing but the cloned lease we add at this point.
  5. Prepare engine on target shard: In this step a new read-write engine is created to start accepting operations. In case of remote store, translogs and segments are synced from remote as well in addition. On completion of this step, shard is set to tracking in the replication tracker. This means that all new operations on source shard will now also be replicated to target shard.
  6. Phase 2: We are now only left with operations to be applied which came in between acquiring retention lock and putting target shard for tracking. A snapshot of operations starting from seq no. 1 above the commit checkpoint’s local checkpoint is taken from Lucene and then these operations are replayed on target shard. This step is not applicable for Remote Store replication type because translogs are later synced from remote on target shard and replayed.
  7. Finalize Recovery: This step is common to all replication modes. In this step, target shard is marked as in sync ensuring that target shard’s checkpoint is at least equal or above global checkpoint. From this point, target shard starts participating in global checkpoint calculation. Pending flushes or refreshes are triggered on target shard, retention leases are persisted and gc deletes are enabled. After this, operations are blocked on source shard and relocation hand-off takes place. In Remote Store case, segments and translogs are re-synced (downloaded) on target shard. On completion of this step, recovery response is published on the waiting listener on target shard and shard started cluster state update is published.

Changes needed for recoveries of child shards

Online recovery of child shards is similar to the peer recovery of primary relocation but differs in this way that there are multiple different target shards having different shard ids to be recovered from a single source shard. To accommodate this and to reuse recovery mechanisms of all replication modes we would need to re-arrange and define some abstractions and override them for in-place child shard recoveries. At high level, these changes would be as follows:

  1. InPlaceShardSplitRecoverySourceHandler: Like other replication modes, child shard recoveries would need a new source handler to drive recoveries of all child shards. Send files step of Phase 1 of the recovery process in this case will consist of hardlinking segments in child shard directories from source shard directory and then segments files under each child shard directory is split using ShardSplittingQuery. As we saw in the previous section that some steps of recovery differ between different replication modes, to handle this we would need to define abstractions which can be used by child recovery to delegate to respective replication mode handler. Following are some of the abstractions to be used by this handler:

    1. Skip creation of cloned retention lease: We saw earlier that remote store replication doesn’t clone retention lease and hence, we need to skip its creation in this mode. But even in other modes we don’t need leases of target shards in child recovery. The purpose of this lease is to optimize peer recovery in fail over cases and resume it back from where we left off if possible. But in child recoveries in both cases of node failure and source shard failure, we don’t know if parent shard is going to be assigned back on the same old node. Parent shard and child shards need to be on the same node in order to create hardlinks of source shard segments in child shards directories. For the same reason, send files step is pretty quick and is much less heavier operation than in other replication modes and hence, at this point at least it doesn’t look like we need to optimize and therefore, we can always skip retention lease creation in child recoveries.
    2. Retention Lock Acquisition: Since remote store replication doesn’t need a retention lock, retention lock acquisition will vary between replication modes.
    3. Count number of history ops: This is the number of translog ops to be replayed in Phase 2. Since, Remote Store replication mode doesn’t have a Phase 2 step, this number is 0 for this case.
    4. Phase 2 snapshot: Since remote store replication doesn’t have a phase 2, this will be empty for remote.
  2. InPlaceShardSplitRecoveryTargetHandler: We will need a new target handler to handle actions triggered by source handler and execute them on all child shards. Following are some of the events which should be handled by the new target handler:

    1. Receiving files from source and splitting: In this event, source handler will publish file names in phase 1’s send files step. Target handler will receive file names, create hard links of the received file names and execute splitting query on all child shards.
    2. Preparing engine for translog ops: Engine will be opened on all child shards by the target handler to start receiving operations.
    3. Indexing translog ops on shards: This event will be executed by source handler in phase 1 and as mentioned before this will not be available for remote store replication type. Each event of this type will consist of a batch of operations and every batch will be executed on all child shards by the target handler. Filtering of non-belonging operation for a child shard will happen while applying translog operation in IndexShard.
    4. Finalizing Recovery: This will be overridden by target handler to be applied on all child shards. Other events which need to be handled as part of finalize are:
      1. Handing off primary context: During hand-off, primary context of parent shard is first copied to all child shards. Hand-off process involves blocking all incoming operations and replaying them on the new target shard after hand-off is completed. In case of primary relocation, replay of operations to target shard is done on the node which had source primary assigned but in case of child shard recoveries we would need to throw a custom exception i.e. ShardInParentModeException back to coordinator node to re-drive bulk operations based on the new operation routing. It is possible that coordinator is still serving from stale customer state. In this case, coordinator will re-drive operations again on parent shard. Coordinator will eventually receive the operations back to re-drive and if request is not yet timed out and if new cluster state is now available on coordinator, then they will now be re-driven on child shards.
      2. Force syncing segments files: Syncing segments on child shards from remote directory of parent.

Filtering out non-belonging operations on child shards

Since each child shard will own a part of hash space of parent shard and since in recovery flow all operations will be routed to tracked child shards from parent primary, we will need to filter only those operations which should fall into the hash space of the respective child shard. Additional compute due to filtering will only be incurred during recoveries - peer or store based. Following are the two components where we will need to filter operations:

  1. Transport bulk operations at shard level: Filtering at this stage will only happen if the shard is a recovering child shard. A recovering child shard can receive non belonging bulk operation only after it starts getting tracked in parent’s replication group and before it is started.
  2. Applying translog ops from translog snapshot: Translogs can consist of operations outside of child shard’s hash space in cases where child shard just recovered and was started after parent split where translog operations are still pending to be replayed. We don’t have control over when the translog ops can get replayed over a shard and hence we always execute the filter but since replays from translog only happen on initial seeding of a shard, it should be fine.

Compute cost of identifying whether an operation falls into the hash space of the shard or not will depend on the number of times a shard has been split. For e.g., if a shard was never split its filtering compute cost will be O(1) and if a shard was split n times then the cost to find right child shard after resolving parent shard id from doc id hash would be O(log(n)).
It should be noted that we also need to update shard’s local checkpoint for the filtered out operations as well so that child shards can be marked as in-sync in parent’s replication group during recovery based on their checkpoints. There are three categories of an operation - INDEX, DELETE and NO_OP (CREATE is deprecated). When a non-belonging INDEX operation is received, we create a new NO_OP operation and execute it in-place of INDEX op. With this, checkpoint also gets updated on the shard for the non-belonging operation. We don’t need to handle DELETE op since it is marked as no-op if document is not found on the shard.

Operation Routing Changes

Currently, an operation received on coordinator node is routed to its belonging shard by computing the shard ID based on operation doc id and routing (if provided). A Murmur3 32-bit hash is generated using these operation parameters and routing partition size. Floor mod is then computed between hash and number of shards (over simplified for understanding) to figure out the shard ID. An index setting called number_of_routing_shards is used to resize an index and it imposes certain constraints on the number of shards of the target index. For e.g., an index having 5 initial shards can only be split into indices with number of shards in multiples of 5. Also, there is a limit on how many times an index can be split.

To route operations on child shards, we will start maintaining hash ranges on child shards as child shards metadata. We will continue to use mod based resolution for seed shards (initial shards which were computed during index creation). This means that there won’t be any change in the routing execution if a shard isn’t split. To split a shard, hash range of the shard will be split into the number of splits required. After a split is completed, for an incoming operation, seed shard ID will be computed first by using mode of the operation hash against number of seed shards and then a binary search will be performed on the ranges of child shards to find the child shard which accommodates the given hash in its range.

As we saw earlier, there isn’t any impact of this change on shards which haven’t been split. We carried out micro-benchmarks and explored alternative approaches to route operation belonging to child shards. Details of the same are present in this Design doc of routing algorithm. Overall impact of operation routing on child shards is negligible (<<1% of indexing latency) and distribution quality of routing is also at par with the default routing algorithm. Compute cost of finding shard ID belonging to a child shard would be O(log(N)) [N=number of child shards under the seed shard]. For e.g., if a shard is split say 10 times and every time it was split into 2 shards down the tree, then compute cost incurred for operations on these shards would be log20 (base 2) = ~4.3. There are other benefits of using binary search over mod based approach for child shards which are listed in the linked doc.

Shard Routing Changes

With new shards being created in an index against the parent shard, routing changes will be required to create new shard routings based on the split request parameters and current allocations. Fundamentally, we will re-use the concept of creating sticky shard routing created for a relocating primary shard. We will create n child shard routings and stick them to the parent shard for splitting a shard into n number of child shards. In the end, child shard routings will be added to index shard routing table, sticky routings will be removed from parent routing and parent primary will be removed from the table.

We will also need to validate if a split of a shard into n shards is possible according to the current allocation constraints. Before adding split routings to cluster state, we will also need to execute allocation deciders to check if all allocation constraints are satisfied in the cluster. We will only proceed with split if we don’t get a NO decision from any allocation decider.

Replica Recoveries

In phase 1 of this feature, we don’t plan to split a shard of an index which has replicas configured. This means that in order to split a shard of an index having replica count > 0, user will need to change the replica count to 0. Naturally, there’s an availability impact with this and therefore, we will be launching this feature as experimental in phase 1 and we will work on replica recoveries in phase 2. High level approach of recovering replicas would be as follows. Please not that the following approach still requires some research for validations and may change in future depending on new findings.

  1. Idea is to start child replicas first in-place of parent shard replicas. We will bring child primaries and replicas first as tracking shards on parent shard replication group.
  2. There will be replication tracker changes where replication group on child primaries will be responsible to track and update checkpoints of their respective replicas. Child primaries will themselves act as replicas for parent shard.
  3. We will start replica relocation depending on the number of max concurrent child replica recoveries. Whenever a complete set of child shards becomes active we will replace a replica of parent with this child replica set. Underlying design details of this process will differ for different replication types.
  4. Once all child replicas have started, we will bring all child shards in-sync with parent and perform relocation hand-off with parent primary.
  5. In parent primary failure scenario during the split, if we are left with more than 1 replica of a parent shard, then we will promote one of the parent replicas to primary, otherwise if there is 1 or no replica available anymore to be split we will activate child primaries.

Change in Number of Shards

So far we don’t have any use case in OpenSearch where we ever change the number of shards of an index. But with In-Place split feature, number of shards can dynamically change. Due to this, we may not have a serving shard for an ID between 0 and number of shards (n). In OpenSearch and some of the plugins, there are code blocks in which we iterate from 0 to n-1 to figure out shard IDs and then take respective actions on top of them. With shard split, we need to stop this and instead use servingShardIDs from index metadata to figure out current serving shards. Also, there may be cases where plugins cannot support dynamic sharding in their current state which may either be due to the way they operate but can potentially support dynamic shards or they cannot strictly allow it ever in between their workflow executions. CCR is one such plugin which cannot function with dynamic shard IDs on an index since it has 1-1 mapping between shards of an index on follower and leader clusters.

To handle all of the above cases we will need to expose the capability to plugins to enable/disable splitting of shards in-place. To achieve this, we will take a predicate supplier from installed plugins which can make the core aware of whether in-place split can be allowed at runtime for the index passed in the predicate.

New Split Shard API

We will expose a new API for the usage of this feature. Exact format of the API endpoint will be shared in the PR itself but at high level API will require index name, shardID and number of splits to trigger split of a shard. For new API implementation, we will add new REST action, Transport Action and Metadata Service action.

Scope

Initially we will target splitting of a shard in segrep+remote store replication mode. We will phase out the work starting with this replication mode first. Another reason of scoping work initially only to remote store replication is that it was rolled out recently as a durable alternative and we have observed significantly better performance in this mode than DocRep.

Related component

Indexing:Replication

Thanks @shwetathareja for the detailed review and suggestions!

@vikasvb90 vikasvb90 added enhancement Enhancement or improvement to existing feature or request untriaged labels Jun 1, 2024
@github-actions github-actions bot added the Indexing:Replication Issues and PRs related to core replication framework eg segrep label Jun 1, 2024
@vikasvb90 vikasvb90 self-assigned this Jun 1, 2024
@vikasvb90 vikasvb90 added Indexing Indexing, Bulk Indexing and anything related to indexing Roadmap:Cost/Performance/Scale Project-wide roadmap label Scale ShardManagement:Sizing and removed enhancement Enhancement or improvement to existing feature or request labels Jun 1, 2024
@peternied peternied added RFC Issues requesting major changes and removed untriaged labels Jun 5, 2024
@peternied peternied changed the title [Design] Splitting Shards In-Place [RFC] [Design] Splitting Shards In-Place Jun 5, 2024
@github-project-automation github-project-automation bot moved this to Planned work items in OpenSearch Roadmap Jun 5, 2024
@peternied
Copy link
Member

[Triage - attendees 1 2 3 4 5 6 7]
@vikasvb90 Thanks for creating this rfc

@vikasvb90 vikasvb90 changed the title [RFC] [Design] Splitting Shards In-Place [Design] Splitting Shards In-Place Jul 6, 2024
@kkewwei
Copy link
Contributor

kkewwei commented Aug 15, 2024

@vikasvb90, Amazing for the detailed.
I have a doubt, if the index has 2 shards, and the split is 1:2, the result shard number is [0,1,2,3](0->0&1, 1->2&3), or [0,1,2,3,4,5](0->2&3, 1->4&5, and shard 0,1 are empty)?

@vikasvb90
Copy link
Contributor Author

@kkewwei Thanks for going through the design! Plan is to assign new shard numbers and the reason is that during split, parent and child shards will co-exist. Therefore, there was a need of new shard numbers for child shards. But the side effect of this would be missing numbers between shard 0 and shard n. So, this will require changes in core and some plugins where shard id is determined by simple iteration from 0 to n.

@kkewwei
Copy link
Contributor

kkewwei commented Aug 15, 2024

@vikasvb90, I have another two doubts.
1.

Count number of history ops: This is the number of translog ops to be replayed in Phase 2. Since, Remote Store replication mode doesn’t have a Phase 2 step, this number is 0 for this case

We don't reply the translogs, but Filtering out non-belonging may be cost too much time in child shard, during the hardlink and deleting, there may be some incoming documents, how does the child shard get the now document?

One of solution I thought is: blocking all incoming operations when hardlink, and put Filtering out non-belonging into background.

2.Applying translog ops from translog snapshot, as we don't replay translogs in phase2, in what case, the child shard will replay the translogs.

@vikasvb90
Copy link
Contributor Author

vikasvb90 commented Aug 16, 2024

@kkewwei Didn't understand your question completely. As far as I understand, you are referring to remote store based segment replication particularly because in DocRep, we do plan to replay translogs in child shards and filter out non-belonging operations in replay as well as when child shards are added to the replication group of parent shard.
Coming to segment replication mode, I was hoping to follow peer recovery design which doesn't need a translog replay because in handoff phase translog is drained and a force segment replication is performed on source shard due to which everything in translog is translated to segments and replicated on all shards in the replication group. But while building this, I didn't find any good way to retrieve operations from n sequence number to m sequence number because we need to replicate delta ops on child shards which is done in translog replay in DocRep mode. We have a way to do this in segrep using retention policy and using Lucene snapshot over translog operations but this approach which is also being used in both DocRep and SegRep peer recovery, has been facing degraded performance in search.
Due to this, I had to fall back on translog replay even in SegRep. I am yet to finish my dev work and conclude this and post this I will go ahead and update this design issue. To summarize, I do plan to perform translog replay both in DocRep and SegRep mode.

Lastly, I highly appreciate you taking time to review the design and add your comments. I will be looking forward for more of such comments and feedback from you. Thanks again for your review!.

@kkewwei
Copy link
Contributor

kkewwei commented Sep 11, 2024

It makes sense to me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Indexing:Replication Issues and PRs related to core replication framework eg segrep Indexing Indexing, Bulk Indexing and anything related to indexing RFC Issues requesting major changes Roadmap:Cost/Performance/Scale Project-wide roadmap label Scale ShardManagement:Sizing
Projects
Status: New
Status: 🆕 New
Development

No branches or pull requests

3 participants