From 971abad3fda17384cdc7f2e867723a9801f0feb0 Mon Sep 17 00:00:00 2001 From: sancar Date: Thu, 6 Jun 2024 16:49:53 +0300 Subject: [PATCH 1/4] Fix Stream Lag and Add EntriesAdded/EntriesRead Added them according to redis spec described here https://redis.io/docs/latest/commands/xinfo-groups/ --- cmd_stream.go | 13 +++- cmd_stream_test.go | 144 +++++++++++++++++++++++++++++++++++++++------ stream.go | 84 +++++++++++++++++++++++--- 3 files changed, 213 insertions(+), 28 deletions(-) diff --git a/cmd_stream.go b/cmd_stream.go index b6d00343..e8d9b4d7 100644 --- a/cmd_stream.go +++ b/cmd_stream.go @@ -569,9 +569,11 @@ func (m *Miniredis) cmdXinfoStream(c *server.Peer, args []string) { return } - c.WriteMapLen(1) + c.WriteMapLen(2) c.WriteBulk("length") c.WriteInt(len(s.entries)) + c.WriteBulk("entries-added") + c.WriteInt(s.entriesAdded) }) } @@ -610,9 +612,14 @@ func (m *Miniredis) cmdXinfoGroups(c *server.Peer, args []string) { c.WriteBulk("last-delivered-id") c.WriteBulk(g.lastID) c.WriteBulk("entries-read") - c.WriteNull() + c.WriteInt(g.entriesRead) c.WriteBulk("lag") - c.WriteInt(len(g.stream.entries)) + lag := g.lag() + if lag == -1 { + c.WriteNull() + } else { + c.WriteInt(lag) + } } }) } diff --git a/cmd_stream_test.go b/cmd_stream_test.go index 719a28eb..9dea28e4 100644 --- a/cmd_stream_test.go +++ b/cmd_stream_test.go @@ -35,7 +35,7 @@ func TestStream(t *testing.T) { mustDo(t, c, "XINFO", "STREAM", "s", - proto.Array(proto.String("length"), proto.Int(1)), + proto.Array(proto.String("length"), proto.Int(1), proto.String("entries-added"), proto.Int(1)), ) now := time.Date(2001, 1, 1, 4, 4, 5, 4000000, time.UTC) @@ -73,7 +73,7 @@ func TestStream(t *testing.T) { t.Run("resp3", func(t *testing.T) { mustDo(t, c, "XINFO", "STREAM", "s", - proto.Map(proto.String("length"), proto.Int(1)), + proto.Map(proto.String("length"), proto.Int(1), proto.String("entries-added"), proto.Int(1)), ) }) } @@ -546,7 +546,7 @@ func TestStreamInfo(t *testing.T) { mustDo(t, c, "XINFO", "STREAM", "planets", - proto.Array(proto.String("length"), proto.Int(1)), + proto.Array(proto.String("length"), proto.Int(1), proto.String("entries-added"), proto.Int(1)), ) mustDo(t, c, @@ -605,7 +605,7 @@ func TestStreamGroup(t *testing.T) { proto.String("consumers"), proto.Int(0), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, + proto.String("entries-read"), proto.Int(0), proto.String("lag"), proto.Int(0), ), ), @@ -627,7 +627,7 @@ func TestStreamGroup(t *testing.T) { proto.String("consumers"), proto.Int(1), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, + proto.String("entries-read"), proto.Int(0), proto.String("lag"), proto.Int(0), ), ), @@ -660,7 +660,7 @@ func TestStreamGroup(t *testing.T) { proto.String("consumers"), proto.Int(0), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, + proto.String("entries-read"), proto.Int(0), proto.String("lag"), proto.Int(0), ), ), @@ -727,7 +727,7 @@ func TestStreamReadGroup(t *testing.T) { proto.String("consumers"), proto.Int(0), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, + proto.String("entries-read"), proto.Int(0), proto.String("lag"), proto.Int(0), ), ), @@ -747,6 +747,20 @@ func TestStreamReadGroup(t *testing.T) { "XLEN", "planets", ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(0), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-0"), + proto.String("entries-read"), proto.Int(0), + proto.String("lag"), proto.Int(1), + ), + ), + ) + mustDo(t, c, "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">", proto.Array( @@ -762,8 +776,8 @@ func TestStreamReadGroup(t *testing.T) { proto.String("consumers"), proto.Int(1), proto.String("pending"), proto.Int(1), proto.String("last-delivered-id"), proto.String("0-1"), - proto.String("entries-read"), proto.Nil, - proto.String("lag"), proto.Int(1), + proto.String("entries-read"), proto.Int(1), + proto.String("lag"), proto.Int(0), ), ), ) @@ -806,6 +820,11 @@ func TestStreamDelete(t *testing.T) { proto.String("0-1"), ) + mustDo(t, c, + "XADD", "planets", "0-2", "name", "Venus", + proto.String("0-2"), + ) + mustDo(t, c, "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">", proto.Array( @@ -816,24 +835,84 @@ func TestStreamDelete(t *testing.T) { proto.String("0-1"), proto.Strings("name", "Mercury"), ), + proto.Array( + proto.String("0-2"), + proto.Strings("name", "Venus"), + ), ), ), ), ) mustDo(t, c, - "XADD", "planets", "0-2", "name", "Mercury", - proto.String("0-2"), + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(2), + proto.String("last-delivered-id"), proto.String("0-2"), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Int(0), + ), + ), + ) + + mustDo(t, c, + "XADD", "planets", "0-3", "name", "Earth", + proto.String("0-3"), + ) + + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(2), + proto.String("last-delivered-id"), proto.String("0-2"), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Int(1), + ), + ), ) must1(t, c, "XDEL", "planets", "0-1", ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(1), + proto.String("last-delivered-id"), proto.String("0-2"), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Int(1), + ), + ), + ) + must1(t, c, "XDEL", "planets", "0-2", ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-2"), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Nil, + ), + ), + ) + mustDo(t, c, "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", "0-0", proto.Array( @@ -865,9 +944,38 @@ func TestStreamAck(t *testing.T) { ), ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(1), + proto.String("last-delivered-id"), proto.String("0-1"), + proto.String("entries-read"), proto.Int(1), + proto.String("lag"), proto.Int(0), + ), + ), + ) + must1(t, c, "XACK", "planets", "processing", "0-1", ) + + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-1"), + proto.String("entries-read"), proto.Int(1), + proto.String("lag"), proto.Int(0), + ), + ), + ) + mustDo(t, c, "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", "0-0", proto.Array( @@ -885,8 +993,8 @@ func TestStreamAck(t *testing.T) { proto.String("consumers"), proto.Int(1), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-1"), - proto.String("entries-read"), proto.Nil, - proto.String("lag"), proto.Int(1), + proto.String("entries-read"), proto.Int(1), + proto.String("lag"), proto.Int(0), ), ), ) @@ -1353,8 +1461,8 @@ func TestStreamAutoClaim(t *testing.T) { proto.String("consumers"), proto.Int(0), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-2"), - proto.String("entries-read"), proto.Nil, - proto.String("lag"), proto.Int(2), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Int(0), ), ), ) @@ -1428,7 +1536,7 @@ func TestStreamClaim(t *testing.T) { proto.String("consumers"), proto.Int(1), proto.String("pending"), proto.Int(2), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, + proto.String("entries-read"), proto.Int(0), proto.String("lag"), proto.Int(2), ), ), @@ -1479,8 +1587,8 @@ func TestStreamClaim(t *testing.T) { proto.String("consumers"), proto.Int(2), proto.String("pending"), proto.Int(1), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, - proto.String("lag"), proto.Int(1), + proto.String("entries-read"), proto.Int(0), + proto.String("lag"), proto.Int(2), ), ), ) diff --git a/stream.go b/stream.go index 55faa5e1..8ec81c6e 100644 --- a/stream.go +++ b/stream.go @@ -18,6 +18,7 @@ type streamKey struct { entries []StreamEntry groups map[string]*streamGroup lastAllocatedID string + entriesAdded int mu sync.Mutex } @@ -30,10 +31,12 @@ type StreamEntry struct { } type streamGroup struct { - stream *streamKey - lastID string - pending []pendingEntry - consumers map[string]*consumer + stream *streamKey + lastID string + pending []pendingEntry + consumers map[string]*consumer + entriesRead int + entriesReadValid bool } type consumer struct { @@ -93,6 +96,14 @@ func (s *streamKey) lastIDUnlocked() string { return s.entries[len(s.entries)-1].ID } +func (s *streamKey) firstIDUnlocked() string { + if len(s.entries) == 0 { + return "0-0" + } + + return s.entries[0].ID +} + func (s *streamKey) copy() *streamKey { s.mu.Lock() defer s.mu.Unlock() @@ -218,13 +229,22 @@ func (s *streamKey) createGroup(group, id string) error { return errors.New("BUSYGROUP Consumer Group name already exists") } + var entriesRead = 0 + var entriesReadValid = true if id == "$" { id = s.lastIDUnlocked() + entriesRead = len(s.entries) + } else if id == "0" || id == "0-0" || id == s.firstIDUnlocked() { + entriesRead = 0 + } else { + entriesReadValid = false } s.groups[group] = &streamGroup{ - stream: s, - lastID: id, - consumers: map[string]*consumer{}, + stream: s, + lastID: id, + consumers: map[string]*consumer{}, + entriesRead: entriesRead, + entriesReadValid: entriesReadValid, } return nil } @@ -255,6 +275,7 @@ func (s *streamKey) add(entryID string, values []string, now time.Time) (string, ID: entryID, Values: values, }) + s.entriesAdded++ return entryID, nil } @@ -263,10 +284,26 @@ func (s *streamKey) trim(n int) { defer s.mu.Unlock() if len(s.entries) > n { + for _, group := range s.groups { + for _, entry := range s.entries[:n] { + group.onDelete(entry.ID) + } + } s.entries = s.entries[len(s.entries)-n:] } } +func (g *streamGroup) onDelete(id string) { + if !g.entriesReadValid { + return + } + compare := streamCmp(g.lastID, id) + if compare <= 0 { + // an item between last-delivered-id and last-written-id is deleted. Entries read is not valid anymore + g.entriesReadValid = false + } +} + // trimBefore deletes entries with an id less than the provided id // and returns the number of entries deleted func (s *streamKey) trimBefore(id string) int { @@ -295,6 +332,12 @@ func (s *streamKey) after(id string) []StreamEntry { return s.entries[pos:] } +func (s *streamKey) len() int { + s.mu.Lock() + defer s.mu.Unlock() + return len(s.entries) +} + // get a stream entry by ID // Also returns the position in the entries slice, if found. func (s *streamKey) get(id string) (int, *StreamEntry) { @@ -363,6 +406,8 @@ func (g *streamGroup) readGroup( } g.consumers[consumerID].numPendingEntries += len(msgs) g.lastID = msgs[len(msgs)-1].ID + + g.updateEntriesRead(msgs) return msgs } @@ -434,6 +479,9 @@ func (s *streamKey) delete(ids []string) (int, error) { continue } + for _, group := range s.groups { + group.onDelete(entry.ID) + } s.entries = append(s.entries[:i], s.entries[i+1:]...) count++ } @@ -498,3 +546,25 @@ func (g *streamGroup) setLastSuccess(c string, t time.Time) { g.setLastSeen(c, t) g.consumers[c].lastSuccess = t } + +func (g *streamGroup) lag() int { + if !g.entriesReadValid { + return -1 + } + g.stream.mu.Lock() + defer g.stream.mu.Unlock() + return g.stream.entriesAdded - g.entriesRead +} + +func (g *streamGroup) updateEntriesRead(msgs []StreamEntry) { + // mutex protects lastId and len(g.stream.entries) together. + // We should not lock the mutex twice for each + g.stream.mu.Lock() + defer g.stream.mu.Unlock() + if g.entriesReadValid { + g.entriesRead += len(msgs) + } else if g.lastID == g.stream.lastIDUnlocked() { + // reset entries read as we catch up to the last ID + g.entriesRead = len(g.stream.entries) + } +} From 1d5162927bfd047471696c7744bb85c62891e407 Mon Sep 17 00:00:00 2001 From: sancar Date: Thu, 6 Jun 2024 23:42:45 +0300 Subject: [PATCH 2/4] test fix --- cmd_stream_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd_stream_test.go b/cmd_stream_test.go index 9dea28e4..b8dc385f 100644 --- a/cmd_stream_test.go +++ b/cmd_stream_test.go @@ -1588,7 +1588,7 @@ func TestStreamClaim(t *testing.T) { proto.String("pending"), proto.Int(1), proto.String("last-delivered-id"), proto.String("0-0"), proto.String("entries-read"), proto.Int(0), - proto.String("lag"), proto.Int(2), + proto.String("lag"), proto.Nil, ), ), ) From 368f06748974c4a11fd5a0abf5dc5ff7023d17ed Mon Sep 17 00:00:00 2001 From: sancar Date: Fri, 7 Jun 2024 10:10:18 +0300 Subject: [PATCH 3/4] a small fix on lag calculation. Still following spec --- cmd_stream_test.go | 27 +++++++++++++++++++++++++-- stream.go | 2 +- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/cmd_stream_test.go b/cmd_stream_test.go index b8dc385f..e5d42813 100644 --- a/cmd_stream_test.go +++ b/cmd_stream_test.go @@ -863,6 +863,11 @@ func TestStreamDelete(t *testing.T) { proto.String("0-3"), ) + mustDo(t, c, + "XADD", "planets", "0-4", "name", "Jupiter", + proto.String("0-4"), + ) + mustDo(t, c, "XINFO", "GROUPS", "planets", proto.Array( @@ -872,7 +877,7 @@ func TestStreamDelete(t *testing.T) { proto.String("pending"), proto.Int(2), proto.String("last-delivered-id"), proto.String("0-2"), proto.String("entries-read"), proto.Int(2), - proto.String("lag"), proto.Int(1), + proto.String("lag"), proto.Int(2), ), ), ) @@ -890,7 +895,7 @@ func TestStreamDelete(t *testing.T) { proto.String("pending"), proto.Int(1), proto.String("last-delivered-id"), proto.String("0-2"), proto.String("entries-read"), proto.Int(2), - proto.String("lag"), proto.Int(1), + proto.String("lag"), proto.Int(2), ), ), ) @@ -899,6 +904,24 @@ func TestStreamDelete(t *testing.T) { "XDEL", "planets", "0-2", ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-2"), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Int(2), + ), + ), + ) + + must1(t, c, + "XDEL", "planets", "0-3", + ) + mustDo(t, c, "XINFO", "GROUPS", "planets", proto.Array( diff --git a/stream.go b/stream.go index 8ec81c6e..12edcd64 100644 --- a/stream.go +++ b/stream.go @@ -298,7 +298,7 @@ func (g *streamGroup) onDelete(id string) { return } compare := streamCmp(g.lastID, id) - if compare <= 0 { + if compare < 0 { // an item between last-delivered-id and last-written-id is deleted. Entries read is not valid anymore g.entriesReadValid = false } From ab408dd82d84cfcd597cec2b3be9d6088cfd7980 Mon Sep 17 00:00:00 2001 From: sancar Date: Fri, 7 Jun 2024 12:07:21 +0300 Subject: [PATCH 4/4] add integration test --- cmd_stream_test.go | 30 ++++++++++++++++++++++++++++++ integration/stream_test.go | 24 ++++++++++++++++++++++++ integration/test.go | 2 +- stream.go | 5 +++-- 4 files changed, 58 insertions(+), 3 deletions(-) diff --git a/cmd_stream_test.go b/cmd_stream_test.go index e5d42813..4abb2669 100644 --- a/cmd_stream_test.go +++ b/cmd_stream_test.go @@ -945,6 +945,36 @@ func TestStreamDelete(t *testing.T) { ), ), ) + + mustDo(t, c, + "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">", + proto.Array( + proto.Array( + proto.String("planets"), + proto.Array( + proto.Array( + proto.String("0-4"), + proto.Strings("name", "Jupiter"), + ), + ), + ), + ), + ) + + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(1), + proto.String("last-delivered-id"), proto.String("0-4"), + proto.String("entries-read"), proto.Int(4), + proto.String("lag"), proto.Int(0), + ), + ), + ) + } // Test XACK diff --git a/integration/stream_test.go b/integration/stream_test.go index 6e817a3d..9047a75c 100644 --- a/integration/stream_test.go +++ b/integration/stream_test.go @@ -531,6 +531,30 @@ func TestStreamGroup(t *testing.T) { }) }) + t.Run("XREADGROUP XDEL XINFO lag", func(t *testing.T) { + testRaw(t, func(c *client) { + // TODO ALL INFO needs to be uncommented, even DoLoosely is not enough + // for some of them becase Nil is returned instead of Int + c.Do("XGROUP", "CREATE", "planets", "processing", "$", "MKSTREAM") + c.Do("XADD", "planets", "0-1", "name", "Mercury") + c.Do("XADD", "planets", "0-2", "name", "Venus") + c.Do("XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">") + //c.Do("XINFO", "GROUPS", "planets") //SPEC entries-added 2, entries-read = 2, lag = 0 + c.Do("XADD", "planets", "0-3", "name", "Earth") + c.Do("XADD", "planets", "0-4", "name", "Jupiter") + //c.Do("XINFO", "GROUPS", "planets") //SPEC entries-added 4, entries-read = 2, lag = 2 + + c.Do("XDEL", "planets", "0-1") + //c.Do("XINFO", "GROUPS", "planets") // SPEC entries-added 4, entries-read = 2, lag = 2 + c.Do("XDEL", "planets", "0-2") + //c.Do("XINFO", "GROUPS", "planets") // SPEC entries-added 4, entries-read = 2, lag = 2 + c.Do("XDEL", "planets", "0-3") + //c.Do("XINFO", "GROUPS", "planets") // SPEC entries-added 4, entries-read = 2, lag = nil + c.Do("XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">") + //c.Do("XINFO", "GROUPS", "planets") // SPEC entries-added 4, entries-read = 4, lag = 0 + }) + }) + t.Run("XACK", func(t *testing.T) { testRaw(t, func(c *client) { c.Do("XGROUP", "CREATE", "planets", "processing", "$", "MKSTREAM") diff --git a/integration/test.go b/integration/test.go index f91163a6..1fd08368 100644 --- a/integration/test.go +++ b/integration/test.go @@ -346,7 +346,7 @@ func (c *client) Do(cmd string, args ...string) { // c.t.Logf("real:%q mini:%q", string(resReal), string(resMini)) if resReal != resMini { - c.t.Errorf("real: %q mini: %q", string(resReal), string(resMini)) + c.t.Errorf("\nreal: %q \nmini: %q", string(resReal), string(resMini)) return } diff --git a/stream.go b/stream.go index 12edcd64..d1a442d8 100644 --- a/stream.go +++ b/stream.go @@ -233,7 +233,7 @@ func (s *streamKey) createGroup(group, id string) error { var entriesReadValid = true if id == "$" { id = s.lastIDUnlocked() - entriesRead = len(s.entries) + entriesRead = s.entriesAdded } else if id == "0" || id == "0-0" || id == s.firstIDUnlocked() { entriesRead = 0 } else { @@ -565,6 +565,7 @@ func (g *streamGroup) updateEntriesRead(msgs []StreamEntry) { g.entriesRead += len(msgs) } else if g.lastID == g.stream.lastIDUnlocked() { // reset entries read as we catch up to the last ID - g.entriesRead = len(g.stream.entries) + g.entriesRead = g.stream.entriesAdded + g.entriesReadValid = true } }