From 290f72a61326472a3f6d0ab2cf9ba1246685a1c7 Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Wed, 29 Nov 2023 11:31:38 +0000 Subject: [PATCH] [BCF-2779] Better formatting of the generic plugin config --- core/services/ocr2/delegate.go | 41 +++++++++++------- core/services/ocr2/validate/validate.go | 37 +++++++++++++--- core/services/ocr2/validate/validate_test.go | 44 +++++++++++++++++++- plugins/medianpoc/plugin.go | 14 +++++-- plugins/medianpoc/plugin_test.go | 2 +- 5 files changed, 109 insertions(+), 29 deletions(-) diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index eebc95903ed..af56c1e7a99 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -525,29 +525,33 @@ func (d *Delegate) newServicesGenericPlugin( ) (srvs []job.ServiceCtx, err error) { spec := jb.OCR2OracleSpec + // NOTE: we don't need to validate this config, since that happens as part of creating the job. + // See: validate/validate.go's `validateSpec`. p := validate.OCR2GenericPluginConfig{} err = json.Unmarshal(spec.PluginConfig.Bytes(), &p) if err != nil { return nil, err } - cconf := p.CoreConfig - command := cconf.Command + command := p.Command if command == "" { - command = defaultPathFromPluginName(cconf.PluginName) + command = defaultPathFromPluginName(p.PluginName) } - // NOTE: we don't need to validate this config, since that happens as part of creating the job. - // See: validate/validate.go's `validateSpec`. + // Add the default pipeline to the pluginConfig + p.Pipelines = append( + p.Pipelines, + validate.PipelineSpec{Name: "__DEFAULT_PIPELINE__", Spec: jb.Pipeline.Source}, + ) rid, err := spec.RelayID() if err != nil { - return nil, ErrJobSpecNoRelayer{PluginName: cconf.PluginName, Err: err} + return nil, ErrJobSpecNoRelayer{PluginName: p.PluginName, Err: err} } relayer, err := d.RelayGetter.Get(rid) if err != nil { - return nil, ErrRelayNotEnabled{Err: err, Relay: spec.Relay, PluginName: p.CoreConfig.PluginName} + return nil, ErrRelayNotEnabled{Err: err, Relay: spec.Relay, PluginName: p.PluginName} } provider, err := relayer.NewPluginProvider(ctx, types.RelayArgs{ @@ -556,7 +560,7 @@ func (d *Delegate) newServicesGenericPlugin( ContractID: spec.ContractID, New: d.isNewlyCreatedJob, RelayConfig: spec.RelayConfig.Bytes(), - ProviderType: cconf.ProviderType, + ProviderType: p.ProviderType, }, types.PluginArgs{ TransmitterID: spec.TransmitterID.String, PluginConfig: spec.PluginConfig.Bytes(), @@ -570,7 +574,7 @@ func (d *Delegate) newServicesGenericPlugin( rid.Network, rid.ChainID, spec.ContractID, - synchronization.TelemetryType(cconf.TelemetryType), + synchronization.TelemetryType(p.TelemetryType), ) oracleArgs := libocr2.OCR2OracleArgs{ BinaryNetworkEndpointFactory: d.peerWrapper.Peer2, @@ -586,8 +590,8 @@ func (d *Delegate) newServicesGenericPlugin( OffchainConfigDigester: provider.OffchainConfigDigester(), } - pluginLggr := lggr.Named(cconf.PluginName).Named(spec.ContractID).Named(spec.GetID()) - cmdFn, grpcOpts, err := d.cfg.RegisterLOOP(fmt.Sprintf("%s-%s-%s", cconf.PluginName, spec.ContractID, spec.GetID()), command) + pluginLggr := lggr.Named(p.PluginName).Named(spec.ContractID).Named(spec.GetID()) + cmdFn, grpcOpts, err := d.cfg.RegisterLOOP(fmt.Sprintf("%s-%s-%s", p.PluginName, spec.ContractID, spec.GetID()), command) if err != nil { return nil, fmt.Errorf("failed to register loop: %w", err) } @@ -604,7 +608,7 @@ func (d *Delegate) newServicesGenericPlugin( //TODO: remove this workaround when the EVM relayer is running inside of an LOOPP d.lggr.Info("provider is not a LOOPP provider, switching to provider server") - ps, err2 := relay.NewProviderServer(provider, types.OCR2PluginType(cconf.ProviderType), d.lggr) + ps, err2 := relay.NewProviderServer(provider, types.OCR2PluginType(p.ProviderType), d.lggr) if err2 != nil { return nil, fmt.Errorf("cannot start EVM provider server: %s", err) } @@ -615,12 +619,17 @@ func (d *Delegate) newServicesGenericPlugin( srvs = append(srvs, ps) } + pc, err := json.Marshal(p.Config) + if err != nil { + return nil, fmt.Errorf("cannot dump plugin config to string before sending to plugin: %s", err) + } + pluginConfig := types.ReportingPluginServiceConfig{ - PluginName: cconf.PluginName, + PluginName: p.PluginName, Command: command, - ProviderType: cconf.ProviderType, - TelemetryType: cconf.TelemetryType, - PluginConfig: p.PluginConfig, + ProviderType: p.ProviderType, + TelemetryType: p.TelemetryType, + PluginConfig: string(pc), } pr := generic.NewPipelineRunnerAdapter(pluginLggr, jb, d.pipelineRunner) diff --git a/core/services/ocr2/validate/validate.go b/core/services/ocr2/validate/validate.go index c97d23dca09..bb9bb03a8ac 100644 --- a/core/services/ocr2/validate/validate.go +++ b/core/services/ocr2/validate/validate.go @@ -125,16 +125,42 @@ func validateSpec(tree *toml.Tree, spec job.Job) error { return nil } -type coreConfig struct { +type PipelineSpec struct { + Name string `json:"name"` + Spec string `json:"spec"` +} + +type Config struct { + Pipelines []PipelineSpec `json:"pipelines"` + PluginConfig map[string]any `json:"pluginConfig"` +} + +type innerConfig struct { Command string `json:"command"` ProviderType string `json:"providerType"` PluginName string `json:"pluginName"` TelemetryType string `json:"telemetryType"` + Config } type OCR2GenericPluginConfig struct { - CoreConfig coreConfig `json:"coreConfig"` - PluginConfig string + innerConfig +} + +func (o *OCR2GenericPluginConfig) UnmarshalJSON(data []byte) error { + err := json.Unmarshal(data, &o.innerConfig) + if err != nil { + return nil + } + + m := map[string]any{} + err = json.Unmarshal(data, &m) + if err != nil { + return err + } + + o.PluginConfig = m + return nil } func validateOCR2GenericPluginSpec(jsonConfig job.JSONConfig) error { @@ -144,12 +170,11 @@ func validateOCR2GenericPluginSpec(jsonConfig job.JSONConfig) error { return err } - cc := p.CoreConfig - if cc.PluginName == "" { + if p.PluginName == "" { return errors.New("generic config invalid: must provide plugin name") } - if cc.TelemetryType == "" { + if p.TelemetryType == "" { return errors.New("generic config invalid: must provide telemetry type") } diff --git a/core/services/ocr2/validate/validate_test.go b/core/services/ocr2/validate/validate_test.go index 5b40224a4bf..b03f08f6b08 100644 --- a/core/services/ocr2/validate/validate_test.go +++ b/core/services/ocr2/validate/validate_test.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/manyminds/api2go/jsonapi" + "github.com/pelletier/go-toml" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -627,7 +628,7 @@ transmitterID = "0x74103Cf8b436465870b26aa9Fa2F62AD62b22E35" [relayConfig] chainID = 4 -[pluginConfig.coreConfig] +[pluginConfig] pluginName = "median" `, assertion: func(t *testing.T, os job.Job, err error) { @@ -655,7 +656,7 @@ transmitterID = "0x74103Cf8b436465870b26aa9Fa2F62AD62b22E35" [relayConfig] chainID = 4 -[pluginConfig.coreConfig] +[pluginConfig] pluginName = "median" telemetryType = "median" `, @@ -678,3 +679,42 @@ telemetryType = "median" }) } } + +type envelope struct { + PluginConfig *validate.OCR2GenericPluginConfig +} + +func TestOCR2GenericPluginConfig_Unmarshal(t *testing.T) { + payload := ` +[pluginConfig] +pluginName = "median" +telemetryType = "median" +foo = "bar" + +[[pluginConfig.pipelines]] +name = "default" +spec = "a spec" +` + tree, err := toml.Load(payload) + require.NoError(t, err) + + // Load the toml how we load it in the plugin, i.e. convert to + // map[string]any first, then treat as JSON + o := map[string]any{} + err = tree.Unmarshal(&o) + require.NoError(t, err) + + b, err := json.Marshal(o) + require.NoError(t, err) + + e := &envelope{} + err = json.Unmarshal(b, e) + require.NoError(t, err) + + pc := e.PluginConfig + assert.Equal(t, "bar", pc.PluginConfig["foo"]) + assert.Len(t, pc.Pipelines, 1) + assert.Equal(t, validate.PipelineSpec{Name: "default", Spec: "a spec"}, pc.Pipelines[0]) + assert.Equal(t, "median", pc.PluginName) + assert.Equal(t, "median", pc.TelemetryType) +} diff --git a/plugins/medianpoc/plugin.go b/plugins/medianpoc/plugin.go index af4ec41ab8f..fdf409b588f 100644 --- a/plugins/medianpoc/plugin.go +++ b/plugins/medianpoc/plugin.go @@ -30,8 +30,13 @@ type Plugin struct { reportingplugins.MedianProviderServer } +type pipelineSpec struct { + Name string `json:"name"` + Spec string `json:"spec"` +} + type jsonConfig struct { - Pipelines map[string]string `json:"pipelines"` + Pipelines []pipelineSpec `json:"pipelines"` } func (j jsonConfig) defaultPipeline() (string, error) { @@ -39,9 +44,10 @@ func (j jsonConfig) defaultPipeline() (string, error) { } func (j jsonConfig) getPipeline(key string) (string, error) { - v, ok := j.Pipelines[key] - if ok { - return v, nil + for _, v := range j.Pipelines { + if v.Name == key { + return v.Spec, nil + } } return "", fmt.Errorf("no pipeline found for %s", key) } diff --git a/plugins/medianpoc/plugin_test.go b/plugins/medianpoc/plugin_test.go index bc6af7ae5d3..0d6c0360e43 100644 --- a/plugins/medianpoc/plugin_test.go +++ b/plugins/medianpoc/plugin_test.go @@ -80,7 +80,7 @@ func TestNewPlugin(t *testing.T) { juelsPerFeeCoinSpec := "jpfc-spec" config := types.ReportingPluginServiceConfig{ PluginConfig: fmt.Sprintf( - `{"pipelines": {"__DEFAULT_PIPELINE__": "%s", "juelsPerFeeCoinPipeline": "%s"}}`, + `{"pipelines": [{"name": "__DEFAULT_PIPELINE__", "spec": "%s"},{"name": "juelsPerFeeCoinPipeline", "spec": "%s"}]}`, defaultSpec, juelsPerFeeCoinSpec, ),