diff --git a/emulator/blockchain.go b/emulator/blockchain.go index 4a1781ad..4d32d1db 100644 --- a/emulator/blockchain.go +++ b/emulator/blockchain.go @@ -1870,3 +1870,18 @@ func (b *Blockchain) executeSystemChunkTransaction() error { return nil } + +func (b *Blockchain) GetRegisterValues(registerIDs flowgo.RegisterIDs, height uint64) (values []flowgo.RegisterValue, err error) { + ledger, err := b.storage.LedgerByHeight(context.Background(), height) + if err != nil { + return nil, err + } + for _, registerID := range registerIDs { + value, err := ledger.Get(registerID) + if err != nil { + return nil, err + } + values = append(values, value) + } + return values, nil +} diff --git a/server/access/streamBackend.go b/server/access/streamBackend.go index 3988506e..8dc9b988 100644 --- a/server/access/streamBackend.go +++ b/server/access/streamBackend.go @@ -68,12 +68,49 @@ func NewStateStreamBackend(blockchain *emulator.Blockchain, log zerolog.Logger) var _ state_stream.API = &StateStreamBackend{} +func (b *StateStreamBackend) newSubscriptionByBlockId( + ctx context.Context, + startBlockID flow.Identifier, + f subscription.GetDataByHeightFunc, +) subscription.Subscription { + block, err := b.blockchain.GetBlockByID(startBlockID) + if err != nil { + return subscription.NewFailedSubscription(err, "could not get block by ID") + } + sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, block.Header.Height, f) + go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx) + return sub +} + +func (b *StateStreamBackend) newSubscriptionByHeight( + ctx context.Context, + startHeight uint64, + f subscription.GetDataByHeightFunc, +) subscription.Subscription { + sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, startHeight, f) + go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx) + return sub +} + +func (b *StateStreamBackend) newSubscriptionByLatestHeight( + ctx context.Context, + f subscription.GetDataByHeightFunc, +) subscription.Subscription { + block, err := b.blockchain.GetLatestBlock() + if err != nil { + return subscription.NewFailedSubscription(err, "could not get latest block") + } + sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, block.Header.Height, f) + go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx) + return sub +} + func (b *StateStreamBackend) SubscribeEventsFromStartBlockID( ctx context.Context, startBlockID flow.Identifier, filter state_stream.EventFilter, ) subscription.Subscription { - return nil + return b.newSubscriptionByBlockId(ctx, startBlockID, b.getEventsResponseFactory(filter)) } func (b *StateStreamBackend) SubscribeEventsFromStartHeight( @@ -81,14 +118,14 @@ func (b *StateStreamBackend) SubscribeEventsFromStartHeight( startHeight uint64, filter state_stream.EventFilter, ) subscription.Subscription { - return nil + return b.newSubscriptionByHeight(ctx, startHeight, b.getEventsResponseFactory(filter)) } func (b *StateStreamBackend) SubscribeEventsFromLatest( ctx context.Context, filter state_stream.EventFilter, ) subscription.Subscription { - return nil + return b.newSubscriptionByLatestHeight(ctx, b.getEventsResponseFactory(filter)) } func (b *StateStreamBackend) SubscribeAccountStatusesFromStartBlockID( @@ -96,7 +133,7 @@ func (b *StateStreamBackend) SubscribeAccountStatusesFromStartBlockID( startBlockID flow.Identifier, filter state_stream.AccountStatusFilter, ) subscription.Subscription { - return nil + return b.newSubscriptionByBlockId(ctx, startBlockID, b.getAccountStatusResponseFactory(filter)) } func (b *StateStreamBackend) SubscribeAccountStatusesFromStartHeight( @@ -104,14 +141,14 @@ func (b *StateStreamBackend) SubscribeAccountStatusesFromStartHeight( startHeight uint64, filter state_stream.AccountStatusFilter, ) subscription.Subscription { - return nil + return b.newSubscriptionByHeight(ctx, startHeight, b.getAccountStatusResponseFactory(filter)) } func (b *StateStreamBackend) SubscribeAccountStatusesFromLatestBlock( ctx context.Context, filter state_stream.AccountStatusFilter, ) subscription.Subscription { - return nil + return b.newSubscriptionByLatestHeight(ctx, b.getAccountStatusResponseFactory(filter)) } func getStartHeightFunc(blockchain *emulator.Blockchain) GetStartHeightFunc { @@ -261,7 +298,7 @@ func (b *StateStreamBackend) SubscribeExecutionData(ctx context.Context, startBl return subscription.NewFailedSubscription(err, "could not get start height") } - sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getResponse) + sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getExecutionDataResponse) go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx) @@ -269,18 +306,18 @@ func (b *StateStreamBackend) SubscribeExecutionData(ctx context.Context, startBl } func (b *StateStreamBackend) SubscribeExecutionDataFromStartBlockID(ctx context.Context, startBlockID flow.Identifier) subscription.Subscription { - return nil + return b.newSubscriptionByBlockId(ctx, startBlockID, b.getExecutionDataResponse) } func (b *StateStreamBackend) SubscribeExecutionDataFromStartBlockHeight(ctx context.Context, startBlockHeight uint64) subscription.Subscription { - return nil + return b.newSubscriptionByHeight(ctx, startBlockHeight, b.getExecutionDataResponse) } func (b *StateStreamBackend) SubscribeExecutionDataFromLatest(ctx context.Context) subscription.Subscription { - return nil + return b.newSubscriptionByLatestHeight(ctx, b.getExecutionDataResponse) } -func (b *StateStreamBackend) getResponse(ctx context.Context, height uint64) (interface{}, error) { +func (b *StateStreamBackend) getExecutionDataResponse(ctx context.Context, height uint64) (interface{}, error) { executionData, err := b.getExecutionData(ctx, height) if err != nil { return nil, fmt.Errorf("could not get execution data for block %d: %w", height, err) @@ -296,20 +333,20 @@ type GetExecutionDataFunc func(context.Context, uint64) (*execution_data.BlockEx type GetStartHeightFunc func(flow.Identifier, uint64) (uint64, error) -func (b StateStreamBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter state_stream.EventFilter) subscription.Subscription { +func (b *StateStreamBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter state_stream.EventFilter) subscription.Subscription { nextHeight, err := b.getStartHeight(startBlockID, startHeight) if err != nil { return subscription.NewFailedSubscription(err, "could not get start height") } - sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getResponseFactory(filter)) + sub := subscription.NewHeightBasedSubscription(b.sendBufferSize, nextHeight, b.getEventsResponseFactory(filter)) go subscription.NewStreamer(b.log, b.blockchain.Broadcaster(), b.sendTimeout, b.responseLimit, sub).Stream(ctx) return sub } -func (b StateStreamBackend) getResponseFactory(filter state_stream.EventFilter) subscription.GetDataByHeightFunc { +func (b *StateStreamBackend) getEventsResponseFactory(filter state_stream.EventFilter) subscription.GetDataByHeightFunc { return func(ctx context.Context, height uint64) (interface{}, error) { executionData, err := b.getExecutionData(ctx, height) if err != nil { @@ -334,6 +371,30 @@ func (b StateStreamBackend) getResponseFactory(filter state_stream.EventFilter) } } -func (b StateStreamBackend) GetRegisterValues(registerIDs flow.RegisterIDs, height uint64) ([]flow.RegisterValue, error) { - return nil, status.Errorf(codes.Unimplemented, "not implemented") +func (b *StateStreamBackend) getAccountStatusResponseFactory( + filter state_stream.AccountStatusFilter, +) subscription.GetDataByHeightFunc { + return func(ctx context.Context, height uint64) (interface{}, error) { + executionData, err := b.getExecutionData(ctx, height) + if err != nil { + return nil, fmt.Errorf("could not get execution data for block %d: %w", height, err) + } + + events := []flow.Event{} + for _, chunkExecutionData := range executionData.ChunkExecutionDatas { + events = append(events, filter.Filter(chunkExecutionData.Events)...) + } + + allAccountProtocolEvents := filter.GroupCoreEventsByAccountAddress(events, b.log) + + return &backend.AccountStatusesResponse{ + BlockID: executionData.BlockID, + Height: height, + AccountEvents: allAccountProtocolEvents, + }, nil + } +} + +func (b *StateStreamBackend) GetRegisterValues(registerIDs flow.RegisterIDs, height uint64) ([]flow.RegisterValue, error) { + return b.blockchain.GetRegisterValues(registerIDs, height) }