diff --git a/app/rpc/namespaces/eth/filters/filter_system.go b/app/rpc/namespaces/eth/filters/filter_system.go index 60136f297c..a7af9a7a7e 100644 --- a/app/rpc/namespaces/eth/filters/filter_system.go +++ b/app/rpc/namespaces/eth/filters/filter_system.go @@ -112,11 +112,11 @@ func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, context.Canc switch sub.typ { case filters.PendingTransactionsSubscription: - eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event) + eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event, 1000) case filters.PendingLogsSubscription, filters.MinedAndPendingLogsSubscription: - eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event) + eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event, 1000) case filters.LogsSubscription: - eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event) + eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event, 1000) case filters.BlocksSubscription: eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event) default: diff --git a/app/rpc/tests/rpc_test.go b/app/rpc/tests/rpc_test.go index dc67017800..8d4e542509 100644 --- a/app/rpc/tests/rpc_test.go +++ b/app/rpc/tests/rpc_test.go @@ -1306,7 +1306,7 @@ func excuteInvalidMessage(t *testing.T, ws *websocket.Conn, message []byte) { var res Response require.NoError(t, json.Unmarshal(msg[:n], &res)) require.Equal(t, -32600, res.Error.Code) - require.Equal(t, 0, res.ID) + require.Equal(t, 1, res.ID) } func TestWebsocket_PendingTransaction(t *testing.T) { diff --git a/app/rpc/websockets/pubsub_api.go b/app/rpc/websockets/pubsub_api.go index 49901bc393..0510bb4975 100644 --- a/app/rpc/websockets/pubsub_api.go +++ b/app/rpc/websockets/pubsub_api.go @@ -77,6 +77,7 @@ func (api *PubSubAPI) unsubscribe(id rpc.ID) bool { } close(api.filters[id].unsubscribed) delete(api.filters, id) + api.logger.Debug("unsubscribe client", "ID", id) return true } @@ -99,10 +100,14 @@ func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) { for { select { case event := <-headersCh: - data, _ := event.Data.(tmtypes.EventDataNewBlockHeader) + data, ok := event.Data.(tmtypes.EventDataNewBlockHeader) + if !ok { + api.logger.Error(fmt.Sprintf("invalid data type %T, expected EventDataTx", event.Data), "ID", sub.ID()) + continue + } headerWithBlockHash, err := rpctypes.EthHeaderWithBlockHashFromTendermint(&data.Header) if err != nil { - api.logger.Error("failed to get header with block hash", err) + api.logger.Error("failed to get header with block hash", "error", err) continue } @@ -120,7 +125,9 @@ func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) { err = f.conn.WriteJSON(res) if err != nil { - api.logger.Error("error writing header") + api.logger.Error("failed to write header", "ID", sub.ID(), "blocknumber", headerWithBlockHash.Number, "error", err) + } else { + api.logger.Debug("successfully write header", "ID", sub.ID(), "blocknumber", headerWithBlockHash.Number) } } api.filtersMu.Unlock() @@ -128,11 +135,12 @@ func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) { if err == websocket.ErrCloseSent { api.unsubscribe(sub.ID()) } - case <-errCh: + case err := <-errCh: api.filtersMu.Lock() sub.Unsubscribe(api.events) delete(api.filters, sub.ID()) api.filtersMu.Unlock() + api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err) return case <-unsubscribed: return @@ -209,49 +217,58 @@ func (api *PubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rp for { select { case event := <-ch: - dataTx, ok := event.Data.(tmtypes.EventDataTx) - if !ok { - err = fmt.Errorf("invalid event data %T, expected EventDataTx", event.Data) - return - } - - var resultData evmtypes.ResultData - resultData, err = evmtypes.DecodeResultData(dataTx.TxResult.Result.Data) - if err != nil { - return - } + go func(event coretypes.ResultEvent) { + dataTx, ok := event.Data.(tmtypes.EventDataTx) + if !ok { + api.logger.Error(fmt.Sprintf("invalid event data %T, expected EventDataTx", event.Data)) + return + } - logs := rpcfilters.FilterLogs(resultData.Logs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) + var resultData evmtypes.ResultData + resultData, err = evmtypes.DecodeResultData(dataTx.TxResult.Result.Data) + if err != nil { + api.logger.Error("failed to decode result data", "error", err) + return + } - api.filtersMu.Lock() - if f, found := api.filters[sub.ID()]; found { - // write to ws conn - res := &SubscriptionNotification{ - Jsonrpc: "2.0", - Method: "eth_subscription", - Params: &SubscriptionResult{ - Subscription: sub.ID(), - }, + logs := rpcfilters.FilterLogs(resultData.Logs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) + if len(logs) == 0 { + api.logger.Debug("no matched logs", "ID", sub.ID(), "txhash", resultData.TxHash) + return } - for _, singleLog := range logs { - res.Params.Result = singleLog - err = f.conn.WriteJSON(res) - if err != nil { - api.logger.Error(fmt.Sprintf("failed to write header: %s", err)) - break + + api.filtersMu.Lock() + if f, found := api.filters[sub.ID()]; found { + // write to ws conn + res := &SubscriptionNotification{ + Jsonrpc: "2.0", + Method: "eth_subscription", + Params: &SubscriptionResult{ + Subscription: sub.ID(), + }, + } + for _, singleLog := range logs { + res.Params.Result = singleLog + err = f.conn.WriteJSON(res) + if err != nil { + api.logger.Error("failed to write log", "ID", sub.ID(), "height", singleLog.BlockNumber, "txhash", singleLog.TxHash, "error", err) + break + } + api.logger.Debug("successfully write log", "ID", sub.ID(), "height", singleLog.BlockNumber, "txhash", singleLog.TxHash) } } - } - api.filtersMu.Unlock() + api.filtersMu.Unlock() - if err == websocket.ErrCloseSent { - api.unsubscribe(sub.ID()) - } - case <-errCh: + if err == websocket.ErrCloseSent { + api.unsubscribe(sub.ID()) + } + }(event) + case err := <-errCh: api.filtersMu.Lock() sub.Unsubscribe(api.events) delete(api.filters, sub.ID()) api.filtersMu.Unlock() + api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err) return case <-unsubscribed: return @@ -355,7 +372,11 @@ func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID for { select { case ev := <-txsCh: - data, _ := ev.Data.(tmtypes.EventDataTx) + data, ok := ev.Data.(tmtypes.EventDataTx) + if !ok { + api.logger.Error(fmt.Sprintf("invalid data type %T, expected EventDataTx", ev.Data), "ID", sub.ID()) + continue + } txHash := common.BytesToHash(data.Tx.Hash()) api.filtersMu.Lock() @@ -372,7 +393,9 @@ func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID err = f.conn.WriteJSON(res) if err != nil { - api.logger.Error(fmt.Sprintf("failed to write header: %s", err.Error())) + api.logger.Error("failed to write pending tx", "ID", sub.ID(), "error", err) + } else { + api.logger.Debug("successfully write pending tx", "ID", sub.ID(), "txhash", txHash) } } api.filtersMu.Unlock() @@ -380,11 +403,12 @@ func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID if err == websocket.ErrCloseSent { api.unsubscribe(sub.ID()) } - case <-errCh: + case err := <-errCh: api.filtersMu.Lock() sub.Unsubscribe(api.events) delete(api.filters, sub.ID()) api.filtersMu.Unlock() + api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err) return case <-unsubscribed: return @@ -426,7 +450,8 @@ func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) { newStatus, err := api.clientCtx.Client.Status() if err != nil { - api.logger.Error("error get sync status: %s", err.Error()) + api.logger.Error(fmt.Sprintf("error get sync status: %s", err.Error())) + continue } if !newStatus.SyncInfo.CatchingUp { @@ -453,7 +478,9 @@ func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) { err = f.conn.WriteJSON(res) if err != nil { - api.logger.Error("error writing syncing") + api.logger.Error("failed to write syncing status", "ID", sub.ID(), "error", err) + } else { + api.logger.Debug("successfully write syncing status", "ID", sub.ID()) } } api.filtersMu.Unlock() @@ -462,11 +489,12 @@ func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) { api.unsubscribe(sub.ID()) } - case <-errCh: + case err := <-errCh: api.filtersMu.Lock() sub.Unsubscribe(api.events) delete(api.filters, sub.ID()) api.filtersMu.Unlock() + api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err) return case <-unsubscribed: return diff --git a/app/rpc/websockets/server.go b/app/rpc/websockets/server.go index 6b4019ffcc..fe8b44fbde 100644 --- a/app/rpc/websockets/server.go +++ b/app/rpc/websockets/server.go @@ -1,25 +1,21 @@ package websockets import ( - "bufio" "bytes" "encoding/json" "fmt" "io/ioutil" "math/big" - "net" "net/http" "strings" + "github.com/cosmos/cosmos-sdk/client/context" + "github.com/cosmos/cosmos-sdk/server" + "github.com/ethereum/go-ethereum/rpc" "github.com/gorilla/mux" "github.com/gorilla/websocket" "github.com/spf13/viper" - "github.com/tendermint/tendermint/libs/log" - - "github.com/ethereum/go-ethereum/rpc" - - context "github.com/cosmos/cosmos-sdk/client/context" ) // Server defines a server that handles Ethereum websockets. @@ -32,8 +28,20 @@ type Server struct { // NewServer creates a new websocket server instance. func NewServer(clientCtx context.CLIContext, log log.Logger, wsAddr string) *Server { + restServerAddr := viper.GetString(server.FlagListenAddr) + parts := strings.SplitN(restServerAddr, "://", 2) + if len(parts) != 2 { + panic(fmt.Errorf("invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", restServerAddr)) + } + url := parts[1] + urlParts := strings.SplitN(url, ":", 2) + if len(urlParts) != 2 { + panic(fmt.Errorf("invalid listening address %s (use ip:port as an url)", url)) + } + port := urlParts[1] + return &Server{ - rpcAddr: viper.GetString("laddr"), + rpcAddr: "http://localhost:" + port, wsAddr: wsAddr, api: NewAPI(clientCtx, log), logger: log.With("module", "websocket-server"), @@ -76,7 +84,7 @@ func (s *Server) sendErrResponse(conn *websocket.Conn, msg string) { Code: big.NewInt(-32600), Message: msg, }, - ID: nil, + ID: big.NewInt(1), } err := conn.WriteJSON(res) if err != nil { @@ -123,10 +131,10 @@ func (s *Server) readLoop(wsConn *websocket.Conn) { err = wsConn.WriteJSON(res) if err != nil { - s.logger.Error("failed to write json response", err) + s.logger.Error("failed to write json response", "ID", id, "error", err) continue } - + s.logger.Debug("successfully subscribe", "ID", id) continue } else if method.(string) == "eth_unsubscribe" { ids, ok := msg["params"].([]interface{}) @@ -134,13 +142,13 @@ func (s *Server) readLoop(wsConn *websocket.Conn) { s.sendErrResponse(wsConn, "invalid parameters") continue } - - if _, idok := ids[0].(string); !ok || !idok { + id, idok := ids[0].(string) + if !ok || !idok { s.sendErrResponse(wsConn, "invalid parameters") continue } - ok = s.api.unsubscribe(rpc.ID(ids[0].(string))) + ok = s.api.unsubscribe(rpc.ID(id)) res := &SubscriptionResponseJSON{ Jsonrpc: "2.0", ID: 1, @@ -149,10 +157,10 @@ func (s *Server) readLoop(wsConn *websocket.Conn) { err = wsConn.WriteJSON(res) if err != nil { - s.logger.Error("failed to write json response", err) + s.logger.Error("failed to write json response", "ID", id, "error", err) continue } - + s.logger.Debug("successfully unsubscribe", "ID", id) continue } @@ -167,47 +175,17 @@ func (s *Server) readLoop(wsConn *websocket.Conn) { // tcpGetAndSendResponse connects to the rest-server over tcp, posts a JSON-RPC request, and sends the response // to the client over websockets func (s *Server) tcpGetAndSendResponse(conn *websocket.Conn, mb []byte) error { - addr := strings.Split(s.rpcAddr, "tcp://") - if len(addr) != 2 { - return fmt.Errorf("invalid laddr %s", s.rpcAddr) - } - - tcpConn, err := net.Dial("tcp", addr[1]) - if err != nil { - return fmt.Errorf("cannot connect to %s; %s", s.rpcAddr, err) - } - - buf := &bytes.Buffer{} - _, err = buf.Write(mb) - if err != nil { - return fmt.Errorf("failed to write message; %s", err) - } - - req, err := http.NewRequest("POST", s.rpcAddr, buf) + req, err := http.NewRequest(http.MethodPost, s.rpcAddr, bytes.NewReader(mb)) if err != nil { return fmt.Errorf("failed to request; %s", err) } - - req.Header.Set("Content-Type", "application/json;") - err = req.Write(tcpConn) + req.Header.Set("Content-Type", "application/json") + resp, err := http.DefaultClient.Do(req) if err != nil { return fmt.Errorf("failed to write to rest-server; %s", err) } - respBytes, err := ioutil.ReadAll(tcpConn) - if err != nil { - return fmt.Errorf("error reading response from rest-server; %s", err) - } - - respbuf := &bytes.Buffer{} - respbuf.Write(respBytes) - resp, err := http.ReadResponse(bufio.NewReader(respbuf), req) - if err != nil { - return fmt.Errorf("could not read response; %s", err) - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) if err != nil { return fmt.Errorf("could not read body from response; %s", err) diff --git a/go.sum b/go.sum index 96f2abf60c..84cc37daed 100644 --- a/go.sum +++ b/go.sum @@ -502,11 +502,7 @@ github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/okex/cosmos-sdk v0.39.2-exchain1 h1:ue618CeIiCtcEKJqGxgumEgS2JlgLDbD8yUdnqS+/+o= github.com/okex/cosmos-sdk v0.39.2-exchain1/go.mod h1:Y1C2roqCVZ6gSYG31X77x4NLcbSGS9VP1GIiAE1imcc= -github.com/okex/cosmos-sdk v0.39.3-0.20210411062155-2683537970bb h1:lpqIwiS+F0RoIpiviIp0WWvAdYiEziSw39A5XIBJGOg= -github.com/okex/cosmos-sdk v0.39.3-0.20210411062155-2683537970bb/go.mod h1:Y1C2roqCVZ6gSYG31X77x4NLcbSGS9VP1GIiAE1imcc= github.com/okex/iavl v0.14.3-exchain h1:kwRIwpFD6B8mDDqoaxeUN3Pg2GW0Vr+sA+b86renWcA= -github.com/okex/iavl v0.14.3-exchain h1:kwRIwpFD6B8mDDqoaxeUN3Pg2GW0Vr+sA+b86renWcA= -github.com/okex/iavl v0.14.3-exchain/go.mod h1:vHLYxU/zuxBmxxr1v+5Vnd/JzcIsyK17n9P9RDubPVU= github.com/okex/iavl v0.14.3-exchain/go.mod h1:vHLYxU/zuxBmxxr1v+5Vnd/JzcIsyK17n9P9RDubPVU= github.com/okex/tendermint v0.33.9-exchain h1:8JuPeB+NgfnPFa5ki89bspLAJzWeIV9C6OFSr1u5IsI= github.com/okex/tendermint v0.33.9-exchain/go.mod h1:EoGTbJUufUueNIigY3zyO6f7GOj29OdpFhuR8sxWdSU= @@ -702,7 +698,6 @@ github.com/tendermint/crypto v0.0.0-20191022145703-50d29ede1e15/go.mod h1:z4YtwM github.com/tendermint/go-amino v0.14.1/go.mod h1:i/UKE5Uocn+argJJBb12qTZsCDBcAYMbR92AaJVmKso= github.com/tendermint/go-amino v0.15.1 h1:D2uk35eT4iTsvJd9jWIetzthE5C0/k2QmMFkCN+4JgQ= github.com/tendermint/go-amino v0.15.1/go.mod h1:TQU0M1i/ImAo+tYpZi73AU3V/dKeCoMC9Sphe2ZwGME= -github.com/tendermint/tm-db v0.5.1 h1:H9HDq8UEA7Eeg13kdYckkgwwkQLBnJGgX4PgLJRhieY= github.com/tendermint/tm-db v0.5.1/go.mod h1:g92zWjHpCYlEvQXvy9M168Su8V1IBEeawpXVVBaK4f4= github.com/tendermint/tm-db v0.5.2 h1:QG3IxQZBubWlr7kGQcYIavyTNmZRO+r//nENxoq0g34= github.com/tendermint/tm-db v0.5.2/go.mod h1:VrPTx04QJhQ9d8TFUTc2GpPBvBf/U9vIdBIzkjBk7Lk= @@ -732,7 +727,6 @@ github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae/go.mod h1:gXtu8J62 github.com/zondax/hid v0.9.0 h1:eiT3P6vNxAEVxXMw66eZUAAnU2zD33JBkfG/EnfAKl8= github.com/zondax/hid v0.9.0/go.mod h1:l5wttcP0jwtdLjqjMMWFVEE7d1zO0jvSPA9OPZxWpEM= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= -go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.4 h1:hi1bXHMVrlQh6WwxAy+qZCV/SYIlqo+Ushwdpa4tAKg= go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=