From b2e10264ba927d09ed47c30362b48b5877d69ca8 Mon Sep 17 00:00:00 2001 From: Sarat Chandra Date: Thu, 14 Mar 2024 10:52:49 +0530 Subject: [PATCH] fix: Initialize queue in in-memory broker if the queue isn't found in consume. --- brokers/in-memory/broker.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/brokers/in-memory/broker.go b/brokers/in-memory/broker.go index 524eacb..1ebaabe 100644 --- a/brokers/in-memory/broker.go +++ b/brokers/in-memory/broker.go @@ -24,9 +24,18 @@ func New() *Broker { func (r *Broker) Consume(ctx context.Context, work chan []byte, queue string) { r.mu.RLock() - ch := r.queues[queue] + ch, ok := r.queues[queue] r.mu.RUnlock() + // If the queue isn't found, make a queue. + if !ok { + ch = make(chan []byte, 100) + r.mu.Lock() + r.queues[queue] = ch + r.mu.Unlock() + + } + for { select { case <-ctx.Done():