-
Notifications
You must be signed in to change notification settings - Fork 113
/
stream.go
432 lines (351 loc) · 11.2 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
package radix
import (
"bufio"
"bytes"
"fmt"
"io"
"math"
"strconv"
"time"
"errors"
"github.com/mediocregopher/radix/v3/internal/bytesutil"
"github.com/mediocregopher/radix/v3/resp"
"github.com/mediocregopher/radix/v3/resp/resp2"
)
// StreamEntryID is the identifier of an entry in a Redis stream. Its textual
// form is <time>-<seq>.
type StreamEntryID struct {
	// Time is the leading part of the ID. It is based on the time of the
	// server that Redis runs on.
	Time uint64

	// Seq is the trailing part of the ID and distinguishes entries that share
	// the same Time value.
	Seq uint64
}

// Before reports whether s sorts strictly ahead of o in a stream, i.e. whether
// s is less than o. IDs are ordered by Time first and by Seq second.
func (s StreamEntryID) Before(o StreamEntryID) bool {
	if s.Time == o.Time {
		return s.Seq < o.Seq
	}
	return s.Time < o.Time
}

// Prev returns the ID immediately preceding s. The lowest possible ID (0-0)
// has no predecessor, in which case s itself is returned.
func (s StreamEntryID) Prev() StreamEntryID {
	switch {
	case s.Seq != 0:
		return StreamEntryID{Time: s.Time, Seq: s.Seq - 1}
	case s.Time != 0:
		// Seq is exhausted: step back one Time value and take its highest
		// sequence number.
		return StreamEntryID{Time: s.Time - 1, Seq: math.MaxUint64}
	default:
		return s
	}
}

// Next returns the ID immediately following s. The highest possible ID
// (18446744073709551615-18446744073709551615) has no successor, in which case
// s itself is returned.
func (s StreamEntryID) Next() StreamEntryID {
	switch {
	case s.Seq != math.MaxUint64:
		return StreamEntryID{Time: s.Time, Seq: s.Seq + 1}
	case s.Time != math.MaxUint64:
		// Seq is exhausted: move on to the first sequence number of the
		// following Time value.
		return StreamEntryID{Time: s.Time + 1, Seq: 0}
	default:
		return s
	}
}
// Compile-time checks that *StreamEntryID satisfies the RESP interfaces.
var (
	_ resp.Marshaler   = (*StreamEntryID)(nil)
	_ resp.Unmarshaler = (*StreamEntryID)(nil)
)

// maxUint64Len is the number of decimal digits in the largest uint64.
var maxUint64Len = len(strconv.FormatUint(math.MaxUint64, 10))

// bytes renders s as <time>-<seq> into a freshly allocated slice whose
// capacity is large enough for any ID, so the appends never reallocate.
func (s *StreamEntryID) bytes() []byte {
	out := make([]byte, 0, 2*maxUint64Len+1)
	out = strconv.AppendUint(out, s.Time, 10)
	out = append(out, '-')
	return strconv.AppendUint(out, s.Seq, 10)
}
// MarshalRESP implements the resp.Marshaler interface.
//
// The ID is written to w as a single bulk string in <time>-<seq> form.
func (s *StreamEntryID) MarshalRESP(w io.Writer) error {
	encoded := resp2.BulkStringBytes{B: s.bytes()}
	return encoded.MarshalRESP(w)
}
// errInvalidStreamID is returned when a bulk string does not have the form
// <time>-<seq> with both parts being unsigned integers.
var errInvalidStreamID = errors.New("invalid stream entry id")

// UnmarshalRESP implements the resp.Unmarshaler interface.
//
// It reads one bulk string from br and parses it as <time>-<seq>. Errors from
// reading the bulk string are returned as-is; any parsing failure is reported
// as errInvalidStreamID. s is only modified when parsing succeeds.
func (s *StreamEntryID) UnmarshalRESP(br *bufio.Reader) error {
	// Borrow a scratch buffer from bytesutil and hand it back on return.
	scratch := bytesutil.GetBytes()
	defer bytesutil.PutBytes(scratch)

	raw := resp2.BulkStringBytes{B: (*scratch)[:0]}
	if err := raw.UnmarshalRESP(br); err != nil {
		return err
	}

	// The first '-' separates the time part from the sequence part.
	sep := bytes.IndexByte(raw.B, '-')
	if sep < 0 {
		return errInvalidStreamID
	}

	timePart, err := bytesutil.ParseUint(raw.B[:sep])
	if err != nil {
		return errInvalidStreamID
	}
	seqPart, err := bytesutil.ParseUint(raw.B[sep+1:])
	if err != nil {
		return errInvalidStreamID
	}

	s.Time = timePart
	s.Seq = seqPart
	return nil
}
// Compile-time check that *StreamEntryID satisfies fmt.Stringer.
var _ fmt.Stringer = (*StreamEntryID)(nil)

