Skip to content

Commit

Permalink
Merge pull request onflow#6630 from The-K-R-O-K/illia-malachyn/6617-n…
Browse files Browse the repository at this point in the history
…ew-ws-handler

[Access] Add new websocket handler and skeleton for its deps
  • Loading branch information
peterargue authored Nov 21, 2024
2 parents c5bde97 + c38f6ce commit a3676ba
Show file tree
Hide file tree
Showing 32 changed files with 737 additions and 43 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,10 @@ generate-mocks: install-mock-generators
mockery --name 'API' --dir="./engine/protocol" --case=underscore --output="./engine/protocol/mock" --outpkg="mock"
mockery --name '.*' --dir="./engine/access/state_stream" --case=underscore --output="./engine/access/state_stream/mock" --outpkg="mock"
mockery --name 'BlockTracker' --dir="./engine/access/subscription" --case=underscore --output="./engine/access/subscription/mock" --outpkg="mock"
mockery --name 'DataProvider' --dir="./engine/access/rest/websockets/data_provider" --case=underscore --output="./engine/access/rest/websockets/data_provider/mock" --outpkg="mock"
mockery --name 'ExecutionDataTracker' --dir="./engine/access/subscription" --case=underscore --output="./engine/access/subscription/mock" --outpkg="mock"
mockery --name 'ConnectionFactory' --dir="./engine/access/rpc/connection" --case=underscore --output="./engine/access/rpc/connection/mock" --outpkg="mock"
mockery --name 'Communicator' --dir="./engine/access/rpc/backend" --case=underscore --output="./engine/access/rpc/backend/mock" --outpkg="mock"

mockery --name '.*' --dir=model/fingerprint --case=underscore --output="./model/fingerprint/mock" --outpkg="mock"
mockery --name 'ExecForkActor' --structname 'ExecForkActorMock' --dir=module/mempool/consensus/mock/ --case=underscore --output="./module/mempool/consensus/mock/" --outpkg="mock"
mockery --name '.*' --dir=engine/verification/fetcher/ --case=underscore --output="./engine/verification/fetcher/mock" --outpkg="mockfetcher"
Expand Down
3 changes: 3 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
restapiproxy "github.com/onflow/flow-go/engine/access/rest/apiproxy"
commonrest "github.com/onflow/flow-go/engine/access/rest/common"
"github.com/onflow/flow-go/engine/access/rest/router"
"github.com/onflow/flow-go/engine/access/rest/websockets"
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
rpcConnection "github.com/onflow/flow-go/engine/access/rpc/connection"
Expand Down Expand Up @@ -168,6 +169,7 @@ type ObserverServiceConfig struct {
registerCacheSize uint
programCacheSize uint
registerDBPruneThreshold uint64
websocketConfig websockets.Config
}

