From 12a582f1ef68c5990e1569443fc51511b876a29d Mon Sep 17 00:00:00 2001 From: ramin Date: Fri, 17 May 2024 22:46:03 +0100 Subject: [PATCH] correctly initialize messages chan when subscribing to channel messages, add rmq subscriber --- events/subscribers/rmq/rmq.go | 43 ++++++++++++----------------------- 1 file changed, 14 insertions(+), 29 deletions(-) diff --git a/events/subscribers/rmq/rmq.go b/events/subscribers/rmq/rmq.go index 72860c4..b02df64 100644 --- a/events/subscribers/rmq/rmq.go +++ b/events/subscribers/rmq/rmq.go @@ -9,6 +9,8 @@ type RMQSubscriber struct { connection *amqp.Connection channel *amqp.Channel queueName string + + Messages chan interface{} } func NewSubscriber() (*RMQSubscriber, error) { @@ -36,15 +38,17 @@ func NewSubscriber() (*RMQSubscriber, error) { return nil, err } + msgs := make(chan interface{}) + return &RMQSubscriber{ connection: conn, channel: ch, queueName: queueName, + Messages: msgs, }, nil } func (r *RMQSubscriber) Start() chan bool { - // Start consuming messages msgs, err := r.channel.Consume( r.queueName, "", @@ -61,42 +65,23 @@ func (r *RMQSubscriber) Start() chan bool { done := make(chan bool) go func() { for d := range msgs { - // Process message - // Placeholder: Print message to console - println("Received message: ", string(d.Body)) + if r.Messages == nil { + close(done) + break + } + r.Messages <- d.Body } - done <- true }() return done } +func (r *RMQSubscriber) Listen() <-chan interface{} { + return r.Messages +} + func (r *RMQSubscriber) Detach() error { if err := r.channel.Close(); err != nil { return err } return r.connection.Close() } - -func (r *RMQSubscriber) Listen() <-chan interface{} { - msgs, err := r.channel.Consume( - r.queueName, - "", - config.Read().RabbitMQ.AutoAck, - config.Read().RabbitMQ.Exclusive, - false, - config.Read().RabbitMQ.NoWait, - nil, - ) - if err != nil { - panic(err) - } - - output := make(chan interface{}) - go func() { - for d := range msgs { - output <- d.Body - } - close(output) - }() - return output -}