Skip to content

Commit

Permalink
Duration for the consumer timeout (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor-N-Suadicani authored Apr 5, 2024
1 parent ded8a84 commit 265508d
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 25 deletions.
2 changes: 1 addition & 1 deletion kanin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kanin"
version = "0.31.0"
version = "0.32.0"
edition = "2021"
authors = ["Victor Nordam Suadicani <[email protected]>"]
description = "An RPC microservice framework for AMQP, protobuf and Rust built on lapin (https://github.com/amqp-rs/lapin)."
Expand Down
8 changes: 6 additions & 2 deletions kanin/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl<S> App<S> {
}
}

/// Returns a [`broadcast::Sender<()>`]. If you send a message on this channel, the app will gracefully shut down.
/// Returns a [`tokio::sync::broadcast::Sender`]. If you send a message on this channel, the app will gracefully shut down.
pub fn shutdown_channel(&self) -> broadcast::Sender<()> {
self.shutdown.clone()
}
Expand Down Expand Up @@ -176,8 +176,12 @@ impl<S> App<S> {
/// * A connection to the AMQP broker could not be established.
/// * Queue/consumer declaration or binding failed while setting up a handler.
///
/// On connection errors, the app will attempt to gracefully shutdown.
///
/// # Panics
/// On connection errors, the app will simply panic.
/// Panics in your handlers does not cause the app to shutdown. Requests will be nacked in this case..
///
/// Internal panics inside kanin's code will however shut down the app. This shouldn't happen though (please report it if it does).
#[inline]
pub async fn run_with_connection(self, conn: &Connection) -> Result<()> {
// Describe metrics (just need to do it somewhere once as we run the app).
Expand Down
4 changes: 2 additions & 2 deletions kanin/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ pub enum Error {
/// The app was started with no handlers registered.
#[error("No handlers were registered on the app.")]
NoHandlers,
/// The app exited due to a consumer from the AMQP broker cancelling.
/// The app exited due to a consumer from the AMQP broker cancelling. The routing key of the consumer is given.
#[error("Consumer cancelled on routing key {0}")]
ConsumerCancelled(String),
/// An error from an underlying lapin call.
/// An error from an underlying [`lapin`] call.
#[error("An underlying `lapin` call failed: {0}")]
Lapin(lapin::Error),
}
Expand Down
40 changes: 21 additions & 19 deletions kanin/src/handler_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::time::Duration;

use lapin::options::QueueDeclareOptions;
use lapin::types::{AMQPValue, FieldTable, LongString, ShortString};
use lapin::types::{AMQPValue, FieldTable};

/// Detailed configuration of a handler.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -82,10 +82,10 @@ impl HandlerConfig {
// Panic is extremely unlikely, let's not bother.
#[allow(clippy::missing_panics_doc)]
pub fn with_expires(mut self, expires: Duration) -> Self {
let millis: u32 = expires
let millis: i64 = expires
.as_millis()
.try_into()
.expect("Duration too long to fit milliseconds in 32 bits");
.expect("Duration too long to fit milliseconds in i64");

self.arguments.insert("x-expires".into(), millis.into());
self
Expand All @@ -96,53 +96,55 @@ impl HandlerConfig {
// Panic is extremely unlikely, let's not bother.
#[allow(clippy::missing_panics_doc)]
pub fn with_message_ttl(mut self, message_ttl: Duration) -> Self {
let millis: u32 = message_ttl
let millis: i64 = message_ttl
.as_millis()
.try_into()
.expect("Duration too long to fit milliseconds in 32 bits");
.expect("Duration too long to fit milliseconds in i64");

self.arguments.insert("x-message-ttl".into(), millis.into());
self
}

/// Sets the `x-dead-letter-exchange` argument on the queue. See also [RabbitMQ's documentation](https://www.rabbitmq.com/dlx.html).
pub fn with_dead_letter_exchange(
mut self,
dead_letter_exchange: impl Into<LongString>,
) -> Self {
pub fn with_dead_letter_exchange(mut self, dead_letter_exchange: impl Into<String>) -> Self {
self.arguments.insert(
"x-dead-letter-exchange".into(),
AMQPValue::LongString(dead_letter_exchange.into()),
AMQPValue::LongString(dead_letter_exchange.into().into()),
);
self
}

/// Sets the `x-dead-letter-routing-key` argument on the queue. See also [RabbitMQ's documentation](https://www.rabbitmq.com/dlx.html).
pub fn with_dead_letter_routing_key(
mut self,
dead_letter_routing_key: impl Into<LongString>,
dead_letter_routing_key: impl Into<String>,
) -> Self {
self.arguments.insert(
"x-dead-letter-routing-key".into(),
AMQPValue::LongString(dead_letter_routing_key.into()),
AMQPValue::LongString(dead_letter_routing_key.into().into()),
);
self
}

/// Sets the `x-consumer-timeout` argument on the queue. See also [RabbitMQ's documentation](https://www.rabbitmq.com/consumers.html).
pub fn with_consumer_timeout(mut self, consumer_timeout: impl Into<LongString>) -> Self {
self.arguments.insert(
"x-consumer-timeout".into(),
AMQPValue::LongString(consumer_timeout.into()),
);
// Panic is extremely unlikely, let's not bother.
#[allow(clippy::missing_panics_doc)]
pub fn with_consumer_timeout(mut self, consumer_timeout: Duration) -> Self {
let millis: i64 = consumer_timeout
.as_millis()
.try_into()
.expect("Duration too long to fit milliseconds in i64");

self.arguments
.insert("x-consumer-timeout".into(), millis.into());
self
}

/// Set any argument with any value.
///
/// Prefer the more specific methods if you can, but you can use this for any specific argument you might want to set.
pub fn with_arg(mut self, arg: impl Into<ShortString>, value: impl Into<AMQPValue>) -> Self {
self.arguments.insert(arg.into(), value.into());
pub fn with_arg(mut self, arg: impl Into<String>, value: impl Into<AMQPValue>) -> Self {
self.arguments.insert(arg.into().into(), value.into());
self
}

Expand Down
2 changes: 1 addition & 1 deletion kanin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
//! # use kanin::{extract::{Msg, State}, App, AppState};
//! # use protobuf::{EchoRequest, EchoResponse};
//! #
//! // EchoRequest and EchoResponse are protobuf messages as generated by prost_build: https://docs.rs/prost-build/0.10.4/prost_build/index.html
//! // EchoRequest and EchoResponse are protobuf messages as generated by prost_build: https://docs.rs/prost-build/latest/prost_build/index.html
//! async fn echo(Msg(request): Msg<EchoRequest>, State(num): State<u8>) -> EchoResponse {
//! assert_eq!(42, num);
//!
Expand Down

0 comments on commit 265508d

Please sign in to comment.