Skip to content

Commit

Permalink
fix: per channel meta ttl for in memory broker (#366)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Mar 14, 2024
1 parent 34d8984 commit 23c243d
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 4 deletions.
18 changes: 14 additions & 4 deletions broker_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,13 @@ func (h *historyHub) add(ch string, pub *Publication, opts PublishOptions) (Stre
h.nextExpireCheck = expireAt
}

if h.historyMetaTTL > 0 {
removeAt := time.Now().Unix() + int64(h.historyMetaTTL.Seconds())
historyMetaTTL := opts.HistoryMetaTTL
if historyMetaTTL == 0 {
historyMetaTTL = h.historyMetaTTL
}

if historyMetaTTL > 0 {
removeAt := time.Now().Unix() + int64(historyMetaTTL.Seconds())
if _, ok := h.removes[ch]; !ok {
heap.Push(&h.removeQueue, &priority.Item{Value: ch, Priority: removeAt})
}
Expand Down Expand Up @@ -388,8 +393,13 @@ func (h *historyHub) get(ch string, opts HistoryOptions) ([]*Publication, Stream

filter := opts.Filter

if h.historyMetaTTL > 0 {
removeAt := time.Now().Unix() + int64(h.historyMetaTTL.Seconds())
historyMetaTTL := opts.MetaTTL
if historyMetaTTL == 0 {
historyMetaTTL = h.historyMetaTTL
}

if historyMetaTTL > 0 {
removeAt := time.Now().Unix() + int64(historyMetaTTL.Seconds())
if _, ok := h.removes[ch]; !ok {
heap.Push(&h.removeQueue, &priority.Item{Value: ch, Priority: removeAt})
}
Expand Down
40 changes: 40 additions & 0 deletions broker_memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,46 @@ func TestMemoryHistoryHubMetaTTL(t *testing.T) {
h.RUnlock()
}

func TestMemoryHistoryHubMetaTTLPerChannel(t *testing.T) {
h := newHistoryHub(300*time.Second, make(chan struct{}))
h.runCleanups()

ch1 := "channel1"
ch2 := "channel2"
pub := newTestPublication()
h.RLock()
require.Equal(t, int64(0), h.nextRemoveCheck)
h.RUnlock()
_, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second, HistoryMetaTTL: time.Second})
_, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second, HistoryMetaTTL: time.Second})
_, _ = h.add(ch2, pub, PublishOptions{HistorySize: 2, HistoryTTL: time.Second, HistoryMetaTTL: time.Second})
_, _ = h.add(ch2, pub, PublishOptions{HistorySize: 2, HistoryTTL: time.Second, HistoryMetaTTL: time.Second})
h.RLock()
require.True(t, h.nextRemoveCheck > 0)
require.Equal(t, 2, len(h.streams))
h.RUnlock()
pubs, _, err := h.get(ch1, HistoryOptions{
Filter: HistoryFilter{Limit: -1},
MetaTTL: time.Second,
})
require.NoError(t, err)
require.Len(t, pubs, 1)
pubs, _, err = h.get(ch2, HistoryOptions{
Filter: HistoryFilter{Limit: -1},
MetaTTL: time.Second,
})
require.NoError(t, err)
require.Len(t, pubs, 2)

time.Sleep(2 * time.Second)

// test that stream cleaned up by periodic task
h.RLock()
require.Equal(t, 0, len(h.streams))
require.Equal(t, int64(0), h.nextRemoveCheck)
h.RUnlock()
}

func TestMemoryBrokerRecover(t *testing.T) {
e := testMemoryBroker()
defer func() { _ = e.node.Shutdown(context.Background()) }()
Expand Down

0 comments on commit 23c243d

Please sign in to comment.