Skip to content

Commit

Permalink
Merge PR: enhance websocket (#792)
Browse files Browse the repository at this point in the history
* merge BlockNonce (#785)

* fix the api-calling process logic

* add more log in websocket

* fix ut

* optimize log

* optimize the url process

* Merge PR: optimize performance of the func eth_call (#793)

Co-authored-by: Zhong Qiu <[email protected]>

* Merge PR: support the function of calling web3 api in websocket (#795)

Co-authored-by: Zhong Qiu <[email protected]>

* enhance log

* enhance log

* merge

* expand pub channel capacity & use async mode to receive log event

* optimize comment

* change gomod

* change gomod

* expand pub channel cap

* go mod tidy

* remove unnecessary code

Co-authored-by: KamiD <[email protected]>
Co-authored-by: Zhong Qiu <[email protected]>
  • Loading branch information
3 people authored Apr 13, 2021
1 parent 720e481 commit 4558ed6
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 102 deletions.
6 changes: 3 additions & 3 deletions app/rpc/namespaces/eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, context.Canc

switch sub.typ {
case filters.PendingTransactionsSubscription:
eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event)
eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event, 1000)
case filters.PendingLogsSubscription, filters.MinedAndPendingLogsSubscription:
eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event)
eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event, 1000)
case filters.LogsSubscription:
eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event)
eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event, 1000)
case filters.BlocksSubscription:
eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event)
default:
Expand Down
2 changes: 1 addition & 1 deletion app/rpc/tests/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,7 @@ func excuteInvalidMessage(t *testing.T, ws *websocket.Conn, message []byte) {
var res Response
require.NoError(t, json.Unmarshal(msg[:n], &res))
require.Equal(t, -32600, res.Error.Code)
require.Equal(t, 0, res.ID)
require.Equal(t, 1, res.ID)
}

func TestWebsocket_PendingTransaction(t *testing.T) {
Expand Down
114 changes: 71 additions & 43 deletions app/rpc/websockets/pubsub_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (api *PubSubAPI) unsubscribe(id rpc.ID) bool {
}
close(api.filters[id].unsubscribed)
delete(api.filters, id)
api.logger.Debug("unsubscribe client", "ID", id)
return true
}

