Skip to content

Commit

Permalink
Init tracing (#28)
Browse files Browse the repository at this point in the history
* Init tracing

* Document ReqId::new

* Update kanin/Cargo.toml

Co-authored-by: Victor Nordam Suadicani <[email protected]>

* Update kanin/src/app/task.rs

Co-authored-by: Victor Nordam Suadicani <[email protected]>

* Update kanin/src/app/task.rs

Co-authored-by: Victor Nordam Suadicani <[email protected]>

* Update kanin/src/extract/req_id.rs

Co-authored-by: Victor Nordam Suadicani <[email protected]>

* Update kanin/src/extract/req_id.rs

Co-authored-by: Victor Nordam Suadicani <[email protected]>

* Update kanin/src/extract/req_id.rs

Co-authored-by: Victor Nordam Suadicani <[email protected]>

* Update kanin/src/extract/req_id.rs

Co-authored-by: Victor Nordam Suadicani <[email protected]>

* Update kanin/src/extract/req_id.rs

Co-authored-by: Victor Nordam Suadicani <[email protected]>

* Update kanin/src/tests/send_recv.rs

Co-authored-by: Victor Nordam Suadicani <[email protected]>

* Add req_id to Request

* Move 'response' log

* Update kanin/src/request.rs

Co-authored-by: Victor Nordam Suadicani <[email protected]>

* Update kanin/src/request.rs

Co-authored-by: Victor Nordam Suadicani <[email protected]>

* Remove instrumentation + From<_> --> from_delivery

* Better log messages + elapsed time for req

* Update versions

---------

Co-authored-by: Victor Nordam Suadicani <[email protected]>
  • Loading branch information
gorm-issuu and Victor-N-Suadicani authored Aug 25, 2023
1 parent 68c5977 commit 1a39461
Show file tree
Hide file tree
Showing 13 changed files with 183 additions and 44 deletions.
11 changes: 7 additions & 4 deletions kanin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kanin"
version = "0.23.1"
version = "0.24.0"
edition = "2021"
authors = ["Victor Nordam Suadicani <[email protected]>"]
description = "An RPC microservice framework for AMQP, protobuf and Rust built on lapin (https://github.com/amqp-rs/lapin)."
Expand All @@ -13,13 +13,16 @@ readme = "../README.md"

[dependencies]
# Derive macros for traits in kanin.
kanin_derive = "0.5.0"
kanin_derive = "0.5.2"

# Lower level AMQP framework.
lapin = "2.1.0"

# Generalized logging framework.
log = "0.4"
# Generalized tracing framework.
tracing = "0.1.37"

# Used to create unique request IDs.
uuid = { version = "1.4.1", features = ["v4"] }

# Asynchronous runtime.
tokio = { version = "1.18.0", features = ["rt", "rt-multi-thread", "macros"] }
Expand Down
11 changes: 8 additions & 3 deletions kanin/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::{any::Any, sync::Arc};
use anymap::Map;
use futures::future::{select_all, SelectAll};
use lapin::{self, Connection, ConnectionProperties};
use log::{debug, error, info, trace};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, trace};

use self::task::TaskFactory;
use crate::{extract::State, Error, Handler, HandlerConfig, Respond, Result};
Expand Down Expand Up @@ -111,7 +111,9 @@ impl App {
#[inline]
pub async fn run(self, amqp_addr: &str) -> Result<()> {
debug!("Connecting to AMQP on address: {amqp_addr:?} ...");
let conn = Connection::connect(amqp_addr, ConnectionProperties::default()).await?;
let conn = Connection::connect(amqp_addr, ConnectionProperties::default())
.await
.map_err(Error::Lapin)?;
trace!("Connected to AMQP on address: {amqp_addr:?}");
self.run_with_connection(&conn).await
}
Expand Down Expand Up @@ -177,7 +179,10 @@ impl App {
);

// Construct the task from the factory. This produces a pinned future which we can then spawn.
let task = task_factory.build(conn, state.clone()).await?;
let task = task_factory
.build(conn, state.clone())
.await
.map_err(Error::Lapin)?;

// Spawn the task and save the join handle.
join_handles.push(tokio::spawn(task));
Expand Down
50 changes: 32 additions & 18 deletions kanin/src/app/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use lapin::{
types::FieldTable,
BasicProperties, Channel, Connection, Consumer,
};
use log::{debug, error, trace, warn};
use tracing::{debug, error, info, info_span, trace, warn, Instrument};

use crate::{Handler, HandlerConfig, Request, Respond};

Expand Down Expand Up @@ -47,7 +47,7 @@ where
let delivery = tokio::select! {
// Listen on new deliveries.
delivery = consumer.next() => match delivery {
// Received a delivery succesfully, just unwrap it from the option.
// Received a delivery successfully, just unwrap it from the option.
Some(delivery) => delivery,
// We should only ever get to this point if the consumer is cancelled.
// We'll just return the routing key - might be a help for the user to see which
Expand Down Expand Up @@ -79,7 +79,11 @@ where
// Requests are handled and replied to concurrently.
// This allows each handler task to process multiple requests at once.
tasks.push(tokio::spawn(async move {
handle_request(req, handler, channel, should_reply).await;
let span = info_span!("request", req_id = %req.req_id);

handle_request(req, handler, channel, should_reply)
.instrument(span)
.await;
}));
}
})
Expand All @@ -100,15 +104,23 @@ async fn handle_request<H, Args, Res>(
let properties = req.properties().cloned();
let reply_to = properties.as_ref().and_then(|p| p.reply_to().clone());
let correlation_id = properties.as_ref().and_then(|p| p.correlation_id().clone());
let app_id = req.app_id().unwrap_or("<unknown>");

let handler_name = std::any::type_name::<H>();
info!("Received request on handler {handler_name:?} from {app_id}");

let t = std::time::Instant::now();

// Call the handler with the request.
let response = handler.call(&mut req).await;
debug!(
"Handler {:?} produced response: {response:?}",
std::any::type_name::<H>()
);

debug!("Handler {handler_name:?} produced response {response:?}");

let bytes_response = response.respond();

// Includes time for decoding request and encoding response, but *not* the time to publish the response.
let elapsed = t.elapsed();

match (should_reply, reply_to) {
// We're supposed to reply and we have a reply_to queue: Reply.
(true, Some(reply_to)) => {
Expand All @@ -121,12 +133,17 @@ async fn handle_request<H, Args, Res>(
.map(|p| format!("{p:?}"))
.unwrap_or_else(|| "<None>".into());

warn!("Request from handler {:?} did not contain a `correlation_id` property. A reply will be published, but the receiver may not recognize it as the reply for their request. (all properties: {req_props})", std::any::type_name::<H>());
warn!("Request from handler {handler_name:?} did not contain a `correlation_id` property. A reply will be published, but the receiver may not recognize it as the reply for their request. (all properties: {req_props})");
}

// Warn in case of replying with an empty message, since this is _probably_ wrong or unintended.
if bytes_response.is_empty() {
warn!("Handler {:?} produced an empty response to a message with a `reply_to` property. This is probably undesired, as the caller likely expects more of a response.", std::any::type_name::<H>());
warn!("Handler {handler_name:?} produced an empty response to a message with a `reply_to` property. This is probably undesired, as the caller likely expects more of a response (elapsed={elapsed:?})");
} else {
info!(
"Response with {} bytes that will be published to {reply_to} (elapsed={elapsed:?})",
bytes_response.len()
);
}

let publish = channel
Expand Down Expand Up @@ -159,21 +176,18 @@ async fn handle_request<H, Args, Res>(
.map(|p| format!("{p:?}"))
.unwrap_or_else(|| "<None>".into());

warn!("Received non-empty message from handler {handler:?} but the request did not contain a `reply_to` property, so no reply could be published (all properties: {req_props}).");
warn!("Received non-empty message from handler {handler:?} but the request did not contain a `reply_to` property, so no reply could be published (all properties: {req_props}, elapsed={elapsed:?}).");
}
// We are supposed to reply, but the request did not have a reply_to.
// However we produced an empty response, so it's not like the caller missed any information.
// In this case, we just debug log and leave it as is. This was probably intentional.
(true, None) => {
let handler = std::any::type_name::<H>();
let req_props = properties
.map(|p| format!("{p:?}"))
.unwrap_or_else(|| "<None>".into());

debug!("Received empty message from handler {handler:?} which has should_reply = true; however the request did not contain a `reply_to` property, so no reply could be published (all properties: {req_props}). This is probably not an issue since the caller did not miss any information.");
info!("Handler finished (empty, should_reply = true, elapsed={elapsed:?})");
}
// We are not supposed to reply so we won't.
(false, _) => (),
(false, _) => {
let len = bytes_response.len();
info!("Handler finished ({len} bytes, should_reply = false, elapsed={elapsed:?}).");
}
};

match req.delivery.map(|d| d.acker) {
Expand Down
12 changes: 6 additions & 6 deletions kanin/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

use std::convert::Infallible;

use log::{error, warn};
use prost::DecodeError;
use thiserror::Error as ThisError;
use tracing::{error, warn};

/// Errors that may be returned by `kanin`, especially when the app runs.
#[derive(Debug, ThisError)]
Expand All @@ -14,19 +14,19 @@ pub enum Error {
NoHandlers,
/// An error from an underlying lapin call.
#[error("An underlying `lapin` call failed: {0}")]
Lapin(#[from] lapin::Error),
Lapin(lapin::Error),
}

/// Errors that may be produced by handlers. Failing extractors provided by `kanin` return this error.
#[derive(Debug, ThisError)]
pub enum HandlerError {
/// Errors due to invalid requests.
#[error("Invalid Request: {0:#}")]
InvalidRequest(#[from] RequestError),
InvalidRequest(RequestError),

/// Internal server errors that are not the fault of clients.
#[error("Internal Server Error: {0:#}")]
Internal(#[from] ServerError),
Internal(ServerError),
}

impl HandlerError {
Expand All @@ -42,7 +42,7 @@ pub enum RequestError {
///
/// This error is left as an opaque error as that is what is provided by [`prost`].
#[error("Message could not be decoded into the required type: {0:#}")]
DecodeError(#[from] DecodeError),
DecodeError(DecodeError),
}

/// Errors due to bad configuration or usage from the server-side.
Expand Down Expand Up @@ -91,7 +91,7 @@ where

impl From<DecodeError> for HandlerError {
fn from(e: DecodeError) -> Self {
RequestError::from(e).into()
HandlerError::InvalidRequest(RequestError::DecodeError(e))
}
}

Expand Down
6 changes: 4 additions & 2 deletions kanin/src/extract.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
//! Interface for types that can extract themselves from requests.

mod message;
mod req_id;
mod state;

use std::convert::Infallible;
use std::{convert::Infallible, error::Error};

use async_trait::async_trait;
use lapin::{acker::Acker, message::Delivery, Channel};

use crate::{error::HandlerError, Request};

pub use message::Msg;
pub use req_id::ReqId;
pub use state::State;

/// A trait for types that can be extracted from [requests](`Request`).
Expand All @@ -20,7 +22,7 @@ pub use state::State;
#[async_trait]
pub trait Extract: Sized {
/// The error to return in case extraction fails.
type Error;
type Error: Error;

/// Extract the type from the request.
async fn extract(req: &mut Request) -> Result<Self, Self::Error>;
Expand Down
78 changes: 78 additions & 0 deletions kanin/src/extract/req_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//! Request IDs.

use core::fmt;
use std::convert::Infallible;

use async_trait::async_trait;
use lapin::{
message::Delivery,
types::{AMQPValue, LongString},
};
use uuid::Uuid;

use crate::{Extract, Request};

/// Request IDs allow concurrent logs to be associated with a unique request. It can also enable requests
/// to be traced between different services by propagating the request IDs when calling other services.
/// This type implements [`Extract`], so it can be used in handlers.
#[derive(Debug, Clone)]
pub struct ReqId(AMQPValue);

impl ReqId {
/// Create a new [`ReqId`] as a random UUID.
fn new() -> Self {
let uuid = Uuid::new_v4();
let amqp_value = AMQPValue::LongString(LongString::from(uuid.to_string()));
Self(amqp_value)
}

/// Create a [`ReqId`] from an AMQP Delivery. If no `req_id` is found in the headers of the
/// message then a new one is created.
pub(crate) fn from_delivery(delivery: &Delivery) -> Self {
let Some(headers) = delivery.properties.headers() else {
return Self::new();
};

let Some(req_id) = headers.inner().get("req_id") else {
return Self::new();
};

Self(req_id.clone())
}
}

/// [`AMQPValue`] does not implement `Display` but we provide a `Display` implementation for
/// `ReqId` to allow it to be used in tracing spans (see the `tracing` crate).
impl fmt::Display for ReqId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.0 {
AMQPValue::LongString(req_id) => req_id.fmt(f),
AMQPValue::Boolean(b) => b.fmt(f),
AMQPValue::ShortShortInt(v) => v.fmt(f),
AMQPValue::ShortShortUInt(v) => v.fmt(f),
AMQPValue::ShortInt(v) => v.fmt(f),
AMQPValue::ShortUInt(v) => v.fmt(f),
AMQPValue::LongInt(v) => v.fmt(f),
AMQPValue::LongUInt(v) => v.fmt(f),
AMQPValue::LongLongInt(v) => v.fmt(f),
AMQPValue::Float(v) => v.fmt(f),
AMQPValue::Double(v) => v.fmt(f),
AMQPValue::DecimalValue(v) => write!(f, "{v:?}"),
AMQPValue::ShortString(v) => write!(f, "{v:?}"),
AMQPValue::FieldArray(v) => write!(f, "{v:?}"),
AMQPValue::Timestamp(v) => write!(f, "{v:?}"),
AMQPValue::FieldTable(v) => write!(f, "{v:?}"),
AMQPValue::ByteArray(v) => write!(f, "{v:?}"),
AMQPValue::Void => write!(f, "Void"),
}
}
}

#[async_trait]
impl Extract for ReqId {
type Error = Infallible;

async fn extract(req: &mut Request) -> Result<Self, Self::Error> {
Ok(req.req_id.clone())
}
}
2 changes: 1 addition & 1 deletion kanin/src/extract/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use derive_more::{Deref, DerefMut};
use log::error;
use tracing::error;

use crate::{error::ServerError, Extract, HandlerError, Request};

Expand Down
5 changes: 4 additions & 1 deletion kanin/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ macro_rules! impl_handler {
$(
let $ty = match $ty::extract(req).await {
Ok(value) => value,
Err(error) => return Res::from_error(error),
Err(error) => {
tracing::error!("{error}");
return Res::from_error(error);
}
};
)*

Expand Down
2 changes: 1 addition & 1 deletion kanin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ mod tests {
use std::time::Duration;

use lapin::{Connection, ConnectionProperties};
use log::warn;
use tracing::warn;

const TEST_AMQP_ADDR: &str = "amqp://localhost";

Expand Down
12 changes: 12 additions & 0 deletions kanin/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use lapin::protocol::basic::AMQPProperties;
use lapin::{message::Delivery, Channel};

use crate::app::StateMap;
use crate::extract::ReqId;

/// An AMQP request.
#[derive(Debug)]
Expand All @@ -15,6 +16,9 @@ pub struct Request {
state: Arc<StateMap>,
/// The channel the message was received on.
channel: Channel,
/// Request ID. This is a unique ID for every request. Either a newly created UUID or whatever
/// is found in the `req_id` header of the incoming AMQP message.
pub(crate) req_id: ReqId,
/// The message delivery.
pub(crate) delivery: Option<Delivery>,
}
Expand All @@ -25,6 +29,7 @@ impl Request {
Self {
state,
channel,
req_id: ReqId::from_delivery(&delivery),
delivery: Some(delivery),
}
}
Expand All @@ -44,4 +49,11 @@ impl Request {
pub fn properties(&self) -> Option<&AMQPProperties> {
self.delivery.as_ref().map(|d| &d.properties)
}

/// Returns the `app_id` AMQP property of the request.
pub fn app_id(&self) -> Option<&str> {
self.properties()
.and_then(|p| p.app_id().as_ref())
.map(|app_id| app_id.as_str())
}
}
Loading

0 comments on commit 1a39461

Please sign in to comment.