Skip to content

Commit

Permalink
Merge pull request onflow#5832 from The-K-R-O-K/AndriiSlisarchuk/4757…
Browse files Browse the repository at this point in the history
…-current-indexed-height

[Access] Added highest indexed height to metadata
  • Loading branch information
peterargue authored May 31, 2024
2 parents b968034 + 9cec38f commit a6e3115
Show file tree
Hide file tree
Showing 18 changed files with 338 additions and 285 deletions.
158 changes: 130 additions & 28 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package access

import (
"context"
"errors"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -15,6 +16,8 @@ import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/counters"
"github.com/onflow/flow-go/module/state_synchronization"
"github.com/onflow/flow-go/module/state_synchronization/indexer"

"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/entities"
Expand All @@ -27,6 +30,7 @@ type Handler struct {
signerIndicesDecoder hotstuff.BlockSignerDecoder
finalizedHeaderCache module.FinalizedHeaderCache
me module.Local
indexReporter state_synchronization.IndexReporter
}

// HandlerOption is used to hand over optional constructor parameters
Expand Down Expand Up @@ -196,7 +200,10 @@ func (h *Handler) GetCollectionByID(
ctx context.Context,
req *access.GetCollectionByIDRequest,
) (*access.CollectionResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

id, err := convert.CollectionID(req.GetId())
if err != nil {
Expand All @@ -223,7 +230,10 @@ func (h *Handler) GetFullCollectionByID(
ctx context.Context,
req *access.GetFullCollectionByIDRequest,
) (*access.FullCollectionResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

id, err := convert.CollectionID(req.GetId())
if err != nil {
Expand Down Expand Up @@ -251,7 +261,10 @@ func (h *Handler) SendTransaction(
ctx context.Context,
req *access.SendTransactionRequest,
) (*access.SendTransactionResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

txMsg := req.GetTransaction()

Expand All @@ -278,7 +291,10 @@ func (h *Handler) GetTransaction(
ctx context.Context,
req *access.GetTransactionRequest,
) (*access.TransactionResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

id, err := convert.TransactionID(req.GetId())
if err != nil {
Expand All @@ -301,7 +317,10 @@ func (h *Handler) GetTransactionResult(
ctx context.Context,
req *access.GetTransactionRequest,
) (*access.TransactionResultResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

transactionID, err := convert.TransactionID(req.GetId())
if err != nil {
Expand Down Expand Up @@ -342,7 +361,10 @@ func (h *Handler) GetTransactionResultsByBlockID(
ctx context.Context,
req *access.GetTransactionsByBlockIDRequest,
) (*access.TransactionResultsResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

id, err := convert.BlockID(req.GetBlockId())
if err != nil {
Expand All @@ -366,7 +388,10 @@ func (h *Handler) GetSystemTransaction(
ctx context.Context,
req *access.GetSystemTransactionRequest,
) (*access.TransactionResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

id, err := convert.BlockID(req.GetBlockId())
if err != nil {
Expand All @@ -388,7 +413,10 @@ func (h *Handler) GetSystemTransactionResult(
ctx context.Context,
req *access.GetSystemTransactionResultRequest,
) (*access.TransactionResultResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

id, err := convert.BlockID(req.GetBlockId())
if err != nil {
Expand All @@ -410,7 +438,10 @@ func (h *Handler) GetTransactionsByBlockID(
ctx context.Context,
req *access.GetTransactionsByBlockIDRequest,
) (*access.TransactionsResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

id, err := convert.BlockID(req.GetBlockId())
if err != nil {
Expand All @@ -434,7 +465,10 @@ func (h *Handler) GetTransactionResultByIndex(
ctx context.Context,
req *access.GetTransactionByIndexRequest,
) (*access.TransactionResultResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

blockID, err := convert.BlockID(req.GetBlockId())
if err != nil {
Expand All @@ -459,7 +493,10 @@ func (h *Handler) GetAccount(
ctx context.Context,
req *access.GetAccountRequest,
) (*access.GetAccountResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

address, err := convert.Address(req.GetAddress(), h.chain)
if err != nil {
Expand Down Expand Up @@ -487,7 +524,10 @@ func (h *Handler) GetAccountAtLatestBlock(
ctx context.Context,
req *access.GetAccountAtLatestBlockRequest,
) (*access.AccountResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

address, err := convert.Address(req.GetAddress(), h.chain)
if err != nil {
Expand All @@ -514,7 +554,10 @@ func (h *Handler) GetAccountAtBlockHeight(
ctx context.Context,
req *access.GetAccountAtBlockHeightRequest,
) (*access.AccountResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

address, err := convert.Address(req.GetAddress(), h.chain)
if err != nil {
Expand Down Expand Up @@ -542,7 +585,10 @@ func (h *Handler) ExecuteScriptAtLatestBlock(
ctx context.Context,
req *access.ExecuteScriptAtLatestBlockRequest,
) (*access.ExecuteScriptResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

script := req.GetScript()
arguments := req.GetArguments()
Expand All @@ -563,7 +609,10 @@ func (h *Handler) ExecuteScriptAtBlockHeight(
ctx context.Context,
req *access.ExecuteScriptAtBlockHeightRequest,
) (*access.ExecuteScriptResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

script := req.GetScript()
arguments := req.GetArguments()
Expand All @@ -585,8 +634,10 @@ func (h *Handler) ExecuteScriptAtBlockID(
ctx context.Context,
req *access.ExecuteScriptAtBlockIDRequest,
) (*access.ExecuteScriptResponse, error) {
metadata := h.buildMetadataResponse()

metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}
script := req.GetScript()
arguments := req.GetArguments()
blockID := convert.MessageToIdentifier(req.GetBlockId())
Expand All @@ -607,7 +658,10 @@ func (h *Handler) GetEventsForHeightRange(
ctx context.Context,
req *access.GetEventsForHeightRangeRequest,
) (*access.EventsResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

eventType, err := convert.EventType(req.GetType())
if err != nil {
Expand Down Expand Up @@ -639,7 +693,10 @@ func (h *Handler) GetEventsForBlockIDs(
ctx context.Context,
req *access.GetEventsForBlockIDsRequest,
) (*access.EventsResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

eventType, err := convert.EventType(req.GetType())
if err != nil {
Expand Down Expand Up @@ -671,7 +728,10 @@ func (h *Handler) GetEventsForBlockIDs(

// GetLatestProtocolStateSnapshot returns the latest serializable Snapshot
func (h *Handler) GetLatestProtocolStateSnapshot(ctx context.Context, req *access.GetLatestProtocolStateSnapshotRequest) (*access.ProtocolStateSnapshotResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

snapshot, err := h.api.GetLatestProtocolStateSnapshot(ctx)
if err != nil {
Expand All @@ -686,7 +746,10 @@ func (h *Handler) GetLatestProtocolStateSnapshot(ctx context.Context, req *acces

// GetProtocolStateSnapshotByBlockID returns serializable Snapshot by blockID
func (h *Handler) GetProtocolStateSnapshotByBlockID(ctx context.Context, req *access.GetProtocolStateSnapshotByBlockIDRequest) (*access.ProtocolStateSnapshotResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

blockID := convert.MessageToIdentifier(req.GetBlockId())

Expand All @@ -703,7 +766,10 @@ func (h *Handler) GetProtocolStateSnapshotByBlockID(ctx context.Context, req *ac

// GetProtocolStateSnapshotByHeight returns serializable Snapshot by block height
func (h *Handler) GetProtocolStateSnapshotByHeight(ctx context.Context, req *access.GetProtocolStateSnapshotByHeightRequest) (*access.ProtocolStateSnapshotResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

snapshot, err := h.api.GetProtocolStateSnapshotByHeight(ctx, req.GetBlockHeight())
if err != nil {
Expand All @@ -720,7 +786,10 @@ func (h *Handler) GetProtocolStateSnapshotByHeight(ctx context.Context, req *acc
// AN might receive multiple receipts with conflicting results for unsealed blocks.
// If this case happens, since AN is not able to determine which result is the correct one until the block is sealed, it has to pick one result to respond to this query. For now, we return the result from the latest received receipt.
func (h *Handler) GetExecutionResultForBlockID(ctx context.Context, req *access.GetExecutionResultForBlockIDRequest) (*access.ExecutionResultForBlockIDResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

blockID := convert.MessageToIdentifier(req.GetBlockId())

Expand All @@ -734,7 +803,10 @@ func (h *Handler) GetExecutionResultForBlockID(ctx context.Context, req *access.

// GetExecutionResultByID returns the execution result for the given ID.
func (h *Handler) GetExecutionResultByID(ctx context.Context, req *access.GetExecutionResultByIDRequest) (*access.ExecutionResultByIDResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

resultID := convert.MessageToIdentifier(req.GetId())

Expand Down Expand Up @@ -1161,7 +1233,10 @@ func (h *Handler) SendAndSubscribeTransactionStatuses(
}

func (h *Handler) blockResponse(block *flow.Block, fullResponse bool, status flow.BlockStatus) (*access.BlockResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

signerIDs, err := h.signerIndicesDecoder.DecodeSignerIDs(block.Header)
if err != nil {
Expand All @@ -1186,7 +1261,10 @@ func (h *Handler) blockResponse(block *flow.Block, fullResponse bool, status flo
}

func (h *Handler) blockHeaderResponse(header *flow.Header, status flow.BlockStatus) (*access.BlockHeaderResponse, error) {
metadata := h.buildMetadataResponse()
metadata, err := h.buildMetadataResponse()
if err != nil {
return nil, err
}

signerIDs, err := h.signerIndicesDecoder.DecodeSignerIDs(header)
if err != nil {
Expand All @@ -1206,16 +1284,33 @@ func (h *Handler) blockHeaderResponse(header *flow.Header, status flow.BlockStat
}

// buildMetadataResponse builds and returns the metadata response object.
func (h *Handler) buildMetadataResponse() *entities.Metadata {
// Expected errors during normal operation:
// - codes.NotFound if result cannot be provided by storage due to the absence of data.
// - storage.ErrHeightNotIndexed when data is unavailable
func (h *Handler) buildMetadataResponse() (*entities.Metadata, error) {
lastFinalizedHeader := h.finalizedHeaderCache.Get()
blockId := lastFinalizedHeader.ID()
nodeId := h.me.NodeID()

return &entities.Metadata{
metadata := &entities.Metadata{
LatestFinalizedBlockId: blockId[:],
LatestFinalizedHeight: lastFinalizedHeader.Height,
NodeId: nodeId[:],
}

if h.indexReporter != nil {
highestIndexedHeight, err := h.indexReporter.HighestIndexedHeight()
if err != nil {
if !errors.Is(err, indexer.ErrIndexNotInitialized) {
return nil, rpc.ConvertIndexError(err, lastFinalizedHeader.Height, "could not get highest indexed height")
}
highestIndexedHeight = 0
}

metadata.HighestIndexedHeight = highestIndexedHeight
}

return metadata, nil
}

func executionResultToMessages(er *flow.ExecutionResult, metadata *entities.Metadata) (*access.ExecutionResultForBlockIDResponse, error) {
Expand All @@ -1237,6 +1332,13 @@ func WithBlockSignerDecoder(signerIndicesDecoder hotstuff.BlockSignerDecoder) fu
}
}

// WithIndexReporter configures the Handler to work with index reporter
func WithIndexReporter(indexReporter state_synchronization.IndexReporter) func(*Handler) {
return func(handler *Handler) {
handler.indexReporter = indexReporter
}
}

// checkBlockStatus checks the validity of the provided block status.
//
// Expected errors during normal operation:
Expand Down
Loading

0 comments on commit a6e3115

Please sign in to comment.