Skip to content

Commit

Permalink
bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dekkku committed Sep 12, 2024
1 parent c7111f2 commit 891eb02
Showing 1 changed file with 41 additions and 12 deletions.
53 changes: 41 additions & 12 deletions projects/dekkku/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use async_graphql::*;
use async_graphql_poem::*;
use bytes::Bytes;
use dashmap::DashMap;
use futures::future;
use poem::{listener::TcpListener, web::Html, *};
use rand::seq::SliceRandom;
use reqwest::Client;
Expand All @@ -14,25 +15,23 @@ use std::{
time::Duration,
};
use tokio::sync::oneshot;
use tokio::task;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

const BASE_URL: &str = "http://localhost:3000";
const ALL_USERS: &str = "http://localhost:3000/users";
const ALL_POSTS: &str = "http://localhost:3000/posts";

struct Store {
posts: RwLock<HashMap<i32, Post>>,
users: RwLock<HashMap<i32, User>>,
is_dirty: RwLock<bool>,
}

impl Default for Store {
fn default() -> Self {
Self {
posts: RwLock::new(HashMap::new()),
users: RwLock::new(HashMap::new()),
is_dirty: RwLock::new(false),
}
}
}
Expand All @@ -55,7 +54,6 @@ impl Query {
.any(|field| field.name() == "user");

if should_query_user {
// TODO: clean this up.
let are_posts_same = {
let is_cache_empty = store.posts.read().unwrap().is_empty();
if is_cache_empty {
Expand Down Expand Up @@ -122,7 +120,6 @@ impl Query {
}
}
}
*store.is_dirty.write().unwrap() = !are_posts_same;
}

Ok(posts)
Expand All @@ -134,7 +131,16 @@ impl Query {
id: i32,
) -> std::result::Result<Option<Post>, async_graphql::Error> {
let loader = ctx.data_unchecked::<DataLoader<PostLoader>>();
let post = loader.load_one(id).await?;
let mut post = loader.load_one(id).await?;

// fetch the user as well.
// check if post is changed or not, if posts aren't changed then
if let Some(post_ref) = post.as_mut() {
let loader = ctx.data_unchecked::<DataLoader<UserLoader>>();
let user = loader.load_one(post_ref.user_id).await?;
post_ref.user = user;
}

Ok(post)
}

Expand Down Expand Up @@ -203,13 +209,36 @@ impl Loader<i32> for PostLoader {
&self,
keys: &[i32],
) -> std::result::Result<std::collections::HashMap<i32, Self::Value>, Self::Error> {
let client = &self.0;

// Create a vector to hold all the futures
let futures: Vec<_> = keys
.iter()
.map(|&id| {
let url = format!("{}/{}", ALL_POSTS, id);
let client = client.clone();
task::spawn(async move {
let post: Post = client.request(&url).await?;
Ok::<_, async_graphql::Error>((id, post))
})
})
.collect();

// Wait for all futures to complete
let results = future::join_all(futures).await;

// Collect results into a HashMap
let mut result = std::collections::HashMap::new();
// too slow, make it parallel.
for &id in keys {
let url = format!("{}/posts/{}", BASE_URL, id);
let post: Post = self.0.request(&url).await?;
result.insert(id, post);
for task_result in results {
match task_result {
Ok(Ok((id, post))) => {
result.insert(id, post);
}
Ok(Err(e)) => return Err(e),
Err(e) => return Err(async_graphql::Error::new(format!("Task join error: {}", e))),
}
}

Ok(result)
}
}
Expand All @@ -228,7 +257,7 @@ impl Loader<i32> for UserLoader {
.map(|id| format!("id={}", id))
.collect::<Vec<_>>()
.join("&");
let url = format!("{}/users?{}", BASE_URL, qp);
let url = format!("{}?{}", ALL_USERS, qp);
let users: Vec<User> = self.0.request(&url).await?;
for user in users {
result.insert(user.id, user);
Expand Down

0 comments on commit 891eb02

Please sign in to comment.