From 25a89c005a9cbb282fc4fea8e96fff0a1495b076 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Mon, 23 Dec 2024 14:05:10 +0200 Subject: [PATCH] Fix: Ensure local snapshot staging directory exists for restore (#2449) --- Cargo.lock | 4 + crates/types/src/config/worker.rs | 4 + crates/worker/src/lib.rs | 2 +- .../src/partition/snapshots/repository.rs | 4 + server/Cargo.toml | 7 +- server/tests/snapshots.rs | 169 ++++++++++++++++++ 6 files changed, 188 insertions(+), 2 deletions(-) create mode 100644 server/tests/snapshots.rs diff --git a/Cargo.lock b/Cargo.lock index 23b54a9fd..436138462 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6735,6 +6735,7 @@ dependencies = [ "futures-util", "googletest", "humantime", + "hyper-util", "pin-project", "regex", "restate-admin", @@ -6761,9 +6762,12 @@ dependencies = [ "tokio", "tokio-util", "toml", + "tonic", + "tower 0.4.13", "tracing", "tracing-panic", "tracing-subscriber", + "url", "vergen", ] diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index 684493d56..99c75f6d3 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -335,6 +335,10 @@ impl StorageOptions { pub fn data_dir(&self) -> PathBuf { super::data_dir("db") } + + pub fn snapshots_staging_dir(&self) -> PathBuf { + super::data_dir("pp-snapshots") + } } impl Default for StorageOptions { diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index a8500b72f..575eee8db 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -158,7 +158,7 @@ impl Worker { bifrost, SnapshotRepository::create_if_configured( snapshots_options, - config.common.base_dir().join("pp-snapshots"), + config.worker.storage.snapshots_staging_dir(), config.common.cluster_name().to_owned(), ) .await diff --git a/crates/worker/src/partition/snapshots/repository.rs b/crates/worker/src/partition/snapshots/repository.rs index 0e27b6e72..cf0bbcdd4 100644 --- a/crates/worker/src/partition/snapshots/repository.rs +++ b/crates/worker/src/partition/snapshots/repository.rs @@ -405,6 +405,10 @@ impl SnapshotRepository { return Ok(None); // perhaps this needs to be a configuration error } + if !self.staging_dir.exists() { + std::fs::create_dir_all(&self.staging_dir)?; + } + // The snapshot ingest directory should be on the same filesystem as the partition store // to minimize IO and disk space usage during import. let snapshot_dir = TempDir::with_prefix_in( diff --git a/server/Cargo.toml b/server/Cargo.toml index fb5e3cb0e..e8485a7c1 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -63,9 +63,10 @@ toml = { version = "0.8.12" } tracing = { workspace = true } tracing-panic = { version = "0.1.2" } regex = "1.10.4" +url = { version = "2.5.4", features = [] } [dev-dependencies] -restate-admin = { workspace = true, features = ["memory-loglet"] } +restate-admin = { workspace = true, features = ["memory-loglet", "clients"] } restate-bifrost = { workspace = true, features = ["test-util"] } restate-core = { workspace = true, features = ["test-util"] } restate-local-cluster-runner = { workspace = true } @@ -76,9 +77,13 @@ anyhow = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } googletest = { workspace = true } +hyper-util = { workspace = true } tempfile = { workspace = true } test-log = { workspace = true } +tonic = { workspace = true, features = ["transport", "prost"] } +tower = { workspace = true } tracing-subscriber = { workspace = true } +url = { workspace = true } [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = { workspace = true, features = ["unprefixed_malloc_on_supported_platforms", "profiling"] } diff --git a/server/tests/snapshots.rs b/server/tests/snapshots.rs new file mode 100644 index 000000000..5f0fdc584 --- /dev/null +++ b/server/tests/snapshots.rs @@ -0,0 +1,169 @@ +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::time::Duration; + +use enumset::enum_set; +use futures_util::StreamExt; +use googletest::fail; +use hyper_util::rt::TokioIo; +use regex::Regex; +use tempfile::TempDir; +use test_log::test; +use tokio::io; +use tokio::net::UnixStream; +use tonic::codec::CompressionEncoding; +use tonic::transport::{Channel, Endpoint, Uri}; +use tower::service_fn; +use url::Url; + +use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient; +use restate_admin::cluster_controller::protobuf::{ + ClusterStateRequest, CreatePartitionSnapshotRequest, +}; +use restate_local_cluster_runner::{ + cluster::Cluster, + node::{BinarySource, Node}, +}; +use restate_types::config::{LogFormat, MetadataStoreClient}; +use restate_types::net::AdvertisedAddress; +use restate_types::protobuf::cluster::node_state::State; +use restate_types::{config::Configuration, nodes_config::Role}; + +mod common; + +#[test(restate_core::test)] +async fn create_and_restore_snapshot() -> googletest::Result<()> { + let mut base_config = Configuration::default(); + base_config.common.bootstrap_num_partitions = 1.try_into()?; + base_config.common.log_filter = "restate=debug,warn".to_owned(); + base_config.common.log_format = LogFormat::Compact; + + let snapshots_dir = TempDir::new()?; + base_config.worker.snapshots.destination = Some( + Url::from_file_path(snapshots_dir.path()) + .unwrap() + .to_string(), + ); + + let nodes = Node::new_test_nodes_with_metadata( + base_config.clone(), + BinarySource::CargoTest, + enum_set!(Role::Worker), + 1, + ); + + let mut partition_ready = nodes[1].lines(Regex::new("Won the leadership campaign")?); + + let cluster = Cluster::builder() + .temp_base_dir() + .nodes(nodes.clone()) + .build() + .start() + .await?; + + cluster.wait_healthy(Duration::from_secs(30)).await?; + assert!(partition_ready.next().await.is_some()); + + let mut client = + ClusterCtrlSvcClient::new(grpc_connect(cluster.nodes[0].node_address().clone()).await?) + .accept_compressed(CompressionEncoding::Gzip); + + any_partition_active(&mut client, Duration::from_secs(5)).await?; + + let snapshot_response = client + .create_partition_snapshot(CreatePartitionSnapshotRequest { partition_id: 0 }) + .await? + .into_inner(); + + let mut node_2 = Node::new_test_node( + "node-2", + base_config, + BinarySource::CargoTest, + enum_set!(Role::Worker), + ); + *node_2.metadata_store_client_mut() = MetadataStoreClient::Embedded { + address: cluster.nodes[0].node_address().clone(), + }; + + let mut snapshot_restored = node_2.lines( + format!( + "Importing partition store snapshot.*{}", + snapshot_response.snapshot_id + ) + .parse()?, + ); + + node_2 + .start_clustered(cluster.base_dir(), cluster.cluster_name()) + .await?; + + assert!(snapshot_restored.next().await.is_some()); + Ok(()) +} + +async fn any_partition_active( + client: &mut ClusterCtrlSvcClient, + timeout: Duration, +) -> googletest::Result<()> { + let deadline = tokio::time::Instant::now() + timeout; + loop { + let cluster_state = client + .get_cluster_state(ClusterStateRequest {}) + .await? + .into_inner() + .cluster_state + .unwrap(); + + if cluster_state.nodes.values().any(|n| { + n.state.as_ref().is_some_and(|s| match s { + State::Alive(s) => s + .partitions + .values() + .any(|p| p.effective_mode.cmp(&1).is_eq()), + _ => false, + }) + }) { + break; // partition is ready; we can request snapshot + } + if tokio::time::Instant::now() > deadline { + fail!( + "Partition processor did not become ready within {:?}", + timeout + )?; + } + tokio::time::sleep(Duration::from_millis(250)).await; + } + Ok(()) +} + +async fn grpc_connect(address: AdvertisedAddress) -> Result { + match address { + AdvertisedAddress::Uds(uds_path) => { + // dummy endpoint required to specify an uds connector, it is not used anywhere + Endpoint::try_from("http://127.0.0.1") + .expect("/ should be a valid Uri") + .connect_with_connector(service_fn(move |_: Uri| { + let uds_path = uds_path.clone(); + async move { + Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?)) + } + })).await + } + AdvertisedAddress::Http(uri) => { + Channel::builder(uri) + .connect_timeout(Duration::from_secs(2)) + .timeout(Duration::from_secs(2)) + .http2_adaptive_window(true) + .connect() + .await + } + } +}