From f1cd341e8900ec1615ebe06355a9262b38c466a0 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Fri, 31 Mar 2023 17:45:17 -0400 Subject: [PATCH] [DVT-437] node crawler (#55) * DVT-437 add node crawl function * create crawl command * add crawl logic and enodes * more crawl function changes * attempting to connect to a node using devp2p to get node name informatino * connecting to node with ethclient * save updates to the crawler * add rlpx to node crawler * fix lint * add filter and ping commands, refactor crawl and p2p * update ping and crawl, remove filter * fix lint * add deadline to handshake * fix shadow --------- Co-authored-by: Jesse Lee --- .gitignore | 5 +- Makefile | 2 +- cmd/p2p/crawl/crawl.go | 136 +++++++++++++++ cmd/p2p/crawl/crawl_util.go | 256 +++++++++++++++++++++++++++ cmd/p2p/p2p.go | 18 ++ cmd/p2p/ping/ping.go | 116 ++++++++++++ cmd/root.go | 5 +- go.mod | 3 +- go.sum | 5 +- p2p/nodeset.go | 84 +++++++++ p2p/p2p.go | 81 +++++++++ p2p/rlpx.go | 138 +++++++++++++++ p2p/types.go | 339 ++++++++++++++++++++++++++++++++++++ 13 files changed, 1182 insertions(+), 6 deletions(-) create mode 100644 cmd/p2p/crawl/crawl.go create mode 100644 cmd/p2p/crawl/crawl_util.go create mode 100644 cmd/p2p/p2p.go create mode 100644 cmd/p2p/ping/ping.go create mode 100644 p2p/nodeset.go create mode 100644 p2p/p2p.go create mode 100644 p2p/rlpx.go create mode 100644 p2p/types.go diff --git a/.gitignore b/.gitignore index 7053aa07..0406bcfd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ out/ .DS_Store -coverage.out \ No newline at end of file +coverage.out + +.vscode +*.json \ No newline at end of file diff --git a/Makefile b/Makefile index 47b7ee97..d41018cb 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ -INSTALL_DIR:=~/go/bin/ +INSTALL_DIR:=~/go/bin BIN_NAME:=polycli BUILD_DIR:=./out diff --git a/cmd/p2p/crawl/crawl.go b/cmd/p2p/crawl/crawl.go new file mode 100644 index 00000000..58777532 --- /dev/null +++ b/cmd/p2p/crawl/crawl.go @@ -0,0 +1,136 @@ +/* +Copyright © 2022 Polygon + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with this program. If not, see . +*/ + +package crawl + +import ( + "net" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + + "github.com/maticnetwork/polygon-cli/p2p" +) + +type ( + crawlParams struct { + Bootnodes string + Timeout string + Threads int + NetworkID int + NodesFile string + Database string + } +) + +var ( + inputCrawlParams crawlParams +) + +// crawlCmd represents the crawl command +var CrawlCmd = &cobra.Command{ + Use: "crawl [nodes file]", + Short: "Crawl a network", + Long: `This is a basic function to crawl a network.`, + Args: cobra.MinimumNArgs(1), + PreRun: func(cmd *cobra.Command, args []string) { + inputCrawlParams.NodesFile = args[0] + }, + RunE: func(cmd *cobra.Command, args []string) error { + inputSet, err := p2p.LoadNodesJSON(inputCrawlParams.NodesFile) + if err != nil { + return err + } + + var cfg discover.Config + cfg.PrivateKey, _ = crypto.GenerateKey() + bn, err := p2p.ParseBootnodes(inputCrawlParams.Bootnodes) + if err != nil { + log.Error().Err(err).Msg("Unable to parse bootnodes") + return err + } + cfg.Bootnodes = bn + + db, err := enode.OpenDB(inputCrawlParams.Database) + if err != nil { + return err + } + + ln := enode.NewLocalNode(db, cfg.PrivateKey) + socket, err := listen(ln) + if err != nil { + return err + } + + disc, err := discover.ListenV4(socket, ln, cfg) + if err != nil { + return err + } + defer disc.Close() + + c := newCrawler(inputSet, disc, disc.RandomNodes()) + c.revalidateInterval = 10 * time.Minute + + timeout, err := time.ParseDuration(inputCrawlParams.Timeout) + if err != nil { + return err + } + + log.Info().Msg("Starting crawl") + + output := c.run(timeout, inputCrawlParams.Threads) + return p2p.WriteNodesJSON(inputCrawlParams.NodesFile, output) + }, +} + +func init() { + CrawlCmd.PersistentFlags().StringVarP(&inputCrawlParams.Bootnodes, "bootnodes", "b", "", "Comma separated nodes used for bootstrapping. At least one bootnode is required, so other nodes in the network can discover each other.") + if err := CrawlCmd.MarkPersistentFlagRequired("bootnodes"); err != nil { + log.Error().Err(err).Msg("Failed to mark bootnodes as required persistent flag") + } + CrawlCmd.PersistentFlags().StringVarP(&inputCrawlParams.Timeout, "timeout", "t", "30m0s", "Time limit for the crawl.") + CrawlCmd.PersistentFlags().IntVarP(&inputCrawlParams.Threads, "parallel", "p", 16, "How many parallel discoveries to attempt.") + CrawlCmd.PersistentFlags().IntVarP(&inputCrawlParams.NetworkID, "network-id", "n", 0, "Filter discovered nodes by this network id.") + CrawlCmd.PersistentFlags().StringVarP(&inputCrawlParams.Database, "database", "d", "", "Node database for updating and storing client information.") +} + +func listen(ln *enode.LocalNode) (*net.UDPConn, error) { + addr := "0.0.0.0:0" + + socket, err := net.ListenPacket("udp4", addr) + if err != nil { + return nil, err + } + + // Configure UDP endpoint in ENR from listener address. + usocket := socket.(*net.UDPConn) + uaddr := socket.LocalAddr().(*net.UDPAddr) + + if uaddr.IP.IsUnspecified() { + ln.SetFallbackIP(net.IP{127, 0, 0, 1}) + } else { + ln.SetFallbackIP(uaddr.IP) + } + + ln.SetFallbackUDP(uaddr.Port) + + return usocket, nil +} diff --git a/cmd/p2p/crawl/crawl_util.go b/cmd/p2p/crawl/crawl_util.go new file mode 100644 index 00000000..e8807a04 --- /dev/null +++ b/cmd/p2p/crawl/crawl_util.go @@ -0,0 +1,256 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see . + +package crawl + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/rs/zerolog/log" + + "github.com/maticnetwork/polygon-cli/p2p" +) + +type crawler struct { + input p2p.NodeSet + output p2p.NodeSet + disc resolver + iters []enode.Iterator + inputIter enode.Iterator + ch chan *enode.Node + closed chan struct{} + + // settings + revalidateInterval time.Duration + mu sync.RWMutex +} + +const ( + nodeRemoved = iota + nodeSkipRecent + nodeSkipIncompat + nodeAdded + nodeUpdated +) + +type resolver interface { + RequestENR(*enode.Node) (*enode.Node, error) +} + +func newCrawler(input p2p.NodeSet, disc resolver, iters ...enode.Iterator) *crawler { + c := &crawler{ + input: input, + output: make(p2p.NodeSet, len(input)), + disc: disc, + iters: iters, + inputIter: enode.IterNodes(input.Nodes()), + ch: make(chan *enode.Node), + closed: make(chan struct{}), + } + c.iters = append(c.iters, c.inputIter) + // Copy input to output initially. Any nodes that fail validation + // will be dropped from output during the run. + for id, n := range input { + c.output[id] = n + } + return c +} + +func (c *crawler) run(timeout time.Duration, nthreads int) p2p.NodeSet { + var ( + timeoutTimer = time.NewTimer(timeout) + timeoutCh <-chan time.Time + statusTicker = time.NewTicker(time.Second * 8) + doneCh = make(chan enode.Iterator, len(c.iters)) + liveIters = len(c.iters) + ) + if nthreads < 1 { + nthreads = 1 + } + defer timeoutTimer.Stop() + defer statusTicker.Stop() + for _, it := range c.iters { + go c.runIterator(doneCh, it) + } + var ( + added uint64 + updated uint64 + skipped uint64 + recent uint64 + removed uint64 + wg sync.WaitGroup + ) + wg.Add(nthreads) + for i := 0; i < nthreads; i++ { + go func() { + defer wg.Done() + for { + select { + case n := <-c.ch: + switch c.updateNode(n) { + case nodeSkipIncompat: + atomic.AddUint64(&skipped, 1) + case nodeSkipRecent: + atomic.AddUint64(&recent, 1) + case nodeRemoved: + atomic.AddUint64(&removed, 1) + case nodeAdded: + atomic.AddUint64(&added, 1) + default: + atomic.AddUint64(&updated, 1) + } + case <-c.closed: + return + } + } + }() + } + +loop: + for { + select { + case it := <-doneCh: + if it == c.inputIter { + // Enable timeout when we're done revalidating the input nodes. + log.Info().Int("len", len(c.input)).Msg("Revalidation of input set is done") + if timeout > 0 { + timeoutCh = timeoutTimer.C + } + } + if liveIters--; liveIters == 0 { + break loop + } + case <-timeoutCh: + break loop + case <-statusTicker.C: + log.Info(). + Uint64("added", atomic.LoadUint64(&added)). + Uint64("updated", atomic.LoadUint64(&updated)). + Uint64("removed", atomic.LoadUint64(&removed)). + Uint64("ignored(recent)", atomic.LoadUint64(&removed)). + Uint64("ignored(incompatible)", atomic.LoadUint64(&skipped)). + Msg("Crawling in progress") + } + } + + close(c.closed) + for _, it := range c.iters { + it.Close() + } + for ; liveIters > 0; liveIters-- { + <-doneCh + } + wg.Wait() + return c.output +} + +func (c *crawler) runIterator(done chan<- enode.Iterator, it enode.Iterator) { + defer func() { done <- it }() + for it.Next() { + select { + case c.ch <- it.Node(): + case <-c.closed: + return + } + } +} + +// shouldSkipNode filters out nodes by their network id. If there is a status +// message, skip nodes that don't have the correct network id. Otherwise, skip +// nodes that are unable to peer. +func shouldSkipNode(n *enode.Node) bool { + if inputCrawlParams.NetworkID <= 0 { + return false + } + + conn, err := p2p.Dial(n) + if err != nil { + log.Error().Err(err).Msg("Dial failed") + return true + } + defer conn.Close() + + hello, message, err := conn.Peer() + if err != nil { + log.Error().Err(err).Msg("Peer failed") + return true + } + + log.Debug().Interface("hello", hello).Interface("status", message).Msg("Message received") + return inputCrawlParams.NetworkID != int(message.NetworkID) +} + +// updateNode updates the info about the given node, and returns a status about +// what changed. +func (c *crawler) updateNode(n *enode.Node) int { + c.mu.RLock() + node, ok := c.output[n.ID()] + c.mu.RUnlock() + + // Skip validation of recently-seen nodes. + if ok && time.Since(node.LastCheck) < c.revalidateInterval { + log.Debug().Str("id", n.ID().String()).Msg("Skipping node") + return nodeSkipRecent + } + + // Filter out incompatible nodes. + if shouldSkipNode(n) { + return nodeSkipIncompat + } + + // Request the node record. + status := nodeUpdated + node.LastCheck = truncNow() + + if nn, err := c.disc.RequestENR(n); err != nil { + if node.Score == 0 { + // Node doesn't implement EIP-868. + log.Debug().Str("id", n.ID().String()).Msg("Skipping node") + return nodeSkipIncompat + } + node.Score /= 2 + } else { + node.N = nn + node.Seq = nn.Seq() + node.Score++ + if node.FirstResponse.IsZero() { + node.FirstResponse = node.LastCheck + status = nodeAdded + } + node.LastResponse = node.LastCheck + } + + // Store/update node in output set. + c.mu.Lock() + defer c.mu.Unlock() + + if node.Score <= 0 { + log.Debug().Str("id", n.ID().String()).Msg("Removing node") + delete(c.output, n.ID()) + return nodeRemoved + } + + log.Debug().Str("id", n.ID().String()).Uint64("seq", n.Seq()).Int("score", node.Score).Msg("Updating node") + c.output[n.ID()] = node + return status +} + +func truncNow() time.Time { + return time.Now().UTC().Truncate(1 * time.Second) +} diff --git a/cmd/p2p/p2p.go b/cmd/p2p/p2p.go new file mode 100644 index 00000000..9f13e9cf --- /dev/null +++ b/cmd/p2p/p2p.go @@ -0,0 +1,18 @@ +package p2p + +import ( + "github.com/spf13/cobra" + + "github.com/maticnetwork/polygon-cli/cmd/p2p/crawl" + "github.com/maticnetwork/polygon-cli/cmd/p2p/ping" +) + +var P2pCmd = &cobra.Command{ + Use: "p2p", + Short: "Commands related to devp2p", +} + +func init() { + P2pCmd.AddCommand(crawl.CrawlCmd) + P2pCmd.AddCommand(ping.PingCmd) +} diff --git a/cmd/p2p/ping/ping.go b/cmd/p2p/ping/ping.go new file mode 100644 index 00000000..5dc3e3b5 --- /dev/null +++ b/cmd/p2p/ping/ping.go @@ -0,0 +1,116 @@ +package ping + +import ( + "encoding/json" + "os" + "sync" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + + "github.com/maticnetwork/polygon-cli/p2p" +) + +type ( + pingParams struct { + Threads int + OutputFile string + NodesFile string + } + pingNodeJSON struct { + Record *enode.Node `json:"record"` + Hello *p2p.Hello `json:"hello,omitempty"` + Status *p2p.Status `json:"status,omitempty"` + Error string `json:"error,omitempty"` + } + pingNodeSet map[enode.ID]pingNodeJSON +) + +var ( + inputPingParams pingParams +) + +var PingCmd = &cobra.Command{ + Use: "ping [enode/enr or nodes file]", + Short: "Ping node(s) and return the Hello and Status messages", + Long: `Ping nodes by either giving a single enode/enr or an entire nodes file. This command will establish a handshake and status exchange to get the Hello and Status messages and output JSON.`, + Args: cobra.MinimumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + nodes := []*enode.Node{} + if inputSet, err := p2p.LoadNodesJSON(args[0]); err == nil { + nodes = inputSet.Nodes() + } else if node, err := p2p.ParseNode(args[0]); err == nil { + nodes = append(nodes, node) + } else { + return err + } + + output := make(pingNodeSet) + + var ( + mutex sync.Mutex + wg sync.WaitGroup + ) + + wg.Add(len(nodes)) + sem := make(chan bool, inputPingParams.Threads) + // Ping each node in the slice. + for _, n := range nodes { + sem <- true + go func(node *enode.Node) { + defer func() { + <-sem + wg.Done() + }() + + var ( + hello *p2p.Hello + status *p2p.Status + errStr string + ) + + conn, err := p2p.Dial(node) + if err != nil { + log.Error().Err(err).Msg("Dial failed") + } else { + defer conn.Close() + if hello, status, err = conn.Peer(); err != nil { + log.Error().Err(err).Msg("Peer failed") + } + + log.Debug().Interface("hello", hello).Interface("status", status).Msg("Message received") + } + + if err != nil { + errStr = err.Error() + } + + // Save the results to the output map. + mutex.Lock() + output[node.ID()] = pingNodeJSON{node, hello, status, errStr} + mutex.Unlock() + }(n) + } + wg.Wait() + + // Write the output. + nodesJSON, err := json.MarshalIndent(output, "", " ") + if err != nil { + return err + } + + if inputPingParams.OutputFile == "" { + os.Stdout.Write(nodesJSON) + } else if err := os.WriteFile(inputPingParams.OutputFile, nodesJSON, 0644); err != nil { + return err + } + + return nil + }, +} + +func init() { + PingCmd.PersistentFlags().StringVarP(&inputPingParams.OutputFile, "output", "o", "", "Write ping results to output file. (default stdout)") + PingCmd.PersistentFlags().IntVarP(&inputPingParams.Threads, "parallel", "p", 16, "How many parallel pings to attempt.") +} diff --git a/cmd/root.go b/cmd/root.go index 243d393c..4104c2f5 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -18,9 +18,11 @@ package cmd import ( "fmt" + "os" + "github.com/maticnetwork/polygon-cli/cmd/fork" + "github.com/maticnetwork/polygon-cli/cmd/p2p" "github.com/maticnetwork/polygon-cli/cmd/parseethwallet" - "os" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -87,6 +89,7 @@ func init() { rootCmd.AddCommand(wallet.WalletCmd) rootCmd.AddCommand(fork.ForkCmd) rootCmd.AddCommand(parseethwallet.ParseETHWalletCmd) + rootCmd.AddCommand(p2p.P2pCmd) } // initConfig reads in config file and ENV variables if set. diff --git a/go.mod b/go.mod index 0b847af9..f52cbc97 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce github.com/centrifuge/go-substrate-rpc-client/v4 v4.0.6 github.com/coinbase/kryptology v1.8.0 - github.com/ethereum/go-ethereum v1.10.25 + github.com/ethereum/go-ethereum v1.10.26 github.com/gizak/termui/v3 v3.1.0 github.com/hashicorp/go-hclog v1.3.1 github.com/libp2p/go-libp2p v0.22.0 @@ -86,6 +86,7 @@ require ( github.com/hashicorp/vault/api v1.8.2 // indirect github.com/hashicorp/vault/sdk v0.6.0 // indirect github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect + github.com/holiman/uint256 v1.2.0 // indirect github.com/huin/goupnp v1.0.3 // indirect github.com/ipfs/go-ipfs-util v0.0.2 // indirect github.com/ipfs/go-log v1.0.5 // indirect diff --git a/go.sum b/go.sum index 23715885..830154a1 100644 --- a/go.sum +++ b/go.sum @@ -217,8 +217,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.9.1 h1:PS7VIOgmSVhWUEeZwTe7z7zouA22Cr590PzXKbZHOVY= github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= -github.com/ethereum/go-ethereum v1.10.25 h1:5dFrKJDnYf8L6/5o42abCE6a9yJm9cs4EJVRyYMr55s= -github.com/ethereum/go-ethereum v1.10.25/go.mod h1:EYFyF19u3ezGLD4RqOkLq+ZCXzYbLoNDdZlMt7kyKFg= +github.com/ethereum/go-ethereum v1.10.26 h1:i/7d9RBBwiXCEuyduBQzJw/mKmnvzsN14jqBmytw72s= +github.com/ethereum/go-ethereum v1.10.26/go.mod h1:EYFyF19u3ezGLD4RqOkLq+ZCXzYbLoNDdZlMt7kyKFg= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= @@ -426,6 +426,7 @@ github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87/go.mod h1:CtWFDAQg github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA= github.com/holiman/uint256 v1.2.0 h1:gpSYcPLWGv4sG43I2mVLiDZCNDh/EpGjSk8tmtxitHM= +github.com/holiman/uint256 v1.2.0/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc= github.com/huin/goupnp v1.0.3 h1:N8No57ls+MnjlB+JPiCVSOyy/ot7MJTqlo7rn+NYSqQ= diff --git a/p2p/nodeset.go b/p2p/nodeset.go new file mode 100644 index 00000000..43462a47 --- /dev/null +++ b/p2p/nodeset.go @@ -0,0 +1,84 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see . + +package p2p + +import ( + "bytes" + "encoding/json" + "os" + "sort" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +const jsonIndent = " " + +// NodeSet is the nodes.json file format. It holds a set of node records +// as a JSON object. +type NodeSet map[enode.ID]NodeJSON + +type NodeJSON struct { + Seq uint64 `json:"seq"` + N *enode.Node `json:"record"` + + // The score tracks how many liveness checks were performed. It is incremented by one + // every time the node passes a check, and halved every time it doesn't. + Score int `json:"score,omitempty"` + // These two track the time of last successful contact. + FirstResponse time.Time `json:"firstResponse,omitempty"` + LastResponse time.Time `json:"lastResponse,omitempty"` + // This one tracks the time of our last attempt to contact the node. + LastCheck time.Time `json:"lastCheck,omitempty"` +} + +func LoadNodesJSON(file string) (NodeSet, error) { + var nodes NodeSet + if err := common.LoadJSON(file, &nodes); err != nil { + return nil, err + } + return nodes, nil +} + +func WriteNodesJSON(file string, nodes NodeSet) error { + nodesJSON, err := json.MarshalIndent(nodes, "", jsonIndent) + if err != nil { + return err + } + if file == "-" { + os.Stdout.Write(nodesJSON) + return nil + } + if err := os.WriteFile(file, nodesJSON, 0644); err != nil { + return err + } + return nil +} + +// Nodes returns the node records contained in the set. +func (ns NodeSet) Nodes() []*enode.Node { + result := make([]*enode.Node, 0, len(ns)) + for _, n := range ns { + result = append(result, n.N) + } + // Sort by ID. + sort.Slice(result, func(i, j int) bool { + return bytes.Compare(result[i].ID().Bytes(), result[j].ID().Bytes()) < 0 + }) + return result +} diff --git a/p2p/p2p.go b/p2p/p2p.go new file mode 100644 index 00000000..05cbf204 --- /dev/null +++ b/p2p/p2p.go @@ -0,0 +1,81 @@ +package p2p + +import ( + "bytes" + "encoding/base64" + "encoding/hex" + "fmt" + "strings" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/ethereum/go-ethereum/rlp" +) + +func decodeRecordHex(b []byte) ([]byte, bool) { + if bytes.HasPrefix(b, []byte("0x")) { + b = b[2:] + } + + dec := make([]byte, hex.DecodedLen(len(b))) + _, err := hex.Decode(dec, b) + + return dec, err == nil +} + +func decodeRecordBase64(b []byte) ([]byte, bool) { + if bytes.HasPrefix(b, []byte("enr:")) { + b = b[4:] + } + + dec := make([]byte, base64.RawURLEncoding.DecodedLen(len(b))) + n, err := base64.RawURLEncoding.Decode(dec, b) + + return dec[:n], err == nil +} + +// parseRecord parses a node record from hex, base64, or raw binary input. +func parseRecord(source string) (*enr.Record, error) { + bin := []byte(source) + + if d, ok := decodeRecordHex(bytes.TrimSpace(bin)); ok { + bin = d + } else if d, ok := decodeRecordBase64(bytes.TrimSpace(bin)); ok { + bin = d + } + + var r enr.Record + err := rlp.DecodeBytes(bin, &r) + + return &r, err +} + +// ParseNode parses a node record and verifies its signature. +func ParseNode(source string) (*enode.Node, error) { + if strings.HasPrefix(source, "enode://") { + return enode.ParseV4(source) + } + + r, err := parseRecord(source) + if err != nil { + return nil, err + } + + return enode.New(enode.ValidSchemes, r) +} + +// ParseBootnodes parses the bootnodes string and returns a node slice. +func ParseBootnodes(bootnodes string) ([]*enode.Node, error) { + s := strings.Split(bootnodes, ",") + + nodes := make([]*enode.Node, len(s)) + var err error + for i, record := range s { + nodes[i], err = ParseNode(record) + if err != nil { + return nil, fmt.Errorf("invalid bootstrap node: %v", err) + } + } + + return nodes, nil +} diff --git a/p2p/rlpx.go b/p2p/rlpx.go new file mode 100644 index 00000000..5abb2f5f --- /dev/null +++ b/p2p/rlpx.go @@ -0,0 +1,138 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see . + +package p2p + +import ( + "fmt" + "net" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/rlpx" + "github.com/rs/zerolog/log" +) + +var ( + timeout = 20 * time.Second +) + +// Dial attempts to Dial the given node and perform a handshake, +// returning the created Conn if successful. +func Dial(n *enode.Node) (*Conn, error) { + fd, err := net.Dial("tcp", fmt.Sprintf("%v:%d", n.IP(), n.TCP())) + if err != nil { + return nil, err + } + + conn := Conn{Conn: rlpx.NewConn(fd, n.Pubkey())} + + if conn.ourKey, err = crypto.GenerateKey(); err != nil { + return nil, err + } + + defer func() { _ = conn.SetDeadline(time.Time{}) }() + if err = conn.SetDeadline(time.Now().Add(20 * time.Second)); err != nil { + return nil, err + } + if _, err = conn.Handshake(conn.ourKey); err != nil { + conn.Close() + return nil, err + } + + conn.caps = []p2p.Cap{ + {Name: "eth", Version: 66}, + {Name: "eth", Version: 67}, + {Name: "eth", Version: 68}, + } + + return &conn, nil +} + +// Peer performs both the protocol handshake and the status message +// exchange with the node in order to Peer with it. +func (c *Conn) Peer() (*Hello, *Status, error) { + hello, err := c.handshake() + if err != nil { + return nil, nil, fmt.Errorf("handshake failed: %v", err) + } + status, err := c.status() + if err != nil { + return hello, nil, fmt.Errorf("status exchange failed: %v", err) + } + return hello, status, nil +} + +// handshake performs a protocol handshake with the node. +func (c *Conn) handshake() (*Hello, error) { + defer func() { _ = c.SetDeadline(time.Time{}) }() + if err := c.SetDeadline(time.Now().Add(10 * time.Second)); err != nil { + return nil, err + } + + // write hello to client + pub0 := crypto.FromECDSAPub(&c.ourKey.PublicKey)[1:] + ourHandshake := &Hello{ + Version: 5, + Caps: c.caps, + ID: pub0, + } + if err := c.Write(ourHandshake); err != nil { + return nil, fmt.Errorf("write to connection failed: %v", err) + } + + // read hello from client + switch msg := c.Read().(type) { + case *Hello: + if msg.Version >= 5 { + c.SetSnappy(true) + } + return msg, nil + case *Disconnect: + return nil, fmt.Errorf("disconnect received: %v", msg) + case *Disconnects: + return nil, fmt.Errorf("disconnect received: %v", msg) + default: + return nil, fmt.Errorf("bad handshake: %v", msg) + } +} + +// status gets the `status` message from the given node. +func (c *Conn) status() (*Status, error) { + defer func() { _ = c.SetDeadline(time.Time{}) }() + if err := c.SetDeadline(time.Now().Add(20 * time.Second)); err != nil { + return nil, err + } + + for { + switch msg := c.Read().(type) { + case *Status: + return msg, nil + case *Disconnect: + return nil, fmt.Errorf("disconnect received: %v", msg) + case *Disconnects: + return nil, fmt.Errorf("disconnect received: %v", msg) + case *Ping: + if err := c.Write(&Pong{}); err != nil { + log.Error().Err(err).Msg("Write pong failed") + } + default: + return nil, fmt.Errorf("bad status message: %v", msg) + } + } +} diff --git a/p2p/types.go b/p2p/types.go new file mode 100644 index 00000000..ef36652a --- /dev/null +++ b/p2p/types.go @@ -0,0 +1,339 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see . + +package p2p + +import ( + "crypto/ecdsa" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/eth/protocols/snap" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/rlpx" + "github.com/ethereum/go-ethereum/rlp" +) + +type Message interface { + Code() int + ReqID() uint64 +} + +type Error struct { + err error +} + +func (e *Error) Unwrap() error { return e.err } +func (e *Error) Error() string { return e.err.Error() } +func (e *Error) String() string { return e.Error() } + +func (e *Error) Code() int { return -1 } +func (e *Error) ReqID() uint64 { return 0 } + +func errorf(format string, args ...interface{}) *Error { + return &Error{fmt.Errorf(format, args...)} +} + +// Hello is the RLP structure of the protocol handshake. +type Hello struct { + Version uint64 + Name string + Caps []p2p.Cap + ListenPort uint64 + ID []byte // secp256k1 public key + + // Ignore additional fields (for forward compatibility). + Rest []rlp.RawValue `rlp:"tail"` +} + +func (msg Hello) Code() int { return 0x00 } +func (msg Hello) ReqID() uint64 { return 0 } + +// Disconnect is the RLP structure for a disconnect message. +type Disconnect struct { + Reason p2p.DiscReason +} + +func (msg Disconnect) Code() int { return 0x01 } +func (msg Disconnect) ReqID() uint64 { return 0 } + +// Disconnects is the RLP structure for disconnect messages. +type Disconnects []p2p.DiscReason + +func (msg Disconnects) Code() int { return 0x01 } +func (msg Disconnects) ReqID() uint64 { return 0 } + +type Ping struct{} + +func (msg Ping) Code() int { return 0x02 } +func (msg Ping) ReqID() uint64 { return 0 } + +type Pong struct{} + +func (msg Pong) Code() int { return 0x03 } +func (msg Pong) ReqID() uint64 { return 0 } + +// Status is the network packet for the status message for eth/64 and later. +type Status eth.StatusPacket + +func (msg Status) Code() int { return 16 } +func (msg Status) ReqID() uint64 { return 0 } + +// NewBlockHashes is the network packet for the block announcements. +type NewBlockHashes eth.NewBlockHashesPacket + +func (msg NewBlockHashes) Code() int { return 17 } +func (msg NewBlockHashes) ReqID() uint64 { return 0 } + +type Transactions eth.TransactionsPacket + +func (msg Transactions) Code() int { return 18 } +func (msg Transactions) ReqID() uint64 { return 18 } + +// GetBlockHeaders represents a block header query. +type GetBlockHeaders eth.GetBlockHeadersPacket66 + +func (msg GetBlockHeaders) Code() int { return 19 } +func (msg GetBlockHeaders) ReqID() uint64 { return msg.RequestId } + +type BlockHeaders eth.BlockHeadersPacket66 + +func (msg BlockHeaders) Code() int { return 20 } +func (msg BlockHeaders) ReqID() uint64 { return msg.RequestId } + +// GetBlockBodies represents a GetBlockBodies request +type GetBlockBodies eth.GetBlockBodiesPacket66 + +func (msg GetBlockBodies) Code() int { return 21 } +func (msg GetBlockBodies) ReqID() uint64 { return msg.RequestId } + +// BlockBodies is the network packet for block content distribution. +type BlockBodies eth.BlockBodiesPacket66 + +func (msg BlockBodies) Code() int { return 22 } +func (msg BlockBodies) ReqID() uint64 { return msg.RequestId } + +// NewBlock is the network packet for the block propagation message. +type NewBlock eth.NewBlockPacket + +func (msg NewBlock) Code() int { return 23 } +func (msg NewBlock) ReqID() uint64 { return 0 } + +// NewPooledTransactionHashes66 is the network packet for the tx hash propagation message. +// type NewPooledTransactionHashes66 eth.NewPooledTransactionHashesPacket66 +// +// func (msg NewPooledTransactionHashes66) Code() int { return 24 } +// func (msg NewPooledTransactionHashes66) ReqID() uint64 { return 0 } +// +// NewPooledTransactionHashes is the network packet for the tx hash propagation message. +// type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket68 +// +// func (msg NewPooledTransactionHashes) Code() int { return 24 } +// func (msg NewPooledTransactionHashes) ReqID() uint64 { return 0 } + +type GetPooledTransactions eth.GetPooledTransactionsPacket66 + +func (msg GetPooledTransactions) Code() int { return 25 } +func (msg GetPooledTransactions) ReqID() uint64 { return msg.RequestId } + +type PooledTransactions eth.PooledTransactionsPacket66 + +func (msg PooledTransactions) Code() int { return 26 } +func (msg PooledTransactions) ReqID() uint64 { return msg.RequestId } + +// Conn represents an individual connection with a peer +type Conn struct { + *rlpx.Conn + ourKey *ecdsa.PrivateKey + caps []p2p.Cap +} + +// Read reads an eth66 packet from the connection. +func (c *Conn) Read() Message { + code, rawData, _, err := c.Conn.Read() + if err != nil { + return errorf("could not read from connection: %v", err) + } + + var msg Message + switch int(code) { + case (Hello{}).Code(): + msg = new(Hello) + case (Ping{}).Code(): + msg = new(Ping) + case (Pong{}).Code(): + msg = new(Pong) + case (Disconnect{}).Code(): + // Because disconnects have different formats, check the multiple one first + // then try the other. + msg = new(Disconnects) + if err := rlp.DecodeBytes(rawData, msg); err != nil { + msg = new(Disconnect) + } + case (Status{}).Code(): + msg = new(Status) + case (GetBlockHeaders{}).Code(): + ethMsg := new(eth.GetBlockHeadersPacket66) + if err := rlp.DecodeBytes(rawData, ethMsg); err != nil { + return errorf("could not rlp decode message: %v", err) + } + return (*GetBlockHeaders)(ethMsg) + case (BlockHeaders{}).Code(): + ethMsg := new(eth.BlockHeadersPacket66) + if err := rlp.DecodeBytes(rawData, ethMsg); err != nil { + return errorf("could not rlp decode message: %v", err) + } + return (*BlockHeaders)(ethMsg) + case (GetBlockBodies{}).Code(): + ethMsg := new(eth.GetBlockBodiesPacket66) + if err := rlp.DecodeBytes(rawData, ethMsg); err != nil { + return errorf("could not rlp decode message: %v", err) + } + return (*GetBlockBodies)(ethMsg) + case (BlockBodies{}).Code(): + ethMsg := new(eth.BlockBodiesPacket66) + if err := rlp.DecodeBytes(rawData, ethMsg); err != nil { + return errorf("could not rlp decode message: %v", err) + } + return (*BlockBodies)(ethMsg) + case (NewBlock{}).Code(): + msg = new(NewBlock) + case (NewBlockHashes{}).Code(): + msg = new(NewBlockHashes) + case (Transactions{}).Code(): + msg = new(Transactions) + // case (NewPooledTransactionHashes66{}).Code(): + // // Try decoding to eth68 + // ethMsg := new(NewPooledTransactionHashes) + // if err := rlp.DecodeBytes(rawData, ethMsg); err == nil { + // return ethMsg + // } + // msg = new(NewPooledTransactionHashes66) + case (GetPooledTransactions{}.Code()): + ethMsg := new(eth.GetPooledTransactionsPacket66) + if err := rlp.DecodeBytes(rawData, ethMsg); err != nil { + return errorf("could not rlp decode message: %v", err) + } + return (*GetPooledTransactions)(ethMsg) + case (PooledTransactions{}.Code()): + ethMsg := new(eth.PooledTransactionsPacket66) + if err := rlp.DecodeBytes(rawData, ethMsg); err != nil { + return errorf("could not rlp decode message: %v", err) + } + return (*PooledTransactions)(ethMsg) + default: + msg = errorf("invalid message code: %d", code) + } + + if msg != nil { + if err := rlp.DecodeBytes(rawData, msg); err != nil { + return errorf("could not rlp decode message: %v", err) + } + return msg + } + return errorf("invalid message: %s", string(rawData)) +} + +// Write writes a eth packet to the connection. +func (c *Conn) Write(msg Message) error { + payload, err := rlp.EncodeToBytes(msg) + if err != nil { + return err + } + _, err = c.Conn.Write(uint64(msg.Code()), payload) + return err +} + +// ReadSnap reads a snap/1 response with the given id from the connection. +func (c *Conn) ReadSnap(id uint64) (Message, error) { + respId := id + 1 + start := time.Now() + for respId != id && time.Since(start) < timeout { + code, rawData, _, err := c.Conn.Read() + if err != nil { + return nil, fmt.Errorf("could not read from connection: %v", err) + } + var snpMsg interface{} + switch int(code) { + case (GetAccountRange{}).Code(): + snpMsg = new(GetAccountRange) + case (AccountRange{}).Code(): + snpMsg = new(AccountRange) + case (GetStorageRanges{}).Code(): + snpMsg = new(GetStorageRanges) + case (StorageRanges{}).Code(): + snpMsg = new(StorageRanges) + case (GetByteCodes{}).Code(): + snpMsg = new(GetByteCodes) + case (ByteCodes{}).Code(): + snpMsg = new(ByteCodes) + case (GetTrieNodes{}).Code(): + snpMsg = new(GetTrieNodes) + case (TrieNodes{}).Code(): + snpMsg = new(TrieNodes) + default: + //return nil, fmt.Errorf("invalid message code: %d", code) + continue + } + if err := rlp.DecodeBytes(rawData, snpMsg); err != nil { + return nil, fmt.Errorf("could not rlp decode message: %v", err) + } + return snpMsg.(Message), nil + } + return nil, fmt.Errorf("request timed out") +} + +// GetAccountRange represents an account range query. +type GetAccountRange snap.GetAccountRangePacket + +func (msg GetAccountRange) Code() int { return 33 } +func (msg GetAccountRange) ReqID() uint64 { return msg.ID } + +type AccountRange snap.AccountRangePacket + +func (msg AccountRange) Code() int { return 34 } +func (msg AccountRange) ReqID() uint64 { return msg.ID } + +type GetStorageRanges snap.GetStorageRangesPacket + +func (msg GetStorageRanges) Code() int { return 35 } +func (msg GetStorageRanges) ReqID() uint64 { return msg.ID } + +type StorageRanges snap.StorageRangesPacket + +func (msg StorageRanges) Code() int { return 36 } +func (msg StorageRanges) ReqID() uint64 { return msg.ID } + +type GetByteCodes snap.GetByteCodesPacket + +func (msg GetByteCodes) Code() int { return 37 } +func (msg GetByteCodes) ReqID() uint64 { return msg.ID } + +type ByteCodes snap.ByteCodesPacket + +func (msg ByteCodes) Code() int { return 38 } +func (msg ByteCodes) ReqID() uint64 { return msg.ID } + +type GetTrieNodes snap.GetTrieNodesPacket + +func (msg GetTrieNodes) Code() int { return 39 } +func (msg GetTrieNodes) ReqID() uint64 { return msg.ID } + +type TrieNodes snap.TrieNodesPacket + +func (msg TrieNodes) Code() int { return 40 } +func (msg TrieNodes) ReqID() uint64 { return msg.ID }