Skip to content

Commit

Permalink
chore: separate handlers for different modes (#937)
Browse files Browse the repository at this point in the history
Co-authored-by: Nikhil Sinha <[email protected]>
  • Loading branch information
parmesant and nikhilsinhaparseable authored Oct 1, 2024
1 parent 32412bf commit 5d7cd68
Show file tree
Hide file tree
Showing 24 changed files with 1,844 additions and 1,085 deletions.
52 changes: 6 additions & 46 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,46 +59,6 @@ pub const INTERNAL_STREAM_NAME: &str = "pmeta";

const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);

pub async fn sync_cache_with_ingestors(
url: &str,
ingestor: IngestorMetadata,
body: bool,
) -> Result<(), StreamError> {
if !utils::check_liveness(&ingestor.domain_name).await {
return Ok(());
}
let request_body: Bytes = Bytes::from(body.to_string());
let client = reqwest::Client::new();
let resp = client
.put(url)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingestor.token)
.body(request_body)
.send()
.await
.map_err(|err| {
// log the error and return a custom error
log::error!(
"Fatal: failed to set cache: {}\n Error: {:?}",
ingestor.domain_name,
err
);
StreamError::Network(err)
})?;

// if the response is not successful, log the error and return a custom error
// this could be a bit too much, but we need to be sure it covers all cases
if !resp.status().is_success() {
log::error!(
"failed to set cache: {}\nResponse Returned: {:?}",
ingestor.domain_name,
resp.text().await
);
}

Ok(())
}

// forward the create/update stream request to all ingestors to keep them in sync
pub async fn sync_streams_with_ingestors(
headers: HeaderMap,
Expand All @@ -122,7 +82,7 @@ pub async fn sync_streams_with_ingestors(
continue;
}
let url = format!(
"{}{}/logstream/{}",
"{}{}/logstream/{}/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
stream_name
Expand Down Expand Up @@ -176,7 +136,7 @@ pub async fn sync_users_with_roles_with_ingestors(
continue;
}
let url = format!(
"{}{}/user/{}/role",
"{}{}/user/{}/role/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
Expand Down Expand Up @@ -224,7 +184,7 @@ pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(),
continue;
}
let url = format!(
"{}{}/user/{}",
"{}{}/user/{}/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
Expand Down Expand Up @@ -285,7 +245,7 @@ pub async fn sync_user_creation_with_ingestors(
continue;
}
let url = format!(
"{}{}/user/{}",
"{}{}/user/{}/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
Expand Down Expand Up @@ -333,7 +293,7 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(),
continue;
}
let url = format!(
"{}{}/user/{}/generate-new-password",
"{}{}/user/{}/generate-new-password/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
Expand Down Expand Up @@ -389,7 +349,7 @@ pub async fn sync_role_update_with_ingestors(
continue;
}
let url = format!(
"{}{}/role/{}",
"{}{}/role/{}/sync",
ingestor.domain_name,
base_path_without_preceding_slash(),
name
Expand Down
Loading

0 comments on commit 5d7cd68

Please sign in to comment.