Skip to content

Commit

Permalink
Merge pull request #18 from getAlby/publish-invoices
Browse files Browse the repository at this point in the history
republish invoices cmd
  • Loading branch information
frnandu authored Feb 21, 2024
2 parents 49b5a20 + f2604d5 commit f64c9ca
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 76 deletions.
4 changes: 4 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ COPY . .
# Build the application
RUN go build -o main

# Build the utility scripts
RUN go build ./cmd/republish-invoices

# Start a new, final image to reduce size.
FROM alpine as final

# Copy the binaries and entrypoint from the builder image.
COPY --from=builder /build/main /bin/
COPY --from=builder /build/republish-invoices /bin/

ENTRYPOINT [ "/bin/main" ]
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,15 @@ Possible missed-while-offline outgoing payments are handled by looking up the ea
- Routing key: `invoice.incoming.settled`
# LND outgoing payments
- Payload [lnrpc.Payment](https://github.com/lightningnetwork/lnd/blob/master/lnrpc/lightning.pb.go#L12612)
- Routing keys `payment.outgoing.settled`, `payment.outgoing.error`
- Routing keys `payment.outgoing.settled`, `payment.outgoing.error`

# Republish Invoices

If you need to republish settled invoices to update state in lndhub, you can use the cmd/republish-invoices by providing all payment hashes separated by commas:
- "REPUBLISH_INVOICE_HASHES" : `<hash_1>,<hash_2>....<hash_n>`

Use this in a job by setting:
```
command:
- /bin/republish-invoices
```
7 changes: 4 additions & 3 deletions check_invoice_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"github.com/getAlby/ln-event-publisher/service"
"testing"

"github.com/lightningnetwork/lnd/lnrpc"
Expand All @@ -9,7 +10,7 @@ import (

func TestCheckInvoice(t *testing.T) {
//test non keysend
assert.True(t, shouldPublishInvoice(&lnrpc.Invoice{
assert.True(t, service.ShouldPublishInvoice(&lnrpc.Invoice{
State: lnrpc.Invoice_SETTLED,
IsKeysend: false,
Htlcs: []*lnrpc.InvoiceHTLC{
Expand All @@ -28,7 +29,7 @@ func TestCheckInvoice(t *testing.T) {
},
}))
//test keysend with wallet id tlv
assert.True(t, shouldPublishInvoice(&lnrpc.Invoice{
assert.True(t, service.ShouldPublishInvoice(&lnrpc.Invoice{
State: lnrpc.Invoice_SETTLED,
IsKeysend: true,
Htlcs: []*lnrpc.InvoiceHTLC{
Expand All @@ -50,7 +51,7 @@ func TestCheckInvoice(t *testing.T) {
},
}))
//test keysend without wallet id tlv
assert.False(t, shouldPublishInvoice(&lnrpc.Invoice{
assert.False(t, service.ShouldPublishInvoice(&lnrpc.Invoice{
State: lnrpc.Invoice_SETTLED,
IsKeysend: true,
Htlcs: []*lnrpc.InvoiceHTLC{
Expand Down
80 changes: 80 additions & 0 deletions cmd/republish-invoices/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package main

import (
"context"
"encoding/hex"
"github.com/getAlby/ln-event-publisher/config"
"github.com/getAlby/ln-event-publisher/lnd"
"github.com/getAlby/ln-event-publisher/service"
"github.com/getsentry/sentry-go"
"github.com/joho/godotenv"
"github.com/kelseyhightower/envconfig"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/sirupsen/logrus"
"os"
"os/signal"
)

func main() {
c := &config.Config{}
logrus.SetFormatter(&logrus.JSONFormatter{})

// Load configruation from environment variables
err := godotenv.Load(".env")
if err != nil {
logrus.Warn("Failed to load .env file")
}
err = envconfig.Process("", c)
if err != nil {
logrus.Fatalf("Error loading environment variables: %v", err)
}

// Setup exception tracking with Sentry if configured
if c.SentryDSN != "" {
if err = sentry.Init(sentry.ClientOptions{
Dsn: c.SentryDSN,
}); err != nil {
logrus.Error(err)
}
}
client, err := lnd.NewLNDclient(lnd.LNDoptions{
Address: c.LNDAddress,
MacaroonFile: c.LNDMacaroonFile,
CertFile: c.LNDCertFile,
})
if err != nil {
sentry.CaptureException(err)
logrus.Fatalf("Error loading environment variables: %v", err)
}
resp, err := client.GetInfo(context.Background(), &lnrpc.GetInfoRequest{})
if err != nil {
sentry.CaptureException(err)
logrus.Fatal(err)
}
logrus.Infof("Connected to LND: %s - %s", resp.Alias, resp.IdentityPubkey)
svc := &service.Service{
Cfg: c,
Lnd: client,
}
err = svc.InitRabbitMq()
if err != nil {
sentry.CaptureException(err)
logrus.Fatal(err)
}
backgroundCtx := context.Background()
ctx, _ := signal.NotifyContext(backgroundCtx, os.Interrupt)

for i := 0; i < len(c.RepublishInvoiceHashes); i++ {
hashBytes, err := hex.DecodeString(c.RepublishInvoiceHashes[i])
if err != nil {
logrus.Error("Invalid Hash ", c.RepublishInvoiceHashes[i], " ", err)
continue
}

// Create a PaymentHash struct
paymentHash := &lnrpc.PaymentHash{
RHash: hashBytes,
}
svc.RepublishInvoice(ctx, paymentHash)
}
}
15 changes: 15 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package config

type Config struct {
LNDAddress string `envconfig:"LND_ADDRESS" required:"true"`
LNDMacaroonFile string `envconfig:"LND_MACAROON_FILE"`
LNDCertFile string `envconfig:"LND_CERT_FILE"`
DatabaseUri string `envconfig:"DATABASE_URI" required:"true"`
DatabaseMaxConns int `envconfig:"DATABASE_MAX_CONNS" default:"10"`
DatabaseMaxIdleConns int `envconfig:"DATABASE_MAX_IDLE_CONNS" default:"5"`
DatabaseConnMaxLifetime int `envconfig:"DATABASE_CONN_MAX_LIFETIME" default:"1800"` // 30 minutes
RabbitMQUri string `envconfig:"RABBITMQ_URI" required:"true"`
RabbitMQTimeoutSeconds int `envconfig:"RABBITMQ_TIMEOUT_SECONDS" default:"10"`
SentryDSN string `envconfig:"SENTRY_DSN"`
RepublishInvoiceHashes []string `envconfig:"REPUBLISH_INVOICE_HASHES"`
}
5 changes: 3 additions & 2 deletions db.go → db/db.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main
package db

import (
"github.com/getAlby/ln-event-publisher/config"
"log"
"os"
"time"
Expand All @@ -11,7 +12,7 @@ import (
"gorm.io/gorm/logger"
)

func OpenDB(config *Config) (db *gorm.DB, err error) {
func OpenDB(config *config.Config) (db *gorm.DB, err error) {
//overwrite logger so we don't print warnings for slow sql
//because we use db transactions that span the rabbitmq publish operation
dbLogger := logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags), logger.Config{
Expand Down
45 changes: 26 additions & 19 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"bytes"
"context"
"encoding/json"
"github.com/getAlby/ln-event-publisher/config"
db2 "github.com/getAlby/ln-event-publisher/db"
"github.com/getAlby/ln-event-publisher/service"
"os"
"testing"
"time"
Expand All @@ -16,9 +19,9 @@ import (
"google.golang.org/grpc"
)

func createTestService(t *testing.T, cfg *Config, exchange, routingKey string) (svc *Service, mlnd *MockLND, msgs <-chan amqp091.Delivery) {
func createTestService(t *testing.T, cfg *config.Config, exchange, routingKey string) (svc *service.Service, mlnd *MockLND, msgs <-chan amqp091.Delivery) {

svc = &Service{cfg: cfg}
svc = &service.Service{Cfg: cfg}
mlnd = &MockLND{
Sub: &MockSubscribeInvoices{invoiceChan: make(chan *lnrpc.Invoice)},
PaymentSub: &MockSubscribePayments{
Expand All @@ -31,7 +34,7 @@ func createTestService(t *testing.T, cfg *Config, exchange, routingKey string) (
assert.NoError(t, err)

//sub to the rabbit exchange ourselves to test e2e
q, err := svc.rabbitChannel.QueueDeclare(
q, err := svc.RabbitChannel.QueueDeclare(
"integration_test",
true,
false,
Expand All @@ -40,18 +43,18 @@ func createTestService(t *testing.T, cfg *Config, exchange, routingKey string) (
nil,
)
assert.NoError(t, err)
err = svc.rabbitChannel.QueueBind(q.Name, routingKey, exchange, false, nil)
err = svc.RabbitChannel.QueueBind(q.Name, routingKey, exchange, false, nil)
assert.NoError(t, err)

// - init PG
db, err := OpenDB(cfg)
db, err := db2.OpenDB(cfg)
assert.NoError(t, err)
svc.db = db
svc.lnd = mlnd
svc.Db = db
svc.Lnd = mlnd

//init rabbit channel
//consume channel to check that invoice was published
m, err := svc.rabbitChannel.Consume(
m, err := svc.RabbitChannel.Consume(
q.Name,
"",
true,
Expand All @@ -64,14 +67,14 @@ func createTestService(t *testing.T, cfg *Config, exchange, routingKey string) (
return svc, mlnd, m
}
func TestInvoicePublish(t *testing.T) {
cfg := &Config{
cfg := &config.Config{
DatabaseUri: os.Getenv("DATABASE_URI"),
RabbitMQUri: os.Getenv("RABBITMQ_URI"),
}
svc, mlnd, m := createTestService(t, cfg, LNDInvoiceExchange, LNDInvoiceRoutingKey)
svc, mlnd, m := createTestService(t, cfg, service.LNDInvoiceExchange, service.LNDInvoiceRoutingKey)
ctx, cancel := context.WithCancel(context.Background())
go func() {
svc.startInvoiceSubscription(ctx)
svc.StartInvoiceSubscription(ctx)
}()
// - mock incoming invoice
// the new invoice that will be saved will have addIndex + 1
Expand All @@ -90,21 +93,21 @@ func TestInvoicePublish(t *testing.T) {

//stop service
cancel()
svc.rabbitChannel.Close()
svc.RabbitChannel.Close()
// - clean up database
svc.db.Exec("delete from invoices;")
svc.Db.Exec("delete from invoices;")
}
func TestPaymentPublish(t *testing.T) {
cfg := &Config{
cfg := &config.Config{
DatabaseUri: os.Getenv("DATABASE_URI"),
RabbitMQUri: os.Getenv("RABBITMQ_URI"),
RabbitMQTimeoutSeconds: 1,
}
svc, mlnd, m := createTestService(t, cfg, LNDPaymentExchange, "payment.outgoing.*")
defer svc.db.Exec("delete from payments;")
svc, mlnd, m := createTestService(t, cfg, service.LNDPaymentExchange, "payment.outgoing.*")
defer svc.Db.Exec("delete from payments;")
ctx, cancel := context.WithCancel(context.Background())
go func() {
err := svc.startPaymentSubscription(ctx)
err := svc.StartPaymentSubscription(ctx)
assert.EqualError(t, err, context.Canceled.Error())
}()
// - mock outgoing payment
Expand Down Expand Up @@ -170,7 +173,7 @@ func TestPaymentPublish(t *testing.T) {
// - start service again,
ctx, cancel2 := context.WithCancel(context.Background())
go func() {
err := svc.startPaymentSubscription(ctx)
err := svc.StartPaymentSubscription(ctx)
assert.EqualError(t, err, context.Canceled.Error())
}()
// test that all new updates are being published
Expand All @@ -187,7 +190,7 @@ func TestPaymentPublish(t *testing.T) {
timedOut, receivedPayment = timeoutOrNewPaymentFromRabbit(t, m)
assert.True(t, timedOut)
cancel2()
svc.rabbitChannel.Close()
svc.RabbitChannel.Close()
}

func timeoutOrNewPaymentFromRabbit(t *testing.T, m <-chan amqp091.Delivery) (timeout bool, payment *lnrpc.Payment) {
Expand Down Expand Up @@ -321,3 +324,7 @@ func (mlnd *MockLND) ListPayments(ctx context.Context, req *lnrpc.ListPaymentsRe
Payments: mlnd.ListPaymentsResponse,
}, nil
}

func (mlnd *MockLND) LookupInvoice(ctx context.Context, req *lnrpc.PaymentHash, options ...grpc.CallOption) (*lnrpc.Invoice, error) {
panic("not implemented") // TODO: Implement
}
1 change: 1 addition & 0 deletions lnd/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type LightningClientWrapper interface {
GetInfo(ctx context.Context, req *lnrpc.GetInfoRequest, options ...grpc.CallOption) (*lnrpc.GetInfoResponse, error)
DecodeBolt11(ctx context.Context, bolt11 string, options ...grpc.CallOption) (*lnrpc.PayReq, error)
ListPayments(ctx context.Context, req *lnrpc.ListPaymentsRequest, options ...grpc.CallOption) (*lnrpc.ListPaymentsResponse, error)
LookupInvoice(ctx context.Context, req *lnrpc.PaymentHash, options ...grpc.CallOption) (*lnrpc.Invoice, error)
}

type SubscribeInvoicesWrapper interface {
Expand Down
4 changes: 4 additions & 0 deletions lnd/lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,7 @@ func (wrapper *LNDWrapper) SubscribePayments(ctx context.Context, req *routerrpc
func (wrapper *LNDWrapper) ListPayments(ctx context.Context, req *lnrpc.ListPaymentsRequest, options ...grpc.CallOption) (*lnrpc.ListPaymentsResponse, error) {
return wrapper.client.ListPayments(ctx, req)
}

func (wrapper *LNDWrapper) LookupInvoice(ctx context.Context, req *lnrpc.PaymentHash, options ...grpc.CallOption) (*lnrpc.Invoice, error) {
return wrapper.client.LookupInvoice(ctx, req)
}
19 changes: 11 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package main
import (
"context"
"fmt"
"github.com/getAlby/ln-event-publisher/config"
"github.com/getAlby/ln-event-publisher/db"
"github.com/getAlby/ln-event-publisher/service"
"os"
"os/signal"
"strings"
Expand All @@ -18,7 +21,7 @@ import (
)

func main() {
c := &Config{}
c := &config.Config{}
logrus.SetFormatter(&logrus.JSONFormatter{})

// Load configruation from environment variables
Expand Down Expand Up @@ -55,15 +58,15 @@ func main() {
}
logrus.Infof("Connected to LND: %s - %s", resp.Alias, resp.IdentityPubkey)
logrus.Info("Opening PG database")
db, err := OpenDB(c)
db, err := db.OpenDB(c)
if err != nil {
sentry.CaptureException(err)
logrus.Fatal(err)
}
svc := &Service{
cfg: c,
lnd: client,
db: db,
svc := &service.Service{
Cfg: c,
Lnd: client,
Db: db,
}
err = svc.InitRabbitMq()
if err != nil {
Expand All @@ -77,7 +80,7 @@ func main() {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
err = svc.startInvoiceSubscription(ctx)
err = svc.StartInvoiceSubscription(ctx)
if err != nil && !strings.Contains(err.Error(), context.Canceled.Error()) {
logrus.Fatal(err)
}
Expand All @@ -86,7 +89,7 @@ func main() {
}()
wg.Add(1)
go func() {
err = svc.startPaymentSubscription(ctx)
err = svc.StartPaymentSubscription(ctx)
if err != nil && !strings.Contains(err.Error(), context.Canceled.Error()) {
logrus.Fatal(err)
}
Expand Down
Loading

0 comments on commit f64c9ca

Please sign in to comment.