Skip to content

Commit

Permalink
⬇️ Concurrently pull from all upstreams (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
database64128 authored Aug 15, 2021
1 parent 5096557 commit 494c281
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 33 deletions.
72 changes: 50 additions & 22 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@ type Config struct {
ConfPath string `json:"-"`
Groups []Group `json:"groups"`
}

type Server struct {
Name string `json:"name"`
Target string `json:"target"`
Method string `json:"method"`
Password string `json:"password"`
MasterKey []byte `json:"-"`
UpstreamConf *UpstreamConf `json:"-"`
}

type Group struct {
Name string `json:"name"`
Port int `json:"port"`
Servers []Server `json:"servers"`
Upstreams []UpstreamConf `json:"upstreams"`
UserContextPool *UserContextPool `json:"-"`
}

type UpstreamConf map[string]string

const (
Expand Down Expand Up @@ -78,6 +83,7 @@ func (g *Group) BuildMasterKeys() {
s.MasterKey = cipher.EVPBytesToKey(s.Password, cipher.CiphersConf[s.Method].KeyLen)
}
}

func (g *Group) BuildUserContextPool(timeout time.Duration) {
g.UserContextPool = (*UserContextPool)(lru.New(lru.FixedTimeout, int64(timeout)))
}
Expand All @@ -92,6 +98,7 @@ func (config *Config) CheckMethodSupported() error {
}
return nil
}

