From 6463002a5d8436c063b5fad9f246477da4ae3363 Mon Sep 17 00:00:00 2001 From: halulu Date: Sat, 28 Jul 2018 17:44:39 +0800 Subject: [PATCH 1/5] client redesign --- templates/client/client.html | 120 +++++++++++++++-------------------- 1 file changed, 51 insertions(+), 69 deletions(-) diff --git a/templates/client/client.html b/templates/client/client.html index 17da1aa..ce8b89d 100644 --- a/templates/client/client.html +++ b/templates/client/client.html @@ -54,29 +54,64 @@ {% endblock %} {% block body %} -
-
-

- WebSocks Client -

-

- A secure proxy based on WebSocket. -

- -
-
-
+
-
+
+
+
+
+

+ WebSocks Client +

+

+ A secure proxy based on WebSocket. +

+
+
+
+
- +
+
+
+ +
+
+
+ Running…… +
+

DownloadSpeed: {{ DownloadSpeed }}

+
+

UploadSpeed: {{ UploadSpeed }}

+
+

Downloaded: {{ Downloaded }}

+
+

Uploaded: {{ Uploaded }}

+
+
+
+
+
+
+
+ +
+
+
+
+ +
+ +
+
@@ -100,66 +135,13 @@

-
-
-
-

- -

-
-
-

- -

-
-
-
-
-
-
-
-
-

Client Stats

