diff --git a/cmd_stream.go b/cmd_stream.go index b6d0034..e8d9b4d 100644 --- a/cmd_stream.go +++ b/cmd_stream.go @@ -569,9 +569,11 @@ func (m *Miniredis) cmdXinfoStream(c *server.Peer, args []string) { return } - c.WriteMapLen(1) + c.WriteMapLen(2) c.WriteBulk("length") c.WriteInt(len(s.entries)) + c.WriteBulk("entries-added") + c.WriteInt(s.entriesAdded) }) } @@ -610,9 +612,14 @@ func (m *Miniredis) cmdXinfoGroups(c *server.Peer, args []string) { c.WriteBulk("last-delivered-id") c.WriteBulk(g.lastID) c.WriteBulk("entries-read") - c.WriteNull() + c.WriteInt(g.entriesRead) c.WriteBulk("lag") - c.WriteInt(len(g.stream.entries)) + lag := g.lag() + if lag == -1 { + c.WriteNull() + } else { + c.WriteInt(lag) + } } }) } diff --git a/cmd_stream_test.go b/cmd_stream_test.go index 719a28e..9dea28e 100644 --- a/cmd_stream_test.go +++ b/cmd_stream_test.go @@ -35,7 +35,7 @@ func TestStream(t *testing.T) { mustDo(t, c, "XINFO", "STREAM", "s", - proto.Array(proto.String("length"), proto.Int(1)), + proto.Array(proto.String("length"), proto.Int(1), proto.String("entries-added"), proto.Int(1)), ) now := time.Date(2001, 1, 1, 4, 4, 5, 4000000, time.UTC) @@ -73,7 +73,7 @@ func TestStream(t *testing.T) { t.Run("resp3", func(t *testing.T) { mustDo(t, c, "XINFO", "STREAM", "s", - proto.Map(proto.String("length"), proto.Int(1)), + proto.Map(proto.String("length"), proto.Int(1), proto.String("entries-added"), proto.Int(1)), ) }) } @@ -546,7 +546,7 @@ func TestStreamInfo(t *testing.T) { mustDo(t, c, "XINFO", "STREAM", "planets", - proto.Array(proto.String("length"), proto.Int(1)), + proto.Array(proto.String("length"), proto.Int(1), proto.String("entries-added"), proto.Int(1)), ) mustDo(t, c, @@ -605,7 +605,7 @@ func TestStreamGroup(t *testing.T) { proto.String("consumers"), proto.Int(0), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, + proto.String("entries-read"), proto.Int(0), proto.String("lag"), proto.Int(0), ), ), @@ -627,7 +627,7 @@ func TestStreamGroup(t *testing.T) { proto.String("consumers"), proto.Int(1), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, + proto.String("entries-read"), proto.Int(0), proto.String("lag"), proto.Int(0), ), ), @@ -660,7 +660,7 @@ func TestStreamGroup(t *testing.T) { proto.String("consumers"), proto.Int(0), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, + proto.String("entries-read"), proto.Int(0), proto.String("lag"), proto.Int(0), ), ), @@ -727,7 +727,7 @@ func TestStreamReadGroup(t *testing.T) { proto.String("consumers"), proto.Int(0), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, + proto.String("entries-read"), proto.Int(0), proto.String("lag"), proto.Int(0), ), ), @@ -747,6 +747,20 @@ func TestStreamReadGroup(t *testing.T) { "XLEN", "planets", ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(0), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-0"), + proto.String("entries-read"), proto.Int(0), + proto.String("lag"), proto.Int(1), + ), + ), + ) + mustDo(t, c, "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">", proto.Array( @@ -762,8 +776,8 @@ func TestStreamReadGroup(t *testing.T) { proto.String("consumers"), proto.Int(1), proto.String("pending"), proto.Int(1), proto.String("last-delivered-id"), proto.String("0-1"), - proto.String("entries-read"), proto.Nil, - proto.String("lag"), proto.Int(1), + proto.String("entries-read"), proto.Int(1), + proto.String("lag"), proto.Int(0), ), ), ) @@ -806,6 +820,11 @@ func TestStreamDelete(t *testing.T) { proto.String("0-1"), ) + mustDo(t, c, + "XADD", "planets", "0-2", "name", "Venus", + proto.String("0-2"), + ) + mustDo(t, c, "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">", proto.Array( @@ -816,24 +835,84 @@ func TestStreamDelete(t *testing.T) { proto.String("0-1"), proto.Strings("name", "Mercury"), ), + proto.Array( + proto.String("0-2"), + proto.Strings("name", "Venus"), + ), ), ), ), ) mustDo(t, c, - "XADD", "planets", "0-2", "name", "Mercury", - proto.String("0-2"), + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(2), + proto.String("last-delivered-id"), proto.String("0-2"), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Int(0), + ), + ), + ) + + mustDo(t, c, + "XADD", "planets", "0-3", "name", "Earth", + proto.String("0-3"), + ) + + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(2), + proto.String("last-delivered-id"), proto.String("0-2"), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Int(1), + ), + ), ) must1(t, c, "XDEL", "planets", "0-1", ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(1), + proto.String("last-delivered-id"), proto.String("0-2"), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Int(1), + ), + ), + ) + must1(t, c, "XDEL", "planets", "0-2", ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-2"), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Nil, + ), + ), + ) + mustDo(t, c, "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", "0-0", proto.Array( @@ -865,9 +944,38 @@ func TestStreamAck(t *testing.T) { ), ) + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(1), + proto.String("last-delivered-id"), proto.String("0-1"), + proto.String("entries-read"), proto.Int(1), + proto.String("lag"), proto.Int(0), + ), + ), + ) + must1(t, c, "XACK", "planets", "processing", "0-1", ) + + mustDo(t, c, + "XINFO", "GROUPS", "planets", + proto.Array( + proto.Array( + proto.String("name"), proto.String("processing"), + proto.String("consumers"), proto.Int(1), + proto.String("pending"), proto.Int(0), + proto.String("last-delivered-id"), proto.String("0-1"), + proto.String("entries-read"), proto.Int(1), + proto.String("lag"), proto.Int(0), + ), + ), + ) + mustDo(t, c, "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", "0-0", proto.Array( @@ -885,8 +993,8 @@ func TestStreamAck(t *testing.T) { proto.String("consumers"), proto.Int(1), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-1"), - proto.String("entries-read"), proto.Nil, - proto.String("lag"), proto.Int(1), + proto.String("entries-read"), proto.Int(1), + proto.String("lag"), proto.Int(0), ), ), ) @@ -1353,8 +1461,8 @@ func TestStreamAutoClaim(t *testing.T) { proto.String("consumers"), proto.Int(0), proto.String("pending"), proto.Int(0), proto.String("last-delivered-id"), proto.String("0-2"), - proto.String("entries-read"), proto.Nil, - proto.String("lag"), proto.Int(2), + proto.String("entries-read"), proto.Int(2), + proto.String("lag"), proto.Int(0), ), ), ) @@ -1428,7 +1536,7 @@ func TestStreamClaim(t *testing.T) { proto.String("consumers"), proto.Int(1), proto.String("pending"), proto.Int(2), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, + proto.String("entries-read"), proto.Int(0), proto.String("lag"), proto.Int(2), ), ), @@ -1479,8 +1587,8 @@ func TestStreamClaim(t *testing.T) { proto.String("consumers"), proto.Int(2), proto.String("pending"), proto.Int(1), proto.String("last-delivered-id"), proto.String("0-0"), - proto.String("entries-read"), proto.Nil, - proto.String("lag"), proto.Int(1), + proto.String("entries-read"), proto.Int(0), + proto.String("lag"), proto.Int(2), ), ), ) diff --git a/stream.go b/stream.go index 55faa5e..8ec81c6 100644 --- a/stream.go +++ b/stream.go @@ -18,6 +18,7 @@ type streamKey struct { entries []StreamEntry groups map[string]*streamGroup lastAllocatedID string + entriesAdded int mu sync.Mutex } @@ -30,10 +31,12 @@ type StreamEntry struct { } type streamGroup struct { - stream *streamKey - lastID string - pending []pendingEntry - consumers map[string]*consumer + stream *streamKey + lastID string + pending []pendingEntry + consumers map[string]*consumer + entriesRead int + entriesReadValid bool } type consumer struct { @@ -93,6 +96,14 @@ func (s *streamKey) lastIDUnlocked() string { return s.entries[len(s.entries)-1].ID } +func (s *streamKey) firstIDUnlocked() string { + if len(s.entries) == 0 { + return "0-0" + } + + return s.entries[0].ID +} + func (s *streamKey) copy() *streamKey { s.mu.Lock() defer s.mu.Unlock() @@ -218,13 +229,22 @@ func (s *streamKey) createGroup(group, id string) error { return errors.New("BUSYGROUP Consumer Group name already exists") } + var entriesRead = 0 + var entriesReadValid = true if id == "$" { id = s.lastIDUnlocked() + entriesRead = len(s.entries) + } else if id == "0" || id == "0-0" || id == s.firstIDUnlocked() { + entriesRead = 0 + } else { + entriesReadValid = false } s.groups[group] = &streamGroup{ - stream: s, - lastID: id, - consumers: map[string]*consumer{}, + stream: s, + lastID: id, + consumers: map[string]*consumer{}, + entriesRead: entriesRead, + entriesReadValid: entriesReadValid, } return nil } @@ -255,6 +275,7 @@ func (s *streamKey) add(entryID string, values []string, now time.Time) (string, ID: entryID, Values: values, }) + s.entriesAdded++ return entryID, nil } @@ -263,10 +284,26 @@ func (s *streamKey) trim(n int) { defer s.mu.Unlock() if len(s.entries) > n { + for _, group := range s.groups { + for _, entry := range s.entries[:n] { + group.onDelete(entry.ID) + } + } s.entries = s.entries[len(s.entries)-n:] } } +func (g *streamGroup) onDelete(id string) { + if !g.entriesReadValid { + return + } + compare := streamCmp(g.lastID, id) + if compare <= 0 { + // an item between last-delivered-id and last-written-id is deleted. Entries read is not valid anymore + g.entriesReadValid = false + } +} + // trimBefore deletes entries with an id less than the provided id // and returns the number of entries deleted func (s *streamKey) trimBefore(id string) int { @@ -295,6 +332,12 @@ func (s *streamKey) after(id string) []StreamEntry { return s.entries[pos:] } +func (s *streamKey) len() int { + s.mu.Lock() + defer s.mu.Unlock() + return len(s.entries) +} + // get a stream entry by ID // Also returns the position in the entries slice, if found. func (s *streamKey) get(id string) (int, *StreamEntry) { @@ -363,6 +406,8 @@ func (g *streamGroup) readGroup( } g.consumers[consumerID].numPendingEntries += len(msgs) g.lastID = msgs[len(msgs)-1].ID + + g.updateEntriesRead(msgs) return msgs } @@ -434,6 +479,9 @@ func (s *streamKey) delete(ids []string) (int, error) { continue } + for _, group := range s.groups { + group.onDelete(entry.ID) + } s.entries = append(s.entries[:i], s.entries[i+1:]...) count++ } @@ -498,3 +546,25 @@ func (g *streamGroup) setLastSuccess(c string, t time.Time) { g.setLastSeen(c, t) g.consumers[c].lastSuccess = t } + +func (g *streamGroup) lag() int { + if !g.entriesReadValid { + return -1 + } + g.stream.mu.Lock() + defer g.stream.mu.Unlock() + return g.stream.entriesAdded - g.entriesRead +} + +func (g *streamGroup) updateEntriesRead(msgs []StreamEntry) { + // mutex protects lastId and len(g.stream.entries) together. + // We should not lock the mutex twice for each + g.stream.mu.Lock() + defer g.stream.mu.Unlock() + if g.entriesReadValid { + g.entriesRead += len(msgs) + } else if g.lastID == g.stream.lastIDUnlocked() { + // reset entries read as we catch up to the last ID + g.entriesRead = len(g.stream.entries) + } +}