Skip to content

Commit

Permalink
fix(grpc): show additional error info from expression (#1809)
Browse files Browse the repository at this point in the history
Co-authored-by: Tushar Mathur <[email protected]>
  • Loading branch information
meskill and tusharmath authored May 3, 2024
1 parent 4ca53f9 commit ca0a8bb
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 70 deletions.
7 changes: 5 additions & 2 deletions src/blueprint/into_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::borrow::Cow;
use std::sync::Arc;

use async_graphql::dynamic::{self, FieldFuture, FieldValue, SchemaBuilder};
use async_graphql::ErrorExtensions;
use async_graphql_value::ConstValue;
use futures_util::TryFutureExt;
use tracing::Instrument;
Expand Down Expand Up @@ -68,8 +69,10 @@ fn to_type(def: &Definition) -> dynamic::Type {
let ctx: ResolverContext = ctx.into();
let ctx = EvaluationContext::new(req_ctx, &ctx);

let const_value =
expr.eval(ctx, &Concurrent::Sequential).await?;
let const_value = expr
.eval(ctx, &Concurrent::Sequential)
.await
.map_err(|err| err.extend())?;
let p = match const_value {
ConstValue::List(a) => Some(FieldValue::list(a)),
ConstValue::Null => FieldValue::NONE,
Expand Down
3 changes: 2 additions & 1 deletion src/http/request_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::graphql::GraphqlDataLoader;
use crate::grpc;
use crate::grpc::data_loader::GrpcDataLoader;
use crate::http::{AppContext, DataLoaderRequest, HttpDataLoader};
use crate::lambda::EvaluationError;
use crate::runtime::TargetRuntime;

#[derive(Setters)]
Expand All @@ -33,7 +34,7 @@ pub struct RequestContext {
pub min_max_age: Arc<Mutex<Option<i32>>>,
pub cache_public: Arc<Mutex<Option<bool>>>,
pub runtime: TargetRuntime,
pub cache: AsyncCache<u64, ConstValue, String>,
pub cache: AsyncCache<u64, ConstValue, EvaluationError>,
}

impl RequestContext {
Expand Down
7 changes: 4 additions & 3 deletions src/lambda/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use std::num::NonZeroU64;
use std::ops::Deref;
use std::pin::Pin;

use anyhow::Result;
use async_graphql_value::ConstValue;

use super::{Concurrent, Eval, EvaluationContext, Expression, ResolverContextLike};
use super::{
Concurrent, Eval, EvaluationContext, EvaluationError, Expression, ResolverContextLike,
};

pub trait CacheKey<Ctx> {
fn cache_key(&self, ctx: &Ctx) -> u64;
Expand Down Expand Up @@ -39,7 +40,7 @@ impl Eval for Cache {
&'a self,
ctx: EvaluationContext<'a, Ctx>,
conc: &'a Concurrent,
) -> Pin<Box<dyn Future<Output = Result<ConstValue>> + 'a + Send>> {
) -> Pin<Box<dyn Future<Output = Result<ConstValue, EvaluationError>> + 'a + Send>> {
Box::pin(async move {
if let Expression::IO(io) = self.expr.deref() {
let key = io.cache_key(&ctx);
Expand Down
8 changes: 5 additions & 3 deletions src/lambda/concurrent.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use futures_util::stream::FuturesUnordered;
use futures_util::{Future, StreamExt};

use super::EvaluationError;

///
/// Concurrent controls the concurrency of a fold or foreach operation on lists.
/// It's a flag that is set based on operators that are applied on a list.
Expand All @@ -19,7 +21,7 @@ impl Concurrent {
iter: impl Iterator<Item = F>,
acc: B,
f: impl Fn(B, A) -> anyhow::Result<B>,
) -> anyhow::Result<B>
) -> Result<B, EvaluationError>
where
F: Future<Output = A>,
{
Expand All @@ -46,9 +48,9 @@ impl Concurrent {
&self,
iter: impl Iterator<Item = F>,
f: impl Fn(A) -> B,
) -> anyhow::Result<Vec<B>>
) -> Result<Vec<B>, EvaluationError>
where
F: Future<Output = anyhow::Result<A>>,
F: Future<Output = Result<A, EvaluationError>>,
{
self.fold(iter, vec![], |mut acc, val| {
acc.push(f(val?));
Expand Down
6 changes: 2 additions & 4 deletions src/lambda/eval.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use core::future::Future;
use std::pin::Pin;

use anyhow::Result;

use super::{Concurrent, EvaluationContext, ResolverContextLike};
use super::{Concurrent, EvaluationContext, EvaluationError, ResolverContextLike};

pub trait Eval<Output = async_graphql::Value>
where
Expand All @@ -13,7 +11,7 @@ where
&'a self,
ctx: EvaluationContext<'a, Ctx>,
conc: &'a Concurrent,
) -> Pin<Box<dyn Future<Output = Result<Output>> + 'a + Send>>
) -> Pin<Box<dyn Future<Output = Result<Output, EvaluationError>> + 'a + Send>>
where
Output: 'a;
}
38 changes: 26 additions & 12 deletions src/lambda/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::fmt::{Debug, Display};
use std::pin::Pin;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use async_graphql::ErrorExtensions;
use async_graphql_value::ConstValue;
use thiserror::Error;

use super::{Concurrent, Eval, EvaluationContext, ResolverContextLike, IO};
use crate::auth;
use crate::blueprint::DynamicValue;
use crate::json::JsonLike;
use crate::lambda::cache::Cache;
Expand Down Expand Up @@ -67,10 +67,19 @@ pub enum EvaluationError {
#[error("APIValidationError: {0:?}")]
APIValidationError(Vec<String>),

#[error("ExprEvalError: {0:?}")]
#[error("ExprEvalError: {0}")]
ExprEvalError(String),

#[error("DeserializeError: {0}")]
DeserializeError(String),

#[error("Authentication Failure: {0}")]
AuthError(auth::error::Error),
}

// TODO: remove conversion from anyhow and don't use anyhow to pass errors
// since it loses potentially valuable information that could be later provided
// in the error extensions
impl From<anyhow::Error> for EvaluationError {
fn from(value: anyhow::Error) -> Self {
match value.downcast::<EvaluationError>() {
Expand Down Expand Up @@ -99,19 +108,19 @@ impl ErrorExtensions for EvaluationError {
grpc_status_details,
} = self
{
e.set("grpc_code", *grpc_code);
e.set("grpc_description", grpc_description);
e.set("grpc_status_message", grpc_status_message);
e.set("grpc_status_details", grpc_status_details.clone());
e.set("grpcCode", *grpc_code);
e.set("grpcDescription", grpc_description);
e.set("grpcStatusMessage", grpc_status_message);
e.set("grpcStatusDetails", grpc_status_details.clone());
}
})
}
}

impl<'a> From<crate::valid::ValidationError<&'a str>> for EvaluationError {
fn from(_value: crate::valid::ValidationError<&'a str>) -> Self {
fn from(value: crate::valid::ValidationError<&'a str>) -> Self {
EvaluationError::APIValidationError(
_value
value
.as_vec()
.iter()
.map(|e| e.message.to_owned())
Expand All @@ -120,6 +129,12 @@ impl<'a> From<crate::valid::ValidationError<&'a str>> for EvaluationError {
}
}

impl From<auth::error::Error> for EvaluationError {
fn from(value: auth::error::Error) -> Self {
EvaluationError::AuthError(value)
}
}

impl Expression {
pub fn and_then(self, next: Self) -> Self {
Expression::Context(Context::PushArgs { expr: Box::new(self), and_then: Box::new(next) })
Expand All @@ -136,7 +151,7 @@ impl Eval for Expression {
&'a self,
ctx: EvaluationContext<'a, Ctx>,
conc: &'a Concurrent,
) -> Pin<Box<dyn Future<Output = Result<ConstValue>> + 'a + Send>> {
) -> Pin<Box<dyn Future<Output = Result<ConstValue, EvaluationError>> + 'a + Send>> {
Box::pin(async move {
match self {
Expression::Context(op) => match op {
Expand Down Expand Up @@ -165,14 +180,13 @@ impl Eval for Expression {
.unwrap_or(&async_graphql::Value::Null)
.clone())
}
Expression::Dynamic(value) => value.render_value(&ctx),
Expression::Dynamic(value) => Ok(value.render_value(&ctx)),
Expression::Protect(expr) => {
ctx.request_ctx
.auth_ctx
.validate(ctx.request_ctx)
.await
.to_result()
.map_err(|e| anyhow!("Authentication Failure: {}", e.to_string()))?;
.to_result()?;
expr.eval(ctx, conc).await
}
Expression::IO(operation) => operation.eval(ctx, conc).await,
Expand Down
34 changes: 14 additions & 20 deletions src/lambda/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use core::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use anyhow::Result;
use async_graphql::from_value;
use async_graphql_value::ConstValue;
use reqwest::Request;

Expand Down Expand Up @@ -49,20 +49,15 @@ impl Eval for IO {
&'a self,
ctx: super::EvaluationContext<'a, Ctx>,
_conc: &'a super::Concurrent,
) -> Pin<Box<dyn Future<Output = Result<ConstValue>> + 'a + Send>> {
) -> Pin<Box<dyn Future<Output = Result<ConstValue, EvaluationError>> + 'a + Send>> {
let key = self.cache_key(&ctx);
Box::pin(async move {
ctx.request_ctx
.cache
.get_or_eval(key, move || {
Box::pin(async {
self.eval_inner(ctx, _conc)
.await
.map_err(|err| err.to_string())
})
Box::pin(async { self.eval_inner(ctx, _conc).await })
})
.await
.map_err(|err| anyhow::anyhow!(err))
})
}
}
Expand All @@ -72,7 +67,7 @@ impl IO {
&'a self,
ctx: super::EvaluationContext<'a, Ctx>,
_conc: &'a super::Concurrent,
) -> Pin<Box<dyn Future<Output = Result<ConstValue>> + 'a + Send>> {
) -> Pin<Box<dyn Future<Output = Result<ConstValue, EvaluationError>> + 'a + Send>> {
Box::pin(async move {
match self {
IO::Http { req_template, dl_id, .. } => {
Expand Down Expand Up @@ -190,7 +185,7 @@ fn set_cookie_headers<'ctx, Ctx: ResolverContextLike<'ctx>>(
async fn execute_raw_request<'ctx, Ctx: ResolverContextLike<'ctx>>(
ctx: &EvaluationContext<'ctx, Ctx>,
req: Request,
) -> Result<Response<async_graphql::Value>> {
) -> Result<Response<async_graphql::Value>, EvaluationError> {
let response = ctx
.request_ctx
.runtime
Expand All @@ -207,12 +202,10 @@ async fn execute_raw_grpc_request<'ctx, Ctx: ResolverContextLike<'ctx>>(
ctx: &EvaluationContext<'ctx, Ctx>,
req: Request,
operation: &ProtobufOperation,
) -> Result<Response<async_graphql::Value>> {
Ok(
execute_grpc_request(&ctx.request_ctx.runtime, operation, req)
.await
.map_err(EvaluationError::from)?,
)
) -> Result<Response<async_graphql::Value>, EvaluationError> {
execute_grpc_request(&ctx.request_ctx.runtime, operation, req)
.await
.map_err(EvaluationError::from)
}

async fn execute_grpc_request_with_dl<
Expand All @@ -227,7 +220,7 @@ async fn execute_grpc_request_with_dl<
ctx: &EvaluationContext<'ctx, Ctx>,
rendered: RenderedRequestTemplate,
data_loader: Option<&DataLoader<grpc::DataLoaderRequest, Dl>>,
) -> Result<Response<async_graphql::Value>> {
) -> Result<Response<async_graphql::Value>, EvaluationError> {
let headers = ctx
.request_ctx
.upstream
Expand All @@ -253,7 +246,7 @@ async fn execute_request_with_dl<
ctx: &EvaluationContext<'ctx, Ctx>,
req: Request,
data_loader: Option<&DataLoader<DataLoaderRequest, Dl>>,
) -> Result<Response<async_graphql::Value>> {
) -> Result<Response<async_graphql::Value>, EvaluationError> {
let headers = ctx
.request_ctx
.upstream
Expand All @@ -275,8 +268,9 @@ fn parse_graphql_response<'ctx, Ctx: ResolverContextLike<'ctx>>(
ctx: &EvaluationContext<'ctx, Ctx>,
res: Response<async_graphql::Value>,
field_name: &str,
) -> Result<async_graphql::Value> {
let res: async_graphql::Response = serde_json::from_value(res.body.into_json()?)?;
) -> Result<async_graphql::Value, EvaluationError> {
let res: async_graphql::Response =
from_value(res.body).map_err(|err| EvaluationError::DeserializeError(err.to_string()))?;

for error in res.errors {
ctx.add_error(error);
Expand Down
Loading

1 comment on commit ca0a8bb

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running 30s test @ http://localhost:8000/graphql

4 threads and 100 connections

Thread Stats Avg Stdev Max +/- Stdev
Latency 7.65ms 3.56ms 85.20ms 72.60%
Req/Sec 3.31k 257.54 3.69k 86.25%

395785 requests in 30.00s, 1.98GB read

Requests/sec: 13190.98

Transfer/sec: 67.70MB

Please sign in to comment.