Skip to content

Commit

Permalink
feat: add dedupe in Upstream (#1778)
Browse files Browse the repository at this point in the history
Co-authored-by: Amit Singh <[email protected]>
Co-authored-by: Tushar Mathur <[email protected]>
  • Loading branch information
3 people authored May 3, 2024
1 parent ca0a8bb commit b534fce
Show file tree
Hide file tree
Showing 25 changed files with 496 additions and 20 deletions.
5 changes: 5 additions & 0 deletions generated/.tailcallrc.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,11 @@ directive @upstream(
"""
connectTimeout: Int
"""
When set to `true`, it will ensure no HTTP, GRPC, or any other IO call is made more
than once within the context of a single GraphQL request.
"""
dedupe: Boolean!
"""
The `http2Only` setting allows you to specify whether the client should always issue
HTTP2 requests, without checking if the server supports it or not. By default it
is set to `false` for all HTTP requests made by the server, but is automatically
Expand Down
4 changes: 4 additions & 0 deletions generated/.tailcallrc.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1344,6 +1344,10 @@
"format": "uint64",
"minimum": 0.0
},
"dedupe": {
"description": "When set to `true`, it will ensure no HTTP, GRPC, or any other IO call is made more than once within the context of a single GraphQL request.",
"type": "boolean"
},
"http2Only": {
"description": "The `http2Only` setting allows you to specify whether the client should always issue HTTP2 requests, without checking if the server supports it or not. By default it is set to `false` for all HTTP requests made by the server, but is automatically set to true for GRPC.",
"type": [
Expand Down
20 changes: 15 additions & 5 deletions src/async_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ pub struct AsyncCache<Key, Value, Error> {

#[derive(Clone)]
pub enum CacheValue<Value, Error> {
Pending(Sender<Result<Value, Error>>),
Ready(Result<Value, Error>),
Pending(Sender<Arc<Result<Value, Error>>>),
Ready(Arc<Result<Value, Error>>),
}

impl<Key: Eq + Hash + Send + Clone, Value: Debug + Clone + Send, Error: Debug + Clone + Send>
Expand All @@ -41,7 +41,7 @@ impl<Key: Eq + Hash + Send + Clone, Value: Debug + Clone + Send, Error: Debug +
&self,
key: Key,
or_else: impl FnOnce() -> Pin<Box<dyn Future<Output = Result<Value, Error>> + 'a + Send>> + Send,
) -> Result<Value, Error> {
) -> Arc<Result<Value, Error>> {
if let Some(cache_value) = self.get_cache_value(&key) {
match cache_value {
CacheValue::Pending(tx) => tx.subscribe().recv().await.unwrap(),
Expand All @@ -53,7 +53,7 @@ impl<Key: Eq + Hash + Send + Clone, Value: Debug + Clone + Send, Error: Debug +
.write()
.unwrap()
.insert(key.clone(), CacheValue::Pending(tx.clone()));
let result = or_else().await;
let result = Arc::new(or_else().await);
let mut guard = self.cache.write().unwrap();
if let Some(cache_value) = guard.get_mut(&key) {
*cache_value = CacheValue::Ready(result.clone())
Expand All @@ -76,6 +76,8 @@ mod tests {
let actual = cache
.get_or_eval(1, || Box::pin(async { Ok(1) }))
.await
.as_ref()
.clone()
.unwrap();
assert_eq!(actual, 1);
}
Expand All @@ -86,11 +88,15 @@ mod tests {
cache
.get_or_eval(1, || Box::pin(async { Ok(1) }))
.await
.as_ref()
.clone()
.unwrap();

let actual = cache
.get_or_eval(1, || Box::pin(async { Ok(2) }))
.await
.as_ref()
.clone()
.unwrap();
assert_eq!(actual, 1);
}
Expand All @@ -103,12 +109,16 @@ mod tests {
cache
.get_or_eval(1, || Box::pin(async move { Ok(i) }))
.await
.as_ref()
.clone()
.unwrap();
}

let actual = cache
.get_or_eval(1, || Box::pin(async { Ok(2) }))
.await
.as_ref()
.clone()
.unwrap();
assert_eq!(actual, 0);
}
Expand Down Expand Up @@ -155,7 +165,7 @@ mod tests {
let results: Vec<_> = futures_util::future::join_all(handles)
.await
.into_iter()
.map(|res| res.unwrap().unwrap()) // Unwrap the Result from the join, and the Result from get_or_eval
.map(|res| res.unwrap().as_ref().clone().unwrap()) // Unwrap the Result from the join, and the Result from get_or_eval
.collect();

// Check that all tasks received the correct value.
Expand Down
4 changes: 4 additions & 0 deletions src/blueprint/blueprint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ impl Type {
Type::ListType { non_null, .. } => *non_null,
}
}
/// checks if the type is a list
pub fn is_list(&self) -> bool {
matches!(self, Type::ListType { .. })
}
}

#[derive(Clone, Debug)]
Expand Down
2 changes: 2 additions & 0 deletions src/blueprint/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct Upstream {
pub http_cache: bool,
pub batch: Option<Batch>,
pub http2_only: bool,
pub dedupe: bool,
}

impl Upstream {
Expand Down Expand Up @@ -78,6 +79,7 @@ impl TryFrom<&ConfigModule> for Upstream {
http_cache: (config_upstream).get_enable_http_cache(),
batch,
http2_only: (config_upstream).get_http_2_only(),
dedupe: (config_upstream).get_dedupe(),
})
.to_result()
}
Expand Down
9 changes: 9 additions & 0 deletions src/config/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ pub struct Upstream {
/// The User-Agent header value to be used in HTTP requests. @default
/// `Tailcall/1.0`
pub user_agent: Option<String>,

#[serde(default, skip_serializing_if = "is_default")]
/// When set to `true`, it will ensure no HTTP, GRPC, or any other IO call
/// is made more than once within the context of a single GraphQL request.
pub dedupe: bool,
}

impl Upstream {
Expand Down Expand Up @@ -186,6 +191,10 @@ impl Upstream {
pub fn get_http_2_only(&self) -> bool {
self.http2_only.unwrap_or(false)
}

pub fn get_dedupe(&self) -> bool {
self.dedupe
}
}

#[cfg(test)]
Expand Down
24 changes: 15 additions & 9 deletions src/lambda/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,21 @@ impl Eval for IO {
ctx: super::EvaluationContext<'a, Ctx>,
_conc: &'a super::Concurrent,
) -> Pin<Box<dyn Future<Output = Result<ConstValue, EvaluationError>> + 'a + Send>> {
let key = self.cache_key(&ctx);
Box::pin(async move {
ctx.request_ctx
.cache
.get_or_eval(key, move || {
Box::pin(async { self.eval_inner(ctx, _conc).await })
})
.await
})
if ctx.request_ctx.upstream.dedupe {
Box::pin(async move {
let key = self.cache_key(&ctx);
ctx.request_ctx
.cache
.get_or_eval(key, move || {
Box::pin(async { self.eval_inner(ctx, _conc).await })
})
.await
.as_ref()
.clone()
})
} else {
Box::pin(self.eval_inner(ctx, _conc))
}
}
}

Expand Down
19 changes: 19 additions & 0 deletions tests/core/snapshots/async-cache-disabled.md_0.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
source: tests/core/spec.rs
expression: response
---
{
"status": 200,
"headers": {
"content-type": "application/json"
},
"body": {
"data": {
"posts": {
"user": {
"name": "Leanne Graham"
}
}
}
}
}
36 changes: 36 additions & 0 deletions tests/core/snapshots/async-cache-disabled.md_client.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
---
source: tests/core/spec.rs
expression: client
---
scalar Date

scalar Email

scalar Empty

scalar JSON

scalar PhoneNumber

type Post {
body: String
id: Int
title: String
user: User
userId: Int!
}

type Query {
posts: Post
}

scalar Url

type User {
id: Int
name: String
}

schema {
query: Query
}
24 changes: 24 additions & 0 deletions tests/core/snapshots/async-cache-disabled.md_merged.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
source: tests/core/spec.rs
expression: merged
---
schema @server(port: 8000, queryValidation: false) @upstream(baseURL: "http://jsonplaceholder.typicode.com") {
query: Query
}

type Post {
body: String
id: Int
title: String
user: User @http(path: "/users/{{.value.userId}}")
userId: Int!
}

type Query {
posts: Post @http(path: "/post?id=1")
}

type User {
id: Int
name: String
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
---
source: tests/core/spec.rs
expression: response
---
{
"status": 200,
"headers": {
"content-type": "application/json"
},
"body": {
"data": {
"posts": [
{
"user": {
"name": "Leanne Graham"
},
"taggedUsers": [
{},
{}
]
},
{
"user": {
"name": "Leanne Graham"
},
"taggedUsers": [
{},
{}
]
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
---
source: tests/core/spec.rs
expression: client
---
scalar Date

scalar Email

scalar Empty

scalar JSON

scalar PhoneNumber

type Post {
body: String
id: Int!
taggedUsers: [User]
title: String
user: User
userId: Int!
}

type Query {
posts: [Post]
}

scalar Url

type User {
id: Int
name: String
}

schema {
query: Query
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
source: tests/core/spec.rs
expression: merged
---
schema @server(port: 8000, queryValidation: false) @upstream(baseURL: "http://jsonplaceholder.typicode.com", dedupe: true) {
query: Query
}

type Post {
body: String
id: Int!
taggedUsers: [User] @http(path: "/taggedUsers/{{.value.id}}")
title: String
user: User @http(path: "/users/{{.value.userId}}")
userId: Int!
}

type Query {
posts: [Post] @http(path: "/posts?id=1")
}

type User {
id: Int
name: String
}
26 changes: 26 additions & 0 deletions tests/core/snapshots/async-cache-enabled.md_0.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
---
source: tests/core/spec.rs
expression: response
---
{
"status": 200,
"headers": {
"content-type": "application/json"
},
"body": {
"data": {
"posts": [
{
"user": {
"name": "Leanne Graham"
}
},
{
"user": {
"name": "Leanne Graham"
}
}
]
}
}
}
Loading

0 comments on commit b534fce

Please sign in to comment.