Skip to content

Commit

Permalink
Reintroduce acker (#39)
Browse files Browse the repository at this point in the history
* Better error logging

* Reintroduce acker but wrap lapin's interface

* change codeowners

* Infallible error for acker
  • Loading branch information
Victor-N-Suadicani authored Apr 24, 2024
1 parent 265508d commit 9306a48
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 26 deletions.
2 changes: 1 addition & 1 deletion CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1 +1 @@
* @issuu/platypus
* @issuu/backend
2 changes: 1 addition & 1 deletion kanin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kanin"
version = "0.32.0"
version = "0.32.1"
edition = "2021"
authors = ["Victor Nordam Suadicani <[email protected]>"]
description = "An RPC microservice framework for AMQP, protobuf and Rust built on lapin (https://github.com/amqp-rs/lapin)."
Expand Down
2 changes: 1 addition & 1 deletion kanin/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl<S> App<S> {
/// On connection errors, the app will attempt to gracefully shutdown.
///
/// # Panics
/// Panics in your handlers does not cause the app to shutdown. Requests will be nacked in this case..
/// Panics in your handlers does not cause the app to shutdown. Requests will be rejected in this case.
///
/// Internal panics inside kanin's code will however shut down the app. This shouldn't happen though (please report it if it does).
#[inline]
Expand Down
20 changes: 13 additions & 7 deletions kanin/src/app/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use lapin::{
};
use metrics::gauge;
use tokio::sync::broadcast;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use tracing::{debug, error, error_span, info, trace, warn, Instrument};

use crate::{Error, Handler, HandlerConfig, Request, Respond, Result};

Expand Down Expand Up @@ -88,7 +88,10 @@ where
// We should only ever get to this point if the consumer is cancelled (see lapin::Consumer's implementation of Stream).
// We'll attempt a graceful shutdown in this case.
// We'll return the routing key - might be a help for the user to see which consumer got cancelled.
None => break Err(Error::ConsumerCancelled(routing_key)),
None => {
error!("Consumer cancelled, attempting to gracefully shut down...");
break Err(Error::ConsumerCancelled(routing_key));
},
},
};

