diff --git a/server/src/analytics.rs b/server/src/analytics.rs index 95cfd3459..c011b5961 100644 --- a/server/src/analytics.rs +++ b/server/src/analytics.rs @@ -24,6 +24,7 @@ use crate::option::{Mode, CONFIG}; use crate::storage; use crate::{metadata, stats}; +use crate::stats::Stats; use actix_web::{web, HttpRequest, Responder}; use chrono::{DateTime, Utc}; use clokwerk::{AsyncScheduler, Interval}; @@ -70,6 +71,12 @@ pub struct Report { total_events_count: u64, total_json_bytes: u64, total_parquet_bytes: u64, + current_events_count: u64, + current_json_bytes: u64, + current_parquet_bytes: u64, + deleted_events_count: u64, + deleted_json_bytes: u64, + deleted_parquet_bytes: u64, metrics: HashMap, } @@ -112,6 +119,12 @@ impl Report { total_events_count: ingestor_metrics.3, total_json_bytes: ingestor_metrics.4, total_parquet_bytes: ingestor_metrics.5, + current_events_count: ingestor_metrics.6, + current_json_bytes: ingestor_metrics.7, + current_parquet_bytes: ingestor_metrics.8, + deleted_events_count: ingestor_metrics.9, + deleted_json_bytes: ingestor_metrics.10, + deleted_parquet_bytes: ingestor_metrics.11, metrics: build_metrics().await, }) } @@ -132,26 +145,70 @@ fn total_streams() -> usize { metadata::STREAM_INFO.list_streams().len() } -fn total_event_stats() -> (u64, u64, u64) { +fn total_event_stats() -> (Stats, Stats, Stats) { let mut total_events: u64 = 0; let mut total_parquet_bytes: u64 = 0; let mut total_json_bytes: u64 = 0; + let mut current_events: u64 = 0; + let mut current_parquet_bytes: u64 = 0; + let mut current_json_bytes: u64 = 0; + + let mut deleted_events: u64 = 0; + let mut deleted_parquet_bytes: u64 = 0; + let mut deleted_json_bytes: u64 = 0; + for stream in metadata::STREAM_INFO.list_streams() { let Some(stats) = stats::get_current_stats(&stream, "json") else { continue; }; - total_events += stats.events; - total_parquet_bytes += stats.storage; - total_json_bytes += stats.ingestion; + total_events += stats.lifetime_stats.events; + total_parquet_bytes += stats.lifetime_stats.storage; + total_json_bytes += stats.lifetime_stats.ingestion; + + current_events += stats.current_stats.events; + current_parquet_bytes += stats.current_stats.storage; + current_json_bytes += stats.current_stats.ingestion; + + deleted_events += stats.deleted_stats.events; + deleted_parquet_bytes += stats.deleted_stats.storage; + deleted_json_bytes += stats.deleted_stats.ingestion; } - (total_events, total_json_bytes, total_parquet_bytes) + + ( + Stats { + events: total_events, + ingestion: total_json_bytes, + storage: total_parquet_bytes, + }, + Stats { + events: current_events, + ingestion: current_json_bytes, + storage: current_parquet_bytes, + }, + Stats { + events: deleted_events, + ingestion: deleted_json_bytes, + storage: deleted_parquet_bytes, + }, + ) } -async fn fetch_ingestors_metrics() -> anyhow::Result<(u64, u64, usize, u64, u64, u64)> { +async fn fetch_ingestors_metrics( +) -> anyhow::Result<(u64, u64, usize, u64, u64, u64, u64, u64, u64, u64, u64, u64)> { let event_stats = total_event_stats(); - let mut node_metrics = - NodeMetrics::new(total_streams(), event_stats.0, event_stats.1, event_stats.2); + let mut node_metrics = NodeMetrics::new( + total_streams(), + event_stats.0.events, + event_stats.0.ingestion, + event_stats.0.storage, + event_stats.1.events, + event_stats.1.ingestion, + event_stats.1.storage, + event_stats.2.events, + event_stats.2.ingestion, + event_stats.2.storage, + ); let mut vec = vec![]; let mut active_ingestors = 0u64; @@ -198,6 +255,12 @@ async fn 
fetch_ingestors_metrics() -> anyhow::Result<(u64, u64, usize, u64, u64, node_metrics.total_events_count, node_metrics.total_json_bytes, node_metrics.total_parquet_bytes, + node_metrics.current_events_count, + node_metrics.current_json_bytes, + node_metrics.current_parquet_bytes, + node_metrics.deleted_events_count, + node_metrics.deleted_json_bytes, + node_metrics.deleted_parquet_bytes, )) } @@ -255,6 +318,12 @@ struct NodeMetrics { total_events_count: u64, total_json_bytes: u64, total_parquet_bytes: u64, + current_events_count: u64, + current_json_bytes: u64, + current_parquet_bytes: u64, + deleted_events_count: u64, + deleted_json_bytes: u64, + deleted_parquet_bytes: u64, } impl NodeMetrics { @@ -262,23 +331,43 @@ impl NodeMetrics { let event_stats = total_event_stats(); Self { stream_count: total_streams(), - total_events_count: event_stats.0, - total_json_bytes: event_stats.1, - total_parquet_bytes: event_stats.2, + total_events_count: event_stats.0.events, + total_json_bytes: event_stats.0.ingestion, + total_parquet_bytes: event_stats.0.storage, + + current_events_count: event_stats.1.events, + current_json_bytes: event_stats.1.ingestion, + current_parquet_bytes: event_stats.1.storage, + + deleted_events_count: event_stats.2.events, + deleted_json_bytes: event_stats.2.ingestion, + deleted_parquet_bytes: event_stats.2.storage, } } - + #[allow(clippy::too_many_arguments)] fn new( stream_count: usize, total_events_count: u64, total_json_bytes: u64, total_parquet_bytes: u64, + current_events_count: u64, + current_json_bytes: u64, + current_parquet_bytes: u64, + deleted_events_count: u64, + deleted_json_bytes: u64, + deleted_parquet_bytes: u64, ) -> Self { Self { stream_count, total_events_count, total_json_bytes, total_parquet_bytes, + current_events_count, + current_json_bytes, + current_parquet_bytes, + deleted_events_count, + deleted_json_bytes, + deleted_parquet_bytes, } } diff --git a/server/src/catalog.rs b/server/src/catalog.rs index eb1237727..c88d4c9a5 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -20,7 +20,9 @@ use std::{io::ErrorKind, sync::Arc}; use self::{column::Column, snapshot::ManifestItem}; use crate::handlers::http::base_path_without_preceding_slash; +use crate::metrics::{EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY, STORAGE_SIZE_TODAY}; use crate::option::CONFIG; +use crate::stats::{event_labels, storage_size_labels, update_deleted_stats}; use crate::{ catalog::manifest::Manifest, event::DEFAULT_TIMESTAMP_KEY, @@ -101,13 +103,28 @@ pub async fn update_snapshot( change: manifest::File, ) -> Result<(), ObjectStorageError> { // get current snapshot + let event_labels = event_labels(stream_name, "json"); + let storage_size_labels = storage_size_labels(stream_name); + let events_ingested = EVENTS_INGESTED_TODAY + .get_metric_with_label_values(&event_labels) + .unwrap() + .get() as u64; + let ingestion_size = EVENTS_INGESTED_SIZE_TODAY + .get_metric_with_label_values(&event_labels) + .unwrap() + .get() as u64; + let storage_size = STORAGE_SIZE_TODAY + .get_metric_with_label_values(&storage_size_labels) + .unwrap() + .get() as u64; + let mut meta = storage.get_object_store_format(stream_name).await?; let meta_clone = meta.clone(); let manifests = &mut meta.snapshot.manifest_list; - let time_partition: Option = meta_clone.time_partition; + let time_partition = &meta_clone.time_partition; let lower_bound = match time_partition { Some(time_partition) => { - let (lower_bound, _) = get_file_bounds(&change, time_partition); + let (lower_bound, _) = 
get_file_bounds(&change, time_partition.to_string()); lower_bound } None => { @@ -129,12 +146,18 @@ pub async fn update_snapshot( let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound); let mut ch = false; - for m in manifests.iter() { + for m in manifests.iter_mut() { let p = manifest_path("").to_string(); if m.manifest_path.contains(&p) { ch = true; + m.events_ingested = events_ingested; + m.ingestion_size = ingestion_size; + m.storage_size = storage_size; } } + + meta.snapshot.manifest_list = manifests.to_vec(); + storage.put_snapshot(stream_name, meta.snapshot).await?; if ch { if let Some(mut manifest) = storage.get_manifest(&path).await? { manifest.apply_change(change); @@ -148,7 +171,10 @@ pub async fn update_snapshot( storage.clone(), stream_name, false, - meta, + meta_clone, + events_ingested, + ingestion_size, + storage_size, ) .await?; } @@ -159,7 +185,10 @@ pub async fn update_snapshot( storage.clone(), stream_name, true, - meta, + meta_clone, + events_ingested, + ingestion_size, + storage_size, ) .await?; } @@ -170,7 +199,10 @@ pub async fn update_snapshot( storage.clone(), stream_name, true, - meta, + meta_clone, + events_ingested, + ingestion_size, + storage_size, ) .await?; } @@ -178,6 +210,7 @@ pub async fn update_snapshot( Ok(()) } +#[allow(clippy::too_many_arguments)] async fn create_manifest( lower_bound: DateTime, change: manifest::File, @@ -185,6 +218,9 @@ async fn create_manifest( stream_name: &str, update_snapshot: bool, mut meta: ObjectStoreFormat, + events_ingested: u64, + ingestion_size: u64, + storage_size: u64, ) -> Result<(), ObjectStorageError> { let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc(); let upper_bound = lower_bound @@ -216,6 +252,9 @@ async fn create_manifest( manifest_path: path.to_string(), time_lower_bound: lower_bound, time_upper_bound: upper_bound, + events_ingested, + ingestion_size, + storage_size, }; manifests.push(new_snapshot_entry); meta.snapshot.manifest_list = manifests; @@ -233,6 +272,8 @@ pub async fn remove_manifest_from_snapshot( if !dates.is_empty() { // get current snapshot let mut meta = storage.get_object_store_format(stream_name).await?; + let meta_for_stats = meta.clone(); + update_deleted_stats(storage.clone(), stream_name, meta_for_stats, dates.clone()).await?; let manifests = &mut meta.snapshot.manifest_list; // Filter out items whose manifest_path contains any of the dates_to_delete manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date))); @@ -308,8 +349,6 @@ pub async fn get_first_event( ); // Convert dates vector to Bytes object let dates_bytes = Bytes::from(serde_json::to_vec(&dates).unwrap()); - // delete the stream - let ingestor_first_event_at = handlers::http::cluster::send_retention_cleanup_request( &url, @@ -333,7 +372,7 @@ pub async fn get_first_event( /// Partition the path to which this manifest belongs. /// Useful when uploading the manifest file. 
-fn partition_path( +pub fn partition_path( stream: &str, lower_bound: DateTime, upper_bound: DateTime, diff --git a/server/src/catalog/snapshot.rs b/server/src/catalog/snapshot.rs index 982e111fd..997f3566a 100644 --- a/server/src/catalog/snapshot.rs +++ b/server/src/catalog/snapshot.rs @@ -22,7 +22,7 @@ use chrono::{DateTime, Utc}; use crate::query::PartialTimeFilter; -pub const CURRENT_SNAPSHOT_VERSION: &str = "v1"; +pub const CURRENT_SNAPSHOT_VERSION: &str = "v2"; #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct Snapshot { pub version: String, @@ -76,4 +76,7 @@ pub struct ManifestItem { pub manifest_path: String, pub time_lower_bound: DateTime, pub time_upper_bound: DateTime, + pub events_ingested: u64, + pub ingestion_size: u64, + pub storage_size: u64, } diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 88c563251..505d555b8 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -28,8 +28,8 @@ use crate::option::CONFIG; use crate::metrics::prom_utils::Metrics; use crate::storage::object_storage::ingestor_metadata_path; +use crate::storage::PARSEABLE_ROOT_DIRECTORY; use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY}; -use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY}; use actix_web::http::header; use actix_web::{HttpRequest, Responder}; use bytes::Bytes; @@ -161,25 +161,74 @@ pub async fn fetch_stats_from_ingestors( .get_object_store() .get_objects( Some(&path), - Box::new(|file_name| file_name.starts_with(".ingestor")), + Box::new(|file_name| { + file_name.starts_with(".ingestor") && file_name.ends_with("stream.json") + }), ) .await?; + let mut ingestion_size = 0u64; let mut storage_size = 0u64; let mut count = 0u64; + let mut lifetime_ingestion_size = 0u64; + let mut lifetime_storage_size = 0u64; + let mut lifetime_count = 0u64; + let mut deleted_ingestion_size = 0u64; + let mut deleted_storage_size = 0u64; + let mut deleted_count = 0u64; for ob in obs { - if let Ok(stat) = serde_json::from_slice::(&ob) { - count += stat.stats.events; - ingestion_size += stat.stats.ingestion; - storage_size += stat.stats.storage; + let stream_metadata: serde_json::Value = + serde_json::from_slice(&ob).expect("stream.json is valid json"); + let version = stream_metadata + .as_object() + .and_then(|meta| meta.get("version")) + .and_then(|version| version.as_str()); + let stats = stream_metadata.get("stats").unwrap(); + if matches!(version, Some("v4")) { + let current_stats = stats.get("current_stats").unwrap().clone(); + let lifetime_stats = stats.get("lifetime_stats").unwrap().clone(); + let deleted_stats = stats.get("deleted_stats").unwrap().clone(); + + count += current_stats.get("events").unwrap().as_u64().unwrap(); + ingestion_size += current_stats.get("ingestion").unwrap().as_u64().unwrap(); + storage_size += current_stats.get("storage").unwrap().as_u64().unwrap(); + lifetime_count += lifetime_stats.get("events").unwrap().as_u64().unwrap(); + lifetime_ingestion_size += lifetime_stats.get("ingestion").unwrap().as_u64().unwrap(); + lifetime_storage_size += lifetime_stats.get("storage").unwrap().as_u64().unwrap(); + deleted_count += deleted_stats.get("events").unwrap().as_u64().unwrap(); + deleted_ingestion_size += deleted_stats.get("ingestion").unwrap().as_u64().unwrap(); + deleted_storage_size += deleted_stats.get("storage").unwrap().as_u64().unwrap(); + } else { + count += stats.get("events").unwrap().as_u64().unwrap(); + 
ingestion_size += stats.get("ingestion").unwrap().as_u64().unwrap(); + storage_size += stats.get("storage").unwrap().as_u64().unwrap(); + lifetime_count += stats.get("events").unwrap().as_u64().unwrap(); + lifetime_ingestion_size += stats.get("ingestion").unwrap().as_u64().unwrap(); + lifetime_storage_size += stats.get("storage").unwrap().as_u64().unwrap(); + deleted_count += 0; + deleted_ingestion_size += 0; + deleted_storage_size += 0; } } let qs = QueriedStats::new( "", Utc::now(), - IngestionStats::new(count, format!("{} Bytes", ingestion_size), "json"), - StorageStats::new(format!("{} Bytes", storage_size), "parquet"), + IngestionStats::new( + count, + format!("{} Bytes", ingestion_size), + lifetime_count, + format!("{} Bytes", lifetime_ingestion_size), + deleted_count, + format!("{} Bytes", deleted_ingestion_size), + "json", + ), + StorageStats::new( + format!("{} Bytes", storage_size), + format!("{} Bytes", lifetime_storage_size), + format!("{} Bytes", deleted_storage_size), + "parquet", + ), ); Ok(vec![qs]) diff --git a/server/src/handlers/http/cluster/utils.rs b/server/src/handlers/http/cluster/utils.rs index cea27bb04..6f41755f4 100644 --- a/server/src/handlers/http/cluster/utils.rs +++ b/server/src/handlers/http/cluster/utils.rs @@ -86,14 +86,30 @@ pub struct IngestionStats { pub count: u64, pub size: String, pub format: String, + pub lifetime_count: u64, + pub lifetime_size: String, + pub deleted_count: u64, + pub deleted_size: String, } impl IngestionStats { - pub fn new(count: u64, size: String, format: &str) -> Self { + pub fn new( + count: u64, + size: String, + lifetime_count: u64, + lifetime_size: String, + deleted_count: u64, + deleted_size: String, + format: &str, + ) -> Self { Self { count, size, format: format.to_string(), + lifetime_count, + lifetime_size, + deleted_count, + deleted_size, } } } @@ -102,13 +118,17 @@ impl IngestionStats { pub struct StorageStats { pub size: String, pub format: String, + pub lifetime_size: String, + pub deleted_size: String, } impl StorageStats { - pub fn new(size: String, format: &str) -> Self { + pub fn new(size: String, lifetime_size: String, deleted_size: String, format: &str) -> Self { Self { size, format: format.to_string(), + lifetime_size, + deleted_size, } } } @@ -125,6 +145,7 @@ pub fn merge_quried_stats(stats: Vec) -> QueriedStats { .map(|x| &x.ingestion) .fold(IngestionStats::default(), |acc, x| IngestionStats { count: acc.count + x.count, + size: format!( "{} Bytes", acc.size.split(' ').collect_vec()[0] @@ -135,6 +156,26 @@ pub fn merge_quried_stats(stats: Vec) -> QueriedStats { .unwrap_or_default() ), format: x.format.clone(), + lifetime_count: acc.lifetime_count + x.lifetime_count, + lifetime_size: format!( + "{} Bytes", + acc.lifetime_size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + + x.lifetime_size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + ), + deleted_count: acc.deleted_count + x.deleted_count, + deleted_size: format!( + "{} Bytes", + acc.deleted_size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + + x.deleted_size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + ), }); let cumulative_storage = @@ -152,6 +193,24 @@ pub fn merge_quried_stats(stats: Vec) -> QueriedStats { .unwrap_or_default() ), format: x.format.clone(), + lifetime_size: format!( + "{} Bytes", + acc.lifetime_size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + + x.lifetime_size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + ), + 
deleted_size: format!( + "{} Bytes", + acc.deleted_size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + + x.deleted_size.split(' ').collect_vec()[0] + .parse::() + .unwrap_or_default() + ), }); QueriedStats::new( diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 5910982f6..da4f00ad3 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -509,24 +509,40 @@ pub async fn get_stats(req: HttpRequest) -> Result let stats = match &stream_meta.first_event_at { Some(_) => { let ingestion_stats = IngestionStats::new( - stats.events, - format!("{} {}", stats.ingestion, "Bytes"), + stats.current_stats.events, + format!("{} {}", stats.current_stats.ingestion, "Bytes"), + stats.lifetime_stats.events, + format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), + stats.deleted_stats.events, + format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), "json", ); - let storage_stats = - StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet"); + let storage_stats = StorageStats::new( + format!("{} {}", stats.current_stats.storage, "Bytes"), + format!("{} {}", stats.lifetime_stats.storage, "Bytes"), + format!("{} {}", stats.deleted_stats.storage, "Bytes"), + "parquet", + ); QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) } None => { let ingestion_stats = IngestionStats::new( - stats.events, - format!("{} {}", stats.ingestion, "Bytes"), + stats.current_stats.events, + format!("{} {}", stats.current_stats.ingestion, "Bytes"), + stats.lifetime_stats.events, + format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), + stats.deleted_stats.events, + format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), "json", ); - let storage_stats = - StorageStats::new(format!("{} {}", stats.storage, "Bytes"), "parquet"); + let storage_stats = StorageStats::new( + format!("{} {}", stats.current_stats.storage, "Bytes"), + format!("{} {}", stats.lifetime_stats.storage, "Bytes"), + format!("{} {}", stats.deleted_stats.storage, "Bytes"), + "parquet", + ); QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) } @@ -544,6 +560,7 @@ pub async fn get_stats(req: HttpRequest) -> Result } // Check if the first_event_at is empty +#[allow(dead_code)] pub fn first_event_at_empty(stream_name: &str) -> bool { let hash_map = STREAM_INFO.read().unwrap(); if let Some(stream_info) = hash_map.get(stream_name) { @@ -628,19 +645,16 @@ pub async fn get_stream_info(req: HttpRequest) -> Result = Vec::new(); - if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name, dates).await + let store = CONFIG.storage().get_object_store(); + let dates: Vec = Vec::new(); + if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name, dates).await { + if let Err(err) = + metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) { - if let Err(err) = - metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) - { - log::error!( - "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", - stream_name - ); - } + log::error!( + "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", + stream_name + ); } } diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 2ce245949..6630da786 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -334,6 +334,7 @@ impl IngestServer { } 
metrics::fetch_stats_from_storage().await; + metrics::reset_daily_metric_from_global(); let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync(); let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 607cc5653..3558ec3df 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -173,10 +173,11 @@ impl QueryServer { log::warn!("could not populate local metadata. {:?}", e); } - // track all parquet files already in the data directory - storage::retention::load_retention_from_global(); // load data from stats back to prometheus metrics metrics::fetch_stats_from_storage().await; + metrics::reset_daily_metric_from_global(); + // track all parquet files already in the data directory + storage::retention::load_retention_from_global(); // all internal data structures populated now. // start the analytics scheduler if enabled diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 165581234..ff30baa98 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -116,7 +116,8 @@ impl ParseableServer for Server { banner::print(&CONFIG, &metadata).await; rbac::map::init(&metadata); metadata.set_global(); - self.initialize().await + self.initialize().await?; + Ok(()) } fn validate(&self) -> anyhow::Result<()> { @@ -404,8 +405,9 @@ impl Server { log::warn!("could not populate local metadata. {:?}", err); } - storage::retention::load_retention_from_global(); metrics::fetch_stats_from_storage().await; + metrics::reset_daily_metric_from_global(); + storage::retention::load_retention_from_global(); let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync(); let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = diff --git a/server/src/metadata.rs b/server/src/metadata.rs index b3963bbc4..76d5dd0da 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -26,7 +26,10 @@ use std::sync::{Arc, RwLock}; use self::error::stream_info::{CheckAlertError, LoadError, MetadataError}; use crate::alerts::Alerts; -use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE}; +use crate::metrics::{ + EVENTS_INGESTED, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY, + LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, +}; use crate::storage::{LogStream, ObjectStorage, StorageDir}; use crate::utils::arrow::MergedRecordReader; use derive_more::{Deref, DerefMut}; @@ -286,10 +289,22 @@ impl StreamInfo { ) -> Result<(), MetadataError> { EVENTS_INGESTED .with_label_values(&[stream_name, origin]) - .inc_by(num_rows); + .add(num_rows as i64); + EVENTS_INGESTED_TODAY + .with_label_values(&[stream_name, origin]) + .add(num_rows as i64); EVENTS_INGESTED_SIZE .with_label_values(&[stream_name, origin]) .add(size as i64); + EVENTS_INGESTED_SIZE_TODAY + .with_label_values(&[stream_name, origin]) + .add(size as i64); + LIFETIME_EVENTS_INGESTED + .with_label_values(&[stream_name, origin]) + .add(num_rows as i64); + LIFETIME_EVENTS_INGESTED_SIZE + .with_label_values(&[stream_name, origin]) + .add(size as i64); Ok(()) } } diff --git a/server/src/metrics/mod.rs b/server/src/metrics/mod.rs index 3e337123d..2407de1ae 100644 --- a/server/src/metrics/mod.rs +++ b/server/src/metrics/mod.rs @@ -18,23 +18,40 @@ pub mod prom_utils; pub mod 
storage; +use std::sync::Mutex; +use crate::{handlers::http::metrics_path, metadata::STREAM_INFO, metrics, option::CONFIG}; use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder}; +use clokwerk::AsyncScheduler; +use clokwerk::Job; +use clokwerk::TimeUnits; use once_cell::sync::Lazy; use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry}; - -use crate::{handlers::http::metrics_path, metadata::STREAM_INFO, option::CONFIG}; +use std::thread; +use std::time::Duration; pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME"); +type SchedulerHandle = thread::JoinHandle<()>; -pub static EVENTS_INGESTED: Lazy = Lazy::new(|| { - IntCounterVec::new( +static METRIC_SCHEDULER_HANDLER: Lazy>> = + Lazy::new(|| Mutex::new(None)); + +pub static EVENTS_INGESTED: Lazy = Lazy::new(|| { + IntGaugeVec::new( Opts::new("events_ingested", "Events ingested").namespace(METRICS_NAMESPACE), &["stream", "format"], ) .expect("metric can be created") }); +pub static EVENTS_INGESTED_TODAY: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new("events_ingested_today", "Events ingested today").namespace(METRICS_NAMESPACE), + &["stream", "format"], + ) + .expect("metric can be created") +}); + pub static EVENTS_INGESTED_SIZE: Lazy = Lazy::new(|| { IntGaugeVec::new( Opts::new("events_ingested_size", "Events ingested size bytes") @@ -44,6 +61,18 @@ pub static EVENTS_INGESTED_SIZE: Lazy = Lazy::new(|| { .expect("metric can be created") }); +pub static EVENTS_INGESTED_SIZE_TODAY: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new( + "events_ingested_size_today", + "Events ingested size today in bytes", + ) + .namespace(METRICS_NAMESPACE), + &["stream", "format"], + ) + .expect("metric can be created") +}); + pub static STORAGE_SIZE: Lazy = Lazy::new(|| { IntGaugeVec::new( Opts::new("storage_size", "Storage size bytes").namespace(METRICS_NAMESPACE), @@ -52,6 +81,75 @@ pub static STORAGE_SIZE: Lazy = Lazy::new(|| { .expect("metric can be created") }); +pub static STORAGE_SIZE_TODAY: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new("storage_size_today", "Storage size today in bytes").namespace(METRICS_NAMESPACE), + &["type", "stream", "format"], + ) + .expect("metric can be created") +}); + +pub static EVENTS_DELETED: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new("events_deleted", "Events deleted").namespace(METRICS_NAMESPACE), + &["stream", "format"], + ) + .expect("metric can be created") +}); + +pub static EVENTS_DELETED_SIZE: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new("events_deleted_size", "Events deleted size bytes").namespace(METRICS_NAMESPACE), + &["stream", "format"], + ) + .expect("metric can be created") +}); + +pub static DELETED_EVENTS_STORAGE_SIZE: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new( + "deleted_events_storage_size", + "Deleted events storage size bytes", + ) + .namespace(METRICS_NAMESPACE), + &["type", "stream", "format"], + ) + .expect("metric can be created") +}); + +pub static LIFETIME_EVENTS_INGESTED: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new("lifetime_events_ingested", "Lifetime events ingested") + .namespace(METRICS_NAMESPACE), + &["stream", "format"], + ) + .expect("metric can be created") +}); + +pub static LIFETIME_EVENTS_INGESTED_SIZE: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new( + "lifetime_events_ingested_size", + "Lifetime events ingested size bytes", + ) + .namespace(METRICS_NAMESPACE), + &["stream", "format"], + ) + .expect("metric can be created") +}); + +pub static 
LIFETIME_EVENTS_STORAGE_SIZE: Lazy = Lazy::new(|| { + IntGaugeVec::new( + Opts::new( + "lifetime_events_storage_size", + "Lifetime events storage size bytes", + ) + .namespace(METRICS_NAMESPACE), + &["type", "stream", "format"], + ) + .expect("metric can be created") +}); + pub static STAGING_FILES: Lazy = Lazy::new(|| { IntGaugeVec::new( Opts::new("staging_files", "Active Staging files").namespace(METRICS_NAMESPACE), @@ -88,12 +186,39 @@ fn custom_metrics(registry: &Registry) { registry .register(Box::new(EVENTS_INGESTED.clone())) .expect("metric can be registered"); + registry + .register(Box::new(EVENTS_INGESTED_TODAY.clone())) + .expect("metric can be registered"); registry .register(Box::new(EVENTS_INGESTED_SIZE.clone())) .expect("metric can be registered"); + registry + .register(Box::new(EVENTS_INGESTED_SIZE_TODAY.clone())) + .expect("metric can be registered"); registry .register(Box::new(STORAGE_SIZE.clone())) .expect("metric can be registered"); + registry + .register(Box::new(STORAGE_SIZE_TODAY.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(EVENTS_DELETED.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(EVENTS_DELETED_SIZE.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(DELETED_EVENTS_STORAGE_SIZE.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(LIFETIME_EVENTS_INGESTED.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(LIFETIME_EVENTS_INGESTED_SIZE.clone())) + .expect("metric can be registered"); + registry + .register(Box::new(LIFETIME_EVENTS_STORAGE_SIZE.clone())) + .expect("metric can be registered"); registry .register(Box::new(STAGING_FILES.clone())) .expect("metric can be registered"); @@ -145,12 +270,86 @@ pub async fn fetch_stats_from_storage() { EVENTS_INGESTED .with_label_values(&[&stream_name, "json"]) - .inc_by(stats.events); + .set(stats.current_stats.events as i64); + EVENTS_INGESTED_TODAY + .with_label_values(&[&stream_name, "json"]) + .set(stats.current_date_stats.events as i64); EVENTS_INGESTED_SIZE .with_label_values(&[&stream_name, "json"]) - .set(stats.ingestion as i64); + .set(stats.current_stats.ingestion as i64); + EVENTS_INGESTED_SIZE_TODAY + .with_label_values(&[&stream_name, "json"]) + .set(stats.current_date_stats.ingestion as i64); STORAGE_SIZE .with_label_values(&["data", &stream_name, "parquet"]) - .set(stats.storage as i64) + .set(stats.current_stats.storage as i64); + STORAGE_SIZE_TODAY + .with_label_values(&["data", &stream_name, "parquet"]) + .set(stats.current_date_stats.storage as i64); + EVENTS_DELETED + .with_label_values(&[&stream_name, "json"]) + .set(stats.deleted_stats.events as i64); + EVENTS_DELETED_SIZE + .with_label_values(&[&stream_name, "json"]) + .set(stats.deleted_stats.ingestion as i64); + DELETED_EVENTS_STORAGE_SIZE + .with_label_values(&["data", &stream_name, "parquet"]) + .set(stats.deleted_stats.storage as i64); + + LIFETIME_EVENTS_INGESTED + .with_label_values(&[&stream_name, "json"]) + .set(stats.lifetime_stats.events as i64); + LIFETIME_EVENTS_INGESTED_SIZE + .with_label_values(&[&stream_name, "json"]) + .set(stats.lifetime_stats.ingestion as i64); + LIFETIME_EVENTS_STORAGE_SIZE + .with_label_values(&["data", &stream_name, "parquet"]) + .set(stats.lifetime_stats.storage as i64); } } + +fn async_runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_current_thread() + .thread_name("reset-metrics-task-thread") + .enable_all() + .build() + .unwrap() 
+} + +pub fn reset_daily_metric_from_global() { + init_reset_daily_metric_scheduler(); +} + +pub fn init_reset_daily_metric_scheduler() { + log::info!("Setting up scheduler"); + let mut scheduler = AsyncScheduler::new(); + let func = move || async { + // reset today's ingestion metrics every day at 12 am + for stream in STREAM_INFO.list_streams() { + metrics::EVENTS_INGESTED_TODAY + .with_label_values(&[&stream, "json"]) + .set(0); + metrics::EVENTS_INGESTED_SIZE_TODAY + .with_label_values(&[&stream, "json"]) + .set(0); + metrics::STORAGE_SIZE_TODAY + .with_label_values(&["data", &stream, "parquet"]) + .set(0); + } + }; + + scheduler.every(1.day()).at("00:00").run(func); + + let scheduler_handler = thread::spawn(|| { + let rt = async_runtime(); + rt.block_on(async move { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + scheduler.run_pending().await; + } + }); + }); + + *METRIC_SCHEDULER_HANDLER.lock().unwrap() = Some(scheduler_handler); + log::info!("Scheduler is initialized") +} diff --git a/server/src/migration.rs b/server/src/migration.rs index d9c15fc4c..ae053b9db 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -23,11 +23,6 @@ mod stream_metadata_migration; use std::{fs::OpenOptions, sync::Arc}; -use bytes::Bytes; -use itertools::Itertools; -use relative_path::RelativePathBuf; -use serde::Serialize; - use crate::{ option::Config, storage::{ @@ -36,6 +31,10 @@ use crate::{ SCHEMA_FILE_NAME, STREAM_ROOT_DIRECTORY, }, }; +use bytes::Bytes; +use itertools::Itertools; +use relative_path::RelativePathBuf; +use serde::Serialize; /// Migrate the metdata from v1 or v2 to v3 /// This is a one time migration async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: match version { Some("v1") => { - let new_stream_metadata = stream_metadata_migration::v1_v3(stream_metadata); + let new_stream_metadata = stream_metadata_migration::v1_v4(stream_metadata); storage .put_object(&path, to_bytes(&new_stream_metadata)) .await?; @@ -127,11 +126,11 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]); let schema = storage.get_object(&schema_path).await?; let schema = serde_json::from_slice(&schema).ok(); - let map = schema_migration::v1_v3(schema)?; + let map = schema_migration::v1_v4(schema)?; storage.put_object(&schema_path, to_bytes(&map)).await?; } Some("v2") => { - let new_stream_metadata = stream_metadata_migration::v2_v3(stream_metadata); + let new_stream_metadata = stream_metadata_migration::v2_v4(stream_metadata); storage .put_object(&path, to_bytes(&new_stream_metadata)) .await?; @@ -140,9 +139,15 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]); let schema = storage.get_object(&schema_path).await?; let schema = serde_json::from_slice(&schema)?; - let map = schema_migration::v2_v3(schema)?; + let map = schema_migration::v2_v4(schema)?; storage.put_object(&schema_path, to_bytes(&map)).await?; } + Some("v3") => { + let new_stream_metadata = stream_metadata_migration::v3_v4(stream_metadata); + storage + .put_object(&path, to_bytes(&new_stream_metadata)) + .await?; + } _ => (), } @@ -223,7 +228,7 @@ pub async fn run_file_migration(config: &Config) -> anyhow::Result<()> { } run_meta_file_migration(&object_store, old_meta_file_path).await?; - run_stream_files_migration(object_store).await?; +
run_stream_files_migration(&object_store).await?; Ok(()) } @@ -263,7 +268,7 @@ async fn run_meta_file_migration( } async fn run_stream_files_migration( - object_store: Arc, + object_store: &Arc, ) -> anyhow::Result<()> { let streams = object_store .list_old_streams() diff --git a/server/src/migration/schema_migration.rs b/server/src/migration/schema_migration.rs index 5abacfdf1..9cca5ac6e 100644 --- a/server/src/migration/schema_migration.rs +++ b/server/src/migration/schema_migration.rs @@ -22,7 +22,7 @@ use std::collections::HashMap; use arrow_schema::{DataType, Field, Schema}; use serde_json::Value; -pub(super) fn v1_v3(schema: Option) -> anyhow::Result { +pub(super) fn v1_v4(schema: Option) -> anyhow::Result { if let Some(schema) = schema { value_to_schema(schema) } else { @@ -30,7 +30,7 @@ pub(super) fn v1_v3(schema: Option) -> anyhow::Result { } } -pub(super) fn v2_v3(schemas: HashMap) -> anyhow::Result { +pub(super) fn v2_v4(schemas: HashMap) -> anyhow::Result { let mut derived_schemas = Vec::new(); for value in schemas.into_values() { diff --git a/server/src/migration/stream_metadata_migration.rs b/server/src/migration/stream_metadata_migration.rs index dc0a12440..24da19bf0 100644 --- a/server/src/migration/stream_metadata_migration.rs +++ b/server/src/migration/stream_metadata_migration.rs @@ -19,16 +19,78 @@ use serde_json::{json, Value}; -use crate::storage; +use crate::{catalog::snapshot::CURRENT_SNAPSHOT_VERSION, storage}; -pub fn v1_v3(mut stream_metadata: Value) -> Value { +pub fn v1_v4(mut stream_metadata: Value) -> Value { + let stream_metadata_map = stream_metadata.as_object_mut().unwrap(); + let stats = stream_metadata_map.get("stats").unwrap().clone(); let default_stats = json!({ - "events": 0, - "ingestion": 0, - "storage": 0 + "lifetime_stats": { + "events": stats.get("events").unwrap(), + "ingestion": stats.get("ingestion").unwrap(), + "storage": stats.get("storage").unwrap() + }, + "current_stats": { + "events": stats.get("events").unwrap(), + "ingestion": stats.get("ingestion").unwrap(), + "storage": stats.get("storage").unwrap() + }, + "deleted_stats": { + "events": 0, + "ingestion": 0, + "storage": 0 + }, + "current_date_stats": { + "events": 0, + "ingestion": 0, + "storage": 0 + } }); + stream_metadata_map.insert("stats".to_owned(), default_stats); + stream_metadata_map.insert( + "version".to_owned(), + Value::String(storage::CURRENT_SCHEMA_VERSION.into()), + ); + stream_metadata_map.insert( + "objectstore-format".to_owned(), + Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()), + ); + stream_metadata_map.insert( + "snapshot".to_owned(), + json!({ + "version": CURRENT_SNAPSHOT_VERSION, + "manifest_list": [] + }), + ); + stream_metadata +} + +pub fn v2_v4(mut stream_metadata: Value) -> Value { let stream_metadata_map = stream_metadata.as_object_mut().unwrap(); - stream_metadata_map.entry("stats").or_insert(default_stats); + let stats = stream_metadata_map.get("stats").unwrap().clone(); + let default_stats = json!({ + "lifetime_stats": { + "events": stats.get("events").unwrap(), + "ingestion": stats.get("ingestion").unwrap(), + "storage": stats.get("storage").unwrap() + }, + "current_stats": { + "events": stats.get("events").unwrap(), + "ingestion": stats.get("ingestion").unwrap(), + "storage": stats.get("storage").unwrap() + }, + "deleted_stats": { + "events": 0, + "ingestion": 0, + "storage": 0 + }, + "current_date_stats": { + "events": 0, + "ingestion": 0, + "storage": 0 + } + }); + stream_metadata_map.insert("stats".to_owned(), default_stats); 
stream_metadata_map.insert( "version".to_owned(), Value::String(storage::CURRENT_SCHEMA_VERSION.into()), @@ -37,17 +99,44 @@ pub fn v1_v3(mut stream_metadata: Value) -> Value { "objectstore-format".to_owned(), Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()), ); + + stream_metadata_map.insert( + "snapshot".to_owned(), + json!({ + "version": CURRENT_SNAPSHOT_VERSION, + "manifest_list": [] + }), + ); stream_metadata } -pub fn v2_v3(mut stream_metadata: Value) -> Value { +pub fn v3_v4(mut stream_metadata: Value) -> Value { + let stream_metadata_map: &mut serde_json::Map = + stream_metadata.as_object_mut().unwrap(); + let stats = stream_metadata_map.get("stats").unwrap().clone(); let default_stats = json!({ - "events": 0, - "ingestion": 0, - "storage": 0 + "lifetime_stats": { + "events": stats.get("events").unwrap(), + "ingestion": stats.get("ingestion").unwrap(), + "storage": stats.get("storage").unwrap() + }, + "current_stats": { + "events": stats.get("events").unwrap(), + "ingestion": stats.get("ingestion").unwrap(), + "storage": stats.get("storage").unwrap() + }, + "deleted_stats": { + "events": 0, + "ingestion": 0, + "storage": 0 + }, + "current_date_stats": { + "events": 0, + "ingestion": 0, + "storage": 0 + } }); - let stream_metadata_map = stream_metadata.as_object_mut().unwrap(); - stream_metadata_map.entry("stats").or_insert(default_stats); + stream_metadata_map.insert("stats".to_owned(), default_stats); stream_metadata_map.insert( "version".to_owned(), Value::String(storage::CURRENT_SCHEMA_VERSION.into()), @@ -56,5 +145,42 @@ pub fn v2_v3(mut stream_metadata: Value) -> Value { "objectstore-format".to_owned(), Value::String(storage::CURRENT_OBJECT_STORE_VERSION.into()), ); + + let snapshot = stream_metadata_map.get("snapshot").unwrap().clone(); + let version = snapshot + .as_object() + .and_then(|meta| meta.get("version")) + .and_then(|version| version.as_str()); + if matches!(version, Some("v1")) { + let updated_snapshot = v1_v2_snapshot_migration(snapshot); + stream_metadata_map.insert("snapshot".to_owned(), updated_snapshot); + } + stream_metadata } + +fn v1_v2_snapshot_migration(mut snapshot: Value) -> Value { + let manifest_list = snapshot.get("manifest_list").unwrap(); + let mut new_manifest_list = Vec::new(); + for manifest in manifest_list.as_array().unwrap() { + let manifest_map = manifest.as_object().unwrap(); + let time_lower_bound = manifest_map.get("time_lower_bound").unwrap(); + let time_upper_bound = manifest_map.get("time_upper_bound").unwrap(); + let new_manifest = json!({ + "manifest_path": manifest_map.get("manifest_path").unwrap(), + "time_lower_bound": time_lower_bound, + "time_upper_bound": time_upper_bound, + "events_ingested": 0, + "ingestion_size": 0, + "storage_size": 0 + }); + new_manifest_list.push(new_manifest); + } + let snapshot_map: &mut serde_json::Map = snapshot.as_object_mut().unwrap(); + snapshot_map.insert( + "version".to_owned(), + Value::String(CURRENT_SNAPSHOT_VERSION.into()), + ); + snapshot_map.insert("manifest_list".to_owned(), Value::Array(new_manifest_list)); + snapshot +} diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index ef0eb69ba..dccb3aa84 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -863,16 +863,25 @@ mod tests { manifest_path: "1".to_string(), time_lower_bound: datetime_min(2023, 12, 15), time_upper_bound: datetime_max(2023, 12, 15), + events_ingested: 0, + ingestion_size: 0, + storage_size: 0, }, 
ManifestItem { manifest_path: "2".to_string(), time_lower_bound: datetime_min(2023, 12, 16), time_upper_bound: datetime_max(2023, 12, 16), + events_ingested: 0, + ingestion_size: 0, + storage_size: 0, }, ManifestItem { manifest_path: "3".to_string(), time_lower_bound: datetime_min(2023, 12, 17), time_upper_bound: datetime_max(2023, 12, 17), + events_ingested: 0, + ingestion_size: 0, + storage_size: 0, }, ] } diff --git a/server/src/stats.rs b/server/src/stats.rs index 5b92bfd85..05fc91dfd 100644 --- a/server/src/stats.rs +++ b/server/src/stats.rs @@ -15,8 +15,14 @@ * along with this program. If not, see . * */ - -use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE, STORAGE_SIZE}; +use crate::metrics::{ + DELETED_EVENTS_STORAGE_SIZE, EVENTS_DELETED, EVENTS_DELETED_SIZE, EVENTS_INGESTED, + EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY, + LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, LIFETIME_EVENTS_STORAGE_SIZE, + STORAGE_SIZE, STORAGE_SIZE_TODAY, +}; +use crate::storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat}; +use std::sync::Arc; /// Helper struct type created by copying stats values from metadata #[derive(Debug, Default, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq)] @@ -26,48 +32,162 @@ pub struct Stats { pub storage: u64, } -pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option { +#[derive(Debug, Default, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq)] +pub struct FullStats { + pub lifetime_stats: Stats, + pub current_stats: Stats, + pub deleted_stats: Stats, + pub current_date_stats: Stats, +} + +pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option { let event_labels = event_labels(stream_name, format); let storage_size_labels = storage_size_labels(stream_name); let events_ingested = EVENTS_INGESTED .get_metric_with_label_values(&event_labels) .ok()? - .get(); + .get() as u64; let ingestion_size = EVENTS_INGESTED_SIZE .get_metric_with_label_values(&event_labels) .ok()? - .get(); + .get() as u64; let storage_size = STORAGE_SIZE .get_metric_with_label_values(&storage_size_labels) .ok()? - .get(); - // this should be valid for all cases given that gauge must never go negative - let ingestion_size = ingestion_size as u64; - let storage_size = storage_size as u64; + .get() as u64; + let events_deleted = EVENTS_DELETED + .get_metric_with_label_values(&event_labels) + .ok()? + .get() as u64; + let events_deleted_size = EVENTS_DELETED_SIZE + .get_metric_with_label_values(&event_labels) + .ok()? + .get() as u64; + let deleted_events_storage_size = DELETED_EVENTS_STORAGE_SIZE + .get_metric_with_label_values(&storage_size_labels) + .ok()? + .get() as u64; + let lifetime_events_ingested = LIFETIME_EVENTS_INGESTED + .get_metric_with_label_values(&event_labels) + .ok()? + .get() as u64; + let lifetime_ingestion_size = LIFETIME_EVENTS_INGESTED_SIZE + .get_metric_with_label_values(&event_labels) + .ok()? + .get() as u64; + let lifetime_events_storage_size = LIFETIME_EVENTS_STORAGE_SIZE + .get_metric_with_label_values(&storage_size_labels) + .ok()? + .get() as u64; + let events_ingested_today = EVENTS_INGESTED_TODAY + .get_metric_with_label_values(&event_labels) + .ok()? + .get() as u64; + let ingestion_size_today = EVENTS_INGESTED_SIZE_TODAY + .get_metric_with_label_values(&event_labels) + .ok()? + .get() as u64; + let storage_size_today = STORAGE_SIZE_TODAY + .get_metric_with_label_values(&storage_size_labels) + .ok()? 
+ .get() as u64; - Some(Stats { - events: events_ingested, - ingestion: ingestion_size, - storage: storage_size, + Some(FullStats { + lifetime_stats: Stats { + events: lifetime_events_ingested, + ingestion: lifetime_ingestion_size, + storage: lifetime_events_storage_size, + }, + current_stats: Stats { + events: events_ingested, + ingestion: ingestion_size, + storage: storage_size, + }, + deleted_stats: Stats { + events: events_deleted, + ingestion: events_deleted_size, + storage: deleted_events_storage_size, + }, + current_date_stats: Stats { + events: events_ingested_today, + ingestion: ingestion_size_today, + storage: storage_size_today, + }, }) } +pub async fn update_deleted_stats( + storage: Arc, + stream_name: &str, + meta: ObjectStoreFormat, + dates: Vec, +) -> Result<(), ObjectStorageError> { + let mut num_row: i64 = 0; + let mut storage_size: i64 = 0; + let mut ingestion_size: i64 = 0; + + let mut manifests = meta.snapshot.manifest_list; + manifests.retain(|item| dates.iter().any(|date| item.manifest_path.contains(date))); + if !manifests.is_empty() { + for manifest in manifests { + num_row += manifest.events_ingested as i64; + ingestion_size += manifest.ingestion_size as i64; + storage_size += manifest.storage_size as i64; + } + } + EVENTS_DELETED + .with_label_values(&[stream_name, "json"]) + .add(num_row); + EVENTS_DELETED_SIZE + .with_label_values(&[stream_name, "json"]) + .add(ingestion_size); + DELETED_EVENTS_STORAGE_SIZE + .with_label_values(&["data", stream_name, "parquet"]) + .add(storage_size); + EVENTS_INGESTED + .with_label_values(&[stream_name, "json"]) + .sub(num_row); + EVENTS_INGESTED_SIZE + .with_label_values(&[stream_name, "json"]) + .sub(ingestion_size); + STORAGE_SIZE + .with_label_values(&["data", stream_name, "parquet"]) + .sub(storage_size); + let stats = get_current_stats(stream_name, "json"); + if let Some(stats) = stats { + if let Err(e) = storage.put_stats(stream_name, &stats).await { + log::warn!("Error updating stats to objectstore due to error [{}]", e); + } + } + + Ok(()) +} + pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Result<()> { let event_labels = event_labels(stream_name, format); let storage_size_labels = storage_size_labels(stream_name); EVENTS_INGESTED.remove_label_values(&event_labels)?; + EVENTS_INGESTED_TODAY.remove_label_values(&event_labels)?; EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?; + EVENTS_INGESTED_SIZE_TODAY.remove_label_values(&event_labels)?; STORAGE_SIZE.remove_label_values(&storage_size_labels)?; + STORAGE_SIZE_TODAY.remove_label_values(&storage_size_labels)?; + EVENTS_DELETED.remove_label_values(&event_labels)?; + EVENTS_DELETED_SIZE.remove_label_values(&event_labels)?; + DELETED_EVENTS_STORAGE_SIZE.remove_label_values(&storage_size_labels)?; + LIFETIME_EVENTS_INGESTED.remove_label_values(&event_labels)?; + LIFETIME_EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?; + LIFETIME_EVENTS_STORAGE_SIZE.remove_label_values(&storage_size_labels)?; Ok(()) } -fn event_labels<'a>(stream_name: &'a str, format: &'static str) -> [&'a str; 2] { +pub fn event_labels<'a>(stream_name: &'a str, format: &'static str) -> [&'a str; 2] { [stream_name, format] } -fn storage_size_labels(stream_name: &str) -> [&str; 3] { +pub fn storage_size_labels(stream_name: &str) -> [&str; 3] { ["data", stream_name, "parquet"] } diff --git a/server/src/storage.rs b/server/src/storage.rs index d6fb2fb7b..cf25ad826 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -16,7 +16,7 @@ * */ -use 
crate::{catalog::snapshot::Snapshot, stats::Stats}; +use crate::{catalog::snapshot::Snapshot, stats::FullStats}; use chrono::Local; @@ -66,8 +66,8 @@ const MAX_OBJECT_STORE_REQUESTS: usize = 1000; // const PERMISSIONS_READ_WRITE: &str = "readwrite"; const ACCESS_ALL: &str = "all"; -pub const CURRENT_OBJECT_STORE_VERSION: &str = "v3"; -pub const CURRENT_SCHEMA_VERSION: &str = "v3"; +pub const CURRENT_OBJECT_STORE_VERSION: &str = "v4"; +pub const CURRENT_SCHEMA_VERSION: &str = "v4"; #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct ObjectStoreFormat { @@ -83,7 +83,7 @@ pub struct ObjectStoreFormat { pub first_event_at: Option, pub owner: Owner, pub permissions: Vec, - pub stats: Stats, + pub stats: FullStats, #[serde(default)] pub snapshot: Snapshot, #[serde(default)] @@ -157,7 +157,7 @@ impl Default for ObjectStoreFormat { first_event_at: None, owner: Owner::new("".to_string(), "".to_string()), permissions: vec![Permisssion::new("parseable".to_string())], - stats: Stats::default(), + stats: FullStats::default(), snapshot: Snapshot::default(), cache_enabled: false, retention: None, diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 04a63704a..f5945b86d 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -26,6 +26,7 @@ use super::{ }; use crate::handlers::http::modal::ingest_server::INGESTOR_META; +use crate::metrics::{LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE_TODAY}; use crate::option::Mode; use crate::{ alerts::Alerts, @@ -34,7 +35,7 @@ use crate::{ metadata::STREAM_INFO, metrics::{storage::StorageMetrics, STORAGE_SIZE}, option::CONFIG, - stats::{self, Stats}, + stats::{self, FullStats, Stats}, }; use actix_web_prometheus::PrometheusMetrics; @@ -174,7 +175,11 @@ pub trait ObjectStorage: Sync + 'static { .await } - async fn put_stats(&self, stream_name: &str, stats: &Stats) -> Result<(), ObjectStorageError> { + async fn put_stats( + &self, + stream_name: &str, + stats: &FullStats, + ) -> Result<(), ObjectStorageError> { let path = stream_json_path(stream_name); let stream_metadata = self.get_object(&path).await?; let stats = serde_json::to_value(stats).expect("stats are perfectly serializable"); @@ -182,7 +187,6 @@ pub trait ObjectStorage: Sync + 'static { serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); stream_metadata["stats"] = stats; - self.put_object(&path, to_bytes(&stream_metadata)).await } @@ -282,7 +286,7 @@ pub trait ObjectStorage: Sync + 'static { .expect("parseable config is valid json"); if CONFIG.parseable.mode == Mode::Ingest { - config.stats = Stats::default(); + config.stats = FullStats::default(); config.snapshot.manifest_list = vec![]; } @@ -320,7 +324,7 @@ pub trait ObjectStorage: Sync + 'static { Ok(stats) } - async fn get_stats(&self, stream_name: &str) -> Result { + async fn get_stats(&self, stream_name: &str) -> Result { let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?; let stream_metadata: Value = serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); @@ -432,7 +436,6 @@ pub trait ObjectStorage: Sync + 'static { } let streams = STREAM_INFO.list_streams(); - let mut stream_stats = HashMap::new(); let cache_manager = LocalCacheManager::global(); let mut cache_updates: HashMap<&String, Vec<_>> = HashMap::new(); @@ -464,15 +467,20 @@ pub trait ObjectStorage: Sync + 'static { commit_schema_to_storage(stream, schema).await?; } } - + let mut compressed_size: 
u64 = 0; let parquet_files = dir.parquet_files(); parquet_files.iter().for_each(|file| { - let compressed_size = file.metadata().map_or(0, |meta| meta.len()); - stream_stats - .entry(stream) - .and_modify(|size| *size += compressed_size) - .or_insert_with(|| compressed_size); + compressed_size += file.metadata().map_or(0, |meta| meta.len()); }); + STORAGE_SIZE + .with_label_values(&["data", stream, "parquet"]) + .add(compressed_size as i64); + STORAGE_SIZE_TODAY + .with_label_values(&["data", stream, "parquet"]) + .add(compressed_size as i64); + LIFETIME_EVENTS_STORAGE_SIZE + .with_label_values(&["data", stream, "parquet"]) + .add(compressed_size as i64); for file in parquet_files { let filename = file @@ -499,6 +507,12 @@ pub trait ObjectStorage: Sync + 'static { let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap(); catalog::update_snapshot(store, stream, manifest).await?; + let stats = stats::get_current_stats(stream, "json"); + if let Some(stats) = stats { + if let Err(e) = self.put_stats(stream, &stats).await { + log::warn!("Error updating stats to objectstore due to error [{}]", e); + } + } if cache_enabled && cache_manager.is_some() { cache_updates .entry(stream) @@ -510,18 +524,6 @@ pub trait ObjectStorage: Sync + 'static { } } - for (stream, compressed_size) in stream_stats { - STORAGE_SIZE - .with_label_values(&["data", stream, "parquet"]) - .add(compressed_size as i64); - let stats = stats::get_current_stats(stream, "json"); - if let Some(stats) = stats { - if let Err(e) = self.put_stats(stream, &stats).await { - log::warn!("Error updating stats to objectstore due to error [{}]", e); - } - } - } - if let Some(manager) = cache_manager { let cache_updates = cache_updates .into_iter() diff --git a/server/src/storage/retention.rs b/server/src/storage/retention.rs index f67a15eb7..7e697b3ae 100644 --- a/server/src/storage/retention.rs +++ b/server/src/storage/retention.rs @@ -52,6 +52,7 @@ pub fn init_scheduler() { log::info!("Setting up schedular"); let mut scheduler = AsyncScheduler::new(); let func = move || async { + //get retention every day at 12 am for stream in STREAM_INFO.list_streams() { let res = CONFIG .storage() @@ -188,22 +189,20 @@ impl From for Vec { } mod action { + use crate::catalog::remove_manifest_from_snapshot; + use crate::{metadata, option::CONFIG}; use chrono::{Days, NaiveDate, Utc}; use futures::{stream::FuturesUnordered, StreamExt}; use itertools::Itertools; use relative_path::RelativePathBuf; - use crate::{catalog::remove_manifest_from_snapshot, metadata, option::CONFIG}; - pub(super) async fn delete(stream_name: String, days: u32) { log::info!("running retention task - delete for stream={stream_name}"); + let store = CONFIG.storage().get_object_store(); + let retain_until = get_retain_until(Utc::now().date_naive(), days as u64); - let Ok(mut dates) = CONFIG - .storage() - .get_object_store() - .list_dates(&stream_name) - .await - else { + + let Ok(mut dates) = store.list_dates(&stream_name).await else { return; }; dates.retain(|date| date.starts_with("date")); @@ -212,35 +211,39 @@ mod action { .filter(|date| string_to_date(date) < retain_until) .collect_vec(); let dates = dates_to_delete.clone(); - let delete_tasks = FuturesUnordered::new(); - for date in dates_to_delete { - let path = RelativePathBuf::from_iter([&stream_name, &date]); - delete_tasks.push(async move { - CONFIG - .storage() - .get_object_store() - .delete_prefix(&path) - .await - }); - } + if !dates.is_empty() { + let delete_tasks = 
FuturesUnordered::new(); + let res_remove_manifest = + remove_manifest_from_snapshot(store.clone(), &stream_name, dates.clone()).await; + + for date in dates_to_delete { + let path = RelativePathBuf::from_iter([&stream_name, &date]); + delete_tasks.push(async move { + CONFIG + .storage() + .get_object_store() + .delete_prefix(&path) + .await + }); + } - let res: Vec<_> = delete_tasks.collect().await; + let res: Vec<_> = delete_tasks.collect().await; - for res in res { - if let Err(err) = res { - log::error!("Failed to run delete task {err:?}") + for res in res { + if let Err(err) = res { + log::error!("Failed to run delete task {err:?}"); + return; + } } - } - - let store = CONFIG.storage().get_object_store(); - let res = remove_manifest_from_snapshot(store.clone(), &stream_name, dates).await; - if let Ok(first_event_at) = res { - if let Err(err) = metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at) - { - log::error!( - "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", - stream_name - ); + if let Ok(first_event_at) = res_remove_manifest { + if let Err(err) = + metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at) + { + log::error!( + "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", + stream_name + ); + } } } }