Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscribe block digest endpoints #764

Merged
merged 15 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,29 @@ func (c *Client) SubscribeEventsByBlockHeight(
return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter, WithHeartbeatInterval(conf.heartbeatInterval))
}

func (c *Client) SubscribeBlockDigestsFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockDigest, <-chan error, error) {
return c.grpc.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, blockStatus)
}

func (c *Client) SubscribeBlockDigestsFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockDigest, <-chan error, error) {
return c.grpc.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, blockStatus)
}

func (c *Client) SubscribeBlockDigestsFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
) (<-chan flow.BlockDigest, <-chan error, error) {
return c.grpc.SubscribeBlockDigestsFromLatest(ctx, blockStatus)
}

func (c *Client) SubscribeBlocksFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
Expand Down
16 changes: 16 additions & 0 deletions access/grpc/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,22 @@ func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) {
}, nil
}

func MessageToBlockDigest(m *access.SubscribeBlockDigestsResponse) flow.BlockDigest {
return flow.BlockDigest{
BlockID: flow.BytesToID(m.GetBlockId()),
Height: m.GetBlockHeight(),
Timestamp: m.GetBlockTimestamp().AsTime(),
}
}

func BlockDigestToMessage(blockDigest flow.BlockDigest) *access.SubscribeBlockDigestsResponse {
return &access.SubscribeBlockDigestsResponse{
BlockId: IdentifierToMessage(blockDigest.BlockID),
BlockHeight: blockDigest.Height,
BlockTimestamp: timestamppb.New(blockDigest.Timestamp),
}
}

func BlockStatusToEntity(blockStatus flow.BlockStatus) entities.BlockStatus {
switch blockStatus {
case flow.BlockStatusFinalized:
Expand Down
120 changes: 120 additions & 0 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1296,3 +1296,123 @@ func receiveBlocksFromClient[Client interface {
}
}
}

func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.BlockDigest, <-chan error, error) {
request := &access.SubscribeBlockDigestsFromStartBlockIDRequest{
StartBlockId: startBlockID.Bytes(),
BlockStatus: convert.BlockStatusToEntity(blockStatus),
AndriiDiachuk marked this conversation as resolved.
Show resolved Hide resolved
}

subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromStartBlockID(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.BlockDigest)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func (c *BaseClient) SubscribeBlockDigestsFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.BlockDigest, <-chan error, error) {
request := &access.SubscribeBlockDigestsFromStartHeightRequest{
StartBlockHeight: startHeight,
BlockStatus: convert.BlockStatusToEntity(blockStatus),
}

subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromStartHeight(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.BlockDigest)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func (c *BaseClient) SubscribeBlockDigestsFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.BlockDigest, <-chan error, error) {
request := &access.SubscribeBlockDigestsFromLatestRequest{
BlockStatus: convert.BlockStatusToEntity(blockStatus),
}

subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromLatest(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blocksChan := make(chan flow.BlockDigest)
errChan := make(chan error)

go func() {
defer close(blocksChan)
defer close(errChan)
receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan)
}()

return blocksChan, errChan, nil
}

func receiveBlockDigestFromClient[Client interface {
Recv() (*access.SubscribeBlockDigestsResponse, error)
}](
ctx context.Context,
client Client,
blockDigestsChan chan<- flow.BlockDigest,
errChan chan<- error,
) {
sendErr := func(err error) {
select {
case <-ctx.Done():
case errChan <- err:
}
}

for {
// Receive the next blockDigest response
blockDigestResponse, err := client.Recv()
if err != nil {
if err == io.EOF {
// End of stream, return gracefully
return
}

sendErr(fmt.Errorf("error receiving blockDigest: %w", err))
return
}

blockDigest := convert.MessageToBlockDigest(blockDigestResponse)

select {
case <-ctx.Done():
return
case blockDigestsChan <- blockDigest:
}
}
}
175 changes: 175 additions & 0 deletions access/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2381,3 +2381,178 @@ func assertNoBlocks[T any](t *testing.T, blocksCh <-chan T, done func()) {
require.FailNow(t, "should not receive blocks")
}
}

func TestClient_SubscribeBlockDigest(t *testing.T) {
blockHeaders := test.BlockHeaderGenerator()

generateBlockDigestResponses := func(count uint64) []*access.SubscribeBlockDigestsResponse {
var resBlockDigests []*access.SubscribeBlockDigestsResponse

for i := uint64(0); i < count; i++ {
blockHeader := blockHeaders.New()

digest := flow.BlockDigest{
BlockID: blockHeader.ID,
Height: blockHeader.Height,
Timestamp: blockHeader.Timestamp,
}

resBlockDigests = append(resBlockDigests, convert.BlockDigestToMessage(digest))
}

return resBlockDigests
}

t.Run("Happy Path - from start height", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
startHeight := uint64(1)
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{
ctx: ctx,
responses: generateBlockDigestResponses(responseCount),
}

rpc.
On("SubscribeBlockDigestsFromStartHeight", ctx, mock.Anything).
Return(stream, nil)

blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, flow.BlockStatusUnknown)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualDigest := <-blockDigestsCh
expectedDigest := convert.MessageToBlockDigest(stream.responses[i])
require.Equal(t, expectedDigest, actualDigest)
}
cancel()