Expand All @@ -99,10 +100,14 @@ func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) {
for {
select {
case event := <-headersCh:
data, _ := event.Data.(tmtypes.EventDataNewBlockHeader)
data, ok := event.Data.(tmtypes.EventDataNewBlockHeader)
if !ok {
api.logger.Error(fmt.Sprintf("invalid data type %T, expected EventDataTx", event.Data), "ID", sub.ID())
continue
}
headerWithBlockHash, err := rpctypes.EthHeaderWithBlockHashFromTendermint(&data.Header)
if err != nil {
api.logger.Error("failed to get header with block hash", err)
api.logger.Error("failed to get header with block hash", "error", err)
continue
}

Expand All @@ -120,19 +125,22 @@ func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) {

err = f.conn.WriteJSON(res)
if err != nil {
api.logger.Error("error writing header")
api.logger.Error("failed to write header", "ID", sub.ID(), "blocknumber", headerWithBlockHash.Number, "error", err)
} else {
api.logger.Debug("successfully write header", "ID", sub.ID(), "blocknumber", headerWithBlockHash.Number)
}
}
api.filtersMu.Unlock()

if err == websocket.ErrCloseSent {
api.unsubscribe(sub.ID())
}
case <-errCh:
case err := <-errCh:
api.filtersMu.Lock()
sub.Unsubscribe(api.events)
delete(api.filters, sub.ID())
api.filtersMu.Unlock()
api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err)
return
case <-unsubscribed:
return
Expand Down Expand Up @@ -209,49 +217,58 @@ func (api *PubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rp
for {
select {
case event := <-ch:
dataTx, ok := event.Data.(tmtypes.EventDataTx)
if !ok {
err = fmt.Errorf("invalid event data %T, expected EventDataTx", event.Data)
return
}

var resultData evmtypes.ResultData
resultData, err = evmtypes.DecodeResultData(dataTx.TxResult.Result.Data)
if err != nil {
return
}
go func(event coretypes.ResultEvent) {
dataTx, ok := event.Data.(tmtypes.EventDataTx)
if !ok {
api.logger.Error(fmt.Sprintf("invalid event data %T, expected EventDataTx", event.Data))
return
}

logs := rpcfilters.FilterLogs(resultData.Logs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
var resultData evmtypes.ResultData
resultData, err = evmtypes.DecodeResultData(dataTx.TxResult.Result.Data)
if err != nil {
api.logger.Error("failed to decode result data", "error", err)
return
}

api.filtersMu.Lock()
if f, found := api.filters[sub.ID()]; found {
// write to ws conn
res := &SubscriptionNotification{
Jsonrpc: "2.0",
Method: "eth_subscription",
Params: &SubscriptionResult{
Subscription: sub.ID(),
},
logs := rpcfilters.FilterLogs(resultData.Logs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
if len(logs) == 0 {
api.logger.Debug("no matched logs", "ID", sub.ID(), "txhash", resultData.TxHash)
return
}
for _, singleLog := range logs {
res.Params.Result = singleLog
err = f.conn.WriteJSON(res)
if err != nil {
api.logger.Error(fmt.Sprintf("failed to write header: %s", err))
break

api.filtersMu.Lock()
if f, found := api.filters[sub.ID()]; found {
// write to ws conn
res := &SubscriptionNotification{
Jsonrpc: "2.0",
Method: "eth_subscription",
Params: &SubscriptionResult{
Subscription: sub.ID(),
},
}
for _, singleLog := range logs {
res.Params.Result = singleLog
err = f.conn.WriteJSON(res)
if err != nil {
api.logger.Error("failed to write log", "ID", sub.ID(), "height", singleLog.BlockNumber, "txhash", singleLog.TxHash, "error", err)
break
}
api.logger.Debug("successfully write log", "ID", sub.ID(), "height", singleLog.BlockNumber, "txhash", singleLog.TxHash)
}
}
}
api.filtersMu.Unlock()
api.filtersMu.Unlock()

if err == websocket.ErrCloseSent {
api.unsubscribe(sub.ID())
}
case <-errCh:
if err == websocket.ErrCloseSent {
api.unsubscribe(sub.ID())
}
}(event)
case err := <-errCh:
api.filtersMu.Lock()
sub.Unsubscribe(api.events)
delete(api.filters, sub.ID())
api.filtersMu.Unlock()
api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err)
return
case <-unsubscribed:
return
Expand Down Expand Up @@ -355,7 +372,11 @@ func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID
for {
select {
case ev := <-txsCh:
data, _ := ev.Data.(tmtypes.EventDataTx)
data, ok := ev.Data.(tmtypes.EventDataTx)
if !ok {
api.logger.Error(fmt.Sprintf("invalid data type %T, expected EventDataTx", ev.Data), "ID", sub.ID())
continue
}
txHash := common.BytesToHash(data.Tx.Hash())

api.filtersMu.Lock()
Expand All @@ -372,19 +393,22 @@ func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID

err = f.conn.WriteJSON(res)
if err != nil {
api.logger.Error(fmt.Sprintf("failed to write header: %s", err.Error()))
api.logger.Error("failed to write pending tx", "ID", sub.ID(), "error", err)
} else {
api.logger.Debug("successfully write pending tx", "ID", sub.ID(), "txhash", txHash)
}
}
api.filtersMu.Unlock()

if err == websocket.ErrCloseSent {
api.unsubscribe(sub.ID())
}
case <-errCh:
case err := <-errCh:
api.filtersMu.Lock()
sub.Unsubscribe(api.events)
delete(api.filters, sub.ID())
api.filtersMu.Unlock()
api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err)
return
case <-unsubscribed:
return
Expand Down Expand Up @@ -426,7 +450,8 @@ func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) {

newStatus, err := api.clientCtx.Client.Status()
if err != nil {
api.logger.Error("error get sync status: %s", err.Error())
api.logger.Error(fmt.Sprintf("error get sync status: %s", err.Error()))
continue
}

if !newStatus.SyncInfo.CatchingUp {
Expand All @@ -453,7 +478,9 @@ func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) {

err = f.conn.WriteJSON(res)
if err != nil {
api.logger.Error("error writing syncing")
api.logger.Error("failed to write syncing status", "ID", sub.ID(), "error", err)
} else {
api.logger.Debug("successfully write syncing status", "ID", sub.ID())
}
}
api.filtersMu.Unlock()
Expand All @@ -462,11 +489,12 @@ func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) {
api.unsubscribe(sub.ID())
}

case <-errCh:
case err := <-errCh:
api.filtersMu.Lock()
sub.Unsubscribe(api.events)
delete(api.filters, sub.ID())
api.filtersMu.Unlock()
api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err)
return
case <-unsubscribed:
return
Expand Down
76 changes: 27 additions & 49 deletions app/rpc/websockets/server.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
package websockets

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"math/big"
"net"
"net/http"
"strings"

"github.com/cosmos/cosmos-sdk/client/context"
"github.com/cosmos/cosmos-sdk/server"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/spf13/viper"

"github.com/tendermint/tendermint/libs/log"

"github.com/ethereum/go-ethereum/rpc"

context "github.com/cosmos/cosmos-sdk/client/context"
)

// Server defines a server that handles Ethereum websockets.
Expand All @@ -32,8 +28,20 @@ type Server struct {

// NewServer creates a new websocket server instance.
func NewServer(clientCtx context.CLIContext, log log.Logger, wsAddr string) *Server {
restServerAddr := viper.GetString(server.FlagListenAddr)
parts := strings.SplitN(restServerAddr, "://", 2)
if len(parts) != 2 {
panic(fmt.Errorf("invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", restServerAddr))
}
url := parts[1]
urlParts := strings.SplitN(url, ":", 2)
if len(urlParts) != 2 {
panic(fmt.Errorf("invalid listening address %s (use ip:port as an url)", url))
}
port := urlParts[1]

return &Server{
rpcAddr: viper.GetString("laddr"),
rpcAddr: "http://localhost:" + port,
wsAddr: wsAddr,
api: NewAPI(clientCtx, log),
logger: log.With("module", "websocket-server"),
Expand Down Expand Up @@ -76,7 +84,7 @@ func (s *Server) sendErrResponse(conn *websocket.Conn, msg string) {
Code: big.NewInt(-32600),
Message: msg,
},
ID: nil,
ID: big.NewInt(1),
}
err := conn.WriteJSON(res)
if err != nil {
Expand Down Expand Up @@ -123,24 +131,24 @@ func (s *Server) readLoop(wsConn *websocket.Conn) {

err = wsConn.WriteJSON(res)
if err != nil {
s.logger.Error("failed to write json response", err)
s.logger.Error("failed to write json response", "ID", id, "error", err)
continue
}

s.logger.Debug("successfully subscribe", "ID", id)
continue
} else if method.(string) == "eth_unsubscribe" {
ids, ok := msg["params"].([]interface{})
if len(ids) == 0 {
s.sendErrResponse(wsConn, "invalid parameters")
continue
}

if _, idok := ids[0].(string); !ok || !idok {
id, idok := ids[0].(string)
if !ok || !idok {
s.sendErrResponse(wsConn, "invalid parameters")
continue
}

ok = s.api.unsubscribe(rpc.ID(ids[0].(string)))
ok = s.api.unsubscribe(rpc.ID(id))
res := &SubscriptionResponseJSON{
Jsonrpc: "2.0",
ID: 1,
Expand All @@ -149,10 +157,10 @@ func (s *Server) readLoop(wsConn *websocket.Conn) {

err = wsConn.WriteJSON(res)
if err != nil {
s.logger.Error("failed to write json response", err)
s.logger.Error("failed to write json response", "ID", id, "error", err)
continue
}

s.logger.Debug("successfully unsubscribe", "ID", id)
continue
}

Expand All @@ -167,47 +175,17 @@ func (s *Server) readLoop(wsConn *websocket.Conn) {
// tcpGetAndSendResponse connects to the rest-server over tcp, posts a JSON-RPC request, and sends the response
// to the client over websockets
func (s *Server) tcpGetAndSendResponse(conn *websocket.Conn, mb []byte) error {
addr := strings.Split(s.rpcAddr, "tcp://")
if len(addr) != 2 {
return fmt.Errorf("invalid laddr %s", s.rpcAddr)
}

tcpConn, err := net.Dial("tcp", addr[1])
if err != nil {
return fmt.Errorf("cannot connect to %s; %s", s.rpcAddr, err)
}

buf := &bytes.Buffer{}
_, err = buf.Write(mb)
if err != nil {
return fmt.Errorf("failed to write message; %s", err)
}

req, err := http.NewRequest("POST", s.rpcAddr, buf)
req, err := http.NewRequest(http.MethodPost, s.rpcAddr, bytes.NewReader(mb))
if err != nil {
return fmt.Errorf("failed to request; %s", err)
}

req.Header.Set("Content-Type", "application/json;")
err = req.Write(tcpConn)
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("failed to write to rest-server; %s", err)
}

respBytes, err := ioutil.ReadAll(tcpConn)
if err != nil {
return fmt.Errorf("error reading response from rest-server; %s", err)
}

respbuf := &bytes.Buffer{}
respbuf.Write(respBytes)
resp, err := http.ReadResponse(bufio.NewReader(respbuf), req)
if err != nil {
return fmt.Errorf("could not read response; %s", err)
}

defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("could not read body from response; %s", err)
Expand Down
Loading

0 comments on commit 4558ed6

Please sign in to comment.