Support pending writes within CachedCounterValue #299
Conversation
Force-pushed from e770469 to d4eed7b
```rust
for counter in &batch {
    self.updates
        .remove_if(counter, |_, v| v.no_pending_writes());
}
```
If, after having flushed to Redis, no additional pending writes were added, we can remove these entries from the queue.
```rust
pub fn add(&self, counter: Counter, value: Arc<CachedCounterValue>, priority: bool) {
    let priority = priority || value.value.ttl() < self.interval;
    self.updates.entry(counter).or_insert(value);
    if priority {
```
If we start constraining the batch size sent to Redis, we may want to add the priority flag to the value too, so that the first batch we execute contains the priority items first.
Yeah, I do this "hack" where I check the TTL on entry… to decide the things that are "important", but I think I should maybe just waste the additional bit…
```rust
    expiry: AtomicExpiryTime,
}
```

```rust
pub struct Batcher {
    updates: DashMap<Counter, Arc<CachedCounterValue>>,
```
DashMap makes it much cleaner and easier to consume <3
The part that I find particularly hard/complex is the consume method: it first iterates to get the hot counters that need to flush ASAP, then iterates again to find the remainder and push it into the same batch (?), and then we need to iterate once more to build the final batch that we send to the batch update fn. I only wonder if it's possible to simplify the collection we send to update... maybe in the near future we can review this. Great work! 👍🏼
Alright, this is a shot at unifying the "write-behind" and the cache we use in front of Redis. It does so in a few ways:

- supporting pending writes within `CachedCounterValue`
- sharing the same `CachedCounterValue` (within an `Arc`) for the same `Counter` "key"
- writes to the `Cache` should be atomic and only then added to the `Batcher` (our write-behind "queue")
- the `Batcher` checks if a `CachedCounterValue` is mapped to the key, in case it got evicted from the cache
- only adding a `CachedCounterValue` to the `Batcher` with a `.pending_writes() > 0`
- removing from the `Batcher` if the `.pending_writes() == 0`
- merging `CachedCounterValue` instances: there is a race when inserting into the cache and being evicted before we made it into the `Batcher`, where we then merge the pending writes from one `CachedCounterValue` into the other one

This was achieved by doing:

- moving the pending writes into `CachedCounterValue`
- changing `CachedRedisStorage.batcher_counter_updates` to point to the same `Arc` as within `CountersCache.cache`
- using the `CachedCounterValue` to populate the batch to Redis (and update the values back)
- a `Batcher` type back, exposing `.add()` and `.consume(u64).await`
- the `Batcher` awakens on either: the flush period having elapsed, or the batch size being reached
- `.consume` would always return the most "important" updates first
- using `0` as a default on `CachedRedisStorage` cache misses (#298)

This still misses tests 🤦 yet I still think this is better than what we had before... So, I think we should merge this and iterate, but only if my reasoning makes sense and "seems to be implemented" in this PR.