diff --git a/rpc/websockets.go b/rpc/websockets.go index 7303a7717a..9c1f012748 100644 --- a/rpc/websockets.go +++ b/rpc/websockets.go @@ -86,7 +86,7 @@ type websocketsServer struct { logger log.Logger } -func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, stream *stream.RPCStream, cfg *config.Config) WebsocketsServer { +func NewWebsocketsServer(ctx context.Context, clientCtx client.Context, logger log.Logger, stream *stream.RPCStream, cfg *config.Config) WebsocketsServer { logger = logger.With("api", "websocket-server") _, port, _ := net.SplitHostPort(cfg.JSONRPC.Address) @@ -95,7 +95,7 @@ func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, stream *st wsAddr: cfg.JSONRPC.WsAddress, certFile: cfg.TLS.CertificatePath, keyFile: cfg.TLS.KeyPath, - api: newPubSubAPI(clientCtx, logger, stream), + api: newPubSubAPI(ctx, clientCtx, logger, stream), logger: logger, } } @@ -347,18 +347,20 @@ func (s *websocketsServer) tcpGetAndSendResponse(wsConn *wsConn, mb []byte) erro // pubSubAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec type pubSubAPI struct { - events *stream.RPCStream - logger log.Logger - clientCtx client.Context + events *stream.RPCStream + logger log.Logger + clientCtx client.Context + cancelContext context.Context } // newPubSubAPI creates an instance of the ethereum PubSub API. -func newPubSubAPI(clientCtx client.Context, logger log.Logger, stream *stream.RPCStream) *pubSubAPI { +func newPubSubAPI(ctx context.Context, clientCtx client.Context, logger log.Logger, stream *stream.RPCStream) *pubSubAPI { logger = logger.With("module", "websocket-client") return &pubSubAPI{ - events: stream, - logger: logger, - clientCtx: clientCtx, + events: stream, + logger: logger, + clientCtx: clientCtx, + cancelContext: ctx, } } @@ -411,9 +413,9 @@ type Header struct { } func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) { - ctx, cancel := context.WithCancel(context.Background()) + _, cancel := context.WithCancel(context.Background()) //nolint: errcheck - go api.events.HeaderStream().Subscribe(ctx, func(headers []stream.RPCHeader, _ int) error { + go api.events.HeaderStream().Subscribe(api.cancelContext, func(headers []stream.RPCHeader, _ int) error { for _, header := range headers { h := header.EthHeader var enc Header @@ -569,9 +571,9 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac } } - ctx, cancel := context.WithCancel(context.Background()) + _, cancel := context.WithCancel(context.Background()) //nolint: errcheck - go api.events.LogStream().Subscribe(ctx, func(txLogs []*ethtypes.Log, _ int) error { + go api.events.LogStream().Subscribe(api.cancelContext, func(txLogs []*ethtypes.Log, _ int) error { logs := rpcfilters.FilterLogs(txLogs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) if len(logs) == 0 { return nil @@ -605,9 +607,9 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac } func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) { - ctx, cancel := context.WithCancel(context.Background()) + _, cancel := context.WithCancel(context.Background()) //nolint: errcheck - go api.events.PendingTxStream().Subscribe(ctx, func(items []common.Hash, _ int) error { + go api.events.PendingTxStream().Subscribe(api.cancelContext, func(items []common.Hash, _ int) error { for _, hash := range items { // write to ws conn res := &SubscriptionNotification{ diff --git a/server/json_rpc.go b/server/json_rpc.go index 130af9bc69..c8b5b5eedb 100644 --- a/server/json_rpc.go +++ b/server/json_rpc.go @@ -165,7 +165,7 @@ func StartJSONRPC( srvCtx.Logger.Info("Starting JSON WebSocket server", "address", config.JSONRPC.WsAddress) - wsSrv := rpc.NewWebsocketsServer(clientCtx, srvCtx.Logger, rpcStream, config) + wsSrv := rpc.NewWebsocketsServer(ctx, clientCtx, srvCtx.Logger, rpcStream, config) wsSrv.Start() return httpSrv, nil }