From 4a608c8436dcfebdaacb0af3370994c95e5defdd Mon Sep 17 00:00:00 2001 From: Borjan Trajanoski Date: Wed, 12 Sep 2018 12:17:32 +0200 Subject: [PATCH 1/2] [ethws] Brought back the eth ws module and it's VM registration --- dapp/registry/registry.go | 6 ++ ethws/ws.go | 181 ++++++++++++++++++++++++++++++++++++++ ethws/ws_test.go | 30 +++++++ 3 files changed, 217 insertions(+) create mode 100644 ethws/ws.go create mode 100644 ethws/ws_test.go diff --git a/dapp/registry/registry.go b/dapp/registry/registry.go index 24cdf6c..70eb0e5 100644 --- a/dapp/registry/registry.go +++ b/dapp/registry/registry.go @@ -22,6 +22,7 @@ import ( sendEthTxMod "github.com/Bit-Nation/panthalassa/dapp/module/sendEthTx" uuidv4Mod "github.com/Bit-Nation/panthalassa/dapp/module/uuidv4" db "github.com/Bit-Nation/panthalassa/db" + ethws "github.com/Bit-Nation/panthalassa/ethws" keyManager "github.com/Bit-Nation/panthalassa/keyManager" storm "github.com/asdine/storm" log "github.com/ipfs/go-log" @@ -55,6 +56,7 @@ type Registry struct { host host.Host closeChan chan *dapp.Data conf Config + ethWS *ethws.EthereumWS api *api.API km *keyManager.KeyManager dAppDB dapp.Storage @@ -85,6 +87,10 @@ func NewDAppRegistry(h host.Host, conf Config, api *api.API, km *keyManager.KeyM fetchDAppChan: make(chan fetchDAppChanStr), addDevStreamChan: make(chan addDevStreamChanStr), fetchDevStreamChan: make(chan fetchDAppStreamStr), + ethWS: ethws.New(ethws.Config{ + Retry: time.Second, + WSUrl: conf.EthWSEndpoint, + }), } // load all default DApps diff --git a/ethws/ws.go b/ethws/ws.go new file mode 100644 index 0000000..3b1627f --- /dev/null +++ b/ethws/ws.go @@ -0,0 +1,181 @@ +package ethws + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + wsg "github.com/gorilla/websocket" + log "github.com/ipfs/go-log" +) + +const rpcVersion = "2.0" + +var logger = log.Logger("ethws") + +type Config struct { + Retry time.Duration + WSUrl string +} + +type Request struct { + ID int64 `json:"id"` + Method string `json:"method"` + Params []interface{} `json:"params"` + JsonRPC string `json:"jsonrpc"` +} + +func (r *Request) Marshal() ([]byte, error) { + return json.Marshal(r) +} + +type Error struct { + Code int `json:"code"` + Message string `json:"message"` +} + +type Response struct { + JsonRPC string `json:"jsonrpc"` + RPCError *Error `json:"error,omitempty"` + Result interface{} `json:"result"` + ID int64 `json:"id"` + error error +} + +func (r *Response) Error() error { + return r.error +} + +type EthereumWS struct { + lock sync.Mutex + // requests that need to be send + requestQueue chan Request + requests map[int64]chan<- Response + conn *wsg.Conn +} + +// send an request to ethereum network +func (ws *EthereumWS) SendRequest(r Request) (<-chan Response, error) { + + c := make(chan Response) + + // add request to stack + ws.lock.Lock() + defer ws.lock.Unlock() + for { + id := time.Now().UnixNano() + if _, exist := ws.requests[id]; !exist { + r.ID = id + ws.requests[id] = c + break + } + } + + // send request to queue + ws.requestQueue <- r + + return c, nil + +} + +// create new ethereum websocket +func New(conf Config) *EthereumWS { + + startSendWorker := make(chan bool) + startReadWorker := make(chan bool) + + etws := &EthereumWS{ + lock: sync.Mutex{}, + requestQueue: make(chan Request, 1000), + requests: map[int64]chan<- Response{}, + } + + // worker that sends the requests + go func() { + + // wait for connection + <-startSendWorker + + // send requests + for { + select { + case req := <-etws.requestQueue: + + // send request + if err := etws.conn.WriteJSON(req); err != nil { + logger.Error(err) + + etws.lock.Lock() + respChan := etws.requests[req.ID] + etws.lock.Unlock() + + respChan <- Response{error: err} + } + } + } + + }() + + // worker that read response from websocket + go func() { + + // wait to start worker + <-startReadWorker + + for { + + // read message + _, response, err := etws.conn.ReadMessage() + if err != nil { + logger.Error(err) + continue + } + + // unmarshal + var resp Response + if err := json.Unmarshal(response, &resp); err != nil { + logger.Error(err) + continue + } + + // get response channel + etws.lock.Lock() + respChan, exist := etws.requests[resp.ID] + if !exist { + logger.Error(fmt.Sprintf("failed to get response channel for ID: %d", resp.ID)) + continue + } + delete(etws.requests, resp.ID) + etws.lock.Unlock() + + // send response + respChan <- resp + + } + + }() + + // connect to ethereum node + go func() { + + // try to connect till success + for { + co, _, err := wsg.DefaultDialer.Dial(conf.WSUrl, nil) + if err == nil { + etws.conn = co + break + } + logger.Error(err) + // wait a bit. We don't want to stress the endpoint + time.Sleep(conf.Retry) + } + + // signal the workers to start + startReadWorker <- true + startSendWorker <- true + + }() + + return etws +} diff --git a/ethws/ws_test.go b/ethws/ws_test.go new file mode 100644 index 0000000..cd33638 --- /dev/null +++ b/ethws/ws_test.go @@ -0,0 +1,30 @@ +package ethws + +import ( + "testing" + "time" + + require "github.com/stretchr/testify/require" +) + +func TestEthereumWebSocket(t *testing.T) { + + ws := New(Config{ + Retry: time.Second, + WSUrl: "wss://mainnet.infura.io/_ws", + }) + + respChan, err := ws.SendRequest(Request{ + Method: `eth_protocolVersion`, + JsonRPC: rpcVersion, + }) + + require.Nil(t, err) + + resp := <-respChan + require.Nil(t, resp.Error()) + + require.Equal(t, "2.0", resp.JsonRPC) + require.Equal(t, "0x3f", resp.Result) + +} From bb62c14c908ac4403b964edf0d63fc69fd6c2630 Mon Sep 17 00:00:00 2001 From: Borjan Trajanoski Date: Wed, 12 Sep 2018 14:40:11 +0200 Subject: [PATCH 2/2] [ethws] Switched mutex lock to experimental channel state --- ethws/ws.go | 84 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 62 insertions(+), 22 deletions(-) diff --git a/ethws/ws.go b/ethws/ws.go index 3b1627f..5dd7f91 100644 --- a/ethws/ws.go +++ b/ethws/ws.go @@ -3,7 +3,6 @@ package ethws import ( "encoding/json" "fmt" - "sync" "time" wsg "github.com/gorilla/websocket" @@ -48,33 +47,72 @@ func (r *Response) Error() error { } type EthereumWS struct { - lock sync.Mutex + state State // requests that need to be send requestQueue chan Request requests map[int64]chan<- Response conn *wsg.Conn } +type State struct { + addRequestIfNotExist chan StateObject + getRequest chan StateObject + deleteRequest chan StateObject + response chan StateObject +} + +type StateObject struct { + id int64 + c chan<- Response +} + +// start state machine +func (ws *EthereumWS) State() { + for { + select { + case getRequest := <-ws.state.getRequest: + // if the request exists, respond with the id and the response channel + if respChan, exist := ws.requests[getRequest.id]; exist { + ws.state.response <- StateObject{getRequest.id, respChan} + break + } + // if the request doesn't exist, respond with 0 and nil + ws.state.response <- StateObject{0, nil} + case addRequest := <-ws.state.addRequestIfNotExist: + // if the request exists, don't add it to the map, respond with the id and the response channel + if respChan, exist := ws.requests[addRequest.id]; exist { + ws.state.response <- StateObject{addRequest.id, respChan} + break + } + // if the request doesn't exist, add it to the map, respond with 0 and nil + ws.requests[addRequest.id] = addRequest.c + ws.state.response <- StateObject{0, nil} + case deleteRequest := <-ws.state.deleteRequest: + delete(ws.requests, deleteRequest.id) + ws.state.response <- StateObject{0, nil} + } // select + } // infinite for +} // func (ws *EthereumWS) State() + // send an request to ethereum network func (ws *EthereumWS) SendRequest(r Request) (<-chan Response, error) { c := make(chan Response) // add request to stack - ws.lock.Lock() - defer ws.lock.Unlock() for { id := time.Now().UnixNano() - if _, exist := ws.requests[id]; !exist { + ws.state.addRequestIfNotExist <- StateObject{id, c} + stateResponse := <-ws.state.response + // if the id didn't exist + if stateResponse.id == 0 { r.ID = id - ws.requests[id] = c break } } // send request to queue ws.requestQueue <- r - return c, nil } @@ -86,17 +124,21 @@ func New(conf Config) *EthereumWS { startReadWorker := make(chan bool) etws := &EthereumWS{ - lock: sync.Mutex{}, + state: State{ + addRequestIfNotExist: make(chan StateObject), + getRequest: make(chan StateObject), + deleteRequest: make(chan StateObject), + response: make(chan StateObject), + }, requestQueue: make(chan Request, 1000), requests: map[int64]chan<- Response{}, } - + go etws.State() // worker that sends the requests go func() { // wait for connection <-startSendWorker - // send requests for { select { @@ -106,9 +148,9 @@ func New(conf Config) *EthereumWS { if err := etws.conn.WriteJSON(req); err != nil { logger.Error(err) - etws.lock.Lock() - respChan := etws.requests[req.ID] - etws.lock.Unlock() + etws.state.getRequest <- StateObject{req.ID, nil} + stateResponse := <-etws.state.response + respChan := stateResponse.c respChan <- Response{error: err} } @@ -122,7 +164,6 @@ func New(conf Config) *EthereumWS { // wait to start worker <-startReadWorker - for { // read message @@ -131,7 +172,6 @@ func New(conf Config) *EthereumWS { logger.Error(err) continue } - // unmarshal var resp Response if err := json.Unmarshal(response, &resp); err != nil { @@ -140,16 +180,16 @@ func New(conf Config) *EthereumWS { } // get response channel - etws.lock.Lock() - respChan, exist := etws.requests[resp.ID] - if !exist { - logger.Error(fmt.Sprintf("failed to get response channel for ID: %d", resp.ID)) + etws.state.getRequest <- StateObject{resp.ID, nil} + stateResponse := <-etws.state.response + if stateResponse.id == 0 { + logger.Error(fmt.Sprintf("failed to get response channel for ID: %d", stateResponse.id)) continue } - delete(etws.requests, resp.ID) - etws.lock.Unlock() - + etws.state.deleteRequest <- StateObject{resp.ID, nil} + _ = <-etws.state.response // send response + respChan := stateResponse.c respChan <- resp }