-
- - - - - - - - - - - - - - - - - -
- Downloaded - - Uploaded - - DownloadSpeed - - UploadSpeed -
- 0 - - 0 - - 0 - - 0 -
+
+
From c8b5113ee93b505b745bc8d85659abfbb13cc6e2 Mon Sep 17 00:00:00 2001 From: halulu Date: Mon, 3 Dec 2018 11:40:38 +0800 Subject: [PATCH 2/5] merge master v0.15.0 --- .goreleaser.yml | 2 +- README-en.md | 4 +- README.md | 4 +- client/app.go | 113 -------------------------- client/client.go | 74 ++++++++++------- client/config.go | 3 +- client/conn.go | 4 +- client/mux.go | 48 +---------- client/web.go | 44 ---------- core/mux.go | 132 ------------------------------ core/mux/client.go | 29 +++++++ core/mux/conn.go | 92 +++++++++++++++++++++ core/mux/group.go | 88 ++++++++++++++++++++ core/mux/message.go | 53 ++++++++++++ core/mux/server.go | 42 ++++++++++ core/mux/websocket.go | 55 +++++++++++++ core/muxwebsocket.go | 61 -------------- core/stats.go | 1 + core/websocket.go | 5 +- server/app.go | 94 --------------------- server/config.go | 3 - server/login.go | 36 --------- server/mux.go | 89 -------------------- server/server.go | 44 +++++----- server/web.go | 86 -------------------- templates/base/base.html | 22 ----- templates/client/client.html | 153 ----------------------------------- templates/home.html | 52 ------------ templates/server/login.html | 43 ---------- templates/server/server.html | 126 ----------------------------- websocks.go | 76 +++++------------ 31 files changed, 464 insertions(+), 1214 deletions(-) delete mode 100644 client/app.go delete mode 100644 client/web.go delete mode 100644 core/mux.go create mode 100644 core/mux/client.go create mode 100644 core/mux/conn.go create mode 100644 core/mux/group.go create mode 100644 core/mux/message.go create mode 100644 core/mux/server.go create mode 100644 core/mux/websocket.go delete mode 100644 core/muxwebsocket.go delete mode 100644 server/app.go delete mode 100644 server/login.go delete mode 100644 server/mux.go delete mode 100644 server/web.go delete mode 100644 templates/base/base.html delete mode 100644 templates/client/client.html delete mode 100644 templates/home.html delete mode 100644 templates/server/login.html delete mode 100644 templates/server/server.html diff --git a/.goreleaser.yml b/.goreleaser.yml index 1f9743c..cc1f459 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -29,4 +29,4 @@ archive: - LICENSE - README.md - README-en.md - - templates/**/* +# - templates/**/* diff --git a/README-en.md b/README-en.md index efffe8c..ab2944c 100644 --- a/README-en.md +++ b/README-en.md @@ -2,6 +2,8 @@ A secure proxy based on websocket. +**Websocks will be temporarily suspended for a few more months because I have to apply for university. I'm really sorry and I promise I will come back.** + This project is still working in progress, more features are still in development. If you are interested in this project, please star this project in order to support me. Thank you. If you have any problems or suggestions, please do not hesitate to submit issues or contact me [@halulu](https://t.me/halulu). @@ -27,7 +29,7 @@ The disadvantage is that I have just started development, there is no GUI client #### Client ``` -./websocks client -l :1080 -s wss://websocks.org:443/websocks -n mirror.centos.com --insecure +./websocks client -l :1080 -s wss://websocks.org:443/websocks -sni mirror.centos.com --insecure ``` ### Caddy TLS diff --git a/README.md b/README.md index 9129dd7..3ac5b16 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,8 @@ 一个基于 WebSocket 的代理工具 +**由于本人学业的原因,websocks暂时停更几个月,各位大佬们对不住了,等我搞定大学一定会填坑的** + 本项目目前还在开发中,更多功能仍在完善中。如果你对这个项目感兴趣,请star它来支持我,蟹蟹 有任何问题或建议可以直接发issue或者联系我 [@halulu](https://t.me/halulu),也可以来[TG群](https://t.me/websocks)水一水,开发记录可以看[我的博客](https://halu.lu/post/websocks-development/) @@ -28,7 +30,7 @@ #### 客户端 ``` -./websocks client -l :1080 -s wss://websocks.org:443/websocks -n mirror.centos.com --insecure +./websocks client -l :1080 -s wss://websocks.org:443/websocks -sni mirror.centos.com --insecure ``` diff --git a/client/app.go b/client/app.go deleted file mode 100644 index 8d43921..0000000 --- a/client/app.go +++ /dev/null @@ -1,113 +0,0 @@ -package client - -import ( - "bytes" - "fmt" - "io" - "net/http" - "os" - "os/exec" - - "encoding/json" - "io/ioutil" - - "github.com/go-macaron/pongo2" - "github.com/sirupsen/logrus" - "gopkg.in/macaron.v1" -) - -type App struct { - //todo - WebListenAddr string - - m macaron.Macaron - - running bool - //todo multiple client - *WebSocksClient -} - -func LoadApp() (app *App, err error) { - app = &App{} - data, err := ioutil.ReadFile("client.json") - if err != nil { - return - } - - err = json.Unmarshal(data, app) - if err != nil { - return - } - return -} - -func NewApp() (app *App) { - app = &App{ - WebListenAddr: ":10801", - } - return -} - -func (app *App) Save() (err error) { - data, err := json.MarshalIndent(app, "", " ") - if err != nil { - return - } - - err = ioutil.WriteFile("client.json", data, 0600) - return -} - -func (app *App) Run() (err error) { - //log setup - buf := make([]byte, 0) - buffer := bytes.NewBuffer(buf) - log.Out = io.MultiWriter(os.Stdout, buffer) - log.SetLevel(logrus.DebugLevel) - - m := macaron.New() - m.Use(pongo2.Pongoer()) - m.Get("/", func(ctx *macaron.Context) { - ctx.HTML(200, "client/client") - return - }) - - //todo pac - m.Get("/pac", func(ctx *macaron.Context) { - return - }) - - //api v0 - m.Group("/api/v0/client", func() { - m.Get("/log", func(ctx *macaron.Context) { - ctx.WriteHeader(200) - ctx.Write(buffer.Bytes()) - return - }) - m.Get("/stats", func(ctx *macaron.Context) { - if app.WebSocksClient == nil { - ctx.Error(403, "websocks client is not running") - return - } - ctx.JSON(200, app.WebSocksClient.Stats) - }) - m.Post("/start", app.StartClient) - m.Post("/stop", app.StopClient) - }) - - go func() { - err := exec.Command("explorer", fmt.Sprintf("http://127.0.0.1%s", app.WebListenAddr)).Run() - if err != nil { - log.Debug(err.Error()) - return - } - }() - - log.Infof("web start to listen at %s", app.WebListenAddr) - err = http.ListenAndServe(app.WebListenAddr, m) - if err != nil { - log.Error(err.Error()) - return - } - return -} diff --git a/client/client.go b/client/client.go index 58f6828..19e6fa7 100644 --- a/client/client.go +++ b/client/client.go @@ -1,30 +1,29 @@ package client import ( + "log" "net" "time" + "github.com/lzjluzijie/websocks/core/mux" + "net/url" "github.com/gorilla/websocket" "github.com/lzjluzijie/websocks/core" - "github.com/sirupsen/logrus" ) -var log = logrus.New() - type WebSocksClient struct { ServerURL *url.URL ListenAddr *net.TCPAddr dialer *websocket.Dialer - //connMutex sync.Mutex - //wsConns []*core.WebSocket - muxWS *core.MuxWebSocket - //todo enable mux Mux bool + muxGroup *mux.Group + + //todo //control stopC chan int @@ -33,39 +32,48 @@ type WebSocksClient struct { } func (client *WebSocksClient) Run() (err error) { + if client.Mux { + client.muxGroup = mux.NewGroup(true) + log.Println("group created") + go func() { + //todo + for { + if len(client.muxGroup.MuxWSs) == 0 { + err := client.OpenMux() + if err != nil { + log.Printf(err.Error()) + continue + } + } + //这个弱智BUG折腾了我一天 + time.Sleep(time.Second) + } + }() + } + listener, err := net.ListenTCP("tcp", client.ListenAddr) if err != nil { return err } - log.Infof("Start to listen at %s", client.ListenAddr.String()) - - if client.Mux { - err := client.OpenMux() - if err != nil { - log.Debugf(err.Error()) - return err - } - - go client.ListenMuxWS(client.muxWS) - } + log.Printf("Start to listen at %s", client.ListenAddr.String()) go func() { client.stopC = make(chan int) <-client.stopC err = listener.Close() if err != nil { - log.Errorf(err.Error()) + log.Printf(err.Error()) return } - log.Infof("stopped") + log.Print("stopped") }() for { conn, err := listener.AcceptTCP() if err != nil { - log.Debugf(err.Error()) + log.Printf(err.Error()) break } @@ -80,21 +88,32 @@ func (client *WebSocksClient) Stop() { } func (client *WebSocksClient) HandleConn(conn *net.TCPConn) { + log.Println("new socks5 conn") + lc, err := NewLocalConn(conn) if err != nil { - log.Debug(err.Error()) + log.Printf(err.Error()) return } - //todo mux + host := lc.Host + if client.Mux { - client.DialMuxConn(lc.Host, conn) + muxConn, err := client.muxGroup.NewMuxConn(host) + if err != nil { + log.Printf(err.Error()) + return + } + + log.Printf("created #%v", muxConn) + + muxConn.Run(conn) return } - ws, err := client.DialWebSocket(core.NewHostHeader(lc.Host)) + ws, err := client.DialWebSocket(core.NewHostHeader(host)) if err != nil { - log.Errorf(err.Error()) + log.Printf(err.Error()) return } @@ -109,8 +128,5 @@ func (client *WebSocksClient) DialWebSocket(header map[string][]string) (ws *cor } ws = core.NewWebSocket(wsConn, client.Stats) - //client.connMutex.Lock() - //client.wsConns = append(client.wsConns, ws) - //client.connMutex.Unlock() return } diff --git a/client/config.go b/client/config.go index 1def8ca..3f44636 100644 --- a/client/config.go +++ b/client/config.go @@ -48,8 +48,7 @@ func (config *Config) GetClient() (client *WebSocksClient, err error) { TLSClientConfig: tlsConfig, }, - //todo mux - + Mux: config.Mux, CreatedAt: time.Now(), Stats: core.NewStats(), } diff --git a/client/conn.go b/client/conn.go index 68392d3..c319f5a 100644 --- a/client/conn.go +++ b/client/conn.go @@ -49,7 +49,7 @@ func (lc *LocalConn) Run(ws *core.WebSocket) { go func() { _, err := io.Copy(lc, ws) if err != nil { - log.Debugf(err.Error()) + //log.Printf(err.Error()) return } return @@ -58,7 +58,7 @@ func (lc *LocalConn) Run(ws *core.WebSocket) { go func() { _, err := io.Copy(ws, lc) if err != nil { - log.Debugf(err.Error()) + //log.Printf(err.Error()) return } }() diff --git a/client/mux.go b/client/mux.go index 8553523..36e5dc3 100644 --- a/client/mux.go +++ b/client/mux.go @@ -1,14 +1,13 @@ package client import ( - "net" - "github.com/lzjluzijie/websocks/core" + "github.com/lzjluzijie/websocks/core/mux" ) func (client *WebSocksClient) OpenMux() (err error) { wsConn, _, err := client.dialer.Dial(client.ServerURL.String(), map[string][]string{ - "WebSocks-Mux": {"mux"}, + "WebSocks-Mux": {"v0.15"}, }) if err != nil { @@ -17,46 +16,7 @@ func (client *WebSocksClient) OpenMux() (err error) { ws := core.NewWebSocket(wsConn, client.Stats) - muxWS := core.NewMuxWebSocket(ws) - client.muxWS = muxWS + muxWS := mux.NewMuxWebSocket(ws) + client.muxGroup.AddMuxWS(muxWS) return } - -func (client *WebSocksClient) DialMuxConn(host string, conn *net.TCPConn) { - muxConn := core.NewMuxConn(client.muxWS) - - err := muxConn.DialMessage(host) - if err != nil { - log.Errorf(err.Error()) - err = client.OpenMux() - if err != nil { - log.Errorf(err.Error()) - } - return - } - - muxConn.MuxWS.PutMuxConn(muxConn) - - log.Debugf("dialed mux for %s", host) - - muxConn.Run(conn) - return -} - -func (client *WebSocksClient) ListenMuxWS(muxWS *core.MuxWebSocket) { - for { - m, err := muxWS.ReceiveMessage() - if err != nil { - log.Debugf(err.Error()) - return - } - - //get conn and send message - conn := muxWS.GetMuxConn(m.ConnID) - err = conn.HandleMessage(m) - if err != nil { - log.Debugf(err.Error()) - continue - } - } -} diff --git a/client/web.go b/client/web.go deleted file mode 100644 index 36d4137..0000000 --- a/client/web.go +++ /dev/null @@ -1,44 +0,0 @@ -package client - -import ( - "encoding/json" - "io/ioutil" - - "gopkg.in/macaron.v1" -) - -func (app *App) StartClient(ctx *macaron.Context) { - config := &Config{} - data, err := ioutil.ReadAll(ctx.Req.Body().ReadCloser()) - if err != nil { - ctx.Error(403, err.Error()) - } - - err = json.Unmarshal(data, config) - if err != nil { - ctx.Error(403, err.Error()) - } - - websocksClient, err := config.GetClient() - if err != nil { - ctx.Error(403, err.Error()) - } - - app.WebSocksClient = websocksClient - app.running = true - - go func() { - err = websocksClient.Run() - if err != nil { - log.Error(err.Error()) - } - }() - return -} - -func (app *App) StopClient(ctx *macaron.Context) { - app.WebSocksClient.Stop() - ctx.WriteHeader(200) - ctx.Write([]byte("stopped")) - return -} diff --git a/core/mux.go b/core/mux.go deleted file mode 100644 index c3c471b..0000000 --- a/core/mux.go +++ /dev/null @@ -1,132 +0,0 @@ -package core - -import ( - "io" - "math/rand" - "net" - "sync" - "sync/atomic" -) - -const ( - MessageMethodData = iota - MessageMethodDial -) - -type Message struct { - Method byte - ConnID uint64 - MessageID uint64 - Data []byte -} - -type MuxConn struct { - ID uint64 - MuxWS *MuxWebSocket - - mutex sync.Mutex - buf []byte - wait chan int - - receiveMessageID uint64 - sendMessageID *uint64 -} - -//NewMuxConn create new mux connection for client -func NewMuxConn(muxWS *MuxWebSocket) (conn *MuxConn) { - return &MuxConn{ - ID: rand.Uint64(), - MuxWS: muxWS, - wait: make(chan int), - sendMessageID: new(uint64), - } -} - -func (conn *MuxConn) Write(p []byte) (n int, err error) { - m := &Message{ - Method: MessageMethodData, - ConnID: conn.ID, - MessageID: conn.SendMessageID(), - Data: p, - } - - err = conn.MuxWS.SendMessage(m) - if err != nil { - return 0, err - } - return len(p), nil -} - -func (conn *MuxConn) Read(p []byte) (n int, err error) { - if len(conn.buf) == 0 { - logger.Debugf("%d buf is 0, waiting", conn.ID) - <-conn.wait - } - - conn.mutex.Lock() - logger.Debugf("%d buf: %v", conn.buf) - n = copy(p, conn.buf) - conn.buf = conn.buf[n:] - conn.mutex.Unlock() - return -} - -func (conn *MuxConn) HandleMessage(m *Message) (err error) { - logger.Debugf("handle message %d %d", m.ConnID, m.MessageID) - for { - if conn.receiveMessageID == m.MessageID { - conn.mutex.Lock() - conn.buf = append(conn.buf, m.Data...) - conn.receiveMessageID++ - close(conn.wait) - conn.wait = make(chan int) - conn.mutex.Unlock() - logger.Debugf("handled message %d %d", m.ConnID, m.MessageID) - return - } - <-conn.wait - } - return -} - -func (conn *MuxConn) SendMessageID() (id uint64) { - id = atomic.LoadUint64(conn.sendMessageID) - atomic.AddUint64(conn.sendMessageID, 1) - return -} - -func (conn *MuxConn) Run(c *net.TCPConn) { - go func() { - _, err := io.Copy(c, conn) - if err != nil { - logger.Debugf(err.Error()) - } - }() - - _, err := io.Copy(conn, c) - if err != nil { - logger.Debugf(err.Error()) - } - - return -} - -//client dial remote -func (conn *MuxConn) DialMessage(host string) (err error) { - m := &Message{ - Method: MessageMethodDial, - MessageID: 18446744073709551615, - ConnID: conn.ID, - Data: []byte(host), - } - - logger.Debugf("dial for %s", host) - - err = conn.MuxWS.SendMessage(m) - if err != nil { - return - } - - logger.Debugf("%d %s", conn.ID, host) - return -} diff --git a/core/mux/client.go b/core/mux/client.go new file mode 100644 index 0000000..c67500e --- /dev/null +++ b/core/mux/client.go @@ -0,0 +1,29 @@ +package mux + +import "math/rand" + +//NewMuxConn creates a new mux connection for client +func (group *Group) NewMuxConn(host string) (conn *Conn, err error) { + conn = &Conn{ + ID: rand.Uint32(), + wait: make(chan int), + sendMessageID: new(uint32), + group: group, + } + + m := &Message{ + Method: MessageMethodDial, + MessageID: 4294967295, + ConnID: conn.ID, + Length: uint32(len(host)), + Data: []byte(host), + } + + err = group.Send(m) + if err != nil { + return + } + + group.Conns = append(group.Conns, conn) + return +} diff --git a/core/mux/conn.go b/core/mux/conn.go new file mode 100644 index 0000000..d9ea4b7 --- /dev/null +++ b/core/mux/conn.go @@ -0,0 +1,92 @@ +package mux + +import ( + "io" + "log" + "net" + "sync" + "sync/atomic" +) + +type Conn struct { + ID uint32 + + group *Group + + mutex sync.Mutex + buf []byte + wait chan int + + receiveMessageID uint32 + sendMessageID *uint32 +} + +func (conn *Conn) Write(p []byte) (n int, err error) { + m := &Message{ + Method: MessageMethodData, + ConnID: conn.ID, + MessageID: conn.SendMessageID(), + Length: uint32(len(p)), + Data: p, + } + + err = conn.group.Send(m) + if err != nil { + return 0, err + } + return len(p), nil +} + +func (conn *Conn) Read(p []byte) (n int, err error) { + if len(conn.buf) == 0 { + //log.Printf("%d buf is 0, waiting", conn.ID) + <-conn.wait + } + + conn.mutex.Lock() + //log.Printf("%d buf: %v",conn.ID, conn.buf) + n = copy(p, conn.buf) + conn.buf = conn.buf[n:] + conn.mutex.Unlock() + return +} + +func (conn *Conn) HandleMessage(m *Message) (err error) { + //log.Printf("handle message %d %d", m.ConnID, m.MessageID) + for { + if conn.receiveMessageID == m.MessageID { + conn.mutex.Lock() + conn.buf = append(conn.buf, m.Data...) + conn.receiveMessageID++ + close(conn.wait) + conn.wait = make(chan int) + conn.mutex.Unlock() + log.Printf("handled message %d %d", m.ConnID, m.MessageID) + return + } + <-conn.wait + } + return +} + +func (conn *Conn) SendMessageID() (id uint32) { + id = atomic.LoadUint32(conn.sendMessageID) + atomic.AddUint32(conn.sendMessageID, 1) + return +} + +func (conn *Conn) Run(c *net.TCPConn) { + go func() { + _, err := io.Copy(c, conn) + if err != nil { + log.Printf(err.Error()) + } + }() + + _, err := io.Copy(conn, c) + if err != nil { + log.Printf(err.Error()) + } + + return +} diff --git a/core/mux/group.go b/core/mux/group.go new file mode 100644 index 0000000..8580be5 --- /dev/null +++ b/core/mux/group.go @@ -0,0 +1,88 @@ +package mux + +import ( + "errors" + "log" + "time" +) + +type Group struct { + client bool + + MuxWSs []*MuxWebSocket + + Conns []*Conn +} + +//true: client group +//false: server group +func NewGroup(client bool) (group *Group) { + group = &Group{ + client: client, + } + return +} + +func (group *Group) Send(m *Message) (err error) { + //todo + for group.MuxWSs != nil { + err = group.MuxWSs[0].Send(m) + return + } + return +} + +func (group *Group) Handle(m *Message) { + //log.Printf("group received %#v", m) + + if !group.client && m.Method != MessageMethodData { + group.ServerHandleMessage(m) + return + } + + //get conn and send message + //todo better way to find conn + for { + t := time.Now() + for _, conn := range group.Conns { + if conn.ID == m.ConnID { + log.Printf("find conn id %x", conn.ID) + err := conn.HandleMessage(m) + if err != nil { + log.Println(err.Error()) + return + } + return + } + } + if time.Now().After(t.Add(time.Second * 3)) { + err := errors.New("conn does not exist") + log.Println(err.Error()) + return + } + } + return +} + +func (group *Group) AddMuxWS(muxWS *MuxWebSocket) (err error) { + muxWS.group = group + group.MuxWSs = append(group.MuxWSs, muxWS) + group.Listen(muxWS) + return +} + +func (group *Group) Listen(muxWS *MuxWebSocket) { + go func() { + for { + log.Println("ready to receive") + m, err := muxWS.Receive() + if err != nil { + log.Printf(err.Error()) + return + } + + go group.Handle(m) + } + return + }() +} diff --git a/core/mux/message.go b/core/mux/message.go new file mode 100644 index 0000000..7559f4e --- /dev/null +++ b/core/mux/message.go @@ -0,0 +1,53 @@ +package mux + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" +) + +const ( + MessageMethodData = iota + MessageMethodDial +) + +//MessageHeadLength = 13 +type Message struct { + Method uint8 + ConnID uint32 + MessageID uint32 + Length uint32 + Data []byte + + r io.Reader + buf []byte +} + +func (m *Message) Read(p []byte) (n int, err error) { + if m.r == nil { + h := make([]byte, 13) + h[0] = m.Method + binary.BigEndian.PutUint32(h[1:5], m.ConnID) + binary.BigEndian.PutUint32(h[5:9], m.MessageID) + binary.BigEndian.PutUint32(h[9:13], m.Length) + m.r = bytes.NewReader(append(h, m.Data...)) + } + + return m.r.Read(p) +} + +func LoadMessage(h []byte) (m *Message) { + if len(h) != 13 { + panic(fmt.Sprintf("wrong head length: %d", len(h))) + return + } + + m = &Message{ + Method: h[0], + ConnID: binary.BigEndian.Uint32(h[1:5]), + MessageID: binary.BigEndian.Uint32(h[5:9]), + Length: binary.BigEndian.Uint32(h[9:13]), + } + return +} diff --git a/core/mux/server.go b/core/mux/server.go new file mode 100644 index 0000000..14ae369 --- /dev/null +++ b/core/mux/server.go @@ -0,0 +1,42 @@ +package mux + +import ( + "log" + "net" +) + +//ServerHandleMessage is a server group function +func (group *Group) ServerHandleMessage(m *Message) (err error) { + //accept new conn + if m.Method == MessageMethodDial { + host := string(m.Data) + log.Printf("start to dial %s", host) + conn := &Conn{ + ID: m.ConnID, + wait: make(chan int), + sendMessageID: new(uint32), + group: group, + } + + //add to group before receive data + group.Conns = append(group.Conns, conn) + + tcpAddr, err := net.ResolveTCPAddr("tcp", host) + if err != nil { + log.Printf(err.Error()) + return err + } + + tcpConn, err := net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + log.Printf(err.Error()) + return err + } + + log.Printf("Accepted mux conn %s", host) + + conn.Run(tcpConn) + return err + } + return +} diff --git a/core/mux/websocket.go b/core/mux/websocket.go new file mode 100644 index 0000000..b9a84d0 --- /dev/null +++ b/core/mux/websocket.go @@ -0,0 +1,55 @@ +package mux + +import ( + "io" + "sync" + + "github.com/lzjluzijie/websocks/core" +) + +type MuxWebSocket struct { + *core.WebSocket + + group *Group + + mutex sync.Mutex +} + +func NewMuxWebSocket(ws *core.WebSocket) (muxWS *MuxWebSocket) { + muxWS = &MuxWebSocket{ + WebSocket: ws, + } + return +} + +func (muxWS *MuxWebSocket) Send(m *Message) (err error) { + muxWS.mutex.Lock() + _, err = io.Copy(muxWS, m) + if err != nil { + return + } + + //log.Printf("sent %#v", m) + muxWS.mutex.Unlock() + return +} + +func (muxWS *MuxWebSocket) Receive() (m *Message, err error) { + h := make([]byte, 13) + _, err = muxWS.Read(h) + if err != nil { + return + } + + m = LoadMessage(h) + data := make([]byte, m.Length) + + _, err = muxWS.Read(data) + if err != nil { + return + } + + m.Data = data + //log.Printf("received %#v", m) + return +} diff --git a/core/muxwebsocket.go b/core/muxwebsocket.go deleted file mode 100644 index 099fc88..0000000 --- a/core/muxwebsocket.go +++ /dev/null @@ -1,61 +0,0 @@ -package core - -import ( - "encoding/gob" - "sync" -) - -type MuxWebSocket struct { - *WebSocket - Decoder *gob.Decoder - Encoder *gob.Encoder - - muxConns []*MuxConn - muxConnID []uint64 - mutex sync.Mutex -} - -func NewMuxWebSocket(ws *WebSocket) (muxWS *MuxWebSocket) { - dec := gob.NewDecoder(ws) - enc := gob.NewEncoder(ws) - - muxWS = &MuxWebSocket{ - WebSocket: ws, - Decoder: dec, - Encoder: enc, - } - return -} - -func (muxWS *MuxWebSocket) SendMessage(m *Message) (err error) { - err = muxWS.Encoder.Encode(m) - logger.Debugf("sent %#v", m) - return -} - -func (muxWS *MuxWebSocket) ReceiveMessage() (m *Message, err error) { - m = &Message{} - err = muxWS.Decoder.Decode(m) - logger.Debugf("received %#v", m) - return -} - -func (muxWS *MuxWebSocket) PutMuxConn(conn *MuxConn) { - muxWS.mutex.Lock() - muxWS.muxConns = append(muxWS.muxConns, conn) - muxWS.muxConnID = append(muxWS.muxConnID, conn.ID) - muxWS.mutex.Unlock() - return -} - -func (muxWS *MuxWebSocket) GetMuxConn(connID uint64) (conn *MuxConn) { - muxWS.mutex.Lock() - for n, id := range muxWS.muxConnID { - if id == connID { - conn = muxWS.muxConns[n] - break - } - } - muxWS.mutex.Unlock() - return -} diff --git a/core/stats.go b/core/stats.go index f9b24fa..77cae24 100644 --- a/core/stats.go +++ b/core/stats.go @@ -5,6 +5,7 @@ import ( "time" ) +//todo better stats type Stats struct { Downloaded uint64 DownloadSpeed uint64 diff --git a/core/websocket.go b/core/websocket.go index 0c77483..f2f4ad2 100644 --- a/core/websocket.go +++ b/core/websocket.go @@ -2,14 +2,12 @@ package core import ( "errors" + "log" "time" "github.com/gorilla/websocket" - "github.com/juju/loggo" ) -var logger = loggo.GetLogger("core") - type WebSocket struct { conn *websocket.Conn buf []byte @@ -35,6 +33,7 @@ func (ws *WebSocket) Read(p []byte) (n int, err error) { } if len(ws.buf) == 0 { + log.Println("empty buf, waiting") _, ws.buf, err = ws.conn.ReadMessage() if err != nil { return diff --git a/server/app.go b/server/app.go deleted file mode 100644 index 7738e46..0000000 --- a/server/app.go +++ /dev/null @@ -1,94 +0,0 @@ -package server - -import ( - "crypto/tls" - "net/http" - - "encoding/json" - "io/ioutil" - - "github.com/gorilla/sessions" - "gopkg.in/macaron.v1" -) - -type App struct { - //todo multiple servers - *WebSocksServer - - WebListenAddr string - TLS bool - CertPath string - KeyPath string - - s http.Server - store sessions.Store - m *macaron.Macaron -} - -func LoadApp() (app *App, err error) { - app = &App{} - data, err := ioutil.ReadFile("server.json") - if err != nil { - return - } - - err = json.Unmarshal(data, app) - if err != nil { - return - } - return -} - -func NewApp() (app *App) { - app = &App{ - WebListenAddr: ":23333", - TLS: false, - CertPath: "websocks.cer", - KeyPath: "websocks.key", - } - return -} - -func (app *App) Save() (err error) { - data, err := json.MarshalIndent(app, "", " ") - if err != nil { - return - } - - err = ioutil.WriteFile("server.json", data, 0600) - return -} - -func (app *App) Run() (err error) { - m := app.Macaron() - app.m = m - app.s = http.Server{ - Addr: app.WebListenAddr, - Handler: m, - } - - if !app.TLS { - err = app.s.ListenAndServe() - if err != nil { - return - } - return - } - - app.s.TLSConfig = &tls.Config{ - CipherSuites: []uint16{ - tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, - tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, - tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, - }, - } - - err = app.s.ListenAndServeTLS(app.CertPath, app.KeyPath) - if err != nil { - return err - } - return -} diff --git a/server/config.go b/server/config.go index f41980c..3460e87 100644 --- a/server/config.go +++ b/server/config.go @@ -4,7 +4,6 @@ import ( "time" "github.com/gorilla/websocket" - "github.com/juju/loggo" "github.com/lzjluzijie/websocks/core" ) @@ -28,7 +27,5 @@ func (config *Config) GetServer() (server *WebSocksServer) { CreatedAt: time.Now(), Stats: core.NewStats(), } - - logger.SetLogLevel(loggo.DEBUG) return } diff --git a/server/login.go b/server/login.go deleted file mode 100644 index d4b26fa..0000000 --- a/server/login.go +++ /dev/null @@ -1,36 +0,0 @@ -package server - -import ( - "encoding/json" - "io/ioutil" - - "gopkg.in/macaron.v1" -) - -type Login struct { - Name string - Password string -} - -func (app *App) Login(ctx *macaron.Context) { - l := &Login{} - data, err := ioutil.ReadAll(ctx.Req.Body().ReadCloser()) - if err != nil { - ctx.Error(403, err.Error()) - } - - err = json.Unmarshal(data, l) - if err != nil { - ctx.Error(403, err.Error()) - } - - if l.Name == "halulu" && l.Password == "websocks" { - session, _ := app.store.Get(ctx.Req.Request, "cookie") - session.Values["authenticated"] = true - session.Save(ctx.Req.Request, ctx) - ctx.HTML(200, "ok") - return - } - - ctx.Error(403, "not halulu") -} diff --git a/server/mux.go b/server/mux.go deleted file mode 100644 index 7ba9405..0000000 --- a/server/mux.go +++ /dev/null @@ -1,89 +0,0 @@ -package server - -//import ( -// "errors" -// "fmt" -// "net" -// "time" -//) -// -//func (muxWS *MuxWebSocket) ServerListen() { -// //block and listen -// for { -// m, err := muxWS.ReceiveMessage() -// if err != nil { -// logger.Debugf(err.Error()) -// return -// } -// -// go muxWS.ServerHandleMessage(m) -// } -// return -//} -// -//func (muxWS *MuxWebSocket) ServerHandleMessage(m *Message) { -// //check message -// if m.Data == nil { -// return -// } -// -// //accept new conn -// if m.Method == MessageMethodDial { -// conn, host, err := muxWS.AcceptMuxConn(m) -// if err != nil { -// logger.Debugf(err.Error()) -// return -// } -// -// tcpAddr, err := net.ResolveTCPAddr("tcp", host) -// if err != nil { -// logger.Debugf(err.Error()) -// return -// } -// -// tcpConn, err := net.DialTCP("tcp", nil, tcpAddr) -// if err != nil { -// logger.Debugf(err.Error()) -// return -// } -// -// logger.Debugf("Accepted mux conn %s", host) -// -// conn.Run(tcpConn) -// return -// } -// -// //get conn and send message -// conn := muxWS.GetMuxConn(m.ConnID) -// if conn == nil { -// time.Sleep(time.Second) -// conn = muxWS.GetMuxConn(m.ConnID) -// if conn == nil { -// logger.Debugf("conn %d do not exist", m.ConnID) -// return -// } -// } -// err := conn.HandleMessage(m) -// if err != nil { -// logger.Debugf(err.Error()) -// return -// } -//} -// -//func (muxWS *MuxWebSocket) AcceptMuxConn(m *Message) (conn *MuxConn, host string, err error) { -// if m.Method != MessageMethodDial { -// err = errors.New(fmt.Sprintf("wrong message method %d", m.Method)) -// return -// } -// -// host = string(m.Data) -// -// conn = &MuxConn{ -// ID: m.ConnID, -// MuxWS: muxWS, -// wait: make(chan int), -// sendMessageID: new(uint64), -// } -// muxWS.PutMuxConn(conn) -// return -//} diff --git a/server/server.go b/server/server.go index acdd1db..3063343 100644 --- a/server/server.go +++ b/server/server.go @@ -2,11 +2,13 @@ package server import ( "io" + "log" "net" "net/http" - "sync" "time" + "github.com/lzjluzijie/websocks/core/mux" + "net/http/httputil" "net/url" @@ -18,16 +20,14 @@ import ( "github.com/lzjluzijie/websocks/core" ) -//todo -var logger = loggo.GetLogger("server") - type WebSocksServer struct { *Config LogLevel loggo.Level - Upgrader *websocket.Upgrader - muxConnMap sync.Map - mutex sync.Mutex + Upgrader *websocket.Upgrader + + //todo multiple clients + muxGroup *mux.Group CreatedAt time.Time Stats *core.Stats @@ -36,40 +36,44 @@ type WebSocksServer struct { func (server *WebSocksServer) HandleWebSocket(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { wsConn, err := server.Upgrader.Upgrade(w, r, nil) if err != nil { - logger.Debugf(err.Error()) + log.Printf(err.Error()) return } defer wsConn.Close() ws := core.NewWebSocket(wsConn, server.Stats) - //todo conns - ////mux - //if r.Header.Get("WebSocks-Mux") == "mux" { - // muxWS := NewMuxWebSocket(ws) - // muxWS.ServerListen() - // return - //} + //mux + //todo multiple clients + if r.Header.Get("WebSocks-Mux") == "v0.15" { + if server.muxGroup == nil { + server.muxGroup = mux.NewGroup(false) + } + muxWS := mux.NewMuxWebSocket(ws) + server.muxGroup.AddMuxWS(muxWS) + time.Sleep(time.Hour) + return + } host := r.Header.Get("WebSocks-Host") - logger.Debugf("Dial %s", host) + log.Printf("Dial %s", host) conn, err := server.DialRemote(host) if err != nil { - logger.Debugf(err.Error()) + log.Printf(err.Error()) return } go func() { _, err = io.Copy(conn, ws) if err != nil { - logger.Debugf(err.Error()) + log.Printf(err.Error()) return } }() _, err = io.Copy(ws, conn) if err != nil { - logger.Debugf(err.Error()) + log.Printf(err.Error()) return } @@ -102,7 +106,7 @@ func (server *WebSocksServer) Run() (err error) { Handler: r, } - logger.Infof("Start to listen at %s", server.ListenAddr) + log.Printf("Start to listen at %s", server.ListenAddr) if !server.TLS { err = s.ListenAndServe() diff --git a/server/web.go b/server/web.go deleted file mode 100644 index 52b2186..0000000 --- a/server/web.go +++ /dev/null @@ -1,86 +0,0 @@ -package server - -import ( - "encoding/json" - "io/ioutil" - "log" - "os/exec" - - "github.com/go-macaron/pongo2" - "github.com/gorilla/sessions" - "gopkg.in/macaron.v1" -) - -func (app *App) Macaron() (m *macaron.Macaron) { - app.store = sessions.NewCookieStore([]byte("just a test")) - - m = macaron.New() - m.Use(pongo2.Pongoer()) - - //login check - m.Use(func(ctx *macaron.Context) { - if ctx.Req.RequestURI != "" { - - } - - session, _ := app.store.Get(ctx.Req.Request, "cookie") - if auth, ok := session.Values["authenticated"].(bool); !ok || !auth { - ctx.Error(403, "not halulu") - return - } - }) - - m.Get("/", func(ctx *macaron.Context) { - ctx.HTML(200, "server/server") - }) - - m.Get("/login", func(ctx *macaron.Context) { - ctx.HTML(200, "server/login") - }) - - //api v0 - m.Group("/api/v0/server", func() { - m.Get("/stats", func(ctx *macaron.Context) { - if app.WebSocksServer == nil { - ctx.Error(403, "websocks server is not running") - return - } - - stats := app.Stats - ctx.JSON(200, stats) - }) - m.Post("/start", app.StartServer) - //m.Post("/stop", app.StopServer) - - m.Post("/login", app.Login) - }) - - go func() { - err := exec.Command("explorer", "http://127.0.0.1:23333").Run() - if err != nil { - log.Println(err.Error()) - return - } - }() - - return -} - -func (app *App) StartServer(ctx *macaron.Context) { - config := &Config{} - data, err := ioutil.ReadAll(ctx.Req.Body().ReadCloser()) - if err != nil { - ctx.Error(403, err.Error()) - } - - err = json.Unmarshal(data, config) - if err != nil { - ctx.Error(403, err.Error()) - } - ctx.JSON(200, config) - - webSocksServer := config.GetServer() - app.WebSocksServer = webSocksServer - app.m.Get(webSocksServer.Pattern, webSocksServer.HandleWebSocket) - return -} diff --git a/templates/base/base.html b/templates/base/base.html deleted file mode 100644 index 370bdd2..0000000 --- a/templates/base/base.html +++ /dev/null @@ -1,22 +0,0 @@ - - - - - - WebSocks - - {% block head %} {% endblock %} - - -{% block body %} {% endblock %} - - - - - \ No newline at end of file diff --git a/templates/client/client.html b/templates/client/client.html deleted file mode 100644 index ce8b89d..0000000 --- a/templates/client/client.html +++ /dev/null @@ -1,153 +0,0 @@ -{% extends "base/base.html" %} -{% block head %} - - - -{% endblock %} -{% block body %} - -
-
-
-
-
-
-
-

