Skip to content

Commit

Permalink
Tidy
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Sep 21, 2023
1 parent 9058102 commit caba5b2
Showing 1 changed file with 3 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export class SessionRecordingIngesterV2 {
recordingConsumerConfig: PluginsServerConfig
topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS

private promises: Promise<any>[] = []
private promises: Set<Promise<any>> = new Set()

constructor(
private serverConfig: PluginsServerConfig,
Expand Down Expand Up @@ -182,8 +182,8 @@ export class SessionRecordingIngesterV2 {
* Helper to handle graceful shutdowns. Every time we do some work we add a promise to this array and remove it when finished.
* That way when shutting down we can wait for all promises to finish before exiting.
*/
this.promises.push(promise)
promise.finally(() => (this.promises = this.promises.filter((p) => p !== promise)))
this.promises.add(promise)
promise.finally(() => this.promises.delete(promise))

return promise
}
Expand Down Expand Up @@ -553,16 +553,6 @@ export class SessionRecordingIngesterV2 {
* As a result, we need to drop all sessions currently managed for the revoked partitions
*/

/**
* IDEA
*
* 1. Pull out all sessions we are revoking.
* 2. Reset all the relevant metrics - that way we don't have to worry about races
* 3. Lock all the partitions we are revoking - gives us N seconds to deal with them
* 4. Flush all the sessions (probably with a timeout)
* 5. We can safely destroy them as they aren't pat of the sessions map
*/

const revokedPartitions = topicPartitions.map((x) => x.partition)
if (!revokedPartitions.length) {
return
Expand Down

0 comments on commit caba5b2

Please sign in to comment.