From eaeb2ebe7bfc53572655be322b793f0bf9556e1e Mon Sep 17 00:00:00 2001 From: krehermann <16602512+krehermann@users.noreply.github.com> Date: Thu, 12 Dec 2024 10:26:36 -0700 Subject: [PATCH] [KS-616] asset job updates (#15573) * fix error handling * add test * idempotent registry * add changeset * fix tests * isolate fix to mercury; more tests --- .changeset/big-camels-report.md | 5 + core/services/ocr2/plugins/mercury/plugin.go | 89 ++++++++---- .../ocr2/plugins/mercury/plugin_test.go | 133 ++++++++++++++++-- plugins/registrar.go | 2 +- 4 files changed, 190 insertions(+), 39 deletions(-) create mode 100644 .changeset/big-camels-report.md diff --git a/.changeset/big-camels-report.md b/.changeset/big-camels-report.md new file mode 100644 index 00000000000..f81f66b9138 --- /dev/null +++ b/.changeset/big-camels-report.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#bugfix fix non-idempotent loopp registry.Register diff --git a/core/services/ocr2/plugins/mercury/plugin.go b/core/services/ocr2/plugins/mercury/plugin.go index 8a4101804dd..b0983e55c89 100644 --- a/core/services/ocr2/plugins/mercury/plugin.go +++ b/core/services/ocr2/plugins/mercury/plugin.go @@ -1,6 +1,7 @@ package mercury import ( + "context" "encoding/json" "fmt" "os/exec" @@ -79,14 +80,13 @@ func NewServices( return nil, errors.New("expected job to have a non-nil PipelineSpec") } - var err error var pluginConfig config.PluginConfig if len(jb.OCR2OracleSpec.PluginConfig) == 0 { if !enableTriggerCapability { return nil, fmt.Errorf("at least one transmission option must be configured") } } else { - err = json.Unmarshal(jb.OCR2OracleSpec.PluginConfig.Bytes(), &pluginConfig) + err := json.Unmarshal(jb.OCR2OracleSpec.PluginConfig.Bytes(), &pluginConfig) if err != nil { return nil, errors.WithStack(err) } @@ -101,8 +101,8 @@ func NewServices( // encapsulate all the subservices and ensure we close them all if any fail to start srvs := []job.ServiceCtx{ocr2Provider} abort := func() { - if err = services.MultiCloser(srvs).Close(); err != nil { - lggr.Errorw("Error closing unused services", "err", err) + if cerr := services.MultiCloser(srvs).Close(); cerr != nil { + lggr.Errorw("Error closing unused services", "err", cerr) } } saver := ocrcommon.NewResultRunSaver(pipelineRunner, lggr, cfg.MaxSuccessfulRuns(), cfg.ResultWriteQueueDepth()) @@ -112,6 +112,7 @@ func NewServices( var ( factory ocr3types.MercuryPluginFactory factoryServices []job.ServiceCtx + fErr error ) fCfg := factoryCfg{ orm: orm, @@ -127,31 +128,31 @@ func NewServices( } switch feedID.Version() { case 1: - factory, factoryServices, err = newv1factory(fCfg) - if err != nil { + factory, factoryServices, fErr = newv1factory(fCfg) + if fErr != nil { abort() - return nil, fmt.Errorf("failed to create mercury v1 factory: %w", err) + return nil, fmt.Errorf("failed to create mercury v1 factory: %w", fErr) } srvs = append(srvs, factoryServices...) case 2: - factory, factoryServices, err = newv2factory(fCfg) - if err != nil { + factory, factoryServices, fErr = newv2factory(fCfg) + if fErr != nil { abort() - return nil, fmt.Errorf("failed to create mercury v2 factory: %w", err) + return nil, fmt.Errorf("failed to create mercury v2 factory: %w", fErr) } srvs = append(srvs, factoryServices...) case 3: - factory, factoryServices, err = newv3factory(fCfg) - if err != nil { + factory, factoryServices, fErr = newv3factory(fCfg) + if fErr != nil { abort() - return nil, fmt.Errorf("failed to create mercury v3 factory: %w", err) + return nil, fmt.Errorf("failed to create mercury v3 factory: %w", fErr) } srvs = append(srvs, factoryServices...) case 4: - factory, factoryServices, err = newv4factory(fCfg) - if err != nil { + factory, factoryServices, fErr = newv4factory(fCfg) + if fErr != nil { abort() - return nil, fmt.Errorf("failed to create mercury v4 factory: %w", err) + return nil, fmt.Errorf("failed to create mercury v4 factory: %w", fErr) } srvs = append(srvs, factoryServices...) default: @@ -214,13 +215,14 @@ func newv4factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. loopEnabled := loopCmd != "" if loopEnabled { - cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) if err != nil { return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) } // in loop mode, the factory is grpc server, and we need to handle the server lifecycle + // and unregistration of the loop factoryServer := loop.NewMercuryV4Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) - srvs = append(srvs, factoryServer) + srvs = append(srvs, factoryServer, unregisterer) // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle factory = factoryServer } else { @@ -253,13 +255,14 @@ func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. loopEnabled := loopCmd != "" if loopEnabled { - cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) if err != nil { return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) } // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + // and unregistration of the loop factoryServer := loop.NewMercuryV3Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) - srvs = append(srvs, factoryServer) + srvs = append(srvs, factoryServer, unregisterer) // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle factory = factoryServer } else { @@ -292,13 +295,14 @@ func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. loopEnabled := loopCmd != "" if loopEnabled { - cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) if err != nil { return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) } // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + // and unregistration of the loop factoryServer := loop.NewMercuryV2Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) - srvs = append(srvs, factoryServer) + srvs = append(srvs, factoryServer, unregisterer) // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle factory = factoryServer } else { @@ -329,13 +333,14 @@ func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. loopEnabled := loopCmd != "" if loopEnabled { - cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) if err != nil { return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) } // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + // and unregistration of the loop factoryServer := loop.NewMercuryV1Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) - srvs = append(srvs, factoryServer) + srvs = append(srvs, factoryServer, unregisterer) // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle factory = factoryServer } else { @@ -344,20 +349,46 @@ func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job. return factory, srvs, nil } -func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, loop.GRPCOpts, logger.Logger, error) { +func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, *loopUnregisterCloser, loop.GRPCOpts, logger.Logger, error) { lggr.Debugw("Initializing Mercury loop", "command", cmd) mercuryLggr := lggr.Named(fmt.Sprintf("MercuryV%d", feedID.Version())).Named(feedID.String()) envVars, err := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get()) if err != nil { - return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err) + return nil, nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err) } + loopID := mercuryLggr.Name() cmdFn, opts, err := cfg.RegisterLOOP(plugins.CmdConfig{ - ID: mercuryLggr.Name(), + ID: loopID, Cmd: cmd, Env: envVars, }) if err != nil { - return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err) + return nil, nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err) + } + return cmdFn, newLoopUnregister(cfg, loopID), opts, mercuryLggr, nil +} + +// loopUnregisterCloser is a helper to unregister a loop +// as a service +// TODO BCF-3451 all other jobs that use custom plugin providers that should be refactored to use this pattern +// perhaps it can be implemented in the delegate on job delete. +type loopUnregisterCloser struct { + r plugins.RegistrarConfig + id string +} + +func (l *loopUnregisterCloser) Close() error { + l.r.UnregisterLOOP(l.id) + return nil +} + +func (l *loopUnregisterCloser) Start(ctx context.Context) error { + return nil +} + +func newLoopUnregister(r plugins.RegistrarConfig, id string) *loopUnregisterCloser { + return &loopUnregisterCloser{ + r: r, + id: id, } - return cmdFn, opts, mercuryLggr, nil } diff --git a/core/services/ocr2/plugins/mercury/plugin_test.go b/core/services/ocr2/plugins/mercury/plugin_test.go index 22aaf7522de..eb67da53100 100644 --- a/core/services/ocr2/plugins/mercury/plugin_test.go +++ b/core/services/ocr2/plugins/mercury/plugin_test.go @@ -2,6 +2,7 @@ package mercury_test import ( "context" + "errors" "os/exec" "reflect" "testing" @@ -9,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/google/uuid" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/config/env" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -22,6 +24,7 @@ import ( v2 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v2" v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" v4 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v4" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" mercuryocr2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury" @@ -92,21 +95,23 @@ var ( // this is kind of gross, but it's the best way to test return values of the services expectedEmbeddedServiceCnt = 3 - expectedLoopServiceCnt = expectedEmbeddedServiceCnt + 1 + expectedLoopServiceCnt = expectedEmbeddedServiceCnt + 2 // factory server and loop unregisterer ) func TestNewServices(t *testing.T) { type args struct { pluginConfig job.JSONConfig feedID utils.FeedID + cfg mercuryocr2.Config } - tests := []struct { + testCases := []struct { name string args args loopMode bool wantLoopFactory any wantServiceCnt int wantErr bool + wantErrStr string }{ { name: "no plugin config error ", @@ -186,6 +191,19 @@ func TestNewServices(t *testing.T) { wantErr: false, wantLoopFactory: &loop.MercuryV3Service{}, }, + { + name: "v3 loop err", + loopMode: true, + args: args{ + pluginConfig: v3jsonCfg, + feedID: v3FeedId, + cfg: mercuryocr2.NewMercuryConfig(1, 1, &testRegistrarConfig{failRegister: true}), + }, + wantServiceCnt: expectedLoopServiceCnt, + wantErr: true, + wantLoopFactory: &loop.MercuryV3Service{}, + wantErrStr: "failed to init loop for feed", + }, { name: "v4 loop", loopMode: true, @@ -198,17 +216,27 @@ func TestNewServices(t *testing.T) { wantLoopFactory: &loop.MercuryV4Service{}, }, } - for _, tt := range tests { + for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { if tt.loopMode { t.Setenv(string(env.MercuryPlugin.Cmd), "fake_cmd") assert.NotEmpty(t, env.MercuryPlugin.Cmd.Get()) } - got, err := newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID) + // use default config if not provided + if tt.args.cfg == nil { + tt.args.cfg = testCfg + } + got, err := newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID, tt.args.cfg) if (err != nil) != tt.wantErr { t.Errorf("NewServices() error = %v, wantErr %v", err, tt.wantErr) return } + if err != nil { + if tt.wantErrStr != "" { + assert.Contains(t, err.Error(), tt.wantErrStr) + } + return + } assert.Len(t, got, tt.wantServiceCnt) if tt.loopMode { foundLoopFactory := false @@ -222,15 +250,97 @@ func TestNewServices(t *testing.T) { } }) } + + t.Run("restartable loop", func(t *testing.T) { + // setup a real loop registry to test restartability + registry := plugins.NewLoopRegistry(logger.TestLogger(t), nil, nil, nil, "") + loopRegistrarConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, registry.Register, registry.Unregister) + prodCfg := mercuryocr2.NewMercuryConfig(1, 1, loopRegistrarConfig) + type args struct { + pluginConfig job.JSONConfig + feedID utils.FeedID + cfg mercuryocr2.Config + } + testCases := []struct { + name string + args args + wantErr bool + }{ + { + name: "v1 loop", + args: args{ + pluginConfig: v1jsonCfg, + feedID: v1FeedId, + cfg: prodCfg, + }, + wantErr: false, + }, + { + name: "v2 loop", + args: args{ + pluginConfig: v2jsonCfg, + feedID: v2FeedId, + cfg: prodCfg, + }, + wantErr: false, + }, + { + name: "v3 loop", + args: args{ + pluginConfig: v3jsonCfg, + feedID: v3FeedId, + cfg: prodCfg, + }, + wantErr: false, + }, + { + name: "v4 loop", + args: args{ + pluginConfig: v4jsonCfg, + feedID: v4FeedId, + cfg: prodCfg, + }, + wantErr: false, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + t.Setenv(string(env.MercuryPlugin.Cmd), "fake_cmd") + assert.NotEmpty(t, env.MercuryPlugin.Cmd.Get()) + + got, err := newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID, tt.args.cfg) + if (err != nil) != tt.wantErr { + t.Errorf("NewServices() error = %v, wantErr %v", err, tt.wantErr) + return + } + // hack to simulate a restart. we don't have enough boilerplate to start the oracle service + // only care about the subservices so we start all except the oracle, which happens to be the last one + for i := 0; i < len(got)-1; i++ { + require.NoError(t, got[i].Start(tests.Context(t))) + } + // if we don't close the services, we get conflicts with the loop registry + _, err = newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID, tt.args.cfg) + require.ErrorContains(t, err, "plugin already registered") + + // close all services and try again + for i := len(got) - 2; i >= 0; i-- { + require.NoError(t, got[i].Close()) + } + _, err = newServicesTestWrapper(t, tt.args.pluginConfig, tt.args.feedID, tt.args.cfg) + require.NoError(t, err) + }) + } + }) } // we are only varying the version via feedID (and the plugin config) // this wrapper supplies dummy values for the rest of the arguments -func newServicesTestWrapper(t *testing.T, pluginConfig job.JSONConfig, feedID utils.FeedID) ([]job.ServiceCtx, error) { +func newServicesTestWrapper(t *testing.T, pluginConfig job.JSONConfig, feedID utils.FeedID, cfg mercuryocr2.Config) ([]job.ServiceCtx, error) { t.Helper() jb := testJob jb.OCR2OracleSpec.PluginConfig = pluginConfig - return mercuryocr2.NewServices(jb, &testProvider{}, nil, logger.TestLogger(t), testArgsNoPlugin, testCfg, nil, &testDataSourceORM{}, feedID, false) + return mercuryocr2.NewServices(jb, &testProvider{}, nil, logger.TestLogger(t), testArgsNoPlugin, cfg, nil, &testDataSourceORM{}, feedID, false) } type testProvider struct{} @@ -292,16 +402,21 @@ func (*testProvider) ReportCodecV3() v3.ReportCodec { return nil } func (*testProvider) ReportCodecV4() v4.ReportCodec { return nil } // Start implements types.MercuryProvider. -func (*testProvider) Start(context.Context) error { panic("unimplemented") } +func (*testProvider) Start(context.Context) error { return nil } var _ commontypes.MercuryProvider = (*testProvider)(nil) -type testRegistrarConfig struct{} +type testRegistrarConfig struct { + failRegister bool +} func (c *testRegistrarConfig) UnregisterLOOP(ID string) {} // RegisterLOOP implements plugins.RegistrarConfig. -func (*testRegistrarConfig) RegisterLOOP(config plugins.CmdConfig) (func() *exec.Cmd, loop.GRPCOpts, error) { +func (c *testRegistrarConfig) RegisterLOOP(config plugins.CmdConfig) (func() *exec.Cmd, loop.GRPCOpts, error) { + if c.failRegister { + return nil, loop.GRPCOpts{}, errors.New("failed to register") + } return nil, loop.GRPCOpts{}, nil } diff --git a/plugins/registrar.go b/plugins/registrar.go index 2a82f2a6204..8523d3980cc 100644 --- a/plugins/registrar.go +++ b/plugins/registrar.go @@ -6,7 +6,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/loop" ) -// RegistrarConfig generates contains static configuration inher +// RegistrarConfig generates contains static configuration type RegistrarConfig interface { RegisterLOOP(config CmdConfig) (func() *exec.Cmd, loop.GRPCOpts, error) UnregisterLOOP(ID string)