Skip to content

Commit

Permalink
Merge pull request onflow#6636 from The-K-R-O-K/UlyanaAndrukhiv/6585-…
Browse files Browse the repository at this point in the history
…block-data-provider

[Access] Add implementation BlockProvider
  • Loading branch information
peterargue authored Dec 3, 2024
2 parents 85913ad + f8668ef commit 533123a
Show file tree
Hide file tree
Showing 40 changed files with 1,463 additions and 276 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ generate-mocks: install-mock-generators
mockery --name 'API' --dir="./engine/protocol" --case=underscore --output="./engine/protocol/mock" --outpkg="mock"
mockery --name '.*' --dir="./engine/access/state_stream" --case=underscore --output="./engine/access/state_stream/mock" --outpkg="mock"
mockery --name 'BlockTracker' --dir="./engine/access/subscription" --case=underscore --output="./engine/access/subscription/mock" --outpkg="mock"
mockery --name 'DataProvider' --dir="./engine/access/rest/websockets/data_provider" --case=underscore --output="./engine/access/rest/websockets/data_provider/mock" --outpkg="mock"
mockery --name 'DataProvider' --dir="./engine/access/rest/websockets/data_providers" --case=underscore --output="./engine/access/rest/websockets/data_providers/mock" --outpkg="mock"
mockery --name 'DataProviderFactory' --dir="./engine/access/rest/websockets/data_providers" --case=underscore --output="./engine/access/rest/websockets/data_providers/mock" --outpkg="mock"
mockery --name 'ExecutionDataTracker' --dir="./engine/access/subscription" --case=underscore --output="./engine/access/subscription/mock" --outpkg="mock"
mockery --name 'ConnectionFactory' --dir="./engine/access/rpc/connection" --case=underscore --output="./engine/access/rpc/connection/mock" --outpkg="mock"
mockery --name 'Communicator' --dir="./engine/access/rpc/backend" --case=underscore --output="./engine/access/rpc/backend/mock" --outpkg="mock"
Expand Down
20 changes: 10 additions & 10 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ func (h *Handler) SubscribeBlocksFromStartBlockID(request *access.SubscribeBlock
}

sub := h.api.SubscribeBlocksFromStartBlockID(stream.Context(), startBlockID, blockStatus)
return subscription.HandleSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
return subscription.HandleRPCSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
}

// SubscribeBlocksFromStartHeight handles subscription requests for blocks started from block height.
Expand All @@ -1093,7 +1093,7 @@ func (h *Handler) SubscribeBlocksFromStartHeight(request *access.SubscribeBlocks
}

sub := h.api.SubscribeBlocksFromStartHeight(stream.Context(), request.GetStartBlockHeight(), blockStatus)
return subscription.HandleSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
return subscription.HandleRPCSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
}

// SubscribeBlocksFromLatest handles subscription requests for blocks started from latest sealed block.
Expand All @@ -1120,7 +1120,7 @@ func (h *Handler) SubscribeBlocksFromLatest(request *access.SubscribeBlocksFromL
}

sub := h.api.SubscribeBlocksFromLatest(stream.Context(), blockStatus)
return subscription.HandleSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
return subscription.HandleRPCSubscription(sub, h.handleBlocksResponse(stream.Send, request.GetFullBlockResponse(), blockStatus))
}

// handleBlocksResponse handles the subscription to block updates and sends
Expand Down Expand Up @@ -1179,7 +1179,7 @@ func (h *Handler) SubscribeBlockHeadersFromStartBlockID(request *access.Subscrib
}

sub := h.api.SubscribeBlockHeadersFromStartBlockID(stream.Context(), startBlockID, blockStatus)
return subscription.HandleSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
return subscription.HandleRPCSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
}

// SubscribeBlockHeadersFromStartHeight handles subscription requests for block headers started from block height.
Expand All @@ -1206,7 +1206,7 @@ func (h *Handler) SubscribeBlockHeadersFromStartHeight(request *access.Subscribe
}

sub := h.api.SubscribeBlockHeadersFromStartHeight(stream.Context(), request.GetStartBlockHeight(), blockStatus)
return subscription.HandleSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
return subscription.HandleRPCSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
}

// SubscribeBlockHeadersFromLatest handles subscription requests for block headers started from latest sealed block.
Expand All @@ -1233,7 +1233,7 @@ func (h *Handler) SubscribeBlockHeadersFromLatest(request *access.SubscribeBlock
}

sub := h.api.SubscribeBlockHeadersFromLatest(stream.Context(), blockStatus)
return subscription.HandleSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
return subscription.HandleRPCSubscription(sub, h.handleBlockHeadersResponse(stream.Send))
}

