[DESIGN] RFS Container HLD, rev1 #3

Open
chelma opened this issue Apr 18, 2024 · 0 comments

Objective

The purpose of this doc is to propose a specific design for work coordination amongst workers trying to perform a Reindex-from-Snapshot (RFS) operation. While it primarily focuses on a design viable for initial release, it does attempt to look ahead to possible future iterations. After reading the doc, you should understand the problem being solved and be able to critique the proposed design.

Terminology

  • Lucene: An Apache open source project that provides search across locally stored data
  • Lucene Document: A single data record, as understood by Lucene
  • Lucene Index: A collection of Lucene Documents
  • Elasticsearch: A software stack that can be used to search large data sets (multiple petabytes) by combining many Lucene instances into a single distributed system
  • OpenSearch: An open source fork of Elasticsearch primarily maintained by AWS
  • Elasticsearch Document: A single data record, as understood by Elasticsearch
  • Elasticsearch Index: A collection of Elasticsearch Documents all conforming to a shared specification; split across potentially many Elasticsearch Shards
  • Elasticsearch Shard (Shard): A subdivision of an Elasticsearch Index that corresponds to a unique Lucene Index and stores its portion of the Elasticsearch Index’s Elasticsearch Documents in that Lucene Index as Lucene Documents; provides an Elasticsearch-level abstraction around that Lucene Index.
  • Elasticsearch Cluster (Cluster): A collection of nodes, of various types, that store and provide access to some number of Elasticsearch Indexes
  • Elasticsearch Template (Template): A metadata setting on an Elasticsearch Cluster used to pre-populate, route, change, or otherwise enrich the Elasticsearch Documents stored in the Elasticsearch Cluster
  • Snapshot: A copy of the Elasticsearch Indexes and Elasticsearch Templates in an Elasticsearch Cluster that can be stored and then restored to stand up a new Elasticsearch Cluster with the data/metadata of the original
  • Indexing/Ingestion: The process of taking the raw Elasticsearch Document and converting it into a Lucene Document for storage in a Lucene Index

Background - General

Elasticsearch and OpenSearch are distributed software systems that provide the capability to search across large data sets (multiple PBs). Users need a way to easily move their data and metadata from one Elasticsearch/OpenSearch Cluster to another. A major complicating factor is that Lucene only supports backwards compatibility for one major version; Lucene and Elasticsearch/OpenSearch major versions are linked for this reason. There are a few ways users currently perform this data movement.

One approach is to take a snapshot of the source cluster, then restore it on a new target cluster. This operation happens at the filesystem level and skips the overhead of the distributed system. Snapshot/restore works if the target cluster is the same major version (MV) or one higher than the source (MV + 1). However, before the data can be moved to a target cluster of MV + 2, all the data in the cluster must be re-indexed at MV + 1 to convert it into the correct Lucene format.

Another approach is to use the bulk re-indexing API on the source cluster to send all its Elasticsearch Documents to the target cluster. This happens at the Elasticsearch Index level. The target cluster converts the Elasticsearch Documents into Lucene Documents compatible with its Lucene major version. The faster this process happens, the more load the source cluster experiences. Additionally, the process is sensitive to individual source cluster nodes failing, and it carries the overhead of working via the distributed systems on both the source and target clusters.

Reindex-from-Snapshot (RFS) is a new, alternative approach proposed in further detail here [1]. The premise is to take a snapshot of the source cluster, split it into its component Elasticsearch Shards, and have a fleet of workers each responsible for extracting the Elasticsearch Documents from a single Shard and re-indexing them on the target cluster. This removes the strain on the source cluster while also bypassing the MV + 1 upgrade limit. While not enforced, there is a recommended best practice to limit Shards to 20-50 GB depending on use-case. This means we can have some confidence that our unit of work (the Shard) is self-limiting in size. Because every Shard is a separate Lucene Index, the RFS design can fan out at the Shard level. The work of migrating each Shard is completely independent of the other Shards, except for available ingestion capacity on the target cluster. For a 1 PB source cluster, assuming Shards averaging 50 GB, this means a full fan-out could leverage up to 20,000 workers in parallel.
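The fan-out figure above can be reproduced with a quick back-of-the-envelope calculation. This is only an illustrative sketch; the function name is made up for this example and decimal units (1 PB = 1,000,000 GB) are assumed.

```python
# Back-of-the-envelope fan-out estimate, assuming decimal units.
GB_PER_PB = 1_000_000


def max_parallel_workers(cluster_size_pb: float, avg_shard_size_gb: float) -> int:
    """Upper bound on useful workers if every Shard gets its own worker."""
    return int(cluster_size_pb * GB_PER_PB / avg_shard_size_gb)


print(max_parallel_workers(1, 50))  # 20000
print(max_parallel_workers(1, 20))  # 50000
```

Smaller average Shards raise the ceiling on parallelism, but the usable worker count is still bounded by the target cluster's ingestion capacity.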

The ultimate goal of RFS is to enable the movement of the data in a large (multiple petabytes) source cluster to a new target cluster with a better user experience than either of the existing solutions - assuming the user can’t just use snapshot/restore. Users whose target cluster is the same major version as the source, or who just want to upgrade a single major version but have no intention of upgrading beyond that, should just use snapshot/restore. The source cluster may have thousands of Elasticsearch Indices and Shards. Achieving this ultimate goal means distributing the work of reading the snapshot and re-indexing on the target cluster across many (potentially hundreds or thousands of) workers.

[1] opensearch-project/OpenSearch#12667

Background - Snapshots

An Elasticsearch Snapshot is a directory tree containing both data and metadata. Each Elasticsearch Index has its own sub-directory, and each Elasticsearch Shard has its own sub-directory under the directory of its parent Elasticsearch Index. The raw data for a given Elasticsearch Shard is stored in its corresponding Shard sub-directory as a collection of Lucene files, which Elasticsearch obfuscates. Metadata files exist in the snapshot to provide details about the snapshot as a whole, the source cluster’s global metadata and settings, each index in the snapshot, and each shard in the snapshot.

Proposal, at a very high level

It is proposed that we split the responsibility of performing an RFS operation into two groups of actors. The first group of actors are the RFS Containers, which this design doc focuses on. The second group of actors are the RFS Controllers, whose design was briefly explored in prior design work. Based on that prior work, we lay out starting assumptions informing this split up (see: Appendix: Assumptions).

It is proposed that the RFS Containers will coordinate amongst themselves in a decentralized manner using the target cluster as the source-of-truth for the state of the overall operation. Each RFS Container is oblivious to the existence of any other RFS Container working on the same operation, except as expressed in changes to the overall operation’s metadata stored on the target cluster. Each RFS Container is solely concerned with answering the question, “given the operation metadata in the source-of-truth, what should I do right now?” Any given RFS Container may perform every step in the overall process, some of them, or none of them. Any given RFS Container should be able to die at any point in its work, and have a new RFS Container resume that work gracefully. The steps in an RFS operation are:

  1. Take a snapshot of the source cluster
  2. Migrate the Elasticsearch Legacy Templates, Component Templates, and Index Templates
  3. Migrate the Elasticsearch Index settings and configuration
  4. Migrate the documents by retrieving each Elasticsearch Shard, unpacking it into a Lucene Index locally, and re-indexing its contents against the target cluster

The RFS Controller is expected to be a stateless task that runs periodically and looks at the metrics produced by RFS Containers to answer the question, “based on the metrics, how many RFS Containers should exist right now?” It is also expected to reap any unhealthy RFS Containers that it finds. Further work is required to flesh out the design of the RFS Controller, though we speculate in this doc’s design about some of the metrics/features it is likely to need.

Key RFS Container concepts

Global Data Store (GDS)

The Global Data Store (GDS) is the source of truth for the status of the overall Reindex-from-Snapshot operation. For the first iteration, it is proposed that the target Elasticsearch/OpenSearch cluster be used for this purpose, but it could just as easily be a PostgreSQL instance, DynamoDB, etc.

RFS Containers will query the GDS to infer what work they should next attempt, and update the Global Data Store with any progress they have made. RFS Containers will interact with the GDS without making assumptions about how many other RFS Containers are doing the same thing.

Features required of the GDS:

  • Atomic Creates: Will be used by the RFS Containers to ensure only one of them can successfully create a given record no matter how many make the attempt; the winner then assumes they can perform some associated work
  • Optimistic Locking: Will be used by the RFS Containers to ensure only one of them can successfully update a given record no matter how many make the attempt; the winner then assumes they can perform some associated work
  • Efficient Search: Will be used by RFS Containers to find available work items registered in the GDS
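To make the two required write primitives concrete, here is a minimal in-memory sketch of atomic creates and optimistic locking. The `InMemoryGds` class and its method names are hypothetical and exist only for illustration. On an Elasticsearch/OpenSearch target these would likely map onto index requests using `op_type=create` and conditional writes using `if_seq_no` and `if_primary_term`, but that mapping is an assumption here rather than something this design specifies.

```python
import threading
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class Record:
    value: dict
    version: int


class InMemoryGds:
    """Hypothetical stand-in for the GDS, used only to illustrate the semantics."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._records: Dict[str, Record] = {}

    def create_if_absent(self, key: str, value: dict) -> bool:
        """Atomic create: True only for the single caller that wins."""
        with self._lock:
            if key in self._records:
                return False
            self._records[key] = Record(dict(value), version=1)
            return True

    def get(self, key: str) -> Optional[Record]:
        with self._lock:
            return self._records.get(key)

    def update_if_version(self, key: str, expected_version: int, value: dict) -> bool:
        """Optimistic lock: the write succeeds only if nobody updated the record first."""
        with self._lock:
            current = self._records.get(key)
            if current is None or current.version != expected_version:
                return False
            self._records[key] = Record(dict(value), current.version + 1)
            return True


gds = InMemoryGds()
first = gds.create_if_absent("snapshot", {"status": "in_progress"})   # wins
second = gds.create_if_absent("snapshot", {"status": "in_progress"})  # loses
seen = gds.get("snapshot")
winner = gds.update_if_version("snapshot", seen.version, {"status": "completed"})
loser = gds.update_if_version("snapshot", seen.version, {"status": "failed"})  # stale version
```

In both cases the losing caller learns it lost from the return value and goes back to querying the GDS, without needing to know who won.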

Features desired of the GDS:

  • Server-Side Timestamping/Clock: The RFS Containers should ideally be able to use the GDS’s clock when creating timestamps on work item records stored within the GDS, and in search operations to find expired timestamps

Work leases

An RFS Container will typically “acquire” a work item by either winning an atomic creation or an optimistic update on the GDS. When it does so, it will set a maximum duration for it to complete work on the item as a part of the create/update operation. Ideally, it will use the GDS’s clock to do this. The RFS Container is assumed to have a lease on that work item until that duration expires. If the work is not completed in that time, it will be assumed that the problem is with the RFS Container and it should be reaped.

As a specific example, an RFS Container queries the GDS to find an Elasticsearch Shard to migrate to the target cluster. The GDS returns a record corresponding to a specific Elasticsearch Shard’s progress that either has not been started or has an expired work lease, and the RFS Container performs an optimistic update of its timestamp field, setting it (hypothetically) for 5 hours from the current time (according to the GDS’s clock).

RFS Containers will regularly poll the GDS to see if their current work item’s lease has expired (according to the GDS’s clock); if they find it has expired, they kill themselves and allow an outside system to spin up a replacement RFS Container. Similarly, the RFS Controller will check for expired work items and ensure that any RFS Containers associated with them have been reaped.

The process of finding the optimal initial work lease duration will be data driven based on actual usage statistics. It is expected that the initial “guess” will improve with experience.
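The lease rules described in this section can be sketched as a few small functions. This is a simplified illustration: the `WorkItem` type and function names are hypothetical, `now` stands in for the GDS's clock, and a real acquisition would go through an atomic create or optimistic update on the GDS rather than mutating a local object.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class WorkItem:
    item_id: str
    lease_expiry: Optional[float] = None  # seconds on the GDS's clock; None = never started
    completed: bool = False


def is_available(item: WorkItem, now: float) -> bool:
    """A work item can be picked up if it is unstarted or its lease has expired."""
    if item.completed:
        return False
    return item.lease_expiry is None or item.lease_expiry <= now


def try_acquire(item: WorkItem, now: float, lease_seconds: float) -> bool:
    """Take the lease by setting a new expiry; fails if someone else holds it."""
    if not is_available(item, now):
        return False
    item.lease_expiry = now + lease_seconds
    return True


def lease_expired(item: WorkItem, now: float) -> bool:
    """What a lease holder checks when polling to decide whether to self-terminate."""
    return item.lease_expiry is not None and now >= item.lease_expiry


FIVE_HOURS = 5 * 60 * 60
shard = WorkItem("my-index/0")
print(try_acquire(shard, now=1000.0, lease_seconds=FIVE_HOURS))   # True
print(try_acquire(shard, now=1060.0, lease_seconds=FIVE_HOURS))   # False, lease is held
print(lease_expired(shard, now=1000.0 + FIVE_HOURS))              # True
```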

Work lease backoff

When an RFS Container acquires a work item, it increments the number of attempts that have been made to finish it. The RFS Container increases its requested work lease duration based on the number of attempts. If the number of attempts passes a specified threshold, the RFS Container instead marks the work item as problematic so it won’t be picked up again.

The algorithm for backoff based on number of attempts and the maximum number of attempts to allow will both be data driven and expected to improve with experience.
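As one possible shape for the backoff, the sketch below doubles the lease on each attempt and gives up after a fixed number of attempts. The doubling rule, the 5 hour base, and the limit of 3 attempts are all placeholder assumptions; the doc leaves the actual algorithm and threshold to be determined from data.

```python
from typing import Optional

BASE_LEASE_SECONDS = 5 * 60 * 60  # hypothetical initial lease (5 hours)
MAX_ATTEMPTS = 3                  # hypothetical attempt threshold


def lease_duration_for_attempt(attempt: int) -> Optional[int]:
    """Lease length in seconds for a 1-based attempt number.

    Returns None once the attempt count passes the threshold, meaning the
    work item should be marked as problematic instead of being retried.
    """
    if attempt > MAX_ATTEMPTS:
        return None
    return BASE_LEASE_SECONDS * 2 ** (attempt - 1)


for attempt in range(1, 5):
    print(attempt, lease_duration_for_attempt(attempt))
# 1 18000
# 2 36000
# 3 72000
# 4 None
```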

Don’t touch existing templates or indices

While performing its work, if an RFS Container is tasked to create an Elasticsearch Template or Elasticsearch Index on the target cluster, but finds it already exists there, it will skip that creation. The reasoning for this policy is as follows.

First, creation of Elasticsearch Template and Elasticsearch Index is atomic. Second, it prevents re-work in the case that an RFS Container is picking up the partial work of another RFS Container that died. Third, it provides space for users to customize/configure the target cluster differently than the source cluster.

Overwrite documents by ID

While performing its work, if an RFS Container is tasked to create an Elasticsearch Document on the target cluster, it will do so by using the same ID as on the source cluster, clobbering any existing Elasticsearch Document on the target cluster with that ID. The reasoning for this policy is as follows.

The unit of work for an RFS Container migrating Elasticsearch Documents is an Elasticsearch Shard, not an Elasticsearch Document. If an RFS Container dies unexpectedly while moving the Elasticsearch Documents in an Elasticsearch Shard, it would be hard to reliably track exactly which had been successfully moved such that another RFS Container could resume at the correct position. We will instead simplify the initial design by just starting the Elasticsearch Shard over from the beginning and overwriting any partial work.
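A small simulation shows why restarting the Shard is safe under this policy. The target index is modeled as a plain dict keyed by Document ID, and the `fail_after` parameter is a made-up hook that simulates a container dying partway through.

```python
from typing import Dict, List, Optional, Tuple

Doc = Tuple[str, dict]  # (document ID, document source)


def migrate_shard(docs: List[Doc], target_index: Dict[str, dict],
                  fail_after: Optional[int] = None) -> int:
    """Write every doc by its original ID, overwriting whatever is already there."""
    written = 0
    for doc_id, source in docs:
        if fail_after is not None and written >= fail_after:
            raise RuntimeError("simulated container death")
        target_index[doc_id] = source
        written += 1
    return written


shard_docs = [("a", {"n": 1}), ("b", {"n": 2}), ("c", {"n": 3})]
target: Dict[str, dict] = {}

try:
    migrate_shard(shard_docs, target, fail_after=2)  # first container dies after 2 docs
except RuntimeError:
    pass

migrate_shard(shard_docs, target)  # replacement container starts over from the beginning
print(len(target))  # 3, with no duplicates despite the repeated writes
```

Because writes are keyed by ID, repeating the first two documents changes nothing, so no per-document progress tracking is needed.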

Phase metrics

Each RFS Container will emit metrics on a regular schedule indicating where it is in the process of performing the overall RFS operation, in addition to any other metrics/logs it might emit. These will be used by the RFS Controller (and system operators) to assess the progress of the overall RFS operation. At each scheduled emission, RFS Containers will emit a value for each of the following metrics:

  • snapshot_creation_status
  • metadata_migration_status
  • index_migration_status
  • doc_migration_status

The metric value at each emission will be one of the following:

  • 0 - if the phase has not started
  • 1 - if the phase has started, but not completed
  • 2 - if the phase has completed
  • 3 - if an unrecoverable error has been encountered
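The metric names and values above could be represented as follows. The `PhaseStatus` enum member names and the `build_emission` helper are illustrative choices, not part of the design; only the four metric names and the 0-3 values come from the lists above.

```python
from enum import IntEnum
from typing import Dict


class PhaseStatus(IntEnum):
    NOT_STARTED = 0
    IN_PROGRESS = 1
    COMPLETED = 2
    FAILED = 3  # unrecoverable error


METRIC_NAMES = (
    "snapshot_creation_status",
    "metadata_migration_status",
    "index_migration_status",
    "doc_migration_status",
)


def build_emission(statuses: Dict[str, PhaseStatus]) -> Dict[str, int]:
    """Produce one value per metric; phases not yet reached report 0."""
    return {name: int(statuses.get(name, PhaseStatus.NOT_STARTED)) for name in METRIC_NAMES}


emission = build_emission({
    "snapshot_creation_status": PhaseStatus.COMPLETED,
    "metadata_migration_status": PhaseStatus.IN_PROGRESS,
})
print(emission)
# {'snapshot_creation_status': 2, 'metadata_migration_status': 1,
#  'index_migration_status': 0, 'doc_migration_status': 0}
```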

Wait to be reaped

When an RFS Container finishes its work, either by detecting that all tasks in the overall RFS operation have been completed or finding that it can’t proceed any further due to an external failure state, it waits indefinitely to be reaped rather than shutting itself down. This is to prevent spinning in container management systems that automatically replace containers that terminate (such as Amazon ECS). As a specific example, if the RFS Container detects that the Snapshot that was kicked off on the source cluster has failed, there is nothing it can do to progress the overall RFS operation so it will report its finding and wait for an outside system to address the situation (including reaping the RFS Containers).

The exception is if the RFS Container suspects that there is a problem with itself. In that case, it will self-terminate. A specific example here would be that the RFS Container finds it has failed to complete its current work item within its lease window.

How the RFS Container will work

In this section, we describe in a high-level, narrative manner how the RFS Container will operate. Detailed state machine diagrams that outline the behavior more explicitly can be found at: (Appendix: RFS Container - Main Thread), (Appendix: RFS Container - Healthcheck Thread), and (Appendix: RFS Container - Metrics Thread). Those state diagrams are intended to be the source of truth, so favor their representation over the narrative description below if conflicts exist.

RFS Container threads

The RFS Container’s running process will have three running threads:

  • Main Thread - performs the work of moving the data/metadata from the source cluster to the target cluster; starts the Metrics and Healthcheck Threads
  • Healthcheck Thread - on a regular, scheduled basis will check the process’ shared state to determine which work item the RFS Container currently has a lease on (if any) and confirm the lease is still valid; if the lease has expired, it immediately kills the process and all threads
  • Metrics Thread - on a regular, scheduled basis will check the process’ shared state to determine which phase the RFS Container is operating in and emit its phase metrics

There are two pieces of state shared by the threads of the process, which the Main Thread is solely responsible for writing to. The Metrics Thread and Healthcheck Thread treat this shared state as read-only.

  • Container Phase - which phase the RFS Container is operating in
  • Work Item - which work item the RFS Container currently has a lease on, if any
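One way to structure the single-writer shared state and the periodic threads is sketched below. All names are hypothetical. The validity check here compares against a locally cached expiry for simplicity, whereas the design has the container consult the GDS's clock; and a real Healthcheck Thread would terminate the process when the check fails rather than just returning a boolean.

```python
import threading
from dataclasses import dataclass
from typing import Callable, Optional, Tuple


@dataclass(frozen=True)
class Lease:
    work_item_id: str
    expiry: float  # seconds, ideally on the GDS's clock


class SharedState:
    """Written only by the Main Thread; read by the Healthcheck and Metrics Threads."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._phase = "container_start"
        self._lease: Optional[Lease] = None

    def set(self, phase: str, lease: Optional[Lease]) -> None:
        with self._lock:
            self._phase, self._lease = phase, lease

    def snapshot(self) -> Tuple[str, Optional[Lease]]:
        with self._lock:
            return self._phase, self._lease


def lease_is_valid(state: SharedState, now: float) -> bool:
    """Healthcheck predicate: holding no lease is fine; an expired lease is not."""
    _, lease = state.snapshot()
    return lease is None or now < lease.expiry


def run_periodically(check: Callable[[], None], interval_seconds: float,
                     stop: threading.Event) -> threading.Thread:
    """Start a daemon thread that calls `check` every interval until `stop` is set."""
    def loop() -> None:
        while not stop.wait(interval_seconds):
            check()

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread
```

Keeping the Main Thread as the only writer means the other two threads never need to coordinate with each other, only to read a consistent snapshot.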

Phase 0 - Container Start

On startup, the Main Thread initializes the process shared state and launches the Healthcheck Thread and Metrics Thread.

Phase 1 - Snapshot Creation

The Main Thread queries the GDS to see if a Snapshot of the source cluster has already been created. If so, we proceed to the next phase.

If not, we update the process shared state to indicate we’re creating a snapshot. We attempt to create a record in the GDS to track the Snapshot’s creation status (if it doesn’t already exist), we attempt to kick off the creation of a Snapshot on the source cluster (if not already done), then periodically poll the source cluster for the Snapshot’s status until it either completes or fails. If we see the Snapshot complete, we attempt to update the GDS’s tracking record and proceed to the next phase. If the Snapshot fails, we update the GDS to indicate a phase failure, emit an event reporting the unrecoverable nature of the situation (from the RFS Container’s perspective), stop additional work, and wait to be reaped.

When creating the Snapshot on the source cluster, we use a consistent and deterministic name that all RFS Containers will be able to construct based upon the information available to them within their runtime environment. This means that every RFS Container will know which Snapshot to poll the status of on the source cluster regardless of whether it was the one to kick off its creation.
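A deterministic name can be derived by hashing inputs that every RFS Container shares. The sketch below is only one possibility; the `migration_id` and `source_endpoint` inputs and the `rfs-` prefix are assumptions, since the doc does not say which runtime information the name is built from.

```python
import hashlib


def snapshot_name(migration_id: str, source_endpoint: str) -> str:
    """Derive the same lowercase snapshot name from the same inputs on every container."""
    digest = hashlib.sha256(f"{migration_id}|{source_endpoint}".encode("utf-8")).hexdigest()
    return f"rfs-{digest[:16]}"


name_a = snapshot_name("migration-42", "https://source.example.com:9200")
name_b = snapshot_name("migration-42", "https://source.example.com:9200")
print(name_a == name_b)  # True
```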

Phase 2 - Cluster Metadata Migration

The Main Thread queries the GDS to see if the Cluster Metadata of the source cluster has already been migrated. If so, we proceed to the next phase.

If not, we update the process shared state to indicate we’re migrating that metadata. We enter a loop, where we first check the GDS for the current status of the Cluster Metadata migration and then attempt to do some work if we can, and continue until the migration is finished or has failed unrecoverably. Each RFS Container attempts to acquire a lease on the work to migrate the Cluster Metadata. The RFS Container that wins the lease downloads the Cluster Metadata file from the Snapshot, then attempts to migrate (in order) the legacy, component, and index Templates present in it to the target cluster - as long as the given Template matches the user whitelist and isn’t already present on the target cluster. Once every Template has been processed, the RFS Container updates the GDS’s status for the Cluster Metadata Migration to be complete. RFS Containers that fail to win the lease or find that another RFS Container has the lease wait a short, random time before checking again. RFS Containers exit the loop when they discover the Cluster Metadata Migration has been completed or failed.

The work lease for this phase is on the entire Cluster Metadata migration (all Templates, of all types). If something in that process fails enough times, we update the GDS to indicate a phase failure, emit an event reporting the unrecoverable nature of the situation (from the RFS Container’s perspective), stop additional work, and wait to be reaped.
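The work done by the lease holder in this phase can be sketched as a filtered, ordered copy. The data shapes below (dicts keyed by template kind, then template name) and the function name are assumptions made for illustration; the real Cluster Metadata file format is not described in this doc.

```python
from typing import Dict, List, Set, Tuple

TEMPLATE_ORDER = ("legacy", "component", "index")  # migration order from the text above

Templates = Dict[str, Dict[str, dict]]  # kind -> template name -> template body


def migrate_templates(snapshot_templates: Templates, target_templates: Templates,
                      allowlist: Set[str]) -> List[Tuple[str, str]]:
    """Copy allowed templates that the target lacks; never overwrite existing ones."""
    migrated = []
    for kind in TEMPLATE_ORDER:
        for name, body in snapshot_templates.get(kind, {}).items():
            if name not in allowlist:
                continue  # not selected by the user
            existing = target_templates.setdefault(kind, {})
            if name in existing:
                continue  # already present: leave the target's version untouched
            existing[name] = body
            migrated.append((kind, name))
    return migrated


snapshot = {
    "index": {"logs": {"shards": 3}, "secret": {}},
    "legacy": {"old": {}},
    "component": {"base": {}},
}
target = {"legacy": {"old": {"customized": True}}}
print(migrate_templates(snapshot, target, allowlist={"logs", "old", "base"}))
# [('component', 'base'), ('index', 'logs')]
```

Running the function a second time migrates nothing, which is what lets a replacement RFS Container pick up after a lease holder that died partway through.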

Phase 3 - Index Migration

The Main Thread queries the GDS to see if the Elasticsearch Indices on the source cluster have already been migrated. If so, we proceed to the next phase.

If not, we update the process shared state to indicate we’re migrating those Indices. We then enter a loop to progress through two sub-phases. At the beginning of each iteration, we check the GDS for the state of the Index Migration.

3.1 - Setup

The goal of this sub-phase is to create records (Index Work Entries) in the GDS to track the migration of each Elasticsearch Index. We exit the sub-phase when we find this has been completed, or that it has failed unrecoverably.

Each RFS Container attempts to acquire a lease on the work to create these individual, Index-specific records. The RFS Container that wins the lease downloads the Snapshot Metadata file from the Snapshot, then creates an Index Work Entry for each Index in the GDS (if it does not already exist). The Index Work Entry contains the Index’s name, migration status, and the number of shards that Index has. Once every Index Work Entry exists in the GDS, it updates the GDS to indicate this sub-phase is completed. RFS Containers that fail to win the lease or find that another RFS Container has the lease wait a short, random time before returning to the beginning of the loop.

The work lease for this phase is on the entire setup process (ensuring a record exists for every Index in the Snapshot). If something in that process fails enough times, we update the GDS to indicate a phase failure, emit an event reporting the unrecoverable nature of the situation (from the RFS Container’s perspective), stop additional work, and wait to be reaped.

3.2 - Migrate the indices

The goal of this sub-phase is to migrate all the Elasticsearch Indices from the source cluster to the target cluster. This means creating a corresponding Index on the target cluster for each we find on the source cluster. We exit the sub-phase when we find every Index has been processed (successfully, or unsuccessfully).

Each RFS Container asks the GDS for a random set of Index Work Entries that have not been processed yet. For each Index Work Entry returned, we retrieve from the Snapshot the corresponding Index Metadata file, use the information in that file to attempt to create the Index on the target cluster if it’s not already there, and attempt to update the Index Work Entry to be completed if it hasn’t already been updated. If we fail to create the Index on the target cluster, we attempt to increment the Index Work Entry’s number of attempts. If we find that the number of attempts is over a specified threshold, we attempt to mark the Index Work Entry’s status to be failed.

If no Entries are returned, we know that this sub-phase is complete and attempt to update the GDS’s status for the Index Migration phase to be complete.

It’s important to point out that we don’t attempt to ensure that each Index is processed only once; we instead rely on our concept of not touching existing templates or indices and optimistic locking on the Index Work Entry in the GDS to ensure that at least one RFS Container will process each Index to completion without anything disastrous happening. The work required for each Index is small, so there is little cost to having multiple RFS Containers attempt the same Index occasionally.
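The per-entry handling in this sub-phase might look like the sketch below. It leaves out the optimistic locking on the GDS record to stay short; the `IndexWorkEntry` fields beyond name, status, and shard count, the status strings, and the threshold of 2 are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Callable, Dict

MAX_ATTEMPTS = 2  # hypothetical threshold


@dataclass
class IndexWorkEntry:
    name: str
    shard_count: int
    status: str = "not_started"  # not_started | completed | failed
    attempts: int = 0


def process_entry(entry: IndexWorkEntry, target_indices: Dict[str, dict],
                  create_index: Callable[[str], dict]) -> None:
    """Create the Index on the target if absent, then record the outcome on the entry."""
    if entry.name not in target_indices:
        try:
            target_indices[entry.name] = create_index(entry.name)
        except Exception:
            entry.attempts += 1
            if entry.attempts > MAX_ATTEMPTS:
                entry.status = "failed"
            return
    # Reached when the Index already existed or was just created.
    entry.status = "completed"
```

Two containers racing on the same entry both end up at "completed" with a single Index on the target, which is why exactly-once processing is not required here.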

Phase 4 - Document Migration

The Main Thread queries the GDS to see if the Elasticsearch Documents on the source cluster have already been migrated. If so, we proceed to the next phase.

If not, we update the process shared state to indicate we’re migrating those Documents. We then enter a loop to progress through two sub-phases. At the beginning of each iteration, we check the GDS for the state of the Document Migration.

4.1 - Setup

Similar to (3.1 - Setup) in the Index Migration phase, the goal of this sub-phase is to create records (Shard Work Entries) in the GDS to track the migration of each Shard. We exit the sub-phase when we find this has been completed, or that it has failed unrecoverably.

The process is the same as (3.1 - Setup), with the exception that the RFS Container that wins the lease to do the setup work will use the Index Work Entries to determine the number and details of the Shard Work Entries to be created. For each Index Work Entry that was successfully migrated to the target cluster, we check the number of shards set in the Index Work Entry and create a Shard Work Entry for each. The Shard Work Entry will contain the Index name, shard number, and migration status.
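Expanding Index Work Entries into Shard Work Entries is a simple fan-out. In this sketch the entries are plain dicts with illustrative field names and status strings; shard numbers start at 0, matching Elasticsearch's shard numbering.

```python
from typing import Dict, List


def build_shard_work_entries(index_entries: List[Dict]) -> List[Dict]:
    """Create one Shard Work Entry per shard of every successfully migrated Index."""
    shard_entries = []
    for index in index_entries:
        if index["status"] != "completed":
            continue  # Indices that failed migration get no Shard Work Entries
        for shard_number in range(index["shard_count"]):
            shard_entries.append({
                "index_name": index["name"],
                "shard_number": shard_number,
                "status": "not_started",
            })
    return shard_entries


entries = build_shard_work_entries([
    {"name": "logs", "shard_count": 2, "status": "completed"},
    {"name": "broken", "shard_count": 5, "status": "failed"},
])
print([(e["index_name"], e["shard_number"]) for e in entries])
# [('logs', 0), ('logs', 1)]
```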

4.2 - Migrate the documents

The goal of this sub-phase is to migrate all the Elasticsearch Documents from the source cluster to the target cluster. This means recreating all the Documents in the source cluster onto the target cluster. We exit the sub-phase when we find every Shard has been processed (successfully, or unsuccessfully).

Each RFS Container asks the GDS for a single, random Shard Work Entry that either has not been started or has an expired work lease. We then attempt to acquire the lease on that Entry. If we fail, we ask the GDS for another Entry. If we succeed, we check the number of times this Entry has been attempted. If over the threshold, we mark it as failed (if that has not already been done) and skip it. If not over the threshold, we download the Shard’s files from the Snapshot, unpack them, read them as a Lucene Index, extract the original Elasticsearch Documents from them, and HTTP PUT those Documents against the target cluster. When performing the PUT, we use the Document’s original ID from the source cluster and overwrite any Document on the target cluster’s corresponding Elasticsearch Index that has the same ID. Once all Documents in the Shard have been processed, we mark the Shard Work Entry as completed and ask for another entry.

If no Entry is returned, we know that this sub-phase is complete and attempt to update the GDS’s status for the Documents Migration phase to be complete.

The work lease for this sub-phase is on the Shard (ensuring every Elasticsearch Document in that Shard has been processed). We will log/emit metrics to indicate how many Documents are successfully and unsuccessfully migrated but we don’t consider the Shard Work Entry to have failed if some (or even all) of the Documents in it are unsuccessfully migrated. We only retry the Shard Work Entry when an RFS Container fails to process every Document within the lease window. These retries are relatively time consuming, but safe because we overwrite any partial work performed by a previous RFS Container.
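Putting the pieces of this sub-phase together, a single worker's loop could be sketched as below. Everything here is simplified and hypothetical: the entries live in a local list instead of the GDS, lease acquisition always succeeds because there is only one worker, and `read_documents` / `put_document` are caller-supplied stand-ins for unpacking the Shard and issuing the HTTP PUT.

```python
import random
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, List, Optional, Tuple

MAX_ATTEMPTS = 3             # hypothetical threshold
LEASE_SECONDS = 5 * 60 * 60  # hypothetical lease duration


@dataclass
class ShardWorkEntry:
    index_name: str
    shard_number: int
    status: str = "not_started"  # not_started | in_progress | completed | failed
    attempts: int = 0
    lease_expiry: float = 0.0


def next_entry(entries: List[ShardWorkEntry], now: float,
               rng: random.Random) -> Optional[ShardWorkEntry]:
    """Pick a random entry that is unstarted or whose lease has expired."""
    open_entries = [e for e in entries
                    if e.status in ("not_started", "in_progress") and e.lease_expiry <= now]
    return rng.choice(open_entries) if open_entries else None


def run_worker(entries: List[ShardWorkEntry], now: float,
               read_documents: Callable[[ShardWorkEntry], Iterable[Tuple[str, dict]]],
               put_document: Callable[[str, str, dict], None],
               rng: random.Random) -> Dict[str, int]:
    counts = {"migrated": 0, "failed": 0}
    while True:
        entry = next_entry(entries, now, rng)
        if entry is None:
            return counts  # nothing left: the sub-phase is complete
        entry.attempts += 1
        if entry.attempts > MAX_ATTEMPTS:
            entry.status = "failed"
            continue
        entry.status = "in_progress"
        entry.lease_expiry = now + LEASE_SECONDS
        for doc_id, source in read_documents(entry):
            try:
                put_document(entry.index_name, doc_id, source)  # overwrite by original ID
                counts["migrated"] += 1
            except Exception:
                counts["failed"] += 1  # counted and reported, but the Shard is not failed
        entry.status = "completed"
```

Note that a Shard is marked completed even if individual Documents were rejected; only running out the lease (not modeled here) causes a retry.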

Appendix: Assumptions

Per convo w/ planned integration partners, there are some starting constraints placed on our design. Changes to these assumptions would likely have a substantial impact on the design.

  • (A1) - The RFS Containers cannot assume access to a data store other than the migration’s target cluster as a state-store for coordinating their work.
  • (A2) - The RFS Container will perform all the work required to complete a historical migration.
  • (A3) - The RFS Controller will only need to scale one type of Docker composition, which may be composed of multiple Containers, one of which is the RFS Container
  • (A4) - The RFS Controller will only have access to metrics from the RFS Containers in order to decide how many RFS Containers to have in existence.
  • (A5) - The RFS Controller will listen for events that indicate an exceptional condition has occurred and remedial action is required
  • (A6) - The RFS Container will emit logs for the customer, and logs for the operator

Appendix: RFS Container - Main Thread

Here is a state machine diagram for the RFS Container’s main thread:

RFS_Container_State_Machine_rev2_main

Appendix: RFS Container - Healthcheck Thread

Here is a state machine diagram for the RFS Container’s healthcheck thread:

RFS_Container_State_Machine_rev2_health

Appendix: RFS Container - Metrics Thread

Here is a state machine diagram for the RFS Container’s metrics thread:

RFS_Container_State_Machine_rev2_metrics
