Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Add implementation BlockProvider #6636

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
7359754
Added basic factory and data providers for blocks
UlyanaAndrukhiv Nov 10, 2024
9f91540
Added godoc for DataProvider interface
UlyanaAndrukhiv Nov 11, 2024
7502a79
Added godoc for factory
UlyanaAndrukhiv Nov 11, 2024
c30b1a0
Refacored BlockProvider, added part of unit tests to provider factor…
UlyanaAndrukhiv Nov 12, 2024
67a184a
Updated last commit
UlyanaAndrukhiv Nov 12, 2024
e9c6ac8
Redactored HandleSubscription to use it in the rest subscription data…
UlyanaAndrukhiv Nov 12, 2024
5fdece5
Added missed package name for block provider
UlyanaAndrukhiv Nov 12, 2024
7c50eab
Added test for suppoted topics for data provoder factory
UlyanaAndrukhiv Nov 12, 2024
5390d1a
Added godoc to test
UlyanaAndrukhiv Nov 12, 2024
1d63e2d
Updated tests to check topic and subscription id, updated godoc
UlyanaAndrukhiv Nov 12, 2024
abfdd65
Merge branch 'master' into UlyanaAndrukhiv/6585-block-data-provider
UlyanaAndrukhiv Nov 12, 2024
c33dfa2
Linted
UlyanaAndrukhiv Nov 12, 2024
7d60c33
Updated context for creating subscription for block data provider
UlyanaAndrukhiv Nov 12, 2024
22ad469
Updated BlockMessageResponse, added dogoc for constant values, remove…
UlyanaAndrukhiv Nov 13, 2024
a9fe159
Updated accoeding to comments
UlyanaAndrukhiv Nov 13, 2024
286d3d7
Update topics order
UlyanaAndrukhiv Nov 13, 2024
17eab5f
Merge branch 'master' into UlyanaAndrukhiv/6585-block-data-provider
UlyanaAndrukhiv Nov 14, 2024
2b07afd
Merge branch 'master' into UlyanaAndrukhiv/6585-block-data-provider
UlyanaAndrukhiv Nov 14, 2024
62eeb8e
Merge branch 'master' into UlyanaAndrukhiv/6585-block-data-provider
UlyanaAndrukhiv Nov 18, 2024
08ec8cf
Added implementation of block headers data provider, added unit tests
UlyanaAndrukhiv Nov 19, 2024
16100da
Added implementation of block digests data provider, added unit tests
UlyanaAndrukhiv Nov 19, 2024
2fa0767
Updated according to comments
UlyanaAndrukhiv Nov 19, 2024
c339193
Updated TestSupportedTopics unit test
UlyanaAndrukhiv Nov 19, 2024
88df09c
Merge branch 'master' into UlyanaAndrukhiv/6585-block-data-provider
UlyanaAndrukhiv Nov 19, 2024
50097fa
Merge branch 'master' into UlyanaAndrukhiv/6585-block-data-provider
UlyanaAndrukhiv Nov 20, 2024
9d4e587
Merged with master, refactored according to new version
UlyanaAndrukhiv Nov 22, 2024
9cc1301
Updated godoc, fixed warning with naming
UlyanaAndrukhiv Nov 22, 2024
64716a5
Updated mocks
UlyanaAndrukhiv Nov 22, 2024
2644db8
Merge branch 'master' into UlyanaAndrukhiv/6585-block-data-provider
UlyanaAndrukhiv Nov 26, 2024
5581550
Moved Run to separate goroutine, paralleled test as suggested
UlyanaAndrukhiv Nov 26, 2024
399b604
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
UlyanaAndrukhiv Nov 26, 2024
c5a9f17
Added names to interface arguments, created Arguments type for websoc…
UlyanaAndrukhiv Nov 26, 2024
48842ee
Merge branch 'master' into UlyanaAndrukhiv/6585-block-data-provider
UlyanaAndrukhiv Nov 27, 2024
3cb67c4
Updated ParseBlocksArguments for block data provider
UlyanaAndrukhiv Nov 27, 2024
31f9874
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
UlyanaAndrukhiv Nov 27, 2024
b9abf68
Updated comments and messages
UlyanaAndrukhiv Nov 29, 2024
1a2698f
Removed storing arguments in the data providers
UlyanaAndrukhiv Nov 29, 2024
ff3851b
Refactored base data provider according to comments
UlyanaAndrukhiv Nov 29, 2024
f8d711f
Added happy case unit tests for providers
UlyanaAndrukhiv Dec 2, 2024
437af4f
Added additional description for websocket data providers Run method
UlyanaAndrukhiv Dec 2, 2024
1f2f0ae
Added generic HandleResponse, updated providers
UlyanaAndrukhiv Dec 2, 2024
cb5db1d
Merge branch 'master' into UlyanaAndrukhiv/6585-block-data-provider
UlyanaAndrukhiv Dec 3, 2024
f8668ef
Update block data provider test
UlyanaAndrukhiv Dec 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ func (h *Handler) SubscribeBlocksFromStartBlockID(request *access.SubscribeBlock
}

