diff --git a/kanin/Cargo.toml b/kanin/Cargo.toml index 4c2edaa..417a4cc 100644 --- a/kanin/Cargo.toml +++ b/kanin/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kanin" -version = "0.29.0" +version = "0.30.0" edition = "2021" authors = ["Victor Nordam Suadicani "] description = "An RPC microservice framework for AMQP, protobuf and Rust built on lapin (https://github.com/amqp-rs/lapin)." @@ -43,9 +43,12 @@ derive_more = "0.99.17" # Great for structured errors. thiserror = "1.0.30" +# For exposing metrics about the internal state of kanin. +metrics = "0.22.1" + [dev-dependencies] # Concrete logging implementation. -env_logger = "0.10.0" +env_logger = "0.11.3" # Asynchronous runtime. tokio = { version = "1.18.0", features = [ diff --git a/kanin/src/app.rs b/kanin/src/app.rs index c086fb8..3531e6c 100644 --- a/kanin/src/app.rs +++ b/kanin/src/app.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use futures::future::{select_all, try_join_all, SelectAll}; use lapin::{self, Connection, ConnectionProperties}; +use metrics::describe_gauge; use tokio::task::JoinHandle; use tracing::{debug, error, info, trace}; @@ -113,6 +114,9 @@ impl App { /// On connection errors, the app will simply panic. #[inline] pub async fn run_with_connection(self, conn: &Connection) -> Result<()> { + // Describe metrics (just need to do it somewhere once as we run the app). + describe_gauge!("kanin.prefetch_capacity", "A gauge that measures how much prefetch is available on a certain queue, based on the prefetch of its consumers."); + let handles = self.setup_handlers(conn).await?; let (returning_handler, _remaining_handlers_count, _leftover_handlers) = handles.await; diff --git a/kanin/src/app/task.rs b/kanin/src/app/task.rs index 04351d5..dbd133d 100644 --- a/kanin/src/app/task.rs +++ b/kanin/src/app/task.rs @@ -9,6 +9,7 @@ use lapin::{ types::{FieldTable, ShortString}, BasicProperties, Channel, Connection, Consumer, }; +use metrics::gauge; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use crate::{Handler, HandlerConfig, Request, Respond}; @@ -276,6 +277,15 @@ impl TaskFactory { // If no queue was specified, we just use the routing key. let queue_name = self.config.queue.as_deref().unwrap_or(&self.routing_key); + // Set prefetch capacity gauge according to the prefetch. + // This allows one to construct a metric that informs how close a queue is to capacity. + // I.e. if there are 3 servers with prefetch 8 on a queue, the queue's capacity is 24. + // By comparing this number to the number of unacked messages in the AMQP message broker (like the rabbitmq_queue_messages_unacked metric from RabbitMQ), + // you can estimate how close to capacity the queue is. + let prefetch_f64: f64 = self.config.prefetch.into(); + gauge!("kanin.prefetch_capacity", "queue" => queue_name.to_string()) + .increment(prefetch_f64); + // Declare and bind the queue. AMQP states that we must do this before creating the consumer. trace!("Declaring queue {queue_name:?} prior to binding..."); channel