diff --git a/crates/partition-store/src/partition_store_manager.rs b/crates/partition-store/src/partition_store_manager.rs index 053423ad4..bdc604692 100644 --- a/crates/partition-store/src/partition_store_manager.rs +++ b/crates/partition-store/src/partition_store_manager.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use rocksdb::ExportImportFilesMetaData; use tokio::sync::Mutex; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; use crate::cf_options; use crate::snapshots::LocalPartitionSnapshot; @@ -86,9 +86,11 @@ impl PartitionStoreManager { }) } - pub async fn has_partition(&self, partition_id: PartitionId) -> bool { - let guard = self.lookup.lock().await; - guard.live.contains_key(&partition_id) + /// Check whether we have a partition store for the given partition id, irrespective of whether + /// the store is open or not. + pub async fn has_partition_store(&self, partition_id: PartitionId) -> bool { + let cf_name = cf_for_partition(partition_id); + self.rocksdb.inner().cf_handle(&cf_name).is_some() } pub async fn get_partition_store(&self, partition_id: PartitionId) -> Option { @@ -147,8 +149,7 @@ impl PartitionStoreManager { let mut guard = self.lookup.lock().await; if guard.live.contains_key(&partition_id) { warn!( - ?partition_id, - ?snapshot, + %partition_id, "The partition store is already open, refusing to import snapshot" ); return Err(RocksError::AlreadyOpen); @@ -158,32 +159,39 @@ impl PartitionStoreManager { let cf_exists = self.rocksdb.inner().cf_handle(&cf_name).is_some(); if cf_exists { warn!( - ?partition_id, - ?cf_name, - ?snapshot, + %partition_id, + %cf_name, "The column family for partition already exists in the database, cannot import snapshot" ); return Err(RocksError::ColumnFamilyExists); } + if snapshot.key_range.start() > partition_key_range.start() + || snapshot.key_range.end() < partition_key_range.end() + { + warn!( + %partition_id, + snapshot_range = ?snapshot.key_range, + partition_range = ?partition_key_range, + "The snapshot key range does not fully cover the partition key range" + ); + return Err(RocksError::SnapshotKeyRangeMismatch); + } + let mut import_metadata = ExportImportFilesMetaData::default(); import_metadata.set_db_comparator_name(snapshot.db_comparator_name.as_str()); import_metadata.set_files(&snapshot.files); info!( - ?partition_id, - min_applied_lsn = ?snapshot.min_applied_lsn, - "Initializing partition store from snapshot" + %partition_id, + min_lsn = %snapshot.min_applied_lsn, + path = ?snapshot.base_dir, + "Importing partition store snapshot" ); - if let Err(e) = self - .rocksdb + self.rocksdb .import_cf(cf_name.clone(), opts, import_metadata) - .await - { - error!(?partition_id, "Failed to import snapshot"); - return Err(e); - } + .await?; assert!(self.rocksdb.inner().cf_handle(&cf_name).is_some()); let partition_store = PartitionStore::new( diff --git a/crates/rocksdb/src/error.rs b/crates/rocksdb/src/error.rs index ade83f436..4cba08e89 100644 --- a/crates/rocksdb/src/error.rs +++ b/crates/rocksdb/src/error.rs @@ -30,6 +30,9 @@ pub enum RocksError { #[error("already exists")] #[code(unknown)] ColumnFamilyExists, + #[error("invalid key range for partition")] + #[code(unknown)] + SnapshotKeyRangeMismatch, #[error(transparent)] #[code(unknown)] Other(#[from] rocksdb::Error), diff --git a/crates/rocksdb/src/rock_access.rs b/crates/rocksdb/src/rock_access.rs index b6548c887..fee3646be 100644 --- a/crates/rocksdb/src/rock_access.rs +++ b/crates/rocksdb/src/rock_access.rs @@ -46,6 +46,8 @@ pub trait RocksAccess { default_cf_options: rocksdb::Options, cf_patterns: Arc<[(BoxedCfMatcher, BoxedCfOptionUpdater)]>, ) -> Result<(), RocksError>; + /// Create a column family from a snapshot. The data files referenced by + /// `metadata` will be moved into the RocksDB data directory. fn import_cf( &self, name: CfName, @@ -163,7 +165,7 @@ impl RocksAccess for rocksdb::DB { let options = prepare_cf_options(&cf_patterns, default_cf_options, &name)?; let mut import_opts = ImportColumnFamilyOptions::default(); - import_opts.set_move_files(false); // keep the snapshot files intact + import_opts.set_move_files(true); Ok(Self::create_column_family_with_import( self, diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 8a7eadde0..a507f79a3 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -68,10 +68,11 @@ serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } strum = { workspace = true } +tempfile = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } -tokio-util = { workspace = true } +tokio-util = { workspace = true, features = ["io-util"] } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } ulid = { workspace = true } diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index e4592dbdb..a8500b72f 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -156,9 +156,13 @@ impl Worker { partition_store_manager.clone(), router_builder, bifrost, - SnapshotRepository::create_if_configured(snapshots_options) - .await - .map_err(BuildError::SnapshotRepository)?, + SnapshotRepository::create_if_configured( + snapshots_options, + config.common.base_dir().join("pp-snapshots"), + config.common.cluster_name().to_owned(), + ) + .await + .map_err(BuildError::SnapshotRepository)?, ); // handle RPCs diff --git a/crates/worker/src/partition/snapshots/repository.rs b/crates/worker/src/partition/snapshots/repository.rs index f30b1adb6..0e27b6e72 100644 --- a/crates/worker/src/partition/snapshots/repository.rs +++ b/crates/worker/src/partition/snapshots/repository.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; @@ -22,11 +23,18 @@ use object_store::aws::{AmazonS3Builder, S3ConditionalPut}; use object_store::{MultipartUpload, ObjectStore, PutMode, PutOptions, PutPayload, UpdateVersion}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; +use tempfile::TempDir; +use tokio::io; use tokio::io::AsyncReadExt; +use tokio::sync::Semaphore; +use tokio::task::JoinSet; +use tokio_util::io::StreamReader; use tracing::{debug, info, instrument, warn}; use url::Url; -use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion}; +use restate_partition_store::snapshots::{ + LocalPartitionSnapshot, PartitionSnapshotMetadata, SnapshotFormatVersion, +}; use restate_types::config::SnapshotsOptions; use restate_types::identifiers::{PartitionId, SnapshotId}; use restate_types::logs::Lsn; @@ -50,8 +58,19 @@ pub struct SnapshotRepository { object_store: Arc, destination: Url, prefix: String, + /// Ingested snapshots staging location. + staging_dir: PathBuf, + /// Expected cluster name for the snapshots in this repository. + cluster_name: String, } +/// S3 and other stores require a certain minimum size for the parts of a multipart upload. It is an +/// API error to attempt a multipart put below this size, apart from the final segment. +const MULTIPART_UPLOAD_CHUNK_SIZE_BYTES: usize = 5 * 1024 * 1024; + +/// Maximum number of concurrent downloads when getting snapshots from the repository. +const DOWNLOAD_CONCURRENCY_LIMIT: usize = 8; + #[serde_as] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct LatestSnapshot { @@ -99,6 +118,8 @@ impl SnapshotRepository { /// Creates an instance of the repository if a snapshots destination is configured. pub async fn create_if_configured( snapshots_options: &SnapshotsOptions, + staging_dir: PathBuf, + cluster_name: String, ) -> anyhow::Result> { let mut destination = if let Some(ref destination) = snapshots_options.destination { Url::parse(destination).context("Failed parsing snapshot repository URL")? @@ -125,6 +146,8 @@ impl SnapshotRepository { object_store, destination, prefix, + staging_dir, + cluster_name, })) } @@ -314,6 +337,173 @@ impl SnapshotRepository { ) } + /// Discover and download the latest snapshot available. It is the caller's responsibility + /// to delete the snapshot directory when it is no longer needed. + #[instrument( + level = "debug", + skip_all, + err, + fields(%partition_id), + )] + pub(crate) async fn get_latest( + &self, + partition_id: PartitionId, + ) -> anyhow::Result> { + let latest_path = object_store::path::Path::from(format!( + "{prefix}{partition_id}/latest.json", + prefix = self.prefix, + partition_id = partition_id, + )); + + let latest = self.object_store.get(&latest_path).await; + + let latest = match latest { + Ok(result) => result, + Err(object_store::Error::NotFound { .. }) => { + debug!("Latest snapshot data not found in repository"); + return Ok(None); + } + Err(e) => return Err(e.into()), + }; + + let latest: LatestSnapshot = serde_json::from_slice(&latest.bytes().await?)?; + debug!("Latest snapshot metadata: {:?}", latest); + + let snapshot_metadata_path = object_store::path::Path::from(format!( + "{prefix}{partition_id}/{path}/metadata.json", + prefix = self.prefix, + partition_id = partition_id, + path = latest.path, + )); + let snapshot_metadata = self.object_store.get(&snapshot_metadata_path).await; + + let snapshot_metadata = match snapshot_metadata { + Ok(result) => result, + Err(object_store::Error::NotFound { .. }) => { + // todo(pavel): revisit whether we shouldn't just panic at this point - this is a bad sign! + warn!("Latest snapshot points to a snapshot that was not found in the repository!"); + return Ok(None); // arguably this could also be an error + } + Err(e) => return Err(e.into()), + }; + + let mut snapshot_metadata: PartitionSnapshotMetadata = + serde_json::from_slice(&snapshot_metadata.bytes().await?)?; + if snapshot_metadata.version != SnapshotFormatVersion::V1 { + return Err(anyhow!( + "Unsupported snapshot format version: {:?}", + snapshot_metadata.version + )); + } + + if snapshot_metadata.cluster_name != self.cluster_name { + // todo(pavel): revisit whether we shouldn't just panic at this point - this is a bad sign! + warn!("Snapshot does not match the cluster name of latest snapshot at destination in snapshot id {}! Expected: cluster name=\"{}\", found: \"{}\"", + snapshot_metadata.snapshot_id, + self.cluster_name, + snapshot_metadata.cluster_name); + return Ok(None); // perhaps this needs to be a configuration error + } + + // The snapshot ingest directory should be on the same filesystem as the partition store + // to minimize IO and disk space usage during import. + let snapshot_dir = TempDir::with_prefix_in( + format!("{}-", snapshot_metadata.snapshot_id), + &self.staging_dir, + )?; + debug!( + snapshot_id = %snapshot_metadata.snapshot_id, + path = ?snapshot_dir.path(), + "Getting snapshot data", + ); + + let directory = snapshot_dir.path().to_string_lossy().to_string(); + let concurrency_limiter = Arc::new(Semaphore::new(DOWNLOAD_CONCURRENCY_LIMIT)); + let mut downloads = JoinSet::new(); + let mut task_handles = HashMap::with_capacity(snapshot_metadata.files.len()); + for file in &mut snapshot_metadata.files { + let filename = file.name.trim_start_matches("/"); + let expected_size = file.size; + let key = object_store::path::Path::from(format!( + "{prefix}{partition_id}/{path}/{filename}", + prefix = self.prefix, + partition_id = partition_id, + path = latest.path, + filename = filename, + )); + let file_path = snapshot_dir.path().join(filename); + let concurrency_limiter = Arc::clone(&concurrency_limiter); + let object_store = Arc::clone(&self.object_store); + + let handle = downloads.build_task().name(filename).spawn(async move { + let _permit = concurrency_limiter.acquire().await?; + let mut file_data = StreamReader::new( + object_store + .get(&key) + .await + .map_err(|e| anyhow!("Failed to download snapshot file {:?}: {}", key, e))? + .into_stream(), + ); + let mut snapshot_file = + tokio::fs::File::create_new(&file_path).await.map_err(|e| { + anyhow!("Failed to create snapshot file {:?}: {}", file_path, e) + })?; + let size = io::copy(&mut file_data, &mut snapshot_file) + .await + .map_err(|e| anyhow!("Failed to download snapshot file {:?}: {}", key, e))?; + if size != expected_size as u64 { + return Err(anyhow!( + "Downloaded snapshot file {:?} has unexpected size: expected {}, got {}", + key, + expected_size, + size + )); + } + debug!(%key, ?size, "Downloaded snapshot data file to {:?}", file_path); + anyhow::Ok(()) + })?; + task_handles.insert(handle.id(), filename.to_string()); + // patch the directory path to reflect the actual location on the restoring node + file.directory = directory.clone(); + } + + loop { + match downloads.join_next().await { + None => { + debug!("All download tasks completed"); + break; + } + Some(Err(join_error)) => { + let failed = task_handles.get(&join_error.id()); + abort_tasks(downloads).await; + return Err(anyhow!( + "Failed to download snapshot file {:?}: {}", + failed, + join_error + )); + } + Some(Ok(Err(error))) => { + abort_tasks(downloads).await; + return Err(error); + } + Some(Ok(Ok(_))) => {} + } + } + + info!( + snapshot_id = %snapshot_metadata.snapshot_id, + path = ?snapshot_dir.path(), + "Downloaded partition snapshot", + ); + Ok(Some(LocalPartitionSnapshot { + base_dir: snapshot_dir.into_path(), + min_applied_lsn: snapshot_metadata.min_applied_lsn, + db_comparator_name: snapshot_metadata.db_comparator_name, + files: snapshot_metadata.files, + key_range: snapshot_metadata.key_range.clone(), + })) + } + async fn get_latest_snapshot_metadata_for_update( &self, snapshot: &PartitionSnapshotMetadata, @@ -326,7 +516,7 @@ impl SnapshotRepository { version: result.meta.version.clone(), }; let latest: LatestSnapshot = serde_json::from_slice( - result.bytes().await?.iter().as_slice(), + &result.bytes().await?, ) .inspect_err(|e| { debug!( @@ -436,10 +626,6 @@ impl PutSnapshotError { } } -/// S3 and other stores require a certain minimum size for the parts of a multipart upload. It is an -/// API error to attempt a multipart put below this size, apart from the final segment. -const MULTIPART_UPLOAD_CHUNK_SIZE_BYTES: usize = 5 * 1024 * 1024; - // The object_store `put_multipart` method does not currently support PutMode, so we don't pass this // at all; however since we upload snapshots to a unique path on every attempt, we don't expect any // conflicts to arise. @@ -497,6 +683,11 @@ async fn put_snapshot_object( } } +async fn abort_tasks(mut join_set: JoinSet) { + join_set.abort_all(); + while join_set.join_next().await.is_some() {} +} + #[derive(Debug)] struct AwsSdkCredentialsProvider { credentials_provider: DefaultCredentialsChain, @@ -558,12 +749,14 @@ mod tests { let snapshot_source = TempDir::new()?; let source_dir = snapshot_source.path().to_path_buf(); - let mut data = tokio::fs::File::create(source_dir.join("data.sst")).await?; - data.write_all(b"snapshot-data").await?; + let data = b"snapshot-data"; + let mut data_file = tokio::fs::File::create(source_dir.join("data.sst")).await?; + data_file.write_all(data).await?; let snapshot = mock_snapshot_metadata( "/data.sst".to_owned(), source_dir.to_string_lossy().to_string(), + data.len(), ); let snapshots_destination: TempDir = TempDir::new()?; @@ -576,9 +769,13 @@ mod tests { ), ..SnapshotsOptions::default() }; - let repository = SnapshotRepository::create_if_configured(&opts) - .await? - .unwrap(); + let repository = SnapshotRepository::create_if_configured( + &opts, + TempDir::new().unwrap().into_path(), + "cluster".to_owned(), + ) + .await? + .unwrap(); // Write invalid JSON to latest.json let latest_path = destination_dir.join(format!("{}/latest.json", PartitionId::MIN)); @@ -634,12 +831,14 @@ mod tests { .await; assert!(matches!(latest, Err(object_store::Error::NotFound { .. }))); - let mut data = tokio::fs::File::create(source_dir.join("data.sst")).await?; - data.write_all(b"snapshot-data").await?; + let data = b"snapshot-data"; + let mut data_file = tokio::fs::File::create(source_dir.join("data.sst")).await?; + data_file.write_all(data).await?; let mut snapshot1 = mock_snapshot_metadata( "/data.sst".to_owned(), source_dir.to_string_lossy().to_string(), + data.len(), ); snapshot1.min_applied_lsn = Lsn::new( SystemTime::now() @@ -652,9 +851,13 @@ mod tests { ..SnapshotsOptions::default() }; - let repository = SnapshotRepository::create_if_configured(&opts) - .await? - .unwrap(); + let repository = SnapshotRepository::create_if_configured( + &opts, + TempDir::new().unwrap().into_path(), + "cluster".to_owned(), + ) + .await? + .unwrap(); repository.put(&snapshot1, source_dir.clone()).await?; @@ -691,12 +894,14 @@ mod tests { let snapshot_source = TempDir::new()?; let source_dir = snapshot_source.path().to_path_buf(); - let mut data = tokio::fs::File::create(source_dir.join("data.sst")).await?; - data.write_all(b"snapshot-data").await?; + let data = b"snapshot-data"; + let mut data_file = tokio::fs::File::create(source_dir.join("data.sst")).await?; + data_file.write_all(data).await?; let mut snapshot2 = mock_snapshot_metadata( "/data.sst".to_owned(), source_dir.to_string_lossy().to_string(), + data.len(), ); snapshot2.min_applied_lsn = snapshot1.min_applied_lsn.next(); @@ -717,13 +922,26 @@ mod tests { latest ); + let latest = repository.get_latest(PartitionId::MIN).await?.unwrap(); + assert_eq!(latest.min_applied_lsn, snapshot2.min_applied_lsn); + let local_path = latest.base_dir.as_path().to_string_lossy().to_string(); + drop(latest); + + let local_dir_exists = tokio::fs::try_exists(&local_path).await?; + assert!(local_dir_exists); + tokio::fs::remove_dir_all(&local_path).await?; + Ok(()) } - fn mock_snapshot_metadata(file_name: String, directory: String) -> PartitionSnapshotMetadata { + fn mock_snapshot_metadata( + file_name: String, + directory: String, + size: usize, + ) -> PartitionSnapshotMetadata { PartitionSnapshotMetadata { version: SnapshotFormatVersion::V1, - cluster_name: "test-cluster".to_string(), + cluster_name: "cluster".to_string(), node_name: "node".to_string(), partition_id: PartitionId::MIN, created_at: humantime::Timestamp::from(SystemTime::now()), @@ -736,7 +954,7 @@ mod tests { column_family_name: "data-0".to_owned(), name: file_name, directory, - size: 0, + size, level: 0, start_key: Some(vec![0]), end_key: Some(vec![0xff, 0xff]), diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index 7027b1f9e..1a0492d89 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -875,6 +875,7 @@ impl PartitionProcessorManager { self.updateable_config.clone(), self.bifrost.clone(), self.partition_store_manager.clone(), + self.snapshot_repository.clone(), ) } diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs index d12c54a45..f71d9907d 100644 --- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs +++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs @@ -11,7 +11,7 @@ use std::ops::RangeInclusive; use tokio::sync::{mpsc, watch}; -use tracing::instrument; +use tracing::{debug, info, instrument, warn}; use restate_bifrost::Bifrost; use restate_core::{Metadata, RuntimeTaskHandle, TaskCenter, TaskKind}; @@ -26,6 +26,7 @@ use restate_types::schema::Schema; use crate::invoker_integration::EntryEnricher; use crate::partition::invoker_storage_reader::InvokerStorageReader; +use crate::partition::snapshots::SnapshotRepository; use crate::partition_processor_manager::processor_state::StartedProcessor; use crate::PartitionProcessorBuilder; @@ -36,6 +37,7 @@ pub struct SpawnPartitionProcessorTask { configuration: Live, bifrost: Bifrost, partition_store_manager: PartitionStoreManager, + snapshot_repository: Option, } impl SpawnPartitionProcessorTask { @@ -47,6 +49,7 @@ impl SpawnPartitionProcessorTask { configuration: Live, bifrost: Bifrost, partition_store_manager: PartitionStoreManager, + snapshot_repository: Option, ) -> Self { Self { task_name, @@ -55,6 +58,7 @@ impl SpawnPartitionProcessorTask { configuration, bifrost, partition_store_manager, + snapshot_repository, } } @@ -72,6 +76,7 @@ impl SpawnPartitionProcessorTask { configuration, bifrost, partition_store_manager, + snapshot_repository, } = self; let config = configuration.pinned(); @@ -117,15 +122,90 @@ impl SpawnPartitionProcessorTask { { let options = options.clone(); let key_range = key_range.clone(); + move || async move { - let partition_store = partition_store_manager - .open_partition_store( - partition_id, - key_range, - OpenMode::CreateIfMissing, - &options.storage.rocksdb, - ) - .await?; + let partition_store = if !partition_store_manager + .has_partition_store(pp_builder.partition_id) + .await + { + let snapshot = if snapshot_repository.is_none() { + debug!( + partition_id = %partition_id, + "No snapshot repository configured", + ); + None + } else { + debug!( + partition_id = %partition_id, + "Looking for partition snapshot from which to bootstrap partition store", + ); + snapshot_repository.expect("is some").get_latest(partition_id).await? + }; + + + if let Some(snapshot) = snapshot { + info!( + partition_id = %partition_id, + "Found snapshot to bootstrap partition, restoring it", + ); + + let snapshot_path = snapshot.base_dir.clone(); + match partition_store_manager + .open_partition_store_from_snapshot( + partition_id, + key_range.clone(), + snapshot, + &options.storage.rocksdb, + ) + .await { + Ok(partition_store) => { + let res = tokio::fs::remove_dir_all(&snapshot_path).await; + if let Err(e) = res { + // This is not critical; since we move the SST files into RocksDB on import, at + // worst the snapshot metadata file will be retained. + warn!( + partition_id = %partition_id, + ?snapshot_path, + "Failed to remove local snapshot directory, continuing with startup: {:?}", + e + ); + } + partition_store + } + Err(e) => { + warn!( + partition_id = %partition_id, + ?snapshot_path, + "Failed to import snapshot, local copy retained" + ); + return Err(anyhow::anyhow!(e)); + } + } + } else { + info!( + partition_id = %partition_id, + "No snapshot found to bootstrap partition, creating new store", + ); + partition_store_manager + .open_partition_store( + partition_id, + key_range, + OpenMode::CreateIfMissing, + &options.storage.rocksdb, + ) + .await? + } + } else { + partition_store_manager + .open_partition_store( + partition_id, + key_range, + OpenMode::OpenExisting, + &options.storage.rocksdb, + ) + .await? + }; + TaskCenter::spawn_child( TaskKind::SystemService, invoker_name,