Skip to content

Commit

Permalink
Add a basic create/restore snapshot smoke test
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Dec 20, 2024
1 parent 8b80178 commit 92cae7b
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 1 deletion.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ toml = { version = "0.8.12" }
tracing = { workspace = true }
tracing-panic = { version = "0.1.2" }
regex = "1.10.4"
url = { version = "2.5.4", features = [] }

[dev-dependencies]
restate-admin = { workspace = true, features = ["memory-loglet"] }
restate-admin = { workspace = true, features = ["memory-loglet", "clients"] }
restate-bifrost = { workspace = true, features = ["test-util"] }
restate-core = { workspace = true, features = ["test-util"] }
restate-local-cluster-runner = { workspace = true }
Expand All @@ -76,9 +77,13 @@ anyhow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
googletest = { workspace = true }
hyper-util = { workspace = true }
tempfile = { workspace = true }
test-log = { workspace = true }
tonic = { workspace = true, features = ["transport", "prost"] }
tower = { workspace = true }
tracing-subscriber = { workspace = true }
url = { workspace = true }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { workspace = true, features = ["unprefixed_malloc_on_supported_platforms", "profiling"] }
Expand Down
173 changes: 173 additions & 0 deletions server/tests/snapshots.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::time::Duration;

use enumset::enum_set;
use futures_util::StreamExt;
use googletest::fail;
use hyper_util::rt::TokioIo;
use regex::Regex;
use tempfile::TempDir;
use test_log::test;
use tokio::io;
use tokio::net::UnixStream;
use tonic::codec::CompressionEncoding;
use tonic::transport::{Channel, Endpoint, Uri};
use tower::service_fn;
use url::Url;

use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient;
use restate_admin::cluster_controller::protobuf::{
ClusterStateRequest, CreatePartitionSnapshotRequest,
};
use restate_local_cluster_runner::{
cluster::Cluster,
node::{BinarySource, Node},
};
use restate_types::config::{LogFormat, MetadataStoreClient};
use restate_types::net::AdvertisedAddress;
use restate_types::protobuf::cluster::node_state::State;
use restate_types::protobuf::cluster::{NodeState, RunMode};

Check failure on line 38 in server/tests/snapshots.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

unused imports: `NodeState` and `RunMode`
use restate_types::{config::Configuration, nodes_config::Role};

mod common;

/// Smoke test for partition snapshots.
///
/// Starts a test cluster with a single partition and a file-based snapshot
/// destination, asks the cluster controller to snapshot partition 0, then
/// starts an additional worker node and waits for it to log that it is
/// importing the snapshot that was just created.
#[test(restate_core::test)]
async fn create_and_restore_snapshot() -> googletest::Result<()> {
    let mut base_config = Configuration::default();
    base_config.common.bootstrap_num_partitions = 1.try_into()?;
    base_config.common.log_filter = "restate=debug,warn".to_owned();
    base_config.common.log_format = LogFormat::Compact;

    // Snapshots are written to a temporary directory that lives as long as this test.
    let snapshots_dir = TempDir::new()?;
    let snapshots_url = Url::from_file_path(snapshots_dir.path()).unwrap();
    base_config.worker.snapshots.destination = Some(snapshots_url.to_string());

    let nodes = Node::new_test_nodes_with_metadata(
        base_config.clone(),
        BinarySource::CargoTest,
        enum_set!(Role::Worker),
        1,
    );

    // Subscribe to the log line before the cluster starts so that it cannot be missed.
    let leadership_pattern: Regex = "Won the leadership campaign".parse()?;
    let mut partition_ready = nodes[1].lines(leadership_pattern);

    let cluster = Cluster::builder()
        .temp_base_dir()
        .nodes(nodes.clone())
        .build()
        .start()
        .await?;

    cluster.wait_healthy(Duration::from_secs(30)).await?;
    assert!(partition_ready.next().await.is_some());

    let channel = grpc_connect(cluster.nodes[0].node_address().clone()).await?;
    let mut client = ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);

    any_partition_active(&mut client, Duration::from_secs(5)).await?;

    let snapshot_request = CreatePartitionSnapshotRequest { partition_id: 0 };
    let snapshot_response = client
        .create_partition_snapshot(snapshot_request)
        .await?
        .into_inner();

    // A second worker joins the running cluster, using the first node's address
    // for its metadata store client.
    let mut node_2 = Node::new_test_node(
        "node-2",
        base_config,
        BinarySource::CargoTest,
        enum_set!(Role::Worker),
    );
    let metadata_store_address = cluster.nodes[0].node_address().clone();
    *node_2.metadata_store_client_mut() = MetadataStoreClient::Embedded {
        address: metadata_store_address,
    };

    // Register the log watcher before starting the node, for the same reason as above.
    let restore_pattern = Regex::new(&format!(
        "Importing partition store snapshot.*{}",
        snapshot_response.snapshot_id
    ))?;
    let mut snapshot_restored = node_2.lines(restore_pattern);

    node_2
        .start_clustered(cluster.base_dir(), cluster.cluster_name())
        .await?;

    assert!(snapshot_restored.next().await.is_some());
    Ok(())
}

