Skip to content

Commit

Permalink
Merge pull request #16 from getAlby/fix/rabbitmq-confirms
Browse files Browse the repository at this point in the history
Fix/rabbitmq confirms
This now runs without any issues for 21h in production. Seems lgood to merge.
  • Loading branch information
kiwiidb authored Aug 11, 2023
2 parents e18c5b8 + b7bdae7 commit f3ce209
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 14 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/joho/godotenv v1.5.1
github.com/kelseyhightower/envconfig v1.4.0
github.com/lightningnetwork/lnd v0.16.0-beta.rc1
github.com/rabbitmq/amqp091-go v1.6.1
github.com/rabbitmq/amqp091-go v1.8.1
github.com/sirupsen/logrus v1.9.0
github.com/stretchr/testify v1.8.1
google.golang.org/grpc v1.52.3
Expand Down
9 changes: 4 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,8 @@ github.com/prometheus/common v0.39.0 h1:oOyhkDq05hPZKItWVBkJ6g6AtGxi+fy7F4JvUV8u
github.com/prometheus/common v0.39.0/go.mod h1:6XBZ7lYdLCbkAVhwRsWTZn+IN5AB9F/NXd5w0BbEX0Y=
github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI=
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
github.com/rabbitmq/amqp091-go v1.6.1 h1:r6HybD9gOdWeUTP9TKIKdcAuFl4Va4p3OmWUUoeICAU=
github.com/rabbitmq/amqp091-go v1.6.1/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI=
github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA=
github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
Expand Down Expand Up @@ -559,8 +559,8 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
Expand Down Expand Up @@ -804,7 +804,6 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.5.0 h1:+bSpV5HIeWkuvgaMfI3UmKRThoTA5ODJTUd8T17NO+4=
golang.org/x/tools v0.5.0/go.mod h1:N+Kgy78s5I24c24dU8OfWNEotWjutIs8SnJvn5IDq+k=
Expand Down
26 changes: 24 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package main

import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"sync"
"time"

"github.com/getAlby/ln-event-publisher/lnd"
"github.com/getsentry/sentry-go"
Expand Down Expand Up @@ -70,18 +74,36 @@ func main() {
ctx, _ := signal.NotifyContext(backgroundCtx, os.Interrupt)

//start both subscriptions
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
err = svc.startInvoiceSubscription(ctx)
if err != nil {
if err != nil && !strings.Contains(err.Error(), context.Canceled.Error()) {
logrus.Fatal(err)
}
logrus.Info("invoice routine done")
wg.Done()
}()
wg.Add(1)
go func() {
err = svc.startPaymentSubscription(ctx)
if err != nil {
if err != nil && !strings.Contains(err.Error(), context.Canceled.Error()) {
logrus.Fatal(err)
}
logrus.Info("payment routine done")
wg.Done()
}()
<-ctx.Done()
// start goroutine that will exit program after 10 seconds
// in case graceful shutdown fails
go func() {
time.Sleep(10 * time.Second)
nonGracefulShutdownErr := fmt.Errorf("non-graceful shutdown because of timeout")
sentry.CaptureException(nonGracefulShutdownErr)
logrus.Fatal(nonGracefulShutdownErr)

}()
//wait for goroutines to finish
wg.Wait()
logrus.Info("Exited gracefully. Goodbye.")
}
20 changes: 14 additions & 6 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func (svc *Service) ProcessPayment(ctx context.Context, payment *lnrpc.Payment)
//if the payment was in the database as final then we already published it
//and we only publish completed payments
if notInflight && !alreadyPublished {
startTime := time.Now()
err := svc.PublishPayload(ctx, payment, LNDPaymentExchange, routingKey)
if err != nil {
logrus.WithFields(
Expand All @@ -314,9 +315,11 @@ func (svc *Service) ProcessPayment(ctx context.Context, payment *lnrpc.Payment)
}
logrus.WithFields(
logrus.Fields{
"payload_type": "payment",
"status": fmt.Sprintf("%s", payment.Status),
"payment_hash": payment.PaymentHash,
"payload_type": "payment",
"status": fmt.Sprintf("%s", payment.Status),
"rabbitmq_latency": time.Since(startTime).Seconds(),
"amount": payment.ValueSat,
"payment_hash": payment.PaymentHash,
}).Info("published payment")
}

Expand All @@ -325,6 +328,7 @@ func (svc *Service) ProcessPayment(ctx context.Context, payment *lnrpc.Payment)

func (svc *Service) ProcessInvoice(ctx context.Context, invoice *lnrpc.Invoice) error {
if invoice.State == lnrpc.Invoice_SETTLED {
startTime := time.Now()
err := svc.PublishPayload(ctx, invoice, LNDInvoiceExchange, LNDInvoiceRoutingKey)
if err != nil {
logrus.WithFields(
Expand All @@ -336,8 +340,13 @@ func (svc *Service) ProcessInvoice(ctx context.Context, invoice *lnrpc.Invoice)
}
logrus.WithFields(
logrus.Fields{
"payload_type": "invoice",
"payment_hash": hex.EncodeToString(invoice.RHash),
"payload_type": "invoice",
"rabbitmq_latency": time.Since(startTime).Seconds(),
"amount": invoice.AmtPaidSat,
"keysend": invoice.IsKeysend,
"add_index": invoice.AddIndex,
"settle_date": invoice.SettleDate,
"payment_hash": hex.EncodeToString(invoice.RHash),
}).Info("published invoice")
//add it to the database if we have one
if svc.db != nil {
Expand All @@ -356,7 +365,6 @@ func (svc *Service) PublishPayload(ctx context.Context, payload interface{}, exc

timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(svc.cfg.RabbitMQTimeoutSeconds)*time.Second)
defer cancel()
logrus.Info("Publishing message")
conf, err := svc.rabbitChannel.PublishWithDeferredConfirmWithContext(
timeoutCtx,
//todo from config
Expand Down

0 comments on commit f3ce209

Please sign in to comment.