Expand All @@ -107,7 +110,7 @@ where
// Requests are handled and replied to concurrently.
// This allows each handler task to process multiple requests at once.
tasks.push(tokio::spawn(async move {
let span = info_span!("request", req_id = %req.req_id());
let span = error_span!("request", req_id = %req.req_id());

handle_request(req, handler, channel, should_reply)
.instrument(span)
Expand Down Expand Up @@ -176,7 +179,7 @@ where
///
/// Acks the request and responds if the handler executes normally.
///
/// If the handler panicks, the request will be nacked and instructed to requeue.
/// If the handler panicks, the request will be rejected and instructed to requeue.
async fn handle_request<H, S, Args, Res>(
mut req: Request<S>,
handler: H,
Expand Down Expand Up @@ -278,9 +281,12 @@ async fn handle_request<H, S, Args, Res>(
};

// Remember to ack, otherwise the AMQP broker will think we failed to process the request!
match req.ack(BasicAckOptions::default()).await {
Ok(()) => debug!("Successfully acked request."),
Err(e) => error!("Failed to ack request: {e:#}"),
// We don't ack if we've already done it, via the handler extracting the acker.
if !req.acked {
match req.ack(BasicAckOptions::default()).await {
Ok(()) => debug!("Successfully acked request."),
Err(e) => error!("Failed to ack request: {e:#}"),
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions kanin/src/extract.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
//! Interface for types that can extract themselves from requests.

mod acker;
mod app_id;
mod message;
mod req_id;
mod state;

pub use acker::Acker;
pub use app_id::AppId;
pub use message::Msg;
pub use req_id::ReqId;
Expand Down
74 changes: 74 additions & 0 deletions kanin/src/extract/acker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//! Manual acknowledgement and rejection.

use std::{convert::Infallible, mem};

use async_trait::async_trait;
use lapin::{
acker::Acker as LapinAcker,
options::{BasicAckOptions, BasicRejectOptions},
};

use crate::{Extract, Request};

/// An extractor that allows you manual control of acknowledgement and rejection of messages.
///
/// Note that when you extract an `Acker`, kanin _will not_ acknowledge the message for you.
/// Neither will it reject the message if your handler panicks.
///
/// When you extract this, you are responsible for acknowledging or rejecting yourself.
#[must_use = "You must call .ack or .reject in order to acknowledge or reject the message."]
#[derive(Debug)]
pub struct Acker(LapinAcker);

impl Acker {
/// Acks the message that was received for this acker.
///
/// # Errors
/// Returns `Err` on network failures.
// Note that since we consume the acker, it should not be possible to call this twice.
// Thus that error possibility is not listed.
pub async fn ack(self) -> Result<(), lapin::Error> {
self.0
.ack(BasicAckOptions {
// It does not make sense to use this flag with kanin, as it might interfere with handling of other previous messages.
multiple: false,
})
.await
}

/// Rejects the message that was received for this acker.
///
/// # Errors
/// Returns `Err` on network failures.
// Note that since we consume the acker, it should not be possible to call this twice.
// Thus that error possibility is not listed.
pub async fn reject(self, options: BasicRejectOptions) -> Result<(), lapin::Error> {
self.0.reject(options).await
}
}

/// Extract implementation for the AMQP acker.
#[async_trait]
impl<S> Extract<S> for Acker
where
S: Send + Sync,
{
type Error = Infallible;

async fn extract(req: &mut Request<S>) -> Result<Self, Self::Error> {
// This is quite a hacky way of taking the acker. We should improve this if/when lapin improves the interface.
// See also https://github.com/amqp-rs/lapin/issues/402.
let acker = mem::take(&mut req.delivery_mut().acker);

// The request will consider itself acked. It is up to the handler to actually ack the request.
req.acked = true;

if acker == LapinAcker::default() {
panic!(
"extracted acker was equal to the default acker - did you extract an acker twice?"
);
}

Ok(Acker(acker))
}
}
35 changes: 19 additions & 16 deletions kanin/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::sync::Arc;

use lapin::options::{BasicAckOptions, BasicNackOptions};
use lapin::options::{BasicAckOptions, BasicRejectOptions};
use lapin::protocol::basic::AMQPProperties;

use lapin::{message::Delivery, Channel};
Expand All @@ -19,7 +19,8 @@ pub struct Request<S> {
/// is found in the `req_id` header of the incoming AMQP message.
req_id: ReqId,
/// Has this message been (n)ack'ed?
acked: bool,
// This has to be pub within kanin so that the acker extractor can set it.
pub(crate) acked: bool,
/// The channel the message was received on.
channel: Channel,
/// The message delivery.
Expand Down Expand Up @@ -48,6 +49,13 @@ impl<S> Request<S> {
&self.delivery
}

/// Returns a mutable reference to the delivery of this request.
///
/// For now, this is a private interface. It could potentially be made public in the future.
pub(crate) fn delivery_mut(&mut self) -> &mut Delivery {
&mut self.delivery
}

/// Returns the app state for the given type.
pub fn state<T>(&self) -> T
where
Expand Down Expand Up @@ -82,7 +90,7 @@ impl<S> Request<S> {
}
}

/// We implement [`Drop`] on [`Request`] to ensure that requests that were not explicitly acknowledged will be nacked.
/// We implement [`Drop`] on [`Request`] to ensure that requests that were not explicitly acknowledged will be rejected.
impl<S> Drop for Request<S> {
fn drop(&mut self) {
// If we already acked, do nothing.
Expand All @@ -92,25 +100,20 @@ impl<S> Drop for Request<S> {

// We haven't acked and the request is being dropped.
// This almost certainly indicates a panic during request handling.
// We will nack the request to tell the AMQP broker to requeue this message ASAP.
warn!("Nacking unacked request {} due to drop.", self.req_id);
// We will reject the request to tell the AMQP broker to requeue this message ASAP.
warn!("Rejecting unacked request {} due to drop.", self.req_id);

let req_id = self.req_id.clone();
// Yoink the acker from the delivery so we can give it to a future to nack the message.
// Yoink the acker from the delivery so we can give it to a future to reject the message.
// This is a bit of a hack. Hopefully lapin improves the interface in the future, see also https://github.com/amqp-rs/lapin/issues/402.
let acker = std::mem::take(&mut self.delivery.acker);

// Nacking is async so we have to spawn a task to do it.
// Rejecting is async so we have to spawn a task to do it.
// Unfortunately we can't really be sure that this ever completes.
tokio::spawn(async move {
match acker
.nack(BasicNackOptions {
multiple: false,
requeue: true,
})
.await
{
Ok(()) => debug!("Successfully nacked request {} during drop.", req_id),
Err(e) => error!("Failed to nack request {} during drop: {e}", req_id),
match acker.reject(BasicRejectOptions { requeue: true }).await {
Ok(()) => debug!("Successfully rejected request {} during drop.", req_id),
Err(e) => error!("Failed to reject request {} during drop: {e}", req_id),
}
});

Expand Down

0 comments on commit 9306a48

Please sign in to comment.