
[META] Reader/Writer Separation #15306

Open
6 of 14 tasks
mch2 opened this issue Aug 19, 2024 · 10 comments
Assignees
Labels
Meta Meta issue, not directly linked to a PR Roadmap:Cost/Performance/Scale Project-wide roadmap label v2.17.0

Comments

@mch2 (Member) commented Aug 19, 2024

Please describe the end goal of this project

The high-level goal is to separate indexing and search traffic within remote-store-enabled clusters to achieve independent scalability and failure/workload isolation.

Supporting References

#15237
#7258
#14596

Issues

Experimental feature goals (target 2.17): achieve workload & failure isolation with read scale-to-zero. Introduce replica type changes, a pull-based replica, an allocation filter, and minimal API changes (stats).

GA feature goals: achieve write scale-to-zero, ISM/client integration, and reasonable (cat/health) API updates.

Related component

Search:Performance

@prudhvigodithi (Member) commented Nov 7, 2024

For the scale-to-zero implementation: the goal is to be able to scale down the primary and regular replicas, keep only the search replicas for search traffic, and bring the primary and regular replicas back for write (index) traffic.

Update Cluster State (Initial)

  • Set scale_to_zero flag in index settings.
  • Update IndexMetadata to reflect scaled state.
  • Trigger cluster state update.

Store Original Configuration

  • Save number of original shards and replicas.
  • Store original routing table configuration.
  • Preserve all allocation details.
  • This information will be used during scale-up.

Prepare for Scale Down

  • Flush all shards to ensure data persistence.
  • Upload latest state to remote store.
  • Perform final sync for search replicas.
  • Ensure all data is safely stored remotely.

Update Routing Table

  • Mark shards as SCALED_TO_ZERO (new status).
  • Remove primary and replica assignments.
  • Update routing table to reflect scaled state.
  • Maintain cluster state consistency.

Remove Shards

  • Remove primary shard.
  • Remove replica shards.
  • Clean up local resources.
  • Maintain metadata in cluster state.

Handle Cluster Health

  • Implement custom health status for scaled-down indices.
  • Update cluster health APIs.
  • Provide clear status in cluster state.

Scale Up Process (when flag is removed)

  • Detect scale_to_zero flag removal.
  • Retrieve original configuration.
  • Recover from remote store.
  • Restore original routing.
  • Re-establish primary and replicas.
  • Clear scale_to_zero metadata.
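The steps above can be condensed into an illustrative sketch (plain Python, not OpenSearch code; all names here, such as `ShardTable` and `scale_down`, are hypothetical):

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class ShardTable:
    """Simplified per-index routing: counts of each shard role."""
    primaries: int
    replicas: int
    search_replicas: int

@dataclass
class Index:
    settings: dict = field(default_factory=dict)
    routing: ShardTable = field(default_factory=lambda: ShardTable(1, 1, 1))
    saved: Optional[ShardTable] = None  # original config, kept for scale-up

def scale_down(index: Index) -> None:
    """Set the flag, save the original config, then drop the writer shards."""
    index.settings["scale_to_zero"] = True
    index.saved = ShardTable(index.routing.primaries,
                             index.routing.replicas,
                             index.routing.search_replicas)
    # a real implementation would flush shards and sync to remote store here
    index.routing.primaries = 0   # remove primary
    index.routing.replicas = 0    # remove write replicas; search replicas stay

def scale_up(index: Index) -> None:
    """Detect flag removal and restore the saved configuration."""
    index.settings.pop("scale_to_zero", None)
    if index.saved is not None:
        index.routing, index.saved = index.saved, None

idx = Index()
scale_down(idx)
assert (idx.routing.primaries, idx.routing.replicas,
        idx.routing.search_replicas) == (0, 0, 1)
scale_up(idx)
assert idx.routing == ShardTable(1, 1, 1) and "scale_to_zero" not in idx.settings
```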

Thanks to @mch2 for the initial setup! Based on the above points, please share any additional thoughts and let me know if any changes to the implementation order are required.

Thanks
@getsaurabh02 @vinaykpud

@mch2 (Member, Author) commented Nov 7, 2024

Hey, thanks for writing this out @prudhvigodithi, a couple of thoughts.

Save number of original shards and replicas.
Store original routing table configuration.

I don't think we need to store anything new here other than a flag to indicate that we are deleting writers at an index level, and perhaps the latest synced primary term and SegmentInfos version per shard (so that searchers will sync until they reach that version). The count of primaries/replicas would remain as is in IndexMetadata. Routing tables, I think, should remove the primary and writer replica entries so that if writers are scaled up again the shards are re-allocated. Alternatively, we could leave those entries in the tables simply as unassigned shards, but that's unnecessary if they are never reallocated, which would likely be the vast majority of cases.

Lastly, I think we should go for a flag name that better indicates that the writers are what is removed; "scale to zero" could cover both reads and writes. Maybe something like remove_indexing_shards: true.
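A minimal sketch of that bookkeeping, assuming we record only the flag plus the last synced primary term and SegmentInfos version per shard (the `WriterCheckpoint` name and everything else here is hypothetical, not OpenSearch code):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class WriterCheckpoint:
    """Last state the writers synced to the remote store before removal."""
    primary_term: int
    segment_infos_version: int

def caught_up(searcher_term: int, searcher_version: int,
              cutoff: WriterCheckpoint) -> bool:
    """A search replica keeps syncing until it reaches the recorded cutoff."""
    return (searcher_term, searcher_version) >= (
        cutoff.primary_term, cutoff.segment_infos_version)

cutoff = WriterCheckpoint(primary_term=2, segment_infos_version=17)
assert not caught_up(2, 15, cutoff)  # still behind: keep syncing
assert caught_up(2, 17, cutoff)      # reached the cutoff: safe to stop
```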

@prudhvigodithi (Member) commented

Thanks @mch2, here is a small POC adding the index flag remove_indexing_shards and a new RemoveIndexingShardsAllocationDecider that updates the allocation status once remove_indexing_shards is set.

curl -X GET "localhost:9200/_cluster/allocation/explain" -H 'Content-Type: application/json' -d'
{
    "index": "my_search_index",
    "shard": 0,
    "primary": true
}' | jq '.'

{
  "index": "my_search_index",
  "shard": 0,
  "primary": true,
  "current_state": "started",
  "current_node": {
    "id": "rbrR8G4DSr-lLIfzvgeUbw",
    "name": "runTask-1",
    "transport_address": "127.0.0.1:9301",
    "attributes": {
      "remote_store": "true",
      "testattr": "test",
      "shard_indexing_pressure_enabled": "true"
    }
  },
  "can_remain_on_current_node": "no",
  "can_remain_decisions": [
    {
      "decider": "remove_indexing_shards",
      "decision": "NO",
      "explanation": "removing primary with healthy search replicas"
    }
  ],
  "can_move_to_other_node": "no",
  "move_explanation": "cannot move shard to another node, even though it is not allowed to remain on its current node",
  "node_allocation_decisions": [
    {
      "node_id": "3DUBKHJgSViD5LSs4K9olw",
      "node_name": "runTask-2",
      "transport_address": "127.0.0.1:9302",
      "node_attributes": {
        "remote_store": "true",
        "testattr": "test",
        "shard_indexing_pressure_enabled": "true"
      },
      "node_decision": "no",
      "weight_ranking": 1,
      "deciders": [
        {
          "decider": "same_shard",
          "decision": "NO",
          "explanation": "a copy of this shard is already allocated to this node [[my_search_index][0], node[3DUBKHJgSViD5LSs4K9olw], [S], s[STARTED], a[id=gEKcvMGHSRaovBbu50KZaQ]]"
        },
        {
          "decider": "remove_indexing_shards",
          "decision": "NO",
          "explanation": "remove indexing shards enabled: blocking allocation"
        }
      ]
    },
    {
      "node_id": "7QJHNj1CS6KbXEfZmxSkcw",
      "node_name": "runTask-0",
      "transport_address": "127.0.0.1:9300",
      "node_attributes": {
        "remote_store": "true",
        "testattr": "test",
        "shard_indexing_pressure_enabled": "true"
      },
      "node_decision": "no",
      "weight_ranking": 2,
      "deciders": [
        {
          "decider": "same_shard",
          "decision": "NO",
          "explanation": "a copy of this shard is already allocated to this node [[my_search_index][0], node[7QJHNj1CS6KbXEfZmxSkcw], [R], s[STARTED], a[id=xR90xrspTluLuFf_CotK2g]]"
        },
        {
          "decider": "remove_indexing_shards",
          "decision": "NO",
          "explanation": "remove indexing shards enabled: blocking allocation"
        }
      ]
    }
  ]
}

main...prudhvigodithi:OpenSearch:searchonly

@prudhvigodithi (Member) commented

Coming from #15306 (comment), having the right allocation rules didn't help to directly remove the primary and replica shards. I had to make a few changes to the segment replication code and IndicesClusterStateService.

main...prudhvigodithi:OpenSearch:searchonly, @mch2 please take a look and see if the initial approach makes sense.

Now with the following updates I can see the shards are actually removed.

## Create the index
curl -X PUT "http://localhost:9200/my_search_index" -H 'Content-Type: application/json' -d '{
  "settings": {
    "index": {
      "number_of_shards": 1,
      "number_of_replicas": 1,
      "number_of_search_only_replicas": 1,
      "replication.type": "SEGMENT"
    }
  }
}'

## Add documents
curl -X POST "http://localhost:9200/my_search_index/_doc/1" -H 'Content-Type: application/json' -d '{
  "title": "Sample Document 1",
  "content": "This is a sample document added to the index 1.",
  "timestamp": "2024-11-04T10:00:00Z"
}'

## Scale writers to zero (keep only search replicas for search traffic)
curl -X PUT "http://localhost:9200/my_search_index/_settings" -H 'Content-Type: application/json' -d '{
  "index": {
    "remove_indexing_shards.enabled": true
  }
}'

## Shard status
curl -X GET "localhost:9200/_cat/shards/my_search_index?v&h=index,shard,prirep,state,unassigned.reason,node,searchOnly"

The status is UNASSIGNED ALLOCATION_FAILED

index           shard prirep state      unassigned.reason node
my_search_index 0     s      STARTED                      runTask-1
my_search_index 0     p      UNASSIGNED ALLOCATION_FAILED 
my_search_index 0     r      UNASSIGNED ALLOCATION_FAILED 

My initial approach is a POC to completely scale down (scale to zero) the primary and replicas, leaving just my_search_index 0 s STARTED runTask-1 for the search traffic. In the above experiment I was able to delete the shards while search traffic kept working. On the segment replication side, I excluded the primary when the index has "remove_indexing_shards.enabled": true, and excluded the search replicas in the same way while it is being removed.
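The exclusion logic described above might look like this (an illustrative filter, not the actual segment replication code; the `removing` marker is a hypothetical stand-in for per-shard removal state):

```python
def replication_eligible(shards, index_settings):
    """Filter out shards that must not take part in segment replication.

    When remove_indexing_shards.enabled is set, the primary is being torn
    down and is excluded; likewise any shard is excluded while it is being
    removed (modeled here with a per-shard 'removing' marker).
    """
    removing_writers = index_settings.get("remove_indexing_shards.enabled", False)
    return [s for s in shards
            if not (removing_writers and s["prirep"] == "p")
            and not s.get("removing", False)]

shards = [
    {"prirep": "p", "node": "runTask-0"},
    {"prirep": "s", "node": "runTask-1"},
]
settings = {"remove_indexing_shards.enabled": True}
# only the search replica remains eligible once the flag is set
assert [s["prirep"] for s in replication_eligible(shards, settings)] == ["s"]
```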

## Document Search
curl -X GET "http://localhost:9200/my_search_index/_search" -H 'Content-Type: application/json' -d '{
  "query": {
    "match": {
      "title": "Sample Document 1"
    }
  }
}' | jq '.'

The next steps are:

  • Update the _cat/shards API so the UNASSIGNED ALLOCATION_FAILED message is no longer shown.
  • Since the primary shards are removed, we can't scale up the search replicas; add a mechanism to scale search replicas from the remote store or from existing search replicas.
curl -X PUT "http://localhost:9200/my_search_index/_settings" -H 'Content-Type: application/json' -d '{
  "index": {
    "number_of_search_only_replicas": 2
  }
}'
  • Provide a better cluster status for when the primary is removed and only the search replicas remain.
  • Do one final sync, and have the search replicas refresh those segments before the primaries are removed.
  • Restore the primaries and replicas from the existing cluster settings when "remove_indexing_shards.enabled": false.

Thank you
@getsaurabh02

@prudhvigodithi (Member) commented Nov 15, 2024

Had an offline sync-up with @mch2, and yes, we should be able to remove the custom allocation decider (RemoveIndexingShardsAllocationDecider) that was added previously. I have added some new changes: main...prudhvigodithi:OpenSearch:searchonly. Running

curl -X PUT "http://localhost:9200/my_search_index/_settings" -H 'Content-Type: application/json' -d '{
  "index": {
    "remove_indexing_shards.enabled": true
  }
}'

directly modifies the routing tables and closes the primary and replica shards, leaving only the search replicas serving search traffic.

From

curl -X GET "localhost:9200/_cat/shards/my_search_index?v&h=index,shard,prirep,state,unassigned.reason,node,searchOnly"
index           shard prirep state   unassigned.reason node
my_search_index 0     p      STARTED                   runTask-0
my_search_index 0     r      STARTED                   runTask-1
my_search_index 0     s      STARTED                   runTask-2

To

curl -X GET "localhost:9200/_cat/shards/my_search_index?v&h=index,shard,prirep,state,unassigned.reason,node,searchOnly"
index           shard prirep state   unassigned.reason node
my_search_index 0     s      STARTED                   runTask-2

When updated to "remove_indexing_shards.enabled": false, the index should switch back to its previous state of primaries and replicas.

The next steps are:

  • Do one final sync before the primary shard is closed and the search replicas should refresh these segments.
  • Search replicas should identify "remove_indexing_shards.enabled": true and stop the continuous sync, resuming only once the setting is changed to false and the primaries are up and running.
  • Better cluster status to show when primary is removed and just having the search replicas.
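The gating in the second step could be sketched like this (hypothetical names; the real async sync task lives in IndexService):

```python
class SegmentSyncTask:
    """Sketch of the continuous segment sync a search replica runs.

    The point is the gating: sync pauses while
    remove_indexing_shards.enabled is true and resumes once it flips
    back to false (i.e. the primaries are back).
    """
    def __init__(self, settings: dict):
        self.settings = settings
        self.syncs = 0

    def maybe_sync(self) -> bool:
        if self.settings.get("remove_indexing_shards.enabled", False):
            return False        # writers are gone: nothing new to pull
        self.syncs += 1         # normal continuous segment sync
        return True

task = SegmentSyncTask({"remove_indexing_shards.enabled": True})
assert task.maybe_sync() is False                      # paused during scale-down
task.settings["remove_indexing_shards.enabled"] = False
assert task.maybe_sync() is True and task.syncs == 1   # resumed
```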

@prudhvigodithi (Member) commented

Coming from #15306 (comment), with the latest change main...prudhvigodithi:OpenSearch:searchonly, a final sync is done by the primary shards to the remote store before the shard is closed.

[2024-11-17T13:47:44,696][INFO ][o.o.i.IndexService       ] [runTask-0] [my_search_index] Doing final sync before closing shard, remove_indexing_shards is enabled
[2024-11-17T13:47:44,747][INFO ][o.o.i.IndexService       ] [runTask-1] [my_search_index] Doing final sync before closing shard, remove_indexing_shards is enabled
[2024-11-17T13:47:44,696][INFO ][o.o.i.IndexService       ] [runTask-0] [my_search_index] [my_search_index][0] Primary shard starting final sync. Current segments: _0.cfe,_0.cfs,_0.si,segments_3,write.lock
[2024-11-17T13:47:44,747][INFO ][o.o.i.IndexService       ] [runTask-1] [my_search_index] [my_search_index][0] Primary shard starting final sync. Current segments: _0.cfe,_0.cfs,_0.si,segments_5,write.lock
[2024-11-17T13:47:44,724][INFO ][o.o.i.IndexService       ] [runTask-0] [my_search_index] [my_search_index][0] Primary shard sync completed, waiting for remote store sync
[2024-11-17T13:47:44,748][INFO ][o.o.i.IndexService       ] [runTask-1] [my_search_index] [my_search_index][0] Primary shard sync completed, waiting for remote store sync
[2024-11-17T13:47:44,724][INFO ][o.o.i.IndexService       ] [runTask-0] [my_search_index] Waiting for final sync to complete
[2024-11-17T13:47:44,748][INFO ][o.o.i.IndexService       ] [runTask-1] [my_search_index] Waiting for final sync to complete
[2024-11-17T13:47:44,726][INFO ][o.o.i.IndexService       ] [runTask-0] [my_search_index] [my_search_index][0] Primary shard final sync completed. Final segments: _0.cfe,_0.cfs,_0.si,segments_3,write.lock
[2024-11-17T13:47:44,748][INFO ][o.o.i.IndexService       ] [runTask-1] [my_search_index] [my_search_index][0] Primary shard final sync completed. Final segments: _0.cfe,_0.cfs,_0.si,segments_5,write.lock
[2024-11-17T13:47:44,738][INFO ][o.o.i.s.RemoteStoreRefreshListener] [runTask-0] [my_search_index][0] Shard is already closed. Not attempting sync to remote store
[2024-11-17T13:47:44,738][INFO ][o.o.i.s.RemoteStoreRefreshListener] [runTask-0] [my_search_index][0] Shard is already closed. Not attempting sync to remote store

Cluster health and cluster state are also handled; notice active_primary_shards and active_shards when "remove_indexing_shards.enabled": true.

curl -X GET "localhost:9200/_cluster/health?pretty"  
{
  "cluster_name" : "runTask",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 3,
  "number_of_data_nodes" : 3,
  "discovered_master" : true,
  "discovered_cluster_manager" : true,
  "active_primary_shards" : 0,
  "active_shards" : 1,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}
 curl -X GET "localhost:9200/_cluster/state?pretty&filter_path=routing_table.indices.my_search_index"
{
  "routing_table" : {
    "indices" : {
      "my_search_index" : {
        "shards" : {
          "0" : [
            {
              "state" : "STARTED",
              "primary" : false,
              "searchOnly" : true,
              "node" : "HSgKmAnOSAmHZ2d8uIi71A",
              "relocating_node" : null,
              "shard" : 0,
              "index" : "my_search_index",
              "allocation_id" : {
                "id" : "yR6xrvtnQkiK9CDcquBi_Q"
              }
            }
          ]
        }
      }
    }
  }
}

@mch2 (Member, Author) commented Nov 18, 2024

Thanks @prudhvigodithi for pushing this forward, looking good so far! Feel free to raise a draft PR when ready and we can discuss there.

Since Primary shards are remove,d we cant scale up the search replicas, have a mechanism to scale the search replicas from existing remote store or from existing search replicas.

Let's prioritize the remote case. Users who want this separation likely won't be syncing from primary shards in the first place.

As far as your next steps:

Do one final sync before the primary shard is closed and the search replicas should refresh these segments.

This should be taken care of on primary close if we ensure the engine is flushed:

indexShard.close(reason, flushEngine, deleted.get());

Search replicas should identify the "remove_indexing_shards.enabled": true and stop the continuous sync and should only resume once changed to false and primaries are up and running.

Yes, we'll need to ensure these shards have the latest checkpoint before stopping the async task in IndexService. We should store additional metadata with the store itself rather than putting more into cluster state.

@prudhvigodithi (Member) commented

This should be taken care of on primary close if we ensure the engine is flushed

Thanks @mch2, I have updated the closeShard method (in server/src/main/java/org/opensearch/index/IndexService.java) to call indexShard.sync() and indexShard.waitForRemoteStoreSync() explicitly before the primary shard gets closed, because in the logs I was seeing: Shard is already closed. Not attempting sync to remote store.

AFAIK the last pending POC check is to stop the search replicas (or do a final sync) once remove_indexing_shards.enabled is set; I will look into this.

@mch2 (Member, Author) commented Nov 18, 2024

Ah, interesting. So in the normal index-closure case we do not guarantee that the docs have been indexed before closure, only that they are persisted via the remote translog. We'll need to change this for the scale-down case.
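The ordering change being discussed can be illustrated with a toy model (hypothetical API, not IndexShard itself): normal close only guarantees remote translog persistence, while the scale-down path must flush and sync segments before closing.

```python
class FakeShard:
    """Toy shard recording the order of lifecycle calls."""
    def __init__(self):
        self.events = []

    def flush(self):
        self.events.append("flush")

    def sync_to_remote(self):
        self.events.append("remote_sync")

    def close(self):
        self.events.append("close")

def close_for_scale_down(shard):
    shard.flush()            # make sure indexed segments exist
    shard.sync_to_remote()   # upload them before the shard goes away
    shard.close()

s = FakeShard()
close_for_scale_down(s)
assert s.events == ["flush", "remote_sync", "close"]
```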

@prudhvigodithi (Member) commented

I took a stab at the cluster health in more detail; I did a small refactoring of the ClusterIndexHealth class.

When all green

curl -X GET "localhost:9200/_cat/shards/my_search_index?v&h=index,shard,prirep,state,unassigned.reason,node,searchOnly"
index           shard prirep state   unassigned.reason node
my_search_index 0     p      STARTED                   runTask-1
my_search_index 0     s      STARTED                   runTask-0
my_search_index 0     r      STARTED                   runTask-2

curl -X GET "localhost:9200/_cluster/health?level=shards&pretty"
{
  "cluster_name" : "runTask",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 3,
  "number_of_data_nodes" : 3,
  "discovered_master" : true,
  "discovered_cluster_manager" : true,
  "active_primary_shards" : 1,
  "active_shards" : 3,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0,
  "indices" : {
    "my_search_index" : {
      "status" : "green",
      "number_of_shards" : 1,
      "number_of_replicas" : 1,
      "active_primary_shards" : 1,
      "active_shards" : 3,
      "relocating_shards" : 0,
      "initializing_shards" : 0,
      "unassigned_shards" : 0,
      "shards" : {
        "0" : {
          "status" : "green",
          "primary_active" : true,
          "active_shards" : 3,
          "relocating_shards" : 0,
          "initializing_shards" : 0,
          "unassigned_shards" : 0
        }
      }
    }
  }
}

With scale down index (scale to zero)

curl -X GET "localhost:9200/_cat/shards/my_search_index?v&h=index,shard,prirep,state,unassigned.reason,node,searchOnly"

index           shard prirep state   unassigned.reason node
my_search_index 0     s      STARTED                   runTask-0

curl -X GET "localhost:9200/_cluster/health?level=shards&pretty"

{
  "cluster_name" : "runTask",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 3,
  "number_of_data_nodes" : 3,
  "discovered_master" : true,
  "discovered_cluster_manager" : true,
  "active_primary_shards" : 0,
  "active_shards" : 1,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0,
  "indices" : {
    "my_search_index" : {
      "status" : "green",
      "number_of_shards" : 1,
      "number_of_replicas" : 1,
      "active_primary_shards" : 0,
      "active_shards" : 1,
      "relocating_shards" : 0,
      "initializing_shards" : 0,
      "unassigned_shards" : 0,
      "shards" : {
        "0" : {
          "status" : "green",
          "primary_active" : false,
          "active_shards" : 1,
          "relocating_shards" : 0,
          "initializing_shards" : 0,
          "unassigned_shards" : 0
        }
      }
    }
  }
}

With unhealthy search replicas:

curl -X GET "localhost:9200/_cat/shards/my_search_index?v&h=index,shard,prirep,state,unassigned.reason,node,searchOnly"
index           shard prirep state      unassigned.reason node
my_search_index 0     s      UNASSIGNED CLUSTER_RECOVERED 

curl -X GET "localhost:9200/_cluster/health?level=shards&pretty"
{
  "cluster_name" : "runTask",
  "status" : "red",
  "timed_out" : false,
  "number_of_nodes" : 3,
  "number_of_data_nodes" : 3,
  "discovered_master" : true,
  "discovered_cluster_manager" : true,
  "active_primary_shards" : 0,
  "active_shards" : 0,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 1,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 0.0,
  "indices" : {
    "my_search_index" : {
      "status" : "red",
      "number_of_shards" : 1,
      "number_of_replicas" : 1,
      "active_primary_shards" : 0,
      "active_shards" : 0,
      "relocating_shards" : 0,
      "initializing_shards" : 0,
      "unassigned_shards" : 1,
      "shards" : {
        "0" : {
          "status" : "red",
          "primary_active" : false,
          "active_shards" : 0,
          "relocating_shards" : 0,
          "initializing_shards" : 0,
          "unassigned_shards" : 1
        }
      }
    }
  }
}
