From 59bfb208eb9a14c9019927437943eabe8ba9048b Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Thu, 4 Jan 2024 22:02:44 +0200 Subject: [PATCH] Enable state streaming API over gRPC --- server/access/grpc.go | 19 ++++++++++++++++++- server/server.go | 2 +- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/server/access/grpc.go b/server/access/grpc.go index 7d96ae22..389d36b0 100644 --- a/server/access/grpc.go +++ b/server/access/grpc.go @@ -23,14 +23,18 @@ import ( "net" "github.com/onflow/flow-emulator/adapters" + "github.com/onflow/flow-emulator/emulator" mockModule "github.com/onflow/flow-go/module/mock" grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/onflow/flow-go/access" legacyaccess "github.com/onflow/flow-go/access/legacy" + "github.com/onflow/flow-go/engine/access/state_stream" + "github.com/onflow/flow-go/engine/access/state_stream/backend" "github.com/onflow/flow-go/model/flow" flowgo "github.com/onflow/flow-go/model/flow" accessproto "github.com/onflow/flow/protobuf/go/flow/access" + "github.com/onflow/flow/protobuf/go/flow/executiondata" legacyaccessproto "github.com/onflow/flow/protobuf/go/flow/legacy/access" "github.com/rs/zerolog" "google.golang.org/grpc" @@ -52,7 +56,7 @@ type GRPCServer struct { listener net.Listener } -func NewGRPCServer(logger *zerolog.Logger, adapter *adapters.AccessAdapter, chain flow.Chain, host string, port int, debug bool) *GRPCServer { +func NewGRPCServer(logger *zerolog.Logger, blockchain *emulator.Blockchain, adapter *adapters.AccessAdapter, chain flow.Chain, host string, port int, debug bool) *GRPCServer { grpcServer := grpc.NewServer( grpc.StreamInterceptor(grpcprometheus.StreamServerInterceptor), grpc.UnaryInterceptor(grpcprometheus.UnaryServerInterceptor), @@ -71,6 +75,19 @@ func NewGRPCServer(logger *zerolog.Logger, adapter *adapters.AccessAdapter, chai reflection.Register(grpcServer) } + streamConfig := backend.Config{ + EventFilterConfig: state_stream.DefaultEventFilterConfig, + RpcMetricsEnabled: false, + MaxGlobalStreams: state_stream.DefaultMaxGlobalStreams, + ClientSendTimeout: state_stream.DefaultSendTimeout, + ClientSendBufferSize: state_stream.DefaultSendBufferSize, + ResponseLimit: state_stream.DefaultResponseLimit, + HeartbeatInterval: state_stream.DefaultHeartbeatInterval, + } + streamBackend := NewStateStreamBackend(blockchain, *logger) + handler := backend.NewHandler(streamBackend, chain, streamConfig) + executiondata.RegisterExecutionDataAPIServer(grpcServer, handler) + return &GRPCServer{ logger: logger, host: host, diff --git a/server/server.go b/server/server.go index 94028eb0..30bf20e0 100644 --- a/server/server.go +++ b/server/server.go @@ -193,7 +193,7 @@ func NewEmulatorServer(logger *zerolog.Logger, conf *Config) *EmulatorServer { accessAdapter := adapters.NewAccessAdapter(logger, emulatedBlockchain) livenessTicker := utils.NewLivenessTicker(conf.LivenessCheckTolerance) - grpcServer := access.NewGRPCServer(logger, accessAdapter, chain, conf.Host, conf.GRPCPort, conf.GRPCDebug) + grpcServer := access.NewGRPCServer(logger, emulatedBlockchain, accessAdapter, chain, conf.Host, conf.GRPCPort, conf.GRPCDebug) restServer, err := access.NewRestServer(logger, emulatedBlockchain, accessAdapter, chain, conf.Host, conf.RESTPort, conf.RESTDebug) if err != nil { logger.Error().Err(err).Msg("❗ Failed to startup REST API")