From fb2b748148c7bc19b2b98da7af3135e9a7d5b778 Mon Sep 17 00:00:00 2001 From: Yehor Datsenko Date: Mon, 5 Feb 2024 15:20:10 +0200 Subject: [PATCH] hotfix rabbit losses connection --- app/app.go | 1 + app/rabbit_listener.go | 173 +++++++++++++++++++++++------------------ 2 files changed, 97 insertions(+), 77 deletions(-) diff --git a/app/app.go b/app/app.go index 47d08ec..8f778d8 100644 --- a/app/app.go +++ b/app/app.go @@ -215,6 +215,7 @@ func (a *App) Stop() errors.AppError { a.serviceDiscovery.Shutdown() } a.storage.Close() + a.rabbit.Stop() return nil } diff --git a/app/rabbit_listener.go b/app/rabbit_listener.go index ba6918d..bbd3225 100644 --- a/app/rabbit_listener.go +++ b/app/rabbit_listener.go @@ -17,28 +17,15 @@ import ( type HandleFunc func(context.Context, *amqp.Delivery) errors.AppError type RabbitListener struct { - config *model.RabbitConfig - handleFunc HandleFunc - connection *amqp.Connection - channel *amqp.Channel - exit chan errors.AppError + config *model.RabbitConfig + handleFunc HandleFunc + connection *amqp.Connection + channel *amqp.Channel + delivery <-chan amqp.Delivery + amqpCloseNotifier chan *amqp.Error + exit chan errors.AppError } -//func BuildAndServe(app *App, config *model.RabbitConfig, errChan chan errors.AppError) { -// handler, err := NewHandler(app) -// if err != nil { -// errChan <- err -// return -// } -// listener, err := NewListener(config, errChan) -// if err != nil { -// errChan <- err -// return -// } -// -// listener.Start() -//} - func BuildRabbit(app *App, config *model.RabbitConfig, errChan chan errors.AppError) (*RabbitListener, errors.AppError) { handler, err := NewHandler(app) if err != nil { @@ -52,28 +39,88 @@ func NewListener(config *model.RabbitConfig, f HandleFunc, errChan chan errors.A if config == nil { return nil, errors.NewInternalError("rabbit.listener.new_rabit_listener.arguments_check.config_nil", "rabbit config is nil") } - return &RabbitListener{config: config, handleFunc: f, exit: errChan}, nil + amqpErrorChan := make(chan *amqp.Error) + return &RabbitListener{config: config, handleFunc: f, exit: errChan, amqpCloseNotifier: amqpErrorChan}, nil } func (l *RabbitListener) Stop() { - l.channel.Cancel("", true) + l.channel.Close() + l.connection.Close() } func (l *RabbitListener) Start() { + var ( + err error + appErr errors.AppError + ) + appErr = l.connect() + if appErr != nil { + l.exit <- appErr + } + + go func() { + for { + <-l.amqpCloseNotifier + wlog.Info(fmtBrokerLog("connection closed.. trying to reconnect")) + err := l.reconnect() + if err != nil { + wlog.Info(fmtBrokerLog(err.Error())) + } + } + }() + //var forever chan struct{} + go func() { + var ( + message amqp.Delivery + appErr errors.AppError + ) + wlog.Info(fmtBrokerLog("waiting for the messages..")) + for message = range l.delivery { + wlog.Info(fmtBrokerLog("message received")) + appErr = l.handleFunc(context.Background(), &message) + if appErr != nil { + if checkNoRows(appErr) { // TODO: real foreign key check by postgres errors + wlog.Debug(fmtBrokerLog(fmt.Sprintf("error processing message, foreign key error, skipping message.. error: %s", appErr.Error()))) + appErr = nil + err = message.Ack(false) + if err != nil { + appErr = errors.NewInternalError("rabbit.listener.listen.acknowledge.fail", err.Error()) + } + } else { + wlog.Debug(fmtBrokerLog(fmt.Sprintf("error while processing the messasge! nacking.. error: %s", appErr.Error()))) + message.Nack(false, true) + } + wlog.Info(fmtBrokerLog("message processed with errors!")) + } else { + wlog.Info(fmtBrokerLog("message processed! acknowledging.. ")) + err = message.Ack(false) + if err != nil { + appErr = errors.NewInternalError("rabbit.listener.listen.acknowledge.fail", err.Error()) + } + } + if appErr != nil { + l.exit <- appErr + break + } + } + }() + <-l.exit + +} + +func (l *RabbitListener) connect() errors.AppError { conn, err := amqp.Dial(l.config.Url) if err != nil { - l.exit <- errors.NewInternalError("rabbit.listener.listen.server_connect.fail", err.Error()) - return + return errors.NewInternalError("rabbit.listener.listen.server_connect.fail", err.Error()) } l.connection = conn - defer conn.Close() channel, err := conn.Channel() if err != nil { - l.exit <- errors.NewInternalError("rabbit.listener.listen.channel_connect.fail", err.Error()) - return + return errors.NewInternalError("rabbit.listener.listen.channel_connect.fail", err.Error()) } - defer channel.Close() - wlog.Info("connecting to the exchange") + l.channel = channel + l.channel.NotifyClose(l.amqpCloseNotifier) + wlog.Info(fmtBrokerLog("connecting to the exchange")) err = channel.ExchangeDeclare( "logger", // name "topic", // type @@ -84,11 +131,10 @@ func (l *RabbitListener) Start() { nil, // arguments ) if err != nil { - l.exit <- errors.NewInternalError("rabbit.listener.listen.exchange_declare.fail", err.Error()) - return + return errors.NewInternalError("rabbit.listener.listen.exchange_declare.fail", err.Error()) } // queue, err := channel.QueueDeclarePassive( - wlog.Info("connecting or creating a queue 'logger.service'") + wlog.Info(fmtBrokerLog("connecting or creating a queue 'logger.service'")) queue, err := channel.QueueDeclare( "logger.service", true, @@ -98,10 +144,9 @@ func (l *RabbitListener) Start() { nil, ) if err != nil { - l.exit <- errors.NewInternalError("rabbit.listener.listen.queue_declare.fail", err.Error()) - return + return errors.NewInternalError("rabbit.listener.listen.queue_declare.fail", err.Error()) } - wlog.Info("binding queue..") + wlog.Info(fmtBrokerLog("binding queue..")) err = channel.QueueBind( queue.Name, // queue name "logger.#", // routing key @@ -110,14 +155,13 @@ func (l *RabbitListener) Start() { nil, ) if err != nil { - l.exit <- errors.NewInternalError("rabbit.listener.listen.queue_bind.fail", err.Error()) - return + return errors.NewInternalError("rabbit.listener.listen.queue_bind.fail", err.Error()) } err = channel.Qos(1, 0, false) if err != nil { log.Fatalf("basic.qos: %v", err) } - delivery, err := channel.Consume( + del, err := channel.Consume( queue.Name, // queue "", // consumer false, // auto-ack @@ -127,49 +171,24 @@ func (l *RabbitListener) Start() { nil, // args ) if err != nil { - l.exit <- errors.NewInternalError("rabbit.listener.listen.start_consuming.fail", err.Error()) - return + return errors.NewInternalError("rabbit.listener.listen.start_consuming.fail", err.Error()) } - //var forever chan struct{} - go func() { - var ( - message amqp.Delivery - appErr errors.AppError - ) - wlog.Info("waiting for the messages..") - for message = range delivery { - wlog.Info("message received") - appErr = l.handleFunc(context.Background(), &message) - if appErr != nil { - if checkNoRows(appErr) { - wlog.Debug(fmt.Sprintf("error processing message, foreign key error, skipping message.. error: %s", appErr.Error())) - appErr = nil - err = message.Ack(false) - if err != nil { - appErr = errors.NewInternalError("rabbit.listener.listen.acknowledge.fail", err.Error()) - } - } else { - wlog.Debug(fmt.Sprintf("error while processing the messasge! nacking.. error: %s", appErr.Error())) - message.Nack(false, true) - } - wlog.Info("message processed with errors!") - } else { - wlog.Info("message processed! acknowledging.. ") - err = message.Ack(false) - if err != nil { - appErr = errors.NewInternalError("rabbit.listener.listen.acknowledge.fail", err.Error()) - } - } - if appErr != nil { - l.exit <- appErr - break - } - } - }() - <-l.exit + l.delivery = del + return nil +} +func (l *RabbitListener) reconnect() errors.AppError { + err := l.connect() + if err != nil { + return err + } + return nil } func checkNoRows(err errors.AppError) bool { return strings.Contains(err.Error(), sql.ErrNoRows.Error()) } + +func fmtBrokerLog(description string) string { + return fmt.Sprintf("broker: %s", description) +}