Skip to content

Commit

Permalink
Prefetch metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Nordam Suadicani committed Mar 6, 2024
1 parent 66f3f54 commit a66f0a7
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
7 changes: 5 additions & 2 deletions kanin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kanin"
version = "0.29.0"
version = "0.30.0"
edition = "2021"
authors = ["Victor Nordam Suadicani <[email protected]>"]
description = "An RPC microservice framework for AMQP, protobuf and Rust built on lapin (https://github.com/amqp-rs/lapin)."
Expand Down Expand Up @@ -43,9 +43,12 @@ derive_more = "0.99.17"
# Great for structured errors.
thiserror = "1.0.30"

# For exposing metrics about the internal state of kanin.
metrics = "0.22.1"

[dev-dependencies]
# Concrete logging implementation.
env_logger = "0.10.0"
env_logger = "0.11.3"

# Asynchronous runtime.
tokio = { version = "1.18.0", features = [
Expand Down
4 changes: 4 additions & 0 deletions kanin/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;

use futures::future::{select_all, try_join_all, SelectAll};
use lapin::{self, Connection, ConnectionProperties};
use metrics::describe_gauge;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, trace};

Expand Down Expand Up @@ -113,6 +114,9 @@ impl<S> App<S> {
/// On connection errors, the app will simply panic.
#[inline]
pub async fn run_with_connection(self, conn: &Connection) -> Result<()> {
// Describe metrics (just need to do it somewhere once as we run the app).
describe_gauge!("kanin.prefetch_capacity", "A gauge that measures how much prefetch is available on a certain queue, based on the prefetch of its consumers.");

let handles = self.setup_handlers(conn).await?;
let (returning_handler, _remaining_handlers_count, _leftover_handlers) = handles.await;

Expand Down
10 changes: 10 additions & 0 deletions kanin/src/app/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use lapin::{
types::{FieldTable, ShortString},
BasicProperties, Channel, Connection, Consumer,
};
use metrics::gauge;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};

use crate::{Handler, HandlerConfig, Request, Respond};
Expand Down Expand Up @@ -276,6 +277,15 @@ impl<S> TaskFactory<S> {
// If no queue was specified, we just use the routing key.
let queue_name = self.config.queue.as_deref().unwrap_or(&self.routing_key);

// Set prefetch capacity gauge according to the prefetch.
// This allows one to construct a metric that informs how close a queue is to capacity.
// I.e. if there are 3 servers with prefetch 8 on a queue, the queue's capacity is 24.
// By comparing this number to the number of unacked messages in the AMQP message broker (like the rabbitmq_queue_messages_unacked metric from RabbitMQ),
// you can estimate how close to capacity the queue is.
let prefetch_f64: f64 = self.config.prefetch.into();
gauge!("kanin.prefetch_capacity", "queue" => queue_name.to_string())
.increment(prefetch_f64);

// Declare and bind the queue. AMQP states that we must do this before creating the consumer.
trace!("Declaring queue {queue_name:?} prior to binding...");
channel
Expand Down

0 comments on commit a66f0a7

Please sign in to comment.