From 9cf4061b252375a62e8aa32816ea88acbe048630 Mon Sep 17 00:00:00 2001 From: Matheus Degiovani Date: Fri, 22 Mar 2024 11:31:50 -0300 Subject: [PATCH] server: Fix reconnection to dcrlnd This fixes the server's Run method to properly attempt reconnections to the dcrlnd node in case of errors. Previously, two things were wrong in this method: - The context used in some calls was ctx instead of gctx, which caused some of the subsystems to not exit correctly on failures and thus mask the underlying connection error. - There was no re-attempt at failed operations, when the reason for failure was a connection error (as opposed to a graceful termination of the server). This commit fixes both issues by using the correct context everywhere and ensuring all subsystems re-attempt their functions until the context is canceled, with an appropriate delay. --- server/server.go | 104 ++++++++++++++++++++++++++++------------------- 1 file changed, 63 insertions(+), 41 deletions(-) diff --git a/server/server.go b/server/server.go index 228d29f..e6f07c3 100644 --- a/server/server.go +++ b/server/server.go @@ -354,54 +354,74 @@ func (s *Server) openChannel(ctx context.Context, winv waitingInvoice) { // listenToInvoices reacts to invoice events. func (s *Server) listenToInvoices(ctx context.Context) error { - stream, err := s.lc.SubscribeInvoices(ctx, &lnrpc.InvoiceSubscription{}) - if err != nil { - return err + + delay := func() { + select { + case <-time.After(time.Second): + case <-ctx.Done(): + } } - for { - inv, err := stream.Recv() +nextConn: + for ctx.Err() == nil { + stream, err := s.lc.SubscribeInvoices(ctx, &lnrpc.InvoiceSubscription{}) if err != nil { - return err + s.log.Errorf("Unable to subscribe to invoices: %v", err) + delay() + continue nextConn } - switch { - case inv.State == lnrpc.Invoice_CANCELED: - fpath := filepath.Join(s.root, invoicesDir, - hex.EncodeToString(inv.RHash)) - if err := s.removeFile(fpath); err != nil { - return err - } - - case inv.State == lnrpc.Invoice_SETTLED: - fpath := filepath.Join(s.root, invoicesDir, - hex.EncodeToString(inv.RHash)) - var winv waitingInvoice - err := s.readJsonFile(fpath, &winv) - if errors.Is(err, errNotExists) { - // Payment for something that isn't channel - // creation. - continue - } + for { + inv, err := stream.Recv() if err != nil { - return err + s.log.Errorf("Unable to receive next invoice update: %v", err) + delay() + continue nextConn } - wantAtoms := int64(s.amountForNewChan(winv.ChannelSize)) - if inv.AmtPaidAtoms < wantAtoms { - s.log.Warnf("Received payment for invoice %x "+ - "lower than required (%d < %d)", - inv.AmtPaidAtoms < wantAtoms) - continue - } + switch { + case inv.State == lnrpc.Invoice_CANCELED: + fpath := filepath.Join(s.root, invoicesDir, + hex.EncodeToString(inv.RHash)) + if err := s.removeFile(fpath); err != nil { + return err + } + + case inv.State == lnrpc.Invoice_SETTLED: + fpath := filepath.Join(s.root, invoicesDir, + hex.EncodeToString(inv.RHash)) + var winv waitingInvoice + err := s.readJsonFile(fpath, &winv) + if errors.Is(err, errNotExists) { + // Payment for something that isn't channel + // creation. + continue + } + if err != nil { + // Fatal failure. + return err + } - // Create channel. - if err := s.removeFile(fpath); err != nil { - return err + wantAtoms := int64(s.amountForNewChan(winv.ChannelSize)) + if inv.AmtPaidAtoms < wantAtoms { + s.log.Warnf("Received payment for invoice %x "+ + "lower than required (%d < %d)", + inv.AmtPaidAtoms < wantAtoms) + continue + } + + // Create channel. + if err := s.removeFile(fpath); err != nil { + // Fatal failure. + return err + } + go s.openChannel(ctx, winv) } - go s.openChannel(ctx, winv) + } } + + return ctx.Err() } // closeChannel closes the given channel. @@ -527,7 +547,7 @@ func (s *Server) FetchManagedChannels(ctx context.Context) (res ManagementInfo, // manage the channels. bal, err := s.lc.WalletBalance(ctx, &lnrpc.WalletBalanceRequest{}) if err != nil { - return res, err + return res, fmt.Errorf("unable to fetch wallet balance: %v", err) } res.WalletBalance = dcrutil.Amount(bal.TotalBalance) @@ -697,7 +717,8 @@ func (s *Server) runManageChannels(ctx context.Context) error { case <-time.After(s.cfg.CloseCheckInterval): err := s.manageChannels(ctx) if err != nil { - return err + s.log.Errorf("Unable to manage channels in "+ + "this inverval: %v", err) } case <-ctx.Done(): return ctx.Err() @@ -717,8 +738,8 @@ func (s *Server) Run(ctx context.Context) error { g.Go(func() error { for { select { - case <-ctx.Done(): - return ctx.Err() + case <-gctx.Done(): + return gctx.Err() case <-time.After(time.Minute): } @@ -740,12 +761,13 @@ func (s *Server) Run(ctx context.Context) error { }) // Close low activity channels. - g.Go(func() error { return s.runManageChannels(ctx) }) + g.Go(func() error { return s.runManageChannels(gctx) }) // Shutdown conn once an error occurrs. This unblocks any outstanding // calls. g.Go(func() error { <-gctx.Done() + s.log.Infof("Closing connection to dcrlnd") if err := s.conn.Close(); err != nil { s.log.Warnf("Error while closing conn: %v", err) }