From 3616768682707ad4513c7a320b29e7f90da6bfa8 Mon Sep 17 00:00:00 2001 From: TenderIronh Date: Thu, 21 Nov 2024 10:29:06 +0800 Subject: [PATCH] rename --- core/holepunch.go | 30 ++++++++++----------- core/p2papp.go | 2 +- core/p2pnetwork.go | 2 +- core/p2ptunnel.go | 63 ++++++++++++++++++++++---------------------- core/underlay_tcp.go | 4 +-- 5 files changed, 50 insertions(+), 51 deletions(-) diff --git a/core/holepunch.go b/core/holepunch.go index ebda78a..2024847 100644 --- a/core/holepunch.go +++ b/core/holepunch.go @@ -13,12 +13,12 @@ import ( func handshakeC2C(t *P2PTunnel) (err error) { gLog.Printf(LvDEBUG, "handshakeC2C %s:%d:%d to %s:%d", gConf.Network.Node, t.coneLocalPort, t.coneNatPort, t.config.peerIP, t.config.peerConeNatPort) defer gLog.Printf(LvDEBUG, "handshakeC2C end") - conn, err := net.ListenUDP("udp", t.la) + conn, err := net.ListenUDP("udp", t.localHoleAddr) if err != nil { return err } defer conn.Close() - _, err = UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id}) + _, err = UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id}) if err != nil { gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshake error:", err) return err @@ -28,7 +28,7 @@ func handshakeC2C(t *P2PTunnel) (err error) { gLog.Println(LvDEBUG, "handshakeC2C read MsgPunchHandshake error:", err) return err } - t.ra, _ = net.ResolveUDPAddr("udp", ra.String()) + t.remoteHoleAddr, _ = net.ResolveUDPAddr("udp", ra.String()) var tunnelID uint64 if len(buff) > openP2PHeaderSize { req := P2PHandshakeReq{} @@ -40,7 +40,7 @@ func handshakeC2C(t *P2PTunnel) (err error) { } if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake && tunnelID == t.id { gLog.Printf(LvDEBUG, "read %d handshake ", t.id) - UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) + UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) _, head, _, _, err = UDPRead(conn, HandshakeTimeout) if err != nil { gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err) @@ -49,7 +49,7 @@ func handshakeC2C(t *P2PTunnel) (err error) { } if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck && tunnelID == t.id { gLog.Printf(LvDEBUG, "read %d handshake ack ", t.id) - _, err = UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) + _, err = UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) if err != nil { gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err) return err @@ -70,7 +70,7 @@ func handshakeC2S(t *P2PTunnel) error { startTime := time.Now() r := rand.New(rand.NewSource(time.Now().UnixNano())) randPorts := r.Perm(65532) - conn, err := net.ListenUDP("udp", t.la) + conn, err := net.ListenUDP("udp", t.localHoleAddr) if err != nil { return err } @@ -111,7 +111,7 @@ func handshakeC2S(t *P2PTunnel) error { gLog.Println(LvERROR, "parse p2pheader error:", err) return err } - t.ra, _ = net.ResolveUDPAddr("udp", dst.String()) + t.remoteHoleAddr, _ = net.ResolveUDPAddr("udp", dst.String()) var tunnelID uint64 if len(buff) > openP2PHeaderSize { req := P2PHandshakeReq{} @@ -123,7 +123,7 @@ func handshakeC2S(t *P2PTunnel) error { } if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake && tunnelID == t.id { gLog.Printf(LvDEBUG, "handshakeC2S read %d handshake ", t.id) - UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) + UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) for { _, head, buff, _, err = UDPRead(conn, HandshakeTimeout) if err != nil { @@ -146,8 +146,8 @@ func handshakeC2S(t *P2PTunnel) error { } } if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { - gLog.Printf(LvDEBUG, "handshakeC2S read %d handshake ack %s", t.id, t.ra.String()) - _, err = UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) + gLog.Printf(LvDEBUG, "handshakeC2S read %d handshake ack %s", t.id, t.remoteHoleAddr.String()) + _, err = UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) return err } else { gLog.Println(LvDEBUG, "handshakeS2C read msg but not MsgPunchHandshakeAck") @@ -178,7 +178,7 @@ func handshakeS2C(t *P2PTunnel) error { return err } defer conn.Close() - UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id}) + UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id}) _, head, buff, _, err := UDPRead(conn, HandshakeTimeout) if err != nil { // gLog.Println(LevelDEBUG, "one of the handshake error:", err) @@ -199,7 +199,7 @@ func handshakeS2C(t *P2PTunnel) error { if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake && tunnelID == t.id { gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ", t.id) - UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) + UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) // may read several MsgPunchHandshake for { _, head, buff, _, err = UDPRead(conn, HandshakeTimeout) @@ -224,7 +224,7 @@ func handshakeS2C(t *P2PTunnel) error { } if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ack %s", t.id, conn.LocalAddr().String()) - UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) + UDPWrite(conn, t.remoteHoleAddr, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) gotIt = true la, _ := net.ResolveUDPAddr("udp", conn.LocalAddr().String()) gotCh <- la @@ -238,14 +238,14 @@ func handshakeS2C(t *P2PTunnel) error { gLog.Printf(LvDEBUG, "send symmetric handshake end") if compareVersion(t.config.peerVersion, SymmetricSimultaneouslySendVersion) < 0 { // compatible with old client gLog.Println(LvDEBUG, "handshakeS2C ready, notify peer connect") - t.pn.push(t.config.PeerNode, MsgPushHandshakeStart, TunnelMsg{ID: t.id}) + GNetwork.push(t.config.PeerNode, MsgPushHandshakeStart, TunnelMsg{ID: t.id}) } select { case <-time.After(HandshakeTimeout): return fmt.Errorf("wait handshake timeout") case la := <-gotCh: - t.la = la + t.localHoleAddr = la gLog.Println(LvDEBUG, "symmetric handshake ok", la) gLog.Printf(LvINFO, "handshakeS2C ok. cost %dms", time.Since(startTime)/time.Millisecond) } diff --git a/core/p2papp.go b/core/p2papp.go index e375deb..40d9736 100644 --- a/core/p2papp.go +++ b/core/p2papp.go @@ -174,7 +174,7 @@ func (app *p2pApp) buildDirectTunnel() error { pn := GNetwork initErr := pn.requestPeerInfo(&app.config) if initErr != nil { - gLog.Printf(LvERROR, "%s init error:%s", app.config.LogPeerNode(), initErr) + gLog.Printf(LvERROR, "%s requestPeerInfo error:%s", app.config.LogPeerNode(), initErr) return initErr } t, err = pn.addDirectTunnel(app.config, 0) diff --git a/core/p2pnetwork.go b/core/p2pnetwork.go index 3ae14b3..82eef50 100644 --- a/core/p2pnetwork.go +++ b/core/p2pnetwork.go @@ -468,7 +468,7 @@ func (pn *P2PNetwork) newTunnel(config AppConfig, tid uint64, isClient bool) (t } } - t = &P2PTunnel{pn: pn, + t = &P2PTunnel{ config: config, id: tid, writeData: make(chan []byte, WriteDataChanSize), diff --git a/core/p2ptunnel.go b/core/p2ptunnel.go index e4ed9ce..9eb7f45 100644 --- a/core/p2ptunnel.go +++ b/core/p2ptunnel.go @@ -19,13 +19,12 @@ const WriteDataChanSize int = 3000 var buildTunnelMtx sync.Mutex type P2PTunnel struct { - pn *P2PNetwork conn underlay hbTime time.Time hbMtx sync.Mutex config AppConfig - la *net.UDPAddr // local hole address - ra *net.UDPAddr // remote hole address + localHoleAddr *net.UDPAddr // local hole address + remoteHoleAddr *net.UDPAddr // remote hole address overlayConns sync.Map // both TCP and UDP id uint64 // client side alloc rand.uint64 = server side running bool @@ -58,7 +57,7 @@ func (t *P2PTunnel) initPort() { t.coneLocalPort = localPort2 t.coneNatPort = natPort } - t.la = &net.UDPAddr{IP: net.ParseIP(gConf.Network.localIP), Port: t.coneLocalPort} + t.localHoleAddr = &net.UDPAddr{IP: net.ParseIP(gConf.Network.localIP), Port: t.coneLocalPort} gLog.Printf(LvDEBUG, "prepare punching port %d:%d", t.coneLocalPort, t.coneNatPort) } @@ -85,8 +84,8 @@ func (t *P2PTunnel) connect() error { if req.Token == 0 { // no relay token req.Token = gConf.Network.Token } - t.pn.push(t.config.PeerNode, MsgPushConnectReq, req) - head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, UnderlayConnectTimeout*3) + GNetwork.push(t.config.PeerNode, MsgPushConnectReq, req) + head, body := GNetwork.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, UnderlayConnectTimeout*3) if head == nil { return errors.New("connect error") } @@ -161,7 +160,7 @@ func (t *P2PTunnel) checkActive() bool { // call when user delete tunnel func (t *P2PTunnel) close() { - t.pn.NotifyTunnelClose(t) + GNetwork.NotifyTunnelClose(t) if !t.running { return } @@ -169,7 +168,7 @@ func (t *P2PTunnel) close() { if t.conn != nil { t.conn.Close() } - t.pn.allTunnels.Delete(t.id) + GNetwork.allTunnels.Delete(t.id) gLog.Printf(LvINFO, "%d p2ptunnel close %s ", t.id, t.config.LogPeerNode()) } @@ -190,7 +189,7 @@ func (t *P2PTunnel) start() error { func (t *P2PTunnel) handshake() error { if t.config.peerConeNatPort > 0 { // only peer is cone should prepare t.ra var err error - t.ra, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", t.config.peerIP, t.config.peerConeNatPort)) + t.remoteHoleAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", t.config.peerIP, t.config.peerConeNatPort)) if err != nil { return err } @@ -198,7 +197,7 @@ func (t *P2PTunnel) handshake() error { if compareVersion(t.config.peerVersion, SyncServerTimeVersion) < 0 { gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion) } else { - ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddtma*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano()) + ts := time.Duration(int64(t.punchTs) + GNetwork.dt + GNetwork.ddtma*int64(time.Since(GNetwork.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano()) if ts > PunchTsDelay || ts < 0 { ts = PunchTsDelay } @@ -267,11 +266,11 @@ func (t *P2PTunnel) connectUnderlayUDP() (c underlay, err error) { } if t.config.isUnderlayServer == 1 { time.Sleep(time.Millisecond * 10) // punching udp port will need some times in some env - go t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) + go GNetwork.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) if t.config.UnderlayProtocol == "kcp" { - ul, err = listenKCP(t.la.String(), TunnelIdleTimeout) + ul, err = listenKCP(t.localHoleAddr.String(), TunnelIdleTimeout) } else { - ul, err = listenQuic(t.la.String(), TunnelIdleTimeout) + ul, err = listenQuic(t.localHoleAddr.String(), TunnelIdleTimeout) } if err != nil { @@ -293,24 +292,24 @@ func (t *P2PTunnel) connectUnderlayUDP() (c underlay, err error) { } //else - conn, errL := net.ListenUDP("udp", t.la) + conn, errL := net.ListenUDP("udp", t.localHoleAddr) if errL != nil { time.Sleep(time.Millisecond * 10) - conn, errL = net.ListenUDP("udp", t.la) + conn, errL = net.ListenUDP("udp", t.localHoleAddr) if errL != nil { return nil, fmt.Errorf("%s listen error:%s", underlayProtocol, errL) } } - t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, ReadMsgTimeout) - gLog.Printf(LvDEBUG, "%s dial to %s", underlayProtocol, t.ra.String()) + GNetwork.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, ReadMsgTimeout) + gLog.Printf(LvDEBUG, "%s dial to %s", underlayProtocol, t.remoteHoleAddr.String()) if t.config.UnderlayProtocol == "kcp" { - ul, errL = dialKCP(conn, t.ra, TunnelIdleTimeout) + ul, errL = dialKCP(conn, t.remoteHoleAddr, TunnelIdleTimeout) } else { - ul, errL = dialQuic(conn, t.ra, TunnelIdleTimeout) + ul, errL = dialQuic(conn, t.remoteHoleAddr, TunnelIdleTimeout) } if errL != nil { - return nil, fmt.Errorf("%s dial to %s error:%s", underlayProtocol, t.ra.String(), errL) + return nil, fmt.Errorf("%s dial to %s error:%s", underlayProtocol, t.remoteHoleAddr.String(), errL) } handshakeBegin := time.Now() ul.WriteBytes(MsgP2P, MsgTunnelHandshake, []byte("OpenP2P,hello")) @@ -353,12 +352,12 @@ func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) { // client side if t.config.linkMode == LinkModeTCP4 { - t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, ReadMsgTimeout) + GNetwork.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, ReadMsgTimeout) } else { //tcp punch should sleep for punch the same time if compareVersion(t.config.peerVersion, SyncServerTimeVersion) < 0 { gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion) } else { - ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddtma*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano()) + ts := time.Duration(int64(t.punchTs) + GNetwork.dt + GNetwork.ddtma*int64(time.Since(GNetwork.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano()) if ts > PunchTsDelay || ts < 0 { ts = PunchTsDelay } @@ -394,7 +393,7 @@ func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) { func (t *P2PTunnel) connectUnderlayTCPSymmetric() (c underlay, err error) { gLog.Printf(LvDEBUG, "connectUnderlayTCPSymmetric %s start ", t.config.LogPeerNode()) defer gLog.Printf(LvDEBUG, "connectUnderlayTCPSymmetric %s end ", t.config.LogPeerNode()) - ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddtma*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano()) + ts := time.Duration(int64(t.punchTs) + GNetwork.dt + GNetwork.ddtma*int64(time.Since(GNetwork.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano()) if ts > PunchTsDelay || ts < 0 { ts = PunchTsDelay } @@ -490,7 +489,7 @@ func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) { defer gLog.Printf(LvDEBUG, "connectUnderlayTCP6 %s end ", t.config.LogPeerNode()) var ul *underlayTCP6 if t.config.isUnderlayServer == 1 { - t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) + GNetwork.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) ul, err = listenTCP6(t.coneNatPort, UnderlayConnectTimeout) if err != nil { return nil, fmt.Errorf("listen TCP6 error:%s", err) @@ -509,7 +508,7 @@ func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) { } //else - t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, ReadMsgTimeout) + GNetwork.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, ReadMsgTimeout) gLog.Println(LvDEBUG, "TCP6 dial to ", t.config.peerIPv6) ul, err = dialTCP6(t.config.peerIPv6, t.config.peerConeNatPort) if err != nil || ul == nil { @@ -596,7 +595,7 @@ func (t *P2PTunnel) readLoop() { } tunnelID := binary.LittleEndian.Uint64(body[:8]) gLog.Printf(LvDev, "relay data to %d, len=%d", tunnelID, head.DataLen-RelayHeaderSize) - if err := t.pn.relay(tunnelID, body[RelayHeaderSize:]); err != nil { + if err := GNetwork.relay(tunnelID, body[RelayHeaderSize:]); err != nil { gLog.Printf(LvERROR, "%s:%d relay to %d len=%d error:%s", t.config.LogPeerNode(), t.id, tunnelID, len(body), ErrRelayTunnelNotFound) } case MsgRelayHeartbeat: @@ -608,7 +607,7 @@ func (t *P2PTunnel) readLoop() { // TODO: debug relay heartbeat gLog.Printf(LvDEBUG, "read MsgRelayHeartbeat from rtid:%d,appid:%d", req.RelayTunnelID, req.AppID) // update app hbtime - t.pn.updateAppHeartbeat(req.AppID) + GNetwork.updateAppHeartbeat(req.AppID) req.From = gConf.Network.Node t.WriteMessage(req.RelayTunnelID, MsgP2P, MsgRelayHeartbeatAck, &req) case MsgRelayHeartbeatAck: @@ -620,7 +619,7 @@ func (t *P2PTunnel) readLoop() { } // TODO: debug relay heartbeat gLog.Printf(LvDEBUG, "read MsgRelayHeartbeatAck to appid:%d", req.AppID) - t.pn.updateAppHeartbeat(req.AppID) + GNetwork.updateAppHeartbeat(req.AppID) case MsgOverlayConnectReq: req := OverlayConnectReq{} if err := json.Unmarshal(body, &req); err != nil { @@ -733,7 +732,7 @@ func (t *P2PTunnel) listen() error { FromIP: gConf.Network.publicIP, ConeNatPort: t.coneNatPort, ID: t.id, - PunchTs: uint64(time.Now().UnixNano() + int64(PunchTsDelay) - t.pn.dt), + PunchTs: uint64(time.Now().UnixNano() + int64(PunchTsDelay) - GNetwork.dt), Version: OpenP2PVersion, } t.punchTs = rsp.PunchTs @@ -742,7 +741,7 @@ func (t *P2PTunnel) listen() error { rsp.IPv6 = gConf.IPv6() } - t.pn.push(t.config.PeerNode, MsgPushConnectRsp, rsp) + GNetwork.push(t.config.PeerNode, MsgPushConnectRsp, rsp) gLog.Printf(LvDEBUG, "p2ptunnel wait for connecting") t.tunnelServer = true return t.start() @@ -760,9 +759,9 @@ func (t *P2PTunnel) closeOverlayConns(appID uint64) { func (t *P2PTunnel) handleNodeData(head *openP2PHeader, body []byte, isRelay bool) { gLog.Printf(LvDev, "%d tunnel read node data bodylen=%d, relay=%t", t.id, head.DataLen, isRelay) - ch := t.pn.nodeData + ch := GNetwork.nodeData // if body[9] == 1 { // TODO: deal relay - // ch = t.pn.nodeDataSmall + // ch = GNetwork.nodeDataSmall // gLog.Printf(LvDEBUG, "read icmp %d", time.Now().Unix()) // } if isRelay { diff --git a/core/underlay_tcp.go b/core/underlay_tcp.go index 4eb7b2d..c4a6eb2 100644 --- a/core/underlay_tcp.go +++ b/core/underlay_tcp.go @@ -51,7 +51,7 @@ func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) if compareVersion(t.config.peerVersion, SyncServerTimeVersion) < 0 { gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion) } else { - ts := time.Duration(int64(t.punchTs) + t.pn.dt - time.Now().UnixNano()) + ts := time.Duration(int64(t.punchTs) + GNetwork.dt - time.Now().UnixNano()) gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond) time.Sleep(ts) } @@ -72,7 +72,7 @@ func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) utcp.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, buff) return utcp, nil } - t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) + GNetwork.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) tid := t.id if compareVersion(t.config.peerVersion, PublicIPVersion) < 0 { // old version ipBytes := net.ParseIP(t.config.peerIP).To4()