sub := h.api.SubscribeBlocksFromStartBlockID(stream.Context(), startBlockID, blockStatus)
return subscription.HandleSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
return subscription.HandleRPCSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
}

// SubscribeBlocksFromStartHeight handles subscription requests for blocks started from block height.
Expand All @@ -1093,7 +1093,7 @@ func (h *Handler) SubscribeBlocksFromStartHeight(request *access.SubscribeBlocks
}

sub := h.api.SubscribeBlocksFromStartHeight(stream.Context(), request.GetStartBlockHeight(), blockStatus)
return subscription.HandleSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
return subscription.HandleRPCSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
}

// SubscribeBlocksFromLatest handles subscription requests for blocks started from latest sealed block.
Expand All @@ -1120,7 +1120,7 @@ func (h *Handler) SubscribeBlocksFromLatest(request *access.SubscribeBlocksFromL
}

sub := h.api.SubscribeBlocksFromLatest(stream.Context(), blockStatus)
return subscription.HandleSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
return subscription.HandleRPCSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
}

// handleBlocksResponse handles the subscription to block updates and sends
Expand Down Expand Up @@ -1179,7 +1179,7 @@ func (h *Handler) SubscribeBlockHeadersFromStartBlockID(request *access.Subscrib
}

sub := h.api.SubscribeBlockHeadersFromStartBlockID(stream.Context(), startBlockID, blockStatus)
return subscription.HandleSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
return subscription.HandleRPCSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
}

// SubscribeBlockHeadersFromStartHeight handles subscription requests for block headers started from block height.
Expand All @@ -1206,7 +1206,7 @@ func (h *Handler) SubscribeBlockHeadersFromStartHeight(request *access.Subscribe
}

sub := h.api.SubscribeBlockHeadersFromStartHeight(stream.Context(), request.GetStartBlockHeight(), blockStatus)
return subscription.HandleSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
return subscription.HandleRPCSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
}

// SubscribeBlockHeadersFromLatest handles subscription requests for block headers started from latest sealed block.
Expand All @@ -1233,7 +1233,7 @@ func (h *Handler) SubscribeBlockHeadersFromLatest(request *access.SubscribeBlock
}

sub := h.api.SubscribeBlockHeadersFromLatest(stream.Context(), blockStatus)
return subscription.HandleSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
return subscription.HandleRPCSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
}

// handleBlockHeadersResponse handles the subscription to block updates and sends
Expand Down Expand Up @@ -1293,7 +1293,7 @@ func (h *Handler) SubscribeBlockDigestsFromStartBlockID(request *access.Subscrib
}

sub := h.api.SubscribeBlockDigestsFromStartBlockID(stream.Context(), startBlockID, blockStatus)
return subscription.HandleSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
return subscription.HandleRPCSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
}

// SubscribeBlockDigestsFromStartHeight handles subscription requests for lightweight blocks started from block height.
Expand All @@ -1320,7 +1320,7 @@ func (h *Handler) SubscribeBlockDigestsFromStartHeight(request *access.Subscribe
}

sub := h.api.SubscribeBlockDigestsFromStartHeight(stream.Context(), request.GetStartBlockHeight(), blockStatus)
return subscription.HandleSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
return subscription.HandleRPCSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
}

// SubscribeBlockDigestsFromLatest handles subscription requests for lightweight block started from latest sealed block.
Expand All @@ -1347,7 +1347,7 @@ func (h *Handler) SubscribeBlockDigestsFromLatest(request *access.SubscribeBlock
}

