Skip to content

Commit

Permalink
snapshot on scale to zero and shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
avinassh committed Nov 9, 2023
1 parent 3790007 commit c2ff5ab
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 46 deletions.
30 changes: 27 additions & 3 deletions libsql-server/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,40 @@ use crate::connection::libsql::LibSqlConnection;
use crate::connection::write_proxy::{RpcStream, WriteProxyConnection};
use crate::connection::{Connection, MakeConnection, TrackedConnection};
use crate::replication::{ReplicationLogger, ReplicationLoggerHook};
use async_trait::async_trait;

/// Result type used by this module's database operations; an alias for `anyhow::Result`.
pub type Result<T> = anyhow::Result<T>;

/// A database that hands out connections and can be torn down, either by
/// destroying it or by shutting it down asynchronously.
#[async_trait]
pub trait Database: Sync + Send + 'static {
/// The connection type of the database
type Connection: Connection;

/// Returns a shared factory used to create connections to this database.
fn connection_maker(&self) -> Arc<dyn MakeConnection<Connection = Self::Connection>>;
// NOTE(review): this text comes from a rendered diff; the borrowing `shutdown`
// signature below presumably is the pre-change line superseded by `destroy` and
// the async `shutdown` that follow — confirm against the repository.
fn shutdown(&self);

/// Consumes the database; invoked when the owning namespace is destroyed.
fn destroy(self);

/// Shuts the database down, consuming it.
///
/// # Errors
///
/// Returns an error if the implementation's shutdown work fails.
async fn shutdown(self) -> Result<()>;
}

/// Database whose connections are `WriteProxyConnection`s over an `RpcStream`,
/// wrapped in `TrackedConnection`.
pub struct ReplicaDatabase {
/// Shared factory producing this database's connections.
pub connection_maker:
Arc<dyn MakeConnection<Connection = TrackedConnection<WriteProxyConnection<RpcStream>>>>,
}

/// `Database` implementation for `ReplicaDatabase`; both teardown paths are no-ops.
#[async_trait]
impl Database for ReplicaDatabase {
type Connection = TrackedConnection<WriteProxyConnection<RpcStream>>;

/// Returns another handle to the shared connection maker (clones the `Arc`).
fn connection_maker(&self) -> Arc<dyn MakeConnection<Connection = Self::Connection>> {
self.connection_maker.clone()
}

// NOTE(review): rendered-diff residue — the borrowing `shutdown` below presumably
// is the pre-change line replaced by `destroy`; confirm against the repository.
fn shutdown(&self) {}
/// Does nothing beyond consuming `self`.
fn destroy(self) {}

/// Does no work and always returns `Ok(())`.
async fn shutdown(self) -> Result<()> {
Ok(())
}
}

/// Connection type of `PrimaryDatabase`: a `TrackedConnection` around a
/// `LibSqlConnection` parameterized by `ReplicationLoggerHook`.
pub type PrimaryConnection = TrackedConnection<LibSqlConnection<ReplicationLoggerHook>>;
Expand All @@ -35,14 +47,26 @@ pub struct PrimaryDatabase {
pub connection_maker: Arc<dyn MakeConnection<Connection = PrimaryConnection>>,
}

