Skip to content

Commit

Permalink
Implement partition store restore-from-snapshot (#2353)
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov authored Dec 6, 2024
1 parent 561679c commit 8c5ae2e
Show file tree
Hide file tree
Showing 8 changed files with 371 additions and 54 deletions.
46 changes: 27 additions & 19 deletions crates/partition-store/src/partition_store_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::sync::Arc;

use rocksdb::ExportImportFilesMetaData;
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};
use tracing::{debug, info, warn};

use crate::cf_options;
use crate::snapshots::LocalPartitionSnapshot;
Expand Down Expand Up @@ -86,9 +86,11 @@ impl PartitionStoreManager {
})
}

pub async fn has_partition(&self, partition_id: PartitionId) -> bool {
let guard = self.lookup.lock().await;
guard.live.contains_key(&partition_id)
/// Checks whether a partition store exists for the given partition id, i.e. whether its column
/// family is present in the database, regardless of whether the store is currently open.
pub async fn has_partition_store(&self, partition_id: PartitionId) -> bool {
let cf_name = cf_for_partition(partition_id);
self.rocksdb.inner().cf_handle(&cf_name).is_some()
}

pub async fn get_partition_store(&self, partition_id: PartitionId) -> Option<PartitionStore> {
Expand Down Expand Up @@ -147,8 +149,7 @@ impl PartitionStoreManager {
let mut guard = self.lookup.lock().await;
if guard.live.contains_key(&partition_id) {
warn!(
?partition_id,
?snapshot,
%partition_id,
"The partition store is already open, refusing to import snapshot"
);
return Err(RocksError::AlreadyOpen);
Expand All @@ -158,32 +159,39 @@ impl PartitionStoreManager {
let cf_exists = self.rocksdb.inner().cf_handle(&cf_name).is_some();
if cf_exists {
warn!(
?partition_id,
?cf_name,
?snapshot,
%partition_id,
%cf_name,
"The column family for partition already exists in the database, cannot import snapshot"
);
return Err(RocksError::ColumnFamilyExists);
}

if snapshot.key_range.start() > partition_key_range.start()
|| snapshot.key_range.end() < partition_key_range.end()
{
warn!(
%partition_id,
snapshot_range = ?snapshot.key_range,
partition_range = ?partition_key_range,
"The snapshot key range does not fully cover the partition key range"
);
return Err(RocksError::SnapshotKeyRangeMismatch);
}

let mut import_metadata = ExportImportFilesMetaData::default();
import_metadata.set_db_comparator_name(snapshot.db_comparator_name.as_str());
import_metadata.set_files(&snapshot.files);

info!(
?partition_id,
min_applied_lsn = ?snapshot.min_applied_lsn,
"Initializing partition store from snapshot"
%partition_id,
min_lsn = %snapshot.min_applied_lsn,
path = ?snapshot.base_dir,
"Importing partition store snapshot"
);

if let Err(e) = self
.rocksdb
self.rocksdb
.import_cf(cf_name.clone(), opts, import_metadata)
.await
{
error!(?partition_id, "Failed to import snapshot");
return Err(e);
}
.await?;

assert!(self.rocksdb.inner().cf_handle(&cf_name).is_some());
let partition_store = PartitionStore::new(
Expand Down
3 changes: 3 additions & 0 deletions crates/rocksdb/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub enum RocksError {
#[error("already exists")]
#[code(unknown)]
ColumnFamilyExists,
#[error("invalid key range for partition")]
#[code(unknown)]
SnapshotKeyRangeMismatch,
#[error(transparent)]
#[code(unknown)]
Other(#[from] rocksdb::Error),
Expand Down
4 changes: 3 additions & 1 deletion crates/rocksdb/src/rock_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub trait RocksAccess {
default_cf_options: rocksdb::Options,
cf_patterns: Arc<[(BoxedCfMatcher, BoxedCfOptionUpdater)]>,
) -> Result<(), RocksError>;
/// Creates a column family from a snapshot. The data files referenced by
/// `metadata` are moved (not copied) into the RocksDB data directory, so
/// they will no longer be present at their original location afterwards.
fn import_cf(
&self,
name: CfName,
Expand Down Expand Up @@ -163,7 +165,7 @@ impl RocksAccess for rocksdb::DB {
let options = prepare_cf_options(&cf_patterns, default_cf_options, &name)?;

let mut import_opts = ImportColumnFamilyOptions::default();
import_opts.set_move_files(false); // keep the snapshot files intact
import_opts.set_move_files(true);

Ok(Self::create_column_family_with_import(
self,
Expand Down
3 changes: 2 additions & 1 deletion crates/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
strum = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tokio-util = { workspace = true, features = ["io-util"] }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
ulid = { workspace = true }
Expand Down
10 changes: 7 additions & 3 deletions crates/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,13 @@ impl Worker {
partition_store_manager.clone(),
router_builder,
bifrost,
SnapshotRepository::create_if_configured(snapshots_options)
.await
.map_err(BuildError::SnapshotRepository)?,
SnapshotRepository::create_if_configured(
snapshots_options,
config.common.base_dir().join("pp-snapshots"),
config.common.cluster_name().to_owned(),
)
.await
.map_err(BuildError::SnapshotRepository)?,
);

// handle RPCs
Expand Down
Loading

0 comments on commit 8c5ae2e

Please sign in to comment.