From 4b67a15cc4fce20cea56ccd46bc8c95c4af85c24 Mon Sep 17 00:00:00 2001 From: Ray Green Date: Thu, 27 May 2021 15:03:37 +0800 Subject: [PATCH] add mutex in websocket connection, in case of concurrent write (#887) * add mutex in websocket connection, in case of concurrent write * adpat the header format * inital refactor * add mutex in EventSystem Co-authored-by: KamiD <44460798+KamiD@users.noreply.github.com> --- .../namespaces/eth/filters/filter_system.go | 96 ++----------------- .../namespaces/eth/filters/subscription.go | 5 - app/rpc/types/utils.go | 14 +-- app/rpc/websockets/pubsub_api.go | 67 ++++++------- app/rpc/websockets/server.go | 38 +++++++- app/rpc/websockets/types.go | 4 +- 6 files changed, 81 insertions(+), 143 deletions(-) diff --git a/app/rpc/namespaces/eth/filters/filter_system.go b/app/rpc/namespaces/eth/filters/filter_system.go index d84e413e32..d71a1ddab4 100644 --- a/app/rpc/namespaces/eth/filters/filter_system.go +++ b/app/rpc/namespaces/eth/filters/filter_system.go @@ -3,6 +3,7 @@ package filters import ( "context" "fmt" + "sync" "time" "github.com/cosmos/cosmos-sdk/server" @@ -42,27 +43,12 @@ type EventSystem struct { // light client mode lightMode bool - index filterIndex - - // Subscriptions - txsSub *Subscription // Subscription for new transaction event - logsSub *Subscription // Subscription for new log event - // rmLogsSub *Subscription // Subscription for removed log event - - pendingLogsSub *Subscription // Subscription for pending log event - chainSub *Subscription // Subscription for new chain event + index filterIndex + indexMux *sync.RWMutex // Channels install chan *Subscription // install filter for event notification uninstall chan *Subscription // remove filter for event notification - - // Unidirectional channels to receive Tendermint ResultEvents - txsCh <-chan coretypes.ResultEvent // Channel to receive new pending transactions event - logsCh <-chan coretypes.ResultEvent // Channel to receive new log event - pendingLogsCh <-chan coretypes.ResultEvent // Channel to receive new pending log event - // rmLogsCh <-chan coretypes.ResultEvent // Channel to receive removed log event - - chainCh <-chan coretypes.ResultEvent // Channel to receive new chain event } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -83,13 +69,9 @@ func NewEventSystem(client rpcclient.Client) *EventSystem { channelLength: viper.GetInt(server.FlagWsSubChannelLength), lightMode: false, index: index, + indexMux: new(sync.RWMutex), install: make(chan *Subscription), uninstall: make(chan *Subscription), - txsCh: make(<-chan coretypes.ResultEvent), - logsCh: make(<-chan coretypes.ResultEvent), - pendingLogsCh: make(<-chan coretypes.ResultEvent), - // rmLogsCh: make(<-chan coretypes.ResultEvent), - chainCh: make(<-chan coretypes.ResultEvent), } go es.eventLoop() @@ -293,64 +275,10 @@ func (es *EventSystem) handleChainEvent(ev coretypes.ResultEvent) { // eventLoop (un)installs filters and processes mux events. func (es *EventSystem) eventLoop() { - var ( - err error - cancelPendingTxsSubs, cancelLogsSubs, cancelPendingLogsSubs, cancelHeaderSubs context.CancelFunc - ) - - // Subscribe events - es.txsSub, cancelPendingTxsSubs, err = es.SubscribePendingTxs() - if err != nil { - panic(fmt.Errorf("failed to subscribe pending txs: %w", err)) - } - - defer cancelPendingTxsSubs() - - es.logsSub, cancelLogsSubs, err = es.SubscribeLogs(filters.FilterCriteria{}) - if err != nil { - panic(fmt.Errorf("failed to subscribe logs: %w", err)) - } - - defer cancelLogsSubs() - - es.pendingLogsSub, cancelPendingLogsSubs, err = es.subscribePendingLogs(filters.FilterCriteria{}) - if err != nil { - panic(fmt.Errorf("failed to subscribe pending logs: %w", err)) - } - - defer cancelPendingLogsSubs() - - es.chainSub, cancelHeaderSubs, err = es.SubscribeNewHeads() - if err != nil { - panic(fmt.Errorf("failed to subscribe headers: %w", err)) - } - - defer cancelHeaderSubs() - - // Ensure all subscriptions get cleaned up - defer func() { - es.txsSub.Unsubscribe(es) - es.logsSub.Unsubscribe(es) - // es.rmLogsSub.Unsubscribe(es) - es.pendingLogsSub.Unsubscribe(es) - es.chainSub.Unsubscribe(es) - }() - for { select { - case txEvent := <-es.txsSub.eventCh: - es.handleTxsEvent(txEvent) - case headerEv := <-es.chainSub.eventCh: - es.handleChainEvent(headerEv) - case logsEv := <-es.logsSub.eventCh: - es.handleLogs(logsEv) - // TODO: figure out how to handle removed logs - // case logsEv := <-es.rmLogsSub.eventCh: - // es.handleLogs(logsEv) - case logsEv := <-es.pendingLogsSub.eventCh: - es.handleLogs(logsEv) - case f := <-es.install: + es.indexMux.Lock() if f.typ == filters.MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions es.index[filters.LogsSubscription][f.id] = f @@ -358,9 +286,11 @@ func (es *EventSystem) eventLoop() { } else { es.index[f.typ][f.id] = f } + es.indexMux.Unlock() close(f.installed) case f := <-es.uninstall: + es.indexMux.Lock() if f.typ == filters.MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions delete(es.index[filters.LogsSubscription], f.id) @@ -368,18 +298,8 @@ func (es *EventSystem) eventLoop() { } else { delete(es.index[f.typ], f.id) } + es.indexMux.Unlock() close(f.err) - // System stopped - case <-es.txsSub.Err(): - return - case <-es.logsSub.Err(): - return - // case <-es.rmLogsSub.Err(): - // return - case <-es.pendingLogsSub.Err(): - return - case <-es.chainSub.Err(): - return } } // }() diff --git a/app/rpc/namespaces/eth/filters/subscription.go b/app/rpc/namespaces/eth/filters/subscription.go index ee09f6ebe8..1a845faa0d 100644 --- a/app/rpc/namespaces/eth/filters/subscription.go +++ b/app/rpc/namespaces/eth/filters/subscription.go @@ -1,7 +1,6 @@ package filters import ( - "log" "time" "github.com/ethereum/go-ethereum/common" @@ -39,10 +38,6 @@ func (s *Subscription) Unsubscribe(es *EventSystem) { } go func() { - defer func() { - log.Println("successfully unsubscribed to event", s.event) - }() - uninstallLoop: for { // write uninstall request and consume logs/hashes. This prevents diff --git a/app/rpc/types/utils.go b/app/rpc/types/utils.go index 2b95439a96..722bd05cb5 100644 --- a/app/rpc/types/utils.go +++ b/app/rpc/types/utils.go @@ -114,7 +114,7 @@ func EthHeaderFromTendermint(header tmtypes.Header) *ethtypes.Header { Coinbase: common.BytesToAddress(header.ProposerAddress), Root: common.BytesToHash(header.AppHash), TxHash: common.BytesToHash(header.DataHash), - ReceiptHash: common.Hash{}, + ReceiptHash: ethtypes.EmptyRootHash, Difficulty: nil, Number: big.NewInt(header.Height), Time: uint64(header.Time.Unix()), @@ -258,11 +258,13 @@ func EthHeaderWithBlockHashFromTendermint(tmHeader *tmtypes.Header) (header *Eth } header = &EthHeaderWithBlockHash{ - ParentHash: common.BytesToHash(tmHeader.LastBlockID.Hash.Bytes()), - Coinbase: common.BytesToAddress(tmHeader.ProposerAddress), - Root: common.BytesToHash(tmHeader.AppHash), - TxHash: common.BytesToHash(tmHeader.DataHash), - Number: (*hexutil.Big)(big.NewInt(tmHeader.Height)), + ParentHash: common.BytesToHash(tmHeader.LastBlockID.Hash.Bytes()), + UncleHash: ethtypes.EmptyUncleHash, + Coinbase: common.BytesToAddress(tmHeader.ProposerAddress), + Root: common.BytesToHash(tmHeader.AppHash), + TxHash: common.BytesToHash(tmHeader.DataHash), + ReceiptHash: ethtypes.EmptyRootHash, + Number: (*hexutil.Big)(big.NewInt(tmHeader.Height)), // difficulty is not available for DPOS Difficulty: defaultDifficulty, GasLimit: defaultGasLimit, diff --git a/app/rpc/websockets/pubsub_api.go b/app/rpc/websockets/pubsub_api.go index 8465cfac4f..602a0ca481 100644 --- a/app/rpc/websockets/pubsub_api.go +++ b/app/rpc/websockets/pubsub_api.go @@ -4,8 +4,6 @@ import ( "fmt" "sync" - "github.com/gorilla/websocket" - "github.com/tendermint/tendermint/libs/log" coretypes "github.com/tendermint/tendermint/rpc/core/types" tmtypes "github.com/tendermint/tendermint/types" @@ -25,7 +23,7 @@ import ( type PubSubAPI struct { clientCtx context.CLIContext events *rpcfilters.EventSystem - filtersMu sync.Mutex + filtersMu *sync.RWMutex filters map[rpc.ID]*wsSubscription logger log.Logger } @@ -35,12 +33,13 @@ func NewAPI(clientCtx context.CLIContext, log log.Logger) *PubSubAPI { return &PubSubAPI{ clientCtx: clientCtx, events: rpcfilters.NewEventSystem(clientCtx.Client), + filtersMu: new(sync.RWMutex), filters: make(map[rpc.ID]*wsSubscription), logger: log.With("module", "websocket-client"), } } -func (api *PubSubAPI) subscribe(conn *websocket.Conn, params []interface{}) (rpc.ID, error) { +func (api *PubSubAPI) subscribe(conn *wsConn, params []interface{}) (rpc.ID, error) { method, ok := params[0].(string) if !ok { return "0", fmt.Errorf("invalid parameters") @@ -82,7 +81,7 @@ func (api *PubSubAPI) unsubscribe(id rpc.ID) bool { return true } -func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) { +func (api *PubSubAPI) subscribeNewHeads(conn *wsConn) (rpc.ID, error) { sub, _, err := api.events.SubscribeNewHeads() if err != nil { return "", fmt.Errorf("error creating block filter: %s", err.Error()) @@ -112,7 +111,7 @@ func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) { continue } - api.filtersMu.Lock() + api.filtersMu.RLock() if f, found := api.filters[sub.ID()]; found { // write to ws conn res := &SubscriptionNotification{ @@ -131,17 +130,16 @@ func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) { api.logger.Debug("successfully write header", "ID", sub.ID(), "blocknumber", headerWithBlockHash.Number) } } - api.filtersMu.Unlock() + api.filtersMu.RUnlock() if err != nil { api.unsubscribe(sub.ID()) } case err := <-errCh: - api.filtersMu.Lock() - sub.Unsubscribe(api.events) - delete(api.filters, sub.ID()) - api.filtersMu.Unlock() - api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err) + if err != nil { + api.unsubscribe(sub.ID()) + api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err) + } return case <-unsubscribed: api.logger.Debug("NewHeads channel is closed", "ID", sub.ID()) @@ -153,7 +151,7 @@ func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) { return sub.ID(), nil } -func (api *PubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rpc.ID, error) { +func (api *PubSubAPI) subscribeLogs(conn *wsConn, extra interface{}) (rpc.ID, error) { crit := filters.FilterCriteria{} if extra != nil { @@ -239,7 +237,7 @@ func (api *PubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rp return } - api.filtersMu.Lock() + api.filtersMu.RLock() if f, found := api.filters[sub.ID()]; found { // write to ws conn res := &SubscriptionNotification{ @@ -259,18 +257,17 @@ func (api *PubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rp api.logger.Debug("successfully write log", "ID", sub.ID(), "height", singleLog.BlockNumber, "txhash", singleLog.TxHash) } } - api.filtersMu.Unlock() + api.filtersMu.RUnlock() if err != nil { api.unsubscribe(sub.ID()) } }(event) case err := <-errCh: - api.filtersMu.Lock() - sub.Unsubscribe(api.events) - delete(api.filters, sub.ID()) - api.filtersMu.Unlock() - api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err) + if err != nil { + api.unsubscribe(sub.ID()) + api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err) + } return case <-unsubscribed: api.logger.Debug("Logs channel is closed", "ID", sub.ID()) @@ -356,7 +353,7 @@ func isHex(str string) bool { return true } -func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID, error) { +func (api *PubSubAPI) subscribePendingTransactions(conn *wsConn) (rpc.ID, error) { sub, _, err := api.events.SubscribePendingTxs() if err != nil { return "", fmt.Errorf("error creating block filter: %s", err.Error()) @@ -382,7 +379,7 @@ func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID } txHash := common.BytesToHash(data.Tx.Hash()) - api.filtersMu.Lock() + api.filtersMu.RLock() if f, found := api.filters[sub.ID()]; found { // write to ws conn res := &SubscriptionNotification{ @@ -401,17 +398,16 @@ func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID api.logger.Debug("successfully write pending tx", "ID", sub.ID(), "txhash", txHash) } } - api.filtersMu.Unlock() + api.filtersMu.RUnlock() if err != nil { api.unsubscribe(sub.ID()) } case err := <-errCh: - api.filtersMu.Lock() - sub.Unsubscribe(api.events) - delete(api.filters, sub.ID()) - api.filtersMu.Unlock() - api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err) + if err != nil { + api.unsubscribe(sub.ID()) + api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err) + } return case <-unsubscribed: api.logger.Debug("PendingTransactions channel is closed", "ID", sub.ID()) @@ -423,7 +419,7 @@ func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID return sub.ID(), nil } -func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) { +func (api *PubSubAPI) subscribeSyncing(conn *wsConn) (rpc.ID, error) { sub, _, err := api.events.SubscribeNewHeads() if err != nil { return "", fmt.Errorf("error creating block filter: %s", err.Error()) @@ -468,7 +464,7 @@ func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) { } } - api.filtersMu.Lock() + api.filtersMu.RLock() if f, found := api.filters[sub.ID()]; found { // write to ws conn res := &SubscriptionNotification{ @@ -487,18 +483,17 @@ func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) { api.logger.Debug("successfully write syncing status", "ID", sub.ID()) } } - api.filtersMu.Unlock() + api.filtersMu.RUnlock() if err != nil { api.unsubscribe(sub.ID()) } case err := <-errCh: - api.filtersMu.Lock() - sub.Unsubscribe(api.events) - delete(api.filters, sub.ID()) - api.filtersMu.Unlock() - api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err) + if err != nil { + api.unsubscribe(sub.ID()) + api.logger.Error("websocket recv error, close the conn", "ID", sub.ID(), "error", err) + } return case <-unsubscribed: api.logger.Debug("Syncing channel is closed", "ID", sub.ID()) diff --git a/app/rpc/websockets/server.go b/app/rpc/websockets/server.go index 19fdef78f9..b5cad71fb1 100644 --- a/app/rpc/websockets/server.go +++ b/app/rpc/websockets/server.go @@ -101,7 +101,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { }, } - wsConn, err := upgrader.Upgrade(w, r, nil) + conn, err := upgrader.Upgrade(w, r, nil) if err != nil { s.logger.Error("websocket upgrade failed", " error", err) return @@ -109,10 +109,13 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.connPool <- struct{}{} s.currentConnNum.Set(float64(len(s.connPool))) - go s.readLoop(wsConn) + go s.readLoop(&wsConn{ + mux: new(sync.Mutex), + conn: conn, + }) } -func (s *Server) sendErrResponse(conn *websocket.Conn, msg string) { +func (s *Server) sendErrResponse(conn *wsConn, msg string) { res := &ErrorResponseJSON{ Jsonrpc: "2.0", Error: &ErrorMessageJSON{ @@ -127,7 +130,32 @@ func (s *Server) sendErrResponse(conn *websocket.Conn, msg string) { } } -func (s *Server) readLoop(wsConn *websocket.Conn) { +type wsConn struct { + conn *websocket.Conn + mux *sync.Mutex +} + +func (w *wsConn) WriteJSON(v interface{}) error { + w.mux.Lock() + defer w.mux.Unlock() + + return w.conn.WriteJSON(v) +} + +func (w *wsConn) Close() error { + w.mux.Lock() + defer w.mux.Unlock() + + return w.conn.Close() +} + +func (w *wsConn) ReadMessage() (messageType int, p []byte, err error) { + // not protected by write mutex + + return w.conn.ReadMessage() +} + +func (s *Server) readLoop(wsConn *wsConn) { subIds := make(map[rpc.ID]struct{}) for { _, mb, err := wsConn.ReadMessage() @@ -225,7 +253,7 @@ func (s *Server) readLoop(wsConn *websocket.Conn) { // tcpGetAndSendResponse connects to the rest-server over tcp, posts a JSON-RPC request, and sends the response // to the client over websockets -func (s *Server) tcpGetAndSendResponse(conn *websocket.Conn, mb []byte) error { +func (s *Server) tcpGetAndSendResponse(conn *wsConn, mb []byte) error { req, err := http.NewRequest(http.MethodPost, s.rpcAddr, bytes.NewReader(mb)) if err != nil { return fmt.Errorf("failed to request; %s", err) diff --git a/app/rpc/websockets/types.go b/app/rpc/websockets/types.go index cb340886a2..c98bb02d72 100644 --- a/app/rpc/websockets/types.go +++ b/app/rpc/websockets/types.go @@ -3,8 +3,6 @@ package websockets import ( "math/big" - "github.com/gorilla/websocket" - "github.com/ethereum/go-ethereum/rpc" rpcfilters "github.com/okex/exchain/app/rpc/namespaces/eth/filters" @@ -41,5 +39,5 @@ type ErrorMessageJSON struct { type wsSubscription struct { sub *rpcfilters.Subscription unsubscribed chan struct{} // closed when unsubscribing - conn *websocket.Conn + conn *wsConn }