// handleBlockHeadersResponse handles the subscription to block updates and sends
Expand Down Expand Up @@ -1293,7 +1293,7 @@ func (h *Handler) SubscribeBlockDigestsFromStartBlockID(request *access.Subscrib
}

sub := h.api.SubscribeBlockDigestsFromStartBlockID(stream.Context(), startBlockID, blockStatus)
return subscription.HandleSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
return subscription.HandleRPCSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
}

// SubscribeBlockDigestsFromStartHeight handles subscription requests for lightweight blocks started from block height.
Expand All @@ -1320,7 +1320,7 @@ func (h *Handler) SubscribeBlockDigestsFromStartHeight(request *access.Subscribe
}

sub := h.api.SubscribeBlockDigestsFromStartHeight(stream.Context(), request.GetStartBlockHeight(), blockStatus)
return subscription.HandleSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
return subscription.HandleRPCSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
}

// SubscribeBlockDigestsFromLatest handles subscription requests for lightweight block started from latest sealed block.
Expand All @@ -1347,7 +1347,7 @@ func (h *Handler) SubscribeBlockDigestsFromLatest(request *access.SubscribeBlock
}

sub := h.api.SubscribeBlockDigestsFromLatest(stream.Context(), blockStatus)
return subscription.HandleSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
return subscription.HandleRPCSubscription(sub, h.handleBlockDigestsResponse(stream.Send))
}

// handleBlockDigestsResponse handles the subscription to block updates and sends
Expand Down Expand Up @@ -1433,7 +1433,7 @@ func (h *Handler) SendAndSubscribeTransactionStatuses(
sub := h.api.SubscribeTransactionStatuses(ctx, &tx, request.GetEventEncodingVersion())

messageIndex := counters.NewMonotonousCounter(0)
return subscription.HandleSubscription(sub, func(txResults []*TransactionResult) error {
return subscription.HandleRPCSubscription(sub, func(txResults []*TransactionResult) error {
for i := range txResults {
index := messageIndex.Value()
if ok := messageIndex.Set(index + 1); !ok {
Expand Down
24 changes: 24 additions & 0 deletions engine/access/rest/common/parser/block_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package parser

import (
"fmt"

"github.com/onflow/flow-go/model/flow"
)

// Finalized and Sealed represents the status of a block.
// It is used in rest arguments to provide block status.
const (
Finalized = "finalized"
Sealed = "sealed"
)

func ParseBlockStatus(blockStatus string) (flow.BlockStatus, error) {
switch blockStatus {
case Finalized:
return flow.BlockStatusFinalized, nil
case Sealed:
return flow.BlockStatusSealed, nil
}
return flow.BlockStatusUnknown, fmt.Errorf("invalid 'block_status', must be '%s' or '%s'", Finalized, Sealed)
}
39 changes: 39 additions & 0 deletions engine/access/rest/common/parser/block_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package parser

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/onflow/flow-go/model/flow"
)

// TestParseBlockStatus_Invalid tests the ParseBlockStatus function with invalid inputs.
// It verifies that for each invalid block status string, the function returns an error
// matching the expected error message format.
func TestParseBlockStatus_Invalid(t *testing.T) {
tests := []string{"unknown", "pending", ""}
expectedErr := fmt.Sprintf("invalid 'block_status', must be '%s' or '%s'", Finalized, Sealed)

for _, input := range tests {
_, err := ParseBlockStatus(input)
assert.EqualError(t, err, expectedErr)
}
}

// TestParseBlockStatus_Valid tests the ParseBlockStatus function with valid inputs.
// It ensures that the function returns the correct flow.BlockStatus for valid status
// strings "finalized" and "sealed" without errors.
func TestParseBlockStatus_Valid(t *testing.T) {
tests := map[string]flow.BlockStatus{
Finalized: flow.BlockStatusFinalized,
Sealed: flow.BlockStatusSealed,
}

for input, expectedStatus := range tests {
status, err := ParseBlockStatus(input)
assert.NoError(t, err)
assert.Equal(t, expectedStatus, status)
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package request
package parser

import (
"errors"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package request
package parser

import (
"testing"
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rest/http/request/get_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/model/flow"
)

Expand Down Expand Up @@ -122,7 +123,7 @@ func (g *GetBlockByIDs) Build(r *common.Request) error {
}

func (g *GetBlockByIDs) Parse(rawIds []string) error {
var ids IDs
var ids parser.IDs
err := ids.Parse(rawIds)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rest/http/request/get_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/model/flow"
)

Expand Down Expand Up @@ -50,7 +51,7 @@ func (g *GetEvents) Parse(rawType string, rawStart string, rawEnd string, rawBlo
}
g.EndHeight = height.Flow()

var blockIDs IDs
var blockIDs parser.IDs
err = blockIDs.Parse(rawBlockIDs)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rest/http/request/get_execution_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/model/flow"
)

Expand All @@ -30,7 +31,7 @@ func (g *GetExecutionResultByBlockIDs) Build(r *common.Request) error {
}

func (g *GetExecutionResultByBlockIDs) Parse(rawIDs []string) error {
var ids IDs
var ids parser.IDs
err := ids.Parse(rawIDs)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rest/http/request/get_script.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"

"github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/model/flow"
)

Expand Down Expand Up @@ -42,7 +43,7 @@ func (g *GetScript) Parse(rawHeight string, rawID string, rawScript io.Reader) e
}
g.BlockHeight = height.Flow()

var id ID
var id parser.ID
err = id.Parse(rawID)
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions engine/access/rest/http/request/get_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package request

import (
"github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/model/flow"
)

Expand All @@ -15,14 +16,14 @@ type TransactionOptionals struct {
}

func (t *TransactionOptionals) Parse(r *common.Request) error {
var blockId ID
var blockId parser.ID
err := blockId.Parse(r.GetQueryParam(blockIDQueryParam))
if err != nil {
return err
}
t.BlockID = blockId.Flow()

var collectionId ID
var collectionId parser.ID
err = collectionId.Parse(r.GetQueryParam(collectionIDQueryParam))
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rest/http/request/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/model/flow"
)

Expand Down Expand Up @@ -60,7 +61,7 @@ func (g *GetByIDRequest) Build(r *common.Request) error {
}

func (g *GetByIDRequest) Parse(rawID string) error {
var id ID
var id parser.ID
err := id.Parse(rawID)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rest/http/request/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"

"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/engine/access/rest/http/models"
"github.com/onflow/flow-go/engine/access/rest/util"
"github.com/onflow/flow-go/engine/common/rpc/convert"
Expand Down Expand Up @@ -89,7 +90,7 @@ func (t *Transaction) Parse(raw io.Reader, chain flow.Chain) error {
return fmt.Errorf("invalid transaction script encoding")
}

var blockID ID
var blockID parser.ID
err = blockID.Parse(tx.ReferenceBlockId)
if err != nil {
return fmt.Errorf("invalid reference block ID: %w", err)
Expand Down
1 change: 0 additions & 1 deletion engine/access/rest/http/routes/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/onflow/flow-go/access"
"github.com/onflow/flow-go/engine/access/rest/common"

"github.com/onflow/flow-go/engine/access/rest/http/models"
"github.com/onflow/flow-go/engine/access/rest/http/request"
)
Expand Down
6 changes: 3 additions & 3 deletions engine/access/rest/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
flowhttp "github.com/onflow/flow-go/engine/access/rest/http"
"github.com/onflow/flow-go/engine/access/rest/http/models"
"github.com/onflow/flow-go/engine/access/rest/websockets"
dp "github.com/onflow/flow-go/engine/access/rest/websockets/data_providers"
legacyws "github.com/onflow/flow-go/engine/access/rest/websockets/legacy"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
Expand Down Expand Up @@ -89,11 +90,10 @@ func (b *RouterBuilder) AddLegacyWebsocketsRoutes(
func (b *RouterBuilder) AddWebsocketsRoute(
chain flow.Chain,
config websockets.Config,
streamApi state_stream.API,
streamConfig backend.Config,
maxRequestSize int64,
dataProviderFactory dp.DataProviderFactory,
) *RouterBuilder {
handler := websockets.NewWebSocketHandler(b.logger, config, chain, streamApi, streamConfig, maxRequestSize)
handler := websockets.NewWebSocketHandler(b.logger, config, chain, maxRequestSize, dataProviderFactory)
b.v1SubRouter.
Methods(http.MethodGet).
Path("/ws").
Expand Down
4 changes: 3 additions & 1 deletion engine/access/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/onflow/flow-go/access"
"github.com/onflow/flow-go/engine/access/rest/router"
"github.com/onflow/flow-go/engine/access/rest/websockets"
dp "github.com/onflow/flow-go/engine/access/rest/websockets/data_providers"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/model/flow"
Expand Down Expand Up @@ -50,7 +51,8 @@ func NewServer(serverAPI access.API,
builder.AddLegacyWebsocketsRoutes(stateStreamApi, chain, stateStreamConfig, config.MaxRequestSize)
}

builder.AddWebsocketsRoute(chain, wsConfig, stateStreamApi, stateStreamConfig, config.MaxRequestSize)
dataProviderFactory := dp.NewDataProviderFactory(logger, stateStreamApi, serverAPI)
builder.AddWebsocketsRoute(chain, wsConfig, config.MaxRequestSize, dataProviderFactory)

c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},
Expand Down
Loading

0 comments on commit 533123a

Please sign in to comment.