/// `Database` implementation for `PrimaryDatabase`.
#[async_trait]
impl Database for PrimaryDatabase {
type Connection = PrimaryConnection;

/// Returns another handle to the shared connection maker (clones the `Arc`).
fn connection_maker(&self) -> Arc<dyn MakeConnection<Connection = Self::Connection>> {
self.connection_maker.clone()
}

// NOTE(review): rendered-diff residue — the `fn shutdown(&self) {` header below
// presumably is the pre-change line replaced by `fn destroy(self) {`; confirm
// against the repository.
fn shutdown(&self) {
/// Publishes `true` on the logger's `closed_signal` and consumes the database.
fn destroy(self) {
self.logger.closed_signal.send_replace(true);
}

/// Publishes `true` on the logger's `closed_signal`, then, if a bottomless
/// replicator is present, takes it and waits until it reports being snapshotted.
///
/// # Errors
///
/// Propagates any error returned by `wait_until_snapshotted`.
///
/// # Panics
///
/// Panics if the replicator mutex is poisoned (`lock().unwrap()`).
async fn shutdown(self) -> Result<()> {
self.logger.closed_signal.send_replace(true);
if let Some(replicator) = &self.logger.bottomless_replicator {
// Take the replicator out in its own statement: the mutex guard is a temporary
// released at the end of this line, so it is not held across the `.await` below.
let replicator = replicator.lock().unwrap().take();
if let Some(mut replicator) = replicator {
replicator.wait_until_snapshotted().await?;
}
}
Ok(())
}
}
4 changes: 3 additions & 1 deletion libsql-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ pub enum Error {
Fork(#[from] ForkError),
#[error("Fatal replication error")]
FatalReplicationError,

#[error("Connection with primary broken")]
PrimaryStreamDisconnect,
#[error("Proxy protocal misuse")]
Expand All @@ -91,6 +90,8 @@ pub enum Error {
PrimaryStreamInterupted,
#[error("Wrong URL: {0}")]
UrlParseError(#[from] url::ParseError),
#[error("Namespace store has shutdown")]
NamespaceStoreShutdown,
}

trait ResponseError: std::error::Error {
Expand Down Expand Up @@ -148,6 +149,7 @@ impl IntoResponse for Error {
PrimaryStreamMisuse => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
PrimaryStreamInterupted => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
UrlParseError(_) => self.format_err(StatusCode::BAD_REQUEST),
NamespaceStoreShutdown => self.format_err(StatusCode::SERVICE_UNAVAILABLE),
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#![allow(clippy::type_complexity, clippy::too_many_arguments)]

use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::process::Command;
use std::str::FromStr;
use std::sync::{Arc, Weak};
Expand Down Expand Up @@ -370,6 +372,7 @@ where
let snapshot_callback = self.make_snapshot_callback();
let auth = self.user_api_config.get_auth().map(Arc::new)?;
let extensions = self.db_config.validate_extensions()?;
let namespace_store_shutdown_fut: Pin<Box<dyn Future<Output = Result<()>> + Send>>;

match self.rpc_client_config {
Some(rpc_config) => {
Expand All @@ -385,6 +388,10 @@ where
let (namespaces, proxy_service, replication_service) = replica.configure().await?;
self.rpc_client_config = None;
self.spawn_monitoring_tasks(&mut join_set, stats_receiver, namespaces.clone())?;
namespace_store_shutdown_fut = {
let namespaces = namespaces.clone();
Box::pin(async move { namespaces.shutdown().await })
};

let services = Services {
namespaces,
Expand Down Expand Up @@ -420,6 +427,10 @@ where
let (namespaces, proxy_service, replication_service) = primary.configure().await?;
self.rpc_server_config = None;
self.spawn_monitoring_tasks(&mut join_set, stats_receiver, namespaces.clone())?;
namespace_store_shutdown_fut = {
let namespaces = namespaces.clone();
Box::pin(async move { namespaces.shutdown().await })
};

let services = Services {
namespaces,
Expand All @@ -442,6 +453,7 @@ where
tokio::select! {
_ = self.shutdown.notified() => {
join_set.shutdown().await;
namespace_store_shutdown_fut.await?;
// clean shutdown, remove sentinel file
std::fs::remove_file(sentinel_file_path(&self.path))?;
}
Expand Down
53 changes: 50 additions & 3 deletions libsql-server/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ use tokio::task::JoinSet;
use tokio::time::{Duration, Instant};
use tokio_util::io::StreamReader;
use tonic::transport::Channel;
use tracing::trace;
use uuid::Uuid;

use crate::auth::Authenticated;
use crate::connection::config::DatabaseConfigStore;
use crate::connection::libsql::{open_conn, MakeLibSqlConn};
use crate::connection::write_proxy::MakeWriteProxyConn;
use crate::connection::Connection;
use crate::connection::MakeConnection;
use crate::database::{Database, PrimaryDatabase, ReplicaDatabase};
use crate::error::{Error, LoadDumpError};
Expand Down Expand Up @@ -281,6 +283,7 @@ struct NamespaceStoreInner<M: MakeNamespace> {
/// The namespace factory, to create new namespaces.
make_namespace: M,
allow_lazy_creation: bool,
has_shutdown: RwLock<bool>,
}

impl<M: MakeNamespace> NamespaceStore<M> {
Expand All @@ -290,11 +293,15 @@ impl<M: MakeNamespace> NamespaceStore<M> {
store: Default::default(),
make_namespace,
allow_lazy_creation,
has_shutdown: RwLock::new(false),
}),
}
}

pub async fn destroy(&self, namespace: NamespaceName) -> crate::Result<()> {
if *self.inner.has_shutdown.read().await {
return Err(Error::NamespaceStoreShutdown);
}
let mut lock = self.inner.store.write().await;
if let Some(ns) = lock.remove(&namespace) {
// FIXME: when destroying, we are waiting for all the tasks associated with the
Expand All @@ -320,7 +327,10 @@ impl<M: MakeNamespace> NamespaceStore<M> {
&self,
namespace: NamespaceName,
restore_option: RestoreOption,
) -> anyhow::Result<()> {
) -> crate::Result<()> {
if *self.inner.has_shutdown.read().await {
return Err(Error::NamespaceStoreShutdown);
}
let mut lock = self.inner.store.write().await;
if let Some(ns) = lock.remove(&namespace) {
// FIXME: when destroying, we are waiting for all the tasks associated with the
Expand Down Expand Up @@ -379,6 +389,9 @@ impl<M: MakeNamespace> NamespaceStore<M> {
to: NamespaceName,
timestamp: Option<NaiveDateTime>,
) -> crate::Result<()> {
if *self.inner.has_shutdown.read().await {
return Err(Error::NamespaceStoreShutdown);
}
let mut lock = self.inner.store.write().await;
if lock.contains_key(&to) {
return Err(crate::error::Error::NamespaceAlreadyExist(
Expand Down Expand Up @@ -424,6 +437,9 @@ impl<M: MakeNamespace> NamespaceStore<M> {
where
Fun: FnOnce(&Namespace<M::Database>) -> R,
{
if *self.inner.has_shutdown.read().await {
return Err(Error::NamespaceStoreShutdown);
}
if !auth.is_namespace_authorized(&namespace) {
return Err(Error::NamespaceDoesntExist(namespace.to_string()));
}
Expand All @@ -435,6 +451,9 @@ impl<M: MakeNamespace> NamespaceStore<M> {
where
Fun: FnOnce(&Namespace<M::Database>) -> R,
{
if *self.inner.has_shutdown.read().await {
return Err(Error::NamespaceStoreShutdown);
}
let before_load = Instant::now();
let lock = self.inner.store.upgradable_read().await;
if let Some(ns) = lock.get(&namespace) {
Expand Down Expand Up @@ -466,6 +485,9 @@ impl<M: MakeNamespace> NamespaceStore<M> {
namespace: NamespaceName,
restore_option: RestoreOption,
) -> crate::Result<()> {
if *self.inner.has_shutdown.read().await {
return Err(Error::NamespaceStoreShutdown);
}
let lock = self.inner.store.upgradable_read().await;
if lock.contains_key(&namespace) {
return Err(crate::error::Error::NamespaceAlreadyExist(
Expand All @@ -491,6 +513,17 @@ impl<M: MakeNamespace> NamespaceStore<M> {
Ok(())
}

pub async fn shutdown(self) -> crate::Result<()> {
let mut has_shutdown = self.inner.has_shutdown.write().await;
*has_shutdown = true;
let mut lock = self.inner.store.write().await;
for (name, ns) in lock.drain() {
ns.shutdown().await?;
trace!("shutdown namespace: `{}`", name);
}
Ok(())
}

pub(crate) async fn stats(&self, namespace: NamespaceName) -> crate::Result<Arc<Stats>> {
self.with(namespace, |ns| ns.stats.clone()).await
}
Expand Down Expand Up @@ -520,9 +553,22 @@ impl<T: Database> Namespace<T> {
}

/// Tears the namespace down: shuts down its task set and destroys the database.
/// Always returns `Ok(())`.
async fn destroy(mut self) -> anyhow::Result<()> {
// NOTE(review): rendered-diff residue — this `self.db.shutdown()` call presumably
// is the pre-change line replaced by `self.db.destroy()` below; confirm.
self.db.shutdown();
self.tasks.shutdown().await;
self.db.destroy();
Ok(())
}

/// Opens a fresh connection to the namespace's database and uses it to run
/// `vacuum_if_needed` followed by `checkpoint`.
///
/// # Errors
///
/// Fails if the connection cannot be created or if either step returns an error.
async fn checkpoint(&self) -> anyhow::Result<()> {
    let maker = self.db.connection_maker();
    let connection = maker.create().await?;
    connection.vacuum_if_needed().await?;
    connection.checkpoint().await?;
    Ok(())
}

/// Shuts the namespace down in three steps, in this order: stop the task set,
/// checkpoint the database, then shut the database itself down.
///
/// # Errors
///
/// Returns the error from the checkpoint or from the database shutdown; a
/// checkpoint failure means the database shutdown is not attempted.
async fn shutdown(mut self) -> anyhow::Result<()> {
    self.tasks.shutdown().await;
    self.checkpoint().await?;
    // The database shutdown already yields `anyhow::Result<()>`, so hand it back as is.
    self.db.shutdown().await
}
}
Expand Down Expand Up @@ -763,7 +809,7 @@ impl Namespace<PrimaryDatabase> {
}

is_dirty |= did_recover;
Some(Arc::new(std::sync::Mutex::new(replicator)))
Some(Arc::new(std::sync::Mutex::new(Some(replicator))))
} else {
None
};
Expand All @@ -787,6 +833,7 @@ impl Namespace<PrimaryDatabase> {
let cb = config.snapshot_callback.clone();
move |path: &Path| cb(path, &name)
}),
bottomless_replicator.clone(),
)?);

let ctx_builder = {
Expand Down
Loading

0 comments on commit c2ff5ab

Please sign in to comment.