Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit

Permalink
feat: Track response in error details (#29)
Browse files Browse the repository at this point in the history
Co-authored-by: Brett Hoerner <[email protected]>
  • Loading branch information
tomasfarias and bretthoerner authored May 3, 2024
1 parent c6d5c67 commit f7e02cc
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 56 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ metrics = "0.22.0"
metrics-exporter-prometheus = "0.14.0"
rand = "0.8.5"
rdkafka = { version = "0.36.0", features = ["cmake-build", "ssl", "tracing"] }
reqwest = { version = "0.12.3" }
reqwest = { version = "0.12.3", features = ["stream"] }
serde = { version = "1.0", features = ["derive"] }
serde_derive = { version = "1.0" }
serde_json = { version = "1.0" }
Expand Down
111 changes: 106 additions & 5 deletions hook-worker/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,125 @@
use std::fmt;
use std::time;

use hook_common::pgqueue;
use hook_common::{pgqueue, webhook::WebhookJobError};
use thiserror::Error;

/// Enumeration of errors related to webhook job processing in the WebhookWorker.
/// Enumeration of error classes handled by `WebhookWorker`.
#[derive(Error, Debug)]
pub enum WebhookError {
#[error(transparent)]
Parse(#[from] WebhookParseError),
#[error(transparent)]
Request(#[from] WebhookRequestError),
}

/// Enumeration of parsing errors that can occur as `WebhookWorker` sets up a webhook.
#[derive(Error, Debug)]
pub enum WebhookParseError {
#[error("{0} is not a valid HttpMethod")]
ParseHttpMethodError(String),
#[error("error parsing webhook headers")]
ParseHeadersError(http::Error),
#[error("error parsing webhook url")]
ParseUrlError(url::ParseError),
#[error("a webhook could not be delivered but it could be retried later: {error}")]
}

/// Enumeration of request errors that can occur as `WebhookWorker` sends a request.
#[derive(Error, Debug)]
pub enum WebhookRequestError {
RetryableRequestError {
error: reqwest::Error,
response: Option<String>,
retry_after: Option<time::Duration>,
},
#[error("a webhook could not be delivered and it cannot be retried further: {0}")]
NonRetryableRetryableRequestError(reqwest::Error),
NonRetryableRetryableRequestError {
error: reqwest::Error,
response: Option<String>,
},
}

/// Enumeration of errors that can occur while handling a `reqwest::Response`.
/// Currently, not consumed anywhere. Grouped here to support a common error type for
/// `utils::first_n_bytes_of_response`.
#[derive(Error, Debug)]
pub enum WebhookResponseError {
#[error("failed to parse a response as UTF8")]
ParseUTF8StringError(#[from] std::str::Utf8Error),
#[error("error while iterating over response body chunks")]
StreamIterationError(#[from] reqwest::Error),
#[error("attempted to slice a chunk of length {0} with an out of bounds index of {1}")]
ChunkOutOfBoundsError(usize, usize),
}

/// Implement display of `WebhookRequestError` by appending to the underlying `reqwest::Error`
/// any response message if available.
impl fmt::Display for WebhookRequestError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
WebhookRequestError::RetryableRequestError {
error, response, ..
}
| WebhookRequestError::NonRetryableRetryableRequestError { error, response } => {
let response_message = match response {
Some(m) => m.to_string(),
None => "No response from the server".to_string(),
};
writeln!(f, "{}", error)?;
write!(f, "{}", response_message)?;

Ok(())
}
}
}
}

/// Implementation of `WebhookRequestError` designed to further describe the error.
/// In particular, we pass some calls to underyling `reqwest::Error` to provide more details.
impl WebhookRequestError {
pub fn is_timeout(&self) -> bool {
match self {
WebhookRequestError::RetryableRequestError { error, .. }
| WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => {
error.is_timeout()
}
}
}

pub fn is_status(&self) -> bool {
match self {
WebhookRequestError::RetryableRequestError { error, .. }
| WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => {
error.is_status()
}
}
}

pub fn status(&self) -> Option<http::StatusCode> {
match self {
WebhookRequestError::RetryableRequestError { error, .. }
| WebhookRequestError::NonRetryableRetryableRequestError { error, .. } => {
error.status()
}
}
}
}

impl From<&WebhookRequestError> for WebhookJobError {
fn from(error: &WebhookRequestError) -> Self {
if error.is_timeout() {
WebhookJobError::new_timeout(&error.to_string())
} else if error.is_status() {
WebhookJobError::new_http_status(
error.status().expect("status code is defined").into(),
&error.to_string(),
)
} else {
// Catch all other errors as `app_metrics::ErrorType::Connection` errors.
// Not all of `reqwest::Error` may strictly be connection errors, so our supported error types may need an extension
// depending on how strict error reporting has to be.
WebhookJobError::new_connection(&error.to_string())
}
}
}

/// Enumeration of errors related to initialization and consumption of webhook jobs.
Expand Down
1 change: 1 addition & 0 deletions hook-worker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod config;
pub mod error;
pub mod util;
pub mod worker;
35 changes: 35 additions & 0 deletions hook-worker/src/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::error::WebhookResponseError;
use futures::StreamExt;
use reqwest::Response;

pub async fn first_n_bytes_of_response(
response: Response,
n: usize,
) -> Result<String, WebhookResponseError> {
let mut body = response.bytes_stream();
let mut buffer = String::with_capacity(n);

while let Some(chunk) = body.next().await {
if buffer.len() >= n {
break;
}

let chunk = chunk?;
let chunk_str = std::str::from_utf8(&chunk)?;
let upper_bound = std::cmp::min(n - buffer.len(), chunk_str.len());

if let Some(partial_chunk_str) = chunk_str.get(0..upper_bound) {
buffer.push_str(partial_chunk_str);
} else {
// For whatever reason we are out of bounds. We should never land here
// given the `std::cmp::min` usage, but I am being extra careful by not
// using a slice index that would panic instead.
return Err(WebhookResponseError::ChunkOutOfBoundsError(
chunk_str.len(),
upper_bound,
));
}
}

Ok(buffer)
}
Loading

0 comments on commit f7e02cc

Please sign in to comment.