From 494c281a904e56229afec6c32bde507670fb0a9b Mon Sep 17 00:00:00 2001 From: database64128 Date: Mon, 16 Aug 2021 00:52:18 +0800 Subject: [PATCH] =?UTF-8?q?=E2=AC=87=EF=B8=8F=20Concurrently=20pull=20from?= =?UTF-8?q?=20all=20upstreams=20(#24)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.go | 72 ++++++++++++++++++++++++++++++------------- config/outline.go | 14 ++++++--- config/upstream.go | 1 + dispatcher/tcp/tcp.go | 6 ++-- example.json | 9 +++++- example_fullview.json | 9 +++++- main.go | 2 +- reload.go | 1 - 8 files changed, 81 insertions(+), 33 deletions(-) diff --git a/config/config.go b/config/config.go index fa7ea36..93c7067 100644 --- a/config/config.go +++ b/config/config.go @@ -18,19 +18,24 @@ type Config struct { ConfPath string `json:"-"` Groups []Group `json:"groups"` } + type Server struct { + Name string `json:"name"` Target string `json:"target"` Method string `json:"method"` Password string `json:"password"` MasterKey []byte `json:"-"` UpstreamConf *UpstreamConf `json:"-"` } + type Group struct { + Name string `json:"name"` Port int `json:"port"` Servers []Server `json:"servers"` Upstreams []UpstreamConf `json:"upstreams"` UserContextPool *UserContextPool `json:"-"` } + type UpstreamConf map[string]string const ( @@ -78,6 +83,7 @@ func (g *Group) BuildMasterKeys() { s.MasterKey = cipher.EVPBytesToKey(s.Password, cipher.CiphersConf[s.Method].KeyLen) } } + func (g *Group) BuildUserContextPool(timeout time.Duration) { g.UserContextPool = (*UserContextPool)(lru.New(lru.FixedTimeout, int64(timeout))) } @@ -92,6 +98,7 @@ func (config *Config) CheckMethodSupported() error { } return nil } + func (config *Config) CheckDiverseCombinations() error { groups := config.Groups type methodPasswd struct { @@ -112,43 +119,63 @@ func (config *Config) CheckDiverseCombinations() error { } return nil } + +func pullFromUpstream(upstream Upstream, upstreamConf *UpstreamConf) ([]Server, error) { + servers, err := upstream.GetServers() + if err != nil { + return nil, err + } + for i := range servers { + servers[i].UpstreamConf = upstreamConf + } + return servers, nil +} + func parseUpstreams(config *Config) (err error) { - logged := false + var wg sync.WaitGroup + for i := range config.Groups { - g := &config.Groups[i] - for j, upstreamConf := range g.Upstreams { + group := &config.Groups[i] + mu := sync.Mutex{} + for i := range group.Upstreams { var upstream Upstream - switch upstreamConf["type"] { + upstreamConf := &group.Upstreams[i] + + switch (*upstreamConf)["type"] { case "outline": var outline Outline - err = Map2Upstream(upstreamConf, &outline) + err = Map2Upstream(*upstreamConf, &outline) if err != nil { - return + return err } upstream = outline default: - return fmt.Errorf("unknown upstream type: %v", upstreamConf["type"]) + return fmt.Errorf("unknown upstream type: %v", (*upstreamConf)["type"]) } - if !logged { - log.Println("pulling configures from upstreams...") - logged = true - } - servers, err := upstream.GetServers() - if err != nil { - if netError := new(net.Error); errors.As(err, netError) { - upstreamConf[PullingErrorKey] = PullingErrorNetError + + wg.Add(1) + go func(group *Group, upstreamConf *UpstreamConf) { + defer wg.Done() + servers, err := pullFromUpstream(upstream, upstreamConf) + if err != nil { + if netError := new(net.Error); errors.As(err, netError) { + (*upstreamConf)[PullingErrorKey] = PullingErrorNetError + } + log.Printf("[warning] Failed to pull from group %s upstream %s: %v\n", group.Name, upstream.GetName(), err) + return } - log.Printf("[warning] Failed to retrieve configure from groups[%d].upstreams[%d]: %v: %v\n", i, j, err, upstreamConf[PullingErrorKey]) - continue - } - for i := range servers { - servers[i].UpstreamConf = &upstreamConf - } - g.Servers = append(g.Servers, servers...) + mu.Lock() + group.Servers = append(group.Servers, servers...) + mu.Unlock() + log.Printf("Pulled %d servers from group %s upstream %s", len(servers), group.Name, upstream.GetName()) + }(group, upstreamConf) } } + + wg.Wait() return nil } + func check(config *Config) (err error) { if err = config.CheckMethodSupported(); err != nil { return @@ -158,6 +185,7 @@ func check(config *Config) (err error) { } return } + func build(config *Config) { for i := range config.Groups { g := &config.Groups[i] diff --git a/config/outline.go b/config/outline.go index 18c820c..3200924 100644 --- a/config/outline.go +++ b/config/outline.go @@ -18,6 +18,7 @@ import ( ) type Outline struct { + Name string `json:"name"` Type string `json:"type"` Server string `json:"server"` Link string `json:"link"` @@ -208,6 +209,10 @@ func sudoCombinedOutput(client *ssh.Client, password string, cmd string) (b []by return b, err } +func (outline Outline) GetName() string { + return outline.Name +} + func (outline Outline) GetServers() (servers []Server, err error) { defer func() { if err != nil { @@ -223,7 +228,7 @@ func (outline Outline) GetServers() (servers []Server, err error) { if err != nil { return } - return conf.ToServers(outline.Server), nil + return conf.ToServers(outline.Name, outline.Server), nil } type AccessKey struct { @@ -235,12 +240,13 @@ type AccessKey struct { Method string `json:"method"` // the alias of EncryptionMethod } -func (key *AccessKey) ToServer(host string) Server { +func (key *AccessKey) ToServer(name, host string) Server { method := key.EncryptionMethod if method == "" { method = key.Method } return Server{ + Name: fmt.Sprintf("%s - %s", name, key.Name), Target: net.JoinHostPort(host, strconv.Itoa(key.Port)), Method: method, Password: key.Password, @@ -251,10 +257,10 @@ type ShadowboxConfig struct { AccessKeys []AccessKey `json:"accessKeys"` } -func (c *ShadowboxConfig) ToServers(host string) []Server { +func (c *ShadowboxConfig) ToServers(name, host string) []Server { var servers []Server for _, k := range c.AccessKeys { - servers = append(servers, k.ToServer(host)) + servers = append(servers, k.ToServer(name, host)) } return servers } diff --git a/config/upstream.go b/config/upstream.go index e4ff077..957c63a 100644 --- a/config/upstream.go +++ b/config/upstream.go @@ -6,6 +6,7 @@ import ( ) type Upstream interface { + GetName() string GetServers() (servers []Server, err error) } diff --git a/dispatcher/tcp/tcp.go b/dispatcher/tcp/tcp.go index 4c0eab2..339ebf1 100644 --- a/dispatcher/tcp/tcp.go +++ b/dispatcher/tcp/tcp.go @@ -108,17 +108,17 @@ func (d *TCP) handleConn(conn net.Conn) error { // dial and relay rc, err := net.Dial("tcp", server.Target) if err != nil { - return fmt.Errorf("[tcp] %s <-> %s <-x-> %s handleConn dial error: %w", conn.RemoteAddr(), conn.LocalAddr(), rc.RemoteAddr(), err) + return fmt.Errorf("[tcp] %s <-> %s <-x-> %s handleConn dial error: %w", conn.RemoteAddr(), conn.LocalAddr(), server.Target, err) } _ = rc.(*net.TCPConn).SetKeepAlive(true) _ = rc.SetDeadline(time.Now().Add(DefaultTimeout)) _, err = rc.Write(data[:n]) if err != nil { - return fmt.Errorf("[tcp] %s <-> %s <-x-> %s handleConn write error: %w", conn.RemoteAddr(), conn.LocalAddr(), rc.RemoteAddr(), err) + return fmt.Errorf("[tcp] %s <-> %s <-x-> %s handleConn write error: %w", conn.RemoteAddr(), conn.LocalAddr(), server.Target, err) } - log.Printf("[tcp] %s <-> %s <-> %s", conn.RemoteAddr(), conn.LocalAddr(), rc.RemoteAddr()) + log.Printf("[tcp] %s <-> %s <-> %s", conn.RemoteAddr(), conn.LocalAddr(), server.Target) if err := relay(conn, rc); err != nil { if err, ok := err.(net.Error); ok && err.Timeout() { diff --git a/example.json b/example.json index d00f793..2283eef 100644 --- a/example.json +++ b/example.json @@ -1,14 +1,17 @@ { "groups": [ { + "name": "Group A", "port": 1090, "servers": [ { + "name": "Server A0", "target": "45.10.10.10:8081", "method": "chacha20-ietf-poly1305", "password": "mypassword" }, { + "name": "Server A1", "target": "jp.myss.cloudflare.com:18080", "method": "aes-128-gcm", "password": "hereismypasswrod" @@ -16,19 +19,23 @@ ] }, { + "name": "Group B", "port": 1091, "servers": [ { + "name": "Server B0", "target": "45.10.10.11:8088", "method": "chacha20-ietf-poly1305", "password": "mypassword" }, { + "name": "Server B1", "target": "45.10.10.12:18088", "method": "chacha20-ietf-poly1305", "password": "mypassword2" }, { + "name": "Server B2", "target": "jp.myss.cloudflare.com:18080", "method": "aes-128-gcm", "password": "hereismypasswrod" @@ -36,4 +43,4 @@ ] } ] -} \ No newline at end of file +} diff --git a/example_fullview.json b/example_fullview.json index 0e033f1..9a80fd6 100644 --- a/example_fullview.json +++ b/example_fullview.json @@ -1,9 +1,11 @@ { "groups": [ { + "name": "Group A", "port": 1090, "upstreams": [ { + "name": "Outline A0", "type": "outline", "server": "23.115.204.133", "sshPort": "22", @@ -11,6 +13,7 @@ "sshPassword": "bt92ew4yTTRfPHL335" }, { + "name": "Outline A1", "type": "outline", "server": "103.10.23.145", "sshPort": "22", @@ -18,11 +21,13 @@ "sshPrivateKey": "-----BEGIN OPENSSH PRIVATE KEY-----\nb3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABlwAAAAdzc2gtcn\nNhAAAAAwEAAQAAAYEAynXZl9Uu+XmZ+tFNWxG6hn0FrIC+BgNvtsiqdLag7P/xpJ+84sSS\nT032JydbYCbkD2E+qLAJXdH6ZFw65HKj31kYo8jWUUjZLe5WZFEVunI027/s496SI6e6o9\n2pRjyAFmh3s1sd/a1HOWLYCeyOHEt2SVB/EKxlf/KMC6z4JDmZmMtUquPYXBCDPbwhG6jW\nIyYqNv7d/G6cno6CK0WuyHueufSEn8AFRRv4ptunR1xWv4JSavqnHubJJxBFMmSE8pNren\nZy94ccLpOTIif+lbq2F3DmzMCM+8xbrv8EfQmwLz+Kb3be8zORcHGgiv2CJ4hoa/joBA7U\n25rQ+KFE4m9T5ro7Gf2vbjuPNDTgHe4twUM0dM3ue5p2eL3VEFpqcDdhz27SVWZXPzXrui\ndt1oQDcv20won8ooUmqVaTWc2E+Sj4ul057VUsaZzRPbinAfwk00Nrxqg/DVTswej1ah6+\nNjwjmISX8or7MYKH/i5C7Q31pa/tJ9S1mp3zEkczAAAFiMT+hhPE/oYTAAAAB3NzaC1yc2\nEAAAABAMp12ZfVLvl5mfrRTVsRuoZ9BayAvgYDb7bIqnS2oOz/8aSfvOLEkk9N9icnW2Am\n5A9hPqiwCV3R+mRcOuhyo+tZGKPI1lFI2S3uVmRRFbpyNNu/7OPekiOnuqPdqUY8gBZod7\nNbHf2tRzli2AnsjhxLdklQfxCsZX/yjAus+CQ5mZjLVKrj2FwQgz28IRuo1iMmKjb+3fxu\nnJ6OgitFrsh7nrn0hJ/ABUUb+Kbbp0dcVr+CUmr6px7myScQUzJkhPKTa3p2cveHHC6Tky\nIn/pW6thdw5szAjPvMW67/BH0JsC8/im923vMzkXBxoIr9gieIaGv46AQO1Nna0PihROJv\nU+a6Oxn6r247jzQ04B3uLcFDNHTN7rOadni91RBaanA3Yc9u0lVmVz8167onbdaEA3L9tM\nKJ/KKFJqlWk1nNhPko+LpdOe1VLGmc0T24pwH8JNNDa8aoPw1U7MHo9WoevjY8I5iEl/KK\n+zGCh/4uQu0N9aWv7SfUtZqd8xJHMwAAAAMBAAEAAAGASIkghD1krwzaFfqW9GHNqhFwzv\nTxH8ZrZ9lM+LPVxBOOx6RTUAuNP8x2vGBlZHWKj9gPUvB+6pYoV3yTvmQURmWNZmC2KDkp\nVkNlwFsspbf1KCYDAUDkqtGVFCB9rSRP37dd62xhulkyg2Tece/GmmyO3IVygM7DLqv/cM\n9vt8rLNOrkUrV+9r0TyDJ2yiobTkyGI138ukwG4Oe9yzMUA9AGdikcuv9Y5AG5fE6GCBMV\nIVfXn2xeI7wbpVs783n/ZF/6ZvnDgaUUr3cA2VqcqdM8J/5Yj+B6g5iiYttoohMn14e90Y\n2SdbL8mR/F1QHuhkrqOHLjoPL7NAbe2hYAq4r1p/pHfbej2GrsJ/o1DYz91uABi1/gix6l\nEiHChD7VMj8S9pi58kjg6Df+LlCGBEu9mfHddB52kwQFxj55XBrWhnQby/e8Yn0gERLWTk\nD8iNPtiUSs4tWPIUlHk1THHY5kGBurDA84XGbYtLfuHAQ8p6jshNcpPScGATAKaw2BABAA\nwQCA8zyHFFGYrpm3KFMho6jnkqAcDVijtiStubX25YHaeQi5e83y1TZvh31zvj3LhApz6d\ngUw546O/5ucMO3B+xOWHITxfHoen5UxjkRZML9Ob8wfZEppB9hAtTJHe985qG1rwfxAXXO\nZYDSyowGBJQ+e/FgaSIm5bQkCxHupTXTLk3rPxEXvZI1+BnYzu7rqj6tcUChJ9tgagJBAy\nJkMkEJha4U6NQi0jXvYPTzQXPASrpg3tFjXMAC1mb5V02s2vwAAADBAPFl5K4CH5aS6/Op\nhuGvaLVMMHthDUCMk5nmrUIX5IOVQSB5Y74bPqst0U7qs56Y40XoAxgV8oti7RpjQiueOx\n2co5usU1oWVVHQSyUT2YTOSMnwOe5CMeigAXlgVokmoxEOB9lRDD6L2i1KpIxJkWMPvtS6\nKTIyWprA9JTyZ2lSo5Raj5ncNcQSpdxPoW25HEgqUi7XA8ftjx7bMYMoHT2wzZHoQvnDpk\nETe3PAyNjwwTVypMuIrTh4YZbLrUDk2wAAAMEA1rT/WjvdgMiup3eFOfqR0Og4YcUBmjwP\nUXcEY65kTDA0U8WoHizq/vUQ2tQhHjM2vASFvO9M079l4+jDj3dzd2qeSw4V5AYJmnhMqZ\n58LXgKQWZswEWnkKKIs5S+OQkcZw1iGTodipFcWW6nUUsaw1TY1esFLMcpeMeAYZ/sgADp\n7XTO1A7WWEG2dQWVVA/SfaFuimgnBOU7EKmbiGjPQFfxGCmgg5pBjNd528ExRw6pGX9b3d\n0hiMiJp6XWOcqJAAAAC216ekBtenprZXRpAQIDBAUGBw==\n-----END OPENSSH PRIVATE KEY-----\n" }, { + "name": "Outline A2", "type": "outline", "server": "131.13.130.120", "link": "https://example.com/3BlbnNzaC1rZXktdjEAAAAABG/shadowbox_config.json" }, { + "name": "Outline A3", "type": "outline", "server": "131.13.130.121", "apiUrl": "https://131.13.130.121:21230/4pn_faGFTa-bci6IA6ctYB", @@ -31,11 +36,13 @@ ], "servers": [ { + "name": "Server A0", "target": "45.10.10.10:8081", "method": "chacha20-ietf-poly1305", "password": "mypassword" }, { + "name": "Server A1", "target": "jp.myss.cloudflare.com:18080", "method": "aes-128-gcm", "password": "hereismypasswrod" @@ -43,4 +50,4 @@ ] } ] -} \ No newline at end of file +} diff --git a/main.go b/main.go index ae20213..181f42e 100644 --- a/main.go +++ b/main.go @@ -13,7 +13,7 @@ type MapPortDispatcher map[int]*[len(protocols)]dispatcher.Dispatcher type SyncMapPortDispatcher struct { sync.Mutex - Map MapPortDispatcher + Map MapPortDispatcher } func NewSyncMapPortDispatcher() *SyncMapPortDispatcher { diff --git a/reload.go b/reload.go index 86c734a..6756be7 100644 --- a/reload.go +++ b/reload.go @@ -5,7 +5,6 @@ import ( "log" ) - func ReloadConfig() { log.Println("Reloading configuration") mPortDispatcher.Lock()