Skip to content

Commit

Permalink
Merge pull request #4285 from systeminit/metrics_more
Browse files Browse the repository at this point in the history
feat: add metrics for pool_noodle, handle errors in veritech
  • Loading branch information
sprutton1 authored Aug 6, 2024
2 parents e487fbc + 1acc726 commit 70ba40a
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 126 deletions.
2 changes: 2 additions & 0 deletions dev/datasources/datasources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ datasources:
type: prometheus
access: proxy
url: http://prometheus:9090
jsonData:
timeInterval: 100ms
2 changes: 1 addition & 1 deletion dev/prometheus-config/config.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
global:
scrape_interval: 1s
scrape_interval: 100ms
scrape_configs:
- job_name: metrics
static_configs:
Expand Down
53 changes: 34 additions & 19 deletions lib/si-pool-noodle/src/pool_noodle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::fmt;
use std::fmt::Display;
use std::fmt::Formatter;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use telemetry_utils::metric;
use tokio::sync::Mutex;
use tokio::time::sleep;
use tracing::info;
Expand Down Expand Up @@ -86,7 +86,6 @@ where
let pool = PoolNoodle(Arc::new(PoolNoodleInner {
pool_size,
spec,
active: AtomicUsize::new(0),
dropped: ArrayQueue::new(pool_size as usize),
ready: ArrayQueue::new(pool_size as usize),
to_be_cleaned,
Expand Down Expand Up @@ -145,7 +144,7 @@ where

while !stop.load(Ordering::Relaxed) {
// let's make more instances!
if let Some(id) = me.unprepared.pop() {
if let Some(id) = Self::pop_from_unprepared(me.clone()) {
debug!("PoolNoodle: readying instance");
match PoolNoodleInner::prepare(id, &me.spec).await {
Ok(_) => {
Expand Down Expand Up @@ -178,7 +177,7 @@ where
debug!("PoolNoodle: starting clean handler...");

loop {
if let Some(id) = me.to_be_cleaned.pop() {
if let Some(id) = Self::pop_from_clean(me.clone()) {
debug!("PoolNoodle: cleaning instance {}", id);
match PoolNoodleInner::clean(id, &me.spec).await {
Ok(_) => {
Expand All @@ -202,7 +201,6 @@ where

loop {
if let Some(instance) = me.dropped.pop() {
debug!("{}", me.stats().await.to_string());
let id = instance.id();
debug!("PoolNoodle: dropping: {}", id);
match PoolNoodleInner::terminate(instance, &me.spec).await {
Expand All @@ -221,6 +219,27 @@ where
}
}

/// Pops the next instance id from the `to_be_cleaned` queue.
///
/// Returns `None` when the queue is empty. The `counter.pool_noodle.to_be_cleaned`
/// metric is emitted with `-1` only when an id was actually popped.
fn pop_from_clean(me: Arc<PoolNoodleInner<I, S>>) -> Option<u32> {
    // Bail out early on an empty queue so the metric is untouched.
    let next_id = me.to_be_cleaned.pop()?;
    metric!(counter.pool_noodle.to_be_cleaned = -1);
    Some(next_id)
}

/// Pops the next instance id from the `unprepared` queue.
///
/// Returns `None` when the queue is empty. The `counter.pool_noodle.unprepared`
/// metric is emitted with `-1` only when an id was actually popped.
fn pop_from_unprepared(me: Arc<PoolNoodleInner<I, S>>) -> Option<u32> {
    // Bail out early on an empty queue so the metric is untouched.
    let next_id = me.unprepared.pop()?;
    metric!(counter.pool_noodle.unprepared = -1);
    Some(next_id)
}

/// Pops the next instance from the `ready` queue.
///
/// Returns `None` when the queue is empty. The `counter.pool_noodle.ready`
/// metric is emitted with `-1` only when an instance was actually popped.
fn pop_from_ready(me: Arc<PoolNoodleInner<I, S>>) -> Option<I> {
    // Bail out early on an empty queue so the metric is untouched.
    let instance = me.ready.pop()?;
    metric!(counter.pool_noodle.ready = -1);
    Some(instance)
}

fn push_to_clean(me: Arc<PoolNoodleInner<I, S>>, id: u32) {
if let Err(e) = me.to_be_cleaned.push(id) {
warn!(
Expand All @@ -229,19 +248,22 @@ where
);
warn!("{:?}", e);
}
metric!(counter.pool_noodle.to_be_cleaned = 1);
}

/// Pushes an instance onto the `ready` queue.
///
/// The `counter.pool_noodle.ready` metric is emitted with `1` only when the push
/// succeeds. `pop_from_ready` only emits `-1` after an actual pop, so emitting on a
/// failed push would make the metric drift away from the real queue depth.
///
/// On failure the rejected instance is logged by id and discarded when it goes out
/// of scope.
fn push_to_ready(me: Arc<PoolNoodleInner<I, S>>, instance: I) {
    match me.ready.push(instance) {
        Ok(_) => {
            metric!(counter.pool_noodle.ready = 1);
        }
        Err(rejected) => {
            warn!(
                "PoolNoodle: failed to push instance to ready: {}",
                rejected.id()
            );
        }
    }
}

/// Pushes an instance id onto the `unprepared` queue.
///
/// The `counter.pool_noodle.unprepared` metric is emitted with `1` only when the push
/// succeeds. `pop_from_unprepared` only emits `-1` after an actual pop, so emitting on
/// a failed push would make the metric drift away from the real queue depth.
///
/// On failure the id and the error returned by the queue are logged as warnings.
fn push_to_unprepared(me: Arc<PoolNoodleInner<I, S>>, id: u32) {
    match me.unprepared.push(id) {
        Ok(_) => {
            metric!(counter.pool_noodle.unprepared = 1);
        }
        Err(e) => {
            warn!("PoolNoodle: failed to push instance to unprepared: {}", id);
            warn!("{:?}", e);
        }
    }
}

/// This will attempt to get a ready, healthy instance from the pool.
Expand All @@ -256,7 +278,7 @@ where
if retries >= max_retries {
return Err(PoolNoodleError::ExecutionPoolStarved);
}
if let Some(mut instance) = me.ready.pop() {
if let Some(mut instance) = Self::pop_from_ready(me.clone()) {
debug!("PoolNoodle: got instance: {}", instance.id());
// Try to ensure the item is healthy
match &mut instance.ensure_healthy().await {
Expand All @@ -265,8 +287,7 @@ where
"PoolNoodle: got instance for func execution: {}",
&instance.id()
);
me.active.fetch_add(1, Ordering::Relaxed);
debug!("{}", me.stats().await.to_string());
metric!(counter.pool_noodle.active = 1);
return Ok(LifeGuard {
pool: me.clone(),
item: Some(instance),
Expand Down Expand Up @@ -317,7 +338,6 @@ where
{
pool_size: u32,
spec: S,
active: AtomicUsize,
dropped: ArrayQueue<I>,
ready: ArrayQueue<I>,
to_be_cleaned: ArrayQueue<u32>,
Expand Down Expand Up @@ -362,7 +382,6 @@ where
pub async fn stats(&self) -> PoolNoodleStats {
PoolNoodleStats {
pool_size: self.pool_size as usize,
active: self.active.load(Ordering::Relaxed),
dropped: self.dropped.len(),
ready: self.ready.len(),
to_be_cleaned: self.to_be_cleaned.len(),
Expand All @@ -376,11 +395,9 @@ where
pub struct PoolNoodleStats {
/// Total number of instances allowed in the pool
pub pool_size: usize,
/// Total number of instances that have been fetched from the pool and not yet dropped
pub active: usize,
/// Total number of instances currently running and able to accept work
pub dropped: usize,
/// Total number of instances dropped and awaiting to be cleaned
pub dropped: usize,
/// Total number of instances ready and waiting to be fetched from the pool
pub ready: usize,
/// Total number of instances that need to be cleaned up
pub to_be_cleaned: usize,
Expand All @@ -392,8 +409,8 @@ impl Display for PoolNoodleStats {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
f,
"PoolNoodle Stats -- pool size: {}, active: {}, dropped: {}, ready: {}, to be cleaned: {}, unprepared: {}",
self.pool_size, self.active, self.dropped, self.ready, self.to_be_cleaned, self.unprepared
"PoolNoodle Stats -- pool size: {}, dropped: {}, ready: {}, to be cleaned: {}, unprepared: {}",
self.pool_size, self.dropped, self.ready, self.to_be_cleaned, self.unprepared
)
}
}
Expand Down Expand Up @@ -423,15 +440,13 @@ where
.expect("Item must be present as it is initialized with Some and never replaced.");
debug!("PoolNoodle: dropping instance: {}", item.id());

if self.pool.active.load(Ordering::Relaxed) > 0 {
self.pool.active.fetch_sub(1, Ordering::Relaxed);
}
if let Err(i) = self.pool.dropped.push(item) {
warn!(
"PoolNoodle: failed to push instance to dropped: {}",
&i.id()
);
}
metric!(counter.pool_noodle.active = -1);
debug!("PoolNoodle: instance pushed to dropped");
}
}
Expand Down
Loading

0 comments on commit 70ba40a

Please sign in to comment.