Skip to content

Commit

Permalink
fix: Fix TTL to be time.Second based (#24383)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuziontech authored Aug 14, 2024
1 parent 904c750 commit 190ecf8
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 18 deletions.
21 changes: 9 additions & 12 deletions livestream/live_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

const (
COUNTER_TTL = 60
COUNTER_TTL = time.Second * 60
)

type Stats struct {
Expand All @@ -20,24 +20,21 @@ type Stats struct {
func newStatsKeeper() *Stats {
return &Stats{
Store: make(map[string]*expirable.LRU[string, string]),
GlobalStore: expirable.NewLRU[string, string](0, nil, time.Second*COUNTER_TTL),
GlobalStore: expirable.NewLRU[string, string](0, nil, COUNTER_TTL),
Counter: NewSlidingWindowCounter(COUNTER_TTL),
}
}

func (ts *Stats) keepStats(statsChan chan PostHogEvent) {
log.Println("starting stats keeper...")

for { // ignore the range warning here - it's wrong
select {
case event := <-statsChan:
ts.Counter.Increment()
token := event.Token
if _, ok := ts.Store[token]; !ok {
ts.Store[token] = expirable.NewLRU[string, string](0, nil, time.Second*COUNTER_TTL)
}
ts.Store[token].Add(event.DistinctId, "1")
ts.GlobalStore.Add(event.DistinctId, "1")
for event := range statsChan {
ts.Counter.Increment()
token := event.Token
if _, ok := ts.Store[token]; !ok {
ts.Store[token] = expirable.NewLRU[string, string](0, nil, COUNTER_TTL)
}
ts.Store[token].Add(event.DistinctId, "1")
ts.GlobalStore.Add(event.DistinctId, "1")
}
}
8 changes: 4 additions & 4 deletions livestream/served.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ import (
)

type Counter struct {
EventCount uint32
UserCount uint32
EventCount int
UserCount int
}

func servedHandler(stats *Stats) func(c echo.Context) error {
return func(c echo.Context) error {
userCount := stats.GlobalStore.Len()
count := stats.Counter.Count()
resp := Counter{
EventCount: uint32(count),
UserCount: uint32(userCount),
EventCount: count,
UserCount: userCount,
}
return c.JSON(http.StatusOK, resp)
}
Expand Down
17 changes: 15 additions & 2 deletions livestream/ttl_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,24 @@ type SlidingWindowCounter struct {
}

func NewSlidingWindowCounter(windowSize time.Duration) *SlidingWindowCounter {
return &SlidingWindowCounter{
swc := &SlidingWindowCounter{
events: make([]time.Time, 0),
windowSize: windowSize,
}

// Start a goroutine to periodically remove old events
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for range ticker.C {
swc.mu.Lock()
swc.removeOldEvents(time.Now())
swc.mu.Unlock()
}
}()

return swc
}

func (swc *SlidingWindowCounter) Increment() {
Expand All @@ -24,7 +38,6 @@ func (swc *SlidingWindowCounter) Increment() {

now := time.Now()
swc.events = append(swc.events, now)
swc.removeOldEvents(now)
}

func (swc *SlidingWindowCounter) Count() int {
Expand Down

0 comments on commit 190ecf8

Please sign in to comment.