fix: stats enhancement (#799)
Until now, the ingested events count and ingested size stayed fixed for the
lifetime of a stream. Even when older data was deleted by retention, the
stats were not updated at all, leading to confusion.

This PR adds a new way to track stats, with three qualifiers for each stat
type. We now have lifetime, current, and deleted qualifiers for events
ingested, ingestion size, and stored size, so users get a much better view
of their data, its compression, the storage used, etc. (a sketch of the
resulting shape follows below). This change will be enhanced further by
making these updated metrics visible in the console.

Fixes #788
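
Taken together, the change implies a per-stream stats shape like the sketch
below (Rust, matching the repository's language). The `FullStats` container
name is an assumption; the `lifetime_stats`, `current_stats`, and
`deleted_stats` fields and the `Stats` members mirror the usages in
server/src/analytics.rs further down in this diff.

// Sketch only: `FullStats` is a hypothetical name, but the field names
// match how the get_current_stats result is consumed in analytics.rs.
#[derive(Debug, Clone, Copy)]
pub struct Stats {
    pub events: u64,    // number of events ingested
    pub ingestion: u64, // raw JSON bytes ingested
    pub storage: u64,   // parquet bytes stored
}

#[derive(Debug, Clone, Copy)]
pub struct FullStats {
    pub lifetime_stats: Stats, // everything ever ingested into the stream
    pub current_stats: Stats,  // what is currently retained
    pub deleted_stats: Stats,  // what retention has removed
}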
nikhilsinhaparseable authored May 21, 2024
1 parent 4023b56 commit ad7f67e
Showing 19 changed files with 904 additions and 168 deletions.
113 changes: 101 additions & 12 deletions server/src/analytics.rs
@@ -24,6 +24,7 @@ use crate::option::{Mode, CONFIG};
use crate::storage;
use crate::{metadata, stats};

use crate::stats::Stats;
use actix_web::{web, HttpRequest, Responder};
use chrono::{DateTime, Utc};
use clokwerk::{AsyncScheduler, Interval};
@@ -70,6 +71,12 @@ pub struct Report {
total_events_count: u64,
total_json_bytes: u64,
total_parquet_bytes: u64,
current_events_count: u64,
current_json_bytes: u64,
current_parquet_bytes: u64,
deleted_events_count: u64,
deleted_json_bytes: u64,
deleted_parquet_bytes: u64,
metrics: HashMap<String, Value>,
}

@@ -112,6 +119,12 @@ impl Report {
total_events_count: ingestor_metrics.3,
total_json_bytes: ingestor_metrics.4,
total_parquet_bytes: ingestor_metrics.5,
current_events_count: ingestor_metrics.6,
current_json_bytes: ingestor_metrics.7,
current_parquet_bytes: ingestor_metrics.8,
deleted_events_count: ingestor_metrics.9,
deleted_json_bytes: ingestor_metrics.10,
deleted_parquet_bytes: ingestor_metrics.11,
metrics: build_metrics().await,
})
}
@@ -132,26 +145,70 @@ fn total_streams() -> usize {
metadata::STREAM_INFO.list_streams().len()
}

fn total_event_stats() -> (u64, u64, u64) {
fn total_event_stats() -> (Stats, Stats, Stats) {
let mut total_events: u64 = 0;
let mut total_parquet_bytes: u64 = 0;
let mut total_json_bytes: u64 = 0;

let mut current_events: u64 = 0;
let mut current_parquet_bytes: u64 = 0;
let mut current_json_bytes: u64 = 0;

let mut deleted_events: u64 = 0;
let mut deleted_parquet_bytes: u64 = 0;
let mut deleted_json_bytes: u64 = 0;

for stream in metadata::STREAM_INFO.list_streams() {
let Some(stats) = stats::get_current_stats(&stream, "json") else {
continue;
};
total_events += stats.events;
total_parquet_bytes += stats.storage;
total_json_bytes += stats.ingestion;
total_events += stats.lifetime_stats.events;
total_parquet_bytes += stats.lifetime_stats.storage;
total_json_bytes += stats.lifetime_stats.ingestion;

current_events += stats.current_stats.events;
current_parquet_bytes += stats.current_stats.storage;
current_json_bytes += stats.current_stats.ingestion;

deleted_events += stats.deleted_stats.events;
deleted_parquet_bytes += stats.deleted_stats.storage;
deleted_json_bytes += stats.deleted_stats.ingestion;
}
(total_events, total_json_bytes, total_parquet_bytes)

(
Stats {
events: total_events,
ingestion: total_json_bytes,
storage: total_parquet_bytes,
},
Stats {
events: current_events,
ingestion: current_json_bytes,
storage: current_parquet_bytes,
},
Stats {
events: deleted_events,
ingestion: deleted_json_bytes,
storage: deleted_parquet_bytes,
},
)
}

async fn fetch_ingestors_metrics() -> anyhow::Result<(u64, u64, usize, u64, u64, u64)> {
async fn fetch_ingestors_metrics(
) -> anyhow::Result<(u64, u64, usize, u64, u64, u64, u64, u64, u64, u64, u64, u64)> {
let event_stats = total_event_stats();
let mut node_metrics =
NodeMetrics::new(total_streams(), event_stats.0, event_stats.1, event_stats.2);
let mut node_metrics = NodeMetrics::new(
total_streams(),
event_stats.0.events,
event_stats.0.ingestion,
event_stats.0.storage,
event_stats.1.events,
event_stats.1.ingestion,
event_stats.1.storage,
event_stats.2.events,
event_stats.2.ingestion,
event_stats.2.storage,
);

let mut vec = vec![];
let mut active_ingestors = 0u64;
@@ -198,6 +255,12 @@ async fn fetch_ingestors_metrics() -> anyhow::Result<(u64, u64, usize, u64, u64,
node_metrics.total_events_count,
node_metrics.total_json_bytes,
node_metrics.total_parquet_bytes,
node_metrics.current_events_count,
node_metrics.current_json_bytes,
node_metrics.current_parquet_bytes,
node_metrics.deleted_events_count,
node_metrics.deleted_json_bytes,
node_metrics.deleted_parquet_bytes,
))
}

@@ -255,30 +318,56 @@ struct NodeMetrics {
total_events_count: u64,
total_json_bytes: u64,
total_parquet_bytes: u64,
current_events_count: u64,
current_json_bytes: u64,
current_parquet_bytes: u64,
deleted_events_count: u64,
deleted_json_bytes: u64,
deleted_parquet_bytes: u64,
}

impl NodeMetrics {
fn build() -> Self {
let event_stats = total_event_stats();
Self {
stream_count: total_streams(),
total_events_count: event_stats.0,
total_json_bytes: event_stats.1,
total_parquet_bytes: event_stats.2,
total_events_count: event_stats.0.events,
total_json_bytes: event_stats.0.ingestion,
total_parquet_bytes: event_stats.0.storage,

current_events_count: event_stats.1.events,
current_json_bytes: event_stats.1.ingestion,
current_parquet_bytes: event_stats.1.storage,

deleted_events_count: event_stats.2.events,
deleted_json_bytes: event_stats.2.ingestion,
deleted_parquet_bytes: event_stats.2.storage,
}
}

#[allow(clippy::too_many_arguments)]
fn new(
stream_count: usize,
total_events_count: u64,
total_json_bytes: u64,
total_parquet_bytes: u64,
current_events_count: u64,
current_json_bytes: u64,
current_parquet_bytes: u64,
deleted_events_count: u64,
deleted_json_bytes: u64,
deleted_parquet_bytes: u64,
) -> Self {
Self {
stream_count,
total_events_count,
total_json_bytes,
total_parquet_bytes,
current_events_count,
current_json_bytes,
current_parquet_bytes,
deleted_events_count,
deleted_json_bytes,
deleted_parquet_bytes,
}
}

57 changes: 48 additions & 9 deletions server/src/catalog.rs
@@ -20,7 +20,9 @@ use std::{io::ErrorKind, sync::Arc};

use self::{column::Column, snapshot::ManifestItem};
use crate::handlers::http::base_path_without_preceding_slash;
use crate::metrics::{EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY, STORAGE_SIZE_TODAY};
use crate::option::CONFIG;
use crate::stats::{event_labels, storage_size_labels, update_deleted_stats};
use crate::{
catalog::manifest::Manifest,
event::DEFAULT_TIMESTAMP_KEY,
@@ -101,13 +103,28 @@ pub async fn update_snapshot(
change: manifest::File,
) -> Result<(), ObjectStorageError> {
// get current snapshot
let event_labels = event_labels(stream_name, "json");
let storage_size_labels = storage_size_labels(stream_name);
let events_ingested = EVENTS_INGESTED_TODAY
.get_metric_with_label_values(&event_labels)
.unwrap()
.get() as u64;
let ingestion_size = EVENTS_INGESTED_SIZE_TODAY
.get_metric_with_label_values(&event_labels)
.unwrap()
.get() as u64;
let storage_size = STORAGE_SIZE_TODAY
.get_metric_with_label_values(&storage_size_labels)
.unwrap()
.get() as u64;

let mut meta = storage.get_object_store_format(stream_name).await?;
let meta_clone = meta.clone();
let manifests = &mut meta.snapshot.manifest_list;
let time_partition: Option<String> = meta_clone.time_partition;
let time_partition = &meta_clone.time_partition;
let lower_bound = match time_partition {
Some(time_partition) => {
let (lower_bound, _) = get_file_bounds(&change, time_partition);
let (lower_bound, _) = get_file_bounds(&change, time_partition.to_string());
lower_bound
}
None => {
@@ -129,12 +146,18 @@ pub async fn update_snapshot(
let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);

let mut ch = false;
for m in manifests.iter() {
for m in manifests.iter_mut() {
let p = manifest_path("").to_string();
if m.manifest_path.contains(&p) {
ch = true;
m.events_ingested = events_ingested;
m.ingestion_size = ingestion_size;
m.storage_size = storage_size;
}
}

meta.snapshot.manifest_list = manifests.to_vec();
storage.put_snapshot(stream_name, meta.snapshot).await?;
if ch {
if let Some(mut manifest) = storage.get_manifest(&path).await? {
manifest.apply_change(change);
@@ -148,7 +171,10 @@ pub async fn update_snapshot(
storage.clone(),
stream_name,
false,
meta,
meta_clone,
events_ingested,
ingestion_size,
storage_size,
)
.await?;
}
@@ -159,7 +185,10 @@ pub async fn update_snapshot(
storage.clone(),
stream_name,
true,
meta,
meta_clone,
events_ingested,
ingestion_size,
storage_size,
)
.await?;
}
@@ -170,21 +199,28 @@ pub async fn update_snapshot(
storage.clone(),
stream_name,
true,
meta,
meta_clone,
events_ingested,
ingestion_size,
storage_size,
)
.await?;
}

Ok(())
}

#[allow(clippy::too_many_arguments)]
async fn create_manifest(
lower_bound: DateTime<Utc>,
change: manifest::File,
storage: Arc<dyn ObjectStorage + Send>,
stream_name: &str,
update_snapshot: bool,
mut meta: ObjectStoreFormat,
events_ingested: u64,
ingestion_size: u64,
storage_size: u64,
) -> Result<(), ObjectStorageError> {
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
let upper_bound = lower_bound
@@ -216,6 +252,9 @@ async fn create_manifest(
manifest_path: path.to_string(),
time_lower_bound: lower_bound,
time_upper_bound: upper_bound,
events_ingested,
ingestion_size,
storage_size,
};
manifests.push(new_snapshot_entry);
meta.snapshot.manifest_list = manifests;
@@ -233,6 +272,8 @@ pub async fn remove_manifest_from_snapshot(
if !dates.is_empty() {
// get current snapshot
let mut meta = storage.get_object_store_format(stream_name).await?;
let meta_for_stats = meta.clone();
update_deleted_stats(storage.clone(), stream_name, meta_for_stats, dates.clone()).await?;
let manifests = &mut meta.snapshot.manifest_list;
// Filter out items whose manifest_path contains any of the dates_to_delete
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
@@ -308,8 +349,6 @@ pub async fn get_first_event(
);
// Convert dates vector to Bytes object
let dates_bytes = Bytes::from(serde_json::to_vec(&dates).unwrap());
// delete the stream

let ingestor_first_event_at =
handlers::http::cluster::send_retention_cleanup_request(
&url,
@@ -333,7 +372,7 @@

/// Partition the path to which this manifest belongs.
/// Useful when uploading the manifest file.
fn partition_path(
pub fn partition_path(
stream: &str,
lower_bound: DateTime<Utc>,
upper_bound: DateTime<Utc>,
5 changes: 4 additions & 1 deletion server/src/catalog/snapshot.rs
@@ -22,7 +22,7 @@ use chrono::{DateTime, Utc};

use crate::query::PartialTimeFilter;

pub const CURRENT_SNAPSHOT_VERSION: &str = "v1";
pub const CURRENT_SNAPSHOT_VERSION: &str = "v2";
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Snapshot {
pub version: String,
@@ -76,4 +76,7 @@ pub struct ManifestItem {
pub manifest_path: String,
pub time_lower_bound: DateTime<Utc>,
pub time_upper_bound: DateTime<Utc>,
pub events_ingested: u64,
pub ingestion_size: u64,
pub storage_size: u64,
}
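
With each `ManifestItem` now carrying its own `events_ingested`,
`ingestion_size`, and `storage_size`, retention can attribute exactly what it
deletes. The `update_deleted_stats` helper that `remove_manifest_from_snapshot`
calls lives in server/src/stats.rs and is not shown in this diff, so the
following is only a plausible sketch of that aggregation, reusing the same
path-contains-date matching rule as the `manifests.retain(...)` filter in
catalog.rs above.

// Hypothetical reconstruction, not the actual update_deleted_stats:
// sum the per-manifest counters for manifests whose path matches a
// date that retention is about to delete.
#[derive(Debug, Default)]
struct Stats {
    events: u64,
    ingestion: u64,
    storage: u64,
}

struct ManifestItem {
    manifest_path: String,
    events_ingested: u64,
    ingestion_size: u64,
    storage_size: u64,
}

fn sum_deleted_stats(manifests: &[ManifestItem], dates: &[String]) -> Stats {
    let mut deleted = Stats::default();
    for m in manifests {
        // Same matching rule as the snapshot filter: a manifest belongs
        // to a deleted date if its path contains that date string.
        if dates.iter().any(|d| m.manifest_path.contains(d.as_str())) {
            deleted.events += m.events_ingested;
            deleted.ingestion += m.ingestion_size;
            deleted.storage += m.storage_size;
        }
    }
    deleted
}

fn main() {
    let manifests = vec![ManifestItem {
        manifest_path: "stream/date=2024-05-01/manifest.json".to_string(),
        events_ingested: 1_000,
        ingestion_size: 4_096,
        storage_size: 1_024,
    }];
    let dates = vec!["date=2024-05-01".to_string()];
    // The resulting Stats would be added to the stream's deleted_stats.
    println!("{:?}", sum_deleted_stats(&manifests, &dates));
}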