async fn any_partition_active(
client: &mut ClusterCtrlSvcClient<Channel>,
timeout: Duration,
) -> googletest::Result<()> {
let deadline = tokio::time::Instant::now() + timeout;
loop {
let cluster_state = client
.get_cluster_state(ClusterStateRequest {})
.await?
.into_inner()
.cluster_state
.unwrap();

if cluster_state

Check failure on line 126 in server/tests/snapshots.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

called `is_some()` after searching an `Iterator` with `find`
.nodes
.values()
.find(|n| {
n.state.as_ref().is_some_and(|s| match s {
State::Alive(s) => s
.partitions
.values()
.find(|p| p.effective_mode.cmp(&1).is_eq())

Check failure on line 134 in server/tests/snapshots.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

called `is_some()` after searching an `Iterator` with `find`
.is_some(),
_ => false,
})
})
.is_some()
{
break; // partition is ready; we can request snapshot
}
if tokio::time::Instant::now() > deadline {
fail!("Partition processor did not become ready within the timeout");

Check failure on line 144 in server/tests/snapshots.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

unused `std::result::Result` that must be used

Check failure on line 144 in server/tests/snapshots.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

unused return value of `any_partition_active::{closure#0}::create_fail_result` that must be used
}
tokio::time::sleep(Duration::from_millis(250)).await;
}
Ok(())
}

/// Opens a gRPC [`Channel`] to `address`.
///
/// * `AdvertisedAddress::Uds` - connects over a Unix domain socket at the given path
///   through a custom connector.
/// * `AdvertisedAddress::Http` - connects to the given URI with a 2 second connect
///   timeout and a 2 second request timeout.
///
/// # Errors
///
/// Returns the transport error if the connection cannot be established.
async fn grpc_connect(address: AdvertisedAddress) -> Result<Channel, tonic::transport::Error> {
    match address {
        AdvertisedAddress::Uds(uds_path) => {
            // dummy endpoint required to specify an uds connector, it is not used anywhere
            Endpoint::try_from("http://127.0.0.1")
                .expect("http://127.0.0.1 should be a valid Uri")
                .connect_with_connector(service_fn(move |_: Uri| {
                    // The connector may be invoked more than once, so each call gets
                    // its own copy of the socket path.
                    let uds_path = uds_path.clone();
                    async move {
                        Ok::<_, io::Error>(TokioIo::new(UnixStream::connect(uds_path).await?))
                    }
                }))
                .await
        }
        AdvertisedAddress::Http(uri) => {
            Channel::builder(uri)
                .connect_timeout(Duration::from_secs(2))
                .timeout(Duration::from_secs(2))
                .http2_adaptive_window(true)
                .connect()
                .await
        }
    }
}

0 comments on commit 92cae7b

Please sign in to comment.