Skip to content

Commit

Permalink
graceful shutdown of websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
valli0x committed Oct 25, 2024
1 parent e786b3f commit dedc64d
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 16 deletions.
32 changes: 17 additions & 15 deletions rpc/websockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type websocketsServer struct {
logger log.Logger
}

func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, stream *stream.RPCStream, cfg *config.Config) WebsocketsServer {
func NewWebsocketsServer(ctx context.Context, clientCtx client.Context, logger log.Logger, stream *stream.RPCStream, cfg *config.Config) WebsocketsServer {
logger = logger.With("api", "websocket-server")
_, port, _ := net.SplitHostPort(cfg.JSONRPC.Address)

Expand All @@ -95,7 +95,7 @@ func NewWebsocketsServer(clientCtx client.Context, logger log.Logger, stream *st
wsAddr: cfg.JSONRPC.WsAddress,
certFile: cfg.TLS.CertificatePath,
keyFile: cfg.TLS.KeyPath,
api: newPubSubAPI(clientCtx, logger, stream),
api: newPubSubAPI(ctx, clientCtx, logger, stream),
logger: logger,
}
}
Expand Down Expand Up @@ -347,18 +347,20 @@ func (s *websocketsServer) tcpGetAndSendResponse(wsConn *wsConn, mb []byte) erro

// pubSubAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec
type pubSubAPI struct {
events *stream.RPCStream
logger log.Logger
clientCtx client.Context
events *stream.RPCStream
logger log.Logger
clientCtx client.Context
cancelContext context.Context
}

// newPubSubAPI creates an instance of the ethereum PubSub API.
func newPubSubAPI(clientCtx client.Context, logger log.Logger, stream *stream.RPCStream) *pubSubAPI {
func newPubSubAPI(ctx context.Context, clientCtx client.Context, logger log.Logger, stream *stream.RPCStream) *pubSubAPI {
logger = logger.With("module", "websocket-client")
return &pubSubAPI{
events: stream,
logger: logger,
clientCtx: clientCtx,
events: stream,
logger: logger,
clientCtx: clientCtx,
cancelContext: ctx,
}
}

Expand Down Expand Up @@ -411,9 +413,9 @@ type Header struct {
}

func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) {
ctx, cancel := context.WithCancel(context.Background())
_, cancel := context.WithCancel(context.Background())
//nolint: errcheck
go api.events.HeaderStream().Subscribe(ctx, func(headers []stream.RPCHeader, _ int) error {
go api.events.HeaderStream().Subscribe(api.cancelContext, func(headers []stream.RPCHeader, _ int) error {
for _, header := range headers {
h := header.EthHeader
var enc Header
Expand Down Expand Up @@ -569,9 +571,9 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac
}
}

ctx, cancel := context.WithCancel(context.Background())
_, cancel := context.WithCancel(context.Background())
//nolint: errcheck
go api.events.LogStream().Subscribe(ctx, func(txLogs []*ethtypes.Log, _ int) error {
go api.events.LogStream().Subscribe(api.cancelContext, func(txLogs []*ethtypes.Log, _ int) error {
logs := rpcfilters.FilterLogs(txLogs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
if len(logs) == 0 {
return nil
Expand Down Expand Up @@ -605,9 +607,9 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interfac
}

func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID rpc.ID) (context.CancelFunc, error) {
ctx, cancel := context.WithCancel(context.Background())
_, cancel := context.WithCancel(context.Background())
//nolint: errcheck
go api.events.PendingTxStream().Subscribe(ctx, func(items []common.Hash, _ int) error {
go api.events.PendingTxStream().Subscribe(api.cancelContext, func(items []common.Hash, _ int) error {
for _, hash := range items {
// write to ws conn
res := &SubscriptionNotification{
Expand Down
2 changes: 1 addition & 1 deletion server/json_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func StartJSONRPC(

srvCtx.Logger.Info("Starting JSON WebSocket server", "address", config.JSONRPC.WsAddress)

wsSrv := rpc.NewWebsocketsServer(clientCtx, srvCtx.Logger, rpcStream, config)
wsSrv := rpc.NewWebsocketsServer(ctx, clientCtx, srvCtx.Logger, rpcStream, config)
wsSrv.Start()
return httpSrv, nil
}

0 comments on commit dedc64d

Please sign in to comment.