diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index b5f53b6c..0597738a 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -23,7 +23,7 @@ use super::ingest::create_stream_if_not_exists; use super::modal::utils::logstream_utils::create_update_stream; use crate::alerts::Alerts; use crate::handlers::STREAM_TYPE_KEY; -use crate::hottier::HotTierManager; +use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}; use crate::metadata::STREAM_INFO; use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE}; use crate::option::CONFIG; @@ -512,6 +512,110 @@ pub async fn get_stream_info(req: HttpRequest) -> Result, +) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + + if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) { + return Err(StreamError::Custom { + msg: "Hot tier can not be updated for internal stream".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + if CONFIG.parseable.hot_tier_storage_path.is_none() { + return Err(StreamError::HotTierNotEnabled(stream_name)); + } + + let body = body.into_inner(); + let mut hottier: StreamHotTier = match serde_json::from_value(body) { + Ok(hottier) => hottier, + Err(err) => return Err(StreamError::InvalidHotTierConfig(err)), + }; + + validator::hot_tier(&hottier.size.to_string())?; + + STREAM_INFO.set_hot_tier(&stream_name, true)?; + if let Some(hot_tier_manager) = HotTierManager::global() { + let existing_hot_tier_used_size = hot_tier_manager + .validate_hot_tier_size(&stream_name, &hottier.size) + .await?; + hottier.used_size = Some(existing_hot_tier_used_size.to_string()); + hottier.available_size = Some(hottier.size.clone()); + hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string()); + hot_tier_manager + .put_hot_tier(&stream_name, &mut hottier) + .await?; + let storage = CONFIG.storage().get_object_store(); + let mut stream_metadata = storage.get_object_store_format(&stream_name).await?; + stream_metadata.hot_tier_enabled = Some(true); + storage + .put_stream_manifest(&stream_name, &stream_metadata) + .await?; + } + + Ok(( + format!("hot tier set for stream {stream_name}"), + StatusCode::OK, + )) +} + +pub async fn get_stream_hot_tier(req: HttpRequest) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + + if CONFIG.parseable.hot_tier_storage_path.is_none() { + return Err(StreamError::HotTierNotEnabled(stream_name)); + } + + if let Some(hot_tier_manager) = HotTierManager::global() { + let mut hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?; + hot_tier.size = format!("{} {}", hot_tier.size, "Bytes"); + hot_tier.used_size = Some(format!("{} {}", hot_tier.used_size.unwrap(), "Bytes")); + hot_tier.available_size = Some(format!("{} {}", hot_tier.available_size.unwrap(), "Bytes")); + Ok((web::Json(hot_tier), StatusCode::OK)) + } else { + Err(StreamError::Custom { + msg: format!("hot tier not initialised for stream {}", stream_name), + status: (StatusCode::BAD_REQUEST), + }) + } +} + +pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + + if CONFIG.parseable.hot_tier_storage_path.is_none() { + return Err(StreamError::HotTierNotEnabled(stream_name)); + } + + if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) { + return Err(StreamError::Custom { + msg: "Hot tier can not be deleted for internal stream".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + + if let Some(hot_tier_manager) = HotTierManager::global() { + hot_tier_manager.delete_hot_tier(&stream_name).await?; + } + Ok(( + format!("hot tier deleted for stream {stream_name}"), + StatusCode::OK, + )) +} + pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> { if let Ok(stream_exists) = create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string()).await diff --git a/server/src/handlers/http/modal/query/querier_logstream.rs b/server/src/handlers/http/modal/query/querier_logstream.rs index bbac2c15..4fb06085 100644 --- a/server/src/handlers/http/modal/query/querier_logstream.rs +++ b/server/src/handlers/http/modal/query/querier_logstream.rs @@ -17,12 +17,11 @@ use crate::{ logstream::{error::StreamError, get_stats_date}, modal::utils::logstream_utils::create_update_stream, }, - hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}, + hottier::HotTierManager, metadata::{self, STREAM_INFO}, option::CONFIG, stats::{self, Stats}, storage::{StorageDir, StreamType}, - validator, }; pub async fn delete(req: HttpRequest) -> Result { @@ -218,107 +217,3 @@ pub async fn get_cache_enabled(req: HttpRequest) -> Result, -) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - if !metadata::STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name)); - } - - if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) { - return Err(StreamError::Custom { - msg: "Hot tier can not be updated for internal stream".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - if CONFIG.parseable.hot_tier_storage_path.is_none() { - return Err(StreamError::HotTierNotEnabled(stream_name)); - } - - let body = body.into_inner(); - let mut hottier: StreamHotTier = match serde_json::from_value(body) { - Ok(hottier) => hottier, - Err(err) => return Err(StreamError::InvalidHotTierConfig(err)), - }; - - validator::hot_tier(&hottier.size.to_string())?; - - STREAM_INFO.set_hot_tier(&stream_name, true)?; - if let Some(hot_tier_manager) = HotTierManager::global() { - let existing_hot_tier_used_size = hot_tier_manager - .validate_hot_tier_size(&stream_name, &hottier.size) - .await?; - hottier.used_size = Some(existing_hot_tier_used_size.to_string()); - hottier.available_size = Some(hottier.size.clone()); - hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string()); - hot_tier_manager - .put_hot_tier(&stream_name, &mut hottier) - .await?; - let storage = CONFIG.storage().get_object_store(); - let mut stream_metadata = storage.get_object_store_format(&stream_name).await?; - stream_metadata.hot_tier_enabled = Some(true); - storage - .put_stream_manifest(&stream_name, &stream_metadata) - .await?; - } - - Ok(( - format!("hot tier set for stream {stream_name}"), - StatusCode::OK, - )) -} - -pub async fn get_stream_hot_tier(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - - if !metadata::STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name)); - } - - if CONFIG.parseable.hot_tier_storage_path.is_none() { - return Err(StreamError::HotTierNotEnabled(stream_name)); - } - - if let Some(hot_tier_manager) = HotTierManager::global() { - let mut hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?; - hot_tier.size = format!("{} {}", hot_tier.size, "Bytes"); - hot_tier.used_size = Some(format!("{} {}", hot_tier.used_size.unwrap(), "Bytes")); - hot_tier.available_size = Some(format!("{} {}", hot_tier.available_size.unwrap(), "Bytes")); - Ok((web::Json(hot_tier), StatusCode::OK)) - } else { - Err(StreamError::Custom { - msg: format!("hot tier not initialised for stream {}", stream_name), - status: (StatusCode::BAD_REQUEST), - }) - } -} - -pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - - if !metadata::STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name)); - } - - if CONFIG.parseable.hot_tier_storage_path.is_none() { - return Err(StreamError::HotTierNotEnabled(stream_name)); - } - - if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) { - return Err(StreamError::Custom { - msg: "Hot tier can not be deleted for internal stream".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - - if let Some(hot_tier_manager) = HotTierManager::global() { - hot_tier_manager.delete_hot_tier(&stream_name).await?; - } - Ok(( - format!("hot tier deleted for stream {stream_name}"), - StatusCode::OK, - )) -} diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 618e9e07..015e01af 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -356,17 +356,17 @@ impl QueryServer { // PUT "/logstream/{logstream}/hottier" ==> Set hottier for given logstream .route( web::put() - .to(querier_logstream::put_stream_hot_tier) + .to(logstream::put_stream_hot_tier) .authorize_for_stream(Action::PutHotTierEnabled), ) .route( web::get() - .to(querier_logstream::get_stream_hot_tier) + .to(logstream::get_stream_hot_tier) .authorize_for_stream(Action::GetHotTierEnabled), ) .route( web::delete() - .to(querier_logstream::delete_stream_hot_tier) + .to(logstream::delete_stream_hot_tier) .authorize_for_stream(Action::DeleteHotTierEnabled), ), ), diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index cdbe544e..815f00e5 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -29,6 +29,7 @@ use crate::handlers::http::users::dashboards; use crate::handlers::http::users::filters; use crate::handlers::http::API_BASE_PATH; use crate::handlers::http::API_VERSION; +use crate::hottier::HotTierManager; use crate::localcache::LocalCacheManager; use crate::metrics; use crate::migration; @@ -554,6 +555,10 @@ impl Server { storage::retention::load_retention_from_global(); + if let Some(hot_tier_manager) = HotTierManager::global() { + hot_tier_manager.download_from_s3()?; + }; + let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync().await; let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = diff --git a/server/src/hottier.rs b/server/src/hottier.rs index de45530a..0528a122 100644 --- a/server/src/hottier.rs +++ b/server/src/hottier.rs @@ -73,10 +73,12 @@ impl HotTierManager { pub fn global() -> Option<&'static HotTierManager> { static INSTANCE: OnceCell = OnceCell::new(); - let hot_tier_path = CONFIG.parseable.hot_tier_storage_path.as_ref()?; - + let hot_tier_path = &CONFIG.parseable.hot_tier_storage_path; + if hot_tier_path.is_none() { + return None; + } Some(INSTANCE.get_or_init(|| { - let hot_tier_path = hot_tier_path.clone(); + let hot_tier_path = hot_tier_path.as_ref().unwrap().clone(); std::fs::create_dir_all(&hot_tier_path).unwrap(); HotTierManager { filesystem: LocalFileSystem::new(), diff --git a/server/src/option.rs b/server/src/option.rs index 21c517be..73b701c6 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -85,6 +85,15 @@ Cloud Native, log analytics platform for modern applications."#, .exit() } + if cli.hot_tier_storage_path.is_some() { + create_parseable_cli_command() + .error( + ErrorKind::ValueValidation, + "Cannot use hot tier with local-store subcommand.", + ) + .exit() + } + Config { parseable: cli, storage: Arc::new(storage),