Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
zkokelj committed Sep 13, 2023
1 parent b7b4e3e commit b0e8a26
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 53 deletions.
68 changes: 18 additions & 50 deletions tools/walletextension/accountmanager/account_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/obscuronet/go-obscuro/tools/walletextension/subscriptions"
"strings"
"time"

Expand All @@ -18,8 +19,6 @@ import (

wecommon "github.com/obscuronet/go-obscuro/tools/walletextension/common"

"github.com/go-kit/kit/transport/http/jsonrpc"

"github.com/obscuronet/go-obscuro/go/common/log"
"github.com/obscuronet/go-obscuro/go/rpc"
"github.com/obscuronet/go-obscuro/tools/walletextension/userconn"
Expand All @@ -28,8 +27,6 @@ import (
)

const (
methodEthSubscription = "eth_subscription"

ethCallPaddedArgLen = 64
ethCallAddrPadding = "000000000000000000000000"

Expand All @@ -41,15 +38,17 @@ const (
type AccountManager struct {
unauthedClient rpc.Client
// todo (@ziga) - create two types of clients - WS clients, and HTTP clients - to not create WS clients unnecessarily.
accountClients map[gethcommon.Address]*rpc.EncRPCClient // An encrypted RPC client per registered account
logger gethlog.Logger
accountClients map[gethcommon.Address]*rpc.EncRPCClient // An encrypted RPC client per registered account
subscriptionsManager *subscriptions.SubscriptionManager
logger gethlog.Logger
}

func NewAccountManager(unauthedClient rpc.Client, logger gethlog.Logger) *AccountManager {
return &AccountManager{
unauthedClient: unauthedClient,
accountClients: make(map[gethcommon.Address]*rpc.EncRPCClient),
logger: logger,
unauthedClient: unauthedClient,
accountClients: make(map[gethcommon.Address]*rpc.EncRPCClient),
subscriptionsManager: subscriptions.New(logger),
logger: logger,
}
}

Expand All @@ -60,13 +59,15 @@ func (m *AccountManager) AddClient(address gethcommon.Address, client *rpc.EncRP

// ProxyRequest tries to identify the correct EncRPCClient to proxy the request to the Obscuro node, or it will attempt
// the request with all clients until it succeeds
func (m *AccountManager) ProxyRequest(rpcReq *RPCRequest, rpcResp *interface{}, userConn userconn.UserConn) error {
func (m *AccountManager) ProxyRequest(rpcReq *wecommon.RPCRequest, rpcResp *interface{}, userConn userconn.UserConn) error {
if rpcReq.Method == rpc.Subscribe {
clients, err := m.suggestSubscriptionClient(rpcReq)
if err != nil {
return err
}
// fetch the clients from a topic

m.subscriptionsManager.HandleNewSubscriptions(clients, rpcReq, rpcResp, userConn)

Check failure on line 69 in tools/walletextension/accountmanager/account_manager.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `m.subscriptionsManager.HandleNewSubscriptions` is not checked (errcheck)
// fetch the clients from a topic (todo: remove and replace with HandleNewSubscriptions)
for _, client := range clients {
return m.executeSubscribe(client, rpcReq, rpcResp, userConn)
}
Expand All @@ -79,7 +80,7 @@ const emptyFilterCriteria = "[]" // This is the value that gets passed for an em

// determine the client based on the topics
// if none is found use all clients from current user
func (m *AccountManager) suggestSubscriptionClient(rpcReq *RPCRequest) ([]rpc.Client, error) {
func (m *AccountManager) suggestSubscriptionClient(rpcReq *wecommon.RPCRequest) ([]rpc.Client, error) {
clients := make([]rpc.Client, 0, len(m.accountClients))

// by default, if no client is identified as a candidate, then subscribe to all accounts
Expand Down Expand Up @@ -128,7 +129,7 @@ func (m *AccountManager) suggestSubscriptionClient(rpcReq *RPCRequest) ([]rpc.Cl
return clients, nil
}

func (m *AccountManager) executeCall(rpcReq *RPCRequest, rpcResp *interface{}) error {
func (m *AccountManager) executeCall(rpcReq *wecommon.RPCRequest, rpcResp *interface{}) error {
// for obscuro RPC requests it is important we know the sender account for the viewing key encryption/decryption
suggestedClient := m.suggestAccountClient(rpcReq, m.accountClients)

Expand Down Expand Up @@ -160,7 +161,7 @@ func (m *AccountManager) executeCall(rpcReq *RPCRequest, rpcResp *interface{}) e
}

// suggestAccountClient works through various methods to try and guess which available client to use for a request, returns nil if none found
func (m *AccountManager) suggestAccountClient(req *RPCRequest, accClients map[gethcommon.Address]*rpc.EncRPCClient) *rpc.EncRPCClient {
func (m *AccountManager) suggestAccountClient(req *wecommon.RPCRequest, accClients map[gethcommon.Address]*rpc.EncRPCClient) *rpc.EncRPCClient {
if len(accClients) == 1 {
for _, client := range accClients {
// return the first (and only) client
Expand Down Expand Up @@ -275,7 +276,7 @@ func searchDataFieldForAccount(callParams map[string]interface{}, accClients map
return nil, fmt.Errorf("no known account found in data bytes")
}

func (m *AccountManager) executeSubscribe(client rpc.Client, req *RPCRequest, resp *interface{}, userConn userconn.UserConn) error { //nolint: gocognit
func (m *AccountManager) executeSubscribe(client rpc.Client, req *wecommon.RPCRequest, resp *interface{}, userConn userconn.UserConn) error { //nolint: gocognit
if len(req.Params) == 0 {
return fmt.Errorf("could not subscribe as no subscription namespace was provided")
}
Expand All @@ -296,7 +297,7 @@ func (m *AccountManager) executeSubscribe(client rpc.Client, req *RPCRequest, re
return
}

jsonResponse, err := prepareLogResponse(idAndLog)
jsonResponse, err := wecommon.PrepareLogResponse(idAndLog)
if err != nil {
m.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, idAndLog.SubID, log.ErrKey, err)
continue
Expand Down Expand Up @@ -334,7 +335,7 @@ func (m *AccountManager) executeSubscribe(client rpc.Client, req *RPCRequest, re
return nil
}

func submitCall(client *rpc.EncRPCClient, req *RPCRequest, resp *interface{}) error {
func submitCall(client *rpc.EncRPCClient, req *wecommon.RPCRequest, resp *interface{}) error {
if req.Method == rpc.Call || req.Method == rpc.EstimateGas {
// Never modify the original request, as it might be reused.
req = req.Clone()
Expand Down Expand Up @@ -390,36 +391,3 @@ func setFromFieldIfMissing(args []interface{}, account gethcommon.Address) ([]in

return request, nil
}

// Formats the log to be sent as an Eth JSON-RPC response.
func prepareLogResponse(idAndLog common.IDAndLog) ([]byte, error) {
paramsMap := make(map[string]interface{})
paramsMap[wecommon.JSONKeySubscription] = idAndLog.SubID
paramsMap[wecommon.JSONKeyResult] = idAndLog.Log

respMap := make(map[string]interface{})
respMap[wecommon.JSONKeyRPCVersion] = jsonrpc.Version
respMap[wecommon.JSONKeyMethod] = methodEthSubscription
respMap[wecommon.JSONKeyParams] = paramsMap

jsonResponse, err := json.Marshal(respMap)
if err != nil {
return nil, fmt.Errorf("could not marshal log response to JSON. Cause: %w", err)
}
return jsonResponse, nil
}

type RPCRequest struct {
ID json.RawMessage
Method string
Params []interface{}
}

// Clone returns a new instance of the *RPCRequest
func (r *RPCRequest) Clone() *RPCRequest {
return &RPCRequest{
ID: r.ID,
Method: r.Method,
Params: r.Params,
}
}
36 changes: 36 additions & 0 deletions tools/walletextension/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package common
import (
"crypto/ecdsa"
"encoding/hex"
"encoding/json"
"errors"
"fmt"

Check failure on line 8 in tools/walletextension/common/common.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
"github.com/go-kit/kit/transport/http/jsonrpc"
"github.com/obscuronet/go-obscuro/go/common"
"regexp"

Check failure on line 11 in tools/walletextension/common/common.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)

gethcommon "github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -83,3 +86,36 @@ func CreateEncClient(hostRPCBindAddr string, addressBytes []byte, privateKeyByte
}
return encClient, nil
}

type RPCRequest struct {
ID json.RawMessage
Method string
Params []interface{}
}

// Clone returns a new instance of the *RPCRequest
func (r *RPCRequest) Clone() *RPCRequest {
return &RPCRequest{
ID: r.ID,
Method: r.Method,
Params: r.Params,
}
}

// Formats the log to be sent as an Eth JSON-RPC response.
func PrepareLogResponse(idAndLog common.IDAndLog) ([]byte, error) {
paramsMap := make(map[string]interface{})
paramsMap[JSONKeySubscription] = idAndLog.SubID
paramsMap[JSONKeyResult] = idAndLog.Log

respMap := make(map[string]interface{})
respMap[JSONKeyRPCVersion] = jsonrpc.Version
respMap[JSONKeyMethod] = methodEthSubscription
respMap[JSONKeyParams] = paramsMap

jsonResponse, err := json.Marshal(respMap)
if err != nil {
return nil, fmt.Errorf("could not marshal log response to JSON. Cause: %w", err)
}
return jsonResponse, nil
}
1 change: 1 addition & 0 deletions tools/walletextension/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
GetStorageAtUserIDRequestMethodName = "getUserID"
SuccessMsg = "success"
APIVersion1 = "/v1"
methodEthSubscription = "eth_subscription"
)

var (
Expand Down
88 changes: 88 additions & 0 deletions tools/walletextension/subscriptions/subscriptions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package subscriptions

import (
"errors"
"fmt"

Check failure on line 5 in tools/walletextension/subscriptions/subscriptions.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
gethlog "github.com/ethereum/go-ethereum/log"
"github.com/obscuronet/go-obscuro/go/common"
"github.com/obscuronet/go-obscuro/go/common/log"
"github.com/obscuronet/go-obscuro/go/rpc"
wecommon "github.com/obscuronet/go-obscuro/tools/walletextension/common"
"github.com/obscuronet/go-obscuro/tools/walletextension/userconn"
)

type SubscriptionManager struct {
logger gethlog.Logger
}

func New(logger gethlog.Logger) *SubscriptionManager {
return &SubscriptionManager{
logger: logger,
}
}

func (s *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req *wecommon.RPCRequest, resp *interface{}, userConn userconn.UserConn) error {

Check warning on line 24 in tools/walletextension/subscriptions/subscriptions.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'resp' seems to be unused, consider removing or renaming it as _ (revive)
if len(req.Params) == 0 {
return fmt.Errorf("could not subscribe as no subscription namespace was provided")
}

if !userConn.SupportsSubscriptions() {
return errors.New("userConn doesn't support subscriptions")
}

// create a chanel that will collect the data from all subscriptions
commonChannel := make(chan common.IDAndLog)

// TODO: Create a buffered channel and perform deduplication of logs or implement additional logic to filter logs

// TODO: Do periodic checks if userConn is closed and unsubscribe from all subscriptions

// Send all logs from common channel to user (via userConn)
go func() {
for {
select {
case idAndLog := <-commonChannel:
if userConn.IsClosed() {
s.logger.Info("received log but websocket was closed on subscription", log.SubIDKey, idAndLog.SubID)
return
}

jsonResponse, err := wecommon.PrepareLogResponse(idAndLog)
if err != nil {
s.logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, idAndLog.SubID, log.ErrKey, err)
continue
}

err = userConn.WriteResponse(jsonResponse)
if err != nil {
s.logger.Error("could not write the JSON log to the websocket on subscription %", log.SubIDKey, idAndLog.SubID, log.ErrKey, err)
continue
}
}
}
}()

// loop over all clients and create a new subscription for each of them
s.logger.Info(fmt.Sprintf("Subscribing to: %d clients", len(clients)))
for _, client := range clients {
s.logger.Info(fmt.Sprintf("Subscribing for an event with client: %s", client))
subscriptionID, err := s.addSubscription(client, req, commonChannel)
if err != nil {
s.logger.Info(fmt.Sprintf("Error: %v", err))
}
s.logger.Info(fmt.Sprintf("Subscribed with subscription ID: %s", subscriptionID))
}
return nil
}

func (s *SubscriptionManager) addSubscription(client rpc.Client, req *wecommon.RPCRequest, commonChannel chan common.IDAndLog) (string, error) {

Check warning on line 78 in tools/walletextension/subscriptions/subscriptions.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'commonChannel' seems to be unused, consider removing or renaming it as _ (revive)
s.logger.Info(fmt.Sprintf("Subscribing client: %s for request: %s", client, req))

// TODO: Create a subscription with Obscuro Node (with new web socket connection each time!)

// TODO:
// Do something similar as in HandleNewSubscriptions
//and check for events comming to this channel and forward them to a common channel

return "", nil
}
5 changes: 2 additions & 3 deletions tools/walletextension/wallet_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/obscuronet/go-obscuro/go/common/stopcontrol"
"github.com/obscuronet/go-obscuro/go/common/viewingkey"
"github.com/obscuronet/go-obscuro/go/rpc"
"github.com/obscuronet/go-obscuro/tools/walletextension/accountmanager"
"github.com/obscuronet/go-obscuro/tools/walletextension/common"
"github.com/obscuronet/go-obscuro/tools/walletextension/storage"
"github.com/obscuronet/go-obscuro/tools/walletextension/userconn"
Expand Down Expand Up @@ -65,7 +64,7 @@ func (w *WalletExtension) Logger() gethlog.Logger {
}

// ProxyEthRequest proxys an incoming user request to the enclave
func (w *WalletExtension) ProxyEthRequest(request *accountmanager.RPCRequest, conn userconn.UserConn, hexUserID string) (map[string]interface{}, error) {
func (w *WalletExtension) ProxyEthRequest(request *common.RPCRequest, conn userconn.UserConn, hexUserID string) (map[string]interface{}, error) {
response := map[string]interface{}{}
// all responses must contain the request id. Both successful and unsuccessful.
response[common.JSONKeyRPCVersion] = jsonrpc.Version
Expand Down Expand Up @@ -357,7 +356,7 @@ func adjustStateRoot(rpcResp interface{}, respMap map[string]interface{}) {

// getStorageAtInterceptor checks if the parameters for getStorageAt are set to values that require interception
// and return response or nil if the gateway should forward the request to the node.
func (w *WalletExtension) getStorageAtInterceptor(request *accountmanager.RPCRequest, hexUserID string) map[string]interface{} {
func (w *WalletExtension) getStorageAtInterceptor(request *common.RPCRequest, hexUserID string) map[string]interface{} {
// check if parameters are correct, and we can intercept a request, otherwise return nil
if w.checkParametersForInterceptedGetStorageAt(request.Params) {
// check if userID in the parameters is also in our database
Expand Down

0 comments on commit b0e8a26

Please sign in to comment.