diff --git a/app/config/config.go b/app/config/config.go index e2e48404ad..3bdeceb9cb 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -122,6 +122,8 @@ type OecConfig struct { // commitGapOffset int64 + + maxSubscriptionClients int } const ( @@ -159,6 +161,8 @@ const ( FlagEnableHasBlockPartMsg = "enable-blockpart-ack" FlagDebugGcInterval = "debug.gc-interval" FlagCommitGapOffset = "commit-gap-offset" + + FlagMaxSubscriptionClients = "max-subscription-clients" ) var ( @@ -308,6 +312,7 @@ func (c *OecConfig) loadFromConfig() { c.SetEnableHasBlockPartMsg(viper.GetBool(FlagEnableHasBlockPartMsg)) c.SetGcInterval(viper.GetInt(FlagDebugGcInterval)) c.SetIavlAcNoBatch(viper.GetBool(tmiavl.FlagIavlCommitAsyncNoBatch)) + c.SetMaxSubscriptionClients(viper.GetInt(FlagMaxSubscriptionClients)) } func resolveNodeKeyWhitelist(plain string) []string { @@ -380,7 +385,8 @@ func (c *OecConfig) format() string { commit-gap-height: %d enable-analyzer: %v iavl-commit-async-no-batch: %v - active-view-change: %v`, system.ChainName, + active-view-change: %v + max_subscription_clients: %v`, system.ChainName, c.GetMempoolRecheck(), c.GetMempoolForceRecheckGap(), c.GetMempoolSize(), @@ -410,6 +416,7 @@ func (c *OecConfig) format() string { c.GetEnableAnalyzer(), c.GetIavlAcNoBatch(), c.GetActiveVC(), + c.GetMaxSubscriptionClients(), ) } @@ -658,6 +665,12 @@ func (c *OecConfig) updateFromKVStr(k, v string) { return } c.SetCommitGapOffset(r) + case FlagMaxSubscriptionClients: + r, err := strconv.Atoi(v) + if err != nil { + return + } + c.SetMaxSubscriptionClients(r) } } @@ -1075,3 +1088,14 @@ func (c *OecConfig) GetIavlAcNoBatch() bool { func (c *OecConfig) SetIavlAcNoBatch(value bool) { c.iavlAcNoBatch = value } + +func (c *OecConfig) SetMaxSubscriptionClients(v int) { + if v < 0 { + v = 0 + } + c.maxSubscriptionClients = v +} + +func (c *OecConfig) GetMaxSubscriptionClients() int { + return c.maxSubscriptionClients +} diff --git a/app/rpc/tests/rpc_test.go b/app/rpc/tests/rpc_test.go index 4028b2f6d5..16d6ecf08d 100644 --- a/app/rpc/tests/rpc_test.go +++ b/app/rpc/tests/rpc_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/okex/exchain/app/config" "github.com/okex/exchain/app/crypto/ethsecp256k1" "github.com/okex/exchain/app/rpc/backend" cosmos_context "github.com/okex/exchain/libs/cosmos-sdk/client/context" @@ -149,6 +150,7 @@ func (suite *RPCTestSuite) SetupTest() { viper.Set(flags.FlagKeyringBackend, "test") viper.Set(rpc.FlagPersonalAPI, true) + viper.Set(config.FlagMaxSubscriptionClients, 100) senderPv := suite.chain.SenderAccountPVBZ() genesisAcc = suite.chain.SenderAccount().GetAddress() diff --git a/cmd/client/flags.go b/cmd/client/flags.go index 9df2c3d9fe..3729e76a24 100644 --- a/cmd/client/flags.go +++ b/cmd/client/flags.go @@ -138,5 +138,9 @@ func RegisterAppFlag(cmd *cobra.Command) { cmd.Flags().Int(backend.FlagLogsLimit, 0, "Maximum number of logs returned when calling eth_getLogs") cmd.Flags().Int(backend.FlagLogsTimeout, 60, "Maximum query duration when calling eth_getLogs") cmd.Flags().Int(websockets.FlagSubscribeLimit, 15, "Maximum subscription on a websocket connection") + + // flags for tendermint rpc + cmd.Flags().Int(config.FlagMaxSubscriptionClients, 100, "Maximum number of unique clientIDs that Tendermint RPC server can /subscribe or /broadcast_tx_commit") + wasm.AddModuleInitFlags(cmd) } diff --git a/libs/tendermint/config/dynamic_config_okchain.go b/libs/tendermint/config/dynamic_config_okchain.go index 288048994d..fd4a1b836e 100644 --- a/libs/tendermint/config/dynamic_config_okchain.go +++ b/libs/tendermint/config/dynamic_config_okchain.go @@ -33,6 +33,7 @@ type IDynamicConfig interface { GetDynamicGpMode() int GetDynamicGpMaxTxNum() int64 GetDynamicGpMaxGasUsed() int64 + GetMaxSubscriptionClients() int } var DynamicConfig IDynamicConfig = MockDynamicConfig{} @@ -42,10 +43,11 @@ func SetDynamicConfig(c IDynamicConfig) { } type MockDynamicConfig struct { - enableDeleteMinGPTx bool - dynamicGpMode int - dynamicGpMaxTxNum int64 - dynamicGpMaxGasUsed int64 + enableDeleteMinGPTx bool + dynamicGpMode int + dynamicGpMaxTxNum int64 + dynamicGpMaxGasUsed int64 + maxSubscriptionClients int } func (d MockDynamicConfig) GetMempoolRecheck() bool { @@ -185,3 +187,14 @@ func (d *MockDynamicConfig) SetDynamicGpMaxGasUsed(value int64) { func (d MockDynamicConfig) GetDynamicGpMaxGasUsed() int64 { return d.dynamicGpMaxGasUsed } + +func (d MockDynamicConfig) GetMaxSubscriptionClients() int { + return d.maxSubscriptionClients +} + +func (d *MockDynamicConfig) SetMaxSubscriptionClients(value int) { + if value < 0 { + return + } + d.maxSubscriptionClients = value +} diff --git a/libs/tendermint/lite/proxy/query_test.go b/libs/tendermint/lite/proxy/query_test.go index 77de7e03c3..40b84ff1d4 100644 --- a/libs/tendermint/lite/proxy/query_test.go +++ b/libs/tendermint/lite/proxy/query_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/okex/exchain/libs/tendermint/abci/example/kvstore" + cfg "github.com/okex/exchain/libs/tendermint/config" "github.com/okex/exchain/libs/tendermint/crypto/merkle" "github.com/okex/exchain/libs/tendermint/lite" certclient "github.com/okex/exchain/libs/tendermint/lite/client" @@ -125,6 +126,7 @@ func _TestAppProofs(t *testing.T) { } func TestTxProofs(t *testing.T) { + setMocConfig(100) assert, require := assert.New(t), require.New(t) cl := rpclocal.New(node) @@ -162,3 +164,10 @@ func TestTxProofs(t *testing.T) { require.Nil(err, "%#v", err) require.Equal(res.Proof.RootHash, commit.Header.DataHash) } + +func setMocConfig(clientNum int) { + moc := cfg.MockDynamicConfig{} + moc.SetMaxSubscriptionClients(100) + + cfg.SetDynamicConfig(moc) +} diff --git a/libs/tendermint/rpc/client/event_test.go b/libs/tendermint/rpc/client/event_test.go index e36a106839..e628d88d87 100644 --- a/libs/tendermint/rpc/client/event_test.go +++ b/libs/tendermint/rpc/client/event_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" abci "github.com/okex/exchain/libs/tendermint/abci/types" + cfg "github.com/okex/exchain/libs/tendermint/config" tmrand "github.com/okex/exchain/libs/tendermint/libs/rand" "github.com/okex/exchain/libs/tendermint/rpc/client" ctypes "github.com/okex/exchain/libs/tendermint/rpc/core/types" @@ -27,6 +28,7 @@ func MakeTxKV() ([]byte, []byte, []byte) { } func TestHeaderEvents(t *testing.T) { + setMocConfig(100) for i, c := range GetClients() { i, c := i, c // capture params t.Run(reflect.TypeOf(c).String(), func(t *testing.T) { @@ -49,6 +51,7 @@ func TestHeaderEvents(t *testing.T) { } func TestBlockEvents(t *testing.T) { + setMocConfig(100) for i, c := range GetClients() { i, c := i, c // capture params t.Run(reflect.TypeOf(c).String(), func(t *testing.T) { @@ -97,6 +100,7 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { testTxEventsSent(t, "a func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { testTxEventsSent(t, "sync") } func testTxEventsSent(t *testing.T, broadcastMethod string) { + setMocConfig(100) for i, c := range GetClients() { i, c := i, c // capture params t.Run(reflect.TypeOf(c).String(), func(t *testing.T) { @@ -150,6 +154,7 @@ func TestClientsResubscribe(t *testing.T) { } func TestHTTPReturnsErrorIfClientIsNotRunning(t *testing.T) { + setMocConfig(100) c := getHTTPClient() // on Subscribe @@ -166,3 +171,10 @@ func TestHTTPReturnsErrorIfClientIsNotRunning(t *testing.T) { err = c.UnsubscribeAll(context.Background(), "TestHeaderEvents") assert.Error(t, err) } + +func setMocConfig(clientNum int) { + moc := cfg.MockDynamicConfig{} + moc.SetMaxSubscriptionClients(100) + + cfg.SetDynamicConfig(moc) +} diff --git a/libs/tendermint/rpc/core/events.go b/libs/tendermint/rpc/core/events.go index f32da80ace..fdf97aa801 100644 --- a/libs/tendermint/rpc/core/events.go +++ b/libs/tendermint/rpc/core/events.go @@ -6,6 +6,7 @@ import ( "github.com/pkg/errors" + "github.com/okex/exchain/libs/tendermint/config" tmpubsub "github.com/okex/exchain/libs/tendermint/libs/pubsub" tmquery "github.com/okex/exchain/libs/tendermint/libs/pubsub/query" ctypes "github.com/okex/exchain/libs/tendermint/rpc/core/types" @@ -22,8 +23,8 @@ const ( func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) { addr := ctx.RemoteAddr() - if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients { - return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients) + if env.EventBus.NumClients() >= config.DynamicConfig.GetMaxSubscriptionClients() { + return nil, fmt.Errorf("max_subscription_clients %d reached", config.DynamicConfig.GetMaxSubscriptionClients()) } else if env.EventBus.NumClientSubscriptions(addr) >= env.Config.MaxSubscriptionsPerClient { return nil, fmt.Errorf("max_subscriptions_per_client %d reached", env.Config.MaxSubscriptionsPerClient) } diff --git a/libs/tendermint/rpc/core/mempool.go b/libs/tendermint/rpc/core/mempool.go index e4a77c7819..cd26774c4b 100644 --- a/libs/tendermint/rpc/core/mempool.go +++ b/libs/tendermint/rpc/core/mempool.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" abci "github.com/okex/exchain/libs/tendermint/abci/types" + "github.com/okex/exchain/libs/tendermint/config" mempl "github.com/okex/exchain/libs/tendermint/mempool" ctypes "github.com/okex/exchain/libs/tendermint/rpc/core/types" rpctypes "github.com/okex/exchain/libs/tendermint/rpc/jsonrpc/types" @@ -61,8 +62,8 @@ func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcas func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { subscriber := ctx.RemoteAddr() - if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients { - return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients) + if env.EventBus.NumClients() >= config.DynamicConfig.GetMaxSubscriptionClients() { + return nil, fmt.Errorf("max_subscription_clients %d reached", config.DynamicConfig.GetMaxSubscriptionClients()) } else if env.EventBus.NumClientSubscriptions(subscriber) >= env.Config.MaxSubscriptionsPerClient { return nil, fmt.Errorf("max_subscriptions_per_client %d reached", env.Config.MaxSubscriptionsPerClient) } diff --git a/libs/tendermint/rpc/grpc/grpc_test.go b/libs/tendermint/rpc/grpc/grpc_test.go index ec5ae22dc6..6abecf0279 100644 --- a/libs/tendermint/rpc/grpc/grpc_test.go +++ b/libs/tendermint/rpc/grpc/grpc_test.go @@ -5,10 +5,12 @@ import ( "os" "testing" + "github.com/stretchr/testify/require" + "github.com/okex/exchain/libs/tendermint/abci/example/kvstore" + cfg "github.com/okex/exchain/libs/tendermint/config" core_grpc "github.com/okex/exchain/libs/tendermint/rpc/grpc" rpctest "github.com/okex/exchain/libs/tendermint/rpc/test" - "github.com/stretchr/testify/require" ) func TestMain(m *testing.M) { @@ -24,6 +26,7 @@ func TestMain(m *testing.M) { } func TestBroadcastTx(t *testing.T) { + setMocConfig(100) res, err := rpctest.GetGRPCClient().BroadcastTx( context.Background(), &core_grpc.RequestBroadcastTx{Tx: []byte("this is a tx")}, @@ -32,3 +35,10 @@ func TestBroadcastTx(t *testing.T) { require.EqualValues(t, 0, res.CheckTx.Code) require.EqualValues(t, 0, res.DeliverTx.Code) } + +func setMocConfig(clientNum int) { + moc := cfg.MockDynamicConfig{} + moc.SetMaxSubscriptionClients(100) + + cfg.SetDynamicConfig(moc) +}