Skip to content

Commit

Permalink
hotfix rabbit losses connection
Browse files Browse the repository at this point in the history
  • Loading branch information
Yehor Datsenko committed Feb 5, 2024
1 parent 246755d commit fb2b748
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 77 deletions.
1 change: 1 addition & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func (a *App) Stop() errors.AppError {
a.serviceDiscovery.Shutdown()
}
a.storage.Close()
a.rabbit.Stop()

return nil
}
Expand Down
173 changes: 96 additions & 77 deletions app/rabbit_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,15 @@ import (
type HandleFunc func(context.Context, *amqp.Delivery) errors.AppError

type RabbitListener struct {
config *model.RabbitConfig
handleFunc HandleFunc
connection *amqp.Connection
channel *amqp.Channel
exit chan errors.AppError
config *model.RabbitConfig
handleFunc HandleFunc
connection *amqp.Connection
channel *amqp.Channel
delivery <-chan amqp.Delivery
amqpCloseNotifier chan *amqp.Error
exit chan errors.AppError
}

//func BuildAndServe(app *App, config *model.RabbitConfig, errChan chan errors.AppError) {
// handler, err := NewHandler(app)
// if err != nil {
// errChan <- err
// return
// }
// listener, err := NewListener(config, errChan)
// if err != nil {
// errChan <- err
// return
// }
//
// listener.Start()
//}

func BuildRabbit(app *App, config *model.RabbitConfig, errChan chan errors.AppError) (*RabbitListener, errors.AppError) {
handler, err := NewHandler(app)
if err != nil {
Expand All @@ -52,28 +39,88 @@ func NewListener(config *model.RabbitConfig, f HandleFunc, errChan chan errors.A
if config == nil {
return nil, errors.NewInternalError("rabbit.listener.new_rabit_listener.arguments_check.config_nil", "rabbit config is nil")
}
return &RabbitListener{config: config, handleFunc: f, exit: errChan}, nil
amqpErrorChan := make(chan *amqp.Error)
return &RabbitListener{config: config, handleFunc: f, exit: errChan, amqpCloseNotifier: amqpErrorChan}, nil
}

func (l *RabbitListener) Stop() {
l.channel.Cancel("", true)
l.channel.Close()
l.connection.Close()
}

func (l *RabbitListener) Start() {
var (
err error
appErr errors.AppError
)
appErr = l.connect()
if appErr != nil {
l.exit <- appErr
}

go func() {
for {
<-l.amqpCloseNotifier
wlog.Info(fmtBrokerLog("connection closed.. trying to reconnect"))
err := l.reconnect()
if err != nil {
wlog.Info(fmtBrokerLog(err.Error()))
}
}
}()
//var forever chan struct{}
go func() {
var (
message amqp.Delivery
appErr errors.AppError
)
wlog.Info(fmtBrokerLog("waiting for the messages.."))
for message = range l.delivery {
wlog.Info(fmtBrokerLog("message received"))
appErr = l.handleFunc(context.Background(), &message)
if appErr != nil {
if checkNoRows(appErr) { // TODO: real foreign key check by postgres errors
wlog.Debug(fmtBrokerLog(fmt.Sprintf("error processing message, foreign key error, skipping message.. error: %s", appErr.Error())))
appErr = nil
err = message.Ack(false)
if err != nil {
appErr = errors.NewInternalError("rabbit.listener.listen.acknowledge.fail", err.Error())
}
} else {
wlog.Debug(fmtBrokerLog(fmt.Sprintf("error while processing the messasge! nacking.. error: %s", appErr.Error())))
message.Nack(false, true)
}
wlog.Info(fmtBrokerLog("message processed with errors!"))
} else {
wlog.Info(fmtBrokerLog("message processed! acknowledging.. "))
err = message.Ack(false)
if err != nil {
appErr = errors.NewInternalError("rabbit.listener.listen.acknowledge.fail", err.Error())
}
}
if appErr != nil {
l.exit <- appErr
break
}
}
}()
<-l.exit

}

func (l *RabbitListener) connect() errors.AppError {
conn, err := amqp.Dial(l.config.Url)
if err != nil {
l.exit <- errors.NewInternalError("rabbit.listener.listen.server_connect.fail", err.Error())
return
return errors.NewInternalError("rabbit.listener.listen.server_connect.fail", err.Error())
}
l.connection = conn
defer conn.Close()
channel, err := conn.Channel()
if err != nil {
l.exit <- errors.NewInternalError("rabbit.listener.listen.channel_connect.fail", err.Error())
return
return errors.NewInternalError("rabbit.listener.listen.channel_connect.fail", err.Error())
}
defer channel.Close()
wlog.Info("connecting to the exchange")
l.channel = channel
l.channel.NotifyClose(l.amqpCloseNotifier)
wlog.Info(fmtBrokerLog("connecting to the exchange"))
err = channel.ExchangeDeclare(
"logger", // name
"topic", // type
Expand All @@ -84,11 +131,10 @@ func (l *RabbitListener) Start() {
nil, // arguments
)
if err != nil {
l.exit <- errors.NewInternalError("rabbit.listener.listen.exchange_declare.fail", err.Error())
return
return errors.NewInternalError("rabbit.listener.listen.exchange_declare.fail", err.Error())
}
// queue, err := channel.QueueDeclarePassive(
wlog.Info("connecting or creating a queue 'logger.service'")
wlog.Info(fmtBrokerLog("connecting or creating a queue 'logger.service'"))
queue, err := channel.QueueDeclare(
"logger.service",
true,
Expand All @@ -98,10 +144,9 @@ func (l *RabbitListener) Start() {
nil,
)
if err != nil {
l.exit <- errors.NewInternalError("rabbit.listener.listen.queue_declare.fail", err.Error())
return
return errors.NewInternalError("rabbit.listener.listen.queue_declare.fail", err.Error())
}
wlog.Info("binding queue..")
wlog.Info(fmtBrokerLog("binding queue.."))
err = channel.QueueBind(
queue.Name, // queue name
"logger.#", // routing key
Expand All @@ -110,14 +155,13 @@ func (l *RabbitListener) Start() {
nil,
)
if err != nil {
l.exit <- errors.NewInternalError("rabbit.listener.listen.queue_bind.fail", err.Error())
return
return errors.NewInternalError("rabbit.listener.listen.queue_bind.fail", err.Error())
}
err = channel.Qos(1, 0, false)
if err != nil {
log.Fatalf("basic.qos: %v", err)
}
delivery, err := channel.Consume(
del, err := channel.Consume(
queue.Name, // queue
"", // consumer
false, // auto-ack
Expand All @@ -127,49 +171,24 @@ func (l *RabbitListener) Start() {
nil, // args
)
if err != nil {
l.exit <- errors.NewInternalError("rabbit.listener.listen.start_consuming.fail", err.Error())
return
return errors.NewInternalError("rabbit.listener.listen.start_consuming.fail", err.Error())
}
//var forever chan struct{}
go func() {
var (
message amqp.Delivery
appErr errors.AppError
)
wlog.Info("waiting for the messages..")
for message = range delivery {
wlog.Info("message received")
appErr = l.handleFunc(context.Background(), &message)
if appErr != nil {
if checkNoRows(appErr) {
wlog.Debug(fmt.Sprintf("error processing message, foreign key error, skipping message.. error: %s", appErr.Error()))
appErr = nil
err = message.Ack(false)
if err != nil {
appErr = errors.NewInternalError("rabbit.listener.listen.acknowledge.fail", err.Error())
}
} else {
wlog.Debug(fmt.Sprintf("error while processing the messasge! nacking.. error: %s", appErr.Error()))
message.Nack(false, true)
}
wlog.Info("message processed with errors!")
} else {
wlog.Info("message processed! acknowledging.. ")
err = message.Ack(false)
if err != nil {
appErr = errors.NewInternalError("rabbit.listener.listen.acknowledge.fail", err.Error())
}
}
if appErr != nil {
l.exit <- appErr
break
}
}
}()
<-l.exit
l.delivery = del
return nil
}

func (l *RabbitListener) reconnect() errors.AppError {
err := l.connect()
if err != nil {
return err
}
return nil
}

func checkNoRows(err errors.AppError) bool {
return strings.Contains(err.Error(), sql.ErrNoRows.Error())
}

func fmtBrokerLog(description string) string {
return fmt.Sprintf("broker: %s", description)
}

0 comments on commit fb2b748

Please sign in to comment.