Skip to content

Commit

Permalink
Merge PR: support the function of calling web3 api in websocket (#795)
Browse files Browse the repository at this point in the history
Co-authored-by: Zhong Qiu <[email protected]>
  • Loading branch information
Ray Green and zhongqiuwood authored Apr 6, 2021
1 parent b195921 commit ed3da63
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 61 deletions.
2 changes: 1 addition & 1 deletion app/rpc/tests/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,7 @@ func excuteInvalidMessage(t *testing.T, ws *websocket.Conn, message []byte) {
var res Response
require.NoError(t, json.Unmarshal(msg[:n], &res))
require.Equal(t, -32600, res.Error.Code)
require.Equal(t, 0, res.ID)
require.Equal(t, 1, res.ID)
}

func TestWebsocket_PendingTransaction(t *testing.T) {
Expand Down
37 changes: 27 additions & 10 deletions app/rpc/websockets/pubsub_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (api *PubSubAPI) unsubscribe(id rpc.ID) bool {
}
close(api.filters[id].unsubscribed)
delete(api.filters, id)
api.logger.Debug("unsubscribe client", "ID", id)
return true
}

Expand All @@ -102,7 +103,7 @@ func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) {
data, _ := event.Data.(tmtypes.EventDataNewBlockHeader)
headerWithBlockHash, err := rpctypes.EthHeaderWithBlockHashFromTendermint(&data.Header)
if err != nil {
api.logger.Error("failed to get header with block hash", err)
api.logger.Error("failed to get header with block hash", "error", err)
continue
}

Expand All @@ -120,19 +121,22 @@ func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) {

err = f.conn.WriteJSON(res)
if err != nil {
api.logger.Error("error writing header")
api.logger.Error("failed to write header", "ID", sub.ID(), "error", err)
} else {
api.logger.Debug("successfully write header", "ID", sub.ID(), "blocknumber", headerWithBlockHash.Number)
}
}
api.filtersMu.Unlock()

if err == websocket.ErrCloseSent {
api.unsubscribe(sub.ID())
}
case <-errCh:
case err := <-errCh:
api.filtersMu.Lock()
sub.Unsubscribe(api.events)
delete(api.filters, sub.ID())
api.filtersMu.Unlock()
api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err)
return
case <-unsubscribed:
return
Expand Down Expand Up @@ -222,6 +226,10 @@ func (api *PubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rp
}

logs := rpcfilters.FilterLogs(resultData.Logs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
if len(logs) == 0 {
api.logger.Debug("no matched logs", "ID", sub.ID())
continue
}

api.filtersMu.Lock()
if f, found := api.filters[sub.ID()]; found {
Expand All @@ -237,21 +245,23 @@ func (api *PubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rp
res.Params.Result = singleLog
err = f.conn.WriteJSON(res)
if err != nil {
api.logger.Error(fmt.Sprintf("failed to write header: %s", err))
api.logger.Error("failed to write log", "ID", sub.ID(), "error", err)
break
}
api.logger.Debug("successfully write log", "ID", sub.ID(), "txhash", singleLog.TxHash)
}
}
api.filtersMu.Unlock()

if err == websocket.ErrCloseSent {
api.unsubscribe(sub.ID())
}
case <-errCh:
case err := <-errCh:
api.filtersMu.Lock()
sub.Unsubscribe(api.events)
delete(api.filters, sub.ID())
api.filtersMu.Unlock()
api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err)
return
case <-unsubscribed:
return
Expand Down Expand Up @@ -372,19 +382,22 @@ func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID

err = f.conn.WriteJSON(res)
if err != nil {
api.logger.Error(fmt.Sprintf("failed to write header: %s", err.Error()))
api.logger.Error("failed to write pending tx", "ID", sub.ID(), "error", err)
} else {
api.logger.Debug("successfully write pending tx", "ID", sub.ID(), "txhash", txHash)
}
}
api.filtersMu.Unlock()

if err == websocket.ErrCloseSent {
api.unsubscribe(sub.ID())
}
case <-errCh:
case err := <-errCh:
api.filtersMu.Lock()
sub.Unsubscribe(api.events)
delete(api.filters, sub.ID())
api.filtersMu.Unlock()
api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err)
return
case <-unsubscribed:
return
Expand Down Expand Up @@ -426,7 +439,8 @@ func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) {

newStatus, err := api.clientCtx.Client.Status()
if err != nil {
api.logger.Error("error get sync status: %s", err.Error())
api.logger.Error(fmt.Sprintf("error get sync status: %s", err.Error()))
continue
}

if !newStatus.SyncInfo.CatchingUp {
Expand All @@ -453,7 +467,9 @@ func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) {

err = f.conn.WriteJSON(res)
if err != nil {
api.logger.Error("error writing syncing")
api.logger.Error("failed to write syncing status", "ID", sub.ID(), "error", err)
} else {
api.logger.Debug("successfully write syncing status", "ID", sub.ID())
}
}
api.filtersMu.Unlock()
Expand All @@ -462,11 +478,12 @@ func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) {
api.unsubscribe(sub.ID())
}

