Skip to content

Commit

Permalink
Implement better SSE message response
Browse files Browse the repository at this point in the history
  • Loading branch information
Antony1060 committed Jan 29, 2024
1 parent 8242664 commit 86126f5
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
2 changes: 1 addition & 1 deletion server/src/models/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use utoipa::ToSchema;
use crate::models::error::ErrorResponse;
use crate::routes::profile_http_error_mapper;

#[derive(serde::Serialize)]
#[derive(Debug, serde::Serialize)]
#[serde(tag = "type")]
pub enum BulkResponse<Ok> {
#[serde(rename = "success")]
Expand Down
23 changes: 16 additions & 7 deletions server/src/routes/universal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use enstate_shared::core::error::ProfileError;
use enstate_shared::core::lookup_data::{LookupInfo, NameParseError};
use enstate_shared::core::{ENSService, Profile};
use futures::future::join_all;
use futures::TryFutureExt;
use serde::Deserialize;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::info;

use crate::models::bulk::{BulkResponse, ListResponse};
use crate::routes::{profile_http_error_mapper, validate_bulk_input, FreshQuery, Qs, RouteError};
Expand Down Expand Up @@ -91,7 +91,11 @@ pub async fn get_bulk(
Ok(Json(joined))
}

struct SSEResponse {}
#[derive(Debug, serde::Serialize)]
struct SSEResponse {
query: String,
response: BulkResponse<Profile>,
}

pub async fn get_bulk_sse(
Qs(query): Qs<UniversalGetBulkQuery>,
Expand All @@ -100,24 +104,29 @@ pub async fn get_bulk_sse(
// TODO:
let queries = validate_bulk_input(&query.queries, 10).unwrap();

info!("Got SSE request: {} entries", queries.len());

let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::<Result<Event, Infallible>>();

for input in queries {
let state_clone = state.clone();
let event_tx_clone = event_tx.clone();
tokio::spawn(async move {
let profile = profile_from_lookup_guess(
LookupInfo::guess(input),
LookupInfo::guess(&input),
&state_clone.service,
query.fresh.fresh,
)
.await;
.await
.map_err(profile_http_error_mapper);

let json = match profile {
Ok(profile) => serde_json::to_string(&profile).expect("a"),
Err(err) => serde_json::to_string(&profile_http_error_mapper(err)).expect("b"),
let sse_response = SSEResponse {
query: input,
response: profile.into(),
};

let json = serde_json::to_string(&sse_response).expect("to_string should've succeeded");

event_tx_clone.send(Ok(Event::default().data(json)))
});
}
Expand Down

0 comments on commit 86126f5

Please sign in to comment.