From 5fc422919027abaf0a16b6c60f351dd89d40c05b Mon Sep 17 00:00:00 2001 From: Evgeny Fomin Date: Tue, 17 Sep 2024 18:41:50 +0200 Subject: [PATCH] bump grovedbg zip --- grovedb/Cargo.toml | 8 +++-- grovedb/build.rs | 4 +-- grovedb/src/debugger.rs | 77 ++++++++++++++++++++++++++--------------- 3 files changed, 56 insertions(+), 33 deletions(-) diff --git a/grovedb/Cargo.toml b/grovedb/Cargo.toml index 2aeeae9a..e8c009aa 100644 --- a/grovedb/Cargo.toml +++ b/grovedb/Cargo.toml @@ -27,13 +27,14 @@ indexmap = "2.2.6" intmap = { version = "2.0.0", optional = true } grovedb-path = { version = "2.0.3", path = "../path" } grovedbg-types = { version = "2.0.3", path = "../grovedbg-types", optional = true } -tokio = { version = "1.37.0", features = ["rt-multi-thread", "net"], optional = true } -axum = { version = "0.7.5", features = ["macros"], optional = true } -tower-http = { version = "0.5.2", features = ["fs"], optional = true } +tokio = { version = "1.37", features = ["rt-multi-thread", "net"], optional = true } +axum = { version = "0.7", features = ["macros"], optional = true } +tower-http = { version = "0.5", features = ["fs"], optional = true } blake3 = "1.4.0" bitvec = "1" zip-extensions = { version ="0.6.2", optional = true } grovedb-version = { path = "../grovedb-version", version = "2.0.3" } +tokio-util = { version = "0.7", optional = true } [dev-dependencies] rand = "0.8.5" @@ -74,6 +75,7 @@ estimated_costs = ["full"] grovedbg = [ "grovedbg-types", "tokio", + "tokio-util", "full", "grovedb-merk/grovedbg", "axum", diff --git a/grovedb/build.rs b/grovedb/build.rs index 66e72d15..ba614eff 100644 --- a/grovedb/build.rs +++ b/grovedb/build.rs @@ -6,8 +6,8 @@ fn main() { use sha2::{digest::FixedOutput, Digest, Sha256}; const GROVEDBG_SHA256: [u8; 32] = - hex!("ea7d9258973aa765eaf5064451fc83efa22e0ce6eaf2938507e2703571364e35"); - const GROVEDBG_VERSION: &str = "v1.0.0-rc.6"; + hex!("4c05d47613b1a68caaf5229e460720d7958b2fe0148fd0f5edd04f827052db64"); + const GROVEDBG_VERSION: &str = "v1.0.0"; let out_dir = PathBuf::from(&env::var_os("OUT_DIR").unwrap()); let grovedbg_zip_path = out_dir.join("grovedbg.zip"); diff --git a/grovedb/src/debugger.rs b/grovedb/src/debugger.rs index c67f901e..1c07504f 100644 --- a/grovedb/src/debugger.rs +++ b/grovedb/src/debugger.rs @@ -4,7 +4,7 @@ use std::{ collections::{BTreeMap, HashMap}, fs, sync::{Arc, Weak}, - time::{Instant, SystemTime}, + time::{Duration, Instant, SystemTime}, }; use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::post, Json, Router}; @@ -24,11 +24,11 @@ use indexmap::IndexMap; use tempfile::tempdir; use tokio::{ net::ToSocketAddrs, - sync::{ - mpsc::{self, Sender}, - RwLock, RwLockReadGuard, - }, + select, + sync::{RwLock, RwLockReadGuard}, + time::sleep, }; +use tokio_util::sync::CancellationToken; use tower_http::services::ServeDir; use crate::{ @@ -40,6 +40,8 @@ use crate::{ const GROVEDBG_ZIP: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/grovedbg.zip")); +const SESSION_TIMEOUT: Duration = Duration::from_secs(60 * 10); + pub(super) fn start_visualizer(grovedb: Weak, addr: A) where A: ToSocketAddrs + Send + 'static, @@ -54,10 +56,10 @@ where zip_extensions::read::zip_extract(&grovedbg_zip, &grovedbg_www) .expect("cannot extract grovedbg contents"); - let (shutdown_send, mut shutdown_receive) = mpsc::channel::<()>(1); + let cancellation_token = CancellationToken::new(); let state: AppState = AppState { - shutdown: shutdown_send, + cancellation_token: cancellation_token.clone(), grovedb, sessions: Default::default(), }; @@ -70,35 +72,53 @@ where .route("/prove_path_query", post(prove_path_query)) .route("/fetch_with_path_query", post(fetch_with_path_query)) .fallback_service(ServeDir::new(grovedbg_www)) - .with_state(state); - - tokio::runtime::Runtime::new() - .unwrap() - .block_on(async move { - let listener = tokio::net::TcpListener::bind(addr) - .await - .expect("can't bind visualizer port"); - axum::serve(listener, app) - .with_graceful_shutdown(async move { - shutdown_receive.recv().await; - }) - .await - .unwrap() - }); + .with_state(state.clone()); + + let rt = tokio::runtime::Runtime::new().unwrap(); + + let cloned_cancellation_token = cancellation_token.clone(); + rt.spawn(async move { + loop { + select! { + _ = cloned_cancellation_token.cancelled() => break, + _ = sleep(Duration::from_secs(10)) => { + let now = Instant::now(); + let mut lock = state.sessions.write().await; + let to_delete: Vec = lock.iter().filter_map( + |(id, session)| (session.last_access < now - SESSION_TIMEOUT).then_some(*id) + ).collect(); + + to_delete.into_iter().for_each(|id| { lock.remove(&id); }); + } + } + } + }); + + rt.block_on(async move { + let listener = tokio::net::TcpListener::bind(addr) + .await + .expect("can't bind visualizer port"); + axum::serve(listener, app) + .with_graceful_shutdown(async move { + cancellation_token.cancelled().await; + }) + .await + .unwrap() + }); }); } #[derive(Clone)] struct AppState { - shutdown: Sender<()>, + cancellation_token: CancellationToken, grovedb: Weak, sessions: Arc>>, } impl AppState { - async fn verify_running(&self) -> Result<(), AppError> { + fn verify_running(&self) -> Result<(), AppError> { if self.grovedb.strong_count() == 0 { - self.shutdown.send(()).await.ok(); + self.cancellation_token.cancel(); Err(AppError::Closed) } else { Ok(()) @@ -124,7 +144,7 @@ impl AppState { } async fn get_snapshot(&self, id: SessionId) -> Result, AppError> { - self.verify_running().await?; + self.verify_running()?; let mut lock = self.sessions.write().await; if let Some(session) = lock.get_mut(&id) { session.last_access = Instant::now(); @@ -146,10 +166,11 @@ struct Session { impl Session { fn new(grovedb: &GroveDb) -> Result { let tempdir = tempdir().map_err(|e| AppError::Any(e.to_string()))?; + let path = tempdir.path().join("grovedbg_session"); grovedb - .create_checkpoint(tempdir.path()) + .create_checkpoint(&path) .map_err(|e| AppError::Any(e.to_string()))?; - let snapshot = GroveDb::open(tempdir.path()).map_err(|e| AppError::Any(e.to_string()))?; + let snapshot = GroveDb::open(path).map_err(|e| AppError::Any(e.to_string()))?; Ok(Session { last_access: Instant::now(), _tempdir: tempdir,