Skip to content

Commit

Permalink
fix: utp initial in test case
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkAfCod authored and GrapeBaBa committed Nov 19, 2024
1 parent 814fee9 commit 8bb4364
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 47 deletions.
21 changes: 6 additions & 15 deletions cmd/shisui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ func shisui(ctx *cli.Context) error {
if err != nil {
return err
}
go func() {
debug.Setup(ctx)
}()

// Start metrics export if enabled
utils.SetupMetrics(ctx)
Expand Down Expand Up @@ -397,11 +394,9 @@ func initHistory(config Config, server *rpc.Server, conn discover.UDPConn, local
conn,
localNode,
discV5,
utp,
contentStorage,
contentQueue,
func(p *discover.PortalProtocol) {
p.Utp = utp
})
contentQueue)

if err != nil {
return nil, err
Expand Down Expand Up @@ -450,11 +445,9 @@ func initBeacon(config Config, server *rpc.Server, conn discover.UDPConn, localN
conn,
localNode,
discV5,
utp,
contentStorage,
contentQueue,
func(p *discover.PortalProtocol) {
p.Utp = utp
})
contentQueue)

if err != nil {
return nil, err
Expand Down Expand Up @@ -496,11 +489,9 @@ func initState(config Config, server *rpc.Server, conn discover.UDPConn, localNo
conn,
localNode,
discV5,
utp,
stateStore,
contentQueue,
func(p *discover.PortalProtocol) {
p.Utp = utp
})
contentQueue)

