Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
With this PR tranquility can handle segment granularity changes without losing data and also prevents tasks (that spans the new and old segment interval) to hang up indefinitely. The basic idea used here is that we see an event and if the event timestamp falls in some existing beam interval we use that beam otherwise we try to create new beam. Implications of the following changes -
To achieve this functionality following design changes have been made -
Beam
trait now exposesgetInterval()
which returns an optional interval as it may not make sense for all implementations ofBeam
beams
object inClusteredBeam
is now a reverse sorted list (by interval start) of beams known to us. This will be used while grouping the events to check if the event timestamp falls in the interval of theBeam
. This should not degrade the performance as compared to doing look up in HashMap as most of the times the head item in thebeams
list would be the one that can handle the event, also truncating the event timestamp will be node be needed anymore.Question - I am not sure why the
beams
object was ConcurrentHashMap instead of just HashMap ? As all the writes tobeams
are synchronized usingbeamWriteMonitor
, I have used non-thread safe list forbeams
in this code.Note - This is the first time I am writing Scala code so please feel free to point out mistakes