func (config *Config) CheckDiverseCombinations() error {
groups := config.Groups
type methodPasswd struct {
Expand All @@ -112,43 +119,63 @@ func (config *Config) CheckDiverseCombinations() error {
}
return nil
}

func pullFromUpstream(upstream Upstream, upstreamConf *UpstreamConf) ([]Server, error) {
servers, err := upstream.GetServers()
if err != nil {
return nil, err
}
for i := range servers {
servers[i].UpstreamConf = upstreamConf
}
return servers, nil
}

func parseUpstreams(config *Config) (err error) {
logged := false
var wg sync.WaitGroup

for i := range config.Groups {
g := &config.Groups[i]
for j, upstreamConf := range g.Upstreams {
group := &config.Groups[i]
mu := sync.Mutex{}
for i := range group.Upstreams {
var upstream Upstream
switch upstreamConf["type"] {
upstreamConf := &group.Upstreams[i]

switch (*upstreamConf)["type"] {
case "outline":
var outline Outline
err = Map2Upstream(upstreamConf, &outline)
err = Map2Upstream(*upstreamConf, &outline)
if err != nil {
return
return err
}
upstream = outline
default:
return fmt.Errorf("unknown upstream type: %v", upstreamConf["type"])
return fmt.Errorf("unknown upstream type: %v", (*upstreamConf)["type"])
}
if !logged {
log.Println("pulling configures from upstreams...")
logged = true
}
servers, err := upstream.GetServers()
if err != nil {
if netError := new(net.Error); errors.As(err, netError) {
upstreamConf[PullingErrorKey] = PullingErrorNetError

wg.Add(1)
go func(group *Group, upstreamConf *UpstreamConf) {
defer wg.Done()
servers, err := pullFromUpstream(upstream, upstreamConf)
if err != nil {
if netError := new(net.Error); errors.As(err, netError) {
(*upstreamConf)[PullingErrorKey] = PullingErrorNetError
}
log.Printf("[warning] Failed to pull from group %s upstream %s: %v\n", group.Name, upstream.GetName(), err)
return
}
log.Printf("[warning] Failed to retrieve configure from groups[%d].upstreams[%d]: %v: %v\n", i, j, err, upstreamConf[PullingErrorKey])
continue
}
for i := range servers {
servers[i].UpstreamConf = &upstreamConf
}
g.Servers = append(g.Servers, servers...)
mu.Lock()
group.Servers = append(group.Servers, servers...)
mu.Unlock()
log.Printf("Pulled %d servers from group %s upstream %s", len(servers), group.Name, upstream.GetName())
}(group, upstreamConf)
}
}

wg.Wait()
return nil
}

func check(config *Config) (err error) {
if err = config.CheckMethodSupported(); err != nil {
return
Expand All @@ -158,6 +185,7 @@ func check(config *Config) (err error) {
}
return
}

func build(config *Config) {
for i := range config.Groups {
g := &config.Groups[i]
Expand Down
14 changes: 10 additions & 4 deletions config/outline.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
)

type Outline struct {
Name string `json:"name"`
Type string `json:"type"`
Server string `json:"server"`
Link string `json:"link"`
Expand Down Expand Up @@ -208,6 +209,10 @@ func sudoCombinedOutput(client *ssh.Client, password string, cmd string) (b []by
return b, err
}

func (outline Outline) GetName() string {
return outline.Name
}

func (outline Outline) GetServers() (servers []Server, err error) {
defer func() {
if err != nil {
Expand All @@ -223,7 +228,7 @@ func (outline Outline) GetServers() (servers []Server, err error) {
if err != nil {
return
}
return conf.ToServers(outline.Server), nil
return conf.ToServers(outline.Name, outline.Server), nil
}

type AccessKey struct {
Expand All @@ -235,12 +240,13 @@ type AccessKey struct {
Method string `json:"method"` // the alias of EncryptionMethod
}

func (key *AccessKey) ToServer(host string) Server {
func (key *AccessKey) ToServer(name, host string) Server {
method := key.EncryptionMethod
if method == "" {
method = key.Method
}
return Server{
Name: fmt.Sprintf("%s - %s", name, key.Name),
Target: net.JoinHostPort(host, strconv.Itoa(key.Port)),
Method: method,
Password: key.Password,
Expand All @@ -251,10 +257,10 @@ type ShadowboxConfig struct {
AccessKeys []AccessKey `json:"accessKeys"`
}

func (c *ShadowboxConfig) ToServers(host string) []Server {
func (c *ShadowboxConfig) ToServers(name, host string) []Server {
var servers []Server
for _, k := range c.AccessKeys {
servers = append(servers, k.ToServer(host))
servers = append(servers, k.ToServer(name, host))
}
return servers
}
1 change: 1 addition & 0 deletions config/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
)

type Upstream interface {
GetName() string
GetServers() (servers []Server, err error)
}

Expand Down
6 changes: 3 additions & 3 deletions dispatcher/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,17 @@ func (d *TCP) handleConn(conn net.Conn) error {
// dial and relay
rc, err := net.Dial("tcp", server.Target)
if err != nil {
return fmt.Errorf("[tcp] %s <-> %s <-x-> %s handleConn dial error: %w", conn.RemoteAddr(), conn.LocalAddr(), rc.RemoteAddr(), err)
return fmt.Errorf("[tcp] %s <-> %s <-x-> %s handleConn dial error: %w", conn.RemoteAddr(), conn.LocalAddr(), server.Target, err)
}
_ = rc.(*net.TCPConn).SetKeepAlive(true)

_ = rc.SetDeadline(time.Now().Add(DefaultTimeout))
_, err = rc.Write(data[:n])
if err != nil {
return fmt.Errorf("[tcp] %s <-> %s <-x-> %s handleConn write error: %w", conn.RemoteAddr(), conn.LocalAddr(), rc.RemoteAddr(), err)
return fmt.Errorf("[tcp] %s <-> %s <-x-> %s handleConn write error: %w", conn.RemoteAddr(), conn.LocalAddr(), server.Target, err)
}

log.Printf("[tcp] %s <-> %s <-> %s", conn.RemoteAddr(), conn.LocalAddr(), rc.RemoteAddr())
log.Printf("[tcp] %s <-> %s <-> %s", conn.RemoteAddr(), conn.LocalAddr(), server.Target)

if err := relay(conn, rc); err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
Expand Down
9 changes: 8 additions & 1 deletion example.json
Original file line number Diff line number Diff line change
@@ -1,39 +1,46 @@
{
"groups": [
{
"name": "Group A",
"port": 1090,
"servers": [
{
"name": "Server A0",
"target": "45.10.10.10:8081",
"method": "chacha20-ietf-poly1305",
"password": "mypassword"
},
{
"name": "Server A1",
"target": "jp.myss.cloudflare.com:18080",
"method": "aes-128-gcm",
"password": "hereismypasswrod"
}
]
},
{
"name": "Group B",
"port": 1091,
"servers": [
{
"name": "Server B0",
"target": "45.10.10.11:8088",
"method": "chacha20-ietf-poly1305",
"password": "mypassword"
},
{
"name": "Server B1",
"target": "45.10.10.12:18088",
"method": "chacha20-ietf-poly1305",
"password": "mypassword2"
},
{
"name": "Server B2",
"target": "jp.myss.cloudflare.com:18080",
"method": "aes-128-gcm",
"password": "hereismypasswrod"
}
]
}
]
}
}
9 changes: 8 additions & 1 deletion example_fullview.json
Original file line number Diff line number Diff line change
@@ -1,28 +1,33 @@
{
"groups": [
{
"name": "Group A",
"port": 1090,
"upstreams": [
{
"name": "Outline A0",
"type": "outline",
"server": "23.115.204.133",
"sshPort": "22",
"sshUsername": "root",
"sshPassword": "bt92ew4yTTRfPHL335"
},
{
"name": "Outline A1",
"type": "outline",
"server": "103.10.23.145",
"sshPort": "22",
"sshUsername": "outline",
"sshPrivateKey": "-----BEGIN OPENSSH PRIVATE KEY-----\nb3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABlwAAAAdzc2gtcn\nNhAAAAAwEAAQAAAYEAynXZl9Uu+XmZ+tFNWxG6hn0FrIC+BgNvtsiqdLag7P/xpJ+84sSS\nT032JydbYCbkD2E+qLAJXdH6ZFw65HKj31kYo8jWUUjZLe5WZFEVunI027/s496SI6e6o9\n2pRjyAFmh3s1sd/a1HOWLYCeyOHEt2SVB/EKxlf/KMC6z4JDmZmMtUquPYXBCDPbwhG6jW\nIyYqNv7d/G6cno6CK0WuyHueufSEn8AFRRv4ptunR1xWv4JSavqnHubJJxBFMmSE8pNren\nZy94ccLpOTIif+lbq2F3DmzMCM+8xbrv8EfQmwLz+Kb3be8zORcHGgiv2CJ4hoa/joBA7U\n25rQ+KFE4m9T5ro7Gf2vbjuPNDTgHe4twUM0dM3ue5p2eL3VEFpqcDdhz27SVWZXPzXrui\ndt1oQDcv20won8ooUmqVaTWc2E+Sj4ul057VUsaZzRPbinAfwk00Nrxqg/DVTswej1ah6+\nNjwjmISX8or7MYKH/i5C7Q31pa/tJ9S1mp3zEkczAAAFiMT+hhPE/oYTAAAAB3NzaC1yc2\nEAAAABAMp12ZfVLvl5mfrRTVsRuoZ9BayAvgYDb7bIqnS2oOz/8aSfvOLEkk9N9icnW2Am\n5A9hPqiwCV3R+mRcOuhyo+tZGKPI1lFI2S3uVmRRFbpyNNu/7OPekiOnuqPdqUY8gBZod7\nNbHf2tRzli2AnsjhxLdklQfxCsZX/yjAus+CQ5mZjLVKrj2FwQgz28IRuo1iMmKjb+3fxu\nnJ6OgitFrsh7nrn0hJ/ABUUb+Kbbp0dcVr+CUmr6px7myScQUzJkhPKTa3p2cveHHC6Tky\nIn/pW6thdw5szAjPvMW67/BH0JsC8/im923vMzkXBxoIr9gieIaGv46AQO1Nna0PihROJv\nU+a6Oxn6r247jzQ04B3uLcFDNHTN7rOadni91RBaanA3Yc9u0lVmVz8167onbdaEA3L9tM\nKJ/KKFJqlWk1nNhPko+LpdOe1VLGmc0T24pwH8JNNDa8aoPw1U7MHo9WoevjY8I5iEl/KK\n+zGCh/4uQu0N9aWv7SfUtZqd8xJHMwAAAAMBAAEAAAGASIkghD1krwzaFfqW9GHNqhFwzv\nTxH8ZrZ9lM+LPVxBOOx6RTUAuNP8x2vGBlZHWKj9gPUvB+6pYoV3yTvmQURmWNZmC2KDkp\nVkNlwFsspbf1KCYDAUDkqtGVFCB9rSRP37dd62xhulkyg2Tece/GmmyO3IVygM7DLqv/cM\n9vt8rLNOrkUrV+9r0TyDJ2yiobTkyGI138ukwG4Oe9yzMUA9AGdikcuv9Y5AG5fE6GCBMV\nIVfXn2xeI7wbpVs783n/ZF/6ZvnDgaUUr3cA2VqcqdM8J/5Yj+B6g5iiYttoohMn14e90Y\n2SdbL8mR/F1QHuhkrqOHLjoPL7NAbe2hYAq4r1p/pHfbej2GrsJ/o1DYz91uABi1/gix6l\nEiHChD7VMj8S9pi58kjg6Df+LlCGBEu9mfHddB52kwQFxj55XBrWhnQby/e8Yn0gERLWTk\nD8iNPtiUSs4tWPIUlHk1THHY5kGBurDA84XGbYtLfuHAQ8p6jshNcpPScGATAKaw2BABAA\nwQCA8zyHFFGYrpm3KFMho6jnkqAcDVijtiStubX25YHaeQi5e83y1TZvh31zvj3LhApz6d\ngUw546O/5ucMO3B+xOWHITxfHoen5UxjkRZML9Ob8wfZEppB9hAtTJHe985qG1rwfxAXXO\nZYDSyowGBJQ+e/FgaSIm5bQkCxHupTXTLk3rPxEXvZI1+BnYzu7rqj6tcUChJ9tgagJBAy\nJkMkEJha4U6NQi0jXvYPTzQXPASrpg3tFjXMAC1mb5V02s2vwAAADBAPFl5K4CH5aS6/Op\nhuGvaLVMMHthDUCMk5nmrUIX5IOVQSB5Y74bPqst0U7qs56Y40XoAxgV8oti7RpjQiueOx\n2co5usU1oWVVHQSyUT2YTOSMnwOe5CMeigAXlgVokmoxEOB9lRDD6L2i1KpIxJkWMPvtS6\nKTIyWprA9JTyZ2lSo5Raj5ncNcQSpdxPoW25HEgqUi7XA8ftjx7bMYMoHT2wzZHoQvnDpk\nETe3PAyNjwwTVypMuIrTh4YZbLrUDk2wAAAMEA1rT/WjvdgMiup3eFOfqR0Og4YcUBmjwP\nUXcEY65kTDA0U8WoHizq/vUQ2tQhHjM2vASFvO9M079l4+jDj3dzd2qeSw4V5AYJmnhMqZ\n58LXgKQWZswEWnkKKIs5S+OQkcZw1iGTodipFcWW6nUUsaw1TY1esFLMcpeMeAYZ/sgADp\n7XTO1A7WWEG2dQWVVA/SfaFuimgnBOU7EKmbiGjPQFfxGCmgg5pBjNd528ExRw6pGX9b3d\n0hiMiJp6XWOcqJAAAAC216ekBtenprZXRpAQIDBAUGBw==\n-----END OPENSSH PRIVATE KEY-----\n"
},
{
"name": "Outline A2",
"type": "outline",
"server": "131.13.130.120",
"link": "https://example.com/3BlbnNzaC1rZXktdjEAAAAABG/shadowbox_config.json"
},
{
"name": "Outline A3",
"type": "outline",
"server": "131.13.130.121",
"apiUrl": "https://131.13.130.121:21230/4pn_faGFTa-bci6IA6ctYB",
Expand All @@ -31,16 +36,18 @@
],
"servers": [
{
"name": "Server A0",
"target": "45.10.10.10:8081",
"method": "chacha20-ietf-poly1305",
"password": "mypassword"
},
{
"name": "Server A1",
"target": "jp.myss.cloudflare.com:18080",
"method": "aes-128-gcm",
"password": "hereismypasswrod"
}
]
}
]
}
}
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type MapPortDispatcher map[int]*[len(protocols)]dispatcher.Dispatcher

type SyncMapPortDispatcher struct {
sync.Mutex
Map MapPortDispatcher
Map MapPortDispatcher
}

func NewSyncMapPortDispatcher() *SyncMapPortDispatcher {
Expand Down
1 change: 0 additions & 1 deletion reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"log"
)


func ReloadConfig() {
log.Println("Reloading configuration")
mPortDispatcher.Lock()
Expand Down

0 comments on commit 494c281

Please sign in to comment.