Skip to content

Commit

Permalink
Merge pull request #4388 from systeminit/fnichol/naxum-updates
Browse files Browse the repository at this point in the history
feat(naxum): implement Json extractor
  • Loading branch information
fnichol authored Aug 21, 2024
2 parents f112126 + 6f33539 commit f3abbfe
Show file tree
Hide file tree
Showing 12 changed files with 1,229 additions and 778 deletions.
251 changes: 143 additions & 108 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ self-replace = "1.3.7"
serde = { version = "1.0.197", features = ["derive", "rc"] }
serde-aux = "4.5.0"
serde_json = { version = "1.0.115", features = ["preserve_order"] }
serde_path_to_error = { version = "0.1.16" }
serde_url_params = "0.2.1"
serde_with = "3.7.0"
serde_yaml = "0.9.33" # NOTE(nick): this has been archived upstream
Expand Down
3 changes: 3 additions & 0 deletions lib/naxum/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ rust_library(
"//third-party/rust:bytes",
"//third-party/rust:futures",
"//third-party/rust:pin-project-lite",
"//third-party/rust:serde",
"//third-party/rust:serde_json",
"//third-party/rust:serde_path_to_error",
"//third-party/rust:tokio",
"//third-party/rust:tokio-util",
"//third-party/rust:tower",
Expand Down
3 changes: 3 additions & 0 deletions lib/naxum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ async-trait = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true } # NOTE: if extracted this can be `futures-util`
pin-project-lite = { workspace = true }
serde = { workspace = true } # NOTE: if extracted, this is used for a json feature
serde_json = { workspace = true } # NOTE: if extracted, this is used for a json feature
serde_path_to_error = { workspace = true } # NOTE: if extracted, this is used for a json feature
tokio = { workspace = true }
tokio-util = { workspace = true }
tower = { workspace = true }
Expand Down
33 changes: 32 additions & 1 deletion lib/naxum/src/extract/rejection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::{error, fmt};

use crate::{
response::{IntoResponse, Response},
BoxError, Error,
BoxError, Error, __composite_rejection as composite_rejection,
__define_rejection as define_rejection,
};

#[derive(Debug)]
Expand Down Expand Up @@ -70,3 +71,33 @@ impl error::Error for StringRejection {
}
}
}

define_rejection! {
#[status_code = 422]
#[body = "Failed to deserialize the JSON body into the target type"]
/// Rejection type for [`Json`].
///
/// This rejection is used if the message body is syntactically valid JSON but couldn't be
/// deserialized into the target type.
pub struct JsonDataError(Error);
}

define_rejection! {
#[status_code = 400]
#[body = "Failed to parse the message body as JSON"]
/// Rejection type for [`Json`].
///
/// This rejection is used if the message body didn't contain syntactically valid JSON.
pub struct JsonSyntaxError(Error);
}

composite_rejection! {
/// Rejection type for [`Json`].
///
/// Contains one vaiant for each way the [`Json`] extractor can fail.
pub enum JsonRejection {
JsonDataError,
JsonSyntaxError,
// MissingJsonContentType,
}
}
65 changes: 65 additions & 0 deletions lib/naxum/src/json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use async_trait::async_trait;
use bytes::Bytes;
use serde::de::DeserializeOwned;

use crate::{
extract::{
rejection::{JsonDataError, JsonRejection, JsonSyntaxError},
FromMessage,
},
MessageHead,
};

#[derive(Clone, Copy, Default, Debug)]
#[must_use]
pub struct Json<T>(pub T);

#[async_trait]
impl<T, S, R> FromMessage<S, R> for Json<T>
where
T: DeserializeOwned,
R: MessageHead + Send + 'static,
S: Send + Sync,
{
type Rejection = JsonRejection;

async fn from_message(req: R, state: &S) -> Result<Self, Self::Rejection> {
let bytes = Bytes::from_message(req, state)
.await
.expect("from_message is infallible");
Self::from_bytes(&bytes)
}
}

impl<T> Json<T>
where
T: DeserializeOwned,
{
pub fn from_bytes(bytes: &[u8]) -> Result<Self, JsonRejection> {
let deserializer = &mut serde_json::Deserializer::from_slice(bytes);

let value = match serde_path_to_error::deserialize(deserializer) {
Ok(value) => value,
Err(err) => {
let rejection = match err.inner().classify() {
serde_json::error::Category::Data => JsonDataError::from_err(err).into(),
serde_json::error::Category::Syntax | serde_json::error::Category::Eof => {
JsonSyntaxError::from_err(err).into()
}
serde_json::error::Category::Io => {
if cfg!(debug_assertions) {
// We don't use `serde_json::from_reader` and instead always buffer
// bodies first, so we shouldn't encounter any IO errors
unreachable!()
} else {
JsonSyntaxError::from_err(err).into()
}
}
};
return Err(rejection);
}
};

Ok(Json(value))
}
}
2 changes: 2 additions & 0 deletions lib/naxum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod error;
pub mod error_handling;
pub mod extract;
pub mod handler;
mod json;
mod make_service;
mod message;
pub mod middleware;
Expand All @@ -15,6 +16,7 @@ mod service_ext;

