diff --git a/README.md b/README.md index d55137b1..ee88b547 100644 --- a/README.md +++ b/README.md @@ -242,7 +242,7 @@ let's say 10000, the following snippet should do: ```rust let db = Arc::new(GroveDb::open("db").unwrap()); - db.start_visualzier(10000); + db.start_visualizer(10000); ``` Just remember to use Arc because the HTTP server might outlast the GroveDB instance. diff --git a/grovedb/Cargo.toml b/grovedb/Cargo.toml index 2aeeae9a..680a0e15 100644 --- a/grovedb/Cargo.toml +++ b/grovedb/Cargo.toml @@ -27,13 +27,14 @@ indexmap = "2.2.6" intmap = { version = "2.0.0", optional = true } grovedb-path = { version = "2.0.3", path = "../path" } grovedbg-types = { version = "2.0.3", path = "../grovedbg-types", optional = true } -tokio = { version = "1.37.0", features = ["rt-multi-thread", "net"], optional = true } +tokio = { version = "1.40.0", features = ["rt-multi-thread", "net"], optional = true } axum = { version = "0.7.5", features = ["macros"], optional = true } tower-http = { version = "0.5.2", features = ["fs"], optional = true } blake3 = "1.4.0" bitvec = "1" zip-extensions = { version ="0.6.2", optional = true } grovedb-version = { path = "../grovedb-version", version = "2.0.3" } +tokio-util = { version = "0.7.12", optional = true } [dev-dependencies] rand = "0.8.5" @@ -74,6 +75,7 @@ estimated_costs = ["full"] grovedbg = [ "grovedbg-types", "tokio", + "tokio-util", "full", "grovedb-merk/grovedbg", "axum", diff --git a/grovedb/build.rs b/grovedb/build.rs index 66e72d15..9c0441b7 100644 --- a/grovedb/build.rs +++ b/grovedb/build.rs @@ -6,8 +6,8 @@ fn main() { use sha2::{digest::FixedOutput, Digest, Sha256}; const GROVEDBG_SHA256: [u8; 32] = - hex!("ea7d9258973aa765eaf5064451fc83efa22e0ce6eaf2938507e2703571364e35"); - const GROVEDBG_VERSION: &str = "v1.0.0-rc.6"; + hex!("5a1ee5a3033190974f580e41047ef9267ba03fafe0a70bbcf7878c1bb4f6126d"); + const GROVEDBG_VERSION: &str = "v1.1.0"; let out_dir = PathBuf::from(&env::var_os("OUT_DIR").unwrap()); let grovedbg_zip_path = out_dir.join("grovedbg.zip"); diff --git a/grovedb/src/debugger.rs b/grovedb/src/debugger.rs index da4657ec..1c07504f 100644 --- a/grovedb/src/debugger.rs +++ b/grovedb/src/debugger.rs @@ -1,6 +1,11 @@ //! GroveDB debugging support module. -use std::{collections::BTreeMap, fs, sync::Weak}; +use std::{ + collections::{BTreeMap, HashMap}, + fs, + sync::{Arc, Weak}, + time::{Duration, Instant, SystemTime}, +}; use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::post, Json, Router}; use grovedb_merk::{ @@ -11,14 +16,19 @@ use grovedb_merk::{ use grovedb_path::SubtreePath; use grovedb_version::version::GroveVersion; use grovedbg_types::{ - MerkProofNode, MerkProofOp, NodeFetchRequest, NodeUpdate, Path, PathQuery, Query, QueryItem, - SizedQuery, SubqueryBranch, + DropSessionRequest, MerkProofNode, MerkProofOp, NewSessionResponse, NodeFetchRequest, + NodeUpdate, Path, PathQuery, Query, QueryItem, SessionId, SizedQuery, SubqueryBranch, + WithSession, }; use indexmap::IndexMap; +use tempfile::tempdir; use tokio::{ net::ToSocketAddrs, - sync::mpsc::{self, Sender}, + select, + sync::{RwLock, RwLockReadGuard}, + time::sleep, }; +use tokio_util::sync::CancellationToken; use tower_http::services::ServeDir; use crate::{ @@ -30,6 +40,8 @@ use crate::{ const GROVEDBG_ZIP: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/grovedbg.zip")); +const SESSION_TIMEOUT: Duration = Duration::from_secs(60 * 10); + pub(super) fn start_visualizer(grovedb: Weak, addr: A) where A: ToSocketAddrs + Send + 'static, @@ -44,33 +56,132 @@ where zip_extensions::read::zip_extract(&grovedbg_zip, &grovedbg_www) .expect("cannot extract grovedbg contents"); - let (shutdown_send, mut shutdown_receive) = mpsc::channel::<()>(1); + let cancellation_token = CancellationToken::new(); + + let state: AppState = AppState { + cancellation_token: cancellation_token.clone(), + grovedb, + sessions: Default::default(), + }; + let app = Router::new() + .route("/new_session", post(new_session)) + .route("/drop_session", post(drop_session)) .route("/fetch_node", post(fetch_node)) .route("/fetch_root_node", post(fetch_root_node)) .route("/prove_path_query", post(prove_path_query)) .route("/fetch_with_path_query", post(fetch_with_path_query)) .fallback_service(ServeDir::new(grovedbg_www)) - .with_state((shutdown_send, grovedb)); - - tokio::runtime::Runtime::new() - .unwrap() - .block_on(async move { - let listener = tokio::net::TcpListener::bind(addr) - .await - .expect("can't bind visualizer port"); - axum::serve(listener, app) - .with_graceful_shutdown(async move { - shutdown_receive.recv().await; - }) - .await - .unwrap() - }); + .with_state(state.clone()); + + let rt = tokio::runtime::Runtime::new().unwrap(); + + let cloned_cancellation_token = cancellation_token.clone(); + rt.spawn(async move { + loop { + select! { + _ = cloned_cancellation_token.cancelled() => break, + _ = sleep(Duration::from_secs(10)) => { + let now = Instant::now(); + let mut lock = state.sessions.write().await; + let to_delete: Vec = lock.iter().filter_map( + |(id, session)| (session.last_access < now - SESSION_TIMEOUT).then_some(*id) + ).collect(); + + to_delete.into_iter().for_each(|id| { lock.remove(&id); }); + } + } + } + }); + + rt.block_on(async move { + let listener = tokio::net::TcpListener::bind(addr) + .await + .expect("can't bind visualizer port"); + axum::serve(listener, app) + .with_graceful_shutdown(async move { + cancellation_token.cancelled().await; + }) + .await + .unwrap() + }); }); } +#[derive(Clone)] +struct AppState { + cancellation_token: CancellationToken, + grovedb: Weak, + sessions: Arc>>, +} + +impl AppState { + fn verify_running(&self) -> Result<(), AppError> { + if self.grovedb.strong_count() == 0 { + self.cancellation_token.cancel(); + Err(AppError::Closed) + } else { + Ok(()) + } + } + + async fn new_session(&self) -> Result { + let grovedb = self.grovedb.upgrade().ok_or(AppError::Closed)?; + let id = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("time went backwards") + .as_secs(); + self.sessions + .write() + .await + .insert(id, Session::new(&grovedb)?); + + Ok(id) + } + + async fn drop_session(&self, id: SessionId) { + self.sessions.write().await.remove(&id); + } + + async fn get_snapshot(&self, id: SessionId) -> Result, AppError> { + self.verify_running()?; + let mut lock = self.sessions.write().await; + if let Some(session) = lock.get_mut(&id) { + session.last_access = Instant::now(); + Ok(RwLockReadGuard::map(lock.downgrade(), |l| { + &l.get(&id).as_ref().expect("checked above").snapshot + })) + } else { + Err(AppError::NoSession) + } + } +} + +struct Session { + last_access: Instant, + _tempdir: tempfile::TempDir, + snapshot: GroveDb, +} + +impl Session { + fn new(grovedb: &GroveDb) -> Result { + let tempdir = tempdir().map_err(|e| AppError::Any(e.to_string()))?; + let path = tempdir.path().join("grovedbg_session"); + grovedb + .create_checkpoint(&path) + .map_err(|e| AppError::Any(e.to_string()))?; + let snapshot = GroveDb::open(path).map_err(|e| AppError::Any(e.to_string()))?; + Ok(Session { + last_access: Instant::now(), + _tempdir: tempdir, + snapshot, + }) + } +} + enum AppError { Closed, + NoSession, Any(String), } @@ -80,6 +191,9 @@ impl IntoResponse for AppError { AppError::Closed => { (StatusCode::SERVICE_UNAVAILABLE, "GroveDB is closed").into_response() } + AppError::NoSession => { + (StatusCode::UNAUTHORIZED, "No session with this id").into_response() + } AppError::Any(e) => (StatusCode::INTERNAL_SERVER_ERROR, e).into_response(), } } @@ -91,16 +205,28 @@ impl From for AppError { } } +async fn new_session(State(state): State) -> Result, AppError> { + Ok(Json(NewSessionResponse { + session_id: state.new_session().await?, + })) +} + +async fn drop_session( + State(state): State, + Json(DropSessionRequest { session_id }): Json, +) { + state.drop_session(session_id).await; +} + async fn fetch_node( - State((shutdown, grovedb)): State<(Sender<()>, Weak)>, - Json(NodeFetchRequest { path, key }): Json, + State(state): State, + Json(WithSession { + session_id, + request: NodeFetchRequest { path, key }, + }): Json>, ) -> Result>, AppError> { - let Some(db) = grovedb.upgrade() else { - shutdown.send(()).await.ok(); - return Err(AppError::Closed); - }; + let db = state.get_snapshot(session_id).await?; - // todo: GroveVersion::latest() to actual version let merk = db .open_non_transactional_merk_at_path(path.as_slice().into(), None, GroveVersion::latest()) .unwrap()?; @@ -115,14 +241,14 @@ async fn fetch_node( } async fn fetch_root_node( - State((shutdown, grovedb)): State<(Sender<()>, Weak)>, + State(state): State, + Json(WithSession { + session_id, + request: (), + }): Json>, ) -> Result>, AppError> { - let Some(db) = grovedb.upgrade() else { - shutdown.send(()).await.ok(); - return Err(AppError::Closed); - }; + let db = state.get_snapshot(session_id).await?; - // todo: GroveVersion::latest() to actual version let merk = db .open_non_transactional_merk_at_path(SubtreePath::empty(), None, GroveVersion::latest()) .unwrap()?; @@ -138,13 +264,13 @@ async fn fetch_root_node( } async fn prove_path_query( - State((shutdown, grovedb)): State<(Sender<()>, Weak)>, - Json(json_path_query): Json, + State(state): State, + Json(WithSession { + session_id, + request: json_path_query, + }): Json>, ) -> Result, AppError> { - let Some(db) = grovedb.upgrade() else { - shutdown.send(()).await.ok(); - return Err(AppError::Closed); - }; + let db = state.get_snapshot(session_id).await?; let path_query = path_query_to_grovedb(json_path_query); @@ -155,13 +281,13 @@ async fn prove_path_query( } async fn fetch_with_path_query( - State((shutdown, grovedb)): State<(Sender<()>, Weak)>, - Json(json_path_query): Json, + State(state): State, + Json(WithSession { + session_id, + request: json_path_query, + }): Json>, ) -> Result>, AppError> { - let Some(db) = grovedb.upgrade() else { - shutdown.send(()).await.ok(); - return Err(AppError::Closed); - }; + let db = state.get_snapshot(session_id).await?; let path_query = path_query_to_grovedb(json_path_query); @@ -487,7 +613,6 @@ fn node_to_update( feature_type, }: NodeDbg, ) -> Result { - // todo: GroveVersion::latest() to actual version let grovedb_element = crate::Element::deserialize(&value, GroveVersion::latest())?; let element = element_to_grovedbg(grovedb_element); diff --git a/grovedbg-types/src/lib.rs b/grovedbg-types/src/lib.rs index f1cc53bd..dd9fc007 100644 --- a/grovedbg-types/src/lib.rs +++ b/grovedbg-types/src/lib.rs @@ -6,6 +6,23 @@ use serde_with::{base64::Base64, serde_as}; pub type Key = Vec; pub type Path = Vec; pub type PathSegment = Vec; +pub type SessionId = u64; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct WithSession { + pub session_id: SessionId, + pub request: R, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct NewSessionResponse { + pub session_id: SessionId, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct DropSessionRequest { + pub session_id: SessionId, +} #[serde_as] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]