Skip to content

Commit

Permalink
Feature: add gas_price websocket channel (#268)
Browse files Browse the repository at this point in the history
* Feature: add gas_price websocket channel

* Uncomment
  • Loading branch information
aopoltorzhicky authored Aug 22, 2024
1 parent b7ff5e4 commit 716ceaa
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cmd/api/bus/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (d *Dispatcher) listen(ctx context.Context) {
return
}
if notification == nil {
log.Warn().Str("channel", notification.Channel).Msg("nil notification")
log.Warn().Msg("nil notification")
continue
}
if err := d.handleNotification(ctx, notification); err != nil {
Expand Down
196 changes: 194 additions & 2 deletions cmd/api/docs/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions cmd/api/gas/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ var (
maxBlockSize = coreTypes.MaxDataBytesNoEvidence(1974272, 100)
)

type ComputeHandler func(ctx context.Context, gasState GasPrice) error

type Tracker struct {
state storage.IState
stats storage.IBlockStats
Expand All @@ -39,6 +41,8 @@ type Tracker struct {
gasState GasPrice
q *queue
g workerpool.Group

computeHandler ComputeHandler
}

func NewTracker(
Expand All @@ -64,6 +68,10 @@ func NewTracker(
}
}

func (tracker *Tracker) SubscribeOnCompute(handler ComputeHandler) {
tracker.computeHandler = handler
}

func (tracker *Tracker) Start(ctx context.Context) {
tracker.g.GoCtx(ctx, tracker.listen)
}
Expand All @@ -90,6 +98,14 @@ func (tracker *Tracker) listen(ctx context.Context) {
if err := tracker.computeMetrics(); err != nil {
log.Err(err).Msg("compute metrics")
}

if tracker.computeHandler != nil {
tracker.g.GoCtx(ctx, func(ctx context.Context) {
if err := tracker.computeHandler(ctx, tracker.gasState); err != nil {
log.Err(err).Msg("error in compute handler of gas tracker")
}
})
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/api/handler/websocket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func (c *Client) ApplyFilters(msg Subscribe) error {
c.filters.head = true
case ChannelBlocks:
c.filters.blocks = true
case ChannelGasPrice:
c.filters.gasPrice = true
default:
return errors.Wrap(ErrUnknownChannel, msg.Channel)
}
Expand All @@ -97,6 +99,8 @@ func (c *Client) DetachFilters(msg Unsubscribe) error {
c.filters.head = false
case ChannelBlocks:
c.filters.blocks = false
case ChannelGasPrice:
c.filters.gasPrice = false
default:
return errors.Wrap(ErrUnknownChannel, msg.Channel)
}
Expand Down
18 changes: 16 additions & 2 deletions cmd/api/handler/websocket/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,21 @@ func (f HeadFilter) Filter(c client, msg Notification[*responses.State]) bool {
return fltrs.head
}

type GasPriceFilter struct{}

func (f GasPriceFilter) Filter(c client, msg Notification[*responses.GasPrice]) bool {
if msg.Body == nil {
return false
}
fltrs := c.Filters()
if fltrs == nil {
return false
}
return fltrs.gasPrice
}

type Filters struct {
head bool
blocks bool
head bool
blocks bool
gasPrice bool
}
19 changes: 17 additions & 2 deletions cmd/api/handler/websocket/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
sdkSync "github.com/dipdup-net/indexer-sdk/pkg/sync"

"github.com/celenium-io/celestia-indexer/cmd/api/bus"
"github.com/celenium-io/celestia-indexer/cmd/api/gas"
"github.com/celenium-io/celestia-indexer/cmd/api/handler/responses"
"github.com/celenium-io/celestia-indexer/internal/storage"
"github.com/gorilla/websocket"
Expand All @@ -25,8 +26,9 @@ type Manager struct {
clients *sdkSync.Map[uint64, *Client]
observer *bus.Observer

blocks *Channel[storage.Block, *responses.Block]
head *Channel[storage.State, *responses.State]
blocks *Channel[storage.Block, *responses.Block]
head *Channel[storage.State, *responses.State]
gasPrice *Channel[gas.GasPrice, *responses.GasPrice]

g workerpool.Group
}
Expand Down Expand Up @@ -56,6 +58,11 @@ func NewManager(observer *bus.Observer) *Manager {
HeadFilter{},
)

manager.gasPrice = NewChannel(
gasPriceProcessor,
GasPriceFilter{},
)

return manager
}

Expand Down Expand Up @@ -131,6 +138,8 @@ func (manager *Manager) AddClientToChannel(channel string, client *Client) {
manager.head.AddClient(client)
case ChannelBlocks:
manager.blocks.AddClient(client)
case ChannelGasPrice:
manager.gasPrice.AddClient(client)
default:
log.Error().Str("channel", channel).Msg("unknown channel name")
}
Expand All @@ -142,7 +151,13 @@ func (manager *Manager) RemoveClientFromChannel(channel string, client *Client)
manager.head.RemoveClient(client.id)
case ChannelBlocks:
manager.blocks.RemoveClient(client.id)
case ChannelGasPrice:
manager.gasPrice.RemoveClient(client.id)
default:
log.Error().Str("channel", channel).Msg("unknown channel name")
}
}

func (manager *Manager) GasTrackerHandler(_ context.Context, state gas.GasPrice) error {
return manager.gasPrice.processMessage(state)
}
Loading

0 comments on commit 716ceaa

Please sign in to comment.