pub use self::cancellation::wait_on_cancelled;
pub use self::error::Error;
pub use self::json::Json;
pub use self::make_service::IntoMakeService;
pub use self::message::{Head, MessageHead};
pub use self::serve::serve;
Expand Down
212 changes: 212 additions & 0 deletions lib/naxum/src/macros.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,215 @@
/// Private API.
#[doc(hidden)]
#[macro_export]
macro_rules! __log_rejection {
(
rejection_type = $ty:ident,
body_text = $body_text:expr,
status = $status:expr,
) => {
tracing::event!(
target: "naxum::rejection",
tracing::Level::TRACE,
status = $status.as_u16(),
body = $body_text,
rejection_type = std::any::type_name::<$ty>(),
"rejecting message",
);
};
}

/// Private API.
#[doc(hidden)]
#[macro_export]
macro_rules! __define_rejection {
(
#[status_code = $status_code:expr]
#[body = $body:expr]
$(#[$m:meta])*
pub struct $name:ident;
) => {
$(#[$m])*
#[derive(Debug)]
#[non_exhaustive]
pub struct $name;

impl $crate::response::IntoResponse for $name {
fn into_response(self) -> $crate::response::Response {
let status = self.status();
$crate::__log_rejection!(
rejection_type = $name,
body_text = $body,
status = status,
);
(status, $body).into_response()
}
}

impl $name {
/// Get the response body text used for this rejection.
pub fn body_text(&self) -> String {
$body.into()
}

/// Get the status code used for this rejection.
pub fn status(&self) -> ::async_nats::StatusCode {
::async_nats::StatusCode::from_u16($status_code).expect("status code is valid")
}
}

impl std::fmt::Display for $name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", $body)
}
}

impl std::error::Error for $name {}

impl Default for $name {
fn default() -> Self {
Self
}
}
};

(
#[status_code = $status_code:expr]
#[body = $body:expr]
$(#[$m:meta])*
pub struct $name:ident(Error);
) => {
$(#[$m])*
#[derive(Debug)]
pub struct $name(pub(crate) $crate::Error);

impl $name {
pub(crate) fn from_err<E>(err: E) -> Self
where
E: Into<$crate::BoxError>,
{
Self($crate::Error::new(err))
}
}

impl $crate::response::IntoResponse for $name {
fn into_response(self) -> $crate::response::Response {
let status = self.status();
$crate::__log_rejection!(
rejection_type = $name,
body_text = $body,
status = status,
);
(status, $body).into_response()
}
}

impl $name {
/// Get the response body text used for this rejection.
pub fn body_text(&self) -> String {
$body.into()
}

/// Get the status code used for this rejection.
pub fn status(&self) -> ::async_nats::StatusCode {
::async_nats::StatusCode::from_u16($status_code).expect("status code is valid")
}
}

impl std::fmt::Display for $name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", $body)
}
}

impl std::error::Error for $name {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&self.0)
}
}
};
}

/// Private API.
#[doc(hidden)]
#[macro_export]
macro_rules! __composite_rejection {
(
$(#[$m:meta])*
pub enum $name:ident {
$($variant:ident),+
$(,)?
}
) => {
$(#[$m])*
#[derive(Debug)]
#[non_exhaustive]
pub enum $name {
$(
#[allow(missing_docs)]
$variant($variant)
),+
}

impl $crate::response::IntoResponse for $name {
fn into_response(self) -> $crate::response::Response {
match self {
$(
Self::$variant(inner) => inner.into_response(),
)+
}
}
}

impl $name {
/// Get the response body text used for this rejection.
pub fn body_text(&self) -> String {
match self {
$(
Self::$variant(inner) => inner.body_text(),
)+
}
}

/// Get the status code used for this rejection.
pub fn status(&self) -> ::async_nats::StatusCode {
match self {
$(
Self::$variant(inner) => inner.status(),
)+
}
}
}

$(
impl From<$variant> for $name {
fn from(inner: $variant) -> Self {
Self::$variant(inner)
}
}
)+

impl std::fmt::Display for $name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
$(
Self::$variant(inner) => write!(f, "{inner}"),
)+
}
}
}

impl std::error::Error for $name {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
$(
Self::$variant(inner) => inner.source(),
)+
}
}
}
};
}

#[rustfmt::skip]
macro_rules! all_the_tuples {
($name:ident) => {
Expand Down
8 changes: 4 additions & 4 deletions lib/naxum/src/middleware/ack/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ where

match result {
Ok(response) => {
if response.status().is_server_error() {
// Transition the state to run the failure case
*this.state = State::Failure(Some(response));
} else {
if response.status().is_success() {
// Transition the state to run the success case
*this.state = State::Success(Some(response));
} else {
// Transition the state to run the failure case
*this.state = State::Failure(Some(response));
}
}
Err(err) => {
Expand Down
Loading

0 comments on commit f3abbfe

Please sign in to comment.