Skip to content

Commit

Permalink
fix grpc query logic
Browse files Browse the repository at this point in the history
  • Loading branch information
testinginprod committed Oct 2, 2024
1 parent 2e406ce commit 4f082de
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 38 deletions.
11 changes: 6 additions & 5 deletions server/v2/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@ import (
"strings"
"sync"

appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
serverv2 "cosmossdk.io/server/v2"
"cosmossdk.io/server/v2/api/grpc/gogoreflection"
gogoproto "github.com/cosmos/gogoproto/proto"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/reflect/protoreflect"

appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
serverv2 "cosmossdk.io/server/v2"
"cosmossdk.io/server/v2/api/grpc/gogoreflection"
)

const (
Expand Down
78 changes: 45 additions & 33 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,40 +198,9 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc
// Query implements types.Application.
// It is called by cometbft to query application state.
func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
// if this fails then we cannot serve queries anymore
registry, err := c.getProtoRegistry()
if err != nil {
return nil, err
}

fullName := protoreflect.FullName(strings.ReplaceAll(req.Path, "/", "."))

desc, err := registry.FindDescriptorByName(fullName)
if err != nil {
return nil, err
}

md, ok := desc.(protoreflect.MethodDescriptor)
if !ok {
return nil, fmt.Errorf("proto descriptor %s not found in registry", req.Path)
}

handler, isGRPC := c.queryHandlersMap[string(md.Input().FullName())]
resp, isGRPC, err := c.maybeRunGRPCQuery(ctx, req)
if isGRPC {
protoRequest := handler.MakeMsg()
err = gogoproto.Unmarshal(req.Data, protoRequest) // TODO: use codec
if err != nil {
return nil, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err)
}
res, err := c.app.Query(ctx, uint64(req.Height), protoRequest)
if err != nil {
resp := QueryResult(err, c.cfg.AppTomlConfig.Trace)
resp.Height = req.Height
return resp, err

}

return queryResponse(res, req.Height)
return resp, err
}

// this error most probably means that we can't handle it with a proto message, so
Expand Down Expand Up @@ -262,6 +231,49 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
return resp, nil
}

func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryRequest) (resp *abciproto.QueryResponse, isGRPC bool, err error) {
// if this fails then we cannot serve queries anymore
registry, err := c.getProtoRegistry()
if err != nil {
return nil, false, err
}

// in order to check if it's a gRPC query we ensure that there's a descriptor
// for the path, if such descriptor exists, and it is a method descriptor
// then we assume this is a gRPC query.
fullName := protoreflect.FullName(strings.ReplaceAll(req.Path, "/", "."))

desc, err := registry.FindDescriptorByName(fullName)
if err != nil {
return nil, false, err
}

md, isGRPC := desc.(protoreflect.MethodDescriptor)
if !isGRPC {
return nil, false, nil
}

handler, found := c.queryHandlersMap[string(md.Input().FullName())]
if !found {
return nil, true, fmt.Errorf("no query handler found for %s", fullName)
}
protoRequest := handler.MakeMsg()
err = gogoproto.Unmarshal(req.Data, protoRequest) // TODO: use codec
if err != nil {
return nil, true, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err)
}
res, err := c.app.Query(ctx, uint64(req.Height), protoRequest)
if err != nil {
resp := QueryResult(err, c.cfg.AppTomlConfig.Trace)
resp.Height = req.Height
return resp, true, err

}

resp, err = queryResponse(res, req.Height)
return resp, isGRPC, err
}

// InitChain implements types.Application.
func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) {
c.logger.Info("InitChain", "initialHeight", req.InitialHeight, "chainID", req.ChainId)
Expand Down

0 comments on commit 4f082de

Please sign in to comment.