diff --git a/README.md b/README.md index 83f52c059..06c32f51f 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ - [x] HTTP API - [x] 反向HTTP POST - [x] 正向Websocket -- [ ] 反向Websocket (开发中) +- [x] 反向Websocket (测试中) #### 实现
已实现API diff --git a/coolq/bot.go b/coolq/bot.go index a6dbda66f..0c1c7ce9c 100644 --- a/coolq/bot.go +++ b/coolq/bot.go @@ -38,7 +38,7 @@ func NewQQBot(cli *client.QQClient, conf *global.JsonConfig) *CQBot { opt.EntryIdxMode = nutsdb.HintBPTSparseIdxMode db, err := nutsdb.Open(opt) if err != nil { - log.Fatalf("打开数据库失败, 如果频繁遇到此问题请关闭数据库功能。") + log.Fatalf("打开数据库失败, 如果频繁遇到此问题请清理 data/db 文件夹或关闭数据库功能。") } bot.db = db gob.Register(message.Sender{}) diff --git a/global/config.go b/global/config.go index 16c79226d..8f8831309 100644 --- a/global/config.go +++ b/global/config.go @@ -6,14 +6,15 @@ import ( ) type JsonConfig struct { - Uin int64 `json:"uin"` - Password string `json:"password"` - EnableDB bool `json:"enable_db"` - AccessToken string `json:"access_token"` - Reconnect bool `json:"reconnect"` - ReconnectDelay int `json:"reconnect_delay"` - HttpConfig *GoCQHttpConfig `json:"http_config"` - WSConfig *GoCQWebsocketConfig `json:"ws_config"` + Uin int64 `json:"uin"` + Password string `json:"password"` + EnableDB bool `json:"enable_db"` + AccessToken string `json:"access_token"` + ReLogin bool `json:"relogin"` + ReLoginDelay int `json:"relogin_delay"` + HttpConfig *GoCQHttpConfig `json:"http_config"` + WSConfig *GoCQWebsocketConfig `json:"ws_config"` + ReverseServers []*GoCQReverseWebsocketConfig `json:"ws_reverse_servers"` } type CQHttpApiConfig struct { @@ -48,6 +49,14 @@ type GoCQWebsocketConfig struct { Port uint16 `json:"port"` } +type GoCQReverseWebsocketConfig struct { + Enabled bool `json:"enabled"` + ReverseUrl string `json:"reverse_url"` + ReverseApiUrl string `json:"reverse_api_url"` + ReverseEventUrl string `json:"reverse_event_url"` + ReverseReconnectInterval uint16 `json:"reverse_reconnect_interval"` +} + func DefaultConfig() *JsonConfig { return &JsonConfig{ EnableDB: true, @@ -62,6 +71,15 @@ func DefaultConfig() *JsonConfig { Host: "0.0.0.0", Port: 6700, }, + ReverseServers: []*GoCQReverseWebsocketConfig{ + { + Enabled: false, + ReverseUrl: "ws://you_websocket_universal.server", + ReverseApiUrl: "ws://you_websocket_api.server", + ReverseEventUrl: "ws://you_websocket_event.server", + ReverseReconnectInterval: 3000, + }, + }, } } diff --git a/go.mod b/go.mod index 166d3a883..c5116a4e0 100644 --- a/go.mod +++ b/go.mod @@ -15,5 +15,6 @@ require ( github.com/tidwall/gjson v1.6.0 github.com/xujiajun/nutsdb v0.5.0 golang.org/x/image v0.0.0-20200618115811-c13761719519 + golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect ) diff --git a/main.go b/main.go index f225c9e94..449c84622 100644 --- a/main.go +++ b/main.go @@ -131,18 +131,21 @@ func main() { if conf.HttpConfig != nil && conf.HttpConfig.Enabled { server.HttpServer.Run(fmt.Sprintf("%s:%d", conf.HttpConfig.Host, conf.HttpConfig.Port), conf.AccessToken, b) for k, v := range conf.HttpConfig.PostUrls { - server.NewClient().Run(k, v, b) + server.NewHttpClient().Run(k, v, b) } } if conf.WSConfig != nil && conf.WSConfig.Enabled { server.WebsocketServer.Run(fmt.Sprintf("%s:%d", conf.WSConfig.Host, conf.WSConfig.Port), conf.AccessToken, b) } + for _, rc := range conf.ReverseServers { + server.NewWebsocketClient(rc, conf.AccessToken, b).Run() + } log.Info("资源初始化完成, 开始处理信息.") log.Info("アトリは、高性能ですから!") cli.OnDisconnected(func(bot *client.QQClient, e *client.ClientDisconnectedEvent) { - if conf.Reconnect { - log.Warnf("Bot已离线,将在 %v 秒后尝试重连.", conf.ReconnectDelay) - time.Sleep(time.Second * time.Duration(conf.ReconnectDelay)) + if conf.ReLogin { + log.Warnf("Bot已离线,将在 %v 秒后尝试重连.", conf.ReLoginDelay) + time.Sleep(time.Second * time.Duration(conf.ReLoginDelay)) rsp, err := cli.Login() if err != nil { log.Fatalf("重连失败: %v", err) diff --git a/server/http.go b/server/http.go index 5177a85f0..56d06bb85 100644 --- a/server/http.go +++ b/server/http.go @@ -150,7 +150,7 @@ func (s *httpServer) Run(addr, authToken string, bot *coolq.CQBot) { }() } -func NewClient() *httpClient { +func NewHttpClient() *httpClient { return &httpClient{} } @@ -166,9 +166,8 @@ func (c *httpClient) onBotPushEvent(m coolq.MSG) { var res string err := gout.POST(c.addr).SetJSON(m).BindBody(&res).SetHeader(func() gout.H { h := gout.H{ - "X-Self_ID": c.bot.Client.Uin, - "X-Client-Role": "Universal", - "User-Agent": "CQHttp/4.15.0", + "X-Self-ID": c.bot.Client.Uin, + "User-Agent": "CQHttp/4.15.0", } if c.secret != "" { mac := hmac.New(sha1.New, []byte(c.secret)) diff --git a/server/websocket.go b/server/websocket.go index 65c7011be..cafae224b 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -3,10 +3,13 @@ package server import ( "fmt" "github.com/Mrs4s/go-cqhttp/coolq" + "github.com/Mrs4s/go-cqhttp/global" "github.com/gorilla/websocket" log "github.com/sirupsen/logrus" "github.com/tidwall/gjson" + wsc "golang.org/x/net/websocket" "net/http" + "strconv" "strings" "sync" "time" @@ -21,6 +24,13 @@ type websocketServer struct { } type websocketClient struct { + conf *global.GoCQReverseWebsocketConfig + token string + bot *coolq.CQBot + + pushLock *sync.Mutex + universalConn *wsc.Conn + eventConn *wsc.Conn } var WebsocketServer = &websocketServer{} @@ -46,6 +56,149 @@ func (s *websocketServer) Run(addr, authToken string, b *coolq.CQBot) { }() } +func NewWebsocketClient(conf *global.GoCQReverseWebsocketConfig, authToken string, b *coolq.CQBot) *websocketClient { + return &websocketClient{conf: conf, token: authToken, bot: b, pushLock: new(sync.Mutex)} +} + +func (c *websocketClient) Run() { + if !c.conf.Enabled { + return + } + if c.conf.ReverseApiUrl != "" { + c.connectApi() + } + if c.conf.ReverseEventUrl != "" { + c.connectEvent() + } + if c.conf.ReverseUrl != "" { + c.connectUniversal() + } + c.bot.OnEventPush(c.onBotPushEvent) +} + +func (c *websocketClient) connectApi() { + log.Infof("开始尝试连接到反向Websocket API服务器: %v", c.conf.ReverseApiUrl) + wsConf, err := wsc.NewConfig(c.conf.ReverseApiUrl, c.conf.ReverseApiUrl) + if err != nil { + log.Warnf("连接到反向Websocket API服务器 %v 时出现致命错误: %v", c.conf.ReverseApiUrl, err) + return + } + wsConf.Header["X-Client-Role"] = []string{"API"} + wsConf.Header["X-Self-ID"] = []string{strconv.FormatInt(c.bot.Client.Uin, 10)} + wsConf.Header["User-Agent"] = []string{"CQHttp/4.15.0"} + if c.token != "" { + wsConf.Header["Authorization"] = []string{"Token " + c.token} + } + conn, err := wsc.DialConfig(wsConf) + if err != nil { + log.Warnf("连接到反向Websocket API服务器 %v 时出现错误: %v", c.conf.ReverseApiUrl, err) + if c.conf.ReverseReconnectInterval != 0 { + time.Sleep(time.Millisecond * time.Duration(c.conf.ReverseReconnectInterval)) + c.connectApi() + } + return + } + log.Infof("已连接到反向Websocket API服务器 %v", c.conf.ReverseApiUrl) + go c.listenApi(conn, false) +} + +func (c *websocketClient) connectEvent() { + log.Infof("开始尝试连接到反向Websocket Event服务器: %v", c.conf.ReverseEventUrl) + wsConf, err := wsc.NewConfig(c.conf.ReverseEventUrl, c.conf.ReverseEventUrl) + if err != nil { + log.Warnf("连接到反向Websocket Event服务器 %v 时出现致命错误: %v", c.conf.ReverseApiUrl, err) + return + } + wsConf.Header["X-Client-Role"] = []string{"Event"} + wsConf.Header["X-Self-ID"] = []string{strconv.FormatInt(c.bot.Client.Uin, 10)} + wsConf.Header["User-Agent"] = []string{"CQHttp/4.15.0"} + if c.token != "" { + wsConf.Header["Authorization"] = []string{"Token " + c.token} + } + conn, err := wsc.DialConfig(wsConf) + if err != nil { + log.Warnf("连接到反向Websocket API服务器 %v 时出现错误: %v", c.conf.ReverseApiUrl, err) + if c.conf.ReverseReconnectInterval != 0 { + time.Sleep(time.Millisecond * time.Duration(c.conf.ReverseReconnectInterval)) + c.connectApi() + } + return + } + log.Infof("已连接到反向Websocket Event服务器 %v", c.conf.ReverseEventUrl) + c.eventConn = conn +} + +func (c *websocketClient) connectUniversal() { + log.Infof("开始尝试连接到反向Websocket Universal服务器: %v", c.conf.ReverseUrl) + wsConf, err := wsc.NewConfig(c.conf.ReverseUrl, c.conf.ReverseUrl) + if err != nil { + log.Warnf("连接到反向Websocket Universal服务器 %v 时出现致命错误: %v", c.conf.ReverseUrl, err) + return + } + wsConf.Header["X-Client-Role"] = []string{"Universal"} + wsConf.Header["X-Self-ID"] = []string{strconv.FormatInt(c.bot.Client.Uin, 10)} + wsConf.Header["User-Agent"] = []string{"CQHttp/4.15.0"} + if c.token != "" { + wsConf.Header["Authorization"] = []string{"Token " + c.token} + } + conn, err := wsc.DialConfig(wsConf) + if err != nil { + log.Warnf("连接到反向Websocket Universal服务器 %v 时出现错误: %v", c.conf.ReverseUrl, err) + if c.conf.ReverseReconnectInterval != 0 { + time.Sleep(time.Millisecond * time.Duration(c.conf.ReverseReconnectInterval)) + c.connectUniversal() + } + return + } + go c.listenApi(conn, true) + c.universalConn = conn +} + +func (c *websocketClient) listenApi(conn *wsc.Conn, u bool) { + defer conn.Close() + for { + buf := make([]byte, 10240) + l, err := conn.Read(buf) + if err != nil { + break + } + j := gjson.ParseBytes(buf[:l]) + t := strings.ReplaceAll(j.Get("action").Str, "_async", "") + if f, ok := wsApi[t]; ok { + ret := f(c.bot, j.Get("params")) + if j.Get("echo").Exists() { + ret["echo"] = j.Get("echo").Value() + } + _, _ = conn.Write([]byte(ret.ToJson())) + } + } + if c.conf.ReverseReconnectInterval != 0 { + time.Sleep(time.Millisecond * time.Duration(c.conf.ReverseReconnectInterval)) + if u { + c.connectUniversal() + return + } + c.connectApi() + } +} + +func (c *websocketClient) onBotPushEvent(m coolq.MSG) { + c.pushLock.Lock() + defer c.pushLock.Unlock() + if c.eventConn != nil { + if _, err := c.eventConn.Write([]byte(m.ToJson())); err != nil { + _ = c.eventConn.Close() + if c.conf.ReverseReconnectInterval != 0 { + time.Sleep(time.Millisecond * time.Duration(c.conf.ReverseReconnectInterval)) + c.connectEvent() + } + } + } + if c.universalConn != nil { + _, _ = c.universalConn.Write([]byte(m.ToJson())) + } +} + func (s *websocketServer) event(w http.ResponseWriter, r *http.Request) { if s.token != "" { if r.URL.Query().Get("access_token") != s.token && strings.SplitN(r.Header.Get("Authorization"), " ", 2)[1] != s.token { @@ -114,7 +267,7 @@ func (s *websocketServer) listenApi(c *websocket.Conn) { if t == websocket.TextMessage { j := gjson.ParseBytes(payload) t := strings.ReplaceAll(j.Get("action").Str, "_async", "") //TODO: async support - log.Infof("API调用: %v", j.Get("action").Str) + //log.Infof("API调用: %v", j.Get("action").Str) if f, ok := wsApi[t]; ok { ret := f(s.bot, j.Get("params")) if j.Get("echo").Exists() {