- WebSocks Client -

-

- A secure proxy based on WebSocket. -

-
-
-
- -
-
- -
- -
-
-
-
- -
-
-
- Running…… -
-

DownloadSpeed: {{ DownloadSpeed }}

-
-

UploadSpeed: {{ UploadSpeed }}

-
-

Downloaded: {{ Downloaded }}

-
-

Uploaded: {{ Uploaded }}

-
-
-
-
-
-
-
- -
-
-
-
- -
- -
-
-
- -
- -
-
- -
- -
- -
-
- -
-
- - -
-
-
- -
-
- -
-
- -
-
-
-

Logs

- -
-
-
-{% endblock %} \ No newline at end of file diff --git a/templates/home.html b/templates/home.html deleted file mode 100644 index 3d9971f..0000000 --- a/templates/home.html +++ /dev/null @@ -1,52 +0,0 @@ -{% extends "base/base.html" %} -{% block body %} -
-
-

- WebSocks -

-

- A secure proxy based on websocket. -

-

- This project is still working in progress, more features are still in - development. If you are interested in this project, please star this project - in order to support me. Thank you. -

-

- If you have any problems or suggestions, please do not hesitate to submit - issues or contact me - - @halulu - - . We also have a - - telegram group - - (mostly Chinese, English is ok). -