wg.Wait()
}))

t.Run("Happy Path - from start block id", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{
ctx: ctx,
responses: generateBlockDigestResponses(responseCount),
}

rpc.
On("SubscribeBlockDigestsFromStartBlockID", ctx, mock.Anything).
Return(stream, nil)

startBlockID := convert.MessageToIdentifier(stream.responses[0].BlockId)
blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, flow.BlockStatusUnknown)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualDigest := <-blockDigestsCh
expectedDigest := convert.MessageToBlockDigest(stream.responses[i])
require.Equal(t, expectedDigest, actualDigest)
}
cancel()

wg.Wait()
}))

t.Run("Happy Path - from latest", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{
ctx: ctx,
responses: generateBlockDigestResponses(responseCount),
}

rpc.
On("SubscribeBlockDigestsFromLatest", ctx, mock.Anything).
Return(stream, nil)

blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromLatest(ctx, flow.BlockStatusUnknown)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualDigest := <-blockDigestsCh
expectedDigest := convert.MessageToBlockDigest(stream.responses[i])
require.Equal(t, expectedDigest, actualDigest)
}
cancel()

wg.Wait()
}))

t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{
ctx: ctx,
err: status.Error(codes.Internal, "internal error"),
}

rpc.
On("SubscribeBlockDigestsFromLatest", ctx, mock.Anything).
Return(stream, nil)

blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromLatest(ctx, flow.BlockStatusUnknown)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoBlockDigests(t, blockDigestsCh, wg.Done)

errorCount := 0
for e := range errCh {
require.Error(t, e)
require.ErrorIs(t, e, stream.err)
errorCount += 1
}
cancel()

require.Equalf(t, 1, errorCount, "only 1 error is expected")

wg.Wait()
}))
}

type mockBlockDigestClientStream[SubscribeBlockDigestsResponse any] struct {
grpc.ClientStream

ctx context.Context
err error
offset int
responses []*SubscribeBlockDigestsResponse
}

func (s *mockBlockDigestClientStream[SubscribeBlockDigestsResponse]) Recv() (*SubscribeBlockDigestsResponse, error) {
if s.err != nil {
return nil, s.err
}

if s.offset >= len(s.responses) {
<-s.ctx.Done()
return nil, io.EOF
}
defer func() { s.offset++ }()

return s.responses[s.offset], nil
}

func assertNoBlockDigests[BlockDigest any](t *testing.T, blockDigestsChan <-chan BlockDigest, done func()) {
defer done()
for range blockDigestsChan {
require.FailNow(t, "should not receive block digests")
}
}
7 changes: 7 additions & 0 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,10 @@ type BlockSeal struct {
// block produces the same receipt among all verifying nodes
ExecutionReceiptID Identifier
}

// BlockDigest holds lightweight block information which includes only block id, block height and block timestamp
type BlockDigest struct {
BlockID Identifier
Height uint64
Timestamp time.Time
}
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ toolchain go1.22.4
replace github.com/onflow/flow-go-sdk => ../

require (
github.com/onflow/cadence v1.0.0-preview.52
github.com/onflow/cadence v1.0.0
github.com/onflow/flow-cli/flowkit v1.11.0
github.com/onflow/flow-go-sdk v0.41.17
github.com/spf13/afero v1.11.0
Expand Down
1 change: 1 addition & 0 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ github.com/onflow/cadence v1.0.0-preview.35/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmp
github.com/onflow/cadence v1.0.0-preview.36/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0=
github.com/onflow/cadence v1.0.0-preview.38/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0=
github.com/onflow/cadence v1.0.0-preview.52/go.mod h1:7wvvecnAZtYOspLOS3Lh+FuAmMeSrXhAWiycC3kQ1UU=
github.com/onflow/cadence v1.0.0/go.mod h1:7wvvecnAZtYOspLOS3Lh+FuAmMeSrXhAWiycC3kQ1UU=
github.com/onflow/crypto v0.25.0 h1:BeWbLsh3ZD13Ej+Uky6kg1PL1ZIVBDVX+2MVBNwqddg=
github.com/onflow/crypto v0.25.0/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI=
github.com/onflow/crypto v0.25.1/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI=
Expand Down
Loading