case <-errCh:
case err := <-errCh:
api.filtersMu.Lock()
sub.Unsubscribe(api.events)
delete(api.filters, sub.ID())
api.filtersMu.Unlock()
api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err)
return
case <-unsubscribed:
return
Expand Down
78 changes: 28 additions & 50 deletions app/rpc/websockets/server.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
package websockets

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"math/big"
"net"
"net/http"
"strings"

"github.com/cosmos/cosmos-sdk/client/context"
"github.com/cosmos/cosmos-sdk/server"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/spf13/viper"

"github.com/tendermint/tendermint/libs/log"

"github.com/ethereum/go-ethereum/rpc"

context "github.com/cosmos/cosmos-sdk/client/context"
)

// Server defines a server that handles Ethereum websockets.
Expand All @@ -32,8 +28,20 @@ type Server struct {

// NewServer creates a new websocket server instance.
func NewServer(clientCtx context.CLIContext, log log.Logger, wsAddr string) *Server {
restServerAddr := viper.GetString(server.FlagListenAddr)
parts := strings.SplitN(restServerAddr, "://", 2)
if len(parts) != 2 {
panic(fmt.Errorf("invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", restServerAddr))
}
url := parts[1]
urlParts := strings.SplitN(url, ":", 2)
if len(urlParts) != 2 {
panic(fmt.Errorf("invalid listening address %s (use ip:port as an url)", url))
}
port := urlParts[1]

return &Server{
rpcAddr: viper.GetString("laddr"),
rpcAddr: "http://localhost:" + port,
wsAddr: wsAddr,
api: NewAPI(clientCtx, log),
logger: log.With("module", "websocket-server"),
Expand Down Expand Up @@ -76,7 +84,7 @@ func (s *Server) sendErrResponse(conn *websocket.Conn, msg string) {
Code: big.NewInt(-32600),
Message: msg,
},
ID: nil,
ID: big.NewInt(1),
}
err := conn.WriteJSON(res)
if err != nil {
Expand Down Expand Up @@ -123,24 +131,24 @@ func (s *Server) readLoop(wsConn *websocket.Conn) {

err = wsConn.WriteJSON(res)
if err != nil {
s.logger.Error("failed to write json response", err)
s.logger.Error("failed to write json response", "ID", id, "error", err)
continue
}

s.logger.Debug("successfully subscribe", "ID", id)
continue
} else if method.(string) == "eth_unsubscribe" {
ids, ok := msg["params"].([]interface{})
if len(ids) == 0 {
s.sendErrResponse(wsConn, "invalid parameters")
continue
}

if _, idok := ids[0].(string); !ok || !idok {
id, idok := ids[0].(string)
if !ok || !idok {
s.sendErrResponse(wsConn, "invalid parameters")
continue
}

ok = s.api.unsubscribe(rpc.ID(ids[0].(string)))
ok = s.api.unsubscribe(rpc.ID(id))
res := &SubscriptionResponseJSON{
Jsonrpc: "2.0",
ID: 1,
Expand All @@ -149,10 +157,10 @@ func (s *Server) readLoop(wsConn *websocket.Conn) {

err = wsConn.WriteJSON(res)
if err != nil {
s.logger.Error("failed to write json response", err)
s.logger.Error("failed to write json response", "ID", id, "error", err)
continue
}

s.logger.Debug("successfully unsubscribe", "ID", id)
continue
}

Expand All @@ -167,47 +175,17 @@ func (s *Server) readLoop(wsConn *websocket.Conn) {
// tcpGetAndSendResponse connects to the rest-server over tcp, posts a JSON-RPC request, and sends the response
// to the client over websockets
func (s *Server) tcpGetAndSendResponse(conn *websocket.Conn, mb []byte) error {
addr := strings.Split(s.rpcAddr, "tcp://")
if len(addr) != 2 {
return fmt.Errorf("invalid laddr %s", s.rpcAddr)
}

tcpConn, err := net.Dial("tcp", addr[1])
if err != nil {
return fmt.Errorf("cannot connect to %s; %s", s.rpcAddr, err)
}

buf := &bytes.Buffer{}
_, err = buf.Write(mb)
if err != nil {
return fmt.Errorf("failed to write message; %s", err)
}

req, err := http.NewRequest("POST", s.rpcAddr, buf)
req, err := http.NewRequest(http.MethodPost, s.rpcAddr, bytes.NewReader(mb))
if err != nil {
return fmt.Errorf("failed to request; %s", err)
}

req.Header.Set("Content-Type", "application/json;")
err = req.Write(tcpConn)
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("failed to write to rest-server; %s", err)
}

respBytes, err := ioutil.ReadAll(tcpConn)
if err != nil {
return fmt.Errorf("error reading response from rest-server; %s", err)
}

respbuf := &bytes.Buffer{}
respbuf.Write(respBytes)
resp, err := http.ReadResponse(bufio.NewReader(respbuf), req)
if err != nil {
return fmt.Errorf("could not read response; %s", err)
}

defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("could not read body from response; %s", err)
Expand All @@ -220,4 +198,4 @@ func (s *Server) tcpGetAndSendResponse(conn *websocket.Conn, mb []byte) error {
}

return conn.WriteJSON(wsSend)
}
}

0 comments on commit ed3da63

Please sign in to comment.