Skip to content

Commit

Permalink
Feature: add gas_price websocket channel
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Aug 21, 2024
1 parent bb4304a commit f85b0eb
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 13 deletions.
2 changes: 1 addition & 1 deletion cmd/api/bus/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (d *Dispatcher) listen(ctx context.Context) {
return
}
if notification == nil {
log.Warn().Str("channel", notification.Channel).Msg("nil notification")
log.Warn().Msg("nil notification")
continue
}
if err := d.handleNotification(ctx, notification); err != nil {
Expand Down
16 changes: 16 additions & 0 deletions cmd/api/gas/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ var (
maxBlockSize = coreTypes.MaxDataBytesNoEvidence(1974272, 100)
)

type ComputeHandler func(ctx context.Context, gasState GasPrice) error

type Tracker struct {
state storage.IState
stats storage.IBlockStats
Expand All @@ -39,6 +41,8 @@ type Tracker struct {
gasState GasPrice
q *queue
g workerpool.Group

computeHandler ComputeHandler
}

func NewTracker(
Expand All @@ -64,6 +68,10 @@ func NewTracker(
}
}

func (tracker *Tracker) SubscribeOnCompute(handler ComputeHandler) {
tracker.computeHandler = handler
}

func (tracker *Tracker) Start(ctx context.Context) {
tracker.g.GoCtx(ctx, tracker.listen)
}
Expand All @@ -90,6 +98,14 @@ func (tracker *Tracker) listen(ctx context.Context) {
if err := tracker.computeMetrics(); err != nil {
log.Err(err).Msg("compute metrics")
}

if tracker.computeHandler != nil {
tracker.g.GoCtx(ctx, func(ctx context.Context) {
if err := tracker.computeHandler(ctx, tracker.gasState); err != nil {
log.Err(err).Msg("error in compute handler of gas tracker")
}
})
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/api/handler/websocket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func (c *Client) ApplyFilters(msg Subscribe) error {
c.filters.head = true
case ChannelBlocks:
c.filters.blocks = true
case ChannelGasPrice:
c.filters.gasPrice = true
default:
return errors.Wrap(ErrUnknownChannel, msg.Channel)
}
Expand All @@ -97,6 +99,8 @@ func (c *Client) DetachFilters(msg Unsubscribe) error {
c.filters.head = false
case ChannelBlocks:
c.filters.blocks = false
case ChannelGasPrice:
c.filters.gasPrice = false
default:
return errors.Wrap(ErrUnknownChannel, msg.Channel)
}
Expand Down
18 changes: 16 additions & 2 deletions cmd/api/handler/websocket/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,21 @@ func (f HeadFilter) Filter(c client, msg Notification[*responses.State]) bool {
return fltrs.head
}

type GasPriceFilter struct{}

func (f GasPriceFilter) Filter(c client, msg Notification[*responses.GasPrice]) bool {
if msg.Body == nil {
return false
}
fltrs := c.Filters()
if fltrs == nil {
return false
}
return fltrs.gasPrice
}

type Filters struct {
head bool
blocks bool
head bool
blocks bool
gasPrice bool
}
19 changes: 17 additions & 2 deletions cmd/api/handler/websocket/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
sdkSync "github.com/dipdup-net/indexer-sdk/pkg/sync"

"github.com/celenium-io/celestia-indexer/cmd/api/bus"
"github.com/celenium-io/celestia-indexer/cmd/api/gas"
"github.com/celenium-io/celestia-indexer/cmd/api/handler/responses"
"github.com/celenium-io/celestia-indexer/internal/storage"
"github.com/gorilla/websocket"
Expand All @@ -25,8 +26,9 @@ type Manager struct {
clients *sdkSync.Map[uint64, *Client]
observer *bus.Observer

blocks *Channel[storage.Block, *responses.Block]
head *Channel[storage.State, *responses.State]
blocks *Channel[storage.Block, *responses.Block]
head *Channel[storage.State, *responses.State]
gasPrice *Channel[gas.GasPrice, *responses.GasPrice]

g workerpool.Group
}
Expand Down Expand Up @@ -56,6 +58,11 @@ func NewManager(observer *bus.Observer) *Manager {
HeadFilter{},
)

manager.gasPrice = NewChannel(
gasPriceProcessor,
GasPriceFilter{},
)

return manager
}

Expand Down Expand Up @@ -131,6 +138,8 @@ func (manager *Manager) AddClientToChannel(channel string, client *Client) {
manager.head.AddClient(client)
case ChannelBlocks:
manager.blocks.AddClient(client)
case ChannelGasPrice:
manager.gasPrice.AddClient(client)
default:
log.Error().Str("channel", channel).Msg("unknown channel name")
}
Expand All @@ -142,7 +151,13 @@ func (manager *Manager) RemoveClientFromChannel(channel string, client *Client)
manager.head.RemoveClient(client.id)
case ChannelBlocks:
manager.blocks.RemoveClient(client.id)
case ChannelGasPrice:
manager.gasPrice.RemoveClient(client.id)
default:
log.Error().Str("channel", channel).Msg("unknown channel name")
}
}

func (manager *Manager) GasTrackerHandler(_ context.Context, state gas.GasPrice) error {
return manager.gasPrice.processMessage(state)
}
14 changes: 11 additions & 3 deletions cmd/api/handler/websocket/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ const (

// channels
const (
ChannelHead = "head"
ChannelBlocks = "blocks"
ChannelHead = "head"
ChannelBlocks = "blocks"
ChannelGasPrice = "gas_price"
)

type Message struct {
Expand All @@ -40,7 +41,7 @@ type TransactionFilters struct {
}

type INotification interface {
*responses.Block | *responses.State
*responses.Block | *responses.State | *responses.GasPrice
}

type Notification[T INotification] struct {
Expand All @@ -61,3 +62,10 @@ func NewStateNotification(state responses.State) Notification[*responses.State]
Body: &state,
}
}

func NewGasPriceNotification(value responses.GasPrice) Notification[*responses.GasPrice] {
return Notification[*responses.GasPrice]{
Channel: ChannelGasPrice,
Body: &value,
}
}
9 changes: 9 additions & 0 deletions cmd/api/handler/websocket/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package websocket

import (
"github.com/celenium-io/celestia-indexer/cmd/api/gas"
"github.com/celenium-io/celestia-indexer/cmd/api/handler/responses"
"github.com/celenium-io/celestia-indexer/internal/storage"
)
Expand All @@ -17,3 +18,11 @@ func headProcessor(state storage.State) Notification[*responses.State] {
response := responses.NewState(state)
return NewStateNotification(response)
}

func gasPriceProcessor(data gas.GasPrice) Notification[*responses.GasPrice] {
return NewGasPriceNotification(responses.GasPrice{
Slow: data.Slow,
Median: data.Median,
Fast: data.Fast,
})
}
3 changes: 3 additions & 0 deletions cmd/api/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,9 @@ var (
func initWebsocket(ctx context.Context, group *echo.Group) {
observer := dispatcher.Observe(storage.ChannelHead, storage.ChannelBlock)
wsManager = websocket.NewManager(observer)
if gasTracker != nil {
gasTracker.SubscribeOnCompute(wsManager.GasTrackerHandler)
}
wsManager.Start(ctx)
group.GET("/ws", wsManager.Handle)
}
Expand Down
15 changes: 14 additions & 1 deletion cmd/api/markdown/websocket.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ To receive updates from websocket API send `subscribe` request to server.
}
```

Now 2 channels are supported:
Now 3 channels are supported:

* `head` - receive information about indexer state. Channel does not have any filters. Subscribe message should looks like:

Expand Down Expand Up @@ -55,6 +55,19 @@ Notification body of `responses.State` type will be sent to the channel.

Notification body of `responses.Block` type will be sent to the channel.

* `gas_price` - receive information about current gas price. Channel does not have any filters. Subscribe message should looks like:

```json
{
"method": "subscribe",
"body": {
"channel": "gas_price"
}
}
```

Notification body of `responses.GasPrice` type will be sent to the channel.


### Unsubscribe

Expand Down
8 changes: 4 additions & 4 deletions pkg/indexer/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,10 @@ func (module *Module) processBlockInTransaction(ctx context.Context, tx storage.
}

func (module *Module) notify(ctx context.Context, state storage.State, block storage.Block) error {
if time.Since(block.Time) > time.Hour {
// do not notify all about events if initial indexing is in progress
return nil
}
// if time.Since(block.Time) > time.Hour {
// // do not notify all about events if initial indexing is in progress
// return nil
// }

rawState, err := jsoniter.MarshalToString(state)
if err != nil {
Expand Down

0 comments on commit f85b0eb

Please sign in to comment.