// String implements the fmt.Stringer interface.
//
// The result has the form <time>-<seq>, which is the same format Redis uses
// for stream entry IDs.
func (s StreamEntryID) String() string {
	encoded := s.bytes()
	return string(encoded)
}
// StreamEntry is an entry in a stream as returned by XRANGE, XREAD and
// XREADGROUP.
type StreamEntry struct {
// ID is the ID of the entry in a stream.
ID StreamEntryID
// Fields contains the fields and values for the stream entry.
//
// UnmarshalRESP removes any existing keys before decoding, so a reused
// StreamEntry never keeps fields from an earlier entry. If the reply carries
// a nil field list (array size -1) no map is allocated: Fields stays nil if
// it was nil before, or is left empty otherwise.
Fields map[string]string
}
// Compile-time check that *StreamEntry satisfies resp.Unmarshaler.
var _ resp.Unmarshaler = (*StreamEntry)(nil)

// errInvalidStreamEntry is returned when a reply does not have the array
// layout expected for a stream entry.
var errInvalidStreamEntry = errors.New("invalid stream entry")

// UnmarshalRESP implements the resp.Unmarshaler interface.
//
// An entry is read as a two element array: the entry ID followed by a flat
// array of alternating field names and values. A field array of size -1 is
// accepted and yields an entry without fields. Any other layout results in
// errInvalidStreamEntry; read errors are returned unchanged.
func (s *StreamEntry) UnmarshalRESP(br *bufio.Reader) error {
	var hdr resp2.ArrayHeader
	if err := hdr.UnmarshalRESP(br); err != nil {
		return err
	}
	if hdr.N != 2 {
		return errInvalidStreamEntry
	}
	if err := s.ID.UnmarshalRESP(br); err != nil {
		return err
	}

	// Clear the map before looking at the field list so that a list of size
	// -1 does not leave fields from a previous use of s behind.
	for name := range s.Fields {
		delete(s.Fields, name)
	}

	if err := hdr.UnmarshalRESP(br); err != nil {
		return err
	}
	switch {
	case hdr.N == -1:
		return nil
	case hdr.N%2 != 0:
		// Names and values come in pairs, so the count must be even.
		return errInvalidStreamEntry
	}
	if s.Fields == nil {
		s.Fields = make(map[string]string, hdr.N/2)
	}

	var item resp2.BulkString
	for pairs := hdr.N / 2; pairs > 0; pairs-- {
		if err := item.UnmarshalRESP(br); err != nil {
			return err
		}
		name := item.S
		if err := item.UnmarshalRESP(br); err != nil {
			return err
		}
		s.Fields[name] = item.S
	}
	return nil
}
// StreamEntries is a stream name and set of entries as returned by XREAD and
// XREADGROUP. The results from a call to XREAD(GROUP) can be unmarshaled into a
// []StreamEntries.
type StreamEntries struct {
// Stream is the name of the stream the entries were read from.
Stream string
// Entries holds the entries returned for Stream, in the order they appear in
// the reply.
Entries []StreamEntry
}
// UnmarshalRESP implements the resp.Unmarshaler interface.
//
// The reply is expected to be a two element array: the stream name as a bulk
// string followed by an array of stream entries. Any other outer array size
// results in an error. A nil entries array (negative size) is accepted and
// leaves Entries nil. Errors from reading the name or any entry are returned
// unchanged; in that case s may have been partially updated.
func (s *StreamEntries) UnmarshalRESP(br *bufio.Reader) error {
	var ah resp2.ArrayHeader
	if err := ah.UnmarshalRESP(br); err != nil {
		return err
	}
	if ah.N != 2 {
		return errors.New("invalid xread[group] response")
	}

	var stream resp2.BulkString
	if err := stream.UnmarshalRESP(br); err != nil {
		return err
	}
	s.Stream = stream.S

	if err := ah.UnmarshalRESP(br); err != nil {
		return err
	}
	if ah.N < 0 {
		// A nil array is reported with a negative size. Passing that to make
		// would panic, so treat it as "no entries" instead.
		s.Entries = nil
		return nil
	}

	s.Entries = make([]StreamEntry, ah.N)
	for i := range s.Entries {
		if err := s.Entries[i].UnmarshalRESP(br); err != nil {
			return err
		}
	}
	return nil
}
// StreamReaderOpts contains various options given to NewStreamReader that influence its behaviour.
//
// The only required field is Streams.
type StreamReaderOpts struct {
// Streams must contain one or more stream names that will be read.
//
// The value for each stream can either be nil or an existing ID.
// If a value is non-nil, only newer stream entries will be returned.
// If a value is nil, the reader starts from "$" when using XREAD and from ">" when using XREADGROUP.
Streams map[string]*StreamEntryID
// FallbackToUndelivered will cause any streams with a non-nil value in Streams to fall back
// to delivering messages not yet delivered to other consumers (as if the value in the Streams map was nil),
// once the reader has read all its pending messages in the stream.
// This must be used in conjunction with Group; it is ignored when Group is empty.
FallbackToUndelivered bool
// Group is an optional consumer group name.
//
// If Group is not empty, reads will use XREADGROUP with Group as the consumer group instead of XREAD.
Group string
// Consumer is an optional consumer name for use with Group.
Consumer string
// NoAck optionally enables passing the NOACK flag to XREADGROUP. It has no effect when Group is empty.
NoAck bool
// Block specifies how long reads will wait for new data before returning.
// The duration is passed to Redis in whole milliseconds.
//
// If Block is negative, reads will block indefinitely until new entries can be read or there is an error.
//
// The default, if Block is 0, is 5 seconds.
//
// If Block is non-negative, the Client used for the StreamReader must not have a timeout for commands or
// the timeout duration must be substantially higher than the Block duration (at least 50% for small Block values,
// but may be less for higher values).
Block time.Duration
// NoBlock disables blocking when no new data is available.
//
// If this is true, setting Block will not have any effect.
NoBlock bool
// Count can be used to limit the number of entries retrieved by each call to Next.
//
// If Count is 0 (or negative), no limit is sent and all available entries will be retrieved.
Count int
}
// StreamReader allows reading from one or more streams, always returning newer
// entries.
type StreamReader interface {
// Err returns any error that happened while calling Next or nil if no error
// happened.
//
// Once Err returns a non-nil error, all successive calls will return the
// same error.
Err() error
// Next returns new entries for any of the configured streams.
//
// The returned slice is only valid until the next call to Next.
//
// If there was an error, ok will be false. Otherwise, even if no entries
// were read, ok will be true.
//
// If there was an error, all future calls to Next will return ok == false.
Next() (stream string, entries []StreamEntry, ok bool)
}
// NewStreamReader returns a new StreamReader for the given client.
//
// If opts.Group is set the reader uses XREADGROUP, otherwise XREAD. All
// arguments that stay the same between reads (GROUP, COUNT, BLOCK, NOACK,
// STREAMS and the stream names) are built once here; only the per-stream IDs
// change between calls to Next.
//
// Unless opts.NoBlock is set a BLOCK argument is always sent: 0 (wait forever)
// for a negative opts.Block, 5 seconds for a zero opts.Block, and otherwise
// opts.Block in whole milliseconds. A positive opts.Block shorter than one
// millisecond is sent as 1 so that it cannot turn into "wait forever".
//
// Any changes on opts after calling NewStreamReader will have no effect.
func NewStreamReader(c Client, opts StreamReaderOpts) StreamReader {
	sr := &streamReader{c: c, opts: opts}

	if sr.opts.Group != "" {
		sr.cmd = "XREADGROUP"
		sr.fixedArgs = []string{"GROUP", sr.opts.Group, sr.opts.Consumer}
		sr.fallbackToUndelivered = opts.FallbackToUndelivered
	} else {
		sr.cmd = "XREAD"
		sr.fixedArgs = nil
	}

	if sr.opts.Count > 0 {
		sr.fixedArgs = append(sr.fixedArgs, "COUNT", strconv.Itoa(sr.opts.Count))
	}

	if !sr.opts.NoBlock {
		// Use int64 so large durations do not overflow where int is 32 bits.
		var msec int64
		switch {
		case sr.opts.Block < 0:
			// BLOCK 0 tells Redis to wait without a timeout.
			msec = 0
		case sr.opts.Block == 0:
			msec = int64(5 * time.Second / time.Millisecond)
		default:
			msec = int64(sr.opts.Block / time.Millisecond)
			if msec == 0 {
				// A sub-millisecond timeout would truncate to 0, which
				// means "wait forever". Round up to the shortest timeout
				// that can be expressed instead.
				msec = 1
			}
		}
		sr.fixedArgs = append(sr.fixedArgs, "BLOCK", strconv.FormatInt(msec, 10))
	}

	if sr.opts.Group != "" && sr.opts.NoAck {
		sr.fixedArgs = append(sr.fixedArgs, "NOACK")
	}

	sr.streams = make([]string, 0, len(sr.opts.Streams))
	sr.ids = make(map[string]string, len(sr.opts.Streams))
	for stream, id := range sr.opts.Streams {
		sr.streams = append(sr.streams, stream)

		switch {
		case id != nil:
			sr.ids[stream] = id.String()
		case sr.cmd == "XREAD":
			sr.ids[stream] = "$"
		case sr.cmd == "XREADGROUP":
			sr.ids[stream] = ">"
		}
	}

	// set to nil so we don't accidentally use it later, since the user could have changed
	// the map after using the reader.
	sr.opts.Streams = nil

	sr.fixedArgs = append(sr.fixedArgs, "STREAMS")
	sr.fixedArgs = append(sr.fixedArgs, sr.streams...)

	// preallocate space for all arguments passed to Cmd: the fixed arguments
	// plus one ID per stream.
	sr.args = make([]string, 0, len(sr.fixedArgs)+len(sr.streams))

	return sr
}
// streamReader implements the StreamReader interface.
type streamReader struct {
c Client
opts StreamReaderOpts // copy of the options given to NewStreamReader with Streams == nil
// streams lists the stream names in the order they were appended to fixedArgs.
// The IDs sent with each command follow the same order.
streams []string
// ids maps each stream name to the ID that will be sent for it on the next read.
// This is either a concrete entry ID, "$" (XREAD) or ">" (XREADGROUP).
ids map[string]string
// fallbackToUndelivered is only set when a Group is configured. When true, a stream
// that returns no entries for a concrete ID is switched to ">".
fallbackToUndelivered bool
cmd string // command. either XREAD or XREADGROUP
fixedArgs []string // fixed arguments that always come directly after the command, ending with STREAMS and the stream names
args []string // arguments passed to Cmd (fixedArgs followed by one ID per stream). reused between calls to Next to avoid allocations.
// unread buffers the per-stream results of the last command. Results are consumed
// from the end of the slice.
unread []StreamEntries
// err is the result of the most recent command and is what Err returns.
// Once it is non-nil, Next stops issuing commands.
err error
}
// backfill issues one XREAD/XREADGROUP command and stores the reply in
// sr.unread.
//
// The argument list is rebuilt in the reused sr.args slice: the fixed
// arguments followed by the current ID of every stream, in the same order as
// the stream names. The command's error (or nil) is stored in sr.err, and the
// return value reports whether the command succeeded.
func (sr *streamReader) backfill() bool {
	args := append(sr.args[:0], sr.fixedArgs...)
	for _, name := range sr.streams {
		args = append(args, sr.ids[name])
	}
	sr.args = args

	sr.err = sr.c.Do(Cmd(&sr.unread, sr.cmd, sr.args...))
	return sr.err == nil
}
// Err implements the StreamReader interface.
//
// It returns the error stored by the most recent read command, or nil if that
// command succeeded or no command has been issued yet.
func (sr *streamReader) Err() error {
return sr.err
}
// nextFromBuffer pops buffered results off the end of sr.unread until it finds
// a stream that has at least one entry, and returns that stream's name and
// entries. It returns "" and nil once the buffer is exhausted.
//
// As a side effect it maintains sr.ids: the ID of a stream is advanced to the
// last returned entry, and with fallbackToUndelivered a stream that came back
// empty for a concrete ID is switched to ">".
func (sr *streamReader) nextFromBuffer() (stream string, entries []StreamEntry) {
	for n := len(sr.unread); n > 0; n = len(sr.unread) {
		last := sr.unread[n-1]
		sr.unread = sr.unread[:n-1]

		name := last.Stream
		// True when reading a consumer group with a concrete ID rather
		// than ">".
		groupWithID := sr.cmd == "XREADGROUP" && sr.ids[name] != ">"

		// entries can be empty if we are using XREADGROUP and reading
		// unacknowledged entries.
		if len(last.Entries) == 0 {
			if sr.fallbackToUndelivered && groupWithID {
				sr.ids[name] = ">"
			}
			continue
		}

		// The ID is left alone for XREADGROUP with ">", which must keep
		// asking for ">" on every read.
		if sr.cmd == "XREAD" || groupWithID {
			newest := last.Entries[len(last.Entries)-1]
			sr.ids[name] = newest.ID.String()
		}

		return name, last.Entries
	}

	return "", nil
}
// Next implements the StreamReader interface.
//
// Buffered results from a previous read are returned first, one stream per
// call. When the buffer is empty a new read command is issued and the first
// non-empty result of that read is returned. If the read yields no entries,
// Next returns ok == true with an empty stream name and nil entries. After a
// failed read every call returns ok == false.
//
// Whether a result was found is decided by the returned entries rather than
// the stream name, because the empty string is a valid Redis key; testing the
// name would silently drop entries read from such a stream.
func (sr *streamReader) Next() (stream string, entries []StreamEntry, ok bool) {
	if sr.err != nil {
		return "", nil, false
	}

	if stream, entries = sr.nextFromBuffer(); len(entries) > 0 {
		return stream, entries, true
	}

	if !sr.backfill() {
		return "", nil, false
	}

	if stream, entries = sr.nextFromBuffer(); len(entries) > 0 {
		return stream, entries, true
	}

	return "", nil, true
}