Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore: RabbitMQ logging #414

Merged
merged 6 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions init_lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ func InitSingleLNDClient(c *service.Config, ctx context.Context) (result lnd.Lig
CertFile: c.LNDCertFile,
CertHex: c.LNDCertHex,
}, ctx)
if err != nil {
return nil, err
}
getInfo, err := client.GetInfo(ctx, &lnrpc.GetInfoRequest{})
if err != nil {
return nil, err
Expand Down
118 changes: 95 additions & 23 deletions rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,11 @@ func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, sv
return err
}

client.logger.Infof("Payment finalizer: Found %d pending invoices", len(pendingInvoices))

client.logger.Info("Starting payment finalizer rabbitmq consumer")

client.logger.Infoj(log.JSON{
"subroutine": "payment finalizer",
"num_pending_payments": len(pendingInvoices),
"message": "starting payment finalizer loop",
})
for {
select {
case <-ctx.Done():
Expand All @@ -182,6 +183,10 @@ func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, sv

err := json.Unmarshal(delivery.Body, &payment)
if err != nil {
captureErr(client.logger, err, log.JSON{
"subroutine": "payment finalizer",
"message": "error unmarshalling payment json",
})
delivery.Nack(false, false)

continue
Expand All @@ -191,36 +196,60 @@ func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, sv
if invoice, ok := pendingInvoices[payment.PaymentHash]; ok {
t, err := svc.GetTransactionEntryByInvoiceId(ctx, invoice.ID)
if err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "payment finalizer",
"payment_hash": invoice.RHash,
"message": "error fetching transaction entry by id",
})
delivery.Nack(false, false)

continue
}
client.logger.Infoj(log.JSON{
"subroutine": "payment finalizer",
"payment_hash": invoice.RHash,
"message": "updating payment",
})

switch payment.Status {
case lnrpc.Payment_SUCCEEDED:
invoice.Fee = payment.FeeSat
invoice.Preimage = payment.PaymentPreimage

if err = svc.HandleSuccessfulPayment(ctx, &invoice, t); err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "payment finalizer",
"payment_hash": invoice.RHash,
"message": "error handling succesful payment",
})
delivery.Nack(false, false)

continue
}

client.logger.Infof("Payment finalizer: updated successful payment with hash: %s", payment.PaymentHash)
client.logger.Infoj(log.JSON{
"subroutine": "payment finalizer",
"message": "updated succesful payment",
"payment_hash": payment.PaymentHash,
})
delete(pendingInvoices, payment.PaymentHash)

case lnrpc.Payment_FAILED:
if err = svc.HandleFailedPayment(ctx, &invoice, t, fmt.Errorf(payment.FailureReason.String())); err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "payment finalizer",
"message": "error handling failed payment",
"payment_hash": invoice.RHash,
})
delivery.Nack(false, false)

continue
}

client.logger.Infof("Payment finalizer: updated failed payment with hash: %s", payment.PaymentHash)
client.logger.Infoj(log.JSON{
"subroutine": "payment finalizer",
"message": "updated failed payment",
"payment_hash": payment.PaymentHash,
})
delete(pendingInvoices, payment.PaymentHash)
}
}
Expand All @@ -235,7 +264,10 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler
return err
}

client.logger.Info("Starting RabbitMQ invoice consumer loop")
client.logger.Infoj(log.JSON{
"subroutine": "invoice consumer",
"message": "starting loop",
})
for {
select {
case <-ctx.Done():
Expand All @@ -249,37 +281,61 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler

err := json.Unmarshal(delivery.Body, &invoice)
if err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error unmarshalling invoice json",
})

// If we can't even Unmarshall the message we are dealing with
// badly formatted events. In that case we simply Nack the message
// and explicitly do not requeue it.
err = delivery.Nack(false, false)
if err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error nacking invoice",
"payment_hash": invoice.RHash,
})
}

continue
}
log.Infoj(log.JSON{
"subroutine": "invoice consumer",
"message": "adding invoice",
"payment_hash": invoice.RHash,
})

err = handler(ctx, &invoice)
if err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error handling invoice",
"payment_hash": invoice.RHash,
})

// If for some reason we can't handle the message we also don't requeue
// because this can lead to an endless loop that puts pressure on the
// database and logs.
err := delivery.Nack(false, false)
if err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error nacking event",
"payment_hash": invoice.RHash,
})
}

continue
}

err = delivery.Ack(false)
if err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error acking event",
"payment_hash": invoice.RHash,
})
}
}
}
Expand All @@ -305,7 +361,10 @@ func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesS
return err
}

client.logger.Info("Starting rabbitmq publisher")
client.logger.Infoj(log.JSON{
"subroutine": "invoice publisher",
"message": "starting publisher",
})

in, out, err := invoicesSubscribeFunc()
if err != nil {
Expand All @@ -320,13 +379,21 @@ func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesS
err = client.publishToLndhubExchange(ctx, incomingInvoice, payloadFunc)

if err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error publishing invoice",
"payment_hash": incomingInvoice.RHash,
})
}
case outgoing := <-out:
err = client.publishToLndhubExchange(ctx, outgoing, payloadFunc)

if err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error publishing invoice",
"payment_hash": outgoing.RHash,
})
}
}
}
Expand All @@ -352,16 +419,21 @@ func (client *DefaultClient) publishToLndhubExchange(ctx context.Context, invoic
},
)
if err != nil {
captureErr(client.logger, err)
return err
}

client.logger.Debugf("Successfully published invoice to rabbitmq with RHash %s", invoice.RHash)
client.logger.Infoj(log.JSON{
"subroutine": "invoice publisher",
"message": "succesfully published invoice",
"payment_hash": invoice.RHash,
"rabbitmq_routing_key": key,
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is some more metadata helpful like the key (and exchange?)
the key is computed it might be interesting to add.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exchange is always the same here but key can be added.


return nil
}

func captureErr(logger *lecho.Logger, err error) {
logger.Error(err)
func captureErr(logger *lecho.Logger, err error, j log.JSON) {
j["error"] = err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this adding to the logging? the error message?

logger.Errorj(j)
sentry.CaptureException(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when I currently look at the sentry stack traces it does not give much information and it does not contain the original stack trace but links to this CaptureException error

I read here something about a different erros package and wrapping an error to keep the stack trace?
https://incident.io/blog/golang-errors
does that make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go stacktraces inherently require more work, yes. So that would be something to add later.

}
Loading