refactor: store less information per streaming client #618

Merged
4 changes: 1 addition & 3 deletions server/src/client_api.rs
@@ -50,9 +50,7 @@ pub async fn stream_features(
let (validated_token, _filter_set, query) =
get_feature_filter(&edge_token, &token_cache, filter_query.clone())?;

-    broadcaster
-        .connect(validated_token, filter_query, query)
-        .await
+    broadcaster.connect(validated_token, query).await
}

#[utoipa::path(
155 changes: 92 additions & 63 deletions server/src/http/broadcaster.rs
@@ -1,54 +1,70 @@
-use std::{
-    hash::{Hash, Hasher},
-    sync::Arc,
-    time::Duration,
-};
+use std::{hash::Hash, sync::Arc, time::Duration};

-use actix_web::{
-    rt::time::interval,
-    web::{Json, Query},
-};
+use actix_web::{rt::time::interval, web::Json};
use actix_web_lab::{
sse::{self, Event, Sse},
util::InfallibleStream,
};
use dashmap::DashMap;
use futures::future;
use prometheus::{register_int_gauge, IntGauge};
use serde::Serialize;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, warn};
-use unleash_types::client_features::{ClientFeatures, Query as FlagQuery};
+use unleash_types::client_features::{ClientFeatures, Query};

use crate::{
error::EdgeError,
feature_cache::FeatureCache,
-    filters::{filter_client_features, name_prefix_filter, project_filter, FeatureFilterSet},
-    tokens::cache_key,
-    types::{EdgeJsonResult, EdgeResult, EdgeToken, FeatureFilters},
+    filters::{filter_client_features, name_prefix_filter, FeatureFilter, FeatureFilterSet},
+    types::{EdgeJsonResult, EdgeResult, EdgeToken},
};

-#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
-struct QueryWrapper {
-    query: FlagQuery,
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+struct StreamingQuery {
Contributor Author:
With this struct, turns out we don't need Query to impl Hash anyway 😅
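For illustration (not part of the diff): because every field of the new struct implements Hash, the derive works and the struct can key a map directly, where the old QueryWrapper had to round-trip through serde_json to produce something hashable. A minimal sketch, with std's HashMap standing in for DashMap:

```rust
use std::collections::HashMap;

// Stand-in for the PR's StreamingQuery: Vec<String>, Option<String>,
// and String all implement Hash, so the derive is enough.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamingQuery {
    pub projects: Vec<String>,
    pub name_prefix: Option<String>,
    pub environment: String,
}

// Group client ids by their query, the way the broadcaster keys
// active_connections; structurally equal queries share one entry.
pub fn group_clients(
    clients: Vec<(StreamingQuery, String)>,
) -> HashMap<StreamingQuery, Vec<String>> {
    let mut groups: HashMap<StreamingQuery, Vec<String>> = HashMap::new();
    for (query, client) in clients {
        groups.entry(query).or_default().push(client);
    }
    groups
}
```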

pub projects: Vec<String>,
pub name_prefix: Option<String>,
pub environment: String,
}

-impl Hash for QueryWrapper {
-    fn hash<H: Hasher>(&self, state: &mut H) {
-        serde_json::to_string(&self.query).unwrap().hash(state);
+impl From<StreamingQuery> for Query {
+    fn from(value: StreamingQuery) -> Self {
+        Self {
+            tags: None,
+            name_prefix: value.name_prefix,
+            environment: Some(value.environment),
+            inline_segment_constraints: Some(false),
+            projects: Some(value.projects),
+        }
    }
}

Comment on lines +30 to +40
@thomasheartman thomasheartman Jan 6, 2025 (Contributor Author):
We don't use inline_segment_constraints and tags in the actual feature resolution.

I've set inline_segment_constraints to false because that indicates that the SDK supports resolution and if the SDK supports streaming, I think we can assume that it supports segment resolution. This is also consistent with it being hardcoded to Some(false) in client_api.rs.

As for tags: Edge doesn't support tag queries in general. So even if you were to make a tag query, you'll always get "None" back (this is also true for the existing client/features endpoint).

impl From<(&Query, &EdgeToken)> for StreamingQuery {
fn from((query, token): (&Query, &EdgeToken)) -> Self {
Self {
projects: token.projects.clone(),
name_prefix: query.name_prefix.clone(),
environment: match token.environment {
Some(ref env) => env.clone(),
None => token.token.clone(),
},
Comment on lines +47 to +49
Contributor Author:
This looks a bit weird, but it mirrors how we get the cache key in server/src/tokens.rs
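To make the mirrored behavior concrete (a sketch with a stand-in EdgeToken; the real cache_key lives in server/src/tokens.rs): use the environment when present, otherwise fall back to the raw token string.

```rust
// Stand-in for the real EdgeToken type.
pub struct EdgeToken {
    pub token: String,
    pub environment: Option<String>,
}

// The same fallback the StreamingQuery conversion performs: prefer the
// token's environment, and use the token string itself when it is absent.
pub fn environment_key(token: &EdgeToken) -> String {
    match token.environment {
        Some(ref env) => env.clone(),
        None => token.token.clone(),
    }
}
```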

}
}
Comment on lines +42 to +51
Contributor Author:
implementing From for a tuple feels kinda weird. This might be better as a normal function?
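For comparison, the plain-function alternative floated here might look like the following (a hypothetical sketch with minimal stand-in types so it compiles on its own):

```rust
// Stand-ins for the real Query and EdgeToken types.
pub struct Query {
    pub name_prefix: Option<String>,
}
pub struct EdgeToken {
    pub token: String,
    pub projects: Vec<String>,
    pub environment: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamingQuery {
    pub projects: Vec<String>,
    pub name_prefix: Option<String>,
    pub environment: String,
}

impl StreamingQuery {
    // A named constructor instead of From<(&Query, &EdgeToken)>: same body,
    // but the call site reads StreamingQuery::new(&query, &token).
    pub fn new(query: &Query, token: &EdgeToken) -> Self {
        Self {
            projects: token.projects.clone(),
            name_prefix: query.name_prefix.clone(),
            environment: token
                .environment
                .clone()
                .unwrap_or_else(|| token.token.clone()),
        }
    }
}
```

The trade-off is small either way: the tuple From keeps the `.into()` idiom available, while the named constructor is more discoverable.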

Member:
It's fine in my opinion

}

#[derive(Clone, Debug)]
struct ClientData {
token: String,
sender: mpsc::Sender<sse::Event>,
}

Comment on lines +56 to +59
Contributor Author:
Each client connection now has an associated token, so that if a token expires, we can cut them loose.

#[derive(Clone, Debug)]
struct ClientGroup {
-    clients: Vec<mpsc::Sender<sse::Event>>,
-    filter_set: Query<FeatureFilters>,
-    token: EdgeToken,
+    clients: Vec<ClientData>,
}

pub struct Broadcaster {
-    active_connections: DashMap<QueryWrapper, ClientGroup>,
+    active_connections: DashMap<StreamingQuery, ClientGroup>,
features_cache: Arc<FeatureCache>,
}

@@ -101,88 +117,88 @@ impl Broadcaster {
async fn heartbeat(&self) {
let mut active_connections = 0i64;
for mut group in self.active_connections.iter_mut() {
-            let mut ok_clients = Vec::new();
+            let clients = std::mem::take(&mut group.clients);
+            let ok_clients = &mut group.clients;

-            for client in &group.clients {
-                if client
+            for ClientData { token, sender } in clients {
+                if sender
.send(sse::Event::Comment("keep-alive".into()))
.await
.is_ok()
{
-                    ok_clients.push(client.clone());
+                    ok_clients.push(ClientData { token, sender });
}
Comment on lines +120 to +129
Contributor Author:
These are memory shenanigans that we probably don't need, but might be nice?

I initially wanted to just avoid cloning each client (especially now that we also have a string in there). To do that, we'd have to take ownership, which std::mem::take lets you do.

This way, we take ownership of the clients and swap it in with an empty vec. We then mutably borrow that new vec and insert only the ok clients.

Love to have a discussion about this, but it appears to be working.
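The pattern under discussion, in isolation (a plain std sketch; a boolean stands in for the async keep-alive send in the real code):

```rust
// Keep-alive sweep using std::mem::take: take ownership of the current
// client list (leaving an empty Vec behind), then push back only the
// clients that are still reachable. Retained clients are moved, not cloned.
pub struct ClientData {
    pub token: String,
    pub alive: bool, // stand-in for a successful channel send
}

pub fn sweep(clients: &mut Vec<ClientData>) {
    let taken = std::mem::take(clients); // *clients is now an empty Vec
    let ok_clients = clients;
    for client in taken {
        if client.alive {
            ok_clients.push(client); // move, no clone
        }
    }
}
```

In fully synchronous code `Vec::retain` would express this more directly; the take-and-push shape exists because the liveness check in the real heartbeat is an `.await` on the sender.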

@sighphyre sighphyre Jan 6, 2025 (Member):
I don't think memory shenanigans are worth it here. I'm late to this party but unless this runs every time a connected client tries to get features or the client struct holds a non Arc'd struct that's a direct chunk of memory for all the features then simpler is better.

Profile first either way

Edit: Now that I think about it, I would love to see the Heaptrack outputs for this

Contributor Author:
Yeah, that's fair. This runs every 30 seconds for every client. Again, probably not gonna make a massive difference, so I'd be happy to revert it to how it was 🤷🏼 I don't necessarily want to go through the rigmarole of profiling for this right away, though I'd be happy to have a look at it with you, if you want to have a look at the heaptrack output. I ... don't know what that is, so an intro might be in order 😅

Member:
Yuh, we can take a look together. Heaptrack is this beast: https://github.com/KDE/heaptrack; it gives you insight into the heap allocations. It's not urgent, and a quick and dirty glance at the process memory in task manager is probably good enough for now, but we should know how much memory this is going to eat before we release it.

Contributor Author:
Ah, neat. Yeah, if that's a Linux-only tool, then I'm a little out of luck. Looks like it may be possible to compile it for macOS, but yeah, let's not start there. I'd love to have a little sesh on it if you're down, though. Just hit me up whenever.

}

active_connections += ok_clients.len() as i64;
-            group.clients = ok_clients;
}
CONNECTED_STREAMING_CLIENTS.set(active_connections)
}

/// Registers client with broadcaster, returning an SSE response body.
pub async fn connect(
&self,
token: EdgeToken,
-        filter_set: Query<FeatureFilters>,
-        query: unleash_types::client_features::Query,
+        query: Query,
    ) -> EdgeResult<Sse<InfallibleStream<ReceiverStream<sse::Event>>>> {
-        let (tx, rx) = mpsc::channel(10);
+        self.create_connection(StreamingQuery::from((&query, &token)), &token.token)
+            .await
+            .map(Sse::from_infallible_receiver)
+    }

-        let features = &self
-            .resolve_features(&token, filter_set.clone(), query.clone())
-            .await?;
+    async fn create_connection(
+        &self,
Comment on lines +143 to +148
Contributor Author:
I split connect into connect and create_connection to facilitate easier testing. Instead of passing in an EdgeToken and a Query (which are almost the same thing), I wanted to just pass the bare minimum that we needed.
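To illustrate why returning the bare receiver helps testing (a sketch with std channels standing in for tokio's mpsc and plain strings standing in for SSE events; the function name and payload are hypothetical):

```rust
use std::sync::mpsc;

// A create_connection-shaped function that takes only the minimal inputs
// and hands back the raw receiver: a test can drive it directly, with no
// HTTP layer or SSE response wrapper involved.
pub fn create_connection(initial_features: &str) -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::channel();
    // Stand-in for the "unleash-connected" event sent on connect.
    tx.send(format!("unleash-connected: {initial_features}")).unwrap();
    rx
}
```

A test then just reads from the receiver and asserts on the first event, which is much cheaper than spinning up the actix layer to exercise `connect`.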

query: StreamingQuery,
token: &str,
) -> EdgeResult<mpsc::Receiver<sse::Event>> {
let (tx, rx) = mpsc::channel(10);

let features = self.resolve_features(query.clone()).await?;
tx.send(
-            sse::Data::new_json(features)?
+            sse::Data::new_json(&features)?
.event("unleash-connected")
.into(),
)
.await?;

self.active_connections
-            .entry(QueryWrapper { query })
+            .entry(query)
.and_modify(|group| {
-                group.clients.push(tx.clone());
+                group.clients.push(ClientData {
+                    token: token.into(),
+                    sender: tx.clone(),
+                });
})
.or_insert(ClientGroup {
-                clients: vec![tx.clone()],
-                filter_set,
-                token,
+                clients: vec![ClientData {
+                    token: token.into(),
+                    sender: tx.clone(),
+                }],
});
-        Ok(Sse::from_infallible_receiver(rx))
}

-    fn get_query_filters(
-        filter_query: Query<FeatureFilters>,
-        token: &EdgeToken,
-    ) -> FeatureFilterSet {
-        let query_filters = filter_query.into_inner();
+        Ok(rx)
}

-        let filter_set = if let Some(name_prefix) = query_filters.name_prefix {
-            FeatureFilterSet::from(Box::new(name_prefix_filter(name_prefix)))
+    fn get_query_filters(query: &StreamingQuery) -> FeatureFilterSet {
+        let filter_set = if let Some(name_prefix) = &query.name_prefix {
+            FeatureFilterSet::from(Box::new(name_prefix_filter(name_prefix.clone())))
} else {
FeatureFilterSet::default()
}
-        .with_filter(project_filter(token));
+        .with_filter(project_filter(query.projects.clone()));
filter_set
Contributor Author:

This looks like a new clone, but it's actually just replacing an old clone. The project_filter function in server/src/filters.rs looks like this:

pub(crate) fn project_filter(token: &EdgeToken) -> FeatureFilter {
    let token = token.clone();
    Box::new(move |feature| {
        if let Some(feature_project) = &feature.project {
            token.projects.is_empty()
                || token.projects.contains(&"*".to_string())
                || token.projects.contains(feature_project)
        } else {
            false
        }
    })
}

and clones the whole token. Instead, we'll just clone the projects list here.

I would like to suggest moving the project_filter function in this file back to filters, renaming it, and having the original project_filter function call this new one instead.
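That suggestion, sketched with stand-in types (the real FeatureFilter, Feature, and EdgeToken live in the server crate; the new function's name is hypothetical):

```rust
// Stand-ins for the real types.
pub struct Feature {
    pub project: Option<String>,
}
pub struct EdgeToken {
    pub projects: Vec<String>,
}
pub type FeatureFilter = Box<dyn Fn(&Feature) -> bool>;

// The projects-based filter, as it might look moved back into filters.rs:
// matches when the project list is empty, contains "*", or contains the
// feature's project; features without a project never match.
pub fn project_filter_from_projects(projects: Vec<String>) -> FeatureFilter {
    Box::new(move |feature| {
        if let Some(feature_project) = &feature.project {
            projects.is_empty()
                || projects.contains(&"*".to_string())
                || projects.contains(feature_project)
        } else {
            false
        }
    })
}

// ...with the original token-based function delegating to it, so both the
// streaming path and the existing callers share one implementation.
pub fn project_filter(token: &EdgeToken) -> FeatureFilter {
    project_filter_from_projects(token.projects.clone())
}
```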

Member:
Makes sense to me.

Member:
If it makes sense and/or makes the code more readable then go for it. I wouldn't worry too much about the clone here, remember that this will get dwarfed by the memory copy of the client features JSON to the outgoing socket anyway

Contributor Author:
Nah, I'm not worried about the cloning here. That comment was just to say that I'm not replacing a previous non-clone with a new clone. The cloning happens anyway. This just clones less and more explicitly.

}

-    async fn resolve_features(
-        &self,
-        validated_token: &EdgeToken,
-        filter_set: Query<FeatureFilters>,
-        query: FlagQuery,
-    ) -> EdgeJsonResult<ClientFeatures> {
-        let filter_set = Broadcaster::get_query_filters(filter_set.clone(), validated_token);
+    async fn resolve_features(&self, query: StreamingQuery) -> EdgeJsonResult<ClientFeatures> {
+        let filter_set = Broadcaster::get_query_filters(&query);

let features = self
.features_cache
-            .get(&cache_key(validated_token))
+            .get(&query.environment)
.map(|client_features| filter_client_features(&client_features, &filter_set));

match features {
Some(features) => Ok(Json(ClientFeatures {
-                query: Some(query),
+                query: Some(query.into()),
..features
})),
// Note: this is a simplification for now, using the following assumptions:
@@ -196,11 +212,12 @@ impl Broadcaster {
/// Broadcast new features to all clients.
pub async fn broadcast(&self) {
let mut client_events = Vec::new();

for entry in self.active_connections.iter() {
let (query, group) = entry.pair();

let event_data = self
-                .resolve_features(&group.token, group.filter_set.clone(), query.query.clone())
+                .resolve_features(query.clone())
.await
.and_then(|features| sse::Data::new_json(&features).map_err(|e| e.into()));

@@ -221,8 +238,20 @@ impl Broadcaster {
// disconnected clients will get swept up by `remove_stale_clients`
let send_events = client_events
.iter()
-            .map(|(client, event)| client.send(event.clone()));
+            .map(|(ClientData { sender, .. }, event)| sender.send(event.clone()));

let _ = future::join_all(send_events).await;
}
}

fn project_filter(projects: Vec<String>) -> FeatureFilter {
Box::new(move |feature| {
if let Some(feature_project) = &feature.project {
projects.is_empty()
|| projects.contains(&"*".to_string())
|| projects.contains(feature_project)
} else {
false
}
})
}