if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions internal/debug/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ func Setup(ctx *cli.Context) error {
glogger = log.NewGlogHandler(handler)

// logging
//verbosity := log.FromLegacyLevel(ctx.Int(verbosityFlag.Name))
//glogger.Verbosity(verbosity)
verbosity := log.FromLegacyLevel(ctx.Int(verbosityFlag.Name))
glogger.Verbosity(verbosity)
vmodule := ctx.String(logVmoduleFlag.Name)
if vmodule == "" {
// Retain backwards compatibility with `--vmodule` flag if `--log.vmodule` not set
Expand All @@ -264,7 +264,7 @@ func Setup(ctx *cli.Context) error {
}
glogger.Vmodule(vmodule)

//log.SetDefault(log.NewLogger(glogger))
log.SetDefault(log.NewLogger(glogger))

// profiling, tracing
runtime.MemProfileRate = memprofilerateFlag.Value
Expand Down Expand Up @@ -312,7 +312,7 @@ func StartPProf(address string, withMetrics bool) {
}
log.Info("Starting pprof server", "addr", fmt.Sprintf("http://%s/debug/pprof", address))
go func() {
if err := http.ListenAndServe(address, nil); err != nil {
if err := http.ListenAndServe("0.0.0.0:8080", nil); err != nil {
log.Error("Failure in running pprof server", "err", err)
}
}()
Expand Down
15 changes: 10 additions & 5 deletions p2p/discover/portal_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func defaultContentIdFunc(contentKey []byte) []byte {
return digest[:]
}

func NewPortalProtocol(config *PortalProtocolConfig, protocolId portalwire.ProtocolId, privateKey *ecdsa.PrivateKey, conn UDPConn, localNode *enode.LocalNode, discV5 *UDPv5, storage storage.ContentStorage, contentQueue chan *ContentElement, opts ...PortalProtocolOption) (*PortalProtocol, error) {
func NewPortalProtocol(config *PortalProtocolConfig, protocolId portalwire.ProtocolId, privateKey *ecdsa.PrivateKey, conn UDPConn, localNode *enode.LocalNode, discV5 *UDPv5, utp *PortalUtp, storage storage.ContentStorage, contentQueue chan *ContentElement, opts ...PortalProtocolOption) (*PortalProtocol, error) {
closeCtx, cancelCloseCtx := context.WithCancel(context.Background())

protocol := &PortalProtocol{
Expand All @@ -222,6 +222,7 @@ func NewPortalProtocol(config *PortalProtocolConfig, protocolId portalwire.Proto
offerQueue: make(chan *OfferRequestWithNode, concurrentOffers),
conn: conn,
DiscV5: discV5,
Utp: utp,
NAT: config.NAT,
clock: config.clock,
connIdGen: libutp.NewConnIdGenerator(),
Expand All @@ -247,7 +248,9 @@ func (p *PortalProtocol) Start() error {
}

p.DiscV5.RegisterTalkHandler(p.protocolId, p.handleTalkRequest)
err = p.Utp.Start()
if p.Utp != nil {
err = p.Utp.Start()
}
if err != nil {
return err
}
Expand All @@ -268,7 +271,9 @@ func (p *PortalProtocol) Stop() {
p.cancelCloseCtx()
p.table.close()
p.DiscV5.Close()
p.Utp.Stop()
if p.Utp != nil {
p.Utp.Stop()
}
}
func (p *PortalProtocol) RoutingTableInfo() [][]string {
p.table.mutex.Lock()
Expand Down Expand Up @@ -1146,7 +1151,7 @@ func (p *PortalProtocol) handleFindContent(id enode.ID, addr *net.UDPAddr, reque
}(p.closeCtx, connectionId)

idBuffer := make([]byte, 2)
binary.BigEndian.PutUint16(idBuffer, uint16(connectionId.SendId()))
binary.BigEndian.PutUint16(idBuffer, connectionId.SendId())
connIdMsg := &portalwire.ConnectionId{
Id: idBuffer,
}
Expand Down Expand Up @@ -1286,7 +1291,7 @@ func (p *PortalProtocol) handleOffer(id enode.ID, addr *net.UDPAddr, request *po
}
}(p.closeCtx, connectionId)

binary.BigEndian.PutUint16(idBuffer, uint16(connectionId.SendId()))
binary.BigEndian.PutUint16(idBuffer, connectionId.SendId())
} else {
binary.BigEndian.PutUint16(idBuffer, uint16(0))
}
Expand Down
10 changes: 4 additions & 6 deletions p2p/discover/portal_protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,9 @@ func setupLocalPortalNode(addr string, bootNodes []*enode.Node) (*PortalProtocol
conn,
localNode,
discV5,
utpSocket,
&storage.MockStorage{Db: make(map[string][]byte)},
contentQueue,
func(p *PortalProtocol) {
p.Utp = utpSocket
})
contentQueue)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -195,7 +193,7 @@ func TestPortalWireProtocolUdp(t *testing.T) {
_ = connWithConnId.Close()
}
}()
connWithConnId, err = node2.Utp.DialWithCid(context.Background(), node1.localNode.Node(), uint16(cid1.SendId()))
connWithConnId, err = node2.Utp.DialWithCid(context.Background(), node1.localNode.Node(), cid1.SendId())
if err != nil {
panic(err)
}
Expand All @@ -218,7 +216,7 @@ func TestPortalWireProtocolUdp(t *testing.T) {
_ = ConnId2Conn.Close()
}
}()
ConnId2Conn, err = node2.Utp.DialWithCid(context.Background(), node1.localNode.Node(), uint16(cid2.SendId()))
ConnId2Conn, err = node2.Utp.DialWithCid(context.Background(), node1.localNode.Node(), cid2.SendId())
if err != nil && err != io.EOF {
panic(err)
}
Expand Down
20 changes: 6 additions & 14 deletions p2p/discover/portal_utp.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,20 @@ func NewPortalUtp(ctx context.Context, config *PortalProtocolConfig, discV5 *UDP
}

func (p *PortalUtp) Start() error {
errCh := make(chan error, 1)
p.startOnce.Do(func() {
defer func() {
close(errCh)
}()
laddr := p.getLocalAddr()

p.packetRouter = utp.NewPacketRouter(p.packetRouterFunc)

var err error
go p.startOnce.Do(func() {
var logger *zap.Logger
var err error
if p.log.Enabled(p.ctx, log.LevelDebug) || p.log.Enabled(p.ctx, log.LevelTrace) {
logger, err = zap.NewDevelopmentConfig().Build()
} else {
logger, err = zap.NewProductionConfig().Build()
}
if err != nil {
errCh <- err
return
}

laddr := p.getLocalAddr()
p.packetRouter = utp.NewPacketRouter(p.packetRouterFunc)
p.utpSm, err = utp.NewSocketManagerWithOptions(
"utp",
laddr,
Expand All @@ -70,12 +64,10 @@ func (p *PortalUtp) Start() error {
utp.WithPacketRouter(p.packetRouter),
utp.WithMaxPacketSize(1145))
if err != nil {
errCh <- err
return
}
p.listener, err = utp.ListenUTPOptions("utp", (*utp.Addr)(laddr), utp.WithSocketManager(p.utpSm))
if err != nil {
errCh <- err
return
}
p.lAddr = p.listener.Addr().(*utp.Addr)
Expand All @@ -84,7 +76,7 @@ func (p *PortalUtp) Start() error {
p.discV5.RegisterTalkHandler(string(portalwire.Utp), p.handleUtpTalkRequest)
})

return <-errCh
return err
}

func (p *PortalUtp) Stop() {
Expand Down
4 changes: 3 additions & 1 deletion portalnetwork/beacon/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package beacon

import (
"bytes"
"context"
"fmt"
"net"
"os"
Expand Down Expand Up @@ -65,7 +66,8 @@ func SetupBeaconNetwork(addr string, bootNodes []*enode.Node) (*BeaconNetwork, e

contentQueue := make(chan *discover.ContentElement, 50)

portalProtocol, err := discover.NewPortalProtocol(conf, portalwire.Beacon, privKey, conn, localNode, discV5, &storage.MockStorage{Db: make(map[string][]byte)}, contentQueue)
utpSocket := discover.NewPortalUtp(context.Background(), conf, discV5, conn)
portalProtocol, err := discover.NewPortalProtocol(conf, portalwire.Beacon, privKey, conn, localNode, discV5, utpSocket, &storage.MockStorage{Db: make(map[string][]byte)}, contentQueue)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions portalnetwork/history/history_network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package history

import (
"bytes"
"context"
"crypto/sha256"
_ "embed"
"encoding/json"
Expand Down Expand Up @@ -334,8 +335,8 @@ func genHistoryNetwork(addr string, bootNodes []*enode.Node) (*HistoryNetwork, e
}

contentQueue := make(chan *discover.ContentElement, 50)

portalProtocol, err := discover.NewPortalProtocol(conf, portalwire.History, privKey, conn, localNode, discV5, &storage.MockStorage{Db: make(map[string][]byte)}, contentQueue)
utpSocket := discover.NewPortalUtp(context.Background(), conf, discV5, conn)
portalProtocol, err := discover.NewPortalProtocol(conf, portalwire.History, privKey, conn, localNode, discV5, utpSocket, &storage.MockStorage{Db: make(map[string][]byte)}, contentQueue)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 8bb4364

Please sign in to comment.