Skip to content

Commit

Permalink
Merge branch 'main' into renovate/shadow-rs-0.x
Browse files Browse the repository at this point in the history
  • Loading branch information
chriswk authored Dec 19, 2024
2 parents 362ce27 + 9fcdbff commit b13f1e5
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 33 deletions.
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 5 additions & 9 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,16 @@ pub async fn get_features(
#[get("/streaming")]
pub async fn stream_features(
edge_token: EdgeToken,
broadcaster: Data<Broadcaster>,
token_cache: Data<DashMap<String, EdgeToken>>,
filter_query: Query<FeatureFilters>,
req: HttpRequest,
) -> EdgeResult<impl Responder> {
let (validated_token, _filter_set, query) =
get_feature_filter(&edge_token, &token_cache, filter_query.clone())?;
match req.app_data::<Data<Broadcaster>>() {
Some(broadcaster) => {
broadcaster
.connect(validated_token, filter_query, query)
.await
}
_ => Err(EdgeError::ClientCacheError),
}

broadcaster
.connect(validated_token, filter_query, query)
.await
}

#[utoipa::path(
Expand Down
20 changes: 11 additions & 9 deletions server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,27 +97,28 @@ pub enum EdgeError {
ClientFeaturesParseError(String),
ClientHydrationFailed(String),
ClientRegisterError,
FrontendNotYetHydrated(FrontendHydrationMissing),
FrontendExpectedToBeHydrated(String),
FeatureNotFound(String),
PersistenceError(String),
ContextParseError,
EdgeMetricsError,
EdgeMetricsRequestError(reqwest::StatusCode, Option<UnleashBadRequest>),
EdgeTokenError,
EdgeTokenParseError,
FeatureNotFound(String),
FrontendExpectedToBeHydrated(String),
FrontendNotYetHydrated(FrontendHydrationMissing),
HealthCheckError(String),
InvalidBackupFile(String, String),
InvalidServerUrl(String),
InvalidTokenWithStrictBehavior,
HealthCheckError(String),
JsonParseError(String),
NoFeaturesFile,
NoTokenProvider,
NoTokens(String),
NotReady,
PersistenceError(String),
ReadyCheckError(String),
SseError(String),
TlsError,
TokenParseError(String),
ContextParseError,
TokenValidationError(reqwest::StatusCode),
}

Expand Down Expand Up @@ -210,6 +211,7 @@ impl Display for EdgeError {
write!(f, "Edge is not ready to serve requests")
}
EdgeError::InvalidTokenWithStrictBehavior => write!(f, "Edge is running with strict behavior and the token is not subsumed by any registered tokens"),
EdgeError::SseError(message) => write!(f, "{}", message),
}
}
}
Expand Down Expand Up @@ -250,6 +252,7 @@ impl ResponseError for EdgeError {
EdgeError::FrontendExpectedToBeHydrated(_) => StatusCode::INTERNAL_SERVER_ERROR,
EdgeError::NotReady => StatusCode::SERVICE_UNAVAILABLE,
EdgeError::InvalidTokenWithStrictBehavior => StatusCode::FORBIDDEN,
EdgeError::SseError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand Down Expand Up @@ -294,9 +297,8 @@ impl From<serde_json::Error> for EdgeError {
}

impl From<SendError<Event>> for EdgeError {
// todo: create better enum representation. use this is placeholder
fn from(_value: SendError<Event>) -> Self {
EdgeError::TlsError
fn from(value: SendError<Event>) -> Self {
EdgeError::SseError(value.to_string())
}
}

Expand Down
18 changes: 13 additions & 5 deletions server/src/http/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use actix_web_lab::{
};
use dashmap::DashMap;
use futures::future;
use prometheus::{register_int_gauge, IntGauge};
use serde::Serialize;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -51,6 +52,14 @@ pub struct Broadcaster {
features_cache: Arc<FeatureCache>,
}

lazy_static::lazy_static! {
pub static ref CONNECTED_STREAMING_CLIENTS: IntGauge = register_int_gauge!(
"connected_streaming_clients",
"Number of connected streaming clients",
)
.unwrap();
}

impl Broadcaster {
/// Constructs new broadcaster and spawns ping loop.
pub fn new(features: Arc<FeatureCache>) -> Arc<Self> {
Expand Down Expand Up @@ -90,6 +99,7 @@ impl Broadcaster {

/// Removes all non-responsive clients from broadcast list.
async fn heartbeat(&self) {
let mut active_connections = 0i64;
for mut group in self.active_connections.iter_mut() {
let mut ok_clients = Vec::new();

Expand All @@ -103,11 +113,10 @@ impl Broadcaster {
}
}

// validate tokens here?
// ok_clients.iter().filter(|client| client.token_is_valid())

active_connections += ok_clients.len() as i64;
group.clients = ok_clients;
}
CONNECTED_STREAMING_CLIENTS.set(active_connections)
}

/// Registers client with broadcaster, returning an SSE response body.
Expand Down Expand Up @@ -140,7 +149,6 @@ impl Broadcaster {
filter_set,
token,
});

Ok(Sse::from_infallible_receiver(rx))
}

Expand Down Expand Up @@ -181,7 +189,7 @@ impl Broadcaster {
// 1. We'll only allow streaming in strict mode
// 2. We'll check whether the token is subsumed *before* trying to add it to the broadcaster
// If both of these are true, then we should never hit this case (if Thomas's understanding is correct).
None => Err(EdgeError::ClientCacheError),
None => Err(EdgeError::AuthorizationDenied),
}
}

Expand Down
5 changes: 5 additions & 0 deletions server/src/prom_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ fn register_custom_metrics(registry: &prometheus::Registry) {
crate::metrics::client_metrics::FEATURE_TOGGLE_USAGE_TOTAL.clone(),
))
.unwrap();
registry
.register(Box::new(
crate::http::broadcaster::CONNECTED_STREAMING_CLIENTS.clone(),
))
.unwrap();
}

#[cfg(test)]
Expand Down

0 comments on commit b13f1e5

Please sign in to comment.