Skip to content

Commit

Permalink
feat: parallel entry allocation
Browse files Browse the repository at this point in the history
Signed-off-by: OxalisCu <[email protected]>
  • Loading branch information
OxalisCu committed Oct 14, 2024
1 parent c9c5c24 commit 3bb3299
Showing 1 changed file with 17 additions and 26 deletions.
43 changes: 17 additions & 26 deletions internal/writer/redis_cluster_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,35 +70,26 @@ func (r *RedisClusterWriter) StartWrite(ctx context.Context) chan *entry.Entry {
chs[stat.Name] = w.StartWrite(ctx)
}

r.chWg = sync.WaitGroup{}
r.chWg.Add(1)
go func() {
for entry := range r.ch {
if len(entry.Slots) == 0 {
for _, writer := range r.writers {
writer.Write(entry)
}
continue
}
lastSlot := -1
for _, slot := range entry.Slots {
if lastSlot == -1 {
lastSlot = slot
}
if slot != lastSlot {
log.Panicf("CROSSSLOT Keys in request don't hash to the same slot. argv=%v", entry.Argv)
}
}
r.router[lastSlot].Write(entry)
}
r.chWg.Done()
}()

return r.ch
return nil
}

func (r *RedisClusterWriter) Write(entry *entry.Entry) {
r.ch <- entry
if len(entry.Slots) == 0 {
for _, writer := range r.writers {
writer.Write(entry)
}
return
}
lastSlot := -1
for _, slot := range entry.Slots {
if lastSlot == -1 {
lastSlot = slot
}
if slot != lastSlot {
log.Panicf("CROSSSLOT Keys in request don't hash to the same slot. argv=%v", entry.Argv)
}
}
r.router[lastSlot].Write(entry)
}

func (r *RedisClusterWriter) Consistent() bool {
Expand Down

0 comments on commit 3bb3299

Please sign in to comment.