refactor: store less information per streaming client #618
Conversation
```rust
impl From<StreamingQuery> for Query {
    fn from(value: StreamingQuery) -> Self {
        Self {
            tags: None,
            name_prefix: value.name_prefix,
            environment: Some(value.environment),
            inline_segment_constraints: Some(false),
            projects: Some(value.projects),
        }
    }
}
```
We don't use `inline_segment_constraints` or `tags` in the actual feature resolution.

I've set `inline_segment_constraints` to `false` because that indicates that the SDK supports segment resolution, and if an SDK supports streaming, I think we can assume it also supports segment resolution. This is also consistent with it being hardcoded to `Some(false)` in `client_api.rs`.

As for `tags`: Edge doesn't support tag queries in general, so even if you were to make a tag query, you'd always get `None` back (this is also true for the existing client/features endpoint).
```rust
impl From<(&Query, &EdgeToken)> for StreamingQuery {
    fn from((query, token): (&Query, &EdgeToken)) -> Self {
        Self {
            projects: token.projects.clone(),
            name_prefix: query.name_prefix.clone(),
            environment: match token.environment {
                Some(ref env) => env.clone(),
                None => token.token.clone(),
            },
        }
    }
}
```
Implementing `From` for a tuple feels kinda weird. This might be better as a normal function?
It's fine in my opinion
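For comparison, the plain-function alternative might look something like this (a sketch only; the name `from_query_and_token` is hypothetical, and the body just mirrors the `From` impl above):

```rust
impl StreamingQuery {
    // Hypothetical constructor equivalent to the From<(&Query, &EdgeToken)> impl.
    fn from_query_and_token(query: &Query, token: &EdgeToken) -> Self {
        Self {
            projects: token.projects.clone(),
            name_prefix: query.name_prefix.clone(),
            // Fall back to the raw token string when the token carries
            // no environment, mirroring the cache-key logic.
            environment: token
                .environment
                .clone()
                .unwrap_or_else(|| token.token.clone()),
        }
    }
}
```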
```rust
struct ClientData {
    token: String,
    sender: mpsc::Sender<sse::Event>,
}
```
Each client connection now has an associated token, so that if a token expires, we can cut them loose.
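As a rough illustration of what "cutting them loose" could look like, assuming the broadcaster holds a list of `ClientData` and learns about expired tokens elsewhere (both names below are illustrative, not the PR's actual code):

```rust
// Hypothetical pruning pass: drop every client whose token has expired.
fn prune_expired(clients: &mut Vec<ClientData>, expired_tokens: &[String]) {
    clients.retain(|client| !expired_tokens.contains(&client.token));
}
```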
```rust
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
struct QueryWrapper {
    query: FlagQuery,
}

// (this derive belongs to the next struct in the diff hunk)
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
```
With this struct, it turns out we don't need `Query` to impl `Hash` anyway 😅
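The underlying point is that only the type used as a map key needs `Hash`; a minimal, self-contained illustration (all names hypothetical):

```rust
use std::collections::HashMap;

// Only the key struct derives Hash; the bigger Query type never has to.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct QueryKey {
    environment: String,
    projects: Vec<String>,
}

fn main() {
    let mut clients: HashMap<QueryKey, Vec<String>> = HashMap::new();
    let key = QueryKey {
        environment: "development".into(),
        projects: vec!["*".into()],
    };
    // Group client identifiers under the query they subscribed with.
    clients.entry(key).or_default().push("client-token".into());
    assert_eq!(clients.len(), 1);
}
```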
server/src/http/broadcaster.rs (outdated)
```rust
match key {
    UpdateType::Full(env) | UpdateType::Update(env) => {
        this.broadcast(Some(env)).await;
    }
    UpdateType::Deletion => {
        this.broadcast(None).await;
    }
}
```
This is actual feature logic: use the env key from the update event and pass it on to the broadcaster.
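The `UpdateType` enum itself isn't part of this hunk; judging from the match arms, it presumably looks roughly like this (a sketch, not the actual definition):

```rust
// Inferred from the match above: Full and Update carry an environment key,
// Deletion does not.
enum UpdateType {
    Full(String),
    Update(String),
    Deletion,
}
```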
```rust
environment: match token.environment {
    Some(ref env) => env.clone(),
    None => token.token.clone(),
},
```
This looks a bit weird, but it mirrors how we get the cache key in `server/src/tokens.rs`.
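For reference, the mirrored cache-key logic presumably boils down to something like this (a sketch of the pattern, not a verbatim copy of `server/src/tokens.rs`):

```rust
// Use the token's environment as the cache key, falling back to the raw
// token string when no environment is set.
fn cache_key(token: &EdgeToken) -> String {
    token
        .environment
        .clone()
        .unwrap_or_else(|| token.token.clone())
}
```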
```rust
    self.create_connection(StreamingQuery::from((&query, &token)), &token.token)
        .await
        .map(Sse::from_infallible_receiver)
}

// …

let features = &self
    .resolve_features(&token, filter_set.clone(), query.clone())
    .await?;

// …

async fn create_connection(
```
I split `connect` into `connect` and `create_connection` to facilitate easier testing. Instead of passing in an EdgeToken and a Query (which are almost the same thing), I wanted to just pass the bare minimum that we needed.
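A rough sketch of the shape of that split, with stub types standing in for the real ones from the Edge codebase (signatures are illustrative, not the PR's exact ones):

```rust
// Illustrative stubs only; the real types live in the Edge codebase.
struct StreamingQuery;
struct Connection;
struct Broadcaster;

impl Broadcaster {
    // Thin entry point: derive the minimal StreamingQuery, then delegate.
    async fn connect(&self, query: StreamingQuery, token: String) -> Connection {
        self.create_connection(query, &token).await
    }

    // Testable core: takes only the bare minimum it needs, so tests can call
    // it directly without building a full EdgeToken/Query pair.
    async fn create_connection(&self, _query: StreamingQuery, _token: &str) -> Connection {
        Connection
    }
}
```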
```diff
  } else {
      FeatureFilterSet::default()
  }
- .with_filter(project_filter(token));
+ .with_filter(project_filter(query.projects.clone()));
```
This looks like a new clone, but it's actually just replacing an old clone. The `project_filter` function in `server/src/filters.rs` looks like this:
```rust
pub(crate) fn project_filter(token: &EdgeToken) -> FeatureFilter {
    let token = token.clone();
    Box::new(move |feature| {
        if let Some(feature_project) = &feature.project {
            token.projects.is_empty()
                || token.projects.contains(&"*".to_string())
                || token.projects.contains(feature_project)
        } else {
            false
        }
    })
}
```
and clones the whole token. Instead, we'll just clone the projects list here.

I would like to suggest moving the `project_filter` function in this file back to `filters`, renaming it, and having the original `project_filter` function call this new one instead (see the sketch below).
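A sketch of that delegation, using the `project_filter_from_projects` name the follow-up PR later settled on:

```rust
// The generic version filters by an owned list of project names...
pub(crate) fn project_filter_from_projects(projects: Vec<String>) -> FeatureFilter {
    Box::new(move |feature| {
        if let Some(feature_project) = &feature.project {
            projects.is_empty()
                || projects.contains(&"*".to_string())
                || projects.contains(feature_project)
        } else {
            false
        }
    })
}

// ...and the original token-based version just delegates to it.
pub(crate) fn project_filter(token: &EdgeToken) -> FeatureFilter {
    project_filter_from_projects(token.projects.clone())
}
```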
Makes sense to me.
If it makes sense and/or makes the code more readable, then go for it. I wouldn't worry too much about the clone here; remember that this will get dwarfed by the memory copy of the client features JSON to the outgoing socket anyway.
Nah, I'm not worried about the cloning here. That comment was just to say that I'm not replacing a previous non-clone with a new clone. The cloning happens anyway. This just clones less and more explicitly.
```rust
fn project_filter(projects: Vec<String>) -> FeatureFilter {
    Box::new(move |feature| {
        if let Some(feature_project) = &feature.project {
            projects.is_empty()
                || projects.contains(&"*".to_string())
                || projects.contains(feature_project)
        } else {
            false
        }
    })
}
```
This is the more general version of the `project_filter` function from `server/src/filters.rs`.

I think we can move this into the original file and either rename it and have the original `project_filter` call it, or just use this more generic version.
Nice, this seems simpler.
This PR performs a small amount of cleanup after #618 based on the PR discussion. Specifically, it:

- Adds a new `project_filter_from_projects` function to the filters file, and makes `project_filter` fall back to that on the back end (we can discuss the names here, but doing it this way saved me touching any more code)
- Reverts the `std::mem::take` shenanigans in favor of cloning. Even though it clones one string per attached listener every 30 seconds, it's probably not going to cause a lot of memory overhead given that we drop the original shortly thereafter.

Commits:
* refactor: move project_filter into filters.rs
* refactor: revert std mem take change
This PR refactors the broadcaster and what it stores per client.

I realized when setting up the tests for this that the Query, EdgeToken, and FilterQuery all contain more or less the same bits of data. But all we really need is the list of projects, the optional name prefix, and the environment.

In the Unleash Types `Query` type, projects and env are optional, but we need them to be present to perform the calculation. So I created a `StreamingQuery` struct, which consolidates the data we need from the Query and EdgeToken. I also copied in the methods we use for this elsewhere in Unleash and slightly adapted them. I've added notes inline.
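Putting the pieces together, the consolidated struct presumably looks something like this (reconstructed from the `From` impls shown above, not copied from the PR; the `Hash` derive matches the dangling derive in the `QueryWrapper` hunk):

```rust
// The bare minimum needed to resolve features for a streaming client:
// which projects, which environment, and an optional name prefix.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct StreamingQuery {
    projects: Vec<String>,
    name_prefix: Option<String>,
    environment: String,
}
```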