From 4f082deb918c765199fecf64e0d3838cb4afe5d3 Mon Sep 17 00:00:00 2001 From: testinginprod Date: Wed, 2 Oct 2024 15:05:08 +0200 Subject: [PATCH] fix grpc query logic --- server/v2/api/grpc/server.go | 11 ++--- server/v2/cometbft/abci.go | 78 +++++++++++++++++++++--------------- 2 files changed, 51 insertions(+), 38 deletions(-) diff --git a/server/v2/api/grpc/server.go b/server/v2/api/grpc/server.go index ba14a52e8cf8..3a1c9cf6c13f 100644 --- a/server/v2/api/grpc/server.go +++ b/server/v2/api/grpc/server.go @@ -12,11 +12,6 @@ import ( "strings" "sync" - appmodulev2 "cosmossdk.io/core/appmodule/v2" - "cosmossdk.io/core/transaction" - "cosmossdk.io/log" - serverv2 "cosmossdk.io/server/v2" - "cosmossdk.io/server/v2/api/grpc/gogoreflection" gogoproto "github.com/cosmos/gogoproto/proto" "github.com/spf13/pflag" "google.golang.org/grpc" @@ -24,6 +19,12 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/reflect/protoreflect" + + appmodulev2 "cosmossdk.io/core/appmodule/v2" + "cosmossdk.io/core/transaction" + "cosmossdk.io/log" + serverv2 "cosmossdk.io/server/v2" + "cosmossdk.io/server/v2/api/grpc/gogoreflection" ) const ( diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 4757aa052e38..12b8b026183c 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -198,40 +198,9 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc // Query implements types.Application. // It is called by cometbft to query application state. func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) { - // if this fails then we cannot serve queries anymore - registry, err := c.getProtoRegistry() - if err != nil { - return nil, err - } - - fullName := protoreflect.FullName(strings.ReplaceAll(req.Path, "/", ".")) - - desc, err := registry.FindDescriptorByName(fullName) - if err != nil { - return nil, err - } - - md, ok := desc.(protoreflect.MethodDescriptor) - if !ok { - return nil, fmt.Errorf("proto descriptor %s not found in registry", req.Path) - } - - handler, isGRPC := c.queryHandlersMap[string(md.Input().FullName())] + resp, isGRPC, err := c.maybeRunGRPCQuery(ctx, req) if isGRPC { - protoRequest := handler.MakeMsg() - err = gogoproto.Unmarshal(req.Data, protoRequest) // TODO: use codec - if err != nil { - return nil, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err) - } - res, err := c.app.Query(ctx, uint64(req.Height), protoRequest) - if err != nil { - resp := QueryResult(err, c.cfg.AppTomlConfig.Trace) - resp.Height = req.Height - return resp, err - - } - - return queryResponse(res, req.Height) + return resp, err } // this error most probably means that we can't handle it with a proto message, so @@ -262,6 +231,49 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) ( return resp, nil } +func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryRequest) (resp *abciproto.QueryResponse, isGRPC bool, err error) { + // if this fails then we cannot serve queries anymore + registry, err := c.getProtoRegistry() + if err != nil { + return nil, false, err + } + + // in order to check if it's a gRPC query we ensure that there's a descriptor + // for the path, if such descriptor exists, and it is a method descriptor + // then we assume this is a gRPC query. + fullName := protoreflect.FullName(strings.ReplaceAll(req.Path, "/", ".")) + + desc, err := registry.FindDescriptorByName(fullName) + if err != nil { + return nil, false, err + } + + md, isGRPC := desc.(protoreflect.MethodDescriptor) + if !isGRPC { + return nil, false, nil + } + + handler, found := c.queryHandlersMap[string(md.Input().FullName())] + if !found { + return nil, true, fmt.Errorf("no query handler found for %s", fullName) + } + protoRequest := handler.MakeMsg() + err = gogoproto.Unmarshal(req.Data, protoRequest) // TODO: use codec + if err != nil { + return nil, true, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err) + } + res, err := c.app.Query(ctx, uint64(req.Height), protoRequest) + if err != nil { + resp := QueryResult(err, c.cfg.AppTomlConfig.Trace) + resp.Height = req.Height + return resp, true, err + + } + + resp, err = queryResponse(res, req.Height) + return resp, isGRPC, err +} + // InitChain implements types.Application. func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) { c.logger.Info("InitChain", "initialHeight", req.InitialHeight, "chainID", req.ChainId)