feat: make monitor params configurable via flags
--monitor-max-retries - maximum number of status retries before closing the lease. Defaults to 40.
--monitor-retry-period - monitor status retry period. Defaults to 4s (minimum value).
--monitor-retry-period-jitter - jitter window added to the monitor status retry period. Defaults to 15s.
--monitor-healthcheck-period - monitor healthcheck period. Defaults to 10s.
--monitor-healthcheck-period-jitter - jitter window added to the monitor healthcheck period. Defaults to 5s.

Signed-off-by: Artur Troian <[email protected]>
troian committed Aug 17, 2024
1 parent f42936b commit 2ccb8a5
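The period/jitter flag pairs above are easiest to read as a randomized wait: the period sets the minimum delay and the jitter widens it into a window. The monitor's schedule helper is collapsed in the diff below, so the following Go sketch only illustrates the assumed semantics (a delay drawn uniformly from [period, period+jitter)); it is not the provider's implementation.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// randomizedDelay illustrates the assumed meaning of the period/jitter flag
// pairs: the wait is at least `period` and at most `period + jitter`.
func randomizedDelay(period, jitter time.Duration) time.Duration {
    if jitter <= 0 {
        return period
    }
    return period + time.Duration(rand.Int63n(int64(jitter)))
}

func main() {
    // With the defaults, a status retry would fire roughly 4s-19s after a failed check.
    fmt.Println(randomizedDelay(4*time.Second, 15*time.Second))
}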
Showing 6 changed files with 90 additions and 56 deletions.
10 changes: 10 additions & 0 deletions cluster/config.go
@@ -13,12 +13,22 @@ type Config struct {
    BlockedHostnames               []string
    DeploymentIngressStaticHosts   bool
    DeploymentIngressDomain        string
    MonitorMaxRetries              uint
    MonitorRetryPeriod             time.Duration
    MonitorRetryPeriodJitter       time.Duration
    MonitorHealthcheckPeriod       time.Duration
    MonitorHealthcheckPeriodJitter time.Duration
    ClusterSettings                map[interface{}]interface{}
}

func NewDefaultConfig() Config {
    return Config{
        InventoryResourcePollPeriod:     time.Second * 5,
        InventoryResourceDebugFrequency: 10,
        MonitorMaxRetries:               40,
        MonitorRetryPeriod:              time.Second * 4, // nolint revive
        MonitorRetryPeriodJitter:        time.Second * 15,
        MonitorHealthcheckPeriod:        time.Second * 10, // nolint revive
        MonitorHealthcheckPeriodJitter:  time.Second * 5,
    }
}
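The new knobs live on cluster.Config and pick up the defaults above from NewDefaultConfig. A minimal sketch of overriding them programmatically, assuming the github.com/akash-network/provider/cluster import path that appears later in this diff:

package main

import (
    "time"

    "github.com/akash-network/provider/cluster"
)

func main() {
    // Start from the package defaults and tighten the monitor behaviour.
    cfg := cluster.NewDefaultConfig()
    cfg.MonitorMaxRetries = 10
    cfg.MonitorRetryPeriod = 5 * time.Second
    cfg.MonitorRetryPeriodJitter = 10 * time.Second
    _ = cfg // hand cfg to cluster.NewService as shown in service.go below
}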
35 changes: 13 additions & 22 deletions cluster/monitor.go
@@ -23,15 +23,6 @@ import (
    "github.com/akash-network/provider/tools/fromctx"
)

const (
    monitorMaxRetries        = 40
    monitorRetryPeriodMin    = time.Second * 4 // nolint revive
    monitorRetryPeriodJitter = time.Second * 15

    monitorHealthcheckPeriodMin    = time.Second * 10 // nolint revive
    monitorHealthcheckPeriodJitter = time.Second * 5
)

var (
    deploymentHealthCheckCounter = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "provider_deployment_monitor_health",
@@ -45,22 +36,22 @@ type deploymentMonitor struct {

    deployment ctypes.IDeployment

    attempts int
    attempts uint
    log      log.Logger
    lc       lifecycle.Lifecycle

    clusterSettings map[interface{}]interface{}
    config          Config
}

func newDeploymentMonitor(dm *deploymentManager) *deploymentMonitor {
    m := &deploymentMonitor{
        bus:             dm.bus,
        session:         dm.session,
        client:          dm.client,
        deployment:      dm.deployment,
        log:             dm.log.With("cmp", "deployment-monitor"),
        lc:              lifecycle.New(),
        clusterSettings: dm.config.ClusterSettings,
        bus:        dm.bus,
        session:    dm.session,
        client:     dm.client,
        deployment: dm.deployment,
        log:        dm.log.With("cmp", "deployment-monitor"),
        lc:         lifecycle.New(),
        config:     dm.config,
    }

    go m.lc.WatchChannel(dm.lc.ShuttingDown())
@@ -126,7 +117,7 @@ loop:

            m.publishStatus(event.ClusterDeploymentPending)

            if m.attempts <= monitorMaxRetries {
            if m.attempts <= m.config.MonitorMaxRetries {
                // unhealthy. retry
                tickch = m.scheduleRetry()
                break
@@ -166,7 +157,7 @@ func (m *deploymentMonitor) runCheck(ctx context.Context) <-chan runner.Result {
}

func (m *deploymentMonitor) doCheck(ctx context.Context) (bool, error) {
    ctx = fromctx.ApplyToContext(ctx, m.clusterSettings)
    ctx = fromctx.ApplyToContext(ctx, m.config.ClusterSettings)

    status, err := m.client.LeaseStatus(ctx, m.deployment.LeaseID())

@@ -226,11 +217,11 @@ func (m *deploymentMonitor) publishStatus(status event.ClusterDeploymentStatus)
}

func (m *deploymentMonitor) scheduleRetry() <-chan time.Time {
    return m.schedule(monitorRetryPeriodMin, monitorRetryPeriodJitter)
    return m.schedule(m.config.MonitorRetryPeriod, m.config.MonitorRetryPeriodJitter)
}

func (m *deploymentMonitor) scheduleHealthcheck() <-chan time.Time {
    return m.schedule(monitorHealthcheckPeriodMin, monitorHealthcheckPeriodJitter)
    return m.schedule(m.config.MonitorHealthcheckPeriod, m.config.MonitorHealthcheckPeriodJitter)
}

func (m *deploymentMonitor) schedule(min, jitter time.Duration) <-chan time.Time {
3 changes: 3 additions & 0 deletions cluster/monitor_test.go
@@ -39,6 +39,7 @@ func TestMonitorInstantiate(t *testing.T) {
        deployment: deployment,
        log:        myLog,
        lc:         lc,
        config:     NewDefaultConfig(),
    }
    monitor := newDeploymentMonitor(myDeploymentManager)
    require.NotNil(t, monitor)
@@ -78,6 +79,7 @@ func TestMonitorSendsClusterDeploymentPending(t *testing.T) {
        deployment: deployment,
        log:        myLog,
        lc:         lc,
        config:     NewDefaultConfig(),
    }
    monitor := newDeploymentMonitor(myDeploymentManager)
    require.NotNil(t, monitor)
@@ -134,6 +136,7 @@ func TestMonitorSendsClusterDeploymentDeployed(t *testing.T) {
        deployment: deployment,
        log:        myLog,
        lc:         lc,
        config:     NewDefaultConfig(),
    }
    monitor := newDeploymentMonitor(myDeploymentManager)
    require.NotNil(t, monitor)
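The test fixtures above simply inject NewDefaultConfig(). Since the monitor now reads all of its timings from Config, a test-only helper along the following lines could shrink them to keep retries fast; this is a hypothetical sketch, not code from this commit.

package cluster

import (
    "testing"
    "time"
)

// fastMonitorConfig is a hypothetical test helper: it derives a Config from
// the package defaults but makes the monitor give up almost immediately.
func fastMonitorConfig(t *testing.T) Config {
    t.Helper()

    cfg := NewDefaultConfig()
    cfg.MonitorMaxRetries = 1
    cfg.MonitorRetryPeriod = 10 * time.Millisecond
    cfg.MonitorRetryPeriodJitter = 10 * time.Millisecond

    return cfg
}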
48 changes: 48 additions & 0 deletions cmd/provider-services/cmd/run.go
@@ -102,6 +102,11 @@ const (
    FlagBidPriceIPScale                = "bid-price-ip-scale"
    FlagEnableIPOperator               = "ip-operator"
    FlagTxBroadcastTimeout             = "tx-broadcast-timeout"
    FlagMonitorMaxRetries              = "monitor-max-retries"
    FlagMonitorRetryPeriod             = "monitor-retry-period"
    FlagMonitorRetryPeriodJitter       = "monitor-retry-period-jitter"
    FlagMonitorHealthcheckPeriod       = "monitor-healthcheck-period"
    FlagMonitorHealthcheckPeriodJitter = "monitor-healthcheck-period-jitter"
)

const (
@@ -131,6 +136,14 @@ func RunCmd() *cobra.Command {
                return errors.Errorf(`flag "%s" value must be > "%s"`, FlagWithdrawalPeriod, FlagLeaseFundsMonitorInterval) // nolint: err113
            }

            if viper.GetDuration(FlagMonitorRetryPeriod) < 4*time.Second {
                return errors.Errorf(`flag "%s" value must be > "%s"`, FlagMonitorRetryPeriod, 4*time.Second) // nolint: err113
            }

            if viper.GetDuration(FlagMonitorHealthcheckPeriod) < 4*time.Second {
                return errors.Errorf(`flag "%s" value must be > "%s"`, FlagMonitorHealthcheckPeriod, 4*time.Second) // nolint: err113
            }

            group, ctx := errgroup.WithContext(cmd.Context())
            cmd.SetContext(ctx)

@@ -397,6 +410,31 @@ func RunCmd() *cobra.Command {
        panic(err)
    }

    cmd.Flags().Uint(FlagMonitorMaxRetries, 40, "max count of status retries before closing the lease. defaults to 40")
    if err := viper.BindPFlag(FlagMonitorMaxRetries, cmd.Flags().Lookup(FlagMonitorMaxRetries)); err != nil {
        panic(err)
    }

    cmd.Flags().Duration(FlagMonitorRetryPeriod, 4*time.Second, "monitor status retry period. defaults to 4s (min value)")
    if err := viper.BindPFlag(FlagMonitorRetryPeriod, cmd.Flags().Lookup(FlagMonitorRetryPeriod)); err != nil {
        panic(err)
    }

    cmd.Flags().Duration(FlagMonitorRetryPeriodJitter, 15*time.Second, "monitor status retry window. defaults to 15s")
    if err := viper.BindPFlag(FlagMonitorRetryPeriodJitter, cmd.Flags().Lookup(FlagMonitorRetryPeriodJitter)); err != nil {
        panic(err)
    }

    cmd.Flags().Duration(FlagMonitorHealthcheckPeriod, 10*time.Second, "monitor healthcheck period. defaults to 10s")
    if err := viper.BindPFlag(FlagMonitorHealthcheckPeriod, cmd.Flags().Lookup(FlagMonitorHealthcheckPeriod)); err != nil {
        panic(err)
    }

    cmd.Flags().Duration(FlagMonitorHealthcheckPeriodJitter, 5*time.Second, "monitor healthcheck window. defaults to 5s")
    if err := viper.BindPFlag(FlagMonitorHealthcheckPeriodJitter, cmd.Flags().Lookup(FlagMonitorHealthcheckPeriodJitter)); err != nil {
        panic(err)
    }

    if err := providerflags.AddServiceEndpointFlag(cmd, serviceHostnameOperator); err != nil {
        panic(err)
    }
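Each registration above repeats the same define-then-bind pattern with Cobra and viper. Purely as an illustration (this helper is not part of the commit, and the package name is an assumption), the pattern could be factored out like so:

package cmd

import (
    "time"

    "github.com/spf13/cobra"
    "github.com/spf13/viper"
)

// addDurationFlag is a hypothetical helper capturing the pattern above:
// register a duration flag on the command and bind it to viper under the
// same key, panicking on binding errors just like the inline code does.
func addDurationFlag(cmd *cobra.Command, name string, def time.Duration, usage string) {
    cmd.Flags().Duration(name, def, usage)
    if err := viper.BindPFlag(name, cmd.Flags().Lookup(name)); err != nil {
        panic(err)
    }
}

With such a helper, each pair above would collapse to a single call, e.g. addDurationFlag(cmd, FlagMonitorRetryPeriod, 4*time.Second, "monitor status retry period. defaults to 4s (min value)").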
@@ -522,6 +560,11 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error {
    cachedResultMaxAge := viper.GetDuration(FlagCachedResultMaxAge)
    rpcQueryTimeout := viper.GetDuration(FlagRPCQueryTimeout)
    enableIPOperator := viper.GetBool(FlagEnableIPOperator)
    monitorMaxRetries := viper.GetUint(FlagMonitorMaxRetries)
    monitorRetryPeriod := viper.GetDuration(FlagMonitorRetryPeriod)
    monitorRetryPeriodJitter := viper.GetDuration(FlagMonitorRetryPeriodJitter)
    monitorHealthcheckPeriod := viper.GetDuration(FlagMonitorHealthcheckPeriod)
    monitorHealthcheckPeriodJitter := viper.GetDuration(FlagMonitorHealthcheckPeriodJitter)

    pricing, err := createBidPricingStrategy(strategy)
    if err != nil {
@@ -656,6 +699,11 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error {
    config.DeploymentIngressDomain = deploymentIngressDomain
    config.BidTimeout = bidTimeout
    config.ManifestTimeout = manifestTimeout
    config.MonitorMaxRetries = monitorMaxRetries
    config.MonitorRetryPeriod = monitorRetryPeriod
    config.MonitorRetryPeriodJitter = monitorRetryPeriodJitter
    config.MonitorHealthcheckPeriod = monitorHealthcheckPeriod
    config.MonitorHealthcheckPeriodJitter = monitorHealthcheckPeriodJitter

    if len(providerConfig) != 0 {
        pConf, err := config2.ReadConfigPath(providerConfig)
36 changes: 15 additions & 21 deletions config.go
@@ -10,30 +10,23 @@ import (
    types "github.com/akash-network/akash-api/go/node/types/v1beta3"

    "github.com/akash-network/provider/bidengine"
    "github.com/akash-network/provider/cluster"
)

type Config struct {
    ClusterWaitReadyDuration        time.Duration
    ClusterPublicHostname           string
    ClusterExternalPortQuantity     uint
    InventoryResourcePollPeriod     time.Duration
    InventoryResourceDebugFrequency uint
    BidPricingStrategy              bidengine.BidPricingStrategy
    BidDeposit                      sdk.Coin
    CPUCommitLevel                  float64
    MemoryCommitLevel               float64
    StorageCommitLevel              float64
    MaxGroupVolumes                 int
    BlockedHostnames                []string
    BidTimeout                      time.Duration
    ManifestTimeout                 time.Duration
    BalanceCheckerCfg               BalanceCheckerConfig
    Attributes                      types.Attributes
    DeploymentIngressStaticHosts    bool
    DeploymentIngressDomain         string
    ClusterSettings                 map[interface{}]interface{}
    RPCQueryTimeout                 time.Duration
    CachedResultMaxAge              time.Duration
    ClusterWaitReadyDuration    time.Duration
    ClusterPublicHostname       string
    ClusterExternalPortQuantity uint
    BidPricingStrategy          bidengine.BidPricingStrategy
    BidDeposit                  sdk.Coin
    BidTimeout                  time.Duration
    ManifestTimeout             time.Duration
    BalanceCheckerCfg           BalanceCheckerConfig
    Attributes                  types.Attributes
    MaxGroupVolumes             int
    RPCQueryTimeout             time.Duration
    CachedResultMaxAge          time.Duration
    cluster.Config
}

func NewDefaultConfig() Config {
@@ -45,5 +38,6 @@ func NewDefaultConfig() Config {
            WithdrawalPeriod: 24 * time.Hour,
        },
        MaxGroupVolumes: constants.DefaultMaxGroupVolumes,
        Config:          cluster.NewDefaultConfig(),
    }
}
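The provider-level Config now embeds cluster.Config instead of duplicating its fields, so the cluster settings are promoted onto the outer struct and the whole sub-config can be handed to the cluster service in one piece (see service.go below). A dependency-free sketch of that embedding/promotion behaviour, using stand-in types rather than the repository's own:

package main

import (
    "fmt"
    "time"
)

// clusterConfig stands in for cluster.Config; only one field is reproduced.
type clusterConfig struct {
    MonitorRetryPeriod time.Duration
}

// providerConfig mirrors the embedding introduced in config.go above: fields
// of the embedded struct are promoted onto the outer type.
type providerConfig struct {
    BidTimeout time.Duration
    clusterConfig
}

func main() {
    var cfg providerConfig
    cfg.MonitorRetryPeriod = 6 * time.Second // promoted field
    // Both paths read the same storage, which is why the embedded value
    // (cfg.Config in the real code) can be passed to cluster.NewService directly.
    fmt.Println(cfg.clusterConfig.MonitorRetryPeriod)
}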
14 changes: 1 addition & 13 deletions service.go
@@ -74,18 +74,6 @@ func NewService(ctx context.Context,

    session = session.ForModule("provider-service")

    clusterConfig := cluster.NewDefaultConfig()
    clusterConfig.InventoryResourcePollPeriod = cfg.InventoryResourcePollPeriod
    clusterConfig.InventoryResourceDebugFrequency = cfg.InventoryResourceDebugFrequency
    clusterConfig.InventoryExternalPortQuantity = cfg.ClusterExternalPortQuantity
    clusterConfig.CPUCommitLevel = cfg.CPUCommitLevel
    clusterConfig.MemoryCommitLevel = cfg.MemoryCommitLevel
    clusterConfig.StorageCommitLevel = cfg.StorageCommitLevel
    clusterConfig.BlockedHostnames = cfg.BlockedHostnames
    clusterConfig.DeploymentIngressStaticHosts = cfg.DeploymentIngressStaticHosts
    clusterConfig.DeploymentIngressDomain = cfg.DeploymentIngressDomain
    clusterConfig.ClusterSettings = cfg.ClusterSettings

    cl, err := aclient.DiscoverQueryClient(ctx, cctx)
    if err != nil {
        cancel()
@@ -99,7 +87,7 @@ func NewService(ctx context.Context,
        return nil, err
    }

    cluster, err := cluster.NewService(ctx, session, bus, cclient, waiter, clusterConfig)
    cluster, err := cluster.NewService(ctx, session, bus, cclient, waiter, cfg.Config)
    if err != nil {
        cancel()
        <-bc.lc.Done()
