Skip to content

Commit

Permalink
add config for max_subscription_number (#3178)
Browse files Browse the repository at this point in the history
  • Loading branch information
LeoGuo621 authored Jun 19, 2023
1 parent 8acb639 commit 792ea82
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 10 deletions.
26 changes: 25 additions & 1 deletion app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ type OecConfig struct {

//
commitGapOffset int64

maxSubscriptionClients int
}

const (
Expand Down Expand Up @@ -159,6 +161,8 @@ const (
FlagEnableHasBlockPartMsg = "enable-blockpart-ack"
FlagDebugGcInterval = "debug.gc-interval"
FlagCommitGapOffset = "commit-gap-offset"

FlagMaxSubscriptionClients = "max-subscription-clients"
)

var (
Expand Down Expand Up @@ -308,6 +312,7 @@ func (c *OecConfig) loadFromConfig() {
c.SetEnableHasBlockPartMsg(viper.GetBool(FlagEnableHasBlockPartMsg))
c.SetGcInterval(viper.GetInt(FlagDebugGcInterval))
c.SetIavlAcNoBatch(viper.GetBool(tmiavl.FlagIavlCommitAsyncNoBatch))
c.SetMaxSubscriptionClients(viper.GetInt(FlagMaxSubscriptionClients))
}

func resolveNodeKeyWhitelist(plain string) []string {
Expand Down Expand Up @@ -380,7 +385,8 @@ func (c *OecConfig) format() string {
commit-gap-height: %d
enable-analyzer: %v
iavl-commit-async-no-batch: %v
active-view-change: %v`, system.ChainName,
active-view-change: %v
max_subscription_clients: %v`, system.ChainName,
c.GetMempoolRecheck(),
c.GetMempoolForceRecheckGap(),
c.GetMempoolSize(),
Expand Down Expand Up @@ -410,6 +416,7 @@ func (c *OecConfig) format() string {
c.GetEnableAnalyzer(),
c.GetIavlAcNoBatch(),
c.GetActiveVC(),
c.GetMaxSubscriptionClients(),
)
}

Expand Down Expand Up @@ -658,6 +665,12 @@ func (c *OecConfig) updateFromKVStr(k, v string) {
return
}
c.SetCommitGapOffset(r)
case FlagMaxSubscriptionClients:
r, err := strconv.Atoi(v)
if err != nil {
return
}
c.SetMaxSubscriptionClients(r)
}

}
Expand Down Expand Up @@ -1075,3 +1088,14 @@ func (c *OecConfig) GetIavlAcNoBatch() bool {
func (c *OecConfig) SetIavlAcNoBatch(value bool) {
c.iavlAcNoBatch = value
}

func (c *OecConfig) SetMaxSubscriptionClients(v int) {
if v < 0 {
v = 0
}
c.maxSubscriptionClients = v
}

func (c *OecConfig) GetMaxSubscriptionClients() int {
return c.maxSubscriptionClients
}
2 changes: 2 additions & 0 deletions app/rpc/tests/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/okex/exchain/app/config"
"github.com/okex/exchain/app/crypto/ethsecp256k1"
"github.com/okex/exchain/app/rpc/backend"
cosmos_context "github.com/okex/exchain/libs/cosmos-sdk/client/context"
Expand Down Expand Up @@ -149,6 +150,7 @@ func (suite *RPCTestSuite) SetupTest() {
viper.Set(flags.FlagKeyringBackend, "test")

viper.Set(rpc.FlagPersonalAPI, true)
viper.Set(config.FlagMaxSubscriptionClients, 100)

senderPv := suite.chain.SenderAccountPVBZ()
genesisAcc = suite.chain.SenderAccount().GetAddress()
Expand Down
4 changes: 4 additions & 0 deletions cmd/client/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,5 +138,9 @@ func RegisterAppFlag(cmd *cobra.Command) {
cmd.Flags().Int(backend.FlagLogsLimit, 0, "Maximum number of logs returned when calling eth_getLogs")
cmd.Flags().Int(backend.FlagLogsTimeout, 60, "Maximum query duration when calling eth_getLogs")
cmd.Flags().Int(websockets.FlagSubscribeLimit, 15, "Maximum subscription on a websocket connection")

// flags for tendermint rpc
cmd.Flags().Int(config.FlagMaxSubscriptionClients, 100, "Maximum number of unique clientIDs that Tendermint RPC server can /subscribe or /broadcast_tx_commit")

wasm.AddModuleInitFlags(cmd)
}
21 changes: 17 additions & 4 deletions libs/tendermint/config/dynamic_config_okchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type IDynamicConfig interface {
GetDynamicGpMode() int
GetDynamicGpMaxTxNum() int64
GetDynamicGpMaxGasUsed() int64
GetMaxSubscriptionClients() int
}

var DynamicConfig IDynamicConfig = MockDynamicConfig{}
Expand All @@ -42,10 +43,11 @@ func SetDynamicConfig(c IDynamicConfig) {
}

type MockDynamicConfig struct {
enableDeleteMinGPTx bool
dynamicGpMode int
dynamicGpMaxTxNum int64
dynamicGpMaxGasUsed int64
enableDeleteMinGPTx bool
dynamicGpMode int
dynamicGpMaxTxNum int64
dynamicGpMaxGasUsed int64
maxSubscriptionClients int
}

func (d MockDynamicConfig) GetMempoolRecheck() bool {
Expand Down Expand Up @@ -185,3 +187,14 @@ func (d *MockDynamicConfig) SetDynamicGpMaxGasUsed(value int64) {
func (d MockDynamicConfig) GetDynamicGpMaxGasUsed() int64 {
return d.dynamicGpMaxGasUsed
}

func (d MockDynamicConfig) GetMaxSubscriptionClients() int {
return d.maxSubscriptionClients
}

func (d *MockDynamicConfig) SetMaxSubscriptionClients(value int) {
if value < 0 {
return
}
d.maxSubscriptionClients = value
}
9 changes: 9 additions & 0 deletions libs/tendermint/lite/proxy/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/okex/exchain/libs/tendermint/abci/example/kvstore"
cfg "github.com/okex/exchain/libs/tendermint/config"
"github.com/okex/exchain/libs/tendermint/crypto/merkle"
"github.com/okex/exchain/libs/tendermint/lite"
certclient "github.com/okex/exchain/libs/tendermint/lite/client"
Expand Down Expand Up @@ -125,6 +126,7 @@ func _TestAppProofs(t *testing.T) {
}

func TestTxProofs(t *testing.T) {
setMocConfig(100)
assert, require := assert.New(t), require.New(t)

cl := rpclocal.New(node)
Expand Down Expand Up @@ -162,3 +164,10 @@ func TestTxProofs(t *testing.T) {
require.Nil(err, "%#v", err)
require.Equal(res.Proof.RootHash, commit.Header.DataHash)
}

func setMocConfig(clientNum int) {
moc := cfg.MockDynamicConfig{}
moc.SetMaxSubscriptionClients(100)

cfg.SetDynamicConfig(moc)
}
12 changes: 12 additions & 0 deletions libs/tendermint/rpc/client/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"

abci "github.com/okex/exchain/libs/tendermint/abci/types"
cfg "github.com/okex/exchain/libs/tendermint/config"
tmrand "github.com/okex/exchain/libs/tendermint/libs/rand"
"github.com/okex/exchain/libs/tendermint/rpc/client"
ctypes "github.com/okex/exchain/libs/tendermint/rpc/core/types"
Expand All @@ -27,6 +28,7 @@ func MakeTxKV() ([]byte, []byte, []byte) {
}

func TestHeaderEvents(t *testing.T) {
setMocConfig(100)
for i, c := range GetClients() {
i, c := i, c // capture params
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
Expand All @@ -49,6 +51,7 @@ func TestHeaderEvents(t *testing.T) {
}

func TestBlockEvents(t *testing.T) {
setMocConfig(100)
for i, c := range GetClients() {
i, c := i, c // capture params
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
Expand Down Expand Up @@ -97,6 +100,7 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { testTxEventsSent(t, "a
func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { testTxEventsSent(t, "sync") }

func testTxEventsSent(t *testing.T, broadcastMethod string) {
setMocConfig(100)
for i, c := range GetClients() {
i, c := i, c // capture params
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
Expand Down Expand Up @@ -150,6 +154,7 @@ func TestClientsResubscribe(t *testing.T) {
}

func TestHTTPReturnsErrorIfClientIsNotRunning(t *testing.T) {
setMocConfig(100)
c := getHTTPClient()

// on Subscribe
Expand All @@ -166,3 +171,10 @@ func TestHTTPReturnsErrorIfClientIsNotRunning(t *testing.T) {
err = c.UnsubscribeAll(context.Background(), "TestHeaderEvents")
assert.Error(t, err)
}

func setMocConfig(clientNum int) {
moc := cfg.MockDynamicConfig{}
moc.SetMaxSubscriptionClients(100)

cfg.SetDynamicConfig(moc)
}
5 changes: 3 additions & 2 deletions libs/tendermint/rpc/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/pkg/errors"

"github.com/okex/exchain/libs/tendermint/config"
tmpubsub "github.com/okex/exchain/libs/tendermint/libs/pubsub"
tmquery "github.com/okex/exchain/libs/tendermint/libs/pubsub/query"
ctypes "github.com/okex/exchain/libs/tendermint/rpc/core/types"
Expand All @@ -22,8 +23,8 @@ const (
func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) {
addr := ctx.RemoteAddr()

if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients)
if env.EventBus.NumClients() >= config.DynamicConfig.GetMaxSubscriptionClients() {
return nil, fmt.Errorf("max_subscription_clients %d reached", config.DynamicConfig.GetMaxSubscriptionClients())
} else if env.EventBus.NumClientSubscriptions(addr) >= env.Config.MaxSubscriptionsPerClient {
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", env.Config.MaxSubscriptionsPerClient)
}
Expand Down
5 changes: 3 additions & 2 deletions libs/tendermint/rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"

abci "github.com/okex/exchain/libs/tendermint/abci/types"
"github.com/okex/exchain/libs/tendermint/config"
mempl "github.com/okex/exchain/libs/tendermint/mempool"
ctypes "github.com/okex/exchain/libs/tendermint/rpc/core/types"
rpctypes "github.com/okex/exchain/libs/tendermint/rpc/jsonrpc/types"
Expand Down Expand Up @@ -61,8 +62,8 @@ func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcas
func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
subscriber := ctx.RemoteAddr()

if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients)
if env.EventBus.NumClients() >= config.DynamicConfig.GetMaxSubscriptionClients() {
return nil, fmt.Errorf("max_subscription_clients %d reached", config.DynamicConfig.GetMaxSubscriptionClients())
} else if env.EventBus.NumClientSubscriptions(subscriber) >= env.Config.MaxSubscriptionsPerClient {
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", env.Config.MaxSubscriptionsPerClient)
}
Expand Down
12 changes: 11 additions & 1 deletion libs/tendermint/rpc/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"os"
"testing"

"github.com/stretchr/testify/require"

"github.com/okex/exchain/libs/tendermint/abci/example/kvstore"
cfg "github.com/okex/exchain/libs/tendermint/config"
core_grpc "github.com/okex/exchain/libs/tendermint/rpc/grpc"
rpctest "github.com/okex/exchain/libs/tendermint/rpc/test"
"github.com/stretchr/testify/require"
)

func TestMain(m *testing.M) {
Expand All @@ -24,6 +26,7 @@ func TestMain(m *testing.M) {
}

func TestBroadcastTx(t *testing.T) {
setMocConfig(100)
res, err := rpctest.GetGRPCClient().BroadcastTx(
context.Background(),
&core_grpc.RequestBroadcastTx{Tx: []byte("this is a tx")},
Expand All @@ -32,3 +35,10 @@ func TestBroadcastTx(t *testing.T) {
require.EqualValues(t, 0, res.CheckTx.Code)
require.EqualValues(t, 0, res.DeliverTx.Code)
}

func setMocConfig(clientNum int) {
moc := cfg.MockDynamicConfig{}
moc.SetMaxSubscriptionClients(100)

cfg.SetDynamicConfig(moc)
}

0 comments on commit 792ea82

Please sign in to comment.