- -
-
-{% endblock %} \ No newline at end of file diff --git a/templates/server/login.html b/templates/server/login.html deleted file mode 100644 index 6b60295..0000000 --- a/templates/server/login.html +++ /dev/null @@ -1,43 +0,0 @@ -{% extends "base/base.html" %} -{% block head %} - - -{% endblock %} -{% block body %} -
-
-
-
-

Login

-

Please login to proceed.

-
-
-
- -
-
- -
-
- -
-
- -
-
-
-
-
-{% endblock %} \ No newline at end of file diff --git a/templates/server/server.html b/templates/server/server.html deleted file mode 100644 index 9a34b74..0000000 --- a/templates/server/server.html +++ /dev/null @@ -1,126 +0,0 @@ -{% extends "base/base.html" %} -{% block head %} - - - -{% endblock %} -{% block body %} -
-
-

- WebSocks Server -

-

- A secure proxy based on WebSocket. -

- -
-
- -
-
-
-
-
-
- -
- -
-
-
-
-
-
-

- -

-
-
-

- -

-
-
-
-
-
- -
-
-
-

Server Stats

-
- - - - - - - - - - - - - - - - - -
- Downloaded - - Uploaded - - DownloadSpeed - - UploadSpeed -
- 0 - - 0 - - 0 - - 0 -
-
-
-
-
-{% endblock %} \ No newline at end of file diff --git a/websocks.go b/websocks.go index b59db71..bedafc8 100644 --- a/websocks.go +++ b/websocks.go @@ -1,16 +1,14 @@ package main import ( - "os" - "errors" "io/ioutil" + "os" "os/exec" "runtime" "log" - "github.com/juju/loggo" "github.com/lzjluzijie/websocks/client" "github.com/lzjluzijie/websocks/core" "github.com/lzjluzijie/websocks/server" @@ -18,35 +16,23 @@ import ( ) func main() { - logger := loggo.GetLogger("websocks") - logger.SetLogLevel(loggo.INFO) - app := cli.App{ - Name: "WebSocks", - Version: "0.13.2", - Usage: "A secure proxy based on WebSocket. Click to start web client.", + Name: "WebSocks", + /* + todo more websocket connections + todo better log + todo better stats + */ + Version: "0.15.0", + Usage: "A secure proxy based on WebSocket.", Description: "websocks.org", Author: "Halulu", Email: "lzjluzijie@gmail.com", - Action: func(c *cli.Context) (err error) { - app, err := client.LoadApp() - if err != nil { - log.Println("can not load client.json, create not one") - app = client.NewApp() - err = app.Save() - if err != nil { - log.Printf("save config: %s", err.Error()) - } - } - - err = app.Run() - return - }, Commands: []cli.Command{ { Name: "client", Aliases: []string{"c"}, - Usage: "start websocks client(cli)", + Usage: "start websocks client", Flags: []cli.Flag{ cli.StringFlag{ Name: "l", @@ -59,8 +45,9 @@ func main() { Usage: "server url", }, cli.BoolFlag{ - Name: "mux", - Usage: "mux mode", + Name: "mux", + //todo + Usage: "mux mode(test)", }, cli.StringFlag{ Name: "sni", @@ -135,7 +122,6 @@ func main() { }, }, Action: func(c *cli.Context) (err error) { - debug := c.GlobalBool("debug") listenAddr := c.String("l") pattern := c.String("p") tls := c.Bool("tls") @@ -143,12 +129,6 @@ func main() { keyPath := c.String("key") reverseProxy := c.String("reverse-proxy") - if debug { - logger.SetLogLevel(loggo.DEBUG) - } - - logger.Infof("Log level %s", logger.LogLevel().String()) - if pattern[0] != '/' { pattern = "/" + pattern } @@ -163,7 +143,7 @@ func main() { } websocksServer := config.GetServer() - logger.Infof("Listening at %s", listenAddr) + log.Printf("Listening at %s", listenAddr) err = websocksServer.Run() if err != nil { return @@ -194,10 +174,10 @@ func main() { var key, cert []byte if ecdsa { key, cert, err = core.GenP256(hosts) - logger.Infof("Generated ecdsa P-256 key and cert") + log.Printf("Generated ecdsa P-256 key and cert") } else { key, cert, err = core.GenRSA2048(hosts) - logger.Infof("Generated rsa 2048 key and cert") + log.Printf("Generated rsa 2048 key and cert") } err = ioutil.WriteFile("websocks.key", key, 0600) @@ -214,7 +194,7 @@ func main() { { Name: "pac", Aliases: []string{"pac"}, - Usage: "set pac for windows", + Usage: "set pac for windows(test)", Action: func(c *cli.Context) (err error) { if runtime.GOOS != "windows" { err = errors.New("not windows") @@ -225,29 +205,11 @@ func main() { return }, }, - { - Name: "webserver", - Aliases: []string{"w"}, - Usage: "web ui server", - Action: func(c *cli.Context) (err error) { - app, err := server.LoadApp() - if err != nil { - log.Println("can not load server.json, create not one") - app = server.NewApp() - err = app.Save() - if err != nil { - log.Printf("save config: %s", err.Error()) - } - } - - err = app.Run() - return - }, - }, }, } + err := app.Run(os.Args) if err != nil { - logger.Errorf(err.Error()) + log.Printf(err.Error()) } } From c68236d1f2fdf0020473e8ccfc1059c554e4e3ad Mon Sep 17 00:00:00 2001 From: halulu Date: Mon, 3 Dec 2018 12:54:46 +0800 Subject: [PATCH 3/5] mux conn map --- client/client.go | 6 ++-- core/mux/client.go | 6 ++-- core/mux/conn.go | 31 +++++++++++++++++- core/mux/group.go | 76 +++++++++++++++++++++++++++++++++---------- core/mux/server.go | 12 +++++-- core/mux/websocket.go | 15 +++++++-- core/websocket.go | 4 +-- websocks.go | 20 ++++++++++-- 8 files changed, 136 insertions(+), 34 deletions(-) diff --git a/client/client.go b/client/client.go index 19e6fa7..d4fdff2 100644 --- a/client/client.go +++ b/client/client.go @@ -88,7 +88,8 @@ func (client *WebSocksClient) Stop() { } func (client *WebSocksClient) HandleConn(conn *net.TCPConn) { - log.Println("new socks5 conn") + //debug log + //log.Println("new socks5 conn") lc, err := NewLocalConn(conn) if err != nil { @@ -105,7 +106,8 @@ func (client *WebSocksClient) HandleConn(conn *net.TCPConn) { return } - log.Printf("created #%v", muxConn) + //debug log + log.Printf("created new mux conn: %x", muxConn.ID) muxConn.Run(conn) return diff --git a/core/mux/client.go b/core/mux/client.go index c67500e..b415673 100644 --- a/core/mux/client.go +++ b/core/mux/client.go @@ -1,11 +1,9 @@ package mux -import "math/rand" - //NewMuxConn creates a new mux connection for client func (group *Group) NewMuxConn(host string) (conn *Conn, err error) { conn = &Conn{ - ID: rand.Uint32(), + ID: group.NextConnID(), wait: make(chan int), sendMessageID: new(uint32), group: group, @@ -24,6 +22,6 @@ func (group *Group) NewMuxConn(host string) (conn *Conn, err error) { return } - group.Conns = append(group.Conns, conn) + group.AddConn(conn) return } diff --git a/core/mux/conn.go b/core/mux/conn.go index d9ea4b7..ada08bd 100644 --- a/core/mux/conn.go +++ b/core/mux/conn.go @@ -1,6 +1,7 @@ package mux import ( + "errors" "io" "log" "net" @@ -8,6 +9,8 @@ import ( "sync/atomic" ) +var ErrConnClosed = errors.New("mux conn closed") + type Conn struct { ID uint32 @@ -17,11 +20,17 @@ type Conn struct { buf []byte wait chan int + closed bool + receiveMessageID uint32 sendMessageID *uint32 } func (conn *Conn) Write(p []byte) (n int, err error) { + if conn.closed { + return 0, ErrConnClosed + } + m := &Message{ Method: MessageMethodData, ConnID: conn.ID, @@ -38,6 +47,10 @@ func (conn *Conn) Write(p []byte) (n int, err error) { } func (conn *Conn) Read(p []byte) (n int, err error) { + if conn.closed { + return 0, ErrConnClosed + } + if len(conn.buf) == 0 { //log.Printf("%d buf is 0, waiting", conn.ID) <-conn.wait @@ -52,7 +65,13 @@ func (conn *Conn) Read(p []byte) (n int, err error) { } func (conn *Conn) HandleMessage(m *Message) (err error) { + if conn.closed { + return ErrConnClosed + } + + //debug log //log.Printf("handle message %d %d", m.ConnID, m.MessageID) + for { if conn.receiveMessageID == m.MessageID { conn.mutex.Lock() @@ -61,7 +80,8 @@ func (conn *Conn) HandleMessage(m *Message) (err error) { close(conn.wait) conn.wait = make(chan int) conn.mutex.Unlock() - log.Printf("handled message %d %d", m.ConnID, m.MessageID) + //debug log + //log.Printf("handled message %d %d", m.ConnID, m.MessageID) return } <-conn.wait @@ -79,14 +99,23 @@ func (conn *Conn) Run(c *net.TCPConn) { go func() { _, err := io.Copy(c, conn) if err != nil { + conn.Close() log.Printf(err.Error()) } }() _, err := io.Copy(conn, c) if err != nil { + conn.Close() log.Printf(err.Error()) } return } + +func (conn *Conn) Close() (err error) { + conn.group.DeleteConn(conn.ID) + //close(conn.wait) + conn.closed = true + return +} diff --git a/core/mux/group.go b/core/mux/group.go index 8580be5..1e2c471 100644 --- a/core/mux/group.go +++ b/core/mux/group.go @@ -2,7 +2,9 @@ package mux import ( "errors" + "fmt" "log" + "sync" "time" ) @@ -11,14 +13,19 @@ type Group struct { MuxWSs []*MuxWebSocket - Conns []*Conn + connMap map[uint32]*Conn + connMapMutex sync.RWMutex + + connID uint32 + connIDMutex sync.Mutex } //true: client group //false: server group func NewGroup(client bool) (group *Group) { group = &Group{ - client: client, + client: client, + connMap: make(map[uint32]*Conn), } return } @@ -41,22 +48,19 @@ func (group *Group) Handle(m *Message) { } //get conn and send message - //todo better way to find conn for { - t := time.Now() - for _, conn := range group.Conns { - if conn.ID == m.ConnID { - log.Printf("find conn id %x", conn.ID) - err := conn.HandleMessage(m) - if err != nil { - log.Println(err.Error()) - return - } - return - } + conn := group.GetConn(m.ConnID) + if conn == nil { + //debug log + err := errors.New(fmt.Sprintf("conn does not exist: %x", m.ConnID)) + log.Println(err.Error()) + log.Println(m) + return } - if time.Now().After(t.Add(time.Second * 3)) { - err := errors.New("conn does not exist") + + //this err should be nil or ErrConnClosed + err := conn.HandleMessage(m) + if err != nil { log.Println(err.Error()) return } @@ -64,6 +68,45 @@ func (group *Group) Handle(m *Message) { return } +func (group *Group) AddConn(conn *Conn) { + group.connMapMutex.Lock() + group.connMap[conn.ID] = conn + group.connMapMutex.Unlock() + return +} + +func (group *Group) DeleteConn(id uint32) { + delete(group.connMap, id) + return +} + +func (group *Group) GetConn(id uint32) (conn *Conn) { + group.connMapMutex.RLock() + conn = group.connMap[id] + group.connMapMutex.RUnlock() + + if conn == nil { + t := time.Now() + for time.Now().Before(t.Add(time.Second)) { + group.connMapMutex.RLock() + conn = group.connMap[id] + group.connMapMutex.RUnlock() + if conn != nil { + return conn + } + } + } + return +} + +func (group *Group) NextConnID() (id uint32) { + group.connIDMutex.Lock() + group.connID++ + id = group.connID + group.connIDMutex.Unlock() + return +} + func (group *Group) AddMuxWS(muxWS *MuxWebSocket) (err error) { muxWS.group = group group.MuxWSs = append(group.MuxWSs, muxWS) @@ -74,7 +117,6 @@ func (group *Group) AddMuxWS(muxWS *MuxWebSocket) (err error) { func (group *Group) Listen(muxWS *MuxWebSocket) { go func() { for { - log.Println("ready to receive") m, err := muxWS.Receive() if err != nil { log.Printf(err.Error()) diff --git a/core/mux/server.go b/core/mux/server.go index 14ae369..2e8a5c7 100644 --- a/core/mux/server.go +++ b/core/mux/server.go @@ -10,7 +10,10 @@ func (group *Group) ServerHandleMessage(m *Message) (err error) { //accept new conn if m.Method == MessageMethodDial { host := string(m.Data) - log.Printf("start to dial %s", host) + + //debug log + //log.Printf("start to dial %s", host) + conn := &Conn{ ID: m.ConnID, wait: make(chan int), @@ -19,21 +22,24 @@ func (group *Group) ServerHandleMessage(m *Message) (err error) { } //add to group before receive data - group.Conns = append(group.Conns, conn) + group.AddConn(conn) tcpAddr, err := net.ResolveTCPAddr("tcp", host) if err != nil { + conn.Close() log.Printf(err.Error()) return err } tcpConn, err := net.DialTCP("tcp", nil, tcpAddr) if err != nil { + conn.Close() log.Printf(err.Error()) return err } - log.Printf("Accepted mux conn %s", host) + //debug log + log.Printf("Accepted mux conn: %x, %s", conn.ID, host) conn.Run(tcpConn) return err diff --git a/core/mux/websocket.go b/core/mux/websocket.go index b9a84d0..0a04aaf 100644 --- a/core/mux/websocket.go +++ b/core/mux/websocket.go @@ -12,7 +12,8 @@ type MuxWebSocket struct { group *Group - mutex sync.Mutex + sMutex sync.Mutex + rMutex sync.Mutex } func NewMuxWebSocket(ws *core.WebSocket) (muxWS *MuxWebSocket) { @@ -23,21 +24,25 @@ func NewMuxWebSocket(ws *core.WebSocket) (muxWS *MuxWebSocket) { } func (muxWS *MuxWebSocket) Send(m *Message) (err error) { - muxWS.mutex.Lock() + muxWS.sMutex.Lock() _, err = io.Copy(muxWS, m) if err != nil { + //muxWS.Close() return } + muxWS.sMutex.Unlock() + //debug log //log.Printf("sent %#v", m) - muxWS.mutex.Unlock() return } func (muxWS *MuxWebSocket) Receive() (m *Message, err error) { + muxWS.rMutex.Lock() h := make([]byte, 13) _, err = muxWS.Read(h) if err != nil { + //muxWS.Close() return } @@ -46,10 +51,14 @@ func (muxWS *MuxWebSocket) Receive() (m *Message, err error) { _, err = muxWS.Read(data) if err != nil { + //muxWS.Close() return } + muxWS.rMutex.Unlock() m.Data = data + + ////debug log //log.Printf("received %#v", m) return } diff --git a/core/websocket.go b/core/websocket.go index f2f4ad2..fd9521c 100644 --- a/core/websocket.go +++ b/core/websocket.go @@ -2,7 +2,6 @@ package core import ( "errors" - "log" "time" "github.com/gorilla/websocket" @@ -33,7 +32,8 @@ func (ws *WebSocket) Read(p []byte) (n int, err error) { } if len(ws.buf) == 0 { - log.Println("empty buf, waiting") + //debug log + //log.Println("empty buf, waiting") _, ws.buf, err = ws.conn.ReadMessage() if err != nil { return diff --git a/websocks.go b/websocks.go index bedafc8..5e04c62 100644 --- a/websocks.go +++ b/websocks.go @@ -3,12 +3,11 @@ package main import ( "errors" "io/ioutil" + "log" "os" "os/exec" "runtime" - "log" - "github.com/lzjluzijie/websocks/client" "github.com/lzjluzijie/websocks/core" "github.com/lzjluzijie/websocks/server" @@ -208,6 +207,23 @@ func main() { }, } + ////pprof debug + //go func() { + // f, err := os.Create(fmt.Sprintf("%d.prof", time.Now().Unix())) + // if err != nil { + // panic(err) + // } + // + // err = pprof.StartCPUProfile(f) + // if err != nil { + // panic(err) + // } + // + // time.Sleep(time.Second * 30) + // pprof.StopCPUProfile() + // os.Exit(0) + //}() + err := app.Run(os.Args) if err != nil { log.Printf(err.Error()) From de17a1d36818702210fa3cedf6092dc2b256b38e Mon Sep 17 00:00:00 2001 From: halulu Date: Mon, 3 Dec 2018 14:25:13 +0800 Subject: [PATCH 4/5] fix mux websocket receive message --- client/client.go | 2 +- core/mux/group.go | 7 ++++--- core/mux/websocket.go | 35 +++++++++++++++++++++++++++++------ core/websocket.go | 8 +++++--- 4 files changed, 39 insertions(+), 13 deletions(-) diff --git a/client/client.go b/client/client.go index d4fdff2..6efb4bc 100644 --- a/client/client.go +++ b/client/client.go @@ -107,7 +107,7 @@ func (client *WebSocksClient) HandleConn(conn *net.TCPConn) { } //debug log - log.Printf("created new mux conn: %x", muxConn.ID) + log.Printf("created new mux conn: %x %s", muxConn.ID, host) muxConn.Run(conn) return diff --git a/core/mux/group.go b/core/mux/group.go index 1e2c471..e8a876d 100644 --- a/core/mux/group.go +++ b/core/mux/group.go @@ -54,7 +54,7 @@ func (group *Group) Handle(m *Message) { //debug log err := errors.New(fmt.Sprintf("conn does not exist: %x", m.ConnID)) log.Println(err.Error()) - log.Println(m) + log.Printf("%X %X %X %d", m.Method, m.ConnID, m.MessageID, m.Length) return } @@ -76,7 +76,9 @@ func (group *Group) AddConn(conn *Conn) { } func (group *Group) DeleteConn(id uint32) { + group.connMapMutex.Lock() delete(group.connMap, id) + group.connMapMutex.Unlock() return } @@ -119,12 +121,11 @@ func (group *Group) Listen(muxWS *MuxWebSocket) { for { m, err := muxWS.Receive() if err != nil { - log.Printf(err.Error()) + log.Println(err.Error()) return } go group.Handle(m) } - return }() } diff --git a/core/mux/websocket.go b/core/mux/websocket.go index 0a04aaf..cd06aaa 100644 --- a/core/mux/websocket.go +++ b/core/mux/websocket.go @@ -1,7 +1,9 @@ package mux import ( + "bytes" "io" + "log" "sync" "github.com/lzjluzijie/websocks/core" @@ -27,7 +29,10 @@ func (muxWS *MuxWebSocket) Send(m *Message) (err error) { muxWS.sMutex.Lock() _, err = io.Copy(muxWS, m) if err != nil { - //muxWS.Close() + e := muxWS.Close() + if e != nil { + log.Println(e.Error()) + } return } muxWS.sMutex.Unlock() @@ -39,26 +44,44 @@ func (muxWS *MuxWebSocket) Send(m *Message) (err error) { func (muxWS *MuxWebSocket) Receive() (m *Message, err error) { muxWS.rMutex.Lock() + h := make([]byte, 13) + _, err = muxWS.Read(h) if err != nil { - //muxWS.Close() + e := muxWS.Close() + if e != nil { + log.Println(e.Error()) + } return } + //debug log + //log.Printf("%d %x",n, h) + m = LoadMessage(h) - data := make([]byte, m.Length) + buf := &bytes.Buffer{} + r := io.LimitReader(muxWS, int64(m.Length)) - _, err = muxWS.Read(data) + _, err = io.Copy(buf, r) if err != nil { - //muxWS.Close() + e := muxWS.Close() + if e != nil { + log.Println(e.Error()) + } return } muxWS.rMutex.Unlock() - m.Data = data + m.Data = buf.Bytes() ////debug log //log.Printf("received %#v", m) return } + +func (muxWS *MuxWebSocket) Close() (err error) { + muxWS.group.MuxWSs = nil + err = muxWS.WebSocket.Close() + return +} diff --git a/core/websocket.go b/core/websocket.go index fd9521c..f0b8cdd 100644 --- a/core/websocket.go +++ b/core/websocket.go @@ -7,6 +7,8 @@ import ( "github.com/gorilla/websocket" ) +var ErrWebSocketClosed = errors.New("websocket closed") + type WebSocket struct { conn *websocket.Conn buf []byte @@ -28,7 +30,7 @@ func NewWebSocket(conn *websocket.Conn, stats *Stats) (ws *WebSocket) { func (ws *WebSocket) Read(p []byte) (n int, err error) { if ws.closed == true { - return 0, errors.New("websocket closed") + return 0, ErrWebSocketClosed } if len(ws.buf) == 0 { @@ -51,7 +53,7 @@ func (ws *WebSocket) Read(p []byte) (n int, err error) { func (ws *WebSocket) Write(p []byte) (n int, err error) { if ws.closed == true { - return 0, errors.New("websocket closed") + return 0, ErrWebSocketClosed } err = ws.conn.WriteMessage(websocket.BinaryMessage, p) @@ -68,7 +70,7 @@ func (ws *WebSocket) Write(p []byte) (n int, err error) { } func (ws *WebSocket) Close() (err error) { - ws.conn.Close() ws.closed = true + err = ws.conn.Close() return } From a6a2d139fa04da263368dd0260a77590d9dc3f1e Mon Sep 17 00:00:00 2001 From: halulu Date: Mon, 3 Dec 2018 14:58:01 +0800 Subject: [PATCH 5/5] v0.15.1 --- websocks.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/websocks.go b/websocks.go index 5e04c62..93c9a88 100644 --- a/websocks.go +++ b/websocks.go @@ -22,7 +22,7 @@ func main() { todo better log todo better stats */ - Version: "0.15.0", + Version: "0.15.1", Usage: "A secure proxy based on WebSocket.", Description: "websocks.org", Author: "Halulu",