From ed3da635947966413daa069d1449b5b81fca87b8 Mon Sep 17 00:00:00 2001 From: Ray Green Date: Tue, 6 Apr 2021 16:16:04 +0800 Subject: [PATCH] Merge PR: support the function of calling web3 api in websocket (#795) Co-authored-by: Zhong Qiu <36867992+zhongqiuwood@users.noreply.github.com> --- app/rpc/tests/rpc_test.go | 2 +- app/rpc/websockets/pubsub_api.go | 37 +++++++++++---- app/rpc/websockets/server.go | 78 ++++++++++++-------------------- 3 files changed, 56 insertions(+), 61 deletions(-) diff --git a/app/rpc/tests/rpc_test.go b/app/rpc/tests/rpc_test.go index 08cb363d6e..2d6c7f3ed4 100644 --- a/app/rpc/tests/rpc_test.go +++ b/app/rpc/tests/rpc_test.go @@ -1306,7 +1306,7 @@ func excuteInvalidMessage(t *testing.T, ws *websocket.Conn, message []byte) { var res Response require.NoError(t, json.Unmarshal(msg[:n], &res)) require.Equal(t, -32600, res.Error.Code) - require.Equal(t, 0, res.ID) + require.Equal(t, 1, res.ID) } func TestWebsocket_PendingTransaction(t *testing.T) { diff --git a/app/rpc/websockets/pubsub_api.go b/app/rpc/websockets/pubsub_api.go index 1efbedc77b..64bb337e44 100644 --- a/app/rpc/websockets/pubsub_api.go +++ b/app/rpc/websockets/pubsub_api.go @@ -77,6 +77,7 @@ func (api *PubSubAPI) unsubscribe(id rpc.ID) bool { } close(api.filters[id].unsubscribed) delete(api.filters, id) + api.logger.Debug("unsubscribe client", "ID", id) return true } @@ -102,7 +103,7 @@ func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) { data, _ := event.Data.(tmtypes.EventDataNewBlockHeader) headerWithBlockHash, err := rpctypes.EthHeaderWithBlockHashFromTendermint(&data.Header) if err != nil { - api.logger.Error("failed to get header with block hash", err) + api.logger.Error("failed to get header with block hash", "error", err) continue } @@ -120,7 +121,9 @@ func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) { err = f.conn.WriteJSON(res) if err != nil { - api.logger.Error("error writing header") + api.logger.Error("failed to write header", "ID", sub.ID(), "error", err) + } else { + api.logger.Debug("successfully write header", "ID", sub.ID(), "blocknumber", headerWithBlockHash.Number) } } api.filtersMu.Unlock() @@ -128,11 +131,12 @@ func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) { if err == websocket.ErrCloseSent { api.unsubscribe(sub.ID()) } - case <-errCh: + case err := <-errCh: api.filtersMu.Lock() sub.Unsubscribe(api.events) delete(api.filters, sub.ID()) api.filtersMu.Unlock() + api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err) return case <-unsubscribed: return @@ -222,6 +226,10 @@ func (api *PubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rp } logs := rpcfilters.FilterLogs(resultData.Logs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) + if len(logs) == 0 { + api.logger.Debug("no matched logs", "ID", sub.ID()) + continue + } api.filtersMu.Lock() if f, found := api.filters[sub.ID()]; found { @@ -237,9 +245,10 @@ func (api *PubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rp res.Params.Result = singleLog err = f.conn.WriteJSON(res) if err != nil { - api.logger.Error(fmt.Sprintf("failed to write header: %s", err)) + api.logger.Error("failed to write log", "ID", sub.ID(), "error", err) break } + api.logger.Debug("successfully write log", "ID", sub.ID(), "txhash", singleLog.TxHash) } } api.filtersMu.Unlock() @@ -247,11 +256,12 @@ func (api *PubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rp if err == websocket.ErrCloseSent { api.unsubscribe(sub.ID()) } - case <-errCh: + case err := <-errCh: api.filtersMu.Lock() sub.Unsubscribe(api.events) delete(api.filters, sub.ID()) api.filtersMu.Unlock() + api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err) return case <-unsubscribed: return @@ -372,7 +382,9 @@ func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID err = f.conn.WriteJSON(res) if err != nil { - api.logger.Error(fmt.Sprintf("failed to write header: %s", err.Error())) + api.logger.Error("failed to write pending tx", "ID", sub.ID(), "error", err) + } else { + api.logger.Debug("successfully write pending tx", "ID", sub.ID(), "txhash", txHash) } } api.filtersMu.Unlock() @@ -380,11 +392,12 @@ func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID if err == websocket.ErrCloseSent { api.unsubscribe(sub.ID()) } - case <-errCh: + case err := <-errCh: api.filtersMu.Lock() sub.Unsubscribe(api.events) delete(api.filters, sub.ID()) api.filtersMu.Unlock() + api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err) return case <-unsubscribed: return @@ -426,7 +439,8 @@ func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) { newStatus, err := api.clientCtx.Client.Status() if err != nil { - api.logger.Error("error get sync status: %s", err.Error()) + api.logger.Error(fmt.Sprintf("error get sync status: %s", err.Error())) + continue } if !newStatus.SyncInfo.CatchingUp { @@ -453,7 +467,9 @@ func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) { err = f.conn.WriteJSON(res) if err != nil { - api.logger.Error("error writing syncing") + api.logger.Error("failed to write syncing status", "ID", sub.ID(), "error", err) + } else { + api.logger.Debug("successfully write syncing status", "ID", sub.ID()) } } api.filtersMu.Unlock() @@ -462,11 +478,12 @@ func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) { api.unsubscribe(sub.ID()) } - case <-errCh: + case err := <-errCh: api.filtersMu.Lock() sub.Unsubscribe(api.events) delete(api.filters, sub.ID()) api.filtersMu.Unlock() + api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err) return case <-unsubscribed: return diff --git a/app/rpc/websockets/server.go b/app/rpc/websockets/server.go index 6b4019ffcc..18fcb5d82d 100644 --- a/app/rpc/websockets/server.go +++ b/app/rpc/websockets/server.go @@ -1,25 +1,21 @@ package websockets import ( - "bufio" "bytes" "encoding/json" "fmt" "io/ioutil" "math/big" - "net" "net/http" "strings" + "github.com/cosmos/cosmos-sdk/client/context" + "github.com/cosmos/cosmos-sdk/server" + "github.com/ethereum/go-ethereum/rpc" "github.com/gorilla/mux" "github.com/gorilla/websocket" "github.com/spf13/viper" - "github.com/tendermint/tendermint/libs/log" - - "github.com/ethereum/go-ethereum/rpc" - - context "github.com/cosmos/cosmos-sdk/client/context" ) // Server defines a server that handles Ethereum websockets. @@ -32,8 +28,20 @@ type Server struct { // NewServer creates a new websocket server instance. func NewServer(clientCtx context.CLIContext, log log.Logger, wsAddr string) *Server { + restServerAddr := viper.GetString(server.FlagListenAddr) + parts := strings.SplitN(restServerAddr, "://", 2) + if len(parts) != 2 { + panic(fmt.Errorf("invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", restServerAddr)) + } + url := parts[1] + urlParts := strings.SplitN(url, ":", 2) + if len(urlParts) != 2 { + panic(fmt.Errorf("invalid listening address %s (use ip:port as an url)", url)) + } + port := urlParts[1] + return &Server{ - rpcAddr: viper.GetString("laddr"), + rpcAddr: "http://localhost:" + port, wsAddr: wsAddr, api: NewAPI(clientCtx, log), logger: log.With("module", "websocket-server"), @@ -76,7 +84,7 @@ func (s *Server) sendErrResponse(conn *websocket.Conn, msg string) { Code: big.NewInt(-32600), Message: msg, }, - ID: nil, + ID: big.NewInt(1), } err := conn.WriteJSON(res) if err != nil { @@ -123,10 +131,10 @@ func (s *Server) readLoop(wsConn *websocket.Conn) { err = wsConn.WriteJSON(res) if err != nil { - s.logger.Error("failed to write json response", err) + s.logger.Error("failed to write json response", "ID", id, "error", err) continue } - + s.logger.Debug("successfully subscribe", "ID", id) continue } else if method.(string) == "eth_unsubscribe" { ids, ok := msg["params"].([]interface{}) @@ -134,13 +142,13 @@ func (s *Server) readLoop(wsConn *websocket.Conn) { s.sendErrResponse(wsConn, "invalid parameters") continue } - - if _, idok := ids[0].(string); !ok || !idok { + id, idok := ids[0].(string) + if !ok || !idok { s.sendErrResponse(wsConn, "invalid parameters") continue } - ok = s.api.unsubscribe(rpc.ID(ids[0].(string))) + ok = s.api.unsubscribe(rpc.ID(id)) res := &SubscriptionResponseJSON{ Jsonrpc: "2.0", ID: 1, @@ -149,10 +157,10 @@ func (s *Server) readLoop(wsConn *websocket.Conn) { err = wsConn.WriteJSON(res) if err != nil { - s.logger.Error("failed to write json response", err) + s.logger.Error("failed to write json response", "ID", id, "error", err) continue } - + s.logger.Debug("successfully unsubscribe", "ID", id) continue } @@ -167,47 +175,17 @@ func (s *Server) readLoop(wsConn *websocket.Conn) { // tcpGetAndSendResponse connects to the rest-server over tcp, posts a JSON-RPC request, and sends the response // to the client over websockets func (s *Server) tcpGetAndSendResponse(conn *websocket.Conn, mb []byte) error { - addr := strings.Split(s.rpcAddr, "tcp://") - if len(addr) != 2 { - return fmt.Errorf("invalid laddr %s", s.rpcAddr) - } - - tcpConn, err := net.Dial("tcp", addr[1]) - if err != nil { - return fmt.Errorf("cannot connect to %s; %s", s.rpcAddr, err) - } - - buf := &bytes.Buffer{} - _, err = buf.Write(mb) - if err != nil { - return fmt.Errorf("failed to write message; %s", err) - } - - req, err := http.NewRequest("POST", s.rpcAddr, buf) + req, err := http.NewRequest(http.MethodPost, s.rpcAddr, bytes.NewReader(mb)) if err != nil { return fmt.Errorf("failed to request; %s", err) } - - req.Header.Set("Content-Type", "application/json;") - err = req.Write(tcpConn) + req.Header.Set("Content-Type", "application/json") + resp, err := http.DefaultClient.Do(req) if err != nil { return fmt.Errorf("failed to write to rest-server; %s", err) } - respBytes, err := ioutil.ReadAll(tcpConn) - if err != nil { - return fmt.Errorf("error reading response from rest-server; %s", err) - } - - respbuf := &bytes.Buffer{} - respbuf.Write(respBytes) - resp, err := http.ReadResponse(bufio.NewReader(respbuf), req) - if err != nil { - return fmt.Errorf("could not read response; %s", err) - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) if err != nil { return fmt.Errorf("could not read body from response; %s", err) @@ -220,4 +198,4 @@ func (s *Server) tcpGetAndSendResponse(conn *websocket.Conn, mb []byte) error { } return conn.WriteJSON(wsSend) -} +} \ No newline at end of file