From 039b217049ad462bb755eb174a62fd5dcd00dc3a Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Sat, 21 Sep 2024 07:45:32 +0200 Subject: [PATCH] Introduce partition store snapshots This commit adds the foundational support to export and import partition store snapshots. --- Cargo.lock | 7 +- Cargo.toml | 2 +- crates/partition-store/Cargo.toml | 15 +-- crates/partition-store/src/lib.rs | 1 + crates/partition-store/src/partition_store.rs | 35 ++++++ .../src/partition_store_manager.rs | 90 ++++++++++++++-- crates/partition-store/src/snapshots.rs | 101 ++++++++++++++++++ .../partition-store/tests/integration_test.rs | 24 +++-- .../tests/snapshots_test/mod.rs | 81 ++++++++++++++ crates/rocksdb/src/background.rs | 2 + crates/rocksdb/src/error.rs | 3 + crates/rocksdb/src/lib.rs | 58 +++++++++- crates/rocksdb/src/rock_access.rs | 31 +++++- crates/storage-api/src/lib.rs | 2 +- crates/types/src/id_util.rs | 1 + crates/types/src/identifiers.rs | 78 ++++++++++++++ 16 files changed, 502 insertions(+), 29 deletions(-) create mode 100644 crates/partition-store/src/snapshots.rs create mode 100644 crates/partition-store/tests/snapshots_test/mod.rs diff --git a/Cargo.lock b/Cargo.lock index c56e65a35a..ff78f0a5f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6016,6 +6016,7 @@ dependencies = [ "futures", "futures-util", "googletest", + "humantime", "num-bigint", "once_cell", "paste", @@ -6030,6 +6031,8 @@ dependencies = [ "rust-rocksdb", "schemars", "serde", + "serde_json", + "serde_with", "static_assertions", "strum 0.26.2", "sync_wrapper 1.0.1", @@ -6596,7 +6599,7 @@ dependencies = [ [[package]] name = "rust-librocksdb-sys" version = "0.25.0+9.5.2" -source = "git+https://github.com/restatedev/rust-rocksdb?rev=c6a279a40416cb47bbf576ffb190523d55818073#c6a279a40416cb47bbf576ffb190523d55818073" +source = "git+https://github.com/restatedev/rust-rocksdb?rev=8f832b7e742e0d826fb9fed05a62e4bd747969bf#8f832b7e742e0d826fb9fed05a62e4bd747969bf" dependencies = [ "bindgen", "bzip2-sys", @@ -6612,7 +6615,7 @@ dependencies = [ [[package]] name = "rust-rocksdb" version = "0.29.0" -source = "git+https://github.com/restatedev/rust-rocksdb?rev=c6a279a40416cb47bbf576ffb190523d55818073#c6a279a40416cb47bbf576ffb190523d55818073" +source = "git+https://github.com/restatedev/rust-rocksdb?rev=8f832b7e742e0d826fb9fed05a62e4bd747969bf#8f832b7e742e0d826fb9fed05a62e4bd747969bf" dependencies = [ "libc", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index d50c6987df..e6ea05deba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,7 +152,7 @@ reqwest = { version = "0.12.5", default-features = false, features = [ "stream", ] } rlimit = { version = "0.10.1" } -rocksdb = { version = "0.29.0", package = "rust-rocksdb", features = ["multi-threaded-cf"], git = "https://github.com/restatedev/rust-rocksdb", rev = "c6a279a40416cb47bbf576ffb190523d55818073" } +rocksdb = { version = "0.29.0", package = "rust-rocksdb", features = ["multi-threaded-cf"], git = "https://github.com/restatedev/rust-rocksdb", rev = "8f832b7e742e0d826fb9fed05a62e4bd747969bf" } rustls = { version = "0.23.11", default-features = false, features = ["ring"] } schemars = { version = "0.8", features = ["bytes", "enumset"] } serde = { version = "1.0", features = ["derive"] } diff --git a/crates/partition-store/Cargo.toml b/crates/partition-store/Cargo.toml index 411842b1dd..54136948e4 100644 --- a/crates/partition-store/Cargo.toml +++ b/crates/partition-store/Cargo.toml @@ -12,12 +12,6 @@ default = [] options_schema = ["dep:schemars"] [dependencies] -restate-core = { workspace = true } -restate-errors = { workspace = true } -restate-rocksdb = { workspace = true } -restate-storage-api = { workspace = true } -restate-types = { workspace = true } - anyhow = { workspace = true } bytes = { workspace = true } bytestring = { workspace = true } @@ -27,12 +21,20 @@ derive_more = { workspace = true } enum-map = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } +humantime = { workspace = true } once_cell = { workspace = true } paste = { workspace = true } prost = { workspace = true } +restate-core = { workspace = true } +restate-errors = { workspace = true } +restate-rocksdb = { workspace = true } +restate-storage-api = { workspace = true } +restate-types = { workspace = true } rocksdb = { workspace = true } schemars = { workspace = true, optional = true } serde = { workspace = true } +serde_json = { workspace = true } +serde_with = { workspace = true, features = ["hex"] } static_assertions = { workspace = true } strum = { workspace = true } sync_wrapper = { workspace = true } @@ -51,6 +53,7 @@ criterion = { workspace = true, features = ["async_tokio"] } googletest = { workspace = true } num-bigint = "0.4" rand = { workspace = true } +serde_json = { workspace = true } tempfile = { workspace = true } [[bench]] diff --git a/crates/partition-store/src/lib.rs b/crates/partition-store/src/lib.rs index 01430f0a8d..6d6fd640b4 100644 --- a/crates/partition-store/src/lib.rs +++ b/crates/partition-store/src/lib.rs @@ -22,6 +22,7 @@ mod partition_store_manager; pub mod promise_table; pub mod scan; pub mod service_status_table; +pub mod snapshots; pub mod state_table; pub mod timer_table; diff --git a/crates/partition-store/src/partition_store.rs b/crates/partition-store/src/partition_store.rs index bc94f9b8e5..171342d88e 100644 --- a/crates/partition-store/src/partition_store.rs +++ b/crates/partition-store/src/partition_store.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. use std::ops::RangeInclusive; +use std::path::PathBuf; use std::slice; use std::sync::Arc; @@ -18,6 +19,7 @@ use codederror::CodedError; use restate_rocksdb::CfName; use restate_rocksdb::IoMode; use restate_rocksdb::Priority; +use restate_storage_api::fsm_table::ReadOnlyFsmTable; use restate_types::config::Configuration; use rocksdb::DBCompressionType; use rocksdb::DBPinnableSlice; @@ -39,6 +41,7 @@ use crate::keys::KeyKind; use crate::keys::TableKey; use crate::scan::PhysicalScan; use crate::scan::TableScan; +use crate::snapshots::LocalPartitionSnapshot; pub type DB = rocksdb::DB; @@ -417,6 +420,38 @@ impl PartitionStore { .map_err(|err| StorageError::Generic(err.into()))?; Ok(()) } + + /// Creates a snapshot of the partition in the given directory, which must not exist prior to + /// the export. The snapshot is atomic and contains, at a minimum, the reported applied LSN. + /// Additional log records may have been applied between when the LSN was read, and when the + /// snapshot was actually created. The actual snapshot applied LSN will always be equal to, or + /// greater than, the reported applied LSN. + /// + /// *NB:* Creating a snapshot causes an implicit flush of the column family! + /// + /// See [rocksdb::checkpoint::Checkpoint::export_column_family] for additional implementation details. + pub async fn create_snapshot( + &mut self, + snapshot_dir: PathBuf, + ) -> Result { + let applied_lsn = self + .get_applied_lsn() + .await? + .ok_or(StorageError::DataIntegrityError)?; + + let metadata = self + .rocksdb + .export_cf(self.data_cf_name.clone(), snapshot_dir.clone()) + .await + .map_err(|err| StorageError::Generic(err.into()))?; + + Ok(LocalPartitionSnapshot { + base_dir: snapshot_dir, + files: metadata.get_files(), + db_comparator_name: metadata.get_db_comparator_name(), + min_applied_lsn: applied_lsn, + }) + } } fn find_cf_handle<'a>( diff --git a/crates/partition-store/src/partition_store_manager.rs b/crates/partition-store/src/partition_store_manager.rs index a529e916c2..9ef2df0e2b 100644 --- a/crates/partition-store/src/partition_store_manager.rs +++ b/crates/partition-store/src/partition_store_manager.rs @@ -12,20 +12,19 @@ use std::collections::BTreeMap; use std::ops::RangeInclusive; use std::sync::Arc; -use restate_types::live::BoxedLiveLoad; +use rocksdb::ExportImportFilesMetaData; use tokio::sync::Mutex; -use tracing::debug; +use tracing::{debug, error, info, warn}; use restate_rocksdb::{ CfName, CfPrefixPattern, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError, }; -use restate_types::config::RocksDbOptions; -use restate_types::config::StorageOptions; -use restate_types::identifiers::PartitionId; -use restate_types::identifiers::PartitionKey; -use restate_types::live::LiveLoad; +use restate_types::config::{RocksDbOptions, StorageOptions}; +use restate_types::identifiers::{PartitionId, PartitionKey}; +use restate_types::live::{BoxedLiveLoad, LiveLoad}; use crate::cf_options; +use crate::snapshots::LocalPartitionSnapshot; use crate::PartitionStore; use crate::DB; @@ -56,7 +55,7 @@ impl PartitionStoreManager { mut storage_opts: impl LiveLoad + Send + 'static, updateable_opts: BoxedLiveLoad, initial_partition_set: &[(PartitionId, RangeInclusive)], - ) -> std::result::Result { + ) -> Result { let options = storage_opts.live_load(); let per_partition_memory_budget = options.rocksdb_memory_budget() @@ -102,7 +101,7 @@ impl PartitionStoreManager { partition_key_range: RangeInclusive, open_mode: OpenMode, opts: &RocksDbOptions, - ) -> std::result::Result { + ) -> Result { let mut guard = self.lookup.lock().await; if let Some(store) = guard.live.get(&partition_id) { return Ok(store.clone()); @@ -130,6 +129,79 @@ impl PartitionStoreManager { Ok(partition_store) } + + /// Imports a partition snapshot and opens it as a partition store. + /// The database must not have an existing column family for the partition id; + /// it will be created based on the supplied snapshot. + pub async fn restore_partition_store_snapshot( + &self, + partition_id: PartitionId, + partition_key_range: RangeInclusive, + snapshot: LocalPartitionSnapshot, + opts: &RocksDbOptions, + ) -> Result { + let mut guard = self.lookup.lock().await; + if guard.live.contains_key(&partition_id) { + warn!( + ?partition_id, + ?snapshot, + "The partition store is already open, refusing to import snapshot" + ); + return Err(RocksError::AlreadyOpen); + } + + let cf_name = cf_for_partition(partition_id); + let cf_exists = self.rocksdb.inner().cf_handle(&cf_name).is_some(); + if cf_exists { + warn!( + ?partition_id, + ?cf_name, + ?snapshot, + "The column family for partition already exists in the database, cannot import snapshot" + ); + return Err(RocksError::ColumnFamilyExists); + } + + let mut import_metadata = ExportImportFilesMetaData::default(); + import_metadata.set_db_comparator_name(snapshot.db_comparator_name.as_str()); + import_metadata.set_files(&snapshot.files); + + info!( + ?partition_id, + min_applied_lsn = ?snapshot.min_applied_lsn, + "Initializing partition store from snapshot" + ); + + if let Err(e) = self + .rocksdb + .import_cf(cf_name.clone(), opts, import_metadata) + .await + { + error!(?partition_id, "Failed to import snapshot"); + return Err(e); + } + + assert!(self.rocksdb.inner().cf_handle(&cf_name).is_some()); + let partition_store = PartitionStore::new( + self.raw_db.clone(), + self.rocksdb.clone(), + cf_name, + partition_id, + partition_key_range, + ); + guard.live.insert(partition_id, partition_store.clone()); + + Ok(partition_store) + } + + pub async fn drop_partition(&self, partition_id: PartitionId) { + let mut guard = self.lookup.lock().await; + self.raw_db + .drop_cf(&cf_for_partition(partition_id)) + .unwrap(); + + guard.live.remove(&partition_id); + } } fn cf_for_partition(partition_id: PartitionId) -> CfName { diff --git a/crates/partition-store/src/snapshots.rs b/crates/partition-store/src/snapshots.rs new file mode 100644 index 0000000000..8b715446e6 --- /dev/null +++ b/crates/partition-store/src/snapshots.rs @@ -0,0 +1,101 @@ +use std::ops::RangeInclusive; +use std::path::PathBuf; + +use rocksdb::LiveFile; +use serde::{Deserialize, Serialize}; +use serde_with::hex::Hex; +use serde_with::{serde_as, DeserializeAs, SerializeAs}; + +use restate_types::identifiers::{PartitionId, PartitionKey, SnapshotId}; +use restate_types::logs::Lsn; + +#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)] +pub enum SnapshotFormatVersion { + #[default] + V1, +} + +/// A partition store snapshot. +#[serde_as] +#[derive(Debug, Serialize, Deserialize)] +pub struct PartitionSnapshotMetadata { + pub version: SnapshotFormatVersion, + + /// Restate cluster name which produced the snapshot. + pub cluster_name: String, + + /// Restate partition id. + pub partition_id: PartitionId, + + /// Node that produced this snapshot. + pub node_name: String, + + /// Local node time when the snapshot was created. + #[serde(with = "serde_with::As::")] + pub created_at: humantime::Timestamp, + + /// Snapshot id. + pub snapshot_id: SnapshotId, + + /// The partition key range that the partition processor which generated this snapshot was + /// responsible for, at the time the snapshot was generated. + pub key_range: RangeInclusive, + + /// The minimum LSN guaranteed to be applied in this snapshot. The actual + /// LSN may be >= [minimum_lsn]. + pub min_applied_lsn: Lsn, + + /// The RocksDB comparator name used by the partition processor which generated this snapshot. + pub db_comparator_name: String, + + /// The RocksDB SST files comprising the snapshot. + #[serde_as(as = "Vec")] + pub files: Vec, +} + +/// A locally-stored partition snapshot. +#[derive(Debug)] +pub struct LocalPartitionSnapshot { + pub base_dir: PathBuf, + pub min_applied_lsn: Lsn, + pub db_comparator_name: String, + pub files: Vec, +} + +/// RocksDB SST file that is part of a snapshot. Serialization wrapper around [LiveFile]. +#[serde_as] +#[derive(Serialize, Deserialize)] +#[serde(remote = "LiveFile")] +pub struct SnapshotSstFile { + pub column_family_name: String, + pub name: String, + pub directory: String, + pub size: usize, + pub level: i32, + #[serde_as(as = "Option")] + pub start_key: Option>, + #[serde_as(as = "Option")] + pub end_key: Option>, + pub smallest_seqno: u64, + pub largest_seqno: u64, + pub num_entries: u64, + pub num_deletions: u64, +} + +impl SerializeAs for SnapshotSstFile { + fn serialize_as(value: &LiveFile, serializer: S) -> Result + where + S: serde::Serializer, + { + SnapshotSstFile::serialize(value, serializer) + } +} + +impl<'de> DeserializeAs<'de, LiveFile> for SnapshotSstFile { + fn deserialize_as(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + SnapshotSstFile::deserialize(deserializer) + } +} diff --git a/crates/partition-store/tests/integration_test.rs b/crates/partition-store/tests/integration_test.rs index b1a3602d5c..f94f3fac56 100644 --- a/crates/partition-store/tests/integration_test.rs +++ b/crates/partition-store/tests/integration_test.rs @@ -32,11 +32,16 @@ mod invocation_status_table_test; mod journal_table_test; mod outbox_table_test; mod promise_table_test; +mod snapshots_test; mod state_table_test; mod timer_table_test; mod virtual_object_status_table_test; async fn storage_test_environment() -> PartitionStore { + storage_test_environment_with_manager().await.1 +} + +async fn storage_test_environment_with_manager() -> (PartitionStoreManager, PartitionStore) { // // create a rocksdb storage from options // @@ -57,7 +62,7 @@ async fn storage_test_environment() -> PartitionStore { .await .expect("DB storage creation succeeds"); // A single partition store that spans all keys. - manager + let store = manager .open_partition_store( PartitionId::MIN, RangeInclusive::new(0, PartitionKey::MAX - 1), @@ -65,21 +70,24 @@ async fn storage_test_environment() -> PartitionStore { &worker_options.pinned().storage.rocksdb, ) .await - .expect("DB storage creation succeeds") + .expect("DB storage creation succeeds"); + + (manager, store) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_read_write() { - let rocksdb = storage_test_environment().await; + let (manager, store) = storage_test_environment_with_manager().await; // // run the tests // - inbox_table_test::run_tests(rocksdb.clone()).await; - outbox_table_test::run_tests(rocksdb.clone()).await; - state_table_test::run_tests(rocksdb.clone()).await; - virtual_object_status_table_test::run_tests(rocksdb.clone()).await; - timer_table_test::run_tests(rocksdb).await; + inbox_table_test::run_tests(store.clone()).await; + outbox_table_test::run_tests(store.clone()).await; + state_table_test::run_tests(store.clone()).await; + virtual_object_status_table_test::run_tests(store.clone()).await; + timer_table_test::run_tests(store.clone()).await; + snapshots_test::run_tests(manager.clone(), store.clone()).await; } pub(crate) fn mock_service_invocation(service_id: ServiceId) -> ServiceInvocation { diff --git a/crates/partition-store/tests/snapshots_test/mod.rs b/crates/partition-store/tests/snapshots_test/mod.rs new file mode 100644 index 0000000000..3cf76db55f --- /dev/null +++ b/crates/partition-store/tests/snapshots_test/mod.rs @@ -0,0 +1,81 @@ +use std::ops::RangeInclusive; +use std::time::SystemTime; +use tempfile::tempdir; + +use restate_partition_store::snapshots::{ + LocalPartitionSnapshot, PartitionSnapshotMetadata, SnapshotFormatVersion, +}; +use restate_partition_store::{PartitionStore, PartitionStoreManager}; +use restate_storage_api::fsm_table::{FsmTable, ReadOnlyFsmTable}; +use restate_storage_api::Transaction; +use restate_types::config::WorkerOptions; +use restate_types::identifiers::{PartitionKey, SnapshotId}; +use restate_types::live::Live; +use restate_types::logs::Lsn; +use restate_types::time::MillisSinceEpoch; + +pub(crate) async fn run_tests(manager: PartitionStoreManager, mut partition_store: PartitionStore) { + insert_test_data(&mut partition_store).await; + + let snapshots_dir = tempdir().unwrap(); + + let partition_id = partition_store.partition_id(); + let path_buf = snapshots_dir.path().to_path_buf().join("sn1"); + + let snapshot = partition_store.create_snapshot(path_buf).await.unwrap(); + + let snapshot_meta = PartitionSnapshotMetadata { + version: SnapshotFormatVersion::V1, + cluster_name: "cluster_name".to_string(), + partition_id, + node_name: "node".to_string(), + created_at: humantime::Timestamp::from(SystemTime::from(MillisSinceEpoch::new(0))), + snapshot_id: SnapshotId::from_parts(0, 0), + key_range: partition_store.partition_key_range().clone(), + min_applied_lsn: snapshot.min_applied_lsn, + db_comparator_name: snapshot.db_comparator_name.clone(), + files: snapshot.files.clone(), + }; + let metadata_json = serde_json::to_string_pretty(&snapshot_meta).unwrap(); + + drop(partition_store); + drop(snapshot); + + manager.drop_partition(partition_id).await; + + let snapshot_meta: PartitionSnapshotMetadata = serde_json::from_str(&metadata_json).unwrap(); + + let snapshot = LocalPartitionSnapshot { + base_dir: snapshots_dir.path().into(), + min_applied_lsn: snapshot_meta.min_applied_lsn, + db_comparator_name: snapshot_meta.db_comparator_name.clone(), + files: snapshot_meta.files.clone(), + }; + + let worker_options = Live::from_value(WorkerOptions::default()); + + let mut new_partition_store = manager + .restore_partition_store_snapshot( + partition_id, + RangeInclusive::new(0, PartitionKey::MAX - 1), + snapshot, + &worker_options.pinned().storage.rocksdb, + ) + .await + .unwrap(); + + verify_restored_data(&mut new_partition_store).await; +} + +async fn insert_test_data(partition: &mut PartitionStore) { + let mut txn = partition.transaction(); + txn.put_applied_lsn(Lsn::new(100)).await; + txn.commit().await.expect("commit succeeds"); +} + +async fn verify_restored_data(partition: &mut PartitionStore) { + assert_eq!( + Lsn::new(100), + partition.get_applied_lsn().await.unwrap().unwrap() + ); +} diff --git a/crates/rocksdb/src/background.rs b/crates/rocksdb/src/background.rs index 877a09852e..ec590b27de 100644 --- a/crates/rocksdb/src/background.rs +++ b/crates/rocksdb/src/background.rs @@ -24,6 +24,8 @@ use crate::{Priority, OP_TYPE, PRIORITY, STORAGE_BG_TASK_IN_FLIGHT}; pub enum StorageTaskKind { WriteBatch, OpenColumnFamily, + ImportColumnFamily, + ExportColumnFamily, FlushWal, FlushMemtables, Shutdown, diff --git a/crates/rocksdb/src/error.rs b/crates/rocksdb/src/error.rs index ff3d4bb1aa..4c24d8d5ea 100644 --- a/crates/rocksdb/src/error.rs +++ b/crates/rocksdb/src/error.rs @@ -27,6 +27,9 @@ pub enum RocksError { #[error("already open")] #[code(unknown)] AlreadyOpen, + #[error("already exists")] + #[code(unknown)] + ColumnFamilyExists, #[error(transparent)] #[code(unknown)] Other(#[from] rocksdb::Error), diff --git a/crates/rocksdb/src/lib.rs b/crates/rocksdb/src/lib.rs index 9653bf4105..790f7f31dc 100644 --- a/crates/rocksdb/src/lib.rs +++ b/crates/rocksdb/src/lib.rs @@ -30,11 +30,12 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; +use rocksdb::checkpoint::Checkpoint; use rocksdb::statistics::Histogram; use rocksdb::statistics::HistogramData; use rocksdb::statistics::Ticker; +use rocksdb::ExportImportFilesMetaData; -use self::background::ReadyStorageTask; // re-exports pub use self::db_manager::RocksDbManager; pub use self::db_spec::*; @@ -42,6 +43,7 @@ pub use self::error::*; pub use self::perf::RocksDbPerfGuard; pub use self::rock_access::RocksAccess; +use self::background::ReadyStorageTask; use self::background::StorageTask; use self::background::StorageTaskKind; use self::metric_definitions::*; @@ -353,6 +355,60 @@ impl RocksDb { self.manager.async_spawn(task).await? } + #[tracing::instrument(skip_all, fields(db = %self.name))] + pub async fn import_cf( + &self, + name: CfName, + opts: &RocksDbOptions, + metadata: ExportImportFilesMetaData, + ) -> Result<(), RocksError> { + let default_cf_options = self.manager.default_cf_options(opts); + let cf_patterns = self.cf_patterns.clone(); + let db = self.db.clone(); + let task = StorageTask::default() + .kind(StorageTaskKind::ImportColumnFamily) + .priority(Priority::Low) + .op(move || { + let _x = RocksDbPerfGuard::new("import-column-family"); + db.import_cf(name, default_cf_options, cf_patterns, metadata) + }) + .build() + .unwrap(); + + self.manager.async_spawn(task).await? + } + + #[tracing::instrument(skip_all, fields(db = %self.name))] + pub async fn export_cf( + &self, + name: CfName, + export_dir: PathBuf, + ) -> Result { + let db = self.db.clone(); + let task = StorageTask::default() + .kind(StorageTaskKind::ExportColumnFamily) + .priority(Priority::Low) + .op(move || { + let _x = RocksDbPerfGuard::new("export-column-family"); + + let checkpoint = Checkpoint::new(db.as_raw_db()).unwrap(); + + let data_cf_handle = db + .cf_handle(name.as_str()) + .ok_or_else(|| RocksError::UnknownColumnFamily(name.clone()))?; + + let metadata = checkpoint + .export_column_family(&data_cf_handle, export_dir.as_path()) + .map_err(RocksError::Other)?; + + Ok(metadata) + }) + .build() + .unwrap(); + + self.manager.async_spawn(task).await? + } + #[tracing::instrument(skip_all, fields(db = %self.name))] pub async fn shutdown(self: Arc) { let manager = self.manager; diff --git a/crates/rocksdb/src/rock_access.rs b/crates/rocksdb/src/rock_access.rs index fcdc3aba6d..446ebbe390 100644 --- a/crates/rocksdb/src/rock_access.rs +++ b/crates/rocksdb/src/rock_access.rs @@ -12,7 +12,8 @@ use std::collections::HashSet; use std::sync::Arc; use rocksdb::perf::MemoryUsageBuilder; -use rocksdb::ColumnFamilyDescriptor; +use rocksdb::ExportImportFilesMetaData; +use rocksdb::{ColumnFamilyDescriptor, ImportColumnFamilyOptions}; use tracing::trace; use crate::BoxedCfMatcher; @@ -45,6 +46,13 @@ pub trait RocksAccess { default_cf_options: rocksdb::Options, cf_patterns: Arc<[(BoxedCfMatcher, BoxedCfOptionUpdater)]>, ) -> Result<(), RocksError>; + fn import_cf( + &self, + name: CfName, + default_cf_options: rocksdb::Options, + cf_patterns: Arc<[(BoxedCfMatcher, BoxedCfOptionUpdater)]>, + metadata: ExportImportFilesMetaData, + ) -> Result<(), RocksError>; fn cfs(&self) -> Vec; fn write_batch( @@ -145,6 +153,27 @@ impl RocksAccess for rocksdb::DB { Ok(Self::create_cf(self, name.as_str(), &options)?) } + fn import_cf( + &self, + name: CfName, + default_cf_options: rocksdb::Options, + cf_patterns: Arc<[(BoxedCfMatcher, BoxedCfOptionUpdater)]>, + metadata: ExportImportFilesMetaData, + ) -> Result<(), RocksError> { + let options = prepare_cf_options(&cf_patterns, default_cf_options, &name)?; + + let mut import_opts = ImportColumnFamilyOptions::default(); + import_opts.set_move_files(true); + + Ok(Self::create_column_family_with_import( + self, + &options, + name.as_str(), + &import_opts, + &metadata, + )?) + } + fn flush_memtables(&self, cfs: &[CfName], wait: bool) -> Result<(), RocksError> { let mut flushopts = rocksdb::FlushOptions::default(); flushopts.set_wait(wait); diff --git a/crates/storage-api/src/lib.rs b/crates/storage-api/src/lib.rs index daf109fd59..713c0803b9 100644 --- a/crates/storage-api/src/lib.rs +++ b/crates/storage-api/src/lib.rs @@ -17,7 +17,7 @@ pub enum StorageError { Generic(#[from] anyhow::Error), #[error("failed to convert Rust objects to/from protobuf: {0}")] Conversion(anyhow::Error), - #[error("Integrity constrained is violated")] + #[error("Integrity constraint is violated")] DataIntegrityError, #[error("Operational error that can be caused during a graceful shutdown")] OperationalError, diff --git a/crates/types/src/id_util.rs b/crates/types/src/id_util.rs index 69431a7f43..44fefee830 100644 --- a/crates/types/src/id_util.rs +++ b/crates/types/src/id_util.rs @@ -45,6 +45,7 @@ prefixed_ids! { Deployment("dp"), Subscription("sub"), Awakeable("prom"), + Snapshot("snap"), } } diff --git a/crates/types/src/identifiers.rs b/crates/types/src/identifiers.rs index eb3f04a5d0..6ff5c20314 100644 --- a/crates/types/src/identifiers.rs +++ b/crates/types/src/identifiers.rs @@ -860,6 +860,84 @@ impl FromStr for IngressRequestId { } } +/// Unique Id of a partition snapshot. +#[derive( + Debug, + PartialEq, + Eq, + Clone, + Copy, + Hash, + PartialOrd, + Ord, + serde_with::SerializeDisplay, + serde_with::DeserializeFromStr, +)] +pub struct SnapshotId(pub(crate) Ulid); + +impl SnapshotId { + pub fn new() -> Self { + Self(Ulid::new()) + } + + pub const fn from_parts(timestamp_ms: u64, random: u128) -> Self { + Self(Ulid::from_parts(timestamp_ms, random)) + } +} + +impl Default for SnapshotId { + fn default() -> Self { + Self::new() + } +} + +impl ResourceId for SnapshotId { + const SIZE_IN_BYTES: usize = size_of::(); + const RESOURCE_TYPE: IdResourceType = IdResourceType::Snapshot; + const STRING_CAPACITY_HINT: usize = base62_max_length_for_type::(); + + fn push_contents_to_encoder(&self, encoder: &mut IdEncoder) { + let raw: u128 = self.0.into(); + encoder.encode_fixed_width(raw); + } +} + +impl TimestampAwareId for SnapshotId { + fn timestamp(&self) -> MillisSinceEpoch { + self.0.timestamp_ms().into() + } +} + +impl FromStr for SnapshotId { + type Err = IdDecodeError; + + fn from_str(input: &str) -> Result { + let mut decoder = IdDecoder::new(input)?; + // Ensure we are decoding the correct resource type + if decoder.resource_type != Self::RESOURCE_TYPE { + return Err(IdDecodeError::TypeMismatch); + } + + // ulid (u128) + let raw_ulid: u128 = decoder.cursor.decode_next()?; + Ok(Self::from(raw_ulid)) + } +} + +impl fmt::Display for SnapshotId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut encoder = IdEncoder::::new(); + self.push_contents_to_encoder(&mut encoder); + fmt::Display::fmt(&encoder.finalize(), f) + } +} + +impl From for SnapshotId { + fn from(value: u128) -> Self { + Self(Ulid::from(value)) + } +} + #[cfg(any(test, feature = "test-util"))] mod mocks { use super::*;