[RFC] Reader and Writer Separation in OpenSearch #7258

Open · shwetathareja opened this issue Apr 20, 2023 · 38 comments
Labels: Cluster Manager, enhancement, feature, idea, Indexing, RFC, Roadmap:Cost/Performance/Scale

Comments

shwetathareja (Member) commented Apr 20, 2023

Background:
Today, indexing and search in OpenSearch are both served by the same "data" role, so the two workloads can adversely impact each other: an expensive query can consume all the memory and CPU on a node and cause indexing requests to fail, and vice versa. On one side there are log-analytics customers who want to scale to millions of documents per second of indexing; on the other there are search customers who configure a large number of replicas (e.g. 20) to support their search traffic. What both really want is to scale for their respective workloads, but OpenSearch currently doesn't support isolating indexing and search workloads.

Goal:
Support reader and writer separation in OpenSearch to provide predictable performance for indexing and search workloads, so that a sudden surge in one doesn't impact the other. This document describes how we can achieve this separation in OpenSearch.

Terminology:

  1. Reader denotes Search
  2. Writer denotes Indexing

Benefits:

  1. Workload isolation to provide more predictable performance for indexing and search traffic.
  2. Configure and scale indexing and search independently of each other.
  3. Cost savings and better performance by using specialized hardware (e.g. compute-optimized instances for indexing and high-memory instances for search) for the different workloads.
  4. Tuning buffers and caches based on the workload running on the node to get optimal performance.

Proposal
In order to achieve indexing and search separation, we would build on the new node role "search". With searchable snapshots, work has already started on the "search" role for searching read-only indices from snapshots (#4689). This RFC will expand on that effort. Nodes with the "search" role would act as dedicated search nodes. In the core logic, the term "replica" would split into "write-replica" and "search-replica". The primary and write-replicas would be assigned to nodes with the "data" role, whereas search-replicas would be assigned to nodes with the "search" role.

Request Handling and Routing
The primary copy or write-replica can serve both reads and writes, and this would be the default configuration when the user has not explicitly configured any search-replicas or "search" nodes. The user would have control at the request level to decide whether a search should be handled by search-replicas only, or fall back to the primary/write-replica when no search-replica is available. The latter would be the default mode.

[Diagram: reader-writer separation]

Let's take a look at the flow for a bulk/search request (RequestRouting):

  1. The customer sends a bulk request and it reaches the coordinator node.
  2. The coordinator would check the RoutingTable in the ClusterState, find the node hosting the primary, and route the request to that node for indexing.
  3. The primary would send the replication request to the write-replicas only.
  4. For search traffic, based on the user preference, the coordinator would check the RoutingTable, find the nodes with the "search" role, and route the request to them (see the sketch below).
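
To make the routing rules above concrete, here is a small toy sketch of the preference/fallback decision (plain Java, not OpenSearch internals; `ShardCopy` and `ToyCoordinator` are made-up names for illustration only):

```java
import java.util.List;
import java.util.Optional;

// Toy model of the routing described above; these are not OpenSearch classes.
record ShardCopy(String nodeId, String nodeRole, boolean isPrimary) {}

class ToyCoordinator {

    // Indexing always goes to the primary, which lives on a data-role node.
    static ShardCopy routeWrite(List<ShardCopy> copies) {
        return copies.stream().filter(ShardCopy::isPrimary).findFirst().orElseThrow();
    }

    // Search prefers a copy on a search-role node; optionally falls back to the
    // primary/write-replica when no search-replica is assigned (the default mode).
    static ShardCopy routeSearch(List<ShardCopy> copies, boolean fallbackToWriteCopies) {
        Optional<ShardCopy> searchReplica =
            copies.stream().filter(c -> "search".equals(c.nodeRole())).findAny();
        if (searchReplica.isPresent()) {
            return searchReplica.get();
        }
        if (fallbackToWriteCopies) {
            return routeWrite(copies);
        }
        throw new IllegalStateException("no search-replica assigned and fallback disabled");
    }
}
```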

Search role

The search role on the nodes can be configured in 2 ways:

  1. User assigns a dedicated search role to a node. That means this node will not have the data role. All good, no issues.
  2. User configures both the data and search roles on the same node. Then there are 2 choices:
    a. either have strict logic to prevent the same node from having both the data and search roles,
    b. or let them configure it, but they might not get optimal performance and workload separation.

For #2, the proposal is to go with 2.a: we shouldn't allow the same node to have both the search and data roles, as that would defeat the purpose of introducing this separation in the first place. Thinking about it further, 2.b essentially becomes the current model where any shard can land on any data node and serve both reads and writes.
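
A minimal sketch of the 2.a check, assuming a plain string set of configured roles (illustrative only; this is not the actual OpenSearch node-role validation code):

```java
import java.util.Set;

// Toy bootstrap-time validation for option 2.a: a node configured with both
// the data and search roles is rejected outright.
final class NodeRoleCheck {

    static void validate(Set<String> roles) {
        if (roles.contains("data") && roles.contains("search")) {
            throw new IllegalArgumentException(
                "a node cannot have both the 'data' and 'search' roles; "
                    + "use dedicated search nodes for reader/writer separation");
        }
    }

    public static void main(String[] args) {
        validate(Set.of("data", "ingest"));  // fine
        validate(Set.of("data", "search"));  // throws
    }
}
```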

This separation would enable users to think about their indexing and search workloads independently of each other and plan their capacity accordingly, e.g. search nodes can be added/removed on demand without impacting indexing throughput, and vice versa.

Characteristics of different shard copies

  1. primary - The primary can serve both indexing and search traffic, but its main function is indexing. With reader/writer separation, the primary could also become optional if no writes are expected on an index, e.g. for read-only/warm indices.
  2. write-replica - The first question that comes to mind is: why do we need it at all? The write-replica serves a different purpose depending on the underlying storage. With local storage, the write-replica provides durability against primary failure. With remote storage, the write-replica provides faster failover/recovery and is optional.
  3. search-replica - The search-replica should strictly take search traffic. This means it can never be promoted to primary. In order to provide true workload isolation and predictable performance, it is critical that indexing and search don't take place on the same node, otherwise it would defeat the purpose. This also means a search-replica can afford to lag further behind the primary when replicating writes, compared to a write-replica.

Cluster State changes

Another important aspect is ShardAllocation. With the new "search" role and the different replica types, the shard allocation logic will also need changes to accommodate them.

From the leader's perspective when assigning shards:

  1. It will differentiate write-replicas and search-replicas based on a new parameter.
  2. It will always assign the primary and write-replicas to nodes with the data role.
  3. Search-replicas will be assigned to nodes with the search role.
  4. In case of a primary copy failure, it will find a write-replica and promote it to primary.

Today, the leader maintains the list of up-to-date copies of a shard in the cluster state as "in-sync allocations". With reader/writer separation, search-replicas can be more flexible and can lag further behind the primary than write-replicas. There should be a mechanism for the leader to track search-replicas and fail them if they are not able to catch up within the configured thresholds.
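
A toy sketch of such a tracking rule, assuming the leader compares per-copy replication checkpoints and last-sync times (names and thresholds are illustrative, not an actual OpenSearch API):

```java
// Toy leader-side check: a search-replica may lag, but only up to configured
// limits; beyond them the leader fails the copy instead of slowing indexing.
final class SearchReplicaLagPolicy {

    private final long maxCheckpointLag; // how many checkpoints behind the primary is tolerated
    private final long maxLagMillis;     // how long since the last successful sync is tolerated

    SearchReplicaLagPolicy(long maxCheckpointLag, long maxLagMillis) {
        this.maxCheckpointLag = maxCheckpointLag;
        this.maxLagMillis = maxLagMillis;
    }

    boolean shouldFail(long primaryCheckpoint, long replicaCheckpoint, long millisSinceLastSync) {
        boolean tooFarBehind = (primaryCheckpoint - replicaCheckpoint) > maxCheckpointLag;
        boolean tooStale = millisSinceLastSync > maxLagMillis;
        return tooFarBehind || tooStale;
    }
}
```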

Index (Cluster) Status - Green/ Yellow/ Red

Today in OpenSearch, if all copies of a shard are assigned, the index status is green; if any replica is unassigned, it turns yellow; and if all copies (including the primary) are unassigned, it turns red. This works well when a single copy serves both functions, i.e. indexing and search. With reader/writer separation, red would be a panic signal only for the writable copies; it wouldn't indicate that all the read copies (search-replicas) are unassigned. We would need a way to indicate the health of read and write copies separately.
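
One possible shape for that, sketched as a toy (the exact semantics are open for discussion; nothing here is an existing OpenSearch API):

```java
// Toy split of index health into a write view (primary + write-replicas) and a
// read view (search-replicas, with optional fallback to the write copies).
enum Status { GREEN, YELLOW, RED }

final class SplitIndexHealth {

    static Status writeStatus(boolean primaryAssigned, int assignedWriteReplicas, int configuredWriteReplicas) {
        if (!primaryAssigned) return Status.RED;
        return assignedWriteReplicas < configuredWriteReplicas ? Status.YELLOW : Status.GREEN;
    }

    static Status readStatus(int assignedSearchReplicas, int configuredSearchReplicas, boolean fallbackToWriteCopies) {
        if (assignedSearchReplicas == 0) {
            // No dedicated searchable copy left: degraded if searches can fall back, red otherwise.
            return fallbackToWriteCopies ? Status.YELLOW : Status.RED;
        }
        return assignedSearchReplicas < configuredSearchReplicas ? Status.YELLOW : Status.GREEN;
    }
}
```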

User configurations

  1. The default configuration would use the same primary/replica copies for both read and write traffic. No configuration is needed if the user is not looking for reader/writer separation.
  2. If users want reader/writer separation, they would configure search-replicas explicitly. It would be a dynamic setting that can be applied at any time to new or existing indices; search-replicas can come and go at any time. The existing replica count would by default mean write-replicas only and can be set to 0.
  3. For any search request, the user can specify in the preference which copy should serve it, i.e. search-replica only or fallback to primary/write-replica. If the user has not specified a preference explicitly per request, the auto preference will always check search-replicas first.
  4. If the user has configured search-replicas for an index but has not configured any "search" nodes, searches will default to using the primary/write-replicas as well.

Why do we need to configure search-replicas with a new setting?
The current "replica" setting is overloaded to act as "write-replica" in the case of role separation. There is no way to configure search-replicas without adding a new setting. The current replica setting could have meant search-replica instead of write-replica, but that would complicate the default configuration: if the user has not configured any nodes with the "search" role, the replicas would go unassigned.
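
For illustration, such a setting could be registered alongside index.number_of_replicas using the existing Setting API; the key name below is hypothetical and just shows the dynamic, index-scoped shape it could take:

```java
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;

// Hypothetical setting sketch: a separate, dynamic, index-scoped count of
// search-replicas, while index.number_of_replicas keeps meaning write-replicas.
public final class SearchReplicaSettings {

    public static final Setting<Integer> INDEX_NUMBER_OF_SEARCH_REPLICAS =
        Setting.intSetting(
            "index.number_of_search_replicas", // hypothetical key, not a released setting name
            0,                                 // default: no search-replicas (today's behavior)
            0,                                 // minimum
            Property.Dynamic,                  // updatable on a live index
            Property.IndexScope
        );

    private SearchReplicaSettings() {}
}
```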

How do we turn off the primary shard copy when search-replicas are enabled?
With reader/writer separation, users can choose to turn off the primary and set write-replicas to 0 if they don't expect any writes to that index. For example, a log-analytics customer can, after rolling over an index, set all write copies of the old index to 0 and configure only the "search-replicas" they need. Today, OpenSearch doesn't offer any setting to turn off the primary. This could be an explicit setting, or derived implicitly when an index is marked read-only.

Tuning of buffers/ caching based on workload
There are various node-level buffers and caching thresholds that are configured assuming the same node will serve both indexing and search workloads. With this separation, those could be fine-tuned better for each workload.

Auto management with ISM
The Index State Management plugin will simplify some of these aspects for the user: they won't need to configure the different replica types explicitly, and ISM policies can take care of it. For example, during migration of an index from hot to warm, a policy can set the primary and write-replicas to 0 and set search-replicas to n, as configured in the policy.

A note on the underlying storage
Ideally, reader and writer separation is independent of the underlying storage (local or remote) and the index replication algorithm (document or segment replication). But current document replication with local storage can't truly offer isolation between writer and reader, as both nodes (data and search) would do the same amount of indexing work. There is no concrete plan yet for remote store with document replication. So, in the first phase, it is segment-replication-enabled indices that will benefit from this reader/writer separation.

Separating Coordinators

Today, the coordinator role handles the responsibility of coordinating both indexing and search requests. The proposal above doesn't yet cover separating coordinators in OpenSearch. I will create a separate issue to discuss coordinator role separation in detail.

Thanks @dblock for calling out that this should be captured separately; regarding the discussion, see the related comment in this thread.

Future Improvements
Reader and writer separation would lay the groundwork for many more improvements in the future, such as auto-scaling of read and write copies, and partial fetching via block-level fetch or fetching a segment/shard on demand only when there is read or write traffic. In the future, we can also look into moving segment merges to dedicated nodes. Some of these improvements are also discussed here - #6528


How can you help?
Any feedback on the overall proposal is welcome. If you have specific requirements/ use cases around indexing and search separation which are not addressed by the above proposal, please let us know.

shwetathareja (Member Author):

Tagging @nknize @andrross @dblock @sohami @Bukhtawar @muralikpbhat for feedback

dblock (Member) commented Apr 20, 2023

I love the goals in this proposal. Separating reads and writes is a huge step forward and creating a node role that's purely compute for search makes a lot of sense.

Each search has a fetch and reduce phase. If I understand correctly you are proposing to create lagging replicas and move the fetch part to them. The reduce part would need to continue to be executed with no changes today. The node that is coordinating the search would continue to query results from all the shards via the search-replicas and then order the query results.

Questions:

  • Is the reduce phase also a performance concern, and can/should it be moved to dedicated nodes? For example, could a search-replica also do the coordinating work?
  • Can we offer consistency guarantees even with search replicas by offering options in write concerns? For example, does it make sense to be able to ask that an indexing operation replicate to at least a majority of search-replicas before it returns successfully?
  • Does it make sense to offer consistency guarantees at par with today's guarantees by having a search-replica keep track of whether it's up to date with the data node, and redirect/fallback the request to the data node if it's out of date? The option could be to require consistency, lag by max X, etc.

shwetathareja (Member Author) commented Apr 21, 2023

@dblock: Thanks for the feedback. You brought up valuable points which were not captured in detail.

The reduce part would need to continue to be executed with no changes today.

Yes, you are right. The reduce phase would run on the coordinator. Today, the coordinator role can run on dedicated nodes or be co-located with data nodes. The same logic would apply here: the coordinator role can be co-located with the data and search roles. This raises the question of whether we can separate the coordinators for search and indexing as well. Yes, definitely. I had kept it out of the discussion for now, but it makes sense to capture it, and I will update the proposal. One way to solve this is to introduce a thin proxy in OpenSearch which only handles routing traffic to the indexing or search coordinators; these coordinators could be dedicated or co-located with the data or search roles. Users can also choose to run this request routing (the thin proxy) outside OpenSearch and forward requests to the appropriate coordinators.

Is the reduce phase also a performance concern, and can/should it be moved to dedicated nodes? For example, could a search-replica also do the coordinating work?

The reduce phase could be memory intensive, e.g. an aggregation query aggregating over a large number of buckets. Yes, we can definitely think about separating the coordinators, as captured above.

Can we offer consistency guarantees even with search replicas by offering options in write concerns? For example, does it make sense to be able to ask that an indexing operation replicate to at least a majority of search-replicas before it returns successfully?

Yes, that could be a configuration option where the user decides whether all search-replicas should always remain in sync (same as replicas today) or can lag for x duration. With this separation, we can offer that flexibility to the user. Supporting only a majority of them could be tricky, as today the primary doesn't fail a write when replicas don't respond; rather, it fails the replicas that didn't respond and acks the request. One of the motivations for not keeping search-replicas strictly in sync was to decouple indexing from search-replicas completely, so that indexing latencies are not impacted when a search-replica can't write at the same pace, and to avoid aggressively failing search-replicas when they are not in sync with the primary.
Thanks for calling this out; I didn't discuss it in much detail in the proposal and will update it based on how this discussion goes.

Does it make sense to offer consistency guarantees at par with today's guarantees by having a search-replica keep track of whether it's up to date with the data node, and redirect/fallback the request to the data node if it's out of date? The option could be to require consistency, lag by max X, etc.

The leader node will always track the search-replicas to ensure they are synced up, either always in sync with the primary or, say, within x duration of the primary. But there can't be redirection: if any search-replica is out of date, it should go unassigned, and the coordinator will not send any search requests to that search-replica.

nknize (Collaborator) commented Apr 21, 2023

What I like about the separation is that the architecture is effectively a Quorum Consensus one and that's the direction we should move.

What I would think differently about is the following:

we would build on the new node role “search”.

Node roles are becoming a sledge hammer for "separating" everything (manager, data, ingest, ml, warm,....). This is occurring naturally because it's easy to separate the design, but there are several drawbacks: 1. cold start time is expensive if/when we need to autoscale, 2. long running nodes cost customers money, 3. on prem users don't have infinite resources. In the example above I see 5 nodes for one shard. Default number of shards is 5. So in this use case a user would need 25 nodes for one index. We should think of designing for the constrained resource user, not one that has infinite resources in a cloud hosted environment. Empower a node to read / write based on the changing demands of the cluster and "autotune" existing nodes when the demand spikes on write vs read time. Use the "remote" as "swap" space. Look at the existing lego pieces we have today and work backwards from why they won't work. I advocate we have most of the pieces already there with segrep to work toward this Quorum Consensus architecture without the need for a whole new "search" node role.

dblock (Member) commented Apr 21, 2023

@nknize brings very valid points on the explosion of number of nodes - as a middle ground, would it make sense to be able to have fewer read-only replica nodes than data nodes? in the example of 5 data nodes, could a user bring up just 2 additional "search-replica" nodes to improve search performance? I think the answer is yes.

But then if we continue this thought, then we could dynamically mark compute-only nodes as "search-replica" and assign N nodes to M shards to replicate and to be used as search based on needs at runtime.

nknize (Collaborator) commented Apr 21, 2023

...could a user bring up just 2 additional "search-replica" nodes to improve search performance?

Yes. However, the main point I'm making is that a node should be able to serve multiple roles at any given time and we shouldn't bias towards "extreme separation". As I mention in #6528 (comment) If the write load spikes then we start simple by using existing mechanisms such as IndexWriter#addIndexes to parallelize writes. Of course there will be the "exhaustive resources" scenario where autoscaling will need to add new nodes, but this should be the exception not the rule.

andrross (Member) commented Apr 21, 2023

@shwetathareja What is the behavior here if a primary fails and the only remaining copy of the shard is on a search-replica (i.e. no remote-backed storage and no healthy write-replica exists)?

We should think of designing for the constrained resource user / a node should be able to serve multiple roles at any given time

One of the proposed benefits here is that specialized hardware for a given role will allow a user to do more work with fewer resources. If hardware is tailored for search or indexing, then you'll need some mechanism to ensure that hardware only gets the workload it is optimized for. Any cloud user has enormous flexibility in choosing hardware configurations and could plausibly benefit from such an architecture. It would probably be super useful to have some prototype numbers to get real data on the potential for cost savings here.

re "autotune"

Another proposed benefit is predictable performance, which is a little bit at odds with autotuning. It's great when your cluster can automatically adapt and optimize for the given workload, but it can make it challenging to know the limits of your system, when to scale what, how it will respond when overloaded, etc.

nknize (Collaborator) commented Apr 21, 2023

....it can make it challenging to know the limits of your system, when to scale what, how it will respond when overloaded, etc.

💯 Reproducibility becomes much more complicated and trace logging becomes much more important. Although I think standard rules still apply. If even with autotuning you see pathological replication lag you're still going to look at thread pools, memory & CPU pressure, along w/ the parameters at the time of query rejection. But I think it's here that our existing Constraint mechanisms play a big part in the cluster insights and evasive actions taken. Take disk watermarks: if the low watermark is hit on the primary, in today's implementation, you can expect the node to take aggressive invasive action and forbid new shard allocations on that node. It's at this point your autoscaling configuration matters. i.e., it could leverage its remote store weapons to take evasive action like your "roll off" scenario described in #6528 and invoke a "cache by query" mechanism that "swaps" local segments to warm storage to alleviate disk pressure on that node.

shwetathareja (Member Author) commented Apr 24, 2023

Thanks @nknize for the feedback and bringing these interesting points around auto-tune/ consensus based read/ writes.

Node roles are becoming a sledge hammer for "separating" everything (manager, data, ingest, ml, warm,....).

The key benefit of this “extreme separation” is isolation. And with different node roles, it becomes easy to provide isolation between the workloads. I agree, there are too many node roles already. We can definitely look into resource isolation within the JVM for different operations like indexing/search etc., but that is still just a thought and we would need to evaluate how far we can go with it.

  1. long running nodes cost customers money

Today, customers already benchmark to find out their capacity in terms of data nodes. With indexing/search separation, they can plan their capacity better and can also use specialized hardware, which was not possible earlier.

We should think of designing for the constrained resource user,

I think we should design for giving flexibility to the user. The trade-offs are not the same for all users. There would be users who are looking for predictable performance in terms of latency and throughput, and complete isolation of workloads.

Empower a node to read / write based on the changing demands of the cluster and "autotune" existing nodes when the demand spikes on write vs read time.

I feel auto-tune is a separate problem from providing workload isolation. I love the idea of auto-tune in general. Auto-tune would help users use their resources more optimally, as would using remote storage as swap space whenever nodes are in panic (due to low disk space) and having the system automatically adapt to such needs. But we can achieve auto-tune even where indexing and search are co-located and served from the same shard.

I advocate we have most of the pieces already there with segrep to work toward this Quorum Consensus architecture without the need for a whole new "search" node role.

Isn't a Quorum Consensus architecture achievable without any separation at all, with just the "data" nodes today and the same shard participating in the quorum for both reads and writes? Also, with remote store, consensus-based writes might become less attractive if the remote store is able to provide consistency and durability guarantees.

In general, I feel we need to start again here. What is our goal with Indexing/ Search separation? What are the customer use cases we are trying to solve here? What are the future improvements which require this separation and are not possible with the current architecture, where indexing and search are co-located on the same node or the same shard serves both reads and writes?

shwetathareja (Member Author) commented Apr 24, 2023

@shwetathareja What is the behavior here if a primary fails and the only remaining copy of the shard is on a search-replica (i.e. no remote-backed storage and no healthy write-replica exists)?

Good point @andrross. In that case, ideally the write copies will turn red. Then, based on the in-sync allocations (checkpoint), the leader can identify whether the search-replica was up to date with the primary. If it was, it can be used to resurrect/be promoted to primary. If it was not, there would be data loss, and the user can manually choose to resurrect the primary from the search-replica or promote it, acknowledging the data loss.

nknize (Collaborator) commented Apr 24, 2023

In general, I feel we need to start again here. What is our goal with Indexing/ Search separation?

First I think we should empirically investigate what level of separation is really needed before making code changes to this end. I strongly feel we have most of the mechanisms in place to maximize cluster resources and throughput at minimal cost to the user before drawing a conclusion that node separation is really needed to this degree.

So if we define "separation" (as I think this RFC does?) as separating the nodes through strict roles (vs separating IndexReader and IndexWriter at a search API, Lucene, level) I actually see this as a "cloud hosted" feature and it shouldn't be part of the min distribution. We should implement it by way of plugins. But even before doing that, cloud native providers could achieve this separation today without a big RFC / code change by adding back search preference to keep search off the indexing nodes (see history why this was removed for more context on where I'm coming from) and using shard allocation filters (what you define in the Shard Allocation section in this RFC) to prevent primaries from ever being allocated to specific nodes (note there's an extra step required where you need to disable and then reset number of replicas). If we empirically determine where and why improving existing mechanisms like this (and others, like quorum consensus, autotune/scale mechanism, etc) in the min distribution is not completely sufficient for a cloud provider and we need (or want) to further optimize that separation by implementing more complex mechanisms, then we do so by building on the core through plugin extension points in a separate cloud plugin, call it :plugin:cloud-native that cloud hosting customers (e.g., AWS, Aiven, Oracle, Logz) can contribute back and continue to enhance for cloud specific optimizations.

From this RFC I see a couple mechanisms I think we should explore (without going so far as separating the roles):

  1. Shadow Writers - scale the writes through parallel IndexWriters that converge using (IndexWriter.addIndexes). Note the variable array of Directory input. Duplicate write/updates can be reconciled using Lucene sequence numbers (something OpenSearch doesn't exploit to the fullest potential).
  2. Split Coordinators - Multi-thread client establishes two connections (read/write) for bulk operations.

Some of the other mechanisms mentioned are already being worked on (e.g., the streaming index API is working on tuning buffers/caching during writes based on load).

nknize (Collaborator) commented Apr 24, 2023

  1. Shadow Writers

I'd keep this simple. Scenario: indexing back pressure on the primary node trips some constraint jeopardizing indexing throughput.

Contingency measure one:

  1. Spin up additional indexing thread(s) to scale indexing operations through a new IndexWriter and use IndexWriter.addIndexes to reduce the separate writers into a single index (see the Lucene sketch below).
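
A minimal, self-contained Lucene sketch of that reduce step (not OpenSearch integration code; directory setup and field names are just for illustration, assuming lucene-core and lucene-analysis-common on the classpath), using the Directory variant of addIndexes:

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class ParallelWriterReduceSketch {
    public static void main(String[] args) throws Exception {
        Directory primaryDir = new ByteBuffersDirectory(); // the shard's main index
        Directory shadowDir = new ByteBuffersDirectory();  // segments produced by a parallel writer

        // A parallel (shadow) IndexWriter builds segments independently.
        try (IndexWriter shadow = new IndexWriter(shadowDir, new IndexWriterConfig(new StandardAnalyzer()))) {
            Document doc = new Document();
            doc.add(new TextField("body", "indexed by the shadow writer", Field.Store.YES));
            shadow.addDocument(doc);
        }

        // Reduce: fold the shadow segments into the single index via addIndexes
        // (the "dumb but fast" Directory variant that appends segments as-is).
        try (IndexWriter primary = new IndexWriter(primaryDir, new IndexWriterConfig(new StandardAnalyzer()))) {
            primary.addIndexes(shadowDir);
            primary.commit();
        }
    }
}
```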

Contingency measure two (adaptive shadow writing):

  1. Use AdaptiveSelectionStats to find a node with most available resources.
  2. Coordinating node routes Indexing Operations to new shadow writer
  3. Shadow writer uses a stateless IndexWriter to create local segments on shadow node
  4. Shadow Writer node rsyncs shadow segments to primary (leader) node when back pressure subsides (use the same segment copy methods used by segrep today)
  5. Coordinating node routes back to primary after back pressure subsides

In this way we use existing idle nodes to scale writes. If Adaptive Selection indicates cluster red-lining, then we recommend the user adds a new node. A cloud hosting environment could spin up a new node for them, but I'd put that logic in a :plugins:cloud-native.

This also enables us to separate functional mechanisms from segrep into the core library such that cloud native opensearch integrators could implement a pure serverless solution (separate from the "serverful" :server code in core opensearch). Doing this moves the project closer to being able to support true cloud native serverless opensearch solutions in something like Lambda (e.g., lambda functions that use IndexWriter to write segments to an object store, and IndexReader to run lucene searches over object stored segments).

nknize (Collaborator) commented Apr 24, 2023

Thinking deeper about this, if we implement the shadow writers by teasing out the segrep mechanisms (e.g., copySegment) and components to :libs-opensearch and implementing the functional cluster flow in :server then we should have most of what we need in those libraries alone to be used in something like kubernetes, Knative, or some other oss FaaS, (or AWS lambda) to implement a full Serverless OpenSearch offering. In that solution there shouldn't even be a need for a "node". Just throw away stateless functions to index, merge, search, etc. w/ your choice of storage.

shwetathareja (Member Author):

@nknize 💯 to supporting indexing, search, and merge as stateless functions where each can scale independently of the others; it would be a big step towards enabling OpenSearch serverless offerings from different cloud providers. It requires more brainstorming on how it would work end-to-end, e.g. how failure handling would work if one shadow writer crashes, the coordination between the primary shard and the shadow writers that are creating segments and copying them back to the primary shard, whether it requires changes to the checkpointing mechanism, how updates and deletes are handled, etc. In general, I agree to start with the end state where these functions are stateless, how the management flow would look for serverless and serverful, what abstractions would be useful for both, and what should be in core vs. a cloud-native plugin/extension.


My thoughts on your previous comments

But even before doing that, cloud native providers could achieve this separation today without a big RFC / code change by #6046 to keep search off the indexing nodes (see history why this was removed for more context on where I'm coming from) and using shard allocation filters

Thanks for sharing the add back search preference discussion. This is exactly what we would have implemented anyway as part of this proposal. I think the RFC looks big because we are trying to explain the proposal in detail, but the overall changes would be on top of existing constructs like shard allocation filters, which would need changes to start differentiating primaries and replicas. Now, whether these changes should be part of core vs. a cloud-native plugin/extension is open for discussion.

I totally agree with you that we should experiment and get data before we conclude in this direction in terms of both performance and cost benefits. This RFC was to get initial buy-in whether we should invest in this direction or not.

Shadow Writers - scale the writes through parallel IndexWriters that converge using (IndexWriter.addIndexes).

If node resources are not exhausted and indexing backpressure is triggered, say, due to the indexing queue, then parallel writers would help scale the indexing work and increase throughput. But if back pressure is applied due to JVM memory pressure/CPU/storage, then having a parallel writer on the same node wouldn't make it any better. +1 to adaptive shadow writing for scaling writes to other nodes. It will work for append-only workloads.

nknize (Collaborator) commented Apr 25, 2023

how failure handling would work e.g. if one shadow writer crashes, coordination between primary shard and shadow writer which are creating the segments and copying back to primary shard, does it require changes in checkpointing mechanism, how updates & deletes are handled etc

Mechanisms exist to handle these failure situations. Segment copy failures are handled w/ shadow writers the same as done today in segrep when copying primary segments to replicas. Out of order is handled through sequence numbers (and primary term in case of primary failure). In segrep we still forward the op for the translog for failover so that would remain in place. In the second use case the primary reports back to the coordinating node to route the next batch of operations to the shadow writer. Ordering is handled using the xlog sequence number where the starting sequence number of the next batch of operations are sent to the shadow writer. Lucene also maintains sequence numbers so addIndexes can dedup indexing operations at the Lucene level.

Thanks for sharing the add back search preference discussion. This is exactly what we would have implemented anyway as part of this proposal.

ping @mattweber, are you still planning to commit the revert on this elastic PR?

But, if back pressure is applied to due to JVM memory pressure/ cpu/ storage then having parallel writer on the same node wouldn't make it any better.

Exactly! We can leverage Constraints the same way we do w/ ConstraintAllocation today. Maybe just create a new ConstraintShadowWriterDelegation and add IndexingPressure. Once the constraint is breached we notify the coordinating node to Adaptively select the best node to reroute writes.

shwetathareja (Member Author) commented Apr 25, 2023

ping @mattweber, are you still planning to commit the revert on this elastic PR?

Yes, I see value in bringing the change back while we brainstorm and converge on the long-term plan. One option I am wondering about: without supporting an explicit "search" node role (until we close on it), add search preference for primary/replica, shard allocation filtering based on primary/replica, and better visibility in cluster health for replicas, so that users (be it a cloud provider or self-hosted) can start to separate the workloads and see performance benefits. These won't be big core changes. Do you see concerns with it @nknize?

dblock (Member) commented Apr 25, 2023

Yes. However, the main point I'm making is that a node should be able to serve multiple roles at any given time and we shouldn't bias towards "extreme separation".

Agreed. When I read "node role" I imply that "node can have any combination of roles (or only one)". AFAIK, while some older roles were not designed this way, all new roles (e.g. dynamic roles) are. I think we're saying the same thing.

In general, I feel we need to start again here. What is our goal with Indexing/ Search separation?

I believe that the goal is for users to be able to reason about performance, and scale indexing and search independently.

For example, I'd like to be able to guarantee a stable level of quality of service for search that I can do math around with a calculator regardless of the indexing pattern: given XTB of data and a well understood search pattern, I want to reliably know that my cluster can perform N req/s with P100 of 10ms, therefore to support M users I need K search nodes while indexing may run in bursts. In this scenario I also would like to be able to measure traffic and start rejecting requests when the number is > N req/s to keep my QoS stable rather than degrade the search performance for everyone by peanut buttering the load across all search nodes.

nknize (Collaborator) commented Apr 25, 2023

I want to reliably know that my cluster can perform N req/s with P100 of 10ms,...

You can do this through Constraints. If throughput is reaching redline (user defined) Adaptive Shadow Writing kicks in and reroutes. If entire cluster is near redlining (and you're on prem) you can get cluster insights that you'll need to scale out or risk rejecting requests. A hosted solution customer could then choose to install :plugins:cloud-native which would auto launch a new node in a hosted environment. No need for separating via node roles which only proliferates cluster hotspots.

dblock (Member) commented Apr 25, 2023

You can do this through Constraints.

Can you? If you have a writer eating away CPU, pegging at 100% in bursts, it makes it very difficult to have predictable reader performance.

andrross (Member):

One option that I am wondering is without supporting explicit "search" node role (until we close on it), have search preference for primary/ replica, shard allocation filtering based on primary/ replica and also better visibility in cluster health for replicas so that users (be it cloud provider or self hosted) can start to separate the workload and see performance benefits.

@shwetathareja I like the approach of using these low-level knobs to prove out the behavior and performance we're looking for. The third aspect (beyond search routing and shard allocation) is translog replication (both the actual replication used in pure segrep and the primary term validation/no-op replication used with remote translog). Can toggling this behavior be inferred from the allocation filtering (i.e. a node that is excluded from being promoted to primary can skip any translog replication behavior), or would another knob be necessary?

andrross (Member):

@shwetathareja I see, with this "soft" separation the replica nodes can still be promoted to primary (and temporarily serve writes?) but will be quickly relocated to a node designated as the preferred location for primaries, hence the replicas still have to keep all the data and functionality required to act as a primary.

itiyama commented Apr 26, 2023

Spin up another indexing thread(s) to scale indexing operations through new IndexWriter and use IndexWriter.addIndexes to reduce separate writers into a single index.

  1. Isn't this the reason for having shards in the system? Why not just add and reduce shards automatically? At the Lucene layer, this is the same function. Are parallel shadow writers more lightweight in any way compared to shards?
  2. Using a ghost writer for parallel shards puts the burden of providing a conflict resolution function on the user. We can use Lucene sequence numbers, but how would you use these numbers across different shards with parallel writes for the same document id? I would avoid building another routing layer at the shard level.

Contingency measure two (adaptive shadow writing):

Adaptively add a ghost writer to a new node: I really like this approach, but I am nervous about a few guarantees that it will break for existing APIs. In theory, this is similar to the hinted handoff used by key-value databases to improve availability. The indexing APIs are synchronous and guarantee that once a write is acknowledged, it will be persisted and will not fail with validation errors in the future. This is achieved by writing the documents to the Lucene buffer in the sync path of the request, which does the mapping validation implicitly. If we had a system where acknowledging the write does not require knowing the existing state of the system, this approach will work. But in reality this is not true, due to the API guarantees of OpenSearch, not the limitations of the underlying system.

  1. How will ghost learn about existing documents? Every write looks for the document id in the lucene index first. Is the ghost writer always caught up with primary and starts from a sequence number after what primary has already written? How does ghost writer come to know about the sequence numbers that primary already wrote?
  2. Indexing APIs in opensearch rely on the existing document available for search or in translog e.g. a painless script embedded in the bulk request may require updating a document only if certain fields have a specific value. These cases will just break with the stateless ghost writer approach unless we change the guarantees that the indexing API provides.

The ghost writer approach has value even for higher availability with remote-backed storage, where the translog replay to a replica could take longer. We need to identify the use cases or guarantees that it will break and explicitly handle those guarantees in the API itself.

@nknize Thoughts?

itiyama commented Apr 26, 2023

@shwetathareja If you do complete isolation, where would operations like update_by_query and delete_by_query run?

Even without autoscaling requirements, users will need reader writer separation. So, there is value to extreme isolation e.g. I have a writer cluster that builds the index daily and a reader cluster which supports read queries and once a day gets all the new changes through a snapshot. Writer cluster has writer specific configurations e.g. large lucene buffer, optimized directory implementation for writes. The reader cluster has instances with large memory to support fast read queries. With remote backed store, this use case can be perfectly automated for the user without having to maintain 2 clusters.

I agree that building it in core may not be the best mechanism to achieve it, and we should explore other options as explained by @nknize

kkmr (Contributor) commented Apr 27, 2023

Thanks for putting together this Proposal @shwetathareja. I like the discussion we are having so far. Couple of questions:

  • How does one recover a search-replica? If a search-replica dies, does the new replica we spin up get its data from the data node, or does it need to get it from another search-replica?
  • If all search-replicas fall behind significantly, does the primary start rejecting writes? Meaning, does the primary keep track of search-replica state?
  • Is network bandwidth a concern? Given that we are proposing using segrep, do we have to construct some sort of chain from the primary to search-replica1, search-replica2, etc. to be able to replicate data without consuming all the network bandwidth on the primary?
  • Is there any impact to the snapshot workflow? Where are snapshots taken?
  • How do scroll and PIT work in this scenario?
  • In addition to Itiyama's question about where update-by-query and delete-by-query run, it would be great to know how async queries would work.

shwetathareja (Member Author):

Thanks @itiyama for sharing your insights!

@shwetathareja If you do complete isolation, where would operations like update_by_query and delete_by_query run?

Any mutating action would still run on the write copies. The OpenSearch code remains the same; it is only the purely search requests that are being separated out.

With remote backed store, this use case can be perfectly automated for the user without having to maintain 2 clusters.

Totally agree, customers wouldn't need expensive solutions built with CCR (cross-cluster replication) if the same cluster can provide the isolation they are looking for.

shwetathareja (Member Author):

@kkmr, thanks for your comments.

How does one recover a search-replica? If a search-replica dies, does the new replica we spin up get its data from the data-node or does it need to get it from another search-replica?

With local storage, all search-replicas would sync from the primary; with remote store, they can sync from the remote store directly. It would depend on how recoveries are configured for the different storage types.

If all search-replicas fall back significantly, does the primary start rejecting writes? Meaning, the primary keeps track of search-replica states

No, the primary wouldn't wait for search-replicas. They can lag behind the primary but would eventually be failed if they can't catch up within the configured thresholds. We would need to figure out an async mechanism to do this.

Is the network bandwidth a concern? Given that we are proposing using segrep, do we have to construct some sort of a chain from primary to search-replica1, search-replica2, etc to be able to do the replicate data without consuming all the network bandwidth on the primary?

That's a good point. I think someone brought it up during the segrep discussion for local storage. I would see that as an optimization once we have the core design laid out.

Is there any impact to the snapshot workflow? Where are snapshots taken?

Today, snapshots are also taken from the primary, so that should remain the same.

How do scroll and PIT work in this scenario?

Scroll and PIT shouldn't really be impacted, e.g. an active scroll context could prevent segment merges and would require keeping old data around. In general, segment replication should address these concerns, and they should be independent of this separation, unless I am missing something.

In addition to Itiyama's question about where update-by-query and delete-by-query run, it would be great to know how async queries would work.

async queries should be served from search-replicas.

nknize (Collaborator) commented Apr 27, 2023

Why not just add and reduce shards automatically.

You could think of this as dynamic sharding without the rehashing and rebalancing of a permanent resharding. Permanent resharding may still be necessary in the case of shard hotspots or full cluster red-lining.

Using ghost writer for parallel shards puts the burden of providing a conflict resolution function to the user.

It puts the burden of providing conflict resolution on the reducer, not the user. Lucene provides two addIndexes variants. The first is a "dumb but fast" reduce to a single index by way of a blind segment copy / SegmentInfo update (ids and ordinals are left alone and segments aren't modified, just appended). The second is a slightly more costly re-encoding using CodecReader... that invokes the MergeScheduler to maybeMerge into one Lucene index (this is the one we would use). The idea here is that we would create a "reducer" on the primary that creates a FilterLeafReader over the local primary segments and the remote copied GhostWriter segments, ensuring correct ordinal mapping and version conflict resolution so deletes can be properly applied. We wrap the LeafReader as a CodecReader and call IndexWriter.addIndexes(CodecReader...). Lucene 9.4 added concurrency optimizations (multithreading) for this exact use case but I think there's room for more improvement.
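
A self-contained sketch of that second path (Lucene only; the conflict-resolving FilterLeafReader described above is omitted, and the directory/field names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SlowCodecReaderWrapper;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class GhostWriterReducerSketch {
    public static void main(String[] args) throws Exception {
        Directory ghostDir = new ByteBuffersDirectory();   // segments written on the ghost writer node
        Directory primaryDir = new ByteBuffersDirectory(); // the primary shard's index

        // Ghost writer produces segments with no knowledge of the primary's state.
        try (IndexWriter ghost = new IndexWriter(ghostDir, new IndexWriterConfig(new StandardAnalyzer()))) {
            Document doc = new Document();
            doc.add(new StringField("_id", "doc-1", Field.Store.YES));
            ghost.addDocument(doc);
        }

        // Reducer on the primary: wrap the ghost leaves as CodecReaders and hand
        // them to addIndexes(CodecReader...), the re-encoding/merging variant.
        // (A real reducer would first wrap each leaf in a FilterLeafReader that
        // applies version/ordinal conflict resolution before the merge.)
        try (DirectoryReader ghostReader = DirectoryReader.open(ghostDir);
             IndexWriter primary = new IndexWriter(primaryDir, new IndexWriterConfig(new StandardAnalyzer()))) {
            List<CodecReader> leaves = new ArrayList<>();
            for (LeafReaderContext ctx : ghostReader.leaves()) {
                leaves.add(SlowCodecReaderWrapper.wrap(ctx.reader()));
            }
            primary.addIndexes(leaves.toArray(new CodecReader[0]));
            primary.commit();
        }
    }
}
```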

I would avoid building another routing layer at the shard level.

Right. Neither case would require a shard-level routing table. Only the second case would require a cluster-level reroute based on selecting a node with the most available resources. To explore this I started playing around with scaffolding for adding a new OperationRouting to the existing routing logic based on a new AdaptiveWriteStats mechanism that looks like ARS but includes IndexingPressure heuristics. These changes are not rote refactors; they require some interesting new logic, and the overall fault tolerance will require a few new mechanisms for handling failover on ghost writers (e.g., in on-prem peer-to-peer cluster mode, ghost writers would still forward write operations to replicas for xlog durability in the event a ghost writer fails, just like existing primary writes).

If we had a system where acknowledging the write does not require knowing the existing state of the system, this approach will work

We would not blindly ack the request, and the approach still works. The ack mechanism would not change. A single Index API call will ack after the call. _bulk calls will ack after all calls complete (regardless of whether they're routed to the primary or a GhostWriter). Acks are still sent back after IndexWriter completes as is done today. Coordinator node still orchestrates this communication w/ the external client.

Is the ghost writer always caught up with primary and starts from a sequence number after what primary has already written?

No. W/ the LeafReader reducer and IndexWriter.addIndexes it's not necessary. GhostWriter is just a new empty IndexWriter for the primary shard. No existing segments on disk for a GhostWriter node so think of it as an empty primary shard. Once the primary is overloaded and its write pool is lagged/exhausted we simply route all write operations for that primary (from that point forward) to an underused node and begin writing to a whole new set of segments. Once that batch is complete we copySegments (along w/ maxSequenceNo) back to the primary and the reducer kicks in to apply soft deletes and merge to a single shard. On completion SegRep continues as usual and copies the newly mutated segments to the replicas.

Indexing APIs in opensearch rely on the existing document available for search ...
updating a document only if certain fields have a specific value

Today _delete_by_query and _update_by_query operate on a point in time. Even in today's implementation, if the index changes underneath them, these APIs will fail. This contract/behavior would not change.

nknize (Collaborator) commented Apr 27, 2023

How do scroll and PIT work in this scenario?

Scroll and PIT shouldn't be impacted in the sense. e.g. active scroll context could prevent segment merges and would require keeping old data around.

Exactly. That's how it's done today. Segment files are refCounted so they are protected from deletion until all PIT completes and the refCount drops to 0. This affects things like DiskWatermark which could cause disk pressure. This is why Remote Store backed Index is useful for scaling storage.

nknize (Collaborator) commented Apr 27, 2023

I think I'll tease out the Parallel Write conversation (Ghost Writer) from the Node Separation conversation (this issue) by opening a separate issue. We can keep this RFC scoped to just the node role scenario. If we decide to implement node separation as described here then let's work that in a separate plugin (call it :plugin:cloud-native or something) so on-prem users can choose to install if they want. It will be tricky because we'll need to refactor a lot of base classes, which is already a task in itself. But if we get that done in parallel (or first) it will help reduce a lot of code debt.

itiyama commented Apr 27, 2023

We would not blindly ack the request, and the approach still works. The ack mechanism would not change. A single Index API call will ack after the call. _bulk calls will ack after all calls complete (regardless of whether they're routed to the primary or a GhostWriter). Acks are still sent back after IndexWriter completes as is done today. Coordinator node still orchestrates this communication w/ the external client.

My point here is that the ack is not a simple ack for an index request. We return detailed exceptions to our customers when the request cannot complete, and customers may be taking an action when we do not send them an exception message. Let me explain with a few examples:

  1. Index request with an existing doc id and op_type of create: The response is not a simple ack, but an exception message. The way this is achieved today is by peeking into the in-memory doc id map, followed by checking the Lucene index. This consistency will go away with an empty IndexWriter that has no knowledge of existing documents. We will ack the response, but later not complete the request.
  2. Index request with if_seq_no and if_primary_term: If the seq no and primary term already exist, we return an appropriate error message immediately, rather than acking and then failing it later. A related question: the seq no increases per shard. With multiple ghost writers, how will the seq no change? Do we introduce one more version called the ghost writer version?
  3. There are many more cases with the more powerful painless scripts, e.g. add a new field if a previous field does not exist. In this case, if the old index has the field and the new index does not, only one field should be present and not both. How will conflict resolution work in that case?

I may be missing something here. If you could explain how we will keep the experience consistent with current state for the above cases, it would be great.

You could think of this as dynamic sharding without the rehashing and rebalancing of a permanent resharding. Permanent resharding may still be necessary in the case of shard hotspots or full cluster red-lining.

I was wondering whether you are trying to get around the limitation of static sharding in OpenSearch, which could be solved by just changing the doc routing strategy through simple principles of consistent hashing. Then you need not do rehashing across shards and you effectively get the same benefits. Additional metadata about the hash-space ranges is maintained per shard in the cluster metadata. The downside is that the re-sharding approach splits the shard first by deleting the documents from each half and then merges them later, as opposed to your approach of just doing the merging later. The cost of creating a new writer is minimal, which is what you need when traffic increases suddenly. Ghost sharding is better unless there is a conflict-resolution downside that we haven't uncovered yet. Ghost writer is also better as it avoids a lot of refactoring: the assumption that the shard count of an index is static is everywhere in our code.

I have a few more questions on reducer. Will ask them once I get a reply to this.

shwetathareja (Member Author):

No. W/ the LeafReader reducer and IndexWriter.addIndexes it's not necessary. GhostWriter is just a new empty IndexWriter for the primary shard. No existing segments on disk for a GhostWriter node so think of it as an empty primary shard. Once the primary is overloaded and its write pool is lagged/exhausted we simply route all write operations for that primary (from that point forward) to an underused node and begin writing to a whole new set of segments. Once that batch is complete we copySegments (along w/ maxSequenceNo) back to the primary and the reducer kicks in to apply soft deletes and merge to a single shard. On completion SegRep continues as usual and copies the newly mutated segments to the replicas.

If the GhostWriter is empty and unaware of the existing documents, then it can't execute updates: a user can update only a few fields and keep the other values the same. Also, to prevent conflicts during updates, the internal _version or a user-provided version is used for optimistic concurrency control. How would the reducer merge these conflicts? Today, OpenSearch doesn't guarantee the order of update requests, but they are applied one after the other and not overwritten completely. Initially, I was wondering if this GhostWriter/ShadowWriter is proposed only for append-only use cases.

shwetathareja (Member Author):

I think I'll tease out the Parallel Write conversation (Ghost Writer) from the Node Separation conversation (this issue) by opening a separate issue. We can keep this RFC scoped to just the node role scenario. If we decide to implement node separation as described here then let's work that in a separate plugin (call it :plugin:cloud-native or something) so on-prem users can choose to install if they want. It will be tricky because we'll need to refactor a lot of base classes, which is already a task in itself. But if we get that done in parallel (or first) it will help reduce a lot of code debt.

I feel primary/ replica separation in core would benefit any kind of user (on prem/ cloud provider). In terms of extreme separation we can review what should be in core vs plugin. I will think more on this and get back.

nknize (Collaborator) commented Apr 28, 2023

If GhostWriter is empty and unaware of the existing documents, then it can't execute the updates

The reducer would have to "lazy" reconcile the update. The challenge it poses is when to ack so we would need to think a bit about that. A simple phase 0 with minimally invasive changes might be to "best effort" GhostWrite the update requests on replicas only. I have some other thoughts and will put them in a separate design discussion issue.

itiyama commented Apr 28, 2023

If GhostWriter is empty and unaware of the existing documents, then it can't execute the updates, user can update only few fields and keep the other values same. Also to prevent conflicts during updates, the internal _version or user provided version is used for optimistic concurrency control. How would reducer merge these conflicts? Today, OpenSearch doesn't guarantee the order of update request but they are applied one after the other and not overwritten completely. Initially, I was wondering if this GhostWriter / ShadowWriter is proposed only for append-only uses.

AFAIK, this is possible as long as the ack to the user does not really mean that the update will be applied. Conflict resolution within a single document can be done for some update cases as well. It will be like the famous shopping cart example of merging carts. If a user added 2 different fields, both can be added. If a user updated an existing field, the one added later will be applied for the contingency where ghost is adaptively moved to a different node. For parallel ghost writers, it would be a bad experience, so I think we should just use ghost writer as a mechanism to move the shard quickly to a different node to start with.
