From 3dd95b6d292b80ea0cf8a10cfeee6558963ffaaa Mon Sep 17 00:00:00 2001 From: CertainTLS Date: Mon, 23 May 2022 22:50:17 +0800 Subject: [PATCH 01/11] feat: implement pluggable transport interface --- pkg/core/README.md | 15 +++++++ pkg/core/client.go | 25 +++++++++--- pkg/core/server.go | 84 +++++++++++++++++++++++++++++++++++++- pkg/core/transport_test.go | 3 ++ 4 files changed, 121 insertions(+), 6 deletions(-) create mode 100644 pkg/core/README.md create mode 100644 pkg/core/transport_test.go diff --git a/pkg/core/README.md b/pkg/core/README.md new file mode 100644 index 0000000000..4d9c100110 --- /dev/null +++ b/pkg/core/README.md @@ -0,0 +1,15 @@ +# A pluggable transport implementation based on Hysteria + +## Hysteria +[Hysteria](https://github.com/HyNetwork/hysteria) uses a custom version of QUIC protocol ([RFC 9000 - QUIC: A UDP-Based Multiplexed and Secure Transport](https://www.rfc-editor.org/rfc/rfc9000.html)): + +* a custom congestion control ([RFC 9002 - QUIC Loss Detection and Congestion Control](https://www.rfc-editor.org/rfc/rfc9002.html)) +* tweaked QUIC parameters +* an obfuscation layer +* non-standard transports (e.g. [faketcp](https://github.com/wangyu-/udp2raw)) + +## Usage + +## Implementation + +The implementation uses [Pluggable Transport Specification v3.0 - Go Transport API](https://github.com/Pluggable-Transports/Pluggable-Transports-spec/blob/main/releases/PTSpecV3.0/Pluggable%20Transport%20Specification%20v3.0%20-%20Go%20Transport%20API%20v3.0.md) \ No newline at end of file diff --git a/pkg/core/client.go b/pkg/core/client.go index f4c2ab677f..b8a79f8b22 100644 --- a/pkg/core/client.go +++ b/pkg/core/client.go @@ -6,6 +6,12 @@ import ( "crypto/tls" "errors" "fmt" + "math/rand" + "net" + "strconv" + "sync" + "time" + "github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go/congestion" "github.com/lunixbochs/struc" @@ -13,11 +19,6 @@ import ( "github.com/tobyxdd/hysteria/pkg/pmtud_fix" "github.com/tobyxdd/hysteria/pkg/transport" "github.com/tobyxdd/hysteria/pkg/utils" - "math/rand" - "net" - "strconv" - "sync" - "time" ) var ( @@ -183,6 +184,20 @@ func (c *Client) openStreamWithReconnect() (quic.Connection, quic.Stream, error) return c.quicSession, &wrappedQUICStream{stream}, err } +// Implement Pluggable Transport Client interface +func (c *Client) Dial() (net.Conn, error) { + session, stream, err := c.openStreamWithReconnect() + if err != nil { + return nil, err + } + + return &quicConn{ + Orig: stream, + PseudoLocalAddr: session.LocalAddr(), + PseudoRemoteAddr: session.RemoteAddr(), + }, nil +} + func (c *Client) DialTCP(addr string) (net.Conn, error) { host, port, err := utils.SplitHostPort(addr) if err != nil { diff --git a/pkg/core/server.go b/pkg/core/server.go index 04d887194a..05b65856bf 100644 --- a/pkg/core/server.go +++ b/pkg/core/server.go @@ -5,6 +5,8 @@ import ( "crypto/tls" "errors" "fmt" + "net" + "github.com/lucas-clemente/quic-go" "github.com/lunixbochs/struc" "github.com/prometheus/client_golang/prometheus" @@ -12,7 +14,6 @@ import ( "github.com/tobyxdd/hysteria/pkg/obfs" "github.com/tobyxdd/hysteria/pkg/pmtud_fix" "github.com/tobyxdd/hysteria/pkg/transport" - "net" ) type ConnectFunc func(addr net.Addr, auth []byte, sSend uint64, sRecv uint64) (bool, string) @@ -42,6 +43,11 @@ type Server struct { listener quic.Listener } +type TransportListener struct { + ql quic.Listener + s *Server +} + func NewServer(addr string, protocol string, tlsConfig *tls.Config, quicConfig *quic.Config, transport *transport.ServerTransport, sendBPS uint64, recvBPS uint64, congestionFactory CongestionFactory, disableUDP bool, aclEngine *acl.Engine, obfuscator obfs.Obfuscator, connectFunc ConnectFunc, disconnectFunc DisconnectFunc, @@ -173,3 +179,79 @@ func (s *Server) handleControlStream(cs quic.Connection, stream quic.Stream) ([] } return ch.Auth, ok, vb[0] == protocolVersionV2, nil } + +// Implement Pluggable Transport Server interface +func (tl *TransportListener) Listen() (net.Listener, error) { + return tl, nil +} + +// Addr returns the listener's network address. +func (tl *TransportListener) Addr() net.Addr { + return tl.ql.Addr() +} + +// Close closes the listener. +// Any blocked Accept operations will be unblocked and return errors. +func (tl *TransportListener) Close() error { + return tl.ql.Close() +} + +func (tl *TransportListener) Accept() (conn net.Conn, err error) { + for { + cs, err := tl.ql.Accept(context.Background()) + if err != nil { + return nil, err + } + go func() { + // Expect the client to create a control stream to send its own information + ctx, ctxCancel := context.WithTimeout(context.Background(), protocolTimeout) + stream, err := cs.AcceptStream(ctx) + ctxCancel() + if err != nil { + _ = cs.CloseWithError(closeErrorCodeProtocol, "protocol error") + return + } + // Handle the control stream + auth, ok, v2, err := tl.s.handleControlStream(cs, stream) + if err != nil { + _ = cs.CloseWithError(closeErrorCodeProtocol, "protocol error") + return + } + if !ok { + _ = cs.CloseWithError(closeErrorCodeAuth, "auth error") + return + } + // Start accepting streams and messages + sc := newServerClient(v2, cs, tl.s.transport, auth, tl.s.disableUDP, tl.s.aclEngine, + tl.s.tcpRequestFunc, tl.s.tcpErrorFunc, tl.s.udpRequestFunc, tl.s.udpErrorFunc, + tl.s.upCounterVec, tl.s.downCounterVec, tl.s.connGaugeVec) + + if !sc.DisableUDP { + go func() { + for { + msg, err := sc.CS.ReceiveMessage() + if err != nil { + break + } + sc.handleMessage(msg) + } + }() + } + for { + stream, err := sc.CS.AcceptStream(context.Background()) + if err != nil { + return + } + + conn = &quicConn{ + Orig: stream, + PseudoLocalAddr: cs.LocalAddr(), + PseudoRemoteAddr: cs.RemoteAddr(), + } + + return + } + }() + } + +} diff --git a/pkg/core/transport_test.go b/pkg/core/transport_test.go new file mode 100644 index 0000000000..107107ca6c --- /dev/null +++ b/pkg/core/transport_test.go @@ -0,0 +1,3 @@ +package core + +// TODO From d919c7232589ef3eee8d056390fff25ae71d594e Mon Sep 17 00:00:00 2001 From: CertainTLS Date: Sun, 29 May 2022 01:16:51 +0800 Subject: [PATCH 02/11] chore: WIP --- pkg/core/transport_test.go | 249 ++++++++++++++++++++++++++++++++++++- 1 file changed, 248 insertions(+), 1 deletion(-) diff --git a/pkg/core/transport_test.go b/pkg/core/transport_test.go index 107107ca6c..e2c953dd36 100644 --- a/pkg/core/transport_test.go +++ b/pkg/core/transport_test.go @@ -1,3 +1,250 @@ package core -// TODO +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "io" + "io/ioutil" + "net" + "testing" + "time" + + "github.com/lucas-clemente/quic-go" + "github.com/lucas-clemente/quic-go/congestion" + "github.com/sirupsen/logrus" + hyCongestion "github.com/tobyxdd/hysteria/pkg/congestion" + "github.com/tobyxdd/hysteria/pkg/obfs" + "github.com/tobyxdd/hysteria/pkg/transport" +) + +// Configs for testing +const ( + server_addr = ":2345" + protocol = "" + certFile = "../../hysteria.server.crt" + keyFile = "../../hysteria.server.key" + obfs_str = "f561508f56ed" + auth_str = "da5438aaa690a5748eb59de8f7bedcb0" + client_up_mbps = 20 + client_down_mbps = 1000 + test_data = "Here we go!" + customCA = "../../hysteria.ca.crt" +) + +// Default config from cmd/config.go +const ( + mbpsToBps = 125000 + minSpeedBPS = 16384 + + DefaultStreamReceiveWindow = 15728640 // 15 MB/s + DefaultConnectionReceiveWindow = 67108864 // 64 MB/s + DefaultMaxIncomingStreams = 1024 + + DefaultALPN = "hysteria" +) + +func TestE2E(t *testing.T) { + // Server and Client share the same obfuscator + obfuscator := obfs.NewXPlusObfuscator([]byte(obfs_str)) + + go runServer(obfuscator) + + time.Sleep(time.Second * 5) + err := runClient(obfuscator) + if err != nil { + t.Fail() + } +} + +// Simulate a server +func runServer(obfuscator *obfs.XPlusObfuscator) error { + // Load TLS server config + cer, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + fmt.Println("Cannot read server cert or key files") + return err + } + + var serverTlsConfig = &tls.Config{ + Certificates: []tls.Certificate{cer}, + MinVersion: tls.VersionTLS13, + } + + // QUIC config + quicConfig := &quic.Config{ + InitialStreamReceiveWindow: DefaultStreamReceiveWindow, + MaxStreamReceiveWindow: DefaultStreamReceiveWindow, + InitialConnectionReceiveWindow: DefaultConnectionReceiveWindow, + MaxConnectionReceiveWindow: DefaultConnectionReceiveWindow, + MaxIncomingStreams: DefaultMaxIncomingStreams, // Client doesn't need this + KeepAlive: true, + DisablePathMTUDiscovery: true, // @TODO: not sure what does this mean yet + EnableDatagrams: true, + } + + // Auth + var authFunc ConnectFunc + authFunc, err = passwordAuthFunc(auth_str) + connectFunc := func(addr net.Addr, auth []byte, sSend uint64, sRecv uint64) (bool, string) { + ok, msg := authFunc(addr, auth, sSend, sRecv) + if !ok { + logrus.WithFields(logrus.Fields{ + "src": addr, + "msg": msg, + }).Info("Authentication failed, client rejected") + } else { + logrus.WithFields(logrus.Fields{ + "src": addr, + }).Info("Client connected") + } + return ok, msg + } + + server, err := NewServer(server_addr, protocol, serverTlsConfig, quicConfig, transport.DefaultServerTransport, 0, 0, congestionFactory, false, nil, obfuscator, connectFunc, disconnectFunc, nil, tcpErrorFunc, udpRequestFunc, udpErrorFunc, nil) + defer server.Close() + + listener := TransportListener{ + ql: server.listener, + s: server, + } + + l, err := listener.Listen() + + if err != nil { + fmt.Println("Failed to initialize server") + } + + fmt.Println("Server up and running") + + // We don't do this in a loop because we expect the testing client will only send one message in one connection + serverConn, err := l.Accept() + serverBuffer := make([]byte, len(test_data)) + + _, err = serverConn.Read(serverBuffer) + s := string(serverBuffer) + if s == test_data { + return nil + } + return err +} + +// Simulate a client +func runClient(obfuscator *obfs.XPlusObfuscator) error { + // Load TLS client config + var clientTlsConfig = &tls.Config{ + InsecureSkipVerify: false, + MinVersion: tls.VersionTLS13, + NextProtos: []string{DefaultALPN}, + } + bs, err := ioutil.ReadFile(customCA) + if err != nil { + logrus.WithFields(logrus.Fields{ + "error": err, + "file": customCA, + }).Fatal("Failed to load CA") + } + cp := x509.NewCertPool() + if !cp.AppendCertsFromPEM(bs) { + logrus.WithFields(logrus.Fields{ + "file": customCA, + }).Fatal("Failed to parse CA") + } + clientTlsConfig.RootCAs = cp + + // QUIC config + quicConfig := &quic.Config{ + InitialStreamReceiveWindow: DefaultStreamReceiveWindow, + MaxStreamReceiveWindow: DefaultStreamReceiveWindow, + InitialConnectionReceiveWindow: DefaultConnectionReceiveWindow, + MaxConnectionReceiveWindow: DefaultConnectionReceiveWindow, + KeepAlive: true, + DisablePathMTUDiscovery: true, // @TODO: not sure what does this mean yet + EnableDatagrams: true, + } + + client, err := NewClient(server_addr, protocol, []byte(auth_str), clientTlsConfig, quicConfig, + transport.DefaultClientTransport, client_up_mbps, client_down_mbps, + congestionFactory, obfuscator) + + if err != nil { + fmt.Println("Failed to initialize client") + return err + } + + clientConn, err := client.Dial() + + if err != nil { + fmt.Println("Failed to connect to the server") + return err + } + + clientConn.Write([]byte(test_data)) + //write data from clientConn for server to read + _, clientWriteErr := clientConn.Write([]byte(test_data)) + return clientWriteErr +} + +// Below are default functions copied from cmd/server.go or cmd/client.go + +// Use Hysteria custom congestion +func congestionFactory(refBPS uint64) congestion.CongestionControl { + return hyCongestion.NewBrutalSender(congestion.ByteCount(refBPS)) +} + +func passwordAuthFunc(pwd string) (ConnectFunc, error) { + var pwds []string + pwds = []string{pwd} + return func(addr net.Addr, auth []byte, sSend uint64, sRecv uint64) (bool, string) { + for _, pwd := range pwds { + if string(auth) == pwd { + return true, "Welcome" + } + } + return false, "Wrong password" + }, nil +} + +func disconnectFunc(addr net.Addr, auth []byte, err error) { + logrus.WithFields(logrus.Fields{ + "src": addr, + "error": err, + }).Info("Client disconnected") +} + +func tcpErrorFunc(addr net.Addr, auth []byte, reqAddr string, err error) { + if err != io.EOF { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + "dst": reqAddr, + "error": err, + }).Info("TCP error") + } else { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + "dst": reqAddr, + }).Debug("TCP EOF") + } +} + +func udpRequestFunc(addr net.Addr, auth []byte, sessionID uint32) { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + "session": sessionID, + }).Debug("UDP request") +} + +func udpErrorFunc(addr net.Addr, auth []byte, sessionID uint32, err error) { + if err != io.EOF { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + "session": sessionID, + "error": err, + }).Info("UDP error") + } else { + logrus.WithFields(logrus.Fields{ + "src": addr.String(), + "session": sessionID, + }).Debug("UDP EOF") + } +} From 7b559a8caf00aa7c3be40d69220647ad9e980ccc Mon Sep 17 00:00:00 2001 From: CertainTLS Date: Sun, 29 May 2022 01:41:13 +0800 Subject: [PATCH 03/11] chore: made the handshake working --- pkg/core/transport_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/core/transport_test.go b/pkg/core/transport_test.go index e2c953dd36..fe0451d4f7 100644 --- a/pkg/core/transport_test.go +++ b/pkg/core/transport_test.go @@ -20,7 +20,7 @@ import ( // Configs for testing const ( - server_addr = ":2345" + server_addr = "localhost:2345" protocol = "" certFile = "../../hysteria.server.crt" keyFile = "../../hysteria.server.key" @@ -28,6 +28,7 @@ const ( auth_str = "da5438aaa690a5748eb59de8f7bedcb0" client_up_mbps = 20 client_down_mbps = 1000 + server_name = "www.0e6e852f62bbeb99.com" test_data = "Here we go!" customCA = "../../hysteria.ca.crt" ) @@ -69,6 +70,7 @@ func runServer(obfuscator *obfs.XPlusObfuscator) error { var serverTlsConfig = &tls.Config{ Certificates: []tls.Certificate{cer}, MinVersion: tls.VersionTLS13, + NextProtos: []string{DefaultALPN}, } // QUIC config @@ -136,6 +138,7 @@ func runClient(obfuscator *obfs.XPlusObfuscator) error { InsecureSkipVerify: false, MinVersion: tls.VersionTLS13, NextProtos: []string{DefaultALPN}, + ServerName: server_name, } bs, err := ioutil.ReadFile(customCA) if err != nil { @@ -172,6 +175,8 @@ func runClient(obfuscator *obfs.XPlusObfuscator) error { return err } + fmt.Println("Client up and running") + clientConn, err := client.Dial() if err != nil { @@ -179,7 +184,6 @@ func runClient(obfuscator *obfs.XPlusObfuscator) error { return err } - clientConn.Write([]byte(test_data)) //write data from clientConn for server to read _, clientWriteErr := clientConn.Write([]byte(test_data)) return clientWriteErr From cfda0deb827287dfb6700e6f2de2a0008b857727 Mon Sep 17 00:00:00 2001 From: CertainTLS Date: Sun, 29 May 2022 12:10:20 +0800 Subject: [PATCH 04/11] chore: refactoring server constructor --- pkg/core/server.go | 60 ++++++++++++++++++---------- pkg/core/transport_test.go | 82 +++++++++++++------------------------- 2 files changed, 68 insertions(+), 74 deletions(-) diff --git a/pkg/core/server.go b/pkg/core/server.go index 05b65856bf..ae50ea9a11 100644 --- a/pkg/core/server.go +++ b/pkg/core/server.go @@ -43,9 +43,19 @@ type Server struct { listener quic.Listener } -type TransportListener struct { - ql quic.Listener - s *Server +type HysteriaTransport struct { + addr string + protocol string + tlsConfig *tls.Config + quicConfig *quic.Config + transport *transport.ServerTransport + sendBPS uint64 + recvBPS uint64 + congestionFactory CongestionFactory + disableUDP bool + obfuscator obfs.Obfuscator + connectFunc ConnectFunc + disconnectFunc DisconnectFunc } func NewServer(addr string, protocol string, tlsConfig *tls.Config, quicConfig *quic.Config, transport *transport.ServerTransport, @@ -98,6 +108,8 @@ func (s *Server) Serve() error { } } +// Close closes the listener. +// Any blocked Accept operations will be unblocked and return errors. func (s *Server) Close() error { return s.listener.Close() } @@ -181,24 +193,33 @@ func (s *Server) handleControlStream(cs quic.Connection, stream quic.Stream) ([] } // Implement Pluggable Transport Server interface -func (tl *TransportListener) Listen() (net.Listener, error) { - return tl, nil -} +func (t *HysteriaTransport) Listen() (net.Listener, error) { + listener, err := t.transport.QUICListen(t.protocol, t.addr, t.tlsConfig, t.quicConfig, t.obfuscator) + if err != nil { + return nil, err + } + s := &Server{ + listener: listener, + transport: t.transport, + sendBPS: t.sendBPS, + recvBPS: t.recvBPS, + congestionFactory: t.congestionFactory, + disableUDP: t.disableUDP, + connectFunc: t.connectFunc, + disconnectFunc: t.disconnectFunc, + } -// Addr returns the listener's network address. -func (tl *TransportListener) Addr() net.Addr { - return tl.ql.Addr() + return s, nil } -// Close closes the listener. -// Any blocked Accept operations will be unblocked and return errors. -func (tl *TransportListener) Close() error { - return tl.ql.Close() +// Addr returns the listener's network address. +func (s *Server) Addr() net.Addr { + return s.listener.Addr() } -func (tl *TransportListener) Accept() (conn net.Conn, err error) { +func (s *Server) Accept() (conn net.Conn, err error) { for { - cs, err := tl.ql.Accept(context.Background()) + cs, err := s.listener.Accept(context.Background()) if err != nil { return nil, err } @@ -212,7 +233,7 @@ func (tl *TransportListener) Accept() (conn net.Conn, err error) { return } // Handle the control stream - auth, ok, v2, err := tl.s.handleControlStream(cs, stream) + auth, ok, v2, err := s.handleControlStream(cs, stream) if err != nil { _ = cs.CloseWithError(closeErrorCodeProtocol, "protocol error") return @@ -222,9 +243,9 @@ func (tl *TransportListener) Accept() (conn net.Conn, err error) { return } // Start accepting streams and messages - sc := newServerClient(v2, cs, tl.s.transport, auth, tl.s.disableUDP, tl.s.aclEngine, - tl.s.tcpRequestFunc, tl.s.tcpErrorFunc, tl.s.udpRequestFunc, tl.s.udpErrorFunc, - tl.s.upCounterVec, tl.s.downCounterVec, tl.s.connGaugeVec) + sc := newServerClient(v2, cs, s.transport, auth, s.disableUDP, s.aclEngine, + s.tcpRequestFunc, s.tcpErrorFunc, s.udpRequestFunc, s.udpErrorFunc, + s.upCounterVec, s.downCounterVec, s.connGaugeVec) if !sc.DisableUDP { go func() { @@ -253,5 +274,4 @@ func (tl *TransportListener) Accept() (conn net.Conn, err error) { } }() } - } diff --git a/pkg/core/transport_test.go b/pkg/core/transport_test.go index fe0451d4f7..6f314eb964 100644 --- a/pkg/core/transport_test.go +++ b/pkg/core/transport_test.go @@ -4,7 +4,6 @@ import ( "crypto/tls" "crypto/x509" "fmt" - "io" "io/ioutil" "net" "testing" @@ -33,7 +32,7 @@ const ( customCA = "../../hysteria.ca.crt" ) -// Default config from cmd/config.go +// Default config copied from cmd/config.go const ( mbpsToBps = 125000 minSpeedBPS = 16384 @@ -103,15 +102,22 @@ func runServer(obfuscator *obfs.XPlusObfuscator) error { return ok, msg } - server, err := NewServer(server_addr, protocol, serverTlsConfig, quicConfig, transport.DefaultServerTransport, 0, 0, congestionFactory, false, nil, obfuscator, connectFunc, disconnectFunc, nil, tcpErrorFunc, udpRequestFunc, udpErrorFunc, nil) - defer server.Close() - - listener := TransportListener{ - ql: server.listener, - s: server, + server := &HysteriaTransport{ + addr: server_addr, + protocol: protocol, + tlsConfig: serverTlsConfig, + quicConfig: quicConfig, + transport: transport.DefaultServerTransport, + sendBPS: 0, + recvBPS: 0, + congestionFactory: congestionFactory, + disableUDP: false, + obfuscator: obfuscator, + connectFunc: connectFunc, + disconnectFunc: disconnectFunc, } - l, err := listener.Listen() + l, err := server.Listen() if err != nil { fmt.Println("Failed to initialize server") @@ -119,16 +125,21 @@ func runServer(obfuscator *obfs.XPlusObfuscator) error { fmt.Println("Server up and running") - // We don't do this in a loop because we expect the testing client will only send one message in one connection - serverConn, err := l.Accept() - serverBuffer := make([]byte, len(test_data)) + for { + serverConn, err := l.Accept() + serverBuffer := make([]byte, len(test_data)) + _, err = serverConn.Read(serverBuffer) + + if err != nil { + return err + } - _, err = serverConn.Read(serverBuffer) - s := string(serverBuffer) - if s == test_data { - return nil + s := string(serverBuffer) + if s == test_data { + serverConn.Close() + return nil + } } - return err } // Simulate a client @@ -215,40 +226,3 @@ func disconnectFunc(addr net.Addr, auth []byte, err error) { "error": err, }).Info("Client disconnected") } - -func tcpErrorFunc(addr net.Addr, auth []byte, reqAddr string, err error) { - if err != io.EOF { - logrus.WithFields(logrus.Fields{ - "src": addr.String(), - "dst": reqAddr, - "error": err, - }).Info("TCP error") - } else { - logrus.WithFields(logrus.Fields{ - "src": addr.String(), - "dst": reqAddr, - }).Debug("TCP EOF") - } -} - -func udpRequestFunc(addr net.Addr, auth []byte, sessionID uint32) { - logrus.WithFields(logrus.Fields{ - "src": addr.String(), - "session": sessionID, - }).Debug("UDP request") -} - -func udpErrorFunc(addr net.Addr, auth []byte, sessionID uint32, err error) { - if err != io.EOF { - logrus.WithFields(logrus.Fields{ - "src": addr.String(), - "session": sessionID, - "error": err, - }).Info("UDP error") - } else { - logrus.WithFields(logrus.Fields{ - "src": addr.String(), - "session": sessionID, - }).Debug("UDP EOF") - } -} From 49d08a09e932aa6461a56713f40af8cfbc0a0569 Mon Sep 17 00:00:00 2001 From: CertainTLS Date: Sun, 29 May 2022 23:40:53 +0800 Subject: [PATCH 05/11] chore: tweaking --- pkg/core/transport_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/core/transport_test.go b/pkg/core/transport_test.go index 6f314eb964..1310d38d5a 100644 --- a/pkg/core/transport_test.go +++ b/pkg/core/transport_test.go @@ -80,7 +80,7 @@ func runServer(obfuscator *obfs.XPlusObfuscator) error { MaxConnectionReceiveWindow: DefaultConnectionReceiveWindow, MaxIncomingStreams: DefaultMaxIncomingStreams, // Client doesn't need this KeepAlive: true, - DisablePathMTUDiscovery: true, // @TODO: not sure what does this mean yet + DisablePathMTUDiscovery: false, EnableDatagrams: true, } @@ -196,7 +196,9 @@ func runClient(obfuscator *obfs.XPlusObfuscator) error { } //write data from clientConn for server to read - _, clientWriteErr := clientConn.Write([]byte(test_data)) + n, clientWriteErr := clientConn.Write([]byte(test_data)) + fmt.Println("Wrote: ", n) + time.Sleep(time.Second * 20) return clientWriteErr } From 0795902ef790f64595bc5895b4c8421beaa22a0c Mon Sep 17 00:00:00 2001 From: CertainTLS Date: Mon, 30 May 2022 00:53:32 +0800 Subject: [PATCH 06/11] chore: more tweaking --- pkg/core/server.go | 85 ++++++++++++++------------------------ pkg/core/transport_test.go | 62 +++++++++++++++++---------- 2 files changed, 70 insertions(+), 77 deletions(-) diff --git a/pkg/core/server.go b/pkg/core/server.go index ae50ea9a11..aa22589ca6 100644 --- a/pkg/core/server.go +++ b/pkg/core/server.go @@ -217,61 +217,36 @@ func (s *Server) Addr() net.Addr { return s.listener.Addr() } -func (s *Server) Accept() (conn net.Conn, err error) { - for { - cs, err := s.listener.Accept(context.Background()) - if err != nil { - return nil, err - } - go func() { - // Expect the client to create a control stream to send its own information - ctx, ctxCancel := context.WithTimeout(context.Background(), protocolTimeout) - stream, err := cs.AcceptStream(ctx) - ctxCancel() - if err != nil { - _ = cs.CloseWithError(closeErrorCodeProtocol, "protocol error") - return - } - // Handle the control stream - auth, ok, v2, err := s.handleControlStream(cs, stream) - if err != nil { - _ = cs.CloseWithError(closeErrorCodeProtocol, "protocol error") - return - } - if !ok { - _ = cs.CloseWithError(closeErrorCodeAuth, "auth error") - return - } - // Start accepting streams and messages - sc := newServerClient(v2, cs, s.transport, auth, s.disableUDP, s.aclEngine, - s.tcpRequestFunc, s.tcpErrorFunc, s.udpRequestFunc, s.udpErrorFunc, - s.upCounterVec, s.downCounterVec, s.connGaugeVec) - - if !sc.DisableUDP { - go func() { - for { - msg, err := sc.CS.ReceiveMessage() - if err != nil { - break - } - sc.handleMessage(msg) - } - }() - } - for { - stream, err := sc.CS.AcceptStream(context.Background()) - if err != nil { - return - } - - conn = &quicConn{ - Orig: stream, - PseudoLocalAddr: cs.LocalAddr(), - PseudoRemoteAddr: cs.RemoteAddr(), - } +func (s *Server) Accept() (net.Conn, error) { + cs, err := s.listener.Accept(context.Background()) + if err != nil { + return nil, err + } + // Expect the client to create a control stream to send its own information + ctx, ctxCancel := context.WithTimeout(context.Background(), protocolTimeout) + stream, err := cs.AcceptStream(ctx) + ctxCancel() + if err != nil { + _ = cs.CloseWithError(closeErrorCodeProtocol, "protocol error") + return nil, err + } + // Handle the control stream + _, ok, _, err := s.handleControlStream(cs, stream) + if err != nil { + _ = cs.CloseWithError(closeErrorCodeProtocol, "protocol error") + return nil, err + } + if !ok { + _ = cs.CloseWithError(closeErrorCodeAuth, "auth error") + return nil, err + } - return - } - }() + // Start accepting streams + conn := &quicConn{ + Orig: stream, + PseudoLocalAddr: cs.LocalAddr(), + PseudoRemoteAddr: cs.RemoteAddr(), } + + return conn, nil } diff --git a/pkg/core/transport_test.go b/pkg/core/transport_test.go index 1310d38d5a..9b92bf2fa9 100644 --- a/pkg/core/transport_test.go +++ b/pkg/core/transport_test.go @@ -3,6 +3,7 @@ package core import ( "crypto/tls" "crypto/x509" + "errors" "fmt" "io/ioutil" "net" @@ -23,12 +24,13 @@ const ( protocol = "" certFile = "../../hysteria.server.crt" keyFile = "../../hysteria.server.key" - obfs_str = "f561508f56ed" - auth_str = "da5438aaa690a5748eb59de8f7bedcb0" + obfs_str = "c561508f56ed" + auth_str = "ga5438aaa690a5748eb59de8f7bedcb0" client_up_mbps = 20 client_down_mbps = 1000 server_name = "www.0e6e852f62bbeb99.com" - test_data = "Here we go!" + test_request = "Here we go!" + test_response = "You got it." customCA = "../../hysteria.ca.crt" ) @@ -125,21 +127,30 @@ func runServer(obfuscator *obfs.XPlusObfuscator) error { fmt.Println("Server up and running") - for { - serverConn, err := l.Accept() - serverBuffer := make([]byte, len(test_data)) - _, err = serverConn.Read(serverBuffer) + serverConn, err := l.Accept() + defer serverConn.Close() - if err != nil { - return err - } + if err != nil { + return err + } - s := string(serverBuffer) - if s == test_data { - serverConn.Close() - return nil - } + serverBuffer := make([]byte, len(test_request)) + serverConn.SetReadDeadline(time.Now().Add(time.Second * 20)) + _, err = serverConn.Read(serverBuffer) + + if err != nil { + return err } + + s := string(serverBuffer) + if s == test_request { + serverConn.SetWriteDeadline(time.Now().Add(time.Second * 2)) + serverConn.Write([]byte(test_response)) + serverConn.Close() + return nil + } + + return errors.New("Something is wrong") } // Simulate a client @@ -173,7 +184,7 @@ func runClient(obfuscator *obfs.XPlusObfuscator) error { InitialConnectionReceiveWindow: DefaultConnectionReceiveWindow, MaxConnectionReceiveWindow: DefaultConnectionReceiveWindow, KeepAlive: true, - DisablePathMTUDiscovery: true, // @TODO: not sure what does this mean yet + DisablePathMTUDiscovery: false, EnableDatagrams: true, } @@ -187,19 +198,26 @@ func runClient(obfuscator *obfs.XPlusObfuscator) error { } fmt.Println("Client up and running") - clientConn, err := client.Dial() + defer clientConn.Close() if err != nil { fmt.Println("Failed to connect to the server") return err } - //write data from clientConn for server to read - n, clientWriteErr := clientConn.Write([]byte(test_data)) - fmt.Println("Wrote: ", n) - time.Sleep(time.Second * 20) - return clientWriteErr + // write data from clientConn for server to read + clientConn.SetWriteDeadline(time.Now().Add(time.Second * 2)) + _, err = clientConn.Write([]byte(test_request)) + clientBuffer := make([]byte, len(test_response)) + clientConn.SetReadDeadline(time.Now().Add(time.Second * 10)) + _, err = clientConn.Read(clientBuffer) + s := string(clientBuffer) + if s == test_response { + return nil + } + + return err } // Below are default functions copied from cmd/server.go or cmd/client.go From fbfe9abf649459cb04e0d16bdbcd96acd1830cc2 Mon Sep 17 00:00:00 2001 From: CertainTLS Date: Tue, 31 May 2022 21:09:15 +0800 Subject: [PATCH 07/11] fix: the server should listen to the data stream instead of control stream --- pkg/core/server.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/pkg/core/server.go b/pkg/core/server.go index aa22589ca6..9134ead1e9 100644 --- a/pkg/core/server.go +++ b/pkg/core/server.go @@ -240,13 +240,22 @@ func (s *Server) Accept() (net.Conn, error) { _ = cs.CloseWithError(closeErrorCodeAuth, "auth error") return nil, err } + // Close the control stream + stream.Close() - // Start accepting streams - conn := &quicConn{ - Orig: stream, - PseudoLocalAddr: cs.LocalAddr(), - PseudoRemoteAddr: cs.RemoteAddr(), - } + for { + // Start accepting data streams + stream, err = cs.AcceptStream(context.Background()) + if err != nil { + return nil, err + } - return conn, nil + conn := &quicConn{ + Orig: stream, + PseudoLocalAddr: cs.LocalAddr(), + PseudoRemoteAddr: cs.RemoteAddr(), + } + + return conn, nil + } } From f39127392949a9d8e066682d022062875d7b7007 Mon Sep 17 00:00:00 2001 From: CertainTLS Date: Wed, 1 Jun 2022 02:09:47 +0800 Subject: [PATCH 08/11] chore: add more flags for debugging --- pkg/core/server.go | 24 +++++++++++------------- pkg/core/transport_test.go | 25 ++++++++++++++++--------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/pkg/core/server.go b/pkg/core/server.go index 9134ead1e9..1230cdc030 100644 --- a/pkg/core/server.go +++ b/pkg/core/server.go @@ -243,19 +243,17 @@ func (s *Server) Accept() (net.Conn, error) { // Close the control stream stream.Close() - for { - // Start accepting data streams - stream, err = cs.AcceptStream(context.Background()) - if err != nil { - return nil, err - } - - conn := &quicConn{ - Orig: stream, - PseudoLocalAddr: cs.LocalAddr(), - PseudoRemoteAddr: cs.RemoteAddr(), - } + // Accept the next stream + stream, err = cs.AcceptStream(context.Background()) + if err != nil { + return nil, err + } - return conn, nil + conn := &quicConn{ + Orig: stream, + PseudoLocalAddr: cs.LocalAddr(), + PseudoRemoteAddr: cs.RemoteAddr(), } + + return conn, nil } diff --git a/pkg/core/transport_test.go b/pkg/core/transport_test.go index 9b92bf2fa9..dd7637d933 100644 --- a/pkg/core/transport_test.go +++ b/pkg/core/transport_test.go @@ -52,7 +52,6 @@ func TestE2E(t *testing.T) { go runServer(obfuscator) - time.Sleep(time.Second * 5) err := runClient(obfuscator) if err != nil { t.Fail() @@ -134,8 +133,9 @@ func runServer(obfuscator *obfs.XPlusObfuscator) error { return err } + time.Sleep(time.Second * 2) serverBuffer := make([]byte, len(test_request)) - serverConn.SetReadDeadline(time.Now().Add(time.Second * 20)) + fmt.Println("Server starts reading from connection") _, err = serverConn.Read(serverBuffer) if err != nil { @@ -144,10 +144,10 @@ func runServer(obfuscator *obfs.XPlusObfuscator) error { s := string(serverBuffer) if s == test_request { - serverConn.SetWriteDeadline(time.Now().Add(time.Second * 2)) - serverConn.Write([]byte(test_response)) - serverConn.Close() - return nil + fmt.Println("Server received the expected data from the client") + _, err = serverConn.Write([]byte(test_response)) + fmt.Println("Server sent the response to the client") + return err } return errors.New("Something is wrong") @@ -197,8 +197,8 @@ func runClient(obfuscator *obfs.XPlusObfuscator) error { return err } - fmt.Println("Client up and running") clientConn, err := client.Dial() + fmt.Println("Client up and running") defer clientConn.Close() if err != nil { @@ -207,13 +207,20 @@ func runClient(obfuscator *obfs.XPlusObfuscator) error { } // write data from clientConn for server to read - clientConn.SetWriteDeadline(time.Now().Add(time.Second * 2)) + time.Sleep(time.Second * 2) _, err = clientConn.Write([]byte(test_request)) + if err != nil { + return err + } + fmt.Println("Client sent the data to the server") + + time.Sleep(time.Second * 5) clientBuffer := make([]byte, len(test_response)) - clientConn.SetReadDeadline(time.Now().Add(time.Second * 10)) + fmt.Println("Client starts reading from connection") _, err = clientConn.Read(clientBuffer) s := string(clientBuffer) if s == test_response { + fmt.Println("Client received the expected response from the server") return nil } From 6edcfff0f1c4a50aef54e349fa108f411c7a5a81 Mon Sep 17 00:00:00 2001 From: CertainTLS Date: Sun, 19 Jun 2022 23:37:20 +0800 Subject: [PATCH 09/11] feat: internalize transport connections handling --- pkg/core/server.go | 139 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 120 insertions(+), 19 deletions(-) diff --git a/pkg/core/server.go b/pkg/core/server.go index 1230cdc030..6f07916632 100644 --- a/pkg/core/server.go +++ b/pkg/core/server.go @@ -58,6 +58,25 @@ type HysteriaTransport struct { disconnectFunc DisconnectFunc } +type TransportServer struct { + transport *transport.ServerTransport + sendBPS, recvBPS uint64 + congestionFactory CongestionFactory + disableUDP bool + aclEngine *acl.Engine + + connectFunc ConnectFunc + disconnectFunc DisconnectFunc + tcpRequestFunc TCPRequestFunc + tcpErrorFunc TCPErrorFunc + udpRequestFunc UDPRequestFunc + udpErrorFunc UDPErrorFunc + + listener quic.Listener + allStreams chan *quicConn + isListening bool +} + func NewServer(addr string, protocol string, tlsConfig *tls.Config, quicConfig *quic.Config, transport *transport.ServerTransport, sendBPS uint64, recvBPS uint64, congestionFactory CongestionFactory, disableUDP bool, aclEngine *acl.Engine, obfuscator obfs.Obfuscator, connectFunc ConnectFunc, disconnectFunc DisconnectFunc, @@ -198,7 +217,7 @@ func (t *HysteriaTransport) Listen() (net.Listener, error) { if err != nil { return nil, err } - s := &Server{ + s := &TransportServer{ listener: listener, transport: t.transport, sendBPS: t.sendBPS, @@ -207,53 +226,135 @@ func (t *HysteriaTransport) Listen() (net.Listener, error) { disableUDP: t.disableUDP, connectFunc: t.connectFunc, disconnectFunc: t.disconnectFunc, + allStreams: make(chan *quicConn), + isListening: false, } return s, nil } // Addr returns the listener's network address. -func (s *Server) Addr() net.Addr { +func (s *TransportServer) Addr() net.Addr { return s.listener.Addr() } -func (s *Server) Accept() (net.Conn, error) { - cs, err := s.listener.Accept(context.Background()) - if err != nil { - return nil, err +func (s *TransportServer) Close() error { + s.isListening = false + return s.listener.Close() +} + +func (s *TransportServer) Accept() (net.Conn, error) { + if !s.isListening { + s.isListening = true + go acceptConn(s) + } + // Return the next stream + select { + case stream := <-s.allStreams: + return stream, nil + } +} + +// An internal goroutine for accepting connections. Then for each accepted +// connection, start a goroutine for handling the control stream & accepting +// streams. Put those streams into a channel +func acceptConn(s *TransportServer) { + for { + cs, err := s.listener.Accept(context.Background()) + if err != nil { + _ = cs.CloseWithError(closeErrorCodeProtocol, "protocol error") + return + } + go acceptStream(cs, s) } +} + +func acceptStream(cs quic.Connection, s *TransportServer) { // Expect the client to create a control stream to send its own information ctx, ctxCancel := context.WithTimeout(context.Background(), protocolTimeout) stream, err := cs.AcceptStream(ctx) ctxCancel() if err != nil { _ = cs.CloseWithError(closeErrorCodeProtocol, "protocol error") - return nil, err + return } // Handle the control stream _, ok, _, err := s.handleControlStream(cs, stream) if err != nil { _ = cs.CloseWithError(closeErrorCodeProtocol, "protocol error") - return nil, err + return } if !ok { _ = cs.CloseWithError(closeErrorCodeAuth, "auth error") - return nil, err + return } // Close the control stream stream.Close() - // Accept the next stream - stream, err = cs.AcceptStream(context.Background()) - if err != nil { - return nil, err - } + for { + // Accept the next stream + stream, err = cs.AcceptStream(context.Background()) + if err != nil { + _ = cs.CloseWithError(closeErrorCodeProtocol, "protocol error") + return + } - conn := &quicConn{ - Orig: stream, - PseudoLocalAddr: cs.LocalAddr(), - PseudoRemoteAddr: cs.RemoteAddr(), + conn := &quicConn{ + Orig: stream, + PseudoLocalAddr: cs.LocalAddr(), + PseudoRemoteAddr: cs.RemoteAddr(), + } + s.allStreams <- conn } +} - return conn, nil +// Auth & negotiate speed +// Copy from (s *Server) handleControlStream, TODO: refactor +func (s *TransportServer) handleControlStream(cs quic.Connection, stream quic.Stream) ([]byte, bool, bool, error) { + // Check version + vb := make([]byte, 1) + _, err := stream.Read(vb) + if err != nil { + return nil, false, false, err + } + if vb[0] != protocolVersion && vb[0] != protocolVersionV2 { + return nil, false, false, fmt.Errorf("unsupported protocol version %d, expecting %d/%d", + vb[0], protocolVersionV2, protocolVersion) + } + // Parse client hello + var ch clientHello + err = struc.Unpack(stream, &ch) + if err != nil { + return nil, false, false, err + } + // Speed + if ch.Rate.SendBPS == 0 || ch.Rate.RecvBPS == 0 { + return nil, false, false, errors.New("invalid rate from client") + } + serverSendBPS, serverRecvBPS := ch.Rate.RecvBPS, ch.Rate.SendBPS + if s.sendBPS > 0 && serverSendBPS > s.sendBPS { + serverSendBPS = s.sendBPS + } + if s.recvBPS > 0 && serverRecvBPS > s.recvBPS { + serverRecvBPS = s.recvBPS + } + // Auth + ok, msg := s.connectFunc(cs.RemoteAddr(), ch.Auth, serverSendBPS, serverRecvBPS) + // Response + err = struc.Pack(stream, &serverHello{ + OK: ok, + Rate: transmissionRate{ + SendBPS: serverSendBPS, + RecvBPS: serverRecvBPS, + }, + Message: msg, + }) + if err != nil { + return nil, false, false, err + } + // Set the congestion accordingly + if ok && s.congestionFactory != nil { + cs.SetCongestionControl(s.congestionFactory(serverSendBPS)) + } + return ch.Auth, ok, vb[0] == protocolVersionV2, nil } From 13e0c0799e3fd983d6550c373bdec7d4633bcacf Mon Sep 17 00:00:00 2001 From: CertainTLS Date: Tue, 21 Jun 2022 23:14:48 +0800 Subject: [PATCH 10/11] chore: refactor test using signaling --- pkg/core/transport_test.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/pkg/core/transport_test.go b/pkg/core/transport_test.go index dd7637d933..094bf5d1eb 100644 --- a/pkg/core/transport_test.go +++ b/pkg/core/transport_test.go @@ -8,7 +8,6 @@ import ( "io/ioutil" "net" "testing" - "time" "github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go/congestion" @@ -49,17 +48,18 @@ const ( func TestE2E(t *testing.T) { // Server and Client share the same obfuscator obfuscator := obfs.NewXPlusObfuscator([]byte(obfs_str)) + signal := make(chan struct{}) - go runServer(obfuscator) + go runServer(obfuscator, signal) - err := runClient(obfuscator) + err := runClient(obfuscator, signal) if err != nil { t.Fail() } } // Simulate a server -func runServer(obfuscator *obfs.XPlusObfuscator) error { +func runServer(obfuscator *obfs.XPlusObfuscator, signal chan struct{}) error { // Load TLS server config cer, err := tls.LoadX509KeyPair(certFile, keyFile) if err != nil { @@ -124,7 +124,9 @@ func runServer(obfuscator *obfs.XPlusObfuscator) error { fmt.Println("Failed to initialize server") } + serverBuffer := make([]byte, len(test_request)) fmt.Println("Server up and running") + signal <- struct{}{} serverConn, err := l.Accept() defer serverConn.Close() @@ -133,8 +135,6 @@ func runServer(obfuscator *obfs.XPlusObfuscator) error { return err } - time.Sleep(time.Second * 2) - serverBuffer := make([]byte, len(test_request)) fmt.Println("Server starts reading from connection") _, err = serverConn.Read(serverBuffer) @@ -145,6 +145,7 @@ func runServer(obfuscator *obfs.XPlusObfuscator) error { s := string(serverBuffer) if s == test_request { fmt.Println("Server received the expected data from the client") + signal <- struct{}{} _, err = serverConn.Write([]byte(test_response)) fmt.Println("Server sent the response to the client") return err @@ -154,7 +155,7 @@ func runServer(obfuscator *obfs.XPlusObfuscator) error { } // Simulate a client -func runClient(obfuscator *obfs.XPlusObfuscator) error { +func runClient(obfuscator *obfs.XPlusObfuscator, signal chan struct{}) error { // Load TLS client config var clientTlsConfig = &tls.Config{ InsecureSkipVerify: false, @@ -188,6 +189,7 @@ func runClient(obfuscator *obfs.XPlusObfuscator) error { EnableDatagrams: true, } + <-signal client, err := NewClient(server_addr, protocol, []byte(auth_str), clientTlsConfig, quicConfig, transport.DefaultClientTransport, client_up_mbps, client_down_mbps, congestionFactory, obfuscator) @@ -207,14 +209,13 @@ func runClient(obfuscator *obfs.XPlusObfuscator) error { } // write data from clientConn for server to read - time.Sleep(time.Second * 2) _, err = clientConn.Write([]byte(test_request)) if err != nil { return err } fmt.Println("Client sent the data to the server") - time.Sleep(time.Second * 5) + <-signal clientBuffer := make([]byte, len(test_response)) fmt.Println("Client starts reading from connection") _, err = clientConn.Read(clientBuffer) From 4b51e9183050674aaffab8f9a80f9f5ad954267c Mon Sep 17 00:00:00 2001 From: CertainTLS Date: Sun, 12 Mar 2023 15:40:11 +0800 Subject: [PATCH 11/11] chore: point the server/client implementation examples to the test file --- pkg/core/README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/core/README.md b/pkg/core/README.md index 4d9c100110..c406fb3a02 100644 --- a/pkg/core/README.md +++ b/pkg/core/README.md @@ -10,6 +10,10 @@ ## Usage +* Follow [Custom CA](https://hysteria.network/docs/custom-ca/) doc to generate certificates +* See [server side implementation example](https://github.com/apernet/hysteria/pull/340/files#diff-8a9b6ccee2487fc2b424d9f4b3cad2ebde2cc27b1cf1aa078e0de084872edbaaR62-R155) in the `transport_test.go` file +* See [client side implementation example](https://github.com/apernet/hysteria/pull/340/files#diff-8a9b6ccee2487fc2b424d9f4b3cad2ebde2cc27b1cf1aa078e0de084872edbaaR157-R229) in the `transport_test.go` file + ## Implementation The implementation uses [Pluggable Transport Specification v3.0 - Go Transport API](https://github.com/Pluggable-Transports/Pluggable-Transports-spec/blob/main/releases/PTSpecV3.0/Pluggable%20Transport%20Specification%20v3.0%20-%20Go%20Transport%20API%20v3.0.md) \ No newline at end of file