diff --git a/Dockerfile b/Dockerfile index 42c3993..ef2923b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,10 +14,14 @@ COPY . . # Build the application RUN go build -o main +# Build the utility scripts +RUN go build ./cmd/republish-invoices + # Start a new, final image to reduce size. FROM alpine as final # Copy the binaries and entrypoint from the builder image. COPY --from=builder /build/main /bin/ +COPY --from=builder /build/republish-invoices /bin/ ENTRYPOINT [ "/bin/main" ] diff --git a/README.md b/README.md index 1bb8143..187b83b 100644 --- a/README.md +++ b/README.md @@ -22,4 +22,15 @@ Possible missed-while-offline outgoing payments are handled by looking up the ea - Routing key: `invoice.incoming.settled` # LND outgoing payments - Payload [lnrpc.Payment](https://github.com/lightningnetwork/lnd/blob/master/lnrpc/lightning.pb.go#L12612) -- Routing keys `payment.outgoing.settled`, `payment.outgoing.error` \ No newline at end of file +- Routing keys `payment.outgoing.settled`, `payment.outgoing.error` + +# Republish Invoices + +If you need to republish settled invoices to update state in lndhub, you can use the cmd/republish-invoices by providing all payment hashes separated by commas: +- "REPUBLISH_INVOICE_HASHES" : `,....` + +Use this in a job by setting: +``` +command: +- /bin/republish-invoices +``` diff --git a/check_invoice_test.go b/check_invoice_test.go index b402b65..196b6b0 100644 --- a/check_invoice_test.go +++ b/check_invoice_test.go @@ -1,6 +1,7 @@ package main import ( + "github.com/getAlby/ln-event-publisher/service" "testing" "github.com/lightningnetwork/lnd/lnrpc" @@ -9,7 +10,7 @@ import ( func TestCheckInvoice(t *testing.T) { //test non keysend - assert.True(t, shouldPublishInvoice(&lnrpc.Invoice{ + assert.True(t, service.ShouldPublishInvoice(&lnrpc.Invoice{ State: lnrpc.Invoice_SETTLED, IsKeysend: false, Htlcs: []*lnrpc.InvoiceHTLC{ @@ -28,7 +29,7 @@ func TestCheckInvoice(t *testing.T) { }, })) //test keysend with wallet id tlv - assert.True(t, shouldPublishInvoice(&lnrpc.Invoice{ + assert.True(t, service.ShouldPublishInvoice(&lnrpc.Invoice{ State: lnrpc.Invoice_SETTLED, IsKeysend: true, Htlcs: []*lnrpc.InvoiceHTLC{ @@ -50,7 +51,7 @@ func TestCheckInvoice(t *testing.T) { }, })) //test keysend without wallet id tlv - assert.False(t, shouldPublishInvoice(&lnrpc.Invoice{ + assert.False(t, service.ShouldPublishInvoice(&lnrpc.Invoice{ State: lnrpc.Invoice_SETTLED, IsKeysend: true, Htlcs: []*lnrpc.InvoiceHTLC{ diff --git a/cmd/republish-invoices/main.go b/cmd/republish-invoices/main.go new file mode 100644 index 0000000..db07810 --- /dev/null +++ b/cmd/republish-invoices/main.go @@ -0,0 +1,80 @@ +package main + +import ( + "context" + "encoding/hex" + "github.com/getAlby/ln-event-publisher/config" + "github.com/getAlby/ln-event-publisher/lnd" + "github.com/getAlby/ln-event-publisher/service" + "github.com/getsentry/sentry-go" + "github.com/joho/godotenv" + "github.com/kelseyhightower/envconfig" + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/sirupsen/logrus" + "os" + "os/signal" +) + +func main() { + c := &config.Config{} + logrus.SetFormatter(&logrus.JSONFormatter{}) + + // Load configruation from environment variables + err := godotenv.Load(".env") + if err != nil { + logrus.Warn("Failed to load .env file") + } + err = envconfig.Process("", c) + if err != nil { + logrus.Fatalf("Error loading environment variables: %v", err) + } + + // Setup exception tracking with Sentry if configured + if c.SentryDSN != "" { + if err = sentry.Init(sentry.ClientOptions{ + Dsn: c.SentryDSN, + }); err != nil { + logrus.Error(err) + } + } + client, err := lnd.NewLNDclient(lnd.LNDoptions{ + Address: c.LNDAddress, + MacaroonFile: c.LNDMacaroonFile, + CertFile: c.LNDCertFile, + }) + if err != nil { + sentry.CaptureException(err) + logrus.Fatalf("Error loading environment variables: %v", err) + } + resp, err := client.GetInfo(context.Background(), &lnrpc.GetInfoRequest{}) + if err != nil { + sentry.CaptureException(err) + logrus.Fatal(err) + } + logrus.Infof("Connected to LND: %s - %s", resp.Alias, resp.IdentityPubkey) + svc := &service.Service{ + Cfg: c, + Lnd: client, + } + err = svc.InitRabbitMq() + if err != nil { + sentry.CaptureException(err) + logrus.Fatal(err) + } + backgroundCtx := context.Background() + ctx, _ := signal.NotifyContext(backgroundCtx, os.Interrupt) + + for i := 0; i < len(c.RepublishInvoiceHashes); i++ { + hashBytes, err := hex.DecodeString(c.RepublishInvoiceHashes[i]) + if err != nil { + logrus.Error("Invalid Hash ", c.RepublishInvoiceHashes[i], " ", err) + continue + } + + // Create a PaymentHash struct + paymentHash := &lnrpc.PaymentHash{ + RHash: hashBytes, + } + svc.RepublishInvoice(ctx, paymentHash) + } +} diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..08f45b9 --- /dev/null +++ b/config/config.go @@ -0,0 +1,15 @@ +package config + +type Config struct { + LNDAddress string `envconfig:"LND_ADDRESS" required:"true"` + LNDMacaroonFile string `envconfig:"LND_MACAROON_FILE"` + LNDCertFile string `envconfig:"LND_CERT_FILE"` + DatabaseUri string `envconfig:"DATABASE_URI" required:"true"` + DatabaseMaxConns int `envconfig:"DATABASE_MAX_CONNS" default:"10"` + DatabaseMaxIdleConns int `envconfig:"DATABASE_MAX_IDLE_CONNS" default:"5"` + DatabaseConnMaxLifetime int `envconfig:"DATABASE_CONN_MAX_LIFETIME" default:"1800"` // 30 minutes + RabbitMQUri string `envconfig:"RABBITMQ_URI" required:"true"` + RabbitMQTimeoutSeconds int `envconfig:"RABBITMQ_TIMEOUT_SECONDS" default:"10"` + SentryDSN string `envconfig:"SENTRY_DSN"` + RepublishInvoiceHashes []string `envconfig:"REPUBLISH_INVOICE_HASHES"` +} diff --git a/db.go b/db/db.go similarity index 90% rename from db.go rename to db/db.go index b0aec17..d828a28 100644 --- a/db.go +++ b/db/db.go @@ -1,6 +1,7 @@ -package main +package db import ( + "github.com/getAlby/ln-event-publisher/config" "log" "os" "time" @@ -11,7 +12,7 @@ import ( "gorm.io/gorm/logger" ) -func OpenDB(config *Config) (db *gorm.DB, err error) { +func OpenDB(config *config.Config) (db *gorm.DB, err error) { //overwrite logger so we don't print warnings for slow sql //because we use db transactions that span the rabbitmq publish operation dbLogger := logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags), logger.Config{ diff --git a/integration_test.go b/integration_test.go index f2d3e6e..e7c77b7 100644 --- a/integration_test.go +++ b/integration_test.go @@ -4,6 +4,9 @@ import ( "bytes" "context" "encoding/json" + "github.com/getAlby/ln-event-publisher/config" + db2 "github.com/getAlby/ln-event-publisher/db" + "github.com/getAlby/ln-event-publisher/service" "os" "testing" "time" @@ -16,9 +19,9 @@ import ( "google.golang.org/grpc" ) -func createTestService(t *testing.T, cfg *Config, exchange, routingKey string) (svc *Service, mlnd *MockLND, msgs <-chan amqp091.Delivery) { +func createTestService(t *testing.T, cfg *config.Config, exchange, routingKey string) (svc *service.Service, mlnd *MockLND, msgs <-chan amqp091.Delivery) { - svc = &Service{cfg: cfg} + svc = &service.Service{Cfg: cfg} mlnd = &MockLND{ Sub: &MockSubscribeInvoices{invoiceChan: make(chan *lnrpc.Invoice)}, PaymentSub: &MockSubscribePayments{ @@ -31,7 +34,7 @@ func createTestService(t *testing.T, cfg *Config, exchange, routingKey string) ( assert.NoError(t, err) //sub to the rabbit exchange ourselves to test e2e - q, err := svc.rabbitChannel.QueueDeclare( + q, err := svc.RabbitChannel.QueueDeclare( "integration_test", true, false, @@ -40,18 +43,18 @@ func createTestService(t *testing.T, cfg *Config, exchange, routingKey string) ( nil, ) assert.NoError(t, err) - err = svc.rabbitChannel.QueueBind(q.Name, routingKey, exchange, false, nil) + err = svc.RabbitChannel.QueueBind(q.Name, routingKey, exchange, false, nil) assert.NoError(t, err) // - init PG - db, err := OpenDB(cfg) + db, err := db2.OpenDB(cfg) assert.NoError(t, err) - svc.db = db - svc.lnd = mlnd + svc.Db = db + svc.Lnd = mlnd //init rabbit channel //consume channel to check that invoice was published - m, err := svc.rabbitChannel.Consume( + m, err := svc.RabbitChannel.Consume( q.Name, "", true, @@ -64,14 +67,14 @@ func createTestService(t *testing.T, cfg *Config, exchange, routingKey string) ( return svc, mlnd, m } func TestInvoicePublish(t *testing.T) { - cfg := &Config{ + cfg := &config.Config{ DatabaseUri: os.Getenv("DATABASE_URI"), RabbitMQUri: os.Getenv("RABBITMQ_URI"), } - svc, mlnd, m := createTestService(t, cfg, LNDInvoiceExchange, LNDInvoiceRoutingKey) + svc, mlnd, m := createTestService(t, cfg, service.LNDInvoiceExchange, service.LNDInvoiceRoutingKey) ctx, cancel := context.WithCancel(context.Background()) go func() { - svc.startInvoiceSubscription(ctx) + svc.StartInvoiceSubscription(ctx) }() // - mock incoming invoice // the new invoice that will be saved will have addIndex + 1 @@ -90,21 +93,21 @@ func TestInvoicePublish(t *testing.T) { //stop service cancel() - svc.rabbitChannel.Close() + svc.RabbitChannel.Close() // - clean up database - svc.db.Exec("delete from invoices;") + svc.Db.Exec("delete from invoices;") } func TestPaymentPublish(t *testing.T) { - cfg := &Config{ + cfg := &config.Config{ DatabaseUri: os.Getenv("DATABASE_URI"), RabbitMQUri: os.Getenv("RABBITMQ_URI"), RabbitMQTimeoutSeconds: 1, } - svc, mlnd, m := createTestService(t, cfg, LNDPaymentExchange, "payment.outgoing.*") - defer svc.db.Exec("delete from payments;") + svc, mlnd, m := createTestService(t, cfg, service.LNDPaymentExchange, "payment.outgoing.*") + defer svc.Db.Exec("delete from payments;") ctx, cancel := context.WithCancel(context.Background()) go func() { - err := svc.startPaymentSubscription(ctx) + err := svc.StartPaymentSubscription(ctx) assert.EqualError(t, err, context.Canceled.Error()) }() // - mock outgoing payment @@ -170,7 +173,7 @@ func TestPaymentPublish(t *testing.T) { // - start service again, ctx, cancel2 := context.WithCancel(context.Background()) go func() { - err := svc.startPaymentSubscription(ctx) + err := svc.StartPaymentSubscription(ctx) assert.EqualError(t, err, context.Canceled.Error()) }() // test that all new updates are being published @@ -187,7 +190,7 @@ func TestPaymentPublish(t *testing.T) { timedOut, receivedPayment = timeoutOrNewPaymentFromRabbit(t, m) assert.True(t, timedOut) cancel2() - svc.rabbitChannel.Close() + svc.RabbitChannel.Close() } func timeoutOrNewPaymentFromRabbit(t *testing.T, m <-chan amqp091.Delivery) (timeout bool, payment *lnrpc.Payment) { @@ -321,3 +324,7 @@ func (mlnd *MockLND) ListPayments(ctx context.Context, req *lnrpc.ListPaymentsRe Payments: mlnd.ListPaymentsResponse, }, nil } + +func (mlnd *MockLND) LookupInvoice(ctx context.Context, req *lnrpc.PaymentHash, options ...grpc.CallOption) (*lnrpc.Invoice, error) { + panic("not implemented") // TODO: Implement +} diff --git a/lnd/interface.go b/lnd/interface.go index ad5f591..8521c9b 100644 --- a/lnd/interface.go +++ b/lnd/interface.go @@ -18,6 +18,7 @@ type LightningClientWrapper interface { GetInfo(ctx context.Context, req *lnrpc.GetInfoRequest, options ...grpc.CallOption) (*lnrpc.GetInfoResponse, error) DecodeBolt11(ctx context.Context, bolt11 string, options ...grpc.CallOption) (*lnrpc.PayReq, error) ListPayments(ctx context.Context, req *lnrpc.ListPaymentsRequest, options ...grpc.CallOption) (*lnrpc.ListPaymentsResponse, error) + LookupInvoice(ctx context.Context, req *lnrpc.PaymentHash, options ...grpc.CallOption) (*lnrpc.Invoice, error) } type SubscribeInvoicesWrapper interface { diff --git a/lnd/lnd.go b/lnd/lnd.go index af584db..f8655d5 100644 --- a/lnd/lnd.go +++ b/lnd/lnd.go @@ -139,3 +139,7 @@ func (wrapper *LNDWrapper) SubscribePayments(ctx context.Context, req *routerrpc func (wrapper *LNDWrapper) ListPayments(ctx context.Context, req *lnrpc.ListPaymentsRequest, options ...grpc.CallOption) (*lnrpc.ListPaymentsResponse, error) { return wrapper.client.ListPayments(ctx, req) } + +func (wrapper *LNDWrapper) LookupInvoice(ctx context.Context, req *lnrpc.PaymentHash, options ...grpc.CallOption) (*lnrpc.Invoice, error) { + return wrapper.client.LookupInvoice(ctx, req) +} diff --git a/main.go b/main.go index 13b8c67..0339f57 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,9 @@ package main import ( "context" "fmt" + "github.com/getAlby/ln-event-publisher/config" + "github.com/getAlby/ln-event-publisher/db" + "github.com/getAlby/ln-event-publisher/service" "os" "os/signal" "strings" @@ -18,7 +21,7 @@ import ( ) func main() { - c := &Config{} + c := &config.Config{} logrus.SetFormatter(&logrus.JSONFormatter{}) // Load configruation from environment variables @@ -55,15 +58,15 @@ func main() { } logrus.Infof("Connected to LND: %s - %s", resp.Alias, resp.IdentityPubkey) logrus.Info("Opening PG database") - db, err := OpenDB(c) + db, err := db.OpenDB(c) if err != nil { sentry.CaptureException(err) logrus.Fatal(err) } - svc := &Service{ - cfg: c, - lnd: client, - db: db, + svc := &service.Service{ + Cfg: c, + Lnd: client, + Db: db, } err = svc.InitRabbitMq() if err != nil { @@ -77,7 +80,7 @@ func main() { wg := sync.WaitGroup{} wg.Add(1) go func() { - err = svc.startInvoiceSubscription(ctx) + err = svc.StartInvoiceSubscription(ctx) if err != nil && !strings.Contains(err.Error(), context.Canceled.Error()) { logrus.Fatal(err) } @@ -86,7 +89,7 @@ func main() { }() wg.Add(1) go func() { - err = svc.startPaymentSubscription(ctx) + err = svc.StartPaymentSubscription(ctx) if err != nil && !strings.Contains(err.Error(), context.Canceled.Error()) { logrus.Fatal(err) } diff --git a/service.go b/service/service.go similarity index 79% rename from service.go rename to service/service.go index c0f786d..a1de3b4 100644 --- a/service.go +++ b/service/service.go @@ -1,4 +1,4 @@ -package main +package service import ( "bytes" @@ -6,6 +6,8 @@ import ( "encoding/hex" "encoding/json" "fmt" + "github.com/getAlby/ln-event-publisher/config" + "github.com/getAlby/ln-event-publisher/db" "time" "github.com/getAlby/ln-event-publisher/lnd" @@ -22,19 +24,6 @@ var codeMappings = map[lnrpc.Payment_PaymentStatus]string{ lnrpc.Payment_SUCCEEDED: LNDPaymentSuccessRoutingKey, } -type Config struct { - LNDAddress string `envconfig:"LND_ADDRESS" required:"true"` - LNDMacaroonFile string `envconfig:"LND_MACAROON_FILE"` - LNDCertFile string `envconfig:"LND_CERT_FILE"` - DatabaseUri string `envconfig:"DATABASE_URI" required:"true"` - DatabaseMaxConns int `envconfig:"DATABASE_MAX_CONNS" default:"10"` - DatabaseMaxIdleConns int `envconfig:"DATABASE_MAX_IDLE_CONNS" default:"5"` - DatabaseConnMaxLifetime int `envconfig:"DATABASE_CONN_MAX_LIFETIME" default:"1800"` // 30 minutes - RabbitMQUri string `envconfig:"RABBITMQ_URI" required:"true"` - RabbitMQTimeoutSeconds int `envconfig:"RABBITMQ_TIMEOUT_SECONDS" default:"10"` - SentryDSN string `envconfig:"SENTRY_DSN"` -} - const ( LNDInvoiceExchange = "lnd_invoice" LNDChannelExchange = "lnd_channel" @@ -47,14 +36,14 @@ const ( ) type Service struct { - cfg *Config - lnd lnd.LightningClientWrapper - rabbitChannel *amqp.Channel - db *gorm.DB + Cfg *config.Config + Lnd lnd.LightningClientWrapper + RabbitChannel *amqp.Channel + Db *gorm.DB } func (svc *Service) InitRabbitMq() (err error) { - conn, err := amqp.Dial(svc.cfg.RabbitMQUri) + conn, err := amqp.Dial(svc.Cfg.RabbitMQUri) if err != nil { return err } @@ -88,18 +77,18 @@ func (svc *Service) InitRabbitMq() (err error) { if err != nil { return err } - svc.rabbitChannel = ch + svc.RabbitChannel = ch // Put the channel in confirm mode - svc.rabbitChannel.Confirm(false) + svc.RabbitChannel.Confirm(false) return } func (svc *Service) lookupLastInvoiceIndex(ctx context.Context) (index uint64, err error) { - //get last invoice from db - inv := &Invoice{} - tx := svc.db.WithContext(ctx).Last(inv) + //get last invoice from Db + inv := &db.Invoice{} + tx := svc.Db.WithContext(ctx).Last(inv) if tx.Error != nil && tx.Error != gorm.ErrRecordNotFound { return 0, tx.Error } @@ -110,26 +99,26 @@ func (svc *Service) lookupLastInvoiceIndex(ctx context.Context) (index uint64, e } func (svc *Service) lookupLastPaymentTimestamp(ctx context.Context) (lastPaymentCreationTimeUnix int64, err error) { - //get the creation time in unix seconds of the earliest non-final payment in db + //get the creation time in unix seconds of the earliest non-final payment in Db //that is not older than 24h (to avoid putting too much stress on LND) //so we assume that we are never online for longer than 24h - //in case there are no non-final payments in the db, we get the last completed payment - firstInflightOrLastCompleted := &Payment{} - err = svc.db.Limit(1).Where(&Payment{ + //in case there are no non-final payments in the Db, we get the last completed payment + firstInflightOrLastCompleted := &db.Payment{} + err = svc.Db.Limit(1).Where(&db.Payment{ Status: lnrpc.Payment_IN_FLIGHT, }).Where("creation_time_ns > ?", time.Now().Add(-24*time.Hour).UnixNano()).Order("creation_time_ns ASC").First(firstInflightOrLastCompleted).Error if err != nil { if err == gorm.ErrRecordNotFound { //look up last completed payment that we have instead //and use that one. - err = svc.db.WithContext(ctx).Last(firstInflightOrLastCompleted).Error + err = svc.Db.WithContext(ctx).Last(firstInflightOrLastCompleted).Error if err != nil { if err == gorm.ErrRecordNotFound { - //if we get here there are no payment in the db: + //if we get here there are no payment in the Db: //first start, nothing found return 0, nil } - //real db error + //real Db error return 0, err } return firstInflightOrLastCompleted.CreationTimeNs / 1e9, nil @@ -144,14 +133,14 @@ func (svc *Service) lookupLastPaymentTimestamp(ctx context.Context) (lastPayment } func (svc *Service) AddLastPublishedInvoice(ctx context.Context, invoice *lnrpc.Invoice) error { - return svc.db.WithContext(ctx).Create(&Invoice{ + return svc.Db.WithContext(ctx).Create(&db.Invoice{ AddIndex: invoice.AddIndex, SettleIndex: invoice.SettleIndex, }).Error } func (svc *Service) StorePayment(ctx context.Context, tx *gorm.DB, payment *lnrpc.Payment) (alreadyProcessed bool, err error) { - toUpdate := &Payment{ + toUpdate := &db.Payment{ Model: gorm.Model{ ID: uint(payment.PaymentIndex), }, @@ -191,7 +180,7 @@ func (svc *Service) CheckPaymentsSinceLast(ctx context.Context) error { return nil } //make LND listpayments request starting from the first payment that we might have missed - paymentResponse, err := svc.lnd.ListPayments(ctx, &lnrpc.ListPaymentsRequest{ + paymentResponse, err := svc.Lnd.ListPayments(ctx, &lnrpc.ListPaymentsRequest{ //apparently LL considers a failed payment to be "incomplete" IncludeIncomplete: true, CreationDateStart: uint64(ts), @@ -218,8 +207,8 @@ func (svc *Service) CheckPaymentsSinceLast(ctx context.Context) error { return nil } -func (svc *Service) startPaymentSubscription(ctx context.Context) error { - paymentSub, err := svc.lnd.SubscribePayments(ctx, &routerrpc.TrackPaymentsRequest{}) +func (svc *Service) StartPaymentSubscription(ctx context.Context) error { + paymentSub, err := svc.Lnd.SubscribePayments(ctx, &routerrpc.TrackPaymentsRequest{}) if err != nil { sentry.CaptureException(err) return err @@ -253,12 +242,12 @@ func (svc *Service) startPaymentSubscription(ctx context.Context) error { } } -func (svc *Service) startInvoiceSubscription(ctx context.Context) error { +func (svc *Service) StartInvoiceSubscription(ctx context.Context) error { settleIndex, err := svc.lookupLastInvoiceIndex(ctx) if err != nil { return err } - invoiceSub, err := svc.lnd.SubscribeInvoices(ctx, &lnrpc.InvoiceSubscription{ + invoiceSub, err := svc.Lnd.SubscribeInvoices(ctx, &lnrpc.InvoiceSubscription{ SettleIndex: settleIndex, }) if err != nil { @@ -286,7 +275,7 @@ func (svc *Service) startInvoiceSubscription(ctx context.Context) error { } func (svc *Service) ProcessPayment(ctx context.Context, payment *lnrpc.Payment) error { - tx := svc.db.Begin() + tx := svc.Db.Begin() defer func() { if r := recover(); r != nil { tx.Rollback() @@ -329,7 +318,7 @@ func (svc *Service) ProcessPayment(ctx context.Context, payment *lnrpc.Payment) } func (svc *Service) ProcessInvoice(ctx context.Context, invoice *lnrpc.Invoice) error { - if shouldPublishInvoice(invoice) { + if ShouldPublishInvoice(invoice) { startTime := time.Now() err := svc.PublishPayload(ctx, invoice, LNDInvoiceExchange, LNDInvoiceRoutingKey) if err != nil { @@ -356,7 +345,7 @@ func (svc *Service) ProcessInvoice(ctx context.Context, invoice *lnrpc.Invoice) } // check if we need to publish an invoice -func shouldPublishInvoice(invoice *lnrpc.Invoice) (ok bool) { +func ShouldPublishInvoice(invoice *lnrpc.Invoice) (ok bool) { //don't publish unsettled invoice if invoice.State != lnrpc.Invoice_SETTLED { @@ -378,9 +367,9 @@ func (svc *Service) PublishPayload(ctx context.Context, payload interface{}, exc return err } - timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(svc.cfg.RabbitMQTimeoutSeconds)*time.Second) + timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(svc.Cfg.RabbitMQTimeoutSeconds)*time.Second) defer cancel() - conf, err := svc.rabbitChannel.PublishWithDeferredConfirmWithContext( + conf, err := svc.RabbitChannel.PublishWithDeferredConfirmWithContext( timeoutCtx, //todo from config exchange, key, false, false, amqp.Publishing{ @@ -399,3 +388,35 @@ func (svc *Service) PublishPayload(ctx context.Context, payload interface{}, exc return err } + +func (svc *Service) RepublishInvoice(ctx context.Context, paymentHash *lnrpc.PaymentHash) { + invoice, err := svc.Lnd.LookupInvoice(ctx, paymentHash) + if err != nil { + sentry.CaptureException(err) + logrus.Error("Invoice NOT FOUND ", paymentHash, err) + return + } + if ShouldPublishInvoice(invoice) { + startTime := time.Now() + err := svc.PublishPayload(ctx, invoice, LNDInvoiceExchange, LNDInvoiceRoutingKey) + if err != nil { + sentry.CaptureException(err) + logrus.WithFields( + logrus.Fields{ + "payload_type": "invoice", + "payment_hash": hex.EncodeToString(invoice.RHash), + }).WithError(err).Error("error publishing invoice") + return + } + logrus.WithFields( + logrus.Fields{ + "payload_type": "invoice", + "rabbitmq_latency": time.Since(startTime).Seconds(), + "amount": invoice.AmtPaidSat, + "keysend": invoice.IsKeysend, + "add_index": invoice.AddIndex, + "settle_date": invoice.SettleDate, + "payment_hash": hex.EncodeToString(invoice.RHash), + }).Info("published invoice") + } +}