diff --git a/apps/framework-cli/src/cli.rs b/apps/framework-cli/src/cli.rs index c6d49d5a3..63867b960 100644 --- a/apps/framework-cli/src/cli.rs +++ b/apps/framework-cli/src/cli.rs @@ -321,17 +321,19 @@ async fn top_command_handler( &settings, ); - let (metrics, rx) = Metrics::new(TelemetryMetadata { + let (metrics, rx_events) = Metrics::new(TelemetryMetadata { anonymous_telemetry_enabled: settings.telemetry.enabled, machine_id: settings.telemetry.machine_id.clone(), metric_labels: settings.metric.labels.clone(), is_moose_developer: settings.telemetry.is_moose_developer, is_production: project_arc.is_production, project_name: project_arc.name().to_string(), + export_metrics: settings.telemetry.export_metrics, + metric_endpoints: settings.metric.endpoints.clone(), }); let arc_metrics = Arc::new(metrics); - arc_metrics.start_listening_to_metrics(rx).await; + arc_metrics.start_listening_to_metrics(rx_events).await; check_project_name(&project_arc.name())?; run_local_infrastructure(&project_arc)?.show(); @@ -493,17 +495,19 @@ async fn top_command_handler( project.set_is_production_env(true); let project_arc = Arc::new(project); - let (metrics, rx) = Metrics::new(TelemetryMetadata { + let (metrics, rx_events) = Metrics::new(TelemetryMetadata { anonymous_telemetry_enabled: settings.telemetry.enabled, machine_id: settings.telemetry.machine_id.clone(), metric_labels: settings.metric.labels.clone(), is_moose_developer: settings.telemetry.is_moose_developer, is_production: project_arc.is_production, project_name: project_arc.name().to_string(), + export_metrics: settings.telemetry.export_metrics, + metric_endpoints: settings.metric.endpoints.clone(), }); let arc_metrics = Arc::new(metrics); - arc_metrics.start_listening_to_metrics(rx).await; + arc_metrics.start_listening_to_metrics(rx_events).await; let capture_handle = crate::utilities::capture::capture_usage( ActivityType::ProdCommand, diff --git a/apps/framework-cli/src/cli/local_webserver.rs b/apps/framework-cli/src/cli/local_webserver.rs index a6e53f6b1..984dc7ac1 100644 --- a/apps/framework-cli/src/cli/local_webserver.rs +++ b/apps/framework-cli/src/cli/local_webserver.rs @@ -1,6 +1,7 @@ use super::display::Message; use super::display::MessageType; use super::routines::auth::validate_auth_token; +use crate::metrics::MetricEvent; use crate::cli::display::with_spinner; use crate::framework::controller::RouteMeta; @@ -8,9 +9,9 @@ use crate::framework::controller::RouteMeta; use crate::framework::core::infrastructure::api_endpoint::APIType; use crate::framework::core::infrastructure_map::Change; use crate::framework::core::infrastructure_map::{ApiChange, InfrastructureMap}; +use crate::metrics::Metrics; use crate::utilities::docker; -use super::super::metrics::{Metrics, MetricsMessage}; use crate::framework::data_model::config::EndpointIngestionFormat; use crate::infrastructure::stream::redpanda; use crate::infrastructure::stream::redpanda::ConfiguredProducer; @@ -18,6 +19,7 @@ use crate::infrastructure::stream::redpanda::ConfiguredProducer; use crate::framework::typescript::bin::CliMessage; use crate::project::Project; use bytes::Buf; +use chrono::Utc; use http_body_util::BodyExt; use http_body_util::Full; use hyper::body::Body; @@ -40,6 +42,7 @@ use rdkafka::util::Timeout; use serde::Serialize; use serde::{Deserialize, Deserializer}; use serde_json::Deserializer as JsonDeserializer; +use tokio::spawn; use crate::framework::data_model::model::DataModel; use crate::utilities::validate_passthrough::{DataModelArrayVisitor, DataModelVisitor}; @@ -68,20 +71,6 @@ pub struct RouterRequest { route_table: &'static RwLock>, } -#[derive(Serialize, Deserialize, Debug, Clone)] -enum Direction { - In, - Out, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct FlowMessages { - count: u64, - function_name: String, - bytes: u64, - direction: Direction, -} - fn default_management_port() -> u16 { 5000 } @@ -133,8 +122,6 @@ async fn create_client( host: String, consumption_apis: &RwLock>, is_prod: bool, - metrics: Arc, - route: PathBuf, ) -> Result>, anyhow::Error> { // Extract the Authorization header and check the bearer token let auth_header = req.headers().get(hyper::header::AUTHORIZATION); @@ -208,13 +195,6 @@ async fn create_client( let res = sender.send_request(new_req).await?; let status = res.status(); let body = res.collect().await.unwrap().to_bytes().to_vec(); - metrics - .send_metric(MetricsMessage::PutConsumedBytesCount { - route, - method: "GET".to_string(), - bytes_count: body.len() as u64, - }) - .await; Ok(Response::builder() .status(status) @@ -318,7 +298,7 @@ async fn log_route(req: Request) -> Response> { }; show_message!(cli_message.message_type, message); } - Err(e) => println!("Received unknown message: {:?}", e), + Err(e) => println!("Received unkn message: {:?}", e), } Response::builder() @@ -329,33 +309,24 @@ async fn log_route(req: Request) -> Response> { async fn metrics_log_route(req: Request, metrics: Arc) -> Response> { let body = to_reader(req).await; - let parsed: Result = serde_json::from_reader(body); - match parsed { - Ok(cli_message) => match cli_message.direction { - Direction::In => { - metrics - .send_metric(MetricsMessage::PutStreamingFunctionMessagesIn { - function_name: cli_message.function_name.clone(), - count: cli_message.count, - }) - .await; - metrics - .send_metric(MetricsMessage::PutStreamingFunctionBytes { - function_name: cli_message.function_name.clone(), - bytes_count: cli_message.bytes, - }) - .await; - } - Direction::Out => { - metrics - .send_metric(MetricsMessage::PutStreamingFunctionMessagesOut { - function_name: cli_message.function_name.clone(), - count: cli_message.count, - }) - .await - } - }, - Err(e) => println!("Received unknown message: {:?}", e), + let parsed: Result = serde_json::from_reader(body); + if let Ok(MetricEvent::StreamingFunctionEvent { + count_in, + count_out, + bytes, + function_name, + timestamp, + }) = parsed + { + metrics + .send_metric_event(MetricEvent::StreamingFunctionEvent { + timestamp, + count_in, + count_out, + bytes, + function_name: function_name.clone(), + }) + .await; } Response::builder() @@ -368,10 +339,7 @@ async fn metrics_route(metrics: Arc) -> Result>, h let response = Response::builder() .status(StatusCode::OK) .body(Full::new(Bytes::from( - metrics - .get_prometheus_metrics_string() - .await - .unwrap_or_else(|e| format!("Unable to retrieve metrics: {}", e)), + metrics.get_metrics_registry_as_string().await, ))) .unwrap(); @@ -422,20 +390,9 @@ async fn send_payload_to_topic( configured_producer: &ConfiguredProducer, topic_name: &str, payload: Vec, - metrics: Arc, - route: PathBuf, ) -> Result<(i32, i64), (KafkaError, OwnedMessage)> { debug!("Sending payload {:?} to topic: {}", payload, topic_name); - metrics - .send_metric(MetricsMessage::PutHTTPToTopicEventCount { - route, - topic_name: topic_name.to_string(), - method: "POST".to_string(), - count: 1, - }) - .await; - configured_producer .producer .send( @@ -456,39 +413,22 @@ async fn handle_json_req( topic_name: &str, data_model: &DataModel, req: Request, - metrics: Arc, - route: PathBuf, ) -> Response> { // TODO probably a refactor to be done here with the array json but it doesn't seem to be // straightforward to do it in a generic way. let url = req.uri().to_string(); - let number_of_bytes = req.body().size_hint().exact().unwrap(); let body = to_reader(req).await; let parsed = JsonDeserializer::from_reader(body) .deserialize_any(&mut DataModelVisitor::new(&data_model.columns)); - metrics - .send_metric(MetricsMessage::PutIngestedBytesCount { - route: route.clone(), - method: "POST".to_string(), - bytes_count: number_of_bytes, - }) - .await; // TODO add check that the payload has the proper schema if let Err(e) = parsed { return bad_json_response(e); } - let res = send_payload_to_topic( - configured_producer, - topic_name, - parsed.ok().unwrap(), - metrics, - route, - ) - .await; + let res = send_payload_to_topic(configured_producer, topic_name, parsed.ok().unwrap()).await; if let Err((kafka_error, _)) = res { debug!( "Failed to deliver message to {} with error: {}", @@ -520,8 +460,6 @@ async fn handle_json_array_body( topic_name: &str, data_model: &DataModel, req: Request, - metrics: Arc, - route: PathBuf, ) -> Response> { // TODO probably a refactor to be done here with the json but it doesn't seem to be // straightforward to do it in a generic way. @@ -538,13 +476,6 @@ async fn handle_json_array_body( }); debug!("parsed json array for {}", topic_name); - metrics - .send_metric(MetricsMessage::PutIngestedBytesCount { - route: route.clone(), - method: "POST".to_string(), - bytes_count: number_of_bytes, - }) - .await; if let Err(e) = parsed { return bad_json_response(e); @@ -575,15 +506,6 @@ async fn handle_json_array_body( } wait_for_batch_complete(&mut res_arr, temp_res).await; - metrics - .send_metric(MetricsMessage::PutHTTPToTopicEventCount { - route: route.clone(), - method: "POST".to_string(), - count: res_arr.iter().filter(|res| res.is_ok()).count() as u64, - topic_name: topic_name.to_string(), - }) - .await; - if res_arr.iter().any(|res| res.is_err()) { return internal_server_error_response(); } @@ -632,7 +554,6 @@ async fn ingest_route( route: PathBuf, configured_producer: ConfiguredProducer, route_table: &RwLock>, - metrics: Arc, is_prod: bool, ) -> Result>, hyper::http::Error> { show_message!( @@ -660,8 +581,6 @@ async fn ingest_route( &route_meta.topic_name, &route_meta.data_model, req, - metrics, - route, ) .await), EndpointIngestionFormat::JsonArray => Ok(handle_json_array_body( @@ -669,8 +588,6 @@ async fn ingest_route( &route_meta.topic_name, &route_meta.data_model, req, - metrics, - route, ) .await), }, @@ -714,6 +631,8 @@ async fn router( let now = Instant::now(); let req = request.req; + let req_bytes = req.body().size_hint().exact().unwrap(); + let route_table = request.route_table; debug!( @@ -723,11 +642,10 @@ async fn router( ); let route = get_path_without_prefix(PathBuf::from(req.uri().path()), path_prefix); + let route_clone = route.clone(); let metrics_method = req.method().to_string(); - let metrics_path = route.clone(); - let route_split = route.to_str().unwrap().split('/').collect::>(); let res = match (req.method(), &route_split[..]) { (&hyper::Method::POST, ["ingest", _]) => { @@ -737,26 +655,16 @@ async fn router( route.join(current_version), configured_producer, route_table, - metrics.clone(), is_prod, ) .await } (&hyper::Method::POST, ["ingest", _, _]) => { - ingest_route( - req, - route, - configured_producer, - route_table, - metrics.clone(), - is_prod, - ) - .await + ingest_route(req, route, configured_producer, route_table, is_prod).await } (&hyper::Method::GET, ["consumption", _rt]) => { - match create_client(req, host, consumption_apis, is_prod, metrics.clone(), route).await - { + match create_client(req, host, consumption_apis, is_prod).await { Ok(response) => Ok(response), Err(e) => { debug!("Error: {:?}", e); @@ -772,17 +680,46 @@ async fn router( _ => route_not_found_response(), }; - let metrics_path_str = metrics_path.to_str().unwrap(); + let res_bytes = res.as_ref().unwrap().body().size_hint().exact().unwrap(); + let topic = route_table + .read() + .await + .get(&route_clone) + .map(|route_meta| route_meta.topic_name.clone()) + .unwrap_or_default(); + + let metrics_clone = metrics.clone(); + let metrics_path = route_clone.clone().to_str().unwrap().to_string(); + let metrics_path_clone = metrics_path.clone(); + + spawn(async move { + if metrics_path_clone.starts_with("ingest/") { + let _ = metrics_clone + .send_metric_event(MetricEvent::IngestedEvent { + topic, + timestamp: Utc::now(), + count: 1, + bytes: req_bytes, + latency: now.elapsed(), + route: metrics_path.clone(), + method: metrics_method.clone(), + }) + .await; + } - if metrics_path_str.starts_with("ingest/") || metrics_path_str.starts_with("consumption/") { - metrics - .send_metric(MetricsMessage::HTTPLatency { - path: metrics_path, - duration: now.elapsed(), - method: metrics_method, - }) - .await; - } + if metrics_path_clone.starts_with("consumption/") { + let _ = metrics_clone + .send_metric_event(MetricEvent::ConsumedEvent { + timestamp: Utc::now(), + count: 1, + latency: now.elapsed(), + bytes: res_bytes, + route: metrics_path.clone(), + method: metrics_method.clone(), + }) + .await; + } + }); res } diff --git a/apps/framework-cli/src/cli/settings.rs b/apps/framework-cli/src/cli/settings.rs index 1bd27be22..1409a3da4 100644 --- a/apps/framework-cli/src/cli/settings.rs +++ b/apps/framework-cli/src/cli/settings.rs @@ -24,6 +24,7 @@ const ENVIRONMENT_VARIABLE_PREFIX: &str = "MOOSE"; #[derive(Deserialize, Debug, Default)] pub struct MetricLabels { pub labels: Option, + pub endpoints: Option, } #[derive(Deserialize, Debug)] @@ -31,6 +32,8 @@ pub struct Telemetry { pub machine_id: String, pub enabled: bool, #[serde(default)] + pub export_metrics: bool, + #[serde(default)] pub is_moose_developer: bool, } @@ -40,6 +43,7 @@ impl Default for Telemetry { enabled: true, is_moose_developer: false, machine_id: Uuid::new_v4().to_string(), + export_metrics: false, } } } diff --git a/apps/framework-cli/src/framework/python/scripts/streaming_function_runner.py b/apps/framework-cli/src/framework/python/scripts/streaming_function_runner.py index 8bd19016d..78878bca6 100644 --- a/apps/framework-cli/src/framework/python/scripts/streaming_function_runner.py +++ b/apps/framework-cli/src/framework/python/scripts/streaming_function_runner.py @@ -176,8 +176,7 @@ def send_message_metrics_in(): global bytes_count while True: time.sleep(1) - requests.post("http://localhost:5000/metrics-logs", json={'count': count_in, 'bytes': bytes_count, 'function_name': f'{source_topic} -> {target_topic}', 'direction': 'In'}) - requests.post("http://localhost:5000/metrics-logs", json={'count': count_out, 'bytes': bytes_count, 'function_name': f'{source_topic} -> {target_topic}', 'direction': 'Out'}) + requests.post("http://localhost:5000/metrics-logs", json={'count_in': count_in, 'count_out': count_out, 'bytes': bytes_count, 'function_name': f'{source_topic} -> {target_topic}'}) count_in = 0 count_out = 0 bytes_count = 0 diff --git a/apps/framework-cli/src/infrastructure/olap/clickhouse/inserter.rs b/apps/framework-cli/src/infrastructure/olap/clickhouse/inserter.rs index f07f6ee87..ef65f390e 100644 --- a/apps/framework-cli/src/infrastructure/olap/clickhouse/inserter.rs +++ b/apps/framework-cli/src/infrastructure/olap/clickhouse/inserter.rs @@ -55,7 +55,6 @@ async fn flush( interval.tick().await; let mut buffer_owned = buffer.lock().await; if buffer_owned.is_empty() { - drop(buffer_owned); continue; } @@ -73,7 +72,5 @@ async fn flush( } buffer_owned.clear(); - - drop(buffer_owned); } } diff --git a/apps/framework-cli/src/infrastructure/processes/kafka_clickhouse_sync.rs b/apps/framework-cli/src/infrastructure/processes/kafka_clickhouse_sync.rs index 2e71e790a..f2a17e5a3 100644 --- a/apps/framework-cli/src/infrastructure/processes/kafka_clickhouse_sync.rs +++ b/apps/framework-cli/src/infrastructure/processes/kafka_clickhouse_sync.rs @@ -29,7 +29,7 @@ use crate::infrastructure::stream::redpanda::create_subscriber; use crate::infrastructure::stream::redpanda::fetch_topics; use crate::infrastructure::stream::redpanda::RedpandaConfig; use crate::infrastructure::stream::redpanda::{create_producer, send_with_back_pressure}; -use crate::metrics::{Metrics, MetricsMessage}; +use crate::metrics::{MetricEvent, Metrics}; const TABLE_SYNC_GROUP_ID: &str = "clickhouse_sync"; const VERSION_SYNC_GROUP_ID: &str = "version_sync_flow_sync"; @@ -428,17 +428,12 @@ async fn iterate_subscriber<'a, F>( source_topic_name, payload_str ); metrics - .send_metric(MetricsMessage::PutTopicToOLAPBytesCount { - consumer_group: "clickhouse sync".to_string(), - topic_name: source_topic_name.clone(), - bytes_count: payload.len() as u64, - }) - .await; - metrics - .send_metric(MetricsMessage::PutTopicToOLAPEventCount { + .send_metric_event(MetricEvent::TopicToOLAPEvent { + timestamp: chrono::Utc::now(), + count: 1, + bytes: payload.len() as u64, consumer_group: "clickhouse sync".to_string(), topic_name: source_topic_name.clone(), - count: 1, }) .await; diff --git a/apps/framework-cli/src/main.rs b/apps/framework-cli/src/main.rs index ce19e2280..bbe9df8bf 100644 --- a/apps/framework-cli/src/main.rs +++ b/apps/framework-cli/src/main.rs @@ -2,6 +2,7 @@ mod cli; pub mod framework; pub mod infrastructure; pub mod metrics; +pub mod metrics_inserter; pub mod project; pub mod utilities; diff --git a/apps/framework-cli/src/metrics.rs b/apps/framework-cli/src/metrics.rs index 9ee64df89..4345d2f63 100644 --- a/apps/framework-cli/src/metrics.rs +++ b/apps/framework-cli/src/metrics.rs @@ -6,16 +6,19 @@ use prometheus_client::{ metrics::histogram::Histogram, registry::Registry, }; +use serde::Deserialize; use serde_json::json; use serde_json::Value; use std::env; use std::sync::{Arc, LazyLock}; -use std::{path::PathBuf, time::Duration}; +use std::time::Duration; +use tokio::sync::Mutex; use tokio::time; +use crate::metrics_inserter::MetricsInserter; use crate::utilities::constants::{CLI_VERSION, CONTEXT, CTX_SESSION_ID}; use crate::utilities::decode_object; -use chrono::Utc; +use chrono::{DateTime, Utc}; use log::warn; const DEFAULT_ANONYMOUS_METRICS_URL: &str = @@ -45,53 +48,39 @@ pub enum MetricsErrors { OneShotError(#[from] tokio::sync::oneshot::error::RecvError), } -pub enum MetricsMessage { - GetMetricsRegistryAsString(tokio::sync::oneshot::Sender), - HTTPLatency { - path: PathBuf, - method: String, - duration: Duration, - }, - PutIngestedBytesCount { - route: PathBuf, +#[derive(Debug, Clone, Deserialize)] +pub enum MetricEvent { + // GetMetricsRegistryAsString(tokio::sync::oneshot::Sender), + IngestedEvent { + topic: String, + timestamp: DateTime, + count: u64, + bytes: u64, + latency: Duration, + route: String, method: String, - bytes_count: u64, }, - PutConsumedBytesCount { - route: PathBuf, + ConsumedEvent { + timestamp: DateTime, + count: u64, + latency: Duration, + bytes: u64, + route: String, method: String, - bytes_count: u64, }, - PutHTTPToTopicEventCount { - topic_name: String, - route: PathBuf, - method: String, - count: u64, + StreamingFunctionEvent { + timestamp: DateTime, + count_in: u64, + count_out: u64, + bytes: u64, + function_name: String, }, - PutTopicToOLAPEventCount { - consumer_group: String, - topic_name: String, + TopicToOLAPEvent { + timestamp: DateTime, count: u64, - }, - PutTopicToOLAPBytesCount { + bytes: u64, consumer_group: String, topic_name: String, - bytes_count: u64, - }, - PutStreamingFunctionMessagesIn { - function_name: String, - count: u64, - }, - PutStreamingFunctionMessagesOut { - function_name: String, - count: u64, - }, - PutStreamingFunctionBytes { - function_name: String, - bytes_count: u64, - }, - PutBlockCount { - count: i64, }, } @@ -101,14 +90,18 @@ pub struct TelemetryMetadata { pub machine_id: String, pub is_moose_developer: bool, pub metric_labels: Option, + pub metric_endpoints: Option, pub is_production: bool, pub project_name: String, + pub export_metrics: bool, } #[derive(Clone)] pub struct Metrics { - pub tx: tokio::sync::mpsc::Sender, + pub tx_events: tokio::sync::mpsc::Sender, telemetry_metadata: TelemetryMetadata, + metrics_inserter: MetricsInserter, + registry: Arc>, } pub struct Statistics { @@ -164,34 +157,45 @@ pub struct MessagesOutCounterLabels { impl Metrics { pub fn new( telemetry_metadata: TelemetryMetadata, - ) -> (Metrics, tokio::sync::mpsc::Receiver) { - let (tx, rx) = tokio::sync::mpsc::channel(32); + ) -> (Metrics, tokio::sync::mpsc::Receiver) { + let (tx_events, rx_events) = tokio::sync::mpsc::channel(32); + let metric_labels = match telemetry_metadata + .metric_labels + .as_deref() + .map(decode_object::decode_base64_to_json) + { + Some(Ok(Value::Object(map))) => Some(map), + _ => None, + }; + let metric_endpoints = match telemetry_metadata + .metric_endpoints + .as_deref() + .map(decode_object::decode_base64_to_json) + { + Some(Ok(Value::Object(map))) => Some(map), + _ => None, + }; let metrics = Metrics { - tx, + tx_events, telemetry_metadata, + metrics_inserter: MetricsInserter::new(metric_labels, metric_endpoints), + registry: Arc::new(Mutex::new(Registry::default())), }; - (metrics, rx) + (metrics, rx_events) } - pub async fn send_metric(&self, data: MetricsMessage) { - let _ = self.tx.send(data).await; + pub async fn send_metric_event(&self, data: MetricEvent) { + let _ = self.tx_events.send(data).await; } - pub async fn get_prometheus_metrics_string( - &self, - ) -> Result { - let (resp_tx, resp_rx) = tokio::sync::oneshot::channel::(); - let _ = self - .tx - .send(MetricsMessage::GetMetricsRegistryAsString(resp_tx)) - .await; - - Ok(resp_rx.await?) + pub async fn get_metrics_registry_as_string(&self) -> String { + let registry = self.registry.lock().await; + formatted_registry(®istry) } pub async fn start_listening_to_metrics( &self, - mut rx: tokio::sync::mpsc::Receiver, + mut rx_events: tokio::sync::mpsc::Receiver, ) { let data = Arc::new(Statistics { http_ingested_request_count: Counter::default(), @@ -253,7 +257,7 @@ impl Metrics { ), }); - let mut registry = Registry::default(); + let mut registry = self.registry.lock().await; registry.register( TOTAL_LATENCY, @@ -310,135 +314,123 @@ impl Metrics { let cloned_data_ref = data.clone(); + let metrics_inserter = self.metrics_inserter.clone(); + let export_metrics = self.telemetry_metadata.export_metrics; + tokio::spawn(async move { - while let Some(message) = rx.recv().await { + while let Some(message) = rx_events.recv().await { + if export_metrics { + let _ = metrics_inserter.insert(message.clone()).await; + } match message { - MetricsMessage::GetMetricsRegistryAsString(v) => { - let _ = v.send(formatted_registry(®istry)); - } - MetricsMessage::HTTPLatency { - path, - duration, - method, - } => { - data.http_latency_histogram - .get_or_create(&HTTPLabel { - method, - path: path.clone().into_os_string().to_str().unwrap().to_string(), - }) - .observe(duration.as_secs_f64()); - data.http_latency_histogram_aggregate - .observe(duration.as_secs_f64()); - if path.starts_with("ingest") { - data.http_ingested_latency_sum_ms - .inc_by(duration.as_millis() as u64); - } else { - data.http_consumed_latency_sum_ms - .inc_by(duration.as_millis() as u64); - } - } - MetricsMessage::PutIngestedBytesCount { - route: path, - bytes_count, + MetricEvent::IngestedEvent { + timestamp: _, + count, + bytes, + latency, + route, method, + topic, } => { data.http_ingested_bytes .get_or_create(&HTTPLabel { - method, - path: path.clone().into_os_string().to_str().unwrap().to_string(), + method: method.clone(), + path: route.clone(), }) - .inc_by(bytes_count); + .inc_by(bytes); data.http_ingested_request_count.inc(); - data.http_ingested_total_bytes.inc_by(bytes_count); + data.http_ingested_total_bytes.inc_by(bytes); + + data.http_ingested_latency_sum_ms + .inc_by(latency.as_millis() as u64); + + data.http_to_topic_event_count + .get_or_create(&MessagesInCounterLabels { + path: route.clone(), + topic_name: topic.clone(), + method: method.clone(), + }) + .inc_by(count); } - MetricsMessage::PutConsumedBytesCount { - route: path, - bytes_count, + MetricEvent::ConsumedEvent { + timestamp: _, + count: _, + latency, + bytes, + route, method, } => { + data.http_latency_histogram + .get_or_create(&HTTPLabel { + method: method.clone(), + path: route.clone(), + }) + .observe(latency.as_secs_f64()); + data.http_latency_histogram_aggregate + .observe(latency.as_secs_f64()); + data.http_consumed_latency_sum_ms + .inc_by(latency.as_millis() as u64); + data.http_consumed_bytes .get_or_create(&HTTPLabel { - method, - path: path.clone().into_os_string().to_str().unwrap().to_string(), + method: method.clone(), + path: route.clone(), }) - .inc_by(bytes_count); + .inc_by(bytes); } - MetricsMessage::PutHTTPToTopicEventCount { - route: path, - topic_name, - method, + MetricEvent::TopicToOLAPEvent { + timestamp: _, count, - } => { - data.http_to_topic_event_count - .get_or_create(&MessagesInCounterLabels { - path: path.clone().into_os_string().to_str().unwrap().to_string(), - topic_name, - method, - }) - .inc_by(count); - } - MetricsMessage::PutTopicToOLAPEventCount { + bytes, consumer_group, topic_name, - count, } => { data.topic_to_olap_event_count .get_or_create(&MessagesOutCounterLabels { - consumer_group, - topic_name, + consumer_group: consumer_group.clone(), + topic_name: topic_name.clone(), }) .inc_by(count); data.topic_to_olap_event_total_count.inc_by(count); + + data.topic_to_olap_bytes_count + .get_or_create(&MessagesOutCounterLabels { + consumer_group: consumer_group.clone(), + topic_name: topic_name.clone(), + }) + .inc_by(bytes); + data.topic_to_olap_bytes_total_count.inc_by(bytes); } - MetricsMessage::PutStreamingFunctionMessagesIn { + MetricEvent::StreamingFunctionEvent { + timestamp: _, + count_in, + count_out, + bytes, function_name, - count, } => { data.streaming_functions_in_event_count .get_or_create(&StreamingFunctionMessagesCounterLabels { - function_name, + function_name: function_name.clone(), }) - .inc_by(count); - data.streaming_functions_in_event_total_count.inc_by(count); - } - MetricsMessage::PutStreamingFunctionMessagesOut { - function_name, - count, - } => { + .inc_by(count_in); + data.streaming_functions_in_event_total_count + .inc_by(count_in); + data.streaming_functions_out_event_count .get_or_create(&StreamingFunctionMessagesCounterLabels { - function_name, - }) - .inc_by(count); - data.streaming_functions_out_event_total_count.inc_by(count); - } - MetricsMessage::PutTopicToOLAPBytesCount { - consumer_group, - topic_name, - bytes_count, - } => { - data.topic_to_olap_bytes_count - .get_or_create(&MessagesOutCounterLabels { - consumer_group, - topic_name, + function_name: function_name.clone(), }) - .inc_by(bytes_count); - data.topic_to_olap_bytes_total_count.inc_by(bytes_count); - } - MetricsMessage::PutStreamingFunctionBytes { - function_name, - bytes_count: count, - } => { + .inc_by(count_out); + data.streaming_functions_out_event_total_count + .inc_by(count_out); + data.streaming_functions_processed_bytes_count .get_or_create(&StreamingFunctionMessagesCounterLabels { - function_name, + function_name: function_name.clone(), }) - .inc_by(count); + .inc_by(bytes); data.streaming_functions_processed_bytes_total_count - .inc_by(count); - } - MetricsMessage::PutBlockCount { count } => { - data.blocks_count.set(count); + .inc_by(bytes); } }; } diff --git a/apps/framework-cli/src/metrics_inserter.rs b/apps/framework-cli/src/metrics_inserter.rs new file mode 100644 index 000000000..29008bba2 --- /dev/null +++ b/apps/framework-cli/src/metrics_inserter.rs @@ -0,0 +1,165 @@ +use crate::metrics::MetricEvent; +use reqwest::Client; +use serde_json::json; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex; +use tokio::time; + +const MAX_FLUSH_INTERVAL_SECONDS: u64 = 10; +const MAX_BATCH_SIZE: usize = 1000; + +pub type BatchEvents = Arc>>; + +#[derive(Clone)] +pub struct MetricsInserter { + buffer: BatchEvents, +} + +impl MetricsInserter { + pub fn new( + metric_labels: Option>, + metric_endpoints: Option>, + ) -> Self { + let buffer = Arc::new(Mutex::new(Vec::new())); + + tokio::spawn(flush( + buffer.clone(), + metric_labels.clone(), + metric_endpoints.clone(), + )); + + Self { buffer } + } + + pub async fn insert(&self, event: MetricEvent) -> anyhow::Result<()> { + let mut buffer = self.buffer.lock().await; + buffer.push(event); + Ok(()) + } +} + +async fn flush( + buffer: BatchEvents, + metric_labels: Option>, + metric_endpoints: Option>, +) { + let mut interval = time::interval(Duration::from_secs(MAX_FLUSH_INTERVAL_SECONDS)); + let client = Client::new(); + + loop { + interval.tick().await; + let mut buffer_owned = buffer.lock().await; + if buffer_owned.is_empty() { + continue; + } + + let mut event_groups: std::collections::HashMap<&str, Vec> = + std::collections::HashMap::new(); + + for chunk in buffer_owned.chunks(MAX_BATCH_SIZE) { + for event in chunk { + let (event_type, payload) = match event { + MetricEvent::IngestedEvent { + timestamp, + count, + bytes, + latency, + route, + method, + topic, + } => ( + "IngestEvent", + &json!({ + "timestamp": timestamp, + "count": count, + "bytes": bytes, + "latency": latency.as_secs_f64(), + "route": route.clone(), + "method": method, + "topic": topic, + }), + ), + + MetricEvent::ConsumedEvent { + timestamp, + count, + latency, + bytes, + route, + method, + } => ( + "ConsumptionEvent", + &json!({ + "timestamp": timestamp, + "count": count, + "latency": latency.as_secs_f64(), + "bytes": bytes, + "route": route.clone(), + "method": method, + }), + ), + + MetricEvent::StreamingFunctionEvent { + timestamp, + count_in, + count_out, + bytes, + function_name, + } => ( + "StreamingFunctionEvent", + &json!({ + "timestamp": timestamp, + "count_in": count_in, + "count_out": count_out, + "bytes": bytes, + "function_name": function_name, + }), + ), + MetricEvent::TopicToOLAPEvent { + timestamp, + count, + bytes, + consumer_group, + topic_name, + } => ( + "TopicToOLAPEvent", + &json!({ + "timestamp": timestamp, + "count": count, + "bytes": bytes, + "consumer_group": consumer_group, + "topic_name": topic_name, + }), + ), + }; + + let mut payload = payload.clone(); + let payload_obj = payload.as_object_mut().unwrap(); + if let Some(labels_obj) = &metric_labels { + payload_obj.extend(labels_obj.iter().map(|(k, v)| (k.clone(), v.clone()))); + } + + event_groups.entry(event_type).or_default().push(payload); + } + } + + for (event_type, events) in event_groups { + let route = match metric_endpoints + .as_ref() + .and_then(|endpoints| endpoints.get(event_type)) + .and_then(|endpoint| endpoint.as_str()) + { + Some(route) => route, + None => { + eprintln!("Error: No endpoint found for event type: {}", event_type); + continue; + } + }; + + let _ = client.post(route).json(&events).send().await; + } + + buffer_owned.clear(); + } +} diff --git a/packages/ts-moose-lib/src/streaming-functions/runner.ts b/packages/ts-moose-lib/src/streaming-functions/runner.ts index 3f372b8c2..e88c120be 100755 --- a/packages/ts-moose-lib/src/streaming-functions/runner.ts +++ b/packages/ts-moose-lib/src/streaming-functions/runner.ts @@ -5,10 +5,11 @@ import http from "http"; import { cliLog } from "../commons"; type CliLogData = { - count: number; + count_in: number; + count_out: number; bytes: number; function_name: string; - direction: "In" | "Out"; + timestamp: Date; }; type StreamingFunction = (data: unknown) => unknown | Promise; @@ -210,28 +211,20 @@ let count_in = 0; let count_out = 0; let bytes = 0; -const sendMessageMetricsIn = (logger: Logger) => { - metricsLog({ - count: count_in, - function_name: logger.logPrefix, - bytes: bytes, - direction: "In", - }); +const sendMessageMetrics = (logger: Logger) => { + if (count_in > 0 || count_out > 0 || bytes > 0) { + metricsLog({ + count_in: count_in, + count_out: count_out, + function_name: logger.logPrefix, + bytes: bytes, + timestamp: new Date(), + }); + } count_in = 0; bytes = 0; - setTimeout(() => sendMessageMetricsIn(logger), 1000); -}; - -const sendMessageMetricsOut = (logger: Logger) => { - metricsLog({ - count: count_out, - // We actually only read bytes from the in direction - bytes: bytes, - function_name: logger.logPrefix, - direction: "Out", - }); count_out = 0; - setTimeout(() => sendMessageMetricsOut(logger), 1000); + setTimeout(() => sendMessageMetrics(logger), 1000); }; const startConsumer = async ( @@ -299,6 +292,7 @@ const startConsumer = async ( logger.log("Consumer is running..."); }; + /** * message.max.bytes is a broker setting that applies to all topics. * max.message.bytes is a per-topic setting. @@ -339,8 +333,7 @@ export const runStreamingFunctions = async (): Promise => { const args = parseArgs(); const logger = buildLogger(args); - setTimeout(() => sendMessageMetricsIn(logger), 1000); - setTimeout(() => sendMessageMetricsOut(logger), 1000); + setTimeout(() => sendMessageMetrics(logger), 1000); const kafka = new Kafka({ clientId: "streaming-function-consumer",