From 891eb028a8ca47de6704394b9e9a8a409b03472d Mon Sep 17 00:00:00 2001 From: dekkku Date: Fri, 13 Sep 2024 02:43:55 +0530 Subject: [PATCH] bug fixes --- projects/dekkku/src/main.rs | 53 ++++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/projects/dekkku/src/main.rs b/projects/dekkku/src/main.rs index 966c393..655f566 100644 --- a/projects/dekkku/src/main.rs +++ b/projects/dekkku/src/main.rs @@ -4,6 +4,7 @@ use async_graphql::*; use async_graphql_poem::*; use bytes::Bytes; use dashmap::DashMap; +use futures::future; use poem::{listener::TcpListener, web::Html, *}; use rand::seq::SliceRandom; use reqwest::Client; @@ -14,17 +15,16 @@ use std::{ time::Duration, }; use tokio::sync::oneshot; +use tokio::task; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; -const BASE_URL: &str = "http://localhost:3000"; const ALL_USERS: &str = "http://localhost:3000/users"; const ALL_POSTS: &str = "http://localhost:3000/posts"; struct Store { posts: RwLock>, users: RwLock>, - is_dirty: RwLock, } impl Default for Store { @@ -32,7 +32,6 @@ impl Default for Store { Self { posts: RwLock::new(HashMap::new()), users: RwLock::new(HashMap::new()), - is_dirty: RwLock::new(false), } } } @@ -55,7 +54,6 @@ impl Query { .any(|field| field.name() == "user"); if should_query_user { - // TODO: clean this up. let are_posts_same = { let is_cache_empty = store.posts.read().unwrap().is_empty(); if is_cache_empty { @@ -122,7 +120,6 @@ impl Query { } } } - *store.is_dirty.write().unwrap() = !are_posts_same; } Ok(posts) @@ -134,7 +131,16 @@ impl Query { id: i32, ) -> std::result::Result, async_graphql::Error> { let loader = ctx.data_unchecked::>(); - let post = loader.load_one(id).await?; + let mut post = loader.load_one(id).await?; + + // fetch the user as well. + // check if post is changed or not, if posts aren't changed then + if let Some(post_ref) = post.as_mut() { + let loader = ctx.data_unchecked::>(); + let user = loader.load_one(post_ref.user_id).await?; + post_ref.user = user; + } + Ok(post) } @@ -203,13 +209,36 @@ impl Loader for PostLoader { &self, keys: &[i32], ) -> std::result::Result, Self::Error> { + let client = &self.0; + + // Create a vector to hold all the futures + let futures: Vec<_> = keys + .iter() + .map(|&id| { + let url = format!("{}/{}", ALL_POSTS, id); + let client = client.clone(); + task::spawn(async move { + let post: Post = client.request(&url).await?; + Ok::<_, async_graphql::Error>((id, post)) + }) + }) + .collect(); + + // Wait for all futures to complete + let results = future::join_all(futures).await; + + // Collect results into a HashMap let mut result = std::collections::HashMap::new(); - // too slow, make it parallel. - for &id in keys { - let url = format!("{}/posts/{}", BASE_URL, id); - let post: Post = self.0.request(&url).await?; - result.insert(id, post); + for task_result in results { + match task_result { + Ok(Ok((id, post))) => { + result.insert(id, post); + } + Ok(Err(e)) => return Err(e), + Err(e) => return Err(async_graphql::Error::new(format!("Task join error: {}", e))), + } } + Ok(result) } } @@ -228,7 +257,7 @@ impl Loader for UserLoader { .map(|id| format!("id={}", id)) .collect::>() .join("&"); - let url = format!("{}/users?{}", BASE_URL, qp); + let url = format!("{}?{}", ALL_USERS, qp); let users: Vec = self.0.request(&url).await?; for user in users { result.insert(user.id, user);