Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CAPPL-403] Workflow Engine Rate Limiter #15939

Merged
merged 16 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/chilled-papayas-jump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added Implements rate limiter for workflow executions by workflow engine
8 changes: 8 additions & 0 deletions core/config/capabilities_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ type CapabilitiesExternalRegistry interface {
RelayID() types.RelayID
}

type EngineExecutionRateLimit interface {
GlobalRPS() float64
GlobalBurst() int
PerSenderRPS() float64
PerSenderBurst() int
}

type CapabilitiesWorkflowRegistry interface {
Address() string
NetworkID() string
Expand All @@ -38,6 +45,7 @@ type ConnectorGateway interface {
}

type Capabilities interface {
RateLimit() EngineExecutionRateLimit
Peering() P2P
Dispatcher() Dispatcher
ExternalRegistry() CapabilitiesExternalRegistry
Expand Down
10 changes: 10 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,16 @@ DeltaReconcile = '1m' # Default
# but the host and port must be fully specified and cannot be empty. You can specify `0.0.0.0` (IPv4) or `::` (IPv6) to listen on all interfaces, but that is not recommended.
ListenAddresses = ['1.2.3.4:9999', '[a52d:0:a88:1274::abcd]:1337'] # Example

[Capabilities.RateLimit]
# GlobalRPS is the global rate limit for the dispatcher.
GlobalRPS = 200 # Default
# GlobalBurst is the global burst limit for the dispatcher.
GlobalBurst = 200 # Default
# PerSenderRPS is the per-sender rate limit for the dispatcher.
PerSenderRPS = 100 # Default
# PerSenderBurst is the per-sender burst limit for the dispatcher.
PerSenderBurst = 100 # Default

[Capabilities.WorkflowRegistry]
# Address is the address for the workflow registry contract.
Address = '0x0' # Example
Expand Down
34 changes: 29 additions & 5 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,28 @@ func (m *MercurySecrets) ValidateConfig() (err error) {
return err
}

type EngineExecutionRateLimit struct {
GlobalRPS *float64
GlobalBurst *int
PerSenderRPS *float64
PerSenderBurst *int
}

func (eerl *EngineExecutionRateLimit) setFrom(f *EngineExecutionRateLimit) {
cedric-cordenier marked this conversation as resolved.
Show resolved Hide resolved
if f.GlobalRPS != nil {
eerl.GlobalRPS = f.GlobalRPS
}
if f.GlobalBurst != nil {
eerl.GlobalBurst = f.GlobalBurst
}
if f.PerSenderRPS != nil {
eerl.PerSenderRPS = f.PerSenderRPS
}
if f.PerSenderBurst != nil {
eerl.PerSenderBurst = f.PerSenderBurst
}
}

type ExternalRegistry struct {
Address *string
NetworkID *string
Expand Down Expand Up @@ -1574,14 +1596,16 @@ type ConnectorGateway struct {
}

type Capabilities struct {
Peering P2P `toml:",omitempty"`
Dispatcher Dispatcher `toml:",omitempty"`
ExternalRegistry ExternalRegistry `toml:",omitempty"`
WorkflowRegistry WorkflowRegistry `toml:",omitempty"`
GatewayConnector GatewayConnector `toml:",omitempty"`
RateLimit EngineExecutionRateLimit `toml:",omitempty"`
Peering P2P `toml:",omitempty"`
Dispatcher Dispatcher `toml:",omitempty"`
ExternalRegistry ExternalRegistry `toml:",omitempty"`
WorkflowRegistry WorkflowRegistry `toml:",omitempty"`
GatewayConnector GatewayConnector `toml:",omitempty"`
}

func (c *Capabilities) setFrom(f *Capabilities) {
c.RateLimit.setFrom(&f.RateLimit)
c.Peering.setFrom(&f.Peering)
c.ExternalRegistry.setFrom(&f.ExternalRegistry)
c.WorkflowRegistry.setFrom(&f.WorkflowRegistry)
Expand Down
25 changes: 22 additions & 3 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/vrf"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter"
workflowstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
Expand Down Expand Up @@ -277,6 +278,16 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger)
}

workflowRateLimiter, err := ratelimiter.NewRateLimiter(ratelimiter.Config{
GlobalRPS: cfg.Capabilities().RateLimit().GlobalRPS(),
GlobalBurst: cfg.Capabilities().RateLimit().GlobalBurst(),
PerSenderRPS: cfg.Capabilities().RateLimit().PerSenderRPS(),
PerSenderBurst: cfg.Capabilities().RateLimit().PerSenderBurst(),
})
if err != nil {
return nil, fmt.Errorf("could not instantiate workflow rate limiter: %w", err)
}

var gatewayConnectorWrapper *gatewayconnector.ServiceWrapper
if cfg.Capabilities().GatewayConnector().DonID() != "" {
globalLogger.Debugw("Creating GatewayConnector wrapper", "donID", cfg.Capabilities().GatewayConnector().DonID())
Expand Down Expand Up @@ -366,9 +377,16 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
lggr := globalLogger.Named("WorkflowRegistrySyncer")
fetcher := syncer.NewFetcherService(lggr, gatewayConnectorWrapper)

eventHandler := syncer.NewEventHandler(lggr, syncer.NewWorkflowRegistryDS(opts.DS, globalLogger),
fetcher.Fetch, workflowstore.NewDBStore(opts.DS, lggr, clockwork.NewRealClock()), opts.CapabilitiesRegistry,
custmsg.NewLabeler(), clockwork.NewRealClock(), keys[0],
eventHandler := syncer.NewEventHandler(
lggr,
syncer.NewWorkflowRegistryDS(opts.DS, globalLogger),
fetcher.Fetch,
workflowstore.NewDBStore(opts.DS, lggr, clockwork.NewRealClock()),
opts.CapabilitiesRegistry,
custmsg.NewLabeler(),
clockwork.NewRealClock(),
keys[0],
workflowRateLimiter,
syncer.WithMaxArtifactSize(
syncer.ArtifactConfig{
MaxBinarySize: uint64(cfg.Capabilities().WorkflowRegistry().MaxBinarySize()),
Expand Down Expand Up @@ -591,6 +609,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
globalLogger,
opts.CapabilitiesRegistry,
workflowORM,
workflowRateLimiter,
)

// Flux monitor requires ethereum just to boot, silence errors with a null delegate
Expand Down
26 changes: 26 additions & 0 deletions core/services/chainlink/config_capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,32 @@ func (c *capabilitiesConfig) WorkflowRegistry() config.CapabilitiesWorkflowRegis
}
}

func (c *capabilitiesConfig) RateLimit() config.EngineExecutionRateLimit {
return &engineExecutionRateLimit{
rl: c.c.RateLimit,
}
}

type engineExecutionRateLimit struct {
rl toml.EngineExecutionRateLimit
}

func (rl *engineExecutionRateLimit) GlobalRPS() float64 {
return *rl.rl.GlobalRPS
}

func (rl *engineExecutionRateLimit) GlobalBurst() int {
return *rl.rl.GlobalBurst
}

func (rl *engineExecutionRateLimit) PerSenderRPS() float64 {
return *rl.rl.PerSenderRPS
}

func (rl *engineExecutionRateLimit) PerSenderBurst() int {
return *rl.rl.PerSenderBurst
}

func (c *capabilitiesConfig) Dispatcher() config.Dispatcher {
return &dispatcher{d: c.c.Dispatcher}
}
Expand Down
6 changes: 6 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,12 @@ func TestConfig_Marshal(t *testing.T) {
},
}
full.Capabilities = toml.Capabilities{
RateLimit: toml.EngineExecutionRateLimit{
GlobalRPS: ptr(200.00),
GlobalBurst: ptr(200),
PerSenderRPS: ptr(100.0),
PerSenderBurst: ptr(100),
},
Peering: toml.P2P{
IncomingMessageBufferSize: ptr[int64](13),
OutgoingMessageBufferSize: ptr[int64](17),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.RateLimit]
GlobalRPS = 200.0
GlobalBurst = 200
PerSenderRPS = 100.0
PerSenderBurst = 100

[Capabilities.Peering]
IncomingMessageBufferSize = 10
OutgoingMessageBufferSize = 10
Expand Down
6 changes: 6 additions & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@ TransmitTimeout = '3m54s'
TransmitConcurrency = 456

[Capabilities]
[Capabilities.RateLimit]
GlobalRPS = 200.0
GlobalBurst = 200
PerSenderRPS = 100.0
PerSenderBurst = 100

[Capabilities.Peering]
IncomingMessageBufferSize = 13
OutgoingMessageBufferSize = 17
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ TransmitTimeout = '5s'
TransmitConcurrency = 100

[Capabilities]
[Capabilities.RateLimit]
GlobalRPS = 200.0
GlobalBurst = 200
PerSenderRPS = 100.0
PerSenderBurst = 100

[Capabilities.Peering]
IncomingMessageBufferSize = 10
OutgoingMessageBufferSize = 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"

Expand All @@ -38,6 +39,13 @@ import (
crypto2 "github.com/ethereum/go-ethereum/crypto"
)

var rlConfig = ratelimiter.Config{
GlobalRPS: 1000.0,
GlobalBurst: 1000,
PerSenderRPS: 30.0,
PerSenderBurst: 30,
}

type testEvtHandler struct {
events []syncer.Event
mux sync.Mutex
Expand Down Expand Up @@ -343,9 +351,11 @@ func Test_SecretsWorker(t *testing.T) {
require.NoError(t, err)
require.Equal(t, contents, giveContents)

rl, err := ratelimiter.NewRateLimiter(rlConfig)
require.NoError(t, err)
handler := &testSecretsWorkEventHandler{
wrappedHandler: syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
emitter, clockwork.NewFakeClock(), workflowkey.Key{}),
emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl),
registeredCh: make(chan syncer.Event, 1),
}

Expand Down Expand Up @@ -425,8 +435,10 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyPaused(t *testing.T) {
giveWorkflow.ID = id

er := syncer.NewEngineRegistry()
rl, err := ratelimiter.NewRateLimiter(rlConfig)
require.NoError(t, err)
handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
emitter, clockwork.NewFakeClock(), workflowkey.Key{}, syncer.WithEngineRegistry(er))
emitter, clockwork.NewFakeClock(), workflowkey.Key{}, rl, syncer.WithEngineRegistry(er))

worker := syncer.NewWorkflowRegistry(
lggr,
Expand Down Expand Up @@ -523,6 +535,8 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) {

mf := &mockEngineFactory{}
er := syncer.NewEngineRegistry()
rl, err := ratelimiter.NewRateLimiter(rlConfig)
require.NoError(t, err)
handler := syncer.NewEventHandler(
lggr,
orm,
Expand All @@ -532,6 +546,7 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) {
emitter,
clockwork.NewFakeClock(),
workflowkey.Key{},
rl,
syncer.WithEngineRegistry(er),
syncer.WithEngineFactoryFn(mf.new),
)
Expand Down
12 changes: 11 additions & 1 deletion core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/platform"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/ratelimiter"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)

Expand All @@ -22,6 +23,7 @@ type Delegate struct {
secretsFetcher secretsFetcher
logger logger.Logger
store store.Store
ratelimiter *ratelimiter.RateLimiter
}

var _ job.Delegate = (*Delegate)(nil)
Expand Down Expand Up @@ -70,6 +72,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser
Config: config,
Binary: binary,
SecretsFetcher: d.secretsFetcher,
RateLimiter: d.ratelimiter,
}
engine, err := NewEngine(ctx, cfg)
if err != nil {
Expand All @@ -93,8 +96,15 @@ func NewDelegate(
logger logger.Logger,
registry core.CapabilitiesRegistry,
store store.Store,
ratelimiter *ratelimiter.RateLimiter,
) *Delegate {
return &Delegate{logger: logger, registry: registry, secretsFetcher: newNoopSecretsFetcher(), store: store}
return &Delegate{
logger: logger,
registry: registry,
secretsFetcher: newNoopSecretsFetcher(),
store: store,
ratelimiter: ratelimiter,
}
}

func ValidatedWorkflowJobSpec(ctx context.Context, tomlString string) (job.Job, error) {
Expand Down
Loading
Loading