From b119fa0b55d28af9e515cc49c017f958f431ae1c Mon Sep 17 00:00:00 2001 From: jholdstock Date: Thu, 12 Oct 2023 09:34:39 +0100 Subject: [PATCH 1/6] Introduce crawler struct. The new crawler struct contains its own reference to an address manager instance rather than using the global reference. This opens the possibility of multiple crawlers using different address managers in the future. --- decred.go | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/decred.go b/decred.go index 08cc4f1a..9906700d 100644 --- a/decred.go +++ b/decred.go @@ -30,13 +30,25 @@ const ( var amgr *Manager -func testPeer(ctx context.Context, ip netip.AddrPort, netParams *chaincfg.Params) { +type crawler struct { + params *chaincfg.Params + amgr *Manager +} + +func newCrawler(params *chaincfg.Params, amgr *Manager) *crawler { + return &crawler{ + params: params, + amgr: amgr, + } +} + +func (c *crawler) testPeer(ctx context.Context, ip netip.AddrPort) { onaddr := make(chan struct{}, 1) verack := make(chan struct{}, 1) config := peer.Config{ UserAgentName: appName, UserAgentVersion: "0.0.1", - Net: netParams.Net, + Net: c.params.Net, DisableRelayTx: true, Listeners: peer.MessageListeners{ @@ -48,7 +60,7 @@ func testPeer(ctx context.Context, ip netip.AddrPort, netParams *chaincfg.Params n = append(n, addrPort) } } - added := amgr.AddAddresses(n) + added := c.amgr.AddAddresses(n) log.Printf("Peer %v sent %v addresses, %d new", p.Addr(), len(msg.AddrList), added) onaddr <- struct{}{} @@ -70,7 +82,7 @@ func testPeer(ctx context.Context, ip netip.AddrPort, netParams *chaincfg.Params // Time stamp the attempt after disconnect or dial error so we don't prune // this peer before or during its test. - defer amgr.Attempt(ip) + defer c.amgr.Attempt(ip) ctxTimeout, cancel := context.WithTimeout(ctx, defaultNodeTimeout) defer cancel() @@ -86,7 +98,7 @@ func testPeer(ctx context.Context, ip netip.AddrPort, netParams *chaincfg.Params select { case <-verack: // Mark this peer as a good node. - amgr.Good(ip, p.Services(), p.ProtocolVersion()) + c.amgr.Good(ip, p.Services(), p.ProtocolVersion()) // Ask peer for some addresses. p.QueueMessage(wire.NewMsgGetAddr(), nil) @@ -106,13 +118,13 @@ func testPeer(ctx context.Context, ip netip.AddrPort, netParams *chaincfg.Params } } -func creep(ctx context.Context, netParams *chaincfg.Params) { +func (c *crawler) run(ctx context.Context) { for { if ctx.Err() != nil { return } - ips := amgr.Addresses() + ips := c.amgr.Addresses() if len(ips) == 0 { log.Printf("No stale addresses -- sleeping for %v", defaultAddressTimeout) select { @@ -128,7 +140,7 @@ func creep(ctx context.Context, netParams *chaincfg.Params) { for _, ip := range ips { go func(ip netip.AddrPort) { defer wg.Done() - testPeer(ctx, ip, netParams) + c.testPeer(ctx, ip) }(ip) } wg.Wait() @@ -158,19 +170,21 @@ func main() { } amgr.AddAddresses([]netip.AddrPort{seeder}) + c := newCrawler(cfg.netParams, amgr) + var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - amgr.run(ctx) // only returns on context cancellation + amgr.run(ctx) // Only returns on context cancellation. log.Print("Address manager done.") }() wg.Add(1) go func() { defer wg.Done() - creep(ctx, cfg.netParams) // only returns on context cancellation + c.run(ctx) // Only returns on context cancellation. log.Print("Crawler done.") }() From 0a4ad2cbaab2e243204b2427a6354167e72b2d5d Mon Sep 17 00:00:00 2001 From: jholdstock Date: Fri, 13 Oct 2023 09:42:54 +0100 Subject: [PATCH 2/6] Remove global address manager var. The web server only needs a reference to an address manager once (during initialization) so it can be passed as a parameter and the global var can be removed. --- decred.go | 6 ++---- http.go | 10 ++++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/decred.go b/decred.go index 9906700d..16a1c7fa 100644 --- a/decred.go +++ b/decred.go @@ -28,8 +28,6 @@ const ( defaultNodeTimeout = time.Second * 3 ) -var amgr *Manager - type crawler struct { params *chaincfg.Params amgr *Manager @@ -157,7 +155,7 @@ func main() { } dataDir := filepath.Join(defaultHomeDir, cfg.netParams.Name) - amgr, err = NewManager(dataDir) + amgr, err := NewManager(dataDir) if err != nil { fmt.Fprintf(os.Stderr, "NewManager: %v\n", err) os.Exit(1) @@ -191,7 +189,7 @@ func main() { wg.Add(1) go func() { defer wg.Done() - if err := serveHTTP(ctx, cfg.Listen); err != nil { + if err := serveHTTP(ctx, cfg.Listen, amgr); err != nil { log.Fatal(err) } log.Print("HTTP server done.") diff --git a/http.go b/http.go index c8ea105a..fbb02809 100644 --- a/http.go +++ b/http.go @@ -1,4 +1,4 @@ -// Copyright (c) 2018-2021 The Decred developers +// Copyright (c) 2018-2023 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -22,7 +22,7 @@ import ( const defaultHTTPTimeout = 10 * time.Second -func httpGetAddrs(w http.ResponseWriter, r *http.Request) { +func httpGetAddrs(w http.ResponseWriter, r *http.Request, amgr *Manager) { var wantedIP uint32 var wantedPV uint32 var wantedSF wire.ServiceFlag @@ -81,14 +81,16 @@ func httpGetAddrs(w http.ResponseWriter, r *http.Request) { } } -func serveHTTP(ctx context.Context, addr string) error { +func serveHTTP(ctx context.Context, addr string, amgr *Manager) error { listener, err := net.Listen("tcp", addr) if err != nil { return fmt.Errorf("can't listen on %s. web server quitting: %w", addr, err) } mux := http.NewServeMux() - mux.HandleFunc(api.GetAddrsPath, httpGetAddrs) + mux.HandleFunc(api.GetAddrsPath, func(w http.ResponseWriter, r *http.Request) { + httpGetAddrs(w, r, amgr) + }) srv := &http.Server{ Handler: mux, From 9adcd030c9099abc65dfcd184ea8b0fe129c736b Mon Sep 17 00:00:00 2001 From: jholdstock Date: Thu, 12 Oct 2023 09:35:13 +0100 Subject: [PATCH 3/6] Introduce server struct. The new server struct is initialized and run as two distinct steps. Any errors during initialization (eg. port already in use) will cause dcrseeder to exit. The run function is blocking and returns when the provided context is canceled. --- decred.go | 9 ++++++--- http.go | 47 ++++++++++++++++++++++++++++++++--------------- 2 files changed, 38 insertions(+), 18 deletions(-) diff --git a/decred.go b/decred.go index 16a1c7fa..5faaa133 100644 --- a/decred.go +++ b/decred.go @@ -170,6 +170,11 @@ func main() { c := newCrawler(cfg.netParams, amgr) + server, err := newServer(cfg.Listen, amgr) + if err != nil { + log.Fatal(err) + } + var wg sync.WaitGroup wg.Add(1) @@ -189,9 +194,7 @@ func main() { wg.Add(1) go func() { defer wg.Done() - if err := serveHTTP(ctx, cfg.Listen, amgr); err != nil { - log.Fatal(err) - } + server.run(ctx) // Only returns on context cancellation. log.Print("HTTP server done.") }() diff --git a/http.go b/http.go index fbb02809..c4bbde85 100644 --- a/http.go +++ b/http.go @@ -8,7 +8,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "log" "net" "net/http" @@ -81,10 +80,15 @@ func httpGetAddrs(w http.ResponseWriter, r *http.Request, amgr *Manager) { } } -func serveHTTP(ctx context.Context, addr string, amgr *Manager) error { +type server struct { + srv *http.Server + listener net.Listener +} + +func newServer(addr string, amgr *Manager) (*server, error) { listener, err := net.Listen("tcp", addr) if err != nil { - return fmt.Errorf("can't listen on %s. web server quitting: %w", addr, err) + return nil, err } mux := http.NewServeMux() @@ -98,24 +102,37 @@ func serveHTTP(ctx context.Context, addr string, amgr *Manager) error { WriteTimeout: defaultHTTPTimeout, // request to response time } - // Shutdown the server on context cancellation. + return &server{ + srv: srv, + listener: listener, + }, nil +} + +func (h *server) run(ctx context.Context) { var wg sync.WaitGroup + + // Add the graceful shutdown to the waitgroup. wg.Add(1) go func() { defer wg.Done() + // Wait until context is canceled before shutting down the server. <-ctx.Done() - ctxShutdown, cancel := context.WithTimeout(context.Background(), defaultHTTPTimeout) - defer cancel() - err := srv.Shutdown(ctxShutdown) - if err != nil { - log.Printf("Trouble shutting down HTTP server: %v", err) + _ = h.srv.Shutdown(ctx) + }() + + // Start webserver. + wg.Add(1) + go func() { + defer wg.Done() + + log.Printf("Listening on %s", h.listener.Addr()) + err := h.srv.Serve(h.listener) + // ErrServerClosed is expected from a graceful server shutdown, it can + // be ignored. Anything else should be logged. + if err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Printf("unexpected (http.Server).Serve error: %v", err) } }() - defer wg.Wait() - err = srv.Serve(listener) // blocking - if !errors.Is(err, http.ErrServerClosed) { - return fmt.Errorf("unexpected (http.Server).Serve error: %w", err) - } - return nil // Shutdown called + wg.Wait() } From 4ad8eeebc5d1f86334619be3de7fc55dbba3294e Mon Sep 17 00:00:00 2001 From: jholdstock Date: Thu, 12 Oct 2023 09:35:28 +0100 Subject: [PATCH 4/6] Add network prefix to log lines. Rather than using the default global logger provided by the log package, initialize a logger with a prefix to indicate the network dcrseeder is running on ("[mainnet]" or "[testnet]"). Pass that logger around as a parameter and use it in all subsystems. --- decred.go | 26 ++++++++++++++++---------- http.go | 12 +++++++----- manager.go | 18 ++++++++++-------- 3 files changed, 33 insertions(+), 23 deletions(-) diff --git a/decred.go b/decred.go index 5faaa133..326cd49a 100644 --- a/decred.go +++ b/decred.go @@ -31,12 +31,14 @@ const ( type crawler struct { params *chaincfg.Params amgr *Manager + log *log.Logger } -func newCrawler(params *chaincfg.Params, amgr *Manager) *crawler { +func newCrawler(params *chaincfg.Params, amgr *Manager, log *log.Logger) *crawler { return &crawler{ params: params, amgr: amgr, + log: log, } } @@ -59,12 +61,12 @@ func (c *crawler) testPeer(ctx context.Context, ip netip.AddrPort) { } } added := c.amgr.AddAddresses(n) - log.Printf("Peer %v sent %v addresses, %d new", + c.log.Printf("Peer %v sent %v addresses, %d new", p.Addr(), len(msg.AddrList), added) onaddr <- struct{}{} }, OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) { - log.Printf("Adding peer %v with services %v pver %d", + c.log.Printf("Adding peer %v with services %v pver %d", p.NA().IP.String(), p.Services(), p.ProtocolVersion()) verack <- struct{}{} }, @@ -74,7 +76,7 @@ func (c *crawler) testPeer(ctx context.Context, ip netip.AddrPort) { host := ip.String() p, err := peer.NewOutboundPeer(&config, host) if err != nil { - log.Printf("NewOutboundPeer on %v: %v", host, err) + c.log.Printf("NewOutboundPeer on %v: %v", host, err) return } @@ -102,7 +104,7 @@ func (c *crawler) testPeer(ctx context.Context, ip netip.AddrPort) { p.QueueMessage(wire.NewMsgGetAddr(), nil) case <-time.After(defaultNodeTimeout): - log.Printf("verack timeout on peer %v", p.Addr()) + c.log.Printf("verack timeout on peer %v", p.Addr()) return case <-ctx.Done(): return @@ -111,7 +113,7 @@ func (c *crawler) testPeer(ctx context.Context, ip netip.AddrPort) { select { case <-onaddr: case <-time.After(defaultNodeTimeout): - log.Printf("getaddr timeout on peer %v", p.Addr()) + c.log.Printf("getaddr timeout on peer %v", p.Addr()) case <-ctx.Done(): } } @@ -124,7 +126,7 @@ func (c *crawler) run(ctx context.Context) { ips := c.amgr.Addresses() if len(ips) == 0 { - log.Printf("No stale addresses -- sleeping for %v", defaultAddressTimeout) + c.log.Printf("No stale addresses -- sleeping for %v", defaultAddressTimeout) select { case <-time.After(defaultAddressTimeout): case <-ctx.Done(): @@ -154,8 +156,12 @@ func main() { os.Exit(1) } + // Prefix log lines with current network, e.g. "[mainnet]" or "[testnet]". + logPrefix := fmt.Sprintf("[%.7s] ", cfg.netParams.Name) + log := log.New(os.Stdout, logPrefix, log.LstdFlags|log.Lmsgprefix) + dataDir := filepath.Join(defaultHomeDir, cfg.netParams.Name) - amgr, err := NewManager(dataDir) + amgr, err := NewManager(dataDir, log) if err != nil { fmt.Fprintf(os.Stderr, "NewManager: %v\n", err) os.Exit(1) @@ -168,9 +174,9 @@ func main() { } amgr.AddAddresses([]netip.AddrPort{seeder}) - c := newCrawler(cfg.netParams, amgr) + c := newCrawler(cfg.netParams, amgr, log) - server, err := newServer(cfg.Listen, amgr) + server, err := newServer(cfg.Listen, amgr, log) if err != nil { log.Fatal(err) } diff --git a/http.go b/http.go index c4bbde85..15a5e427 100644 --- a/http.go +++ b/http.go @@ -21,7 +21,7 @@ import ( const defaultHTTPTimeout = 10 * time.Second -func httpGetAddrs(w http.ResponseWriter, r *http.Request, amgr *Manager) { +func httpGetAddrs(w http.ResponseWriter, r *http.Request, amgr *Manager, log *log.Logger) { var wantedIP uint32 var wantedPV uint32 var wantedSF wire.ServiceFlag @@ -83,9 +83,10 @@ func httpGetAddrs(w http.ResponseWriter, r *http.Request, amgr *Manager) { type server struct { srv *http.Server listener net.Listener + log *log.Logger } -func newServer(addr string, amgr *Manager) (*server, error) { +func newServer(addr string, amgr *Manager, log *log.Logger) (*server, error) { listener, err := net.Listen("tcp", addr) if err != nil { return nil, err @@ -93,7 +94,7 @@ func newServer(addr string, amgr *Manager) (*server, error) { mux := http.NewServeMux() mux.HandleFunc(api.GetAddrsPath, func(w http.ResponseWriter, r *http.Request) { - httpGetAddrs(w, r, amgr) + httpGetAddrs(w, r, amgr, log) }) srv := &http.Server{ @@ -105,6 +106,7 @@ func newServer(addr string, amgr *Manager) (*server, error) { return &server{ srv: srv, listener: listener, + log: log, }, nil } @@ -125,12 +127,12 @@ func (h *server) run(ctx context.Context) { go func() { defer wg.Done() - log.Printf("Listening on %s", h.listener.Addr()) + h.log.Printf("Listening on %s", h.listener.Addr()) err := h.srv.Serve(h.listener) // ErrServerClosed is expected from a graceful server shutdown, it can // be ignored. Anything else should be logged. if err != nil && !errors.Is(err, http.ErrServerClosed) { - log.Printf("unexpected (http.Server).Serve error: %v", err) + h.log.Printf("unexpected (http.Server).Serve error: %v", err) } }() diff --git a/manager.go b/manager.go index 46d4d67b..9729e7e2 100644 --- a/manager.go +++ b/manager.go @@ -36,6 +36,7 @@ type Manager struct { nodes map[string]*Node peersFile string + log *log.Logger } const ( @@ -62,7 +63,7 @@ const ( pruneExpireTimeout = time.Hour * 24 ) -func NewManager(dataDir string) (*Manager, error) { +func NewManager(dataDir string, log *log.Logger) (*Manager, error) { err := os.MkdirAll(dataDir, 0o700) if err != nil { return nil, err @@ -71,6 +72,7 @@ func NewManager(dataDir string) (*Manager, error) { amgr := Manager{ nodes: make(map[string]*Node), peersFile: filepath.Join(dataDir, peersFilename), + log: log, } err = amgr.deserializePeers() @@ -276,7 +278,7 @@ func (m *Manager) prunePeers() { l := len(m.nodes) m.mtx.Unlock() - log.Printf("Pruned %d addresses: %d remaining", count, l) + m.log.Printf("Pruned %d addresses: %d remaining", count, l) } func (m *Manager) deserializePeers() error { @@ -304,7 +306,7 @@ func (m *Manager) deserializePeers() error { m.nodes = nodes m.mtx.Unlock() - log.Printf("%d nodes loaded from %s", l, filePath) + m.log.Printf("%d nodes loaded from %s", l, filePath) return nil } @@ -316,23 +318,23 @@ func (m *Manager) savePeers() { tmpfile := m.peersFile + ".new" w, err := os.Create(tmpfile) if err != nil { - log.Printf("Error opening file %s: %v", tmpfile, err) + m.log.Printf("Error opening file %s: %v", tmpfile, err) return } enc := json.NewEncoder(w) if err := enc.Encode(&m.nodes); err != nil { w.Close() - log.Printf("Failed to encode file %s: %v", tmpfile, err) + m.log.Printf("Failed to encode file %s: %v", tmpfile, err) return } if err := w.Close(); err != nil { - log.Printf("Error closing file %s: %v", tmpfile, err) + m.log.Printf("Error closing file %s: %v", tmpfile, err) return } if err := os.Rename(tmpfile, m.peersFile); err != nil { - log.Printf("Error writing file %s: %v", m.peersFile, err) + m.log.Printf("Error writing file %s: %v", m.peersFile, err) return } - log.Printf("%d nodes saved to %s", len(m.nodes), m.peersFile) + m.log.Printf("%d nodes saved to %s", len(m.nodes), m.peersFile) } From 76dc336891fdf2f7e45b3dec661493c83085cd84 Mon Sep 17 00:00:00 2001 From: jholdstock Date: Thu, 12 Oct 2023 09:17:32 +0100 Subject: [PATCH 5/6] Consolidate config validation. Config validation/prep was split between the loadConfig func and the main func. This consolidates it into loadConfig. --- config.go | 19 +++++++++++++++---- decred.go | 11 ++--------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/config.go b/config.go index 4b1ec106..e66f8fce 100644 --- a/config.go +++ b/config.go @@ -1,4 +1,4 @@ -// Copyright (c) 2018-2021 The Decred developers +// Copyright (c) 2018-2023 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -8,6 +8,7 @@ import ( "errors" "fmt" "net" + "net/netip" "os" "path/filepath" "strings" @@ -33,10 +34,13 @@ var ( // // See loadConfig for details on the configuration load process. type config struct { - Listen string `long:"httplisten" description:"HTTP listen on address:port"` - Seeder string `short:"s" description:"IP address of a working node"` - TestNet bool `long:"testnet" description:"Use testnet"` + Listen string `long:"httplisten" description:"HTTP listen on address:port"` + Seeder string `short:"s" description:"IP address of a working node"` + TestNet bool `long:"testnet" description:"Use testnet"` + netParams *chaincfg.Params + seederIP netip.AddrPort + dataDir string } func loadConfig() (*config, error) { @@ -103,6 +107,8 @@ func loadConfig() (*config, error) { cfg.netParams = chaincfg.MainNetParams() } + cfg.dataDir = filepath.Join(defaultHomeDir, cfg.netParams.Name) + if cfg.Listen == "" { return nil, fmt.Errorf("no listeners specified") } @@ -113,6 +119,11 @@ func loadConfig() (*config, error) { } cfg.Seeder = normalizeAddress(cfg.Seeder, cfg.netParams.DefaultPort) + cfg.seederIP, err = netip.ParseAddrPort(cfg.Seeder) + if err != nil { + return nil, fmt.Errorf("invalid seeder ip: %v", err) + } + return &cfg, nil } diff --git a/decred.go b/decred.go index 326cd49a..cbdb0f5b 100644 --- a/decred.go +++ b/decred.go @@ -11,7 +11,6 @@ import ( "net" "net/netip" "os" - "path/filepath" "sync" "time" @@ -160,19 +159,13 @@ func main() { logPrefix := fmt.Sprintf("[%.7s] ", cfg.netParams.Name) log := log.New(os.Stdout, logPrefix, log.LstdFlags|log.Lmsgprefix) - dataDir := filepath.Join(defaultHomeDir, cfg.netParams.Name) - amgr, err := NewManager(dataDir, log) + amgr, err := NewManager(cfg.dataDir, log) if err != nil { fmt.Fprintf(os.Stderr, "NewManager: %v\n", err) os.Exit(1) } - seeder, err := netip.ParseAddrPort(cfg.Seeder) - if err != nil { - fmt.Fprintf(os.Stderr, "Invalid seeder ip: %v\n", err) - os.Exit(1) - } - amgr.AddAddresses([]netip.AddrPort{seeder}) + amgr.AddAddresses([]netip.AddrPort{cfg.seederIP}) c := newCrawler(cfg.netParams, amgr, log) From b66f907f7c632dc5d6d4eda7271f41b66c56f159 Mon Sep 17 00:00:00 2001 From: jholdstock Date: Thu, 12 Oct 2023 09:58:40 +0100 Subject: [PATCH 6/6] Run deferred tasks on os.Exit. Any tasks deferred in the main func will not execute if os.Exit is called. Moving application logic down into a new func which is called by main works around this limitation. --- decred.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/decred.go b/decred.go index cbdb0f5b..75ddcb24 100644 --- a/decred.go +++ b/decred.go @@ -147,12 +147,18 @@ func (c *crawler) run(ctx context.Context) { } func main() { + os.Exit(run()) +} + +// run is the real main function for dcrseeder. It is necessary to work around +// the fact that deferred functions do not run when os.Exit() is called. +func run() int { ctx := shutdownListener() cfg, err := loadConfig() if err != nil { fmt.Fprintf(os.Stderr, "loadConfig: %v\n", err) - os.Exit(1) + return 1 } // Prefix log lines with current network, e.g. "[mainnet]" or "[testnet]". @@ -162,7 +168,7 @@ func main() { amgr, err := NewManager(cfg.dataDir, log) if err != nil { fmt.Fprintf(os.Stderr, "NewManager: %v\n", err) - os.Exit(1) + return 1 } amgr.AddAddresses([]netip.AddrPort{cfg.seederIP}) @@ -171,7 +177,8 @@ func main() { server, err := newServer(cfg.Listen, amgr, log) if err != nil { - log.Fatal(err) + fmt.Fprint(os.Stderr, err) + return 1 } var wg sync.WaitGroup @@ -200,4 +207,6 @@ func main() { // Wait for crawler and http server, then stop address manager. wg.Wait() log.Print("Bye!") + + return 0 }