Skip to content

Commit

Permalink
feat: safe debug service url
Browse files Browse the repository at this point in the history
service url may contain sensitive information like password that should not leak in tracing / logs
  • Loading branch information
Nicolas NARDONE committed Oct 15, 2024
1 parent 0d62158 commit 232f8a0
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 14 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ log = "0.4"
tokio = { version = "1", features = ["sync"] }
tracing = { version = "0.1", optional = true }
tracing-futures = { version = "0.2", optional = true }
url = "2"

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread", "net", "io-util"] }
Expand Down
7 changes: 4 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ pub trait Connector<SvcSrc, Svc, E> {
pub struct RoundRobin<SvcSrc, Svc, E, Conn>
where
Conn: Connector<SvcSrc, Svc, E>,
SvcSrc: ToString + Clone,
{
/// Sources used to connect to a service. Usually some form of URL to attempt a connection,
/// e.g. `amqp://localhost:5672`
Expand All @@ -192,7 +193,7 @@ where

impl<SvcSrc, Svc, E, Conn> RoundRobin<SvcSrc, Svc, E, Conn>
where
SvcSrc: Debug,
SvcSrc: Debug + Display + Clone,
E: Next + Display,
Conn: Connector<SvcSrc, Svc, E>,
{
Expand Down Expand Up @@ -264,8 +265,8 @@ where
#[cfg(feature = "tracing")]
{
let span = Span::current();
span.record("index", &display(index));
span.record("service", &debug(&self.sources[index]));
span.record("index", display(index));
span.record("service", debug(self.sources[index].clone()));
}

// Connect if not already connected
Expand Down
1 change: 1 addition & 0 deletions tourniquet-celery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ celery = { version = "0.5", default-features = false }
log = "0.4"
tourniquet = { version = "0.4", path = ".." }
tracing = { version = "0.1", optional = true }
url = "2"

[dev-dependencies]
serde = "1.0"
Expand Down
70 changes: 59 additions & 11 deletions tourniquet-celery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
//! # Example
//!
//! ```rust,no_run
//! # use celery::task::TaskResult;
//! # use celery::{task, task::TaskResult};
//! # use tourniquet::RoundRobin;
//! # use tourniquet_celery::{CeleryConnector, RoundRobinExt};
//! # use tourniquet_celery::{CeleryConnector, CelerySource, RoundRobinExt};
//! #
//! #[celery::task]
//! async fn do_work(work: String) -> TaskResult<()> {
Expand All @@ -18,16 +18,17 @@
//! # #[tokio::main]
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let rr = RoundRobin::new(
//! vec!["amqp://rabbit01:5672/".to_owned(), "amqp://rabbit02:5672".to_owned()],
//! vec![CelerySource::from("amqp://rabbit01:5672/".to_owned()), CelerySource::from("amqp://rabbit02:5672".to_owned())],
//! CeleryConnector { name: "rr", routes: &[("*", "my_route")], ..Default::default() },
//! );
//!
//! # let work = "foo".to_owned();
//! rr.send_task(|| do_work::new(work.clone())).await.expect("Failed to send task");
//! rr.send_task(|| do_work(work.clone())).await.expect("Failed to send task");
//! # Ok(())
//! # }
//! ```
use std::borrow::Cow;
use std::error::Error;
use std::fmt::{Debug, Display, Error as FmtError, Formatter};

Expand All @@ -38,6 +39,8 @@ use celery::{
task::{AsyncResult, Signature, Task},
Celery, CeleryBuilder,
};
use url::Url;

use tourniquet::{Connector, Next, RoundRobin};
#[cfg(feature = "trace")]
use tracing::{
Expand Down Expand Up @@ -92,6 +95,37 @@ impl Error for RRCeleryError {
}
}

/// Wrapper for String
#[derive(Clone)]
pub struct CelerySource(String);

impl From<String> for CelerySource {
fn from(src: String) -> Self {
Self(src)
}
}

fn safe_source(url: &String) -> Cow<'_, String> {
// URL that is safe to log (password stripped)
let Some(mut url_safe): Option<Url> = url.parse().ok() else { return Cow::Borrowed(url) };
if url_safe.password().is_some() {
let _ = url_safe.set_password(Some("********"));
}
Cow::Owned(url_safe.to_string())
}

impl Display for CelerySource {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
Display::fmt(&safe_source(&self.0), f)
}
}

impl Debug for CelerySource {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
Debug::fmt(&safe_source(&self.0), f)
}
}

/// Ready to use connector for Celery.
///
/// Please refer to
Expand All @@ -115,10 +149,10 @@ impl<'a> Default for CeleryConnector<'a> {
}

#[async_trait]
impl<'a> Connector<String, Celery, RRCeleryError> for CeleryConnector<'a> {
#[cfg_attr(feature = "trace", tracing::instrument(skip(self), err))]
async fn connect(&self, url: &String) -> Result<Celery, RRCeleryError> {
let mut builder = CeleryBuilder::new(self.name, url.as_ref());
impl<'a> Connector<CelerySource, Celery, RRCeleryError> for CeleryConnector<'a> {
#[cfg_attr(feature = "trace", instrument(skip(self), err))]
async fn connect(&self, src: &CelerySource) -> Result<Celery, RRCeleryError> {
let mut builder = CeleryBuilder::new(self.name, src.0.as_ref());

if let Some(queue) = self.default_queue {
builder = builder.default_queue(queue);
Expand All @@ -145,7 +179,7 @@ pub trait RoundRobinExt {
#[async_trait]
impl<SvcSrc, Conn> RoundRobinExt for RoundRobin<SvcSrc, Celery, RRCeleryError, Conn>
where
SvcSrc: Debug + Send + Sync,
SvcSrc: Debug + Send + Sync + Display + Clone,
Conn: Connector<SvcSrc, Celery, RRCeleryError> + Send + Sync,
{
/// Send a Celery task.
Expand All @@ -172,11 +206,25 @@ where
self.run(|celery| async move { Ok(celery.send_task(task_gen()).await?) }).await?;

#[cfg(feature = "trace")]
Span::current().record("task_id", &display(&task.task_id));
Span::current().record("task_id", display(&task.task_id));

Ok(task)
}
}

/// Shorthand type for a basic RoundRobin type using Celery
pub type CeleryRoundRobin = RoundRobin<String, Celery, RRCeleryError, CeleryConnector<'static>>;
pub type CeleryRoundRobin =
RoundRobin<CelerySource, Celery, RRCeleryError, CeleryConnector<'static>>;

#[cfg(test)]
mod tests {
use super::CelerySource;

#[test]
fn test_display_debug_celery_source_strips_password() {
let source = CelerySource::from("amqp://mylogin:[email protected]/product".to_owned());

assert_eq!(format!("{source}"),"amqp://mylogin:********@rabbitmq.myserver.com/product");
assert_eq!(format!("{source:?}"),"\"amqp://mylogin:********@rabbitmq.myserver.com/product\"");
}
}

0 comments on commit 232f8a0

Please sign in to comment.