// DefaultObserverServiceConfig defines all the default values for the ObserverServiceConfig
Expand Down Expand Up @@ -252,6 +254,7 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
registerCacheSize: 0,
programCacheSize: 0,
registerDBPruneThreshold: pruner.DefaultThreshold,
websocketConfig: websockets.NewDefaultWebsocketConfig(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/util/cmd/run-script/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/onflow/flow-go/cmd/util/ledger/util"
"github.com/onflow/flow-go/cmd/util/ledger/util/registers"
"github.com/onflow/flow-go/engine/access/rest"
"github.com/onflow/flow-go/engine/access/rest/websockets"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/execution/computation"
Expand Down Expand Up @@ -169,6 +170,7 @@ func run(*cobra.Command, []string) {
metrics.NewNoopCollector(),
nil,
backend.Config{},
websockets.NewDefaultWebsocketConfig(),
)
if err != nil {
log.Fatal().Err(err).Msg("failed to create server")
Expand Down
2 changes: 2 additions & 0 deletions engine/access/handle_irrecoverable_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
accessmock "github.com/onflow/flow-go/engine/access/mock"
"github.com/onflow/flow-go/engine/access/rest"
"github.com/onflow/flow-go/engine/access/rest/router"
"github.com/onflow/flow-go/engine/access/rest/websockets"
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
statestreambackend "github.com/onflow/flow-go/engine/access/state_stream/backend"
Expand Down Expand Up @@ -109,6 +110,7 @@ func (suite *IrrecoverableStateTestSuite) SetupTest() {
RestConfig: rest.Config{
ListenAddress: unittest.DefaultAddress,
},
WebSocketConfig: websockets.NewDefaultWebsocketConfig(),
}

// generate a server certificate that will be served by the GRPC server
Expand Down
2 changes: 2 additions & 0 deletions engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/index"
accessmock "github.com/onflow/flow-go/engine/access/mock"
"github.com/onflow/flow-go/engine/access/rest/websockets"
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
"github.com/onflow/flow-go/engine/access/state_stream"
Expand Down Expand Up @@ -138,6 +139,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
UnsecureGRPCListenAddr: unittest.DefaultAddress,
SecureGRPCListenAddr: unittest.DefaultAddress,
HTTPListenAddr: unittest.DefaultAddress,
WebSocketConfig: websockets.NewDefaultWebsocketConfig(),
}

blockCount := 5
Expand Down
27 changes: 23 additions & 4 deletions engine/access/rest/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package router

import (
"fmt"
"net/http"
"regexp"
"strings"

Expand All @@ -10,8 +11,9 @@ import (

"github.com/onflow/flow-go/access"
"github.com/onflow/flow-go/engine/access/rest/common/middleware"
"github.com/onflow/flow-go/engine/access/rest/http"
flowhttp "github.com/onflow/flow-go/engine/access/rest/http"
"github.com/onflow/flow-go/engine/access/rest/http/models"
"github.com/onflow/flow-go/engine/access/rest/websockets"
legacyws "github.com/onflow/flow-go/engine/access/rest/websockets/legacy"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
Expand Down Expand Up @@ -54,7 +56,7 @@ func (b *RouterBuilder) AddRestRoutes(
) *RouterBuilder {
linkGenerator := models.NewLinkGeneratorImpl(b.v1SubRouter)
for _, r := range Routes {
h := http.NewHandler(b.logger, backend, r.Handler, linkGenerator, chain, maxRequestSize)
h := flowhttp.NewHandler(b.logger, backend, r.Handler, linkGenerator, chain, maxRequestSize)
b.v1SubRouter.
Methods(r.Method).
Path(r.Pattern).
Expand All @@ -64,8 +66,8 @@ func (b *RouterBuilder) AddRestRoutes(
return b
}

// AddWsLegacyRoutes adds WebSocket routes to the router.
func (b *RouterBuilder) AddWsLegacyRoutes(
// AddLegacyWebsocketsRoutes adds WebSocket routes to the router.
func (b *RouterBuilder) AddLegacyWebsocketsRoutes(
stateStreamApi state_stream.API,
chain flow.Chain,
stateStreamConfig backend.Config,
Expand All @@ -84,6 +86,23 @@ func (b *RouterBuilder) AddWsLegacyRoutes(
return b
}

func (b *RouterBuilder) AddWebsocketsRoute(
chain flow.Chain,
config websockets.Config,
streamApi state_stream.API,
streamConfig backend.Config,
maxRequestSize int64,
) *RouterBuilder {
handler := websockets.NewWebSocketHandler(b.logger, config, chain, streamApi, streamConfig, maxRequestSize)
b.v1SubRouter.
Methods(http.MethodGet).
Path("/ws").
Name("ws").
Handler(handler)

return b
}

func (b *RouterBuilder) Build() *mux.Router {
return b.router
}
Expand Down
4 changes: 2 additions & 2 deletions engine/access/rest/router/router_test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func ExecuteRequest(req *http.Request, backend access.API) *httptest.ResponseRec
return rr
}

func ExecuteWsRequest(req *http.Request, stateStreamApi state_stream.API, responseRecorder *TestHijackResponseRecorder, chain flow.Chain) {
func ExecuteLegacyWsRequest(req *http.Request, stateStreamApi state_stream.API, responseRecorder *TestHijackResponseRecorder, chain flow.Chain) {
restCollector := metrics.NewNoopCollector()

config := backend.Config{
Expand All @@ -147,7 +147,7 @@ func ExecuteWsRequest(req *http.Request, stateStreamApi state_stream.API, respon
router := NewRouterBuilder(
unittest.Logger(),
restCollector,
).AddWsLegacyRoutes(
).AddLegacyWebsocketsRoutes(
stateStreamApi,
chain, config, common.DefaultMaxRequestSize,
).Build()
Expand Down
6 changes: 5 additions & 1 deletion engine/access/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/onflow/flow-go/access"
"github.com/onflow/flow-go/engine/access/rest/router"
"github.com/onflow/flow-go/engine/access/rest/websockets"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/model/flow"
Expand Down Expand Up @@ -42,12 +43,15 @@ func NewServer(serverAPI access.API,
restCollector module.RestMetrics,
stateStreamApi state_stream.API,
stateStreamConfig backend.Config,
wsConfig websockets.Config,
) (*http.Server, error) {
builder := router.NewRouterBuilder(logger, restCollector).AddRestRoutes(serverAPI, chain, config.MaxRequestSize)
if stateStreamApi != nil {
builder.AddWsLegacyRoutes(stateStreamApi, chain, stateStreamConfig, config.MaxRequestSize)
builder.AddLegacyWebsocketsRoutes(stateStreamApi, chain, stateStreamConfig, config.MaxRequestSize)
}

builder.AddWebsocketsRoute(chain, wsConfig, stateStreamApi, stateStreamConfig, config.MaxRequestSize)

c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},
AllowedHeaders: []string{"*"},
Expand Down
21 changes: 21 additions & 0 deletions engine/access/rest/websockets/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package websockets

import (
"time"
)

type Config struct {
MaxSubscriptionsPerConnection uint64
MaxResponsesPerSecond uint64
SendMessageTimeout time.Duration
MaxRequestSize int64
}

func NewDefaultWebsocketConfig() Config {
return Config{
MaxSubscriptionsPerConnection: 1000,
MaxResponsesPerSecond: 1000,
SendMessageTimeout: 10 * time.Second,
MaxRequestSize: 1024,
}
}
Loading

0 comments on commit a3676ba

Please sign in to comment.