Skip to content

Commit

Permalink
Merge pull request #414 from getAlby/chore/payment-finalizer-logging
Browse files Browse the repository at this point in the history
Chore: RabbitMQ logging
  • Loading branch information
kiwiidb authored Sep 11, 2023
2 parents 05824f5 + b929ca3 commit 7a5a778
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 23 deletions.
3 changes: 3 additions & 0 deletions init_lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ func InitSingleLNDClient(c *service.Config, ctx context.Context) (result lnd.Lig
CertFile: c.LNDCertFile,
CertHex: c.LNDCertHex,
}, ctx)
if err != nil {
return nil, err
}
getInfo, err := client.GetInfo(ctx, &lnrpc.GetInfoRequest{})
if err != nil {
return nil, err
Expand Down
118 changes: 95 additions & 23 deletions rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,11 @@ func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, sv
return err
}

client.logger.Infof("Payment finalizer: Found %d pending invoices", len(pendingInvoices))

client.logger.Info("Starting payment finalizer rabbitmq consumer")

client.logger.Infoj(log.JSON{
"subroutine": "payment finalizer",
"num_pending_payments": len(pendingInvoices),
"message": "starting payment finalizer loop",
})
for {
select {
case <-ctx.Done():
Expand All @@ -182,6 +183,10 @@ func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, sv

err := json.Unmarshal(delivery.Body, &payment)
if err != nil {
captureErr(client.logger, err, log.JSON{
"subroutine": "payment finalizer",
"message": "error unmarshalling payment json",
})
delivery.Nack(false, false)

continue
Expand All @@ -191,36 +196,60 @@ func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, sv
if invoice, ok := pendingInvoices[payment.PaymentHash]; ok {
t, err := svc.GetTransactionEntryByInvoiceId(ctx, invoice.ID)
if err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "payment finalizer",
"payment_hash": invoice.RHash,
"message": "error fetching transaction entry by id",
})
delivery.Nack(false, false)

continue
}
client.logger.Infoj(log.JSON{
"subroutine": "payment finalizer",
"payment_hash": invoice.RHash,
"message": "updating payment",
})

switch payment.Status {
case lnrpc.Payment_SUCCEEDED:
invoice.Fee = payment.FeeSat
invoice.Preimage = payment.PaymentPreimage

if err = svc.HandleSuccessfulPayment(ctx, &invoice, t); err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "payment finalizer",
"payment_hash": invoice.RHash,
"message": "error handling succesful payment",
})
delivery.Nack(false, false)

continue
}

client.logger.Infof("Payment finalizer: updated successful payment with hash: %s", payment.PaymentHash)
client.logger.Infoj(log.JSON{
"subroutine": "payment finalizer",
"message": "updated succesful payment",
"payment_hash": payment.PaymentHash,
})
delete(pendingInvoices, payment.PaymentHash)

case lnrpc.Payment_FAILED:
if err = svc.HandleFailedPayment(ctx, &invoice, t, fmt.Errorf(payment.FailureReason.String())); err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "payment finalizer",
"message": "error handling failed payment",
"payment_hash": invoice.RHash,
})
delivery.Nack(false, false)

continue
}

client.logger.Infof("Payment finalizer: updated failed payment with hash: %s", payment.PaymentHash)
client.logger.Infoj(log.JSON{
"subroutine": "payment finalizer",
"message": "updated failed payment",
"payment_hash": payment.PaymentHash,
})
delete(pendingInvoices, payment.PaymentHash)
}
}
Expand All @@ -235,7 +264,10 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler
return err
}

client.logger.Info("Starting RabbitMQ invoice consumer loop")
client.logger.Infoj(log.JSON{
"subroutine": "invoice consumer",
"message": "starting loop",
})
for {
select {
case <-ctx.Done():
Expand All @@ -249,37 +281,61 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler

err := json.Unmarshal(delivery.Body, &invoice)
if err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error unmarshalling invoice json",
})

// If we can't even Unmarshall the message we are dealing with
// badly formatted events. In that case we simply Nack the message
// and explicitly do not requeue it.
err = delivery.Nack(false, false)
if err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error nacking invoice",
"payment_hash": invoice.RHash,
})
}

continue
}
log.Infoj(log.JSON{
"subroutine": "invoice consumer",
"message": "adding invoice",
"payment_hash": invoice.RHash,
})

err = handler(ctx, &invoice)
if err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error handling invoice",
"payment_hash": invoice.RHash,
})

// If for some reason we can't handle the message we also don't requeue
// because this can lead to an endless loop that puts pressure on the
// database and logs.
err := delivery.Nack(false, false)
if err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error nacking event",
"payment_hash": invoice.RHash,
})
}

continue
}

err = delivery.Ack(false)
if err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error acking event",
"payment_hash": invoice.RHash,
})
}
}
}
Expand All @@ -305,7 +361,10 @@ func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesS
return err
}

client.logger.Info("Starting rabbitmq publisher")
client.logger.Infoj(log.JSON{
"subroutine": "invoice publisher",
"message": "starting publisher",
})

in, out, err := invoicesSubscribeFunc()
if err != nil {
Expand All @@ -320,13 +379,21 @@ func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesS
err = client.publishToLndhubExchange(ctx, incomingInvoice, payloadFunc)

if err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error publishing invoice",
"payment_hash": incomingInvoice.RHash,
})
}
case outgoing := <-out:
err = client.publishToLndhubExchange(ctx, outgoing, payloadFunc)

if err != nil {
captureErr(client.logger, err)
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error publishing invoice",
"payment_hash": outgoing.RHash,
})
}
}
}
Expand All @@ -352,16 +419,21 @@ func (client *DefaultClient) publishToLndhubExchange(ctx context.Context, invoic
},
)
if err != nil {
captureErr(client.logger, err)
return err
}

client.logger.Debugf("Successfully published invoice to rabbitmq with RHash %s", invoice.RHash)
client.logger.Infoj(log.JSON{
"subroutine": "invoice publisher",
"message": "succesfully published invoice",
"payment_hash": invoice.RHash,
"rabbitmq_routing_key": key,
})

return nil
}

func captureErr(logger *lecho.Logger, err error) {
logger.Error(err)
func captureErr(logger *lecho.Logger, err error, j log.JSON) {
j["error"] = err
logger.Errorj(j)
sentry.CaptureException(err)
}

0 comments on commit 7a5a778

Please sign in to comment.