From f8abf01b222827b13de0e156c523ed3314a703e6 Mon Sep 17 00:00:00 2001 From: Deniz Mert Edincik Date: Fri, 30 Aug 2024 19:02:08 +0200 Subject: [PATCH 1/5] Implement missing streaming methods --- server/access/streamBackend.go | 88 +++++++++++++++++++++++++++++----- 1 file changed, 75 insertions(+), 13 deletions(-) diff --git a/server/access/streamBackend.go b/server/access/streamBackend.go index 3988506e..a836eaf2 100644 --- a/server/access/streamBackend.go +++ b/server/access/streamBackend.go @@ -68,12 +68,49 @@ func NewStateStreamBackend(blockchain *emulator.Blockchain, log zerolog.Logger) var _ state_stream.API = &StateStreamBackend{} +func (b *StateStreamBackend) newSubscriptionByBlockId( + ctx context.Context, + startBlockID flow.Identifier, + f subscription.GetDataByHeightFunc, +) subscription.Subscription { + block, err := b.blockchain.GetBlockByID(startBlockID) + if err != nil { + return subscription.NewFailedSubscription(err, "could not get block by ID") + } + sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, block.Header.Height, f) + go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx) + return sub +} + +func (b *StateStreamBackend) newSubscriptionByHeight( + ctx context.Context, + startHeight uint64, + f subscription.GetDataByHeightFunc, +) subscription.Subscription { + sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, startHeight, f) + go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx) + return sub +} + +func (b *StateStreamBackend) newSubscriptionByLatestHeight( + ctx context.Context, + f subscription.GetDataByHeightFunc, +) subscription.Subscription { + block, err := b.blockchain.GetLatestBlock() + if err != nil { + return subscription.NewFailedSubscription(err, "could not get latest block") + } + sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, block.Header.Height, f) + go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx) + return sub +} + func (b *StateStreamBackend) SubscribeEventsFromStartBlockID( ctx context.Context, startBlockID flow.Identifier, filter state_stream.EventFilter, ) subscription.Subscription { - return nil + return b.newSubscriptionByBlockId(ctx, startBlockID, b.getEventsResponseFactory(filter)) } func (b *StateStreamBackend) SubscribeEventsFromStartHeight( @@ -81,14 +118,14 @@ func (b *StateStreamBackend) SubscribeEventsFromStartHeight( startHeight uint64, filter state_stream.EventFilter, ) subscription.Subscription { - return nil + return b.newSubscriptionByHeight(ctx, startHeight, b.getEventsResponseFactory(filter)) } func (b *StateStreamBackend) SubscribeEventsFromLatest( ctx context.Context, filter state_stream.EventFilter, ) subscription.Subscription { - return nil + return b.newSubscriptionByLatestHeight(ctx, b.getEventsResponseFactory(filter)) } func (b *StateStreamBackend) SubscribeAccountStatusesFromStartBlockID( @@ -96,7 +133,7 @@ func (b *StateStreamBackend) SubscribeAccountStatusesFromStartBlockID( startBlockID flow.Identifier, filter state_stream.AccountStatusFilter, ) subscription.Subscription { - return nil + return b.newSubscriptionByBlockId(ctx, startBlockID, b.getAccountStatusResponseFactory(filter)) } func (b *StateStreamBackend) SubscribeAccountStatusesFromStartHeight( @@ -104,14 +141,14 @@ func (b *StateStreamBackend) SubscribeAccountStatusesFromStartHeight( startHeight uint64, filter state_stream.AccountStatusFilter, ) subscription.Subscription { - return nil + return b.newSubscriptionByHeight(ctx, startHeight, b.getAccountStatusResponseFactory(filter)) } func (b *StateStreamBackend) SubscribeAccountStatusesFromLatestBlock( ctx context.Context, filter state_stream.AccountStatusFilter, ) subscription.Subscription { - return nil + return b.newSubscriptionByLatestHeight(ctx, b.getAccountStatusResponseFactory(filter)) } func getStartHeightFunc(blockchain *emulator.Blockchain) GetStartHeightFunc { @@ -261,7 +298,7 @@ func (b *StateStreamBackend) SubscribeExecutionData(ctx context.Context, startBl return subscription.NewFailedSubscription(err, "could not get start height") } - sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getResponse) + sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getExecutionDataResponse) go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx) @@ -269,18 +306,18 @@ func (b *StateStreamBackend) SubscribeExecutionData(ctx context.Context, startBl } func (b *StateStreamBackend) SubscribeExecutionDataFromStartBlockID(ctx context.Context, startBlockID flow.Identifier) subscription.Subscription { - return nil + return b.newSubscriptionByBlockId(ctx, startBlockID, b.getExecutionDataResponse) } func (b *StateStreamBackend) SubscribeExecutionDataFromStartBlockHeight(ctx context.Context, startBlockHeight uint64) subscription.Subscription { - return nil + return b.newSubscriptionByHeight(ctx, startBlockHeight, b.getExecutionDataResponse) } func (b *StateStreamBackend) SubscribeExecutionDataFromLatest(ctx context.Context) subscription.Subscription { - return nil + return b.newSubscriptionByLatestHeight(ctx, b.getExecutionDataResponse) } -func (b *StateStreamBackend) getResponse(ctx context.Context, height uint64) (interface{}, error) { +func (b *StateStreamBackend) getExecutionDataResponse(ctx context.Context, height uint64) (interface{}, error) { executionData, err := b.getExecutionData(ctx, height) if err != nil { return nil, fmt.Errorf("could not get execution data for block %d: %w", height, err) @@ -302,14 +339,14 @@ func (b StateStreamBackend) SubscribeEvents(ctx context.Context, startBlockID fl return subscription.NewFailedSubscription(err, "could not get start height") } - sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getResponseFactory(filter)) + sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getEventsResponseFactory(filter)) go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx) return sub } -func (b StateStreamBackend) getResponseFactory(filter state_stream.EventFilter) subscription.GetDataByHeightFunc { +func (b StateStreamBackend) getEventsResponseFactory(filter state_stream.EventFilter) subscription.GetDataByHeightFunc { return func(ctx context.Context, height uint64) (interface{}, error) { executionData, err := b.getExecutionData(ctx, height) if err != nil { @@ -334,6 +371,31 @@ func (b StateStreamBackend) getResponseFactory(filter state_stream.EventFilter) } } +func (b *StateStreamBackend) getAccountStatusResponseFactory( + filter state_stream.AccountStatusFilter, +) subscription.GetDataByHeightFunc { + return func(ctx context.Context, height uint64) (interface{}, error) { + executionData, err := b.getExecutionData(ctx, height) + if err != nil { + return nil, fmt.Errorf("could not get execution data for block %d: %w", height, err) + } + + events := []flow.Event{} + for _, chunkExecutionData := range executionData.ChunkExecutionDatas { + events = append(events, filter.Filter(chunkExecutionData.Events)...) + } + + filteredProtocolEvents := filter.Filter(events) + allAccountProtocolEvents := filter.GroupCoreEventsByAccountAddress(filteredProtocolEvents, b.log) + + return &backend.AccountStatusesResponse{ + BlockID: executionData.BlockID, + Height: height, + AccountEvents: allAccountProtocolEvents, + }, nil + } +} + func (b StateStreamBackend) GetRegisterValues(registerIDs flow.RegisterIDs, height uint64) ([]flow.RegisterValue, error) { return nil, status.Errorf(codes.Unimplemented, "not implemented") } From a10db8ceb9b94b1b58bac15de94e8bba744aeec6 Mon Sep 17 00:00:00 2001 From: Deniz Mert Edincik Date: Fri, 30 Aug 2024 19:06:28 +0200 Subject: [PATCH 2/5] fix mixed calls --- server/access/streamBackend.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/access/streamBackend.go b/server/access/streamBackend.go index a836eaf2..5b30b410 100644 --- a/server/access/streamBackend.go +++ b/server/access/streamBackend.go @@ -333,7 +333,7 @@ type GetExecutionDataFunc func(context.Context, uint64) (*execution_data.BlockEx type GetStartHeightFunc func(flow.Identifier, uint64) (uint64, error) -func (b StateStreamBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter state_stream.EventFilter) subscription.Subscription { +func (b *StateStreamBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter state_stream.EventFilter) subscription.Subscription { nextHeight, err := b.getStartHeight(startBlockID, startHeight) if err != nil { return subscription.NewFailedSubscription(err, "could not get start height") @@ -346,7 +346,7 @@ func (b StateStreamBackend) SubscribeEvents(ctx context.Context, startBlockID fl return sub } -func (b StateStreamBackend) getEventsResponseFactory(filter state_stream.EventFilter) subscription.GetDataByHeightFunc { +func (b *StateStreamBackend) getEventsResponseFactory(filter state_stream.EventFilter) subscription.GetDataByHeightFunc { return func(ctx context.Context, height uint64) (interface{}, error) { executionData, err := b.getExecutionData(ctx, height) if err != nil { @@ -396,6 +396,6 @@ func (b *StateStreamBackend) getAccountStatusResponseFactory( } } -func (b StateStreamBackend) GetRegisterValues(registerIDs flow.RegisterIDs, height uint64) ([]flow.RegisterValue, error) { +func (b *StateStreamBackend) GetRegisterValues(registerIDs flow.RegisterIDs, height uint64) ([]flow.RegisterValue, error) { return nil, status.Errorf(codes.Unimplemented, "not implemented") } From 34b4c151938ed5ffac6d8689b8f4dbb15d77baff Mon Sep 17 00:00:00 2001 From: Deniz Mert Edincik Date: Fri, 30 Aug 2024 19:12:42 +0200 Subject: [PATCH 3/5] implement GetRegisterValues --- emulator/blockchain.go | 15 +++++++++++++++ server/access/streamBackend.go | 2 +- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/emulator/blockchain.go b/emulator/blockchain.go index 4a1781ad..4d32d1db 100644 --- a/emulator/blockchain.go +++ b/emulator/blockchain.go @@ -1870,3 +1870,18 @@ func (b *Blockchain) executeSystemChunkTransaction() error { return nil } + +func (b *Blockchain) GetRegisterValues(registerIDs flowgo.RegisterIDs, height uint64) (values []flowgo.RegisterValue, err error) { + ledger, err := b.storage.LedgerByHeight(context.Background(), height) + if err != nil { + return nil, err + } + for _, registerID := range registerIDs { + value, err := ledger.Get(registerID) + if err != nil { + return nil, err + } + values = append(values, value) + } + return values, nil +} diff --git a/server/access/streamBackend.go b/server/access/streamBackend.go index 5b30b410..9b3a8a71 100644 --- a/server/access/streamBackend.go +++ b/server/access/streamBackend.go @@ -397,5 +397,5 @@ func (b *StateStreamBackend) getAccountStatusResponseFactory( } func (b *StateStreamBackend) GetRegisterValues(registerIDs flow.RegisterIDs, height uint64) ([]flow.RegisterValue, error) { - return nil, status.Errorf(codes.Unimplemented, "not implemented") + return b.blockchain.GetRegisterValues(registerIDs, height) } From 8ee7ad2f4db4631b8d66f9b8a0128b455e5496c4 Mon Sep 17 00:00:00 2001 From: Deniz Mert Edincik Date: Sun, 1 Sep 2024 12:40:47 +0200 Subject: [PATCH 4/5] review fix --- server/access/streamBackend.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/access/streamBackend.go b/server/access/streamBackend.go index 9b3a8a71..221b2d9d 100644 --- a/server/access/streamBackend.go +++ b/server/access/streamBackend.go @@ -385,7 +385,6 @@ func (b *StateStreamBackend) getAccountStatusResponseFactory( events = append(events, filter.Filter(chunkExecutionData.Events)...) } - filteredProtocolEvents := filter.Filter(events) allAccountProtocolEvents := filter.GroupCoreEventsByAccountAddress(filteredProtocolEvents, b.log) return &backend.AccountStatusesResponse{ From a8624e884cca756a72a02c96e22e1ce3138db01c Mon Sep 17 00:00:00 2001 From: Deniz Mert Edincik Date: Sun, 1 Sep 2024 13:37:45 +0200 Subject: [PATCH 5/5] fix --- server/access/streamBackend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/access/streamBackend.go b/server/access/streamBackend.go index 221b2d9d..8dc9b988 100644 --- a/server/access/streamBackend.go +++ b/server/access/streamBackend.go @@ -385,7 +385,7 @@ func (b *StateStreamBackend) getAccountStatusResponseFactory( events = append(events, filter.Filter(chunkExecutionData.Events)...) } - allAccountProtocolEvents := filter.GroupCoreEventsByAccountAddress(filteredProtocolEvents, b.log) + allAccountProtocolEvents := filter.GroupCoreEventsByAccountAddress(events, b.log) return &backend.AccountStatusesResponse{ BlockID: executionData.BlockID,