diff --git a/cmd/api/bus/dispatcher.go b/cmd/api/bus/dispatcher.go index 07c4b581..ca9e2601 100644 --- a/cmd/api/bus/dispatcher.go +++ b/cmd/api/bus/dispatcher.go @@ -89,7 +89,7 @@ func (d *Dispatcher) listen(ctx context.Context) { return } if notification == nil { - log.Warn().Str("channel", notification.Channel).Msg("nil notification") + log.Warn().Msg("nil notification") continue } if err := d.handleNotification(ctx, notification); err != nil { diff --git a/cmd/api/docs/swagger.json b/cmd/api/docs/swagger.json index c8e264a5..54003bb0 100644 --- a/cmd/api/docs/swagger.json +++ b/cmd/api/docs/swagger.json @@ -2574,7 +2574,7 @@ "time", "blobs_count", "size", - "\"fee\"" + "fee" ], "type": "string", "description": "Sort field. Default: size", @@ -2634,6 +2634,83 @@ } } }, + "/rollup/day": { + "get": { + "description": "List rollups info with stats by previous 24 hours", + "produces": [ + "application/json" + ], + "tags": [ + "rollup" + ], + "summary": "List rollups info with stats by previous 24 hours", + "operationId": "list-rollup-24h", + "parameters": [ + { + "maximum": 100, + "type": "integer", + "description": "Count of requested entities", + "name": "limit", + "in": "query" + }, + { + "type": "integer", + "description": "Offset", + "name": "offset", + "in": "query" + }, + { + "enum": [ + "asc", + "desc" + ], + "type": "string", + "description": "Sort order. Default: desc", + "name": "sort", + "in": "query" + }, + { + "enum": [ + "avg_size", + "blobs_count", + "total_size", + "total_fee", + "throughput", + "namespace_count", + "pfb_count", + "mb_price" + ], + "type": "string", + "description": "Sort field. Default: mb_price", + "name": "sort_by", + "in": "query" + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/responses.RollupWithDayStats" + } + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/handler.Error" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/handler.Error" + } + } + } + } + }, "/rollup/slug/{slug}": { "get": { "description": "Get rollup by slug", @@ -4933,7 +5010,7 @@ }, "/ws": { "get": { - "description": "## Documentation for websocket API\n\n### Notification\n\nThe structure of notification is following in all channels:\n\n```json\n{\n \"channel\": \"channel_name\",\n \"body\": \"\u003cobject or array\u003e\" // depends on channel\n}\n```\n\n### Subscribe\n\nTo receive updates from websocket API send `subscribe` request to server.\n\n```json\n{\n \"method\": \"subscribe\",\n \"body\": {\n \"channel\": \"\u003cCHANNEL_NAME\u003e\",\n \"filters\": {\n // pass channel filters\n }\n }\n}\n```\n\nNow 2 channels are supported:\n\n* `head` - receive information about indexer state. Channel does not have any filters. Subscribe message should looks like:\n\n```json\n{\n \"method\": \"subscribe\",\n \"body\": {\n \"channel\": \"head\"\n }\n}\n```\n\nNotification body of `responses.State` type will be sent to the channel.\n\n* `blocks` - receive information about new blocks. Channel does not have any filters. Subscribe message should looks like:\n\n```json\n{\n \"method\": \"subscribe\",\n \"body\": {\n \"channel\": \"blocks\"\n }\n}\n```\n\nNotification body of `responses.Block` type will be sent to the channel.\n\n\n### Unsubscribe\n\nTo unsubscribe send `unsubscribe` message containing one of channel name describing above.\n\n\n```json\n{\n \"method\": \"unsubscribe\",\n \"body\": {\n \"channel\": \"\u003cCHANNEL_NAME\u003e\",\n }\n}\n```\n", + "description": "## Documentation for websocket API\n\n### Notification\n\nThe structure of notification is following in all channels:\n\n```json\n{\n \"channel\": \"channel_name\",\n \"body\": \"\u003cobject or array\u003e\" // depends on channel\n}\n```\n\n### Subscribe\n\nTo receive updates from websocket API send `subscribe` request to server.\n\n```json\n{\n \"method\": \"subscribe\",\n \"body\": {\n \"channel\": \"\u003cCHANNEL_NAME\u003e\",\n \"filters\": {\n // pass channel filters\n }\n }\n}\n```\n\nNow 3 channels are supported:\n\n* `head` - receive information about indexer state. Channel does not have any filters. Subscribe message should looks like:\n\n```json\n{\n \"method\": \"subscribe\",\n \"body\": {\n \"channel\": \"head\"\n }\n}\n```\n\nNotification body of `responses.State` type will be sent to the channel.\n\n* `blocks` - receive information about new blocks. Channel does not have any filters. Subscribe message should looks like:\n\n```json\n{\n \"method\": \"subscribe\",\n \"body\": {\n \"channel\": \"blocks\"\n }\n}\n```\n\nNotification body of `responses.Block` type will be sent to the channel.\n\n* `gas_price` - receive information about current gas price. Channel does not have any filters. Subscribe message should looks like:\n\n```json\n{\n \"method\": \"subscribe\",\n \"body\": {\n \"channel\": \"gas_price\"\n }\n}\n```\n\nNotification body of `responses.GasPrice` type will be sent to the channel.\n\n\n### Unsubscribe\n\nTo unsubscribe send `unsubscribe` message containing one of channel name describing above.\n\n\n```json\n{\n \"method\": \"unsubscribe\",\n \"body\": {\n \"channel\": \"\u003cCHANNEL_NAME\u003e\",\n }\n}\n```\n", "produces": [ "application/json" ], @@ -6032,6 +6109,121 @@ } } }, + "responses.RollupWithDayStats": { + "type": "object", + "properties": { + "avg_size": { + "type": "integer", + "format": "integer", + "example": 100 + }, + "blobs_count": { + "type": "integer", + "format": "integer", + "example": 100 + }, + "blobs_per_pfb": { + "type": "number", + "format": "float", + "example": 100 + }, + "bridge": { + "type": "string", + "format": "string", + "example": "https://github.com/account" + }, + "description": { + "type": "string", + "format": "string", + "example": "Long rollup description" + }, + "explorer": { + "type": "string", + "format": "string", + "example": "https://explorer.karak.network/" + }, + "fee_per_pfb": { + "type": "string", + "format": "string", + "example": "100" + }, + "github": { + "type": "string", + "format": "string", + "example": "https://github.com/account" + }, + "id": { + "type": "integer", + "format": "integer", + "example": 321 + }, + "l2_beat": { + "type": "string", + "format": "string", + "example": "https://l2beat.com/scaling/projects/karak" + }, + "logo": { + "type": "string", + "format": "string", + "example": "https://some_link.com/image.png" + }, + "mb_price": { + "type": "string", + "format": "string", + "example": "100" + }, + "name": { + "type": "string", + "format": "string", + "example": "Rollup name" + }, + "namespace_count": { + "type": "integer", + "format": "integer", + "example": 100 + }, + "pfb_count": { + "type": "integer", + "format": "integer", + "example": 100 + }, + "slug": { + "type": "string", + "format": "string", + "example": "rollup_slug" + }, + "stack": { + "type": "string", + "format": "string", + "example": "op_stack" + }, + "throughput": { + "type": "integer", + "format": "integer", + "example": 100 + }, + "total_fee": { + "type": "string", + "format": "string", + "example": "100" + }, + "total_size": { + "type": "integer", + "format": "integer", + "example": 100 + }, + "twitter": { + "type": "string", + "format": "string", + "example": "https://x.com/account" + }, + "website": { + "type": "string", + "format": "string", + "example": "https://website.com" + } + } + }, "responses.RollupWithStats": { "type": "object", "properties": { diff --git a/cmd/api/gas/tracker.go b/cmd/api/gas/tracker.go index 46ac8a92..4f46749f 100644 --- a/cmd/api/gas/tracker.go +++ b/cmd/api/gas/tracker.go @@ -29,6 +29,8 @@ var ( maxBlockSize = coreTypes.MaxDataBytesNoEvidence(1974272, 100) ) +type ComputeHandler func(ctx context.Context, gasState GasPrice) error + type Tracker struct { state storage.IState stats storage.IBlockStats @@ -39,6 +41,8 @@ type Tracker struct { gasState GasPrice q *queue g workerpool.Group + + computeHandler ComputeHandler } func NewTracker( @@ -64,6 +68,10 @@ func NewTracker( } } +func (tracker *Tracker) SubscribeOnCompute(handler ComputeHandler) { + tracker.computeHandler = handler +} + func (tracker *Tracker) Start(ctx context.Context) { tracker.g.GoCtx(ctx, tracker.listen) } @@ -90,6 +98,14 @@ func (tracker *Tracker) listen(ctx context.Context) { if err := tracker.computeMetrics(); err != nil { log.Err(err).Msg("compute metrics") } + + if tracker.computeHandler != nil { + tracker.g.GoCtx(ctx, func(ctx context.Context) { + if err := tracker.computeHandler(ctx, tracker.gasState); err != nil { + log.Err(err).Msg("error in compute handler of gas tracker") + } + }) + } } } } diff --git a/cmd/api/handler/websocket/client.go b/cmd/api/handler/websocket/client.go index 7b525179..28c1c6c8 100644 --- a/cmd/api/handler/websocket/client.go +++ b/cmd/api/handler/websocket/client.go @@ -82,6 +82,8 @@ func (c *Client) ApplyFilters(msg Subscribe) error { c.filters.head = true case ChannelBlocks: c.filters.blocks = true + case ChannelGasPrice: + c.filters.gasPrice = true default: return errors.Wrap(ErrUnknownChannel, msg.Channel) } @@ -97,6 +99,8 @@ func (c *Client) DetachFilters(msg Unsubscribe) error { c.filters.head = false case ChannelBlocks: c.filters.blocks = false + case ChannelGasPrice: + c.filters.gasPrice = false default: return errors.Wrap(ErrUnknownChannel, msg.Channel) } diff --git a/cmd/api/handler/websocket/filters.go b/cmd/api/handler/websocket/filters.go index a75ca8b8..7a0d475e 100644 --- a/cmd/api/handler/websocket/filters.go +++ b/cmd/api/handler/websocket/filters.go @@ -37,7 +37,21 @@ func (f HeadFilter) Filter(c client, msg Notification[*responses.State]) bool { return fltrs.head } +type GasPriceFilter struct{} + +func (f GasPriceFilter) Filter(c client, msg Notification[*responses.GasPrice]) bool { + if msg.Body == nil { + return false + } + fltrs := c.Filters() + if fltrs == nil { + return false + } + return fltrs.gasPrice +} + type Filters struct { - head bool - blocks bool + head bool + blocks bool + gasPrice bool } diff --git a/cmd/api/handler/websocket/manager.go b/cmd/api/handler/websocket/manager.go index 83b2617e..0f0c021a 100644 --- a/cmd/api/handler/websocket/manager.go +++ b/cmd/api/handler/websocket/manager.go @@ -12,6 +12,7 @@ import ( sdkSync "github.com/dipdup-net/indexer-sdk/pkg/sync" "github.com/celenium-io/celestia-indexer/cmd/api/bus" + "github.com/celenium-io/celestia-indexer/cmd/api/gas" "github.com/celenium-io/celestia-indexer/cmd/api/handler/responses" "github.com/celenium-io/celestia-indexer/internal/storage" "github.com/gorilla/websocket" @@ -25,8 +26,9 @@ type Manager struct { clients *sdkSync.Map[uint64, *Client] observer *bus.Observer - blocks *Channel[storage.Block, *responses.Block] - head *Channel[storage.State, *responses.State] + blocks *Channel[storage.Block, *responses.Block] + head *Channel[storage.State, *responses.State] + gasPrice *Channel[gas.GasPrice, *responses.GasPrice] g workerpool.Group } @@ -56,6 +58,11 @@ func NewManager(observer *bus.Observer) *Manager { HeadFilter{}, ) + manager.gasPrice = NewChannel( + gasPriceProcessor, + GasPriceFilter{}, + ) + return manager } @@ -131,6 +138,8 @@ func (manager *Manager) AddClientToChannel(channel string, client *Client) { manager.head.AddClient(client) case ChannelBlocks: manager.blocks.AddClient(client) + case ChannelGasPrice: + manager.gasPrice.AddClient(client) default: log.Error().Str("channel", channel).Msg("unknown channel name") } @@ -142,7 +151,13 @@ func (manager *Manager) RemoveClientFromChannel(channel string, client *Client) manager.head.RemoveClient(client.id) case ChannelBlocks: manager.blocks.RemoveClient(client.id) + case ChannelGasPrice: + manager.gasPrice.RemoveClient(client.id) default: log.Error().Str("channel", channel).Msg("unknown channel name") } } + +func (manager *Manager) GasTrackerHandler(_ context.Context, state gas.GasPrice) error { + return manager.gasPrice.processMessage(state) +} diff --git a/cmd/api/handler/websocket/messages.go b/cmd/api/handler/websocket/messages.go index 12efd8b0..ee586b18 100644 --- a/cmd/api/handler/websocket/messages.go +++ b/cmd/api/handler/websocket/messages.go @@ -16,8 +16,9 @@ const ( // channels const ( - ChannelHead = "head" - ChannelBlocks = "blocks" + ChannelHead = "head" + ChannelBlocks = "blocks" + ChannelGasPrice = "gas_price" ) type Message struct { @@ -40,7 +41,7 @@ type TransactionFilters struct { } type INotification interface { - *responses.Block | *responses.State + *responses.Block | *responses.State | *responses.GasPrice } type Notification[T INotification] struct { @@ -61,3 +62,10 @@ func NewStateNotification(state responses.State) Notification[*responses.State] Body: &state, } } + +func NewGasPriceNotification(value responses.GasPrice) Notification[*responses.GasPrice] { + return Notification[*responses.GasPrice]{ + Channel: ChannelGasPrice, + Body: &value, + } +} diff --git a/cmd/api/handler/websocket/processors.go b/cmd/api/handler/websocket/processors.go index bd3ba307..36d223f6 100644 --- a/cmd/api/handler/websocket/processors.go +++ b/cmd/api/handler/websocket/processors.go @@ -4,6 +4,7 @@ package websocket import ( + "github.com/celenium-io/celestia-indexer/cmd/api/gas" "github.com/celenium-io/celestia-indexer/cmd/api/handler/responses" "github.com/celenium-io/celestia-indexer/internal/storage" ) @@ -17,3 +18,11 @@ func headProcessor(state storage.State) Notification[*responses.State] { response := responses.NewState(state) return NewStateNotification(response) } + +func gasPriceProcessor(data gas.GasPrice) Notification[*responses.GasPrice] { + return NewGasPriceNotification(responses.GasPrice{ + Slow: data.Slow, + Median: data.Median, + Fast: data.Fast, + }) +} diff --git a/cmd/api/init.go b/cmd/api/init.go index 87d67052..4b97045c 100644 --- a/cmd/api/init.go +++ b/cmd/api/init.go @@ -555,6 +555,9 @@ var ( func initWebsocket(ctx context.Context, group *echo.Group) { observer := dispatcher.Observe(storage.ChannelHead, storage.ChannelBlock) wsManager = websocket.NewManager(observer) + if gasTracker != nil { + gasTracker.SubscribeOnCompute(wsManager.GasTrackerHandler) + } wsManager.Start(ctx) group.GET("/ws", wsManager.Handle) } diff --git a/cmd/api/markdown/websocket.md b/cmd/api/markdown/websocket.md index 811180f3..ad186b21 100644 --- a/cmd/api/markdown/websocket.md +++ b/cmd/api/markdown/websocket.md @@ -27,7 +27,7 @@ To receive updates from websocket API send `subscribe` request to server. } ``` -Now 2 channels are supported: +Now 3 channels are supported: * `head` - receive information about indexer state. Channel does not have any filters. Subscribe message should looks like: @@ -55,6 +55,19 @@ Notification body of `responses.State` type will be sent to the channel. Notification body of `responses.Block` type will be sent to the channel. +* `gas_price` - receive information about current gas price. Channel does not have any filters. Subscribe message should looks like: + +```json +{ + "method": "subscribe", + "body": { + "channel": "gas_price" + } +} +``` + +Notification body of `responses.GasPrice` type will be sent to the channel. + ### Unsubscribe