diff --git a/dev/datasources/datasources.yml b/dev/datasources/datasources.yml index b2e04bee6d..bd3cc3a691 100644 --- a/dev/datasources/datasources.yml +++ b/dev/datasources/datasources.yml @@ -19,3 +19,5 @@ datasources: type: prometheus access: proxy url: http://prometheus:9090 + jsonData: + timeInterval: 100ms diff --git a/dev/prometheus-config/config.yml b/dev/prometheus-config/config.yml index 30b913477b..10da846a7a 100644 --- a/dev/prometheus-config/config.yml +++ b/dev/prometheus-config/config.yml @@ -1,5 +1,5 @@ global: - scrape_interval: 1s + scrape_interval: 100ms scrape_configs: - job_name: metrics static_configs: diff --git a/lib/si-pool-noodle/src/pool_noodle.rs b/lib/si-pool-noodle/src/pool_noodle.rs index c7dfec1c92..30a61029c4 100644 --- a/lib/si-pool-noodle/src/pool_noodle.rs +++ b/lib/si-pool-noodle/src/pool_noodle.rs @@ -3,9 +3,9 @@ use std::fmt; use std::fmt::Display; use std::fmt::Formatter; use std::sync::atomic::AtomicBool; -use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; +use telemetry_utils::metric; use tokio::sync::Mutex; use tokio::time::sleep; use tracing::info; @@ -86,7 +86,6 @@ where let pool = PoolNoodle(Arc::new(PoolNoodleInner { pool_size, spec, - active: AtomicUsize::new(0), dropped: ArrayQueue::new(pool_size as usize), ready: ArrayQueue::new(pool_size as usize), to_be_cleaned, @@ -145,7 +144,7 @@ where while !stop.load(Ordering::Relaxed) { // let's make more instances! - if let Some(id) = me.unprepared.pop() { + if let Some(id) = Self::pop_from_unprepared(me.clone()) { debug!("PoolNoodle: readying instance"); match PoolNoodleInner::prepare(id, &me.spec).await { Ok(_) => { @@ -178,7 +177,7 @@ where debug!("PoolNoodle: starting clean handler..."); loop { - if let Some(id) = me.to_be_cleaned.pop() { + if let Some(id) = Self::pop_from_clean(me.clone()) { debug!("PoolNoodle: cleaning instance {}", id); match PoolNoodleInner::clean(id, &me.spec).await { Ok(_) => { @@ -202,7 +201,6 @@ where loop { if let Some(instance) = me.dropped.pop() { - debug!("{}", me.stats().await.to_string()); let id = instance.id(); debug!("PoolNoodle: dropping: {}", id); match PoolNoodleInner::terminate(instance, &me.spec).await { @@ -221,6 +219,27 @@ where } } + fn pop_from_clean(me: Arc>) -> Option { + me.to_be_cleaned.pop().map(|id| { + metric!(counter.pool_noodle.to_be_cleaned = -1); + Some(id) + })? + } + + fn pop_from_unprepared(me: Arc>) -> Option { + me.unprepared.pop().map(|id| { + metric!(counter.pool_noodle.unprepared = -1); + Some(id) + })? + } + + fn pop_from_ready(me: Arc>) -> Option { + me.ready.pop().map(|id| { + metric!(counter.pool_noodle.ready = -1); + Some(id) + })? + } + fn push_to_clean(me: Arc>, id: u32) { if let Err(e) = me.to_be_cleaned.push(id) { warn!( @@ -229,12 +248,14 @@ where ); warn!("{:?}", e); } + metric!(counter.pool_noodle.to_be_cleaned = 1); } fn push_to_ready(me: Arc>, instance: I) { if let Err(i) = me.ready.push(instance) { warn!("PoolNoodle: failed to push instance to ready: {}", i.id()); } + metric!(counter.pool_noodle.ready = 1); } fn push_to_unprepared(me: Arc>, id: u32) { @@ -242,6 +263,7 @@ where warn!("PoolNoodle: failed to push instance to unprepared: {}", id); warn!("{:?}", e); } + metric!(counter.pool_noodle.unprepared = 1); } /// This will attempt to get a ready, healthy instance from the pool. @@ -256,7 +278,7 @@ where if retries >= max_retries { return Err(PoolNoodleError::ExecutionPoolStarved); } - if let Some(mut instance) = me.ready.pop() { + if let Some(mut instance) = Self::pop_from_ready(me.clone()) { debug!("PoolNoodle: got instance: {}", instance.id()); // Try to ensure the item is healthy match &mut instance.ensure_healthy().await { @@ -265,8 +287,7 @@ where "PoolNoodle: got instance for func execution: {}", &instance.id() ); - me.active.fetch_add(1, Ordering::Relaxed); - debug!("{}", me.stats().await.to_string()); + metric!(counter.pool_noodle.active = 1); return Ok(LifeGuard { pool: me.clone(), item: Some(instance), @@ -317,7 +338,6 @@ where { pool_size: u32, spec: S, - active: AtomicUsize, dropped: ArrayQueue, ready: ArrayQueue, to_be_cleaned: ArrayQueue, @@ -362,7 +382,6 @@ where pub async fn stats(&self) -> PoolNoodleStats { PoolNoodleStats { pool_size: self.pool_size as usize, - active: self.active.load(Ordering::Relaxed), dropped: self.dropped.len(), ready: self.ready.len(), to_be_cleaned: self.to_be_cleaned.len(), @@ -376,11 +395,9 @@ where pub struct PoolNoodleStats { /// Total number of instances allowed in the pool pub pool_size: usize, - /// Total number of instances that have been fetched from the pool and not yet dropped - pub active: usize, - /// Total number of instances currently running and able to accept work - pub dropped: usize, /// Total number of instances dropped and awating to be cleaned + pub dropped: usize, + /// Total number of instances that have been fetched from the pool and not yet dropped pub ready: usize, /// Total number of instances that need to be cleaned up pub to_be_cleaned: usize, @@ -392,8 +409,8 @@ impl Display for PoolNoodleStats { fn fmt(&self, f: &mut Formatter) -> fmt::Result { write!( f, - "PoolNoodle Stats -- pool size: {}, active: {}, dropped: {}, ready: {}, to be cleaned: {}, unprepared: {}", - self.pool_size, self.active, self.dropped, self.ready, self.to_be_cleaned, self.unprepared + "PoolNoodle Stats -- pool size: {}, dropped: {}, ready: {}, to be cleaned: {}, unprepared: {}", + self.pool_size, self.dropped, self.ready, self.to_be_cleaned, self.unprepared ) } } @@ -423,15 +440,13 @@ where .expect("Item must be present as it is initialized with Some and never replaced."); debug!("PoolNoodle: dropping instance: {}", item.id()); - if self.pool.active.load(Ordering::Relaxed) > 0 { - self.pool.active.fetch_sub(1, Ordering::Relaxed); - } if let Err(i) = self.pool.dropped.push(item) { warn!( "PoolNoodle: failed to push instance to dropped: {}", &i.id() ); } + metric!(counter.pool_noodle.active = -1); debug!("PoolNoodle: instance pushed to dropped"); } } diff --git a/lib/veritech-server/src/server.rs b/lib/veritech-server/src/server.rs index 090056129c..ca0465afa6 100644 --- a/lib/veritech-server/src/server.rs +++ b/lib/veritech-server/src/server.rs @@ -430,15 +430,18 @@ async fn resolver_function_request( let cyclone_request = CycloneRequest::from_parts(request, sensitive_strings); - let mut client = cyclone_pool - .get() - .await - .map_err(|err| span.record_err(ServerError::CyclonePool(Box::new(err))))?; + let mut client = cyclone_pool.get().await.map_err(|err| { + metric!(counter.function_run.resolver = -1); + span.record_err(ServerError::CyclonePool(Box::new(err))) + })?; let mut progress = client .execute_resolver(cyclone_request) .await - .map_err(|err| span.record_err(err))? + .map_err(|err| { + metric!(counter.function_run.resolver = -1); + span.record_err(err) + })? .start() .await .map_err(|err| span.record_err(err))?; @@ -446,17 +449,17 @@ async fn resolver_function_request( while let Some(msg) = progress.next().await { match msg { Ok(ProgressMessage::OutputStream(output)) => { - publisher - .publish_output(&output) - .await - .map_err(|err| span.record_err(err))?; + publisher.publish_output(&output).await.map_err(|err| { + metric!(counter.function_run.resolver = -1); + span.record_err(err) + })? } Ok(ProgressMessage::Heartbeat) => { trace!("received heartbeat message"); - publisher - .publish_keep_alive() - .await - .map_err(|err| span.record_err(err))?; + publisher.publish_keep_alive().await.map_err(|err| { + metric!(counter.function_run.resolver = -1); + span.record_err(err) + })? } Err(err) => { warn!(error = ?err, "next progress message was an error, bailing out"); @@ -464,13 +467,13 @@ async fn resolver_function_request( } } } - metric!(counter.function_run.resolver = -1); - let function_result = progress - .finish() - .await - .map_err(|err| span.record_err(err))?; + let function_result = progress.finish().await.map_err(|err| { + metric!(counter.function_run.resolver = -1); + span.record_err(err) + })?; + metric!(counter.function_run.resolver = -1); span.record_ok(); Ok(function_result) } @@ -610,29 +613,38 @@ async fn validation_request( let reply_mailbox = request .reply .ok_or(ServerError::NoReplyMailboxFound) - .map_err(|err| span.record_err(err))?; + .map_err(|err| { + metric!(counter.function_run.validation = -1); + span.record_err(err) + })?; let publisher = Publisher::new(&nats, &reply_mailbox); - let mut client = cyclone_pool - .get() - .await - .map_err(|err| span.record_err(ServerError::CyclonePool(Box::new(err))))?; + let mut client = cyclone_pool.get().await.map_err(|err| { + metric!(counter.function_run.validation = -1); + span.record_err(ServerError::CyclonePool(Box::new(err))) + })?; let mut progress = client .execute_validation(cyclone_request) .await - .map_err(|err| span.record_err(err))? + .map_err(|err| { + metric!(counter.function_run.validation = -1); + span.record_err(err) + })? .start() .await - .map_err(|err| span.record_err(err))?; + .map_err(|err| { + metric!(counter.function_run.validation = -1); + span.record_err(err) + })?; while let Some(msg) = progress.next().await { match msg { Ok(ProgressMessage::OutputStream(output)) => { - publisher - .publish_output(&output) - .await - .map_err(|err| span.record_err(err))?; + publisher.publish_output(&output).await.map_err(|err| { + metric!(counter.function_run.validation = -1); + span.record_err(err) + })?; } Ok(ProgressMessage::Heartbeat) => { trace!("received heartbeat message"); @@ -643,21 +655,24 @@ async fn validation_request( } } } - metric!(counter.function_run.validation = -1); - publisher - .finalize_output() - .await - .map_err(|err| span.record_err(err))?; - - let function_result = progress - .finish() - .await - .map_err(|err| span.record_err(err))?; + publisher.finalize_output().await.map_err(|err| { + metric!(counter.function_run.validation = -1); + span.record_err(err) + })?; + + let function_result = progress.finish().await.map_err(|err| { + metric!(counter.function_run.validation = -1); + span.record_err(err) + })?; publisher .publish_result(&function_result) .await - .map_err(|err| span.record_err(err))?; + .map_err(|err| { + metric!(counter.function_run.validation = -1); + span.record_err(err) + })?; + metric!(counter.function_run.validation = -1); span.record_ok(); Ok(()) } @@ -798,29 +813,38 @@ async fn schema_variant_definition_request( let reply_mailbox = request .reply .ok_or(ServerError::NoReplyMailboxFound) - .map_err(|err| span.record_err(err))?; + .map_err(|err| { + metric!(counter.function_run.schema_variant_definition = -1); + span.record_err(err) + })?; let publisher = Publisher::new(&nats, &reply_mailbox); - let mut client = cyclone_pool - .get() - .await - .map_err(|err| span.record_err(ServerError::CyclonePool(Box::new(err))))?; + let mut client = cyclone_pool.get().await.map_err(|err| { + metric!(counter.function_run.schema_variant_definition = 1); + span.record_err(ServerError::CyclonePool(Box::new(err))) + })?; let mut progress = client .execute_schema_variant_definition(cyclone_request) .await - .map_err(|err| span.record_err(err))? + .map_err(|err| { + metric!(counter.function_run.schema_variant_definition = -1); + span.record_err(err) + })? .start() .await - .map_err(|err| span.record_err(err))?; + .map_err(|err| { + metric!(counter.function_run.schema_variant_definition = -1); + span.record_err(err) + })?; while let Some(msg) = progress.next().await { match msg { Ok(ProgressMessage::OutputStream(output)) => { - publisher - .publish_output(&output) - .await - .map_err(|err| span.record_err(err))?; + publisher.publish_output(&output).await.map_err(|err| { + metric!(counter.function_run.schema_variant_definition = -1); + span.record_err(err) + })?; } Ok(ProgressMessage::Heartbeat) => { trace!("received heartbeat message"); @@ -831,21 +855,24 @@ async fn schema_variant_definition_request( } } } - metric!(counter.function_run.schema_variant_definition = -1); - publisher - .finalize_output() - .await - .map_err(|err| span.record_err(err))?; - - let function_result = progress - .finish() - .await - .map_err(|err| span.record_err(err))?; + publisher.finalize_output().await.map_err(|err| { + metric!(counter.function_run.schema_variant_definition = -1); + span.record_err(err) + })?; + + let function_result = progress.finish().await.map_err(|err| { + metric!(counter.function_run.schema_variant_definition = -1); + span.record_err(err) + })?; publisher .publish_result(&function_result) .await - .map_err(|err| span.record_err(err))?; + .map_err(|err| { + metric!(counter.function_run.schema_variant_definition = -1); + span.record_err(err) + })?; + metric!(counter.function_run.schema_variant_definition = -1); span.record_ok(); Ok(()) } @@ -985,29 +1012,38 @@ async fn action_run_request( let reply_mailbox = request .reply .ok_or(ServerError::NoReplyMailboxFound) - .map_err(|err| span.record_err(err))?; + .map_err(|err| { + metric!(counter.function_run.action = -1); + span.record_err(err) + })?; let publisher = Publisher::new(&nats, &reply_mailbox); - let mut client = cyclone_pool - .get() - .await - .map_err(|err| span.record_err(ServerError::CyclonePool(Box::new(err))))?; + let mut client = cyclone_pool.get().await.map_err(|err| { + metric!(counter.function_run.action = -1); + span.record_err(ServerError::CyclonePool(Box::new(err))) + })?; let mut progress = client .execute_action_run(cyclone_request) .await - .map_err(|err| span.record_err(err))? + .map_err(|err| { + metric!(counter.function_run.action = -1); + span.record_err(err) + })? .start() .await - .map_err(|err| span.record_err(err))?; + .map_err(|err| { + metric!(counter.function_run.action = -1); + span.record_err(err) + })?; while let Some(msg) = progress.next().await { match msg { Ok(ProgressMessage::OutputStream(output)) => { - publisher - .publish_output(&output) - .await - .map_err(|err| span.record_err(err))?; + publisher.publish_output(&output).await.map_err(|err| { + metric!(counter.function_run.action = -1); + span.record_err(err) + })?; } Ok(ProgressMessage::Heartbeat) => { trace!("received heartbeat message"); @@ -1018,21 +1054,24 @@ async fn action_run_request( } } } - metric!(counter.function_run.action = -1); - publisher - .finalize_output() - .await - .map_err(|err| span.record_err(err))?; - - let function_result = progress - .finish() - .await - .map_err(|err| span.record_err(err))?; + publisher.finalize_output().await.map_err(|err| { + metric!(counter.function_run.action = -1); + span.record_err(err) + })?; + + let function_result = progress.finish().await.map_err(|err| { + metric!(counter.function_run.action = -1); + span.record_err(err) + })?; publisher .publish_result(&function_result) .await - .map_err(|err| span.record_err(err))?; + .map_err(|err| { + metric!(counter.function_run.action = -1); + span.record_err(err) + })?; + metric!(counter.function_run.action = -1); span.record_ok(); Ok(()) } @@ -1172,30 +1211,39 @@ async fn reconciliation_request( let reply_mailbox = request .reply .ok_or(ServerError::NoReplyMailboxFound) - .map_err(|err| span.record_err(err))?; + .map_err(|err| { + metric!(counter.function_run.reconciliation = -1); + span.record_err(err) + })?; let publisher = Publisher::new(&nats, &reply_mailbox); - let mut client = cyclone_pool - .get() - .await - .map_err(|err| span.record_err(ServerError::CyclonePool(Box::new(err))))?; + let mut client = cyclone_pool.get().await.map_err(|err| { + metric!(counter.function_run.reconciliation = -1); + span.record_err(ServerError::CyclonePool(Box::new(err))) + })?; let mut progress = client .execute_reconciliation(cyclone_request) .await - .map_err(|err| span.record_err(err))? + .map_err(|err| { + metric!(counter.function_run.reconciliation = -1); + span.record_err(err) + })? .start() .await - .map_err(|err| span.record_err(err))?; + .map_err(|err| { + metric!(counter.function_run.reconciliation = -1); + span.record_err(err) + })?; while let Some(msg) = progress.next().await { match msg { Ok(ProgressMessage::OutputStream(output)) => { - publisher - .publish_output(&output) - .await - .map_err(|err| span.record_err(err))?; + publisher.publish_output(&output).await.map_err(|err| { + metric!(counter.function_run.reconciliation = -1); + span.record_err(err) + })? } Ok(ProgressMessage::Heartbeat) => { trace!("received heartbeat message"); @@ -1206,21 +1254,24 @@ async fn reconciliation_request( } } } - metric!(counter.function_run.reconciliation = -1); - publisher - .finalize_output() - .await - .map_err(|err| span.record_err(err))?; - - let function_result = progress - .finish() - .await - .map_err(|err| span.record_err(err))?; + publisher.finalize_output().await.map_err(|err| { + metric!(counter.function_run.reconciliation = -1); + span.record_err(err) + })?; + + let function_result = progress.finish().await.map_err(|err| { + metric!(counter.function_run.reconciliation = -1); + span.record_err(err) + })?; publisher .publish_result(&function_result) .await - .map_err(|err| span.record_err(err))?; + .map_err(|err| { + metric!(counter.function_run.reconciliation = -1); + span.record_err(err) + })?; + metric!(counter.function_run.reconciliation = -1); span.record_ok(); Ok(()) }