Skip to content

Commit

Permalink
add subroutine fields
Browse files Browse the repository at this point in the history
  • Loading branch information
kiwiidb committed Sep 7, 2023
1 parent dee42bb commit 6202547
Showing 1 changed file with 26 additions and 5 deletions.
31 changes: 26 additions & 5 deletions rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, sv
err := json.Unmarshal(delivery.Body, &payment)
if err != nil {
captureErr(client.logger, err, log.JSON{
"message": "error unmarshalling payment json",
"subroutine": "payment finalizer",
"message": "error unmarshalling payment json",
})
delivery.Nack(false, false)

Expand All @@ -196,6 +197,7 @@ func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, sv
t, err := svc.GetTransactionEntryByInvoiceId(ctx, invoice.ID)
if err != nil {
captureErr(client.logger, err, log.JSON{
"subroutine": "payment finalizer",
"payment_hash": invoice.RHash,
"message": "error fetching transaction entry by id",
})
Expand All @@ -211,6 +213,7 @@ func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, sv

if err = svc.HandleSuccessfulPayment(ctx, &invoice, t); err != nil {
captureErr(client.logger, err, log.JSON{
"subroutine": "payment finalizer",
"payment_hash": invoice.RHash,
"message": "error handling succesful payment",
})
Expand All @@ -225,6 +228,7 @@ func (client *DefaultClient) FinalizeInitializedPayments(ctx context.Context, sv
case lnrpc.Payment_FAILED:
if err = svc.HandleFailedPayment(ctx, &invoice, t, fmt.Errorf(payment.FailureReason.String())); err != nil {
captureErr(client.logger, err, log.JSON{
"subroutine": "payment finalizer",
"message": "error handling failed payment",
"payment_hash": invoice.RHash,
})
Expand All @@ -248,7 +252,10 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler
return err
}

client.logger.Info("Starting RabbitMQ invoice consumer loop")
client.logger.Infoj(log.JSON{
"subroutine": "invoice consumer",
"message": "starting loop",
})
for {
select {
case <-ctx.Done():
Expand All @@ -263,7 +270,8 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler
err := json.Unmarshal(delivery.Body, &invoice)
if err != nil {
captureErr(client.logger, err, log.JSON{
"message": "error unmarshalling invoice json",
"subroutine": "invoice consumer",
"message": "error unmarshalling invoice json",
})

// If we can't even Unmarshall the message we are dealing with
Expand All @@ -272,6 +280,7 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler
err = delivery.Nack(false, false)
if err != nil {
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error nacking invoice",
"payment_hash": invoice.RHash,
})
Expand All @@ -283,6 +292,7 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler
err = handler(ctx, &invoice)
if err != nil {
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error handling invoice",
"payment_hash": invoice.RHash,
})
Expand All @@ -293,6 +303,7 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler
err := delivery.Nack(false, false)
if err != nil {
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error nacking event",
"payment_hash": invoice.RHash,
})
Expand All @@ -304,6 +315,7 @@ func (client *DefaultClient) SubscribeToLndInvoices(ctx context.Context, handler
err = delivery.Ack(false)
if err != nil {
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error acking event",
"payment_hash": invoice.RHash,
})
Expand Down Expand Up @@ -332,7 +344,10 @@ func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesS
return err
}

client.logger.Info("Starting rabbitmq publisher")
client.logger.Infoj(log.JSON{
"subroutine": "invoice publisher",
"message": "starting publisher",
})

in, out, err := invoicesSubscribeFunc()
if err != nil {
Expand All @@ -348,6 +363,7 @@ func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesS

if err != nil {
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error publishing invoice",
"payment_hash": incomingInvoice.RHash,
})
Expand All @@ -357,6 +373,7 @@ func (client *DefaultClient) StartPublishInvoices(ctx context.Context, invoicesS

if err != nil {
captureErr(client.logger, err, log.JSON{
"subroutine": "invoice consumer",
"message": "error publishing invoice",
"payment_hash": outgoing.RHash,
})
Expand Down Expand Up @@ -388,7 +405,11 @@ func (client *DefaultClient) publishToLndhubExchange(ctx context.Context, invoic
return err
}

client.logger.Debugf("Successfully published invoice to rabbitmq with RHash %s", invoice.RHash)
client.logger.Infoj(log.JSON{
"subroutine": "invoice publisher",
"message": "succesfully published invoice",
"payment_hash": invoice.RHash,
})

return nil
}
Expand Down

0 comments on commit 6202547

Please sign in to comment.