Skip to content

Commit

Permalink
correctly initialize messages chan when subscribing to channel messag…
Browse files Browse the repository at this point in the history
…es, add rmq subscriber
  • Loading branch information
ramin committed May 17, 2024
1 parent 612da12 commit 12a582f
Showing 1 changed file with 14 additions and 29 deletions.
43 changes: 14 additions & 29 deletions events/subscribers/rmq/rmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ type RMQSubscriber struct {
connection *amqp.Connection
channel *amqp.Channel
queueName string

Messages chan interface{}
}

func NewSubscriber() (*RMQSubscriber, error) {
Expand Down Expand Up @@ -36,15 +38,17 @@ func NewSubscriber() (*RMQSubscriber, error) {
return nil, err
}

msgs := make(chan interface{})

return &RMQSubscriber{
connection: conn,
channel: ch,
queueName: queueName,
Messages: msgs,
}, nil
}

func (r *RMQSubscriber) Start() chan bool {
// Start consuming messages
msgs, err := r.channel.Consume(
r.queueName,
"",
Expand All @@ -61,42 +65,23 @@ func (r *RMQSubscriber) Start() chan bool {
done := make(chan bool)
go func() {
for d := range msgs {
// Process message
// Placeholder: Print message to console
println("Received message: ", string(d.Body))
if r.Messages == nil {
close(done)
break
}
r.Messages <- d.Body
}
done <- true
}()
return done
}

func (r *RMQSubscriber) Listen() <-chan interface{} {
return r.Messages
}

func (r *RMQSubscriber) Detach() error {
if err := r.channel.Close(); err != nil {
return err
}
return r.connection.Close()
}

func (r *RMQSubscriber) Listen() <-chan interface{} {
msgs, err := r.channel.Consume(
r.queueName,
"",
config.Read().RabbitMQ.AutoAck,
config.Read().RabbitMQ.Exclusive,
false,
config.Read().RabbitMQ.NoWait,
nil,
)
if err != nil {
panic(err)
}

output := make(chan interface{})
go func() {
for d := range msgs {
output <- d.Body
}
close(output)
}()
return output
}

0 comments on commit 12a582f

Please sign in to comment.