sub := h.api.SubscribeBlockDigestsFromLatest(stream.Context(), blockStatus)
return subscription.HandleSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
return subscription.HandleRPCSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
}

// handleBlockDigestsResponse handles the subscription to block updates and sends
Expand Down Expand Up @@ -1433,7 +1433,7 @@ func (h *Handler) SendAndSubscribeTransactionStatuses(
sub := h.api.SubscribeTransactionStatuses(ctx, &tx, request.GetEventEncodingVersion())

messageIndex := counters.NewMonotonousCounter(0)
return subscription.HandleSubscription(sub, func(txResults []*TransactionResult) error {
return subscription.HandleRPCSubscription(sub, func(txResults []*TransactionResult) error {
for i := range txResults {
index := messageIndex.Value()
if ok := messageIndex.Set(index + 1); !ok {
Expand Down
24 changes: 24 additions & 0 deletions engine/access/rest/common/parser/block_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package parser

import (
"fmt"

"github.com/onflow/flow-go/model/flow"
)

// Finalized and Sealed represents the status of a block.
// It is used in rest arguments to provide block status.
const (
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
Finalized = "finalized"
Sealed = "sealed"
)

func ParseBlockStatus(blockStatus string) (flow.BlockStatus, error) {
switch blockStatus {
case Finalized:
return flow.BlockStatusFinalized, nil
case Sealed:
return flow.BlockStatusSealed, nil
}
return flow.BlockStatusUnknown, fmt.Errorf("invalid 'block_status', must be '%s' or '%s'", Finalized, Sealed)
}
39 changes: 39 additions & 0 deletions engine/access/rest/common/parser/block_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package parser

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/onflow/flow-go/model/flow"
)

// TestParseBlockStatus_Invalid tests the ParseBlockStatus function with invalid inputs.
// It verifies that for each invalid block status string, the function returns an error
// matching the expected error message format.
func TestParseBlockStatus_Invalid(t *testing.T) {
tests := []string{"unknown", "pending", ""}
expectedErr := fmt.Sprintf("invalid 'block_status', must be '%s' or '%s'", Finalized, Sealed)

for _, input := range tests {
_, err := ParseBlockStatus(input)
assert.EqualError(t, err, expectedErr)
}
}

// TestParseBlockStatus_Valid tests the ParseBlockStatus function with valid inputs.
// It ensures that the function returns the correct flow.BlockStatus for valid status
// strings "finalized" and "sealed" without errors.
func TestParseBlockStatus_Valid(t *testing.T) {
tests := map[string]flow.BlockStatus{
Finalized: flow.BlockStatusFinalized,
Sealed: flow.BlockStatusSealed,
}

for input, expectedStatus := range tests {
status, err := ParseBlockStatus(input)
assert.NoError(t, err)
assert.Equal(t, expectedStatus, status)
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package request
package parser

import (
"errors"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package request
package parser

import (
"testing"
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rest/http/request/get_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/model/flow"
)

Expand Down Expand Up @@ -122,7 +123,7 @@ func (g *GetBlockByIDs) Build(r *common.Request) error {
}

func (g *GetBlockByIDs) Parse(rawIds []string) error {
var ids IDs
var ids parser.IDs
err := ids.Parse(rawIds)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rest/http/request/get_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/model/flow"
)

Expand Down Expand Up @@ -50,7 +51,7 @@ func (g *GetEvents) Parse(rawType string, rawStart string, rawEnd string, rawBlo
}
g.EndHeight = height.Flow()

var blockIDs IDs
var blockIDs parser.IDs
err = blockIDs.Parse(rawBlockIDs)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rest/http/request/get_execution_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/model/flow"
)

Expand All @@ -30,7 +31,7 @@ func (g *GetExecutionResultByBlockIDs) Build(r *common.Request) error {
}

func (g *GetExecutionResultByBlockIDs) Parse(rawIDs []string) error {
var ids IDs
var ids parser.IDs
err := ids.Parse(rawIDs)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rest/http/request/get_script.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"

"github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/model/flow"
)

Expand Down Expand Up @@ -42,7 +43,7 @@ func (g *GetScript) Parse(rawHeight string, rawID string, rawScript io.Reader) e
}
g.BlockHeight = height.Flow()

var id ID
var id parser.ID
err = id.Parse(rawID)
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions engine/access/rest/http/request/get_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package request

import (
"github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/model/flow"
)

Expand All @@ -15,14 +16,14 @@ type TransactionOptionals struct {
}

func (t *TransactionOptionals) Parse(r *common.Request) error {
var blockId ID
var blockId parser.ID
err := blockId.Parse(r.GetQueryParam(blockIDQueryParam))
if err != nil {
return err
}
t.BlockID = blockId.Flow()

var collectionId ID
var collectionId parser.ID
err = collectionId.Parse(r.GetQueryParam(collectionIDQueryParam))
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rest/http/request/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/model/flow"
)

Expand Down Expand Up @@ -60,7 +61,7 @@ func (g *GetByIDRequest) Build(r *common.Request) error {
}

func (g *GetByIDRequest) Parse(rawID string) error {
var id ID
var id parser.ID
err := id.Parse(rawID)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rest/http/request/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"

"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/engine/access/rest/http/models"
"github.com/onflow/flow-go/engine/access/rest/util"
"github.com/onflow/flow-go/engine/common/rpc/convert"
Expand Down Expand Up @@ -89,7 +90,7 @@ func (t *Transaction) Parse(raw io.Reader, chain flow.Chain) error {
return fmt.Errorf("invalid transaction script encoding")
}

var blockID ID
var blockID parser.ID
err = blockID.Parse(tx.ReferenceBlockId)
if err != nil {
return fmt.Errorf("invalid reference block ID: %w", err)
Expand Down
1 change: 0 additions & 1 deletion engine/access/rest/http/routes/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/onflow/flow-go/access"
"github.com/onflow/flow-go/engine/access/rest/common"

"github.com/onflow/flow-go/engine/access/rest/http/models"
"github.com/onflow/flow-go/engine/access/rest/http/request"
)
Expand Down
4 changes: 4 additions & 0 deletions engine/access/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/onflow/flow-go/access"
"github.com/onflow/flow-go/engine/access/rest/router"
"github.com/onflow/flow-go/engine/access/rest/websockets/data_providers"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/model/flow"
Expand Down Expand Up @@ -48,6 +49,9 @@ func NewServer(serverAPI access.API,
builder.AddWsLegacyRoutes(stateStreamApi, chain, stateStreamConfig, config.MaxRequestSize)
}

// TODO: add new websocket routes
_ = data_providers.NewDataProviderFactory(logger, stateStreamConfig.EventFilterConfig, stateStreamApi, serverAPI)

c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},
AllowedHeaders: []string{"*"},
Expand Down
59 changes: 59 additions & 0 deletions engine/access/rest/websockets/data_providers/base_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package data_providers

import (
"context"

"github.com/onflow/flow-go/engine/access/subscription"
)

// BaseDataProvider defines the basic interface for a data provider. It provides methods
// for retrieving the provider's unique ID, topic, and a method to close the provider.
type BaseDataProvider interface {
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
// ID returns the unique identifier of subscription in the data provider.
ID() string
// Topic returns the topic associated with the data provider.
Topic() string
// Close terminates the data provider.
Close()
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
}

var _ BaseDataProvider = (*BaseDataProviderImpl)(nil)
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved

// BaseDataProviderImpl is the concrete implementation of the BaseDataProvider interface.
// It holds common objects for the provider.
type BaseDataProviderImpl struct {
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
topic string
cancel context.CancelFunc
send chan<- interface{}
subscription subscription.Subscription
}

// NewBaseDataProviderImpl creates a new instance of BaseDataProviderImpl.
func NewBaseDataProviderImpl(
cancel context.CancelFunc,
topic string,
send chan<- interface{},
subscription subscription.Subscription,
) *BaseDataProviderImpl {
return &BaseDataProviderImpl{
topic: topic,
cancel: cancel,
send: send,
subscription: subscription,
}
}

// ID returns the unique identifier of the data provider's subscription.
func (b *BaseDataProviderImpl) ID() string {
return b.subscription.ID()
}

// Topic returns the topic associated with the data provider.
func (b *BaseDataProviderImpl) Topic() string {
return b.topic
}

// Close terminates the data provider.
func (b *BaseDataProviderImpl) Close() {
b.cancel()
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
}
Loading
Loading