Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Add implementation for events data providers and account statuses data providers #6766

Open
wants to merge 59 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
fed835a
Added skeleton for events data provider
AndriiDiachuk Nov 22, 2024
ea73371
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk Nov 22, 2024
9c10d24
Added initializing of events filter, added missing data to factory
AndriiDiachuk Nov 22, 2024
9547b5d
Fixed factory test
AndriiDiachuk Nov 22, 2024
b7f5ca7
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk Nov 25, 2024
0f34ae1
Added test skeleton for testing invalid arguments
AndriiDiachuk Nov 25, 2024
411f9e5
Added test for messageIndex check
AndriiDiachuk Nov 26, 2024
9a402de
Merged
AndriiDiachuk Nov 27, 2024
636740a
Added check for a valid event types in parse function
AndriiDiachuk Nov 28, 2024
f70c8e1
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk Nov 28, 2024
588688e
Changed type of arguments for consistency
AndriiDiachuk Nov 28, 2024
b537a5f
Added test case for event provider in factory_test
AndriiDiachuk Nov 28, 2024
4f63403
Added implementations for account provider functions
AndriiDiachuk Nov 29, 2024
867fcf7
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk Nov 29, 2024
dca9a25
Fixed remarks
AndriiDiachuk Nov 29, 2024
867fc98
Merge branch 'AndriiDiachuk/6588-events-data-provider' of github.com:…
AndriiDiachuk Nov 29, 2024
9624894
Added test for invalid arguments and for message index
AndriiDiachuk Nov 29, 2024
309b148
Added check for starting index value
AndriiDiachuk Dec 2, 2024
756c21d
Merge branch 'AndriiDiachuk/6588-events-data-provider' of github.com:…
AndriiDiachuk Dec 2, 2024
47a4c19
Added check for msgIndex
AndriiDiachuk Dec 2, 2024
8f1f99c
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk Dec 2, 2024
0abe203
Merge branch 'UlyanaAndrukhiv/6585-block-data-provider' of github.com…
AndriiDiachuk Dec 2, 2024
8ffe023
changed handleResponse to generic
AndriiDiachuk Dec 2, 2024
1384405
Added happy path for testing all subscribe methods
AndriiDiachuk Dec 3, 2024
57a7c0f
Linted
AndriiDiachuk Dec 3, 2024
3e48960
Merge branch 'master' into AndriiDiachuk/6588-events-data-provider
peterargue Dec 3, 2024
e076072
Changed order of params
AndriiDiachuk Dec 4, 2024
89bc4c2
Fixed issues with params order
AndriiDiachuk Dec 4, 2024
1cc0a74
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into AndriiDi…
AndriiDiachuk Dec 4, 2024
a1d7aa7
Refactored parse function
AndriiDiachuk Dec 4, 2024
8d9e090
Using json arrays instead of comma separeted lists
AndriiDiachuk Dec 4, 2024
416ff58
Added heartbeat handling in handleResponse, updated type of expected …
AndriiDiachuk Dec 4, 2024
2239802
Merge branch 'AndriiDiachuk/6588-events-data-provider' of github.com:…
AndriiDiachuk Dec 4, 2024
0e85d16
Merged
AndriiDiachuk Dec 5, 2024
179a664
Fixed small remarks
AndriiDiachuk Dec 5, 2024
e0aa808
Made parse function private
AndriiDiachuk Dec 5, 2024
19bf3c9
Changed Arguments type and refactored code
AndriiDiachuk Dec 5, 2024
8d0543d
Removed comment
AndriiDiachuk Dec 5, 2024
d6fc9df
Fixed parse function for event privoder
AndriiDiachuk Dec 6, 2024
1bb4112
Linted
AndriiDiachuk Dec 9, 2024
2e90387
Merge branch 'master' into AndriiDiachuk/6588-events-data-provider
AndriiDiachuk Dec 9, 2024
30e27df
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into AndriiDi…
AndriiDiachuk Dec 9, 2024
bb04f4a
Merge branch 'AndriiDiachuk/6588-events-data-provider' of github.com:…
AndriiDiachuk Dec 9, 2024
df387ec
Merge branch 'AndriiDiachuk/6588-events-data-provider' of github.com:…
AndriiDiachuk Dec 9, 2024
f81c794
Changed parse args function, added hearbeat for hadnling
AndriiDiachuk Dec 9, 2024
50fd289
Fixed error msg
AndriiDiachuk Dec 9, 2024
37836f9
Added happy path cases, fixed remarks from event provider PR
AndriiDiachuk Dec 9, 2024
88e0333
Merge branch 'AndriiDiachuk/6587-accounts-data-provider' of github.co…
AndriiDiachuk Dec 10, 2024
8432297
Fixed account statuses test
AndriiDiachuk Dec 11, 2024
3ba47e8
Refacored events provider tests to use generic testHappyPath function
AndriiDiachuk Dec 11, 2024
85e4464
Refactored account statuses test
AndriiDiachuk Dec 11, 2024
03e5e42
Decreased expected events count
AndriiDiachuk Dec 11, 2024
32d4850
Merge branch 'master' into AndriiDiachuk/6588-events-data-provider
AndriiDiachuk Dec 11, 2024
de5f077
Added description for missing func params
AndriiDiachuk Dec 11, 2024
e83930e
Refactored parse functions
AndriiDiachuk Dec 13, 2024
5c9f16c
Refactored args filter init
AndriiDiachuk Dec 13, 2024
c002841
Created separate file for generic testHappyPath method and for testTy…
AndriiDiachuk Dec 13, 2024
90d86fd
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into AndriiDi…
AndriiDiachuk Dec 13, 2024
2f0142d
Removed commented code
AndriiDiachuk Dec 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package request
package parser

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion engine/access/rest/http/request/get_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (g *GetEvents) Parse(rawType string, rawStart string, rawEnd string, rawBlo
if rawType == "" {
return fmt.Errorf("event type must be provided")
}
var eventType EventType
var eventType parser.EventType
err = eventType.Parse(rawType)
if err != nil {
return err
Expand Down
8 changes: 7 additions & 1 deletion engine/access/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ func NewServer(serverAPI access.API,
builder.AddLegacyWebsocketsRoutes(stateStreamApi, chain, stateStreamConfig, config.MaxRequestSize)
}

dataProviderFactory := dp.NewDataProviderFactory(logger, stateStreamApi, serverAPI)
dataProviderFactory := dp.NewDataProviderFactory(
logger,
stateStreamApi,
serverAPI,
chain,
stateStreamConfig.EventFilterConfig,
stateStreamConfig.HeartbeatInterval)
builder.AddWebsocketsRoute(chain, wsConfig, config.MaxRequestSize, dataProviderFactory)

c := cors.New(cors.Options{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
accessmock "github.com/onflow/flow-go/access/mock"
"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
"github.com/onflow/flow-go/engine/access/state_stream"
statestreamsmock "github.com/onflow/flow-go/engine/access/state_stream/mock"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/utils/unittest"
)
Expand Down Expand Up @@ -73,7 +75,12 @@ func (s *BlocksProviderSuite) SetupTest() {
}
s.finalizedBlock = parent

s.factory = NewDataProviderFactory(s.log, nil, s.api)
s.factory = NewDataProviderFactory(s.log,
nil,
s.api,
flow.Testnet.Chain(),
state_stream.DefaultEventFilterConfig,
subscription.DefaultHeartbeatInterval)
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
s.Require().NotNil(s.factory)
}

Expand Down
209 changes: 209 additions & 0 deletions engine/access/rest/websockets/data_providers/events_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package data_providers

import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"

"github.com/rs/zerolog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/onflow/flow-go/engine/access/rest/common/parser"
"github.com/onflow/flow-go/engine/access/rest/http/request"
"github.com/onflow/flow-go/engine/access/rest/util"
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/counters"
)

// EventsArguments contains the arguments required for subscribing to events
type EventsArguments struct {
StartBlockID flow.Identifier // ID of the block to start subscription from
StartBlockHeight uint64 // Height of the block to start subscription from
Filter state_stream.EventFilter // Filter applied to events for a given subscription
}

// EventsDataProvider is responsible for providing events
type EventsDataProvider struct {
*baseDataProvider

logger zerolog.Logger
stateStreamApi state_stream.API

heartbeatInterval uint64
}

var _ DataProvider = (*EventsDataProvider)(nil)

// NewEventsDataProvider creates a new instance of EventsDataProvider.
func NewEventsDataProvider(
ctx context.Context,
logger zerolog.Logger,
stateStreamApi state_stream.API,
topic string,
arguments models.Arguments,
send chan<- interface{},
chain flow.Chain,
eventFilterConfig state_stream.EventFilterConfig,
heartbeatInterval uint64,
) (*EventsDataProvider, error) {
p := &EventsDataProvider{
logger: logger.With().Str("component", "events-data-provider").Logger(),
stateStreamApi: stateStreamApi,
heartbeatInterval: heartbeatInterval,
}

// Initialize arguments passed to the provider.
eventArgs, err := ParseEventsArguments(arguments, chain, eventFilterConfig)
if err != nil {
return nil, fmt.Errorf("invalid arguments for events data provider: %w", err)
}

subCtx, cancel := context.WithCancel(ctx)

p.baseDataProvider = newBaseDataProvider(
topic,
cancel,
send,
p.createSubscription(subCtx, eventArgs), // Set up a subscription to events based on arguments.
)

return p, nil
}

// Run starts processing the subscription for events and handles responses.
//
// No errors are expected during normal operations.
func (p *EventsDataProvider) Run() error {
return subscription.HandleSubscription(p.subscription, p.handleResponse(p.send))
}

func (p *EventsDataProvider) handleResponse(send chan<- interface{}) func(eventsResponse *backend.EventsResponse) error {
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
blocksSinceLastMessage := uint64(0)
messageIndex := counters.NewMonotonousCounter(1)

return func(eventsResponse *backend.EventsResponse) error {
// check if there are any events in the response. if not, do not send a message unless the last
// response was more than HeartbeatInterval blocks ago
if len(eventsResponse.Events) == 0 {
blocksSinceLastMessage++
if blocksSinceLastMessage < p.heartbeatInterval {
return nil
}
blocksSinceLastMessage = 0
}

index := messageIndex.Value()
if ok := messageIndex.Set(messageIndex.Value() + 1); !ok {
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
}

send <- &models.EventResponse{
BlockId: eventsResponse.BlockID.String(),
BlockHeight: strconv.FormatUint(eventsResponse.Height, 10),
BlockTimestamp: eventsResponse.BlockTimestamp,
Events: eventsResponse.Events,
MessageIndex: strconv.FormatUint(index, 10),
}

return nil
}
}

// createSubscription creates a new subscription using the specified input arguments.
func (p *EventsDataProvider) createSubscription(ctx context.Context, args EventsArguments) subscription.Subscription {
if args.StartBlockID != flow.ZeroID {
return p.stateStreamApi.SubscribeEventsFromStartBlockID(ctx, args.StartBlockID, args.Filter)
}

if args.StartBlockHeight != request.EmptyHeight {
return p.stateStreamApi.SubscribeEventsFromStartHeight(ctx, args.StartBlockHeight, args.Filter)
}

return p.stateStreamApi.SubscribeEventsFromLatest(ctx, args.Filter)
}

// ParseEventsArguments validates and initializes the events arguments.
func ParseEventsArguments(
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
arguments models.Arguments,
chain flow.Chain,
eventFilterConfig state_stream.EventFilterConfig,
) (EventsArguments, error) {
var args EventsArguments

// Check for mutual exclusivity of start_block_id and start_block_height early
startBlockIDIn, hasStartBlockID := arguments["start_block_id"]
startBlockHeightIn, hasStartBlockHeight := arguments["start_block_height"]

if hasStartBlockID && hasStartBlockHeight {
return args, fmt.Errorf("can only provide either 'start_block_id' or 'start_block_height'")
}

// Parse 'start_block_id' if provided
if hasStartBlockID {
var startBlockID parser.ID
err := startBlockID.Parse(startBlockIDIn)
if err != nil {
return args, fmt.Errorf("invalid 'start_block_id': %w", err)
}
args.StartBlockID = startBlockID.Flow()
}

// Parse 'start_block_height' if provided
if hasStartBlockHeight {
var err error
args.StartBlockHeight, err = util.ToUint64(startBlockHeightIn)
if err != nil {
return args, fmt.Errorf("invalid 'start_block_height': %w", err)
}
} else {
args.StartBlockHeight = request.EmptyHeight
}

var eventTypes parser.EventTypes
// Parse 'event_types' as []string{}
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
if eventTypesIn, ok := arguments["event_types"]; ok && eventTypesIn != "" {
err := json.Unmarshal([]byte(eventTypesIn), &eventTypes) // Expect a JSON array
if err != nil {
return args, fmt.Errorf("could not parse 'event_types': %w", err)
}

err = eventTypes.Parse(strings.Split(eventTypesIn, ","))
if err != nil {
return args, fmt.Errorf("invalid 'event_types': %w", err)
}
}

// Parse 'addresses' as []string{}
var addresses []string
if addressesIn, ok := arguments["addresses"]; ok && addressesIn != "" {
err := json.Unmarshal([]byte(addressesIn), &addresses) // Expect a JSON array
if err != nil {
return args, fmt.Errorf("could not parse 'addresses': %w", err)
}
}

// Parse 'contracts' as []string{}
var contracts []string
if contractsIn, ok := arguments["contracts"]; ok && contractsIn != "" {
err := json.Unmarshal([]byte(contractsIn), &contracts) // Expect a JSON array
if err != nil {
return args, fmt.Errorf("could not parse 'contracts': %w", err)
}
}

// Initialize the event filter with the parsed arguments
filter, err := state_stream.NewEventFilter(eventFilterConfig, chain, eventTypes.Flow(), addresses, contracts)
if err != nil {
return args, fmt.Errorf("failed to create event filter: %w", err)
}
args.Filter = filter

return args, nil
}
Loading
Loading