Skip to content

Commit

Permalink
Remove delivery extraction, enable easy consumer timeout and concurre…
Browse files Browse the repository at this point in the history
…nt task creation (#35)

* Remove delivery extraction and concurrent task creation

* Fix tests
  • Loading branch information
Victor-N-Suadicani authored Feb 9, 2024
1 parent 0bc3cf2 commit 66f3f54
Show file tree
Hide file tree
Showing 17 changed files with 101 additions and 165 deletions.
6 changes: 3 additions & 3 deletions kanin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kanin"
version = "0.28.1"
version = "0.29.0"
edition = "2021"
authors = ["Victor Nordam Suadicani <[email protected]>"]
description = "An RPC microservice framework for AMQP, protobuf and Rust built on lapin (https://github.com/amqp-rs/lapin)."
Expand All @@ -13,10 +13,10 @@ readme = "../README.md"

[dependencies]
# Derive macros for traits in kanin.
kanin_derive = "0.6.0"
kanin_derive = "0.7.0"

# Lower level AMQP framework.
lapin = "2.1.0"
lapin = "2.3.1"

# Generalized tracing framework.
tracing = "0.1.37"
Expand Down
11 changes: 6 additions & 5 deletions kanin/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod task;

use std::sync::Arc;

use futures::future::{select_all, SelectAll};
use futures::future::{select_all, try_join_all, SelectAll};
use lapin::{self, Connection, ConnectionProperties};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, trace};
Expand Down Expand Up @@ -149,9 +149,8 @@ impl<S> App<S> {
panic!("panicking due to connection error");
});

let mut join_handles = Vec::new();
let state = Arc::new(self.state);
for task_factory in self.handlers.into_iter() {
let mut join_handles = try_join_all(self.handlers.into_iter().map(|task_factory| async {
debug!(
"Spawning handler task for routing key: {:?} ...",
task_factory.routing_key()
Expand All @@ -164,8 +163,10 @@ impl<S> App<S> {
.map_err(Error::Lapin)?;

// Spawn the task and save the join handle.
join_handles.push(tokio::spawn(task));
}
Ok(tokio::spawn(task))
}))
.await?;

info!(
"Connected to AMQP broker. Listening on {} handler{}.",
join_handles.len(),
Expand Down
48 changes: 20 additions & 28 deletions kanin/src/app/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ where
error!("Error when receiving delivery on routing key \"{routing_key}\": {e:#}");
continue;
}
// Construct the request by bundling the channel and the delivery.
// Construct the request by bundling the channel, the delivery and the app state.
Ok(delivery) => Request::new(channel.clone(), delivery, state.clone()),
};

Expand All @@ -78,7 +78,7 @@ where
// Requests are handled and replied to concurrently.
// This allows each handler task to process multiple requests at once.
tasks.push(tokio::spawn(async move {
let span = info_span!("request", req_id = %req.req_id);
let span = info_span!("request", req_id = %req.req_id());

handle_request(req, handler, channel, should_reply)
.instrument(span)
Expand All @@ -100,19 +100,23 @@ async fn handle_request<H, S, Args, Res>(
H: Handler<Args, Res, S>,
Res: Respond,
{
let properties = req.properties().cloned();
let reply_to = properties.as_ref().and_then(|p| p.reply_to().clone());
let correlation_id = properties.as_ref().and_then(|p| p.correlation_id().clone());
let app_id = req.app_id().unwrap_or("<unknown>");

let handler_name = std::any::type_name::<H>();
let app_id = req.app_id().unwrap_or("<unknown>");
info!("Received request on handler {handler_name:?} from {app_id}");

if req.delivery.redelivered {
info!("Request was redelivered.");
}

let t = std::time::Instant::now();

// Call the handler with the request.
let response = handler.call(&mut req).await;

let properties = req.properties();
let reply_to = properties.reply_to();
let correlation_id = properties.correlation_id();

debug!("Handler {handler_name:?} produced response {response:?}");

let bytes_response = response.respond();
Expand All @@ -126,13 +130,9 @@ async fn handle_request<H, S, Args, Res>(
let mut props = BasicProperties::default();

if let Some(correlation_id) = correlation_id {
props = props.with_correlation_id(correlation_id);
props = props.with_correlation_id(correlation_id.clone());
} else {
let req_props = properties
.map(|p| format!("{p:?}"))
.unwrap_or_else(|| "<None>".into());

warn!("Request from handler {handler_name:?} did not contain a `correlation_id` property. A reply will be published, but the receiver may not recognize it as the reply for their request. (all properties: {req_props})");
warn!("Request from handler {handler_name:?} did not contain a `correlation_id` property. A reply will be published, but the receiver may not recognize it as the reply for their request. (all properties: {properties:?})");
}

// Warn in case of replying with an empty message, since this is _probably_ wrong or unintended.
Expand Down Expand Up @@ -173,11 +173,7 @@ async fn handle_request<H, S, Args, Res>(
// Even worse, the response we produced is non-empty - it was probably meant to be received by someone!
// In this case, we warn. Empty responses may be produced by non-responding handlers, which is fine.
(true, None) if !bytes_response.is_empty() => {
let req_props = properties
.map(|p| format!("{p:?}"))
.unwrap_or_else(|| "<None>".into());

warn!("Received non-empty message from handler {handler_name:?} but the request did not contain a `reply_to` property, so no reply could be published (all properties: {req_props}, elapsed={elapsed:?}).");
warn!("Received non-empty message from handler {handler_name:?} but the request did not contain a `reply_to` property, so no reply could be published (all properties: {properties:?}, elapsed={elapsed:?}).");
}
// We are supposed to reply, but the request did not have a reply_to.
// However we produced an empty response, so it's not like the caller missed any information.
Expand All @@ -195,17 +191,13 @@ async fn handle_request<H, S, Args, Res>(
}
};

match req.delivery.map(|d| d.acker) {
// Check if it's the default - this signifies that it was already extracted.
// In that case, it is the responsibility of the handler to acknowledge, so we won't do it.
Some(acker) if acker != Acker::default() => {
match acker.ack(BasicAckOptions::default()).await {
Ok(()) => debug!("Successfully acked request."),
Err(e) => error!("Failed to ack request: {e:#}"),
}
// Check if it's the default - this signifies that it was already extracted.
// In that case, it is the responsibility of the handler to acknowledge, so we won't do it.
if req.delivery.acker != Acker::default() {
match req.delivery.acker.ack(BasicAckOptions::default()).await {
Ok(()) => debug!("Successfully acked request."),
Err(e) => error!("Failed to ack request: {e:#}"),
}
// If the delivery or acker was extracted, it is up to the request handler itself to acknowledge the request.
_ => (),
}
}

Expand Down
19 changes: 0 additions & 19 deletions kanin/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,6 @@ pub enum HandlerError {
/// Errors due to invalid requests.
#[error("Invalid Request: {0:#}")]
InvalidRequest(RequestError),

/// Internal server errors that are not the fault of clients.
#[error("Internal Server Error: {0:#}")]
Internal(ServerError),
}

impl HandlerError {
/// Convenient const for this otherwise long error.
pub const DELIVERY_ALREADY_EXTRACTED: Self =
Self::Internal(ServerError::DeliveryAlreadyExtracted);
}

/// All the ways a request might be invalid.
Expand All @@ -45,14 +35,6 @@ pub enum RequestError {
DecodeError(DecodeError),
}

/// Errors due to bad configuration or usage from the server-side.
#[derive(Debug, ThisError)]
pub enum ServerError {
/// A handler attempted to extract the delivery of a message twice.
#[error("The delivery was already extracted from the request and could not be accessed")]
DeliveryAlreadyExtracted,
}

/// Types that may be constructed from errors.
///
/// You must implement `FromError<kanin::HandlerError> for T` for any return type `T` of your handlers.
Expand Down Expand Up @@ -100,7 +82,6 @@ impl FromError<HandlerError> for () {
HandlerError::InvalidRequest(e) => {
warn!("Listener handler received an invalid request: {e:#}")
}
HandlerError::Internal(e) => error!("Internal error on listener handler: {e:#}"),
}
}
}
40 changes: 11 additions & 29 deletions kanin/src/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@ mod message;
mod req_id;
mod state;

use std::{convert::Infallible, error::Error};

use async_trait::async_trait;
use lapin::{acker::Acker, message::Delivery, Channel};

use crate::{error::HandlerError, Request};

pub use app_id::AppId;
pub use message::Msg;
pub use req_id::ReqId;
pub use state::State;

use std::{convert::Infallible, error::Error};

use async_trait::async_trait;
use lapin::{acker::Acker, Channel};

use crate::Request;

/// A trait for types that can be extracted from [requests](`Request`).
///
/// Note that extractions might mutate the request in certain ways.
/// Most notably, if extracting the [`Delivery`] or [`Acker`] from a request, it is the responsibility of the handler to acknowledge the message.
/// Most notably, if extracting the [`Acker`] from a request, it is the responsibility of the handler to acknowledge the message.
#[async_trait]
pub trait Extract<S>: Sized {
/// The error to return in case extraction fails.
Expand All @@ -30,36 +30,18 @@ pub trait Extract<S>: Sized {
async fn extract(req: &mut Request<S>) -> Result<Self, Self::Error>;
}

/// Note that when you extract the [`Delivery`], the handler itself must acknowledge the request.
/// Kanin *will not* acknowledge the request for you in this case.
#[async_trait]
impl<S> Extract<S> for Delivery
where
S: Send + Sync,
{
type Error = HandlerError;

async fn extract(req: &mut Request<S>) -> Result<Self, Self::Error> {
req.delivery
.take()
.ok_or(HandlerError::DELIVERY_ALREADY_EXTRACTED)
}
}

/// Note that when you extract the [`Acker`], the handler itself must acknowledge the request.
/// kanin *will not* acknowledge the request for you in this case.
// TODO: This implementation is quite hacky and should probably be removed.
#[async_trait]
impl<S> Extract<S> for Acker
where
S: Send + Sync,
{
type Error = HandlerError;
type Error = Infallible;

async fn extract(req: &mut Request<S>) -> Result<Self, Self::Error> {
req.delivery
.as_mut()
.ok_or(HandlerError::DELIVERY_ALREADY_EXTRACTED)
.map(|d| std::mem::take(&mut d.acker))
Ok(std::mem::take(&mut req.delivery.acker))
}
}

Expand Down
8 changes: 1 addition & 7 deletions kanin/src/extract/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ where
type Error = HandlerError;

async fn extract(req: &mut Request<S>) -> Result<Self, Self::Error> {
match &req.delivery {
None => Err(HandlerError::DELIVERY_ALREADY_EXTRACTED),
Some(d) => {
let data: &[u8] = &d.data;
Ok(Msg(D::decode(data)?))
}
}
Ok(Msg(D::decode(&req.delivery.data[..])?))
}
}
2 changes: 1 addition & 1 deletion kanin/src/extract/req_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,6 @@ where
type Error = Infallible;

async fn extract(req: &mut Request<S>) -> Result<Self, Self::Error> {
Ok(req.req_id.clone())
Ok(req.req_id().clone())
}
}
23 changes: 19 additions & 4 deletions kanin/src/handler_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::time::Duration;

use lapin::options::QueueDeclareOptions;
use lapin::types::{AMQPValue, FieldTable, ShortString};
use lapin::types::{AMQPValue, FieldTable, LongString, ShortString};

/// Detailed configuration of a handler.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -45,7 +45,7 @@ impl HandlerConfig {
Default::default()
}

/// Sets the exchange of the handler. Defaults to the direct exchange, [`HandlerConfig::DIRECT_EXCHANGE`].
/// Sets the queue name. Defaults to the same as the routing key.
pub fn with_queue(mut self, queue: impl Into<String>) -> Self {
self.queue = Some(queue.into());
self
Expand Down Expand Up @@ -106,7 +106,10 @@ impl HandlerConfig {
}

/// Sets the `x-dead-letter-exchange` argument on the queue. See also [RabbitMQ's documentation](https://www.rabbitmq.com/dlx.html).
pub fn with_dead_letter_exchange(mut self, dead_letter_exchange: String) -> Self {
pub fn with_dead_letter_exchange(
mut self,
dead_letter_exchange: impl Into<LongString>,
) -> Self {
self.arguments.insert(
"x-dead-letter-exchange".into(),
AMQPValue::LongString(dead_letter_exchange.into()),
Expand All @@ -115,14 +118,26 @@ impl HandlerConfig {
}

/// Sets the `x-dead-letter-routing-key` argument on the queue. See also [RabbitMQ's documentation](https://www.rabbitmq.com/dlx.html).
pub fn with_dead_letter_routing_key(mut self, dead_letter_routing_key: String) -> Self {
pub fn with_dead_letter_routing_key(
mut self,
dead_letter_routing_key: impl Into<LongString>,
) -> Self {
self.arguments.insert(
"x-dead-letter-routing-key".into(),
AMQPValue::LongString(dead_letter_routing_key.into()),
);
self
}

/// Sets the `x-consumer-timeout` argument on the queue. See also [RabbitMQ's documentation](https://www.rabbitmq.com/consumers.html).
pub fn with_consumer_timeout(mut self, consumer_timeout: impl Into<LongString>) -> Self {
self.arguments.insert(
"x-consumer-timeout".into(),
AMQPValue::LongString(consumer_timeout.into()),
);
self
}

/// Set any argument with any value.
///
/// Prefer the more specific methods if you can, but you can use this for any specific argument you might want to set.
Expand Down
1 change: 0 additions & 1 deletion kanin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
//! # #[prost(string, tag="1")]
//! # pub error: ::prost::alloc::string::String,
//! # }
//! # #[derive(kanin::FromError)]
//! # #[derive(Clone, PartialEq, ::prost::Message)]
//! # pub struct InternalError {
//! # #[prost(string, tag="1")]
Expand Down
Loading

0 comments on commit 66f3f54

Please sign in to comment.