diff --git a/kanin/Cargo.toml b/kanin/Cargo.toml index 2fbca9d..0faaf50 100644 --- a/kanin/Cargo.toml +++ b/kanin/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kanin" -version = "0.23.1" +version = "0.24.0" edition = "2021" authors = ["Victor Nordam Suadicani "] description = "An RPC microservice framework for AMQP, protobuf and Rust built on lapin (https://github.com/amqp-rs/lapin)." @@ -13,13 +13,16 @@ readme = "../README.md" [dependencies] # Derive macros for traits in kanin. -kanin_derive = "0.5.0" +kanin_derive = "0.5.2" # Lower level AMQP framework. lapin = "2.1.0" -# Generalized logging framework. -log = "0.4" +# Generalized tracing framework. +tracing = "0.1.37" + +# Used to create unique request IDs. +uuid = { version = "1.4.1", features = ["v4"] } # Asynchronous runtime. tokio = { version = "1.18.0", features = ["rt", "rt-multi-thread", "macros"] } diff --git a/kanin/src/app.rs b/kanin/src/app.rs index df6ffc3..c93b348 100644 --- a/kanin/src/app.rs +++ b/kanin/src/app.rs @@ -7,8 +7,8 @@ use std::{any::Any, sync::Arc}; use anymap::Map; use futures::future::{select_all, SelectAll}; use lapin::{self, Connection, ConnectionProperties}; -use log::{debug, error, info, trace}; use tokio::task::JoinHandle; +use tracing::{debug, error, info, trace}; use self::task::TaskFactory; use crate::{extract::State, Error, Handler, HandlerConfig, Respond, Result}; @@ -111,7 +111,9 @@ impl App { #[inline] pub async fn run(self, amqp_addr: &str) -> Result<()> { debug!("Connecting to AMQP on address: {amqp_addr:?} ..."); - let conn = Connection::connect(amqp_addr, ConnectionProperties::default()).await?; + let conn = Connection::connect(amqp_addr, ConnectionProperties::default()) + .await + .map_err(Error::Lapin)?; trace!("Connected to AMQP on address: {amqp_addr:?}"); self.run_with_connection(&conn).await } @@ -177,7 +179,10 @@ impl App { ); // Construct the task from the factory. This produces a pinned future which we can then spawn. - let task = task_factory.build(conn, state.clone()).await?; + let task = task_factory + .build(conn, state.clone()) + .await + .map_err(Error::Lapin)?; // Spawn the task and save the join handle. join_handles.push(tokio::spawn(task)); diff --git a/kanin/src/app/task.rs b/kanin/src/app/task.rs index da77e82..0f0aeb4 100644 --- a/kanin/src/app/task.rs +++ b/kanin/src/app/task.rs @@ -9,7 +9,7 @@ use lapin::{ types::FieldTable, BasicProperties, Channel, Connection, Consumer, }; -use log::{debug, error, trace, warn}; +use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use crate::{Handler, HandlerConfig, Request, Respond}; @@ -47,7 +47,7 @@ where let delivery = tokio::select! { // Listen on new deliveries. delivery = consumer.next() => match delivery { - // Received a delivery succesfully, just unwrap it from the option. + // Received a delivery successfully, just unwrap it from the option. Some(delivery) => delivery, // We should only ever get to this point if the consumer is cancelled. // We'll just return the routing key - might be a help for the user to see which @@ -79,7 +79,11 @@ where // Requests are handled and replied to concurrently. // This allows each handler task to process multiple requests at once. tasks.push(tokio::spawn(async move { - handle_request(req, handler, channel, should_reply).await; + let span = info_span!("request", req_id = %req.req_id); + + handle_request(req, handler, channel, should_reply) + .instrument(span) + .await; })); } }) @@ -100,15 +104,23 @@ async fn handle_request( let properties = req.properties().cloned(); let reply_to = properties.as_ref().and_then(|p| p.reply_to().clone()); let correlation_id = properties.as_ref().and_then(|p| p.correlation_id().clone()); + let app_id = req.app_id().unwrap_or(""); + + let handler_name = std::any::type_name::(); + info!("Received request on handler {handler_name:?} from {app_id}"); + + let t = std::time::Instant::now(); + // Call the handler with the request. let response = handler.call(&mut req).await; - debug!( - "Handler {:?} produced response: {response:?}", - std::any::type_name::() - ); + + debug!("Handler {handler_name:?} produced response {response:?}"); let bytes_response = response.respond(); + // Includes time for decoding request and encoding response, but *not* the time to publish the response. + let elapsed = t.elapsed(); + match (should_reply, reply_to) { // We're supposed to reply and we have a reply_to queue: Reply. (true, Some(reply_to)) => { @@ -121,12 +133,17 @@ async fn handle_request( .map(|p| format!("{p:?}")) .unwrap_or_else(|| "".into()); - warn!("Request from handler {:?} did not contain a `correlation_id` property. A reply will be published, but the receiver may not recognize it as the reply for their request. (all properties: {req_props})", std::any::type_name::()); + warn!("Request from handler {handler_name:?} did not contain a `correlation_id` property. A reply will be published, but the receiver may not recognize it as the reply for their request. (all properties: {req_props})"); } // Warn in case of replying with an empty message, since this is _probably_ wrong or unintended. if bytes_response.is_empty() { - warn!("Handler {:?} produced an empty response to a message with a `reply_to` property. This is probably undesired, as the caller likely expects more of a response.", std::any::type_name::()); + warn!("Handler {handler_name:?} produced an empty response to a message with a `reply_to` property. This is probably undesired, as the caller likely expects more of a response (elapsed={elapsed:?})"); + } else { + info!( + "Response with {} bytes that will be published to {reply_to} (elapsed={elapsed:?})", + bytes_response.len() + ); } let publish = channel @@ -159,21 +176,18 @@ async fn handle_request( .map(|p| format!("{p:?}")) .unwrap_or_else(|| "".into()); - warn!("Received non-empty message from handler {handler:?} but the request did not contain a `reply_to` property, so no reply could be published (all properties: {req_props})."); + warn!("Received non-empty message from handler {handler:?} but the request did not contain a `reply_to` property, so no reply could be published (all properties: {req_props}, elapsed={elapsed:?})."); } // We are supposed to reply, but the request did not have a reply_to. // However we produced an empty response, so it's not like the caller missed any information. - // In this case, we just debug log and leave it as is. This was probably intentional. (true, None) => { - let handler = std::any::type_name::(); - let req_props = properties - .map(|p| format!("{p:?}")) - .unwrap_or_else(|| "".into()); - - debug!("Received empty message from handler {handler:?} which has should_reply = true; however the request did not contain a `reply_to` property, so no reply could be published (all properties: {req_props}). This is probably not an issue since the caller did not miss any information."); + info!("Handler finished (empty, should_reply = true, elapsed={elapsed:?})"); } // We are not supposed to reply so we won't. - (false, _) => (), + (false, _) => { + let len = bytes_response.len(); + info!("Handler finished ({len} bytes, should_reply = false, elapsed={elapsed:?})."); + } }; match req.delivery.map(|d| d.acker) { diff --git a/kanin/src/error.rs b/kanin/src/error.rs index 7b097b4..c233cf7 100644 --- a/kanin/src/error.rs +++ b/kanin/src/error.rs @@ -2,9 +2,9 @@ use std::convert::Infallible; -use log::{error, warn}; use prost::DecodeError; use thiserror::Error as ThisError; +use tracing::{error, warn}; /// Errors that may be returned by `kanin`, especially when the app runs. #[derive(Debug, ThisError)] @@ -14,7 +14,7 @@ pub enum Error { NoHandlers, /// An error from an underlying lapin call. #[error("An underlying `lapin` call failed: {0}")] - Lapin(#[from] lapin::Error), + Lapin(lapin::Error), } /// Errors that may be produced by handlers. Failing extractors provided by `kanin` return this error. @@ -22,11 +22,11 @@ pub enum Error { pub enum HandlerError { /// Errors due to invalid requests. #[error("Invalid Request: {0:#}")] - InvalidRequest(#[from] RequestError), + InvalidRequest(RequestError), /// Internal server errors that are not the fault of clients. #[error("Internal Server Error: {0:#}")] - Internal(#[from] ServerError), + Internal(ServerError), } impl HandlerError { @@ -42,7 +42,7 @@ pub enum RequestError { /// /// This error is left as an opaque error as that is what is provided by [`prost`]. #[error("Message could not be decoded into the required type: {0:#}")] - DecodeError(#[from] DecodeError), + DecodeError(DecodeError), } /// Errors due to bad configuration or usage from the server-side. @@ -91,7 +91,7 @@ where impl From for HandlerError { fn from(e: DecodeError) -> Self { - RequestError::from(e).into() + HandlerError::InvalidRequest(RequestError::DecodeError(e)) } } diff --git a/kanin/src/extract.rs b/kanin/src/extract.rs index dc28761..97ab958 100644 --- a/kanin/src/extract.rs +++ b/kanin/src/extract.rs @@ -1,9 +1,10 @@ //! Interface for types that can extract themselves from requests. mod message; +mod req_id; mod state; -use std::convert::Infallible; +use std::{convert::Infallible, error::Error}; use async_trait::async_trait; use lapin::{acker::Acker, message::Delivery, Channel}; @@ -11,6 +12,7 @@ use lapin::{acker::Acker, message::Delivery, Channel}; use crate::{error::HandlerError, Request}; pub use message::Msg; +pub use req_id::ReqId; pub use state::State; /// A trait for types that can be extracted from [requests](`Request`). @@ -20,7 +22,7 @@ pub use state::State; #[async_trait] pub trait Extract: Sized { /// The error to return in case extraction fails. - type Error; + type Error: Error; /// Extract the type from the request. async fn extract(req: &mut Request) -> Result; diff --git a/kanin/src/extract/req_id.rs b/kanin/src/extract/req_id.rs new file mode 100644 index 0000000..bebdc2f --- /dev/null +++ b/kanin/src/extract/req_id.rs @@ -0,0 +1,78 @@ +//! Request IDs. + +use core::fmt; +use std::convert::Infallible; + +use async_trait::async_trait; +use lapin::{ + message::Delivery, + types::{AMQPValue, LongString}, +}; +use uuid::Uuid; + +use crate::{Extract, Request}; + +/// Request IDs allow concurrent logs to be associated with a unique request. It can also enable requests +/// to be traced between different services by propagating the request IDs when calling other services. +/// This type implements [`Extract`], so it can be used in handlers. +#[derive(Debug, Clone)] +pub struct ReqId(AMQPValue); + +impl ReqId { + /// Create a new [`ReqId`] as a random UUID. + fn new() -> Self { + let uuid = Uuid::new_v4(); + let amqp_value = AMQPValue::LongString(LongString::from(uuid.to_string())); + Self(amqp_value) + } + + /// Create a [`ReqId`] from an AMQP Delivery. If no `req_id` is found in the headers of the + /// message then a new one is created. + pub(crate) fn from_delivery(delivery: &Delivery) -> Self { + let Some(headers) = delivery.properties.headers() else { + return Self::new(); + }; + + let Some(req_id) = headers.inner().get("req_id") else { + return Self::new(); + }; + + Self(req_id.clone()) + } +} + +/// [`AMQPValue`] does not implement `Display` but we provide a `Display` implementation for +/// `ReqId` to allow it to be used in tracing spans (see the `tracing` crate). +impl fmt::Display for ReqId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.0 { + AMQPValue::LongString(req_id) => req_id.fmt(f), + AMQPValue::Boolean(b) => b.fmt(f), + AMQPValue::ShortShortInt(v) => v.fmt(f), + AMQPValue::ShortShortUInt(v) => v.fmt(f), + AMQPValue::ShortInt(v) => v.fmt(f), + AMQPValue::ShortUInt(v) => v.fmt(f), + AMQPValue::LongInt(v) => v.fmt(f), + AMQPValue::LongUInt(v) => v.fmt(f), + AMQPValue::LongLongInt(v) => v.fmt(f), + AMQPValue::Float(v) => v.fmt(f), + AMQPValue::Double(v) => v.fmt(f), + AMQPValue::DecimalValue(v) => write!(f, "{v:?}"), + AMQPValue::ShortString(v) => write!(f, "{v:?}"), + AMQPValue::FieldArray(v) => write!(f, "{v:?}"), + AMQPValue::Timestamp(v) => write!(f, "{v:?}"), + AMQPValue::FieldTable(v) => write!(f, "{v:?}"), + AMQPValue::ByteArray(v) => write!(f, "{v:?}"), + AMQPValue::Void => write!(f, "Void"), + } + } +} + +#[async_trait] +impl Extract for ReqId { + type Error = Infallible; + + async fn extract(req: &mut Request) -> Result { + Ok(req.req_id.clone()) + } +} diff --git a/kanin/src/extract/state.rs b/kanin/src/extract/state.rs index 6d0de04..6b92f98 100644 --- a/kanin/src/extract/state.rs +++ b/kanin/src/extract/state.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use async_trait::async_trait; use derive_more::{Deref, DerefMut}; -use log::error; +use tracing::error; use crate::{error::ServerError, Extract, HandlerError, Request}; diff --git a/kanin/src/handler.rs b/kanin/src/handler.rs index c1ac88f..f9e8e8c 100644 --- a/kanin/src/handler.rs +++ b/kanin/src/handler.rs @@ -46,7 +46,10 @@ macro_rules! impl_handler { $( let $ty = match $ty::extract(req).await { Ok(value) => value, - Err(error) => return Res::from_error(error), + Err(error) => { + tracing::error!("{error}"); + return Res::from_error(error); + } }; )* diff --git a/kanin/src/lib.rs b/kanin/src/lib.rs index 023a458..c8d5c67 100644 --- a/kanin/src/lib.rs +++ b/kanin/src/lib.rs @@ -146,7 +146,7 @@ mod tests { use std::time::Duration; use lapin::{Connection, ConnectionProperties}; - use log::warn; + use tracing::warn; const TEST_AMQP_ADDR: &str = "amqp://localhost"; diff --git a/kanin/src/request.rs b/kanin/src/request.rs index 1c481ec..1e68a95 100644 --- a/kanin/src/request.rs +++ b/kanin/src/request.rs @@ -7,6 +7,7 @@ use lapin::protocol::basic::AMQPProperties; use lapin::{message::Delivery, Channel}; use crate::app::StateMap; +use crate::extract::ReqId; /// An AMQP request. #[derive(Debug)] @@ -15,6 +16,9 @@ pub struct Request { state: Arc, /// The channel the message was received on. channel: Channel, + /// Request ID. This is a unique ID for every request. Either a newly created UUID or whatever + /// is found in the `req_id` header of the incoming AMQP message. + pub(crate) req_id: ReqId, /// The message delivery. pub(crate) delivery: Option, } @@ -25,6 +29,7 @@ impl Request { Self { state, channel, + req_id: ReqId::from_delivery(&delivery), delivery: Some(delivery), } } @@ -44,4 +49,11 @@ impl Request { pub fn properties(&self) -> Option<&AMQPProperties> { self.delivery.as_ref().map(|d| &d.properties) } + + /// Returns the `app_id` AMQP property of the request. + pub fn app_id(&self) -> Option<&str> { + self.properties() + .and_then(|p| p.app_id().as_ref()) + .map(|app_id| app_id.as_str()) + } } diff --git a/kanin/src/tests/send_recv.rs b/kanin/src/tests/send_recv.rs index 51c97f6..2385edb 100644 --- a/kanin/src/tests/send_recv.rs +++ b/kanin/src/tests/send_recv.rs @@ -2,13 +2,20 @@ use std::sync::{Arc, Mutex}; use async_trait::async_trait; use futures::future::join_all; -use lapin::{message::Delivery, options::BasicPublishOptions, BasicProperties, Channel}; -use log::info; +use lapin::{ + message::Delivery, + options::BasicPublishOptions, + types::{AMQPValue, FieldTable}, + BasicProperties, Channel, +}; use tokio::sync::{mpsc::Sender, OnceCell}; +use tracing::info; use crate::{ - error::FromError, extract::State, tests::init_logging, App, Extract, HandlerError, Request, - Respond, + error::FromError, + extract::{ReqId, State}, + tests::init_logging, + App, Extract, HandlerError, Request, Respond, }; use super::amqp_connect; @@ -67,6 +74,12 @@ async fn handler_delivery(_delivery: Delivery) -> MyResponse { MyResponse("handler_delivery".into()) } +async fn handler_req_id(req_id: ReqId) -> MyResponse { + assert_eq!(req_id.to_string(), "abc"); + SYNC.get().unwrap().send(()).await.unwrap(); + MyResponse("handler_req_id".into()) +} + async fn handler_two_extractors(_channel: Channel, _delivery: Delivery) -> MyResponse { SYNC.get().unwrap().send(()).await.unwrap(); MyResponse("handler_two_extractors".into()) @@ -107,6 +120,7 @@ async fn it_receives_various_messages_and_works_as_expected() { .handler("handler", handler) .handler("handler_channel", handler_channel) .handler("handler_delivery", handler_delivery) + .handler("handler_req_id", handler_req_id) .handler("handler_two_extractors", handler_two_extractors) .handler("handler_state_extractor", handler_state_extractor) .handler("listener", listener) @@ -131,13 +145,18 @@ async fn it_receives_various_messages_and_works_as_expected() { .expect("failed to create channel"); let send_msg = |routing_key: &'static str, reply_to: &'static str| async { + let mut headers = FieldTable::default(); + headers.insert("req_id".into(), AMQPValue::LongString("abc".into())); + channel .basic_publish( "", routing_key, BasicPublishOptions::default(), &[], - BasicProperties::default().with_reply_to(reply_to.into()), + BasicProperties::default() + .with_reply_to(reply_to.into()) + .with_headers(headers), ) .await .expect("failed to publish"); @@ -161,6 +180,9 @@ async fn it_receives_various_messages_and_works_as_expected() { send_msg("handler_delivery", "handler_message_reply_to").await; recv.recv().await.unwrap(); recv.recv().await.unwrap(); + send_msg("handler_req_id", "handler_message_reply_to").await; + recv.recv().await.unwrap(); + recv.recv().await.unwrap(); send_msg("handler_two_extractors", "handler_message_reply_to").await; recv.recv().await.unwrap(); recv.recv().await.unwrap(); @@ -213,6 +235,7 @@ async fn it_receives_various_messages_and_works_as_expected() { // "handler", // "handler_channel", // "handler_delivery", + // "handler_req_id", // "handler_two_extractors", "handler_state_extractor", "listener", @@ -229,6 +252,7 @@ async fn it_receives_various_messages_and_works_as_expected() { "handler_message", "handler_message", "handler_message", + "handler_message", "shutdown", ], recv_calls.as_ref() diff --git a/kanin_derive/Cargo.toml b/kanin_derive/Cargo.toml index e32165d..c275489 100644 --- a/kanin_derive/Cargo.toml +++ b/kanin_derive/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kanin_derive" -version = "0.5.0" +version = "0.5.2" edition = "2021" authors = ["Victor Nordam Suadicani "] description = "Derive macros for kanin" diff --git a/kanin_derive/src/lib.rs b/kanin_derive/src/lib.rs index 4407f54..65e063f 100644 --- a/kanin_derive/src/lib.rs +++ b/kanin_derive/src/lib.rs @@ -153,11 +153,9 @@ fn derive_enum(name: Ident, variants: Punctuated) -> TokenStream fn from_error(error: ::kanin::HandlerError) -> Self { match error { ::kanin::HandlerError::InvalidRequest(e) => { - ::log::warn!("{}", e); Self::#invalid_request_name(::kanin::error::FromError::from_error(e)) }, ::kanin::HandlerError::Internal(e) => { - ::log::error!("{}", e); Self::#internal_error_name(::kanin::error::FromError::from_error(e)) }, }