Skip to content

Commit

Permalink
[BCF-2779] Better formatting of the generic plugin config (#11406)
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier authored and Borja Aranda committed Dec 14, 2023
1 parent 7f1a336 commit 01f8398
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 29 deletions.
41 changes: 25 additions & 16 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,29 +525,33 @@ func (d *Delegate) newServicesGenericPlugin(
) (srvs []job.ServiceCtx, err error) {
spec := jb.OCR2OracleSpec

// NOTE: we don't need to validate this config, since that happens as part of creating the job.
// See: validate/validate.go's `validateSpec`.
p := validate.OCR2GenericPluginConfig{}
err = json.Unmarshal(spec.PluginConfig.Bytes(), &p)
if err != nil {
return nil, err
}
cconf := p.CoreConfig

command := cconf.Command
command := p.Command
if command == "" {
command = defaultPathFromPluginName(cconf.PluginName)
command = defaultPathFromPluginName(p.PluginName)
}

// NOTE: we don't need to validate this config, since that happens as part of creating the job.
// See: validate/validate.go's `validateSpec`.
// Add the default pipeline to the pluginConfig
p.Pipelines = append(
p.Pipelines,
validate.PipelineSpec{Name: "__DEFAULT_PIPELINE__", Spec: jb.Pipeline.Source},
)

rid, err := spec.RelayID()
if err != nil {
return nil, ErrJobSpecNoRelayer{PluginName: cconf.PluginName, Err: err}
return nil, ErrJobSpecNoRelayer{PluginName: p.PluginName, Err: err}
}

relayer, err := d.RelayGetter.Get(rid)
if err != nil {
return nil, ErrRelayNotEnabled{Err: err, Relay: spec.Relay, PluginName: p.CoreConfig.PluginName}
return nil, ErrRelayNotEnabled{Err: err, Relay: spec.Relay, PluginName: p.PluginName}
}

provider, err := relayer.NewPluginProvider(ctx, types.RelayArgs{
Expand All @@ -556,7 +560,7 @@ func (d *Delegate) newServicesGenericPlugin(
ContractID: spec.ContractID,
New: d.isNewlyCreatedJob,
RelayConfig: spec.RelayConfig.Bytes(),
ProviderType: cconf.ProviderType,
ProviderType: p.ProviderType,
}, types.PluginArgs{
TransmitterID: spec.TransmitterID.String,
PluginConfig: spec.PluginConfig.Bytes(),
Expand All @@ -570,7 +574,7 @@ func (d *Delegate) newServicesGenericPlugin(
rid.Network,
rid.ChainID,
spec.ContractID,
synchronization.TelemetryType(cconf.TelemetryType),
synchronization.TelemetryType(p.TelemetryType),
)
oracleArgs := libocr2.OCR2OracleArgs{
BinaryNetworkEndpointFactory: d.peerWrapper.Peer2,
Expand All @@ -586,8 +590,8 @@ func (d *Delegate) newServicesGenericPlugin(
OffchainConfigDigester: provider.OffchainConfigDigester(),
}

pluginLggr := lggr.Named(cconf.PluginName).Named(spec.ContractID).Named(spec.GetID())
cmdFn, grpcOpts, err := d.cfg.RegisterLOOP(fmt.Sprintf("%s-%s-%s", cconf.PluginName, spec.ContractID, spec.GetID()), command)
pluginLggr := lggr.Named(p.PluginName).Named(spec.ContractID).Named(spec.GetID())
cmdFn, grpcOpts, err := d.cfg.RegisterLOOP(fmt.Sprintf("%s-%s-%s", p.PluginName, spec.ContractID, spec.GetID()), command)
if err != nil {
return nil, fmt.Errorf("failed to register loop: %w", err)
}
Expand All @@ -604,7 +608,7 @@ func (d *Delegate) newServicesGenericPlugin(
//TODO: remove this workaround when the EVM relayer is running inside of an LOOPP
d.lggr.Info("provider is not a LOOPP provider, switching to provider server")

ps, err2 := relay.NewProviderServer(provider, types.OCR2PluginType(cconf.ProviderType), d.lggr)
ps, err2 := relay.NewProviderServer(provider, types.OCR2PluginType(p.ProviderType), d.lggr)
if err2 != nil {
return nil, fmt.Errorf("cannot start EVM provider server: %s", err)
}
Expand All @@ -615,12 +619,17 @@ func (d *Delegate) newServicesGenericPlugin(
srvs = append(srvs, ps)
}

pc, err := json.Marshal(p.Config)
if err != nil {
return nil, fmt.Errorf("cannot dump plugin config to string before sending to plugin: %s", err)
}

pluginConfig := types.ReportingPluginServiceConfig{
PluginName: cconf.PluginName,
PluginName: p.PluginName,
Command: command,
ProviderType: cconf.ProviderType,
TelemetryType: cconf.TelemetryType,
PluginConfig: p.PluginConfig,
ProviderType: p.ProviderType,
TelemetryType: p.TelemetryType,
PluginConfig: string(pc),
}

pr := generic.NewPipelineRunnerAdapter(pluginLggr, jb, d.pipelineRunner)
Expand Down
37 changes: 31 additions & 6 deletions core/services/ocr2/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,42 @@ func validateSpec(tree *toml.Tree, spec job.Job) error {
return nil
}

type coreConfig struct {
type PipelineSpec struct {
Name string `json:"name"`
Spec string `json:"spec"`
}

type Config struct {
Pipelines []PipelineSpec `json:"pipelines"`
PluginConfig map[string]any `json:"pluginConfig"`
}

type innerConfig struct {
Command string `json:"command"`
ProviderType string `json:"providerType"`
PluginName string `json:"pluginName"`
TelemetryType string `json:"telemetryType"`
Config
}

type OCR2GenericPluginConfig struct {
CoreConfig coreConfig `json:"coreConfig"`
PluginConfig string
innerConfig
}

func (o *OCR2GenericPluginConfig) UnmarshalJSON(data []byte) error {
err := json.Unmarshal(data, &o.innerConfig)
if err != nil {
return nil
}

m := map[string]any{}
err = json.Unmarshal(data, &m)
if err != nil {
return err
}

o.PluginConfig = m
return nil
}

func validateOCR2GenericPluginSpec(jsonConfig job.JSONConfig) error {
Expand All @@ -144,12 +170,11 @@ func validateOCR2GenericPluginSpec(jsonConfig job.JSONConfig) error {
return err
}

cc := p.CoreConfig
if cc.PluginName == "" {
if p.PluginName == "" {
return errors.New("generic config invalid: must provide plugin name")
}

if cc.TelemetryType == "" {
if p.TelemetryType == "" {
return errors.New("generic config invalid: must provide telemetry type")
}

Expand Down
44 changes: 42 additions & 2 deletions core/services/ocr2/validate/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/manyminds/api2go/jsonapi"
"github.com/pelletier/go-toml"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -627,7 +628,7 @@ transmitterID = "0x74103Cf8b436465870b26aa9Fa2F62AD62b22E35"
[relayConfig]
chainID = 4
[pluginConfig.coreConfig]
[pluginConfig]
pluginName = "median"
`,
assertion: func(t *testing.T, os job.Job, err error) {
Expand Down Expand Up @@ -655,7 +656,7 @@ transmitterID = "0x74103Cf8b436465870b26aa9Fa2F62AD62b22E35"
[relayConfig]
chainID = 4
[pluginConfig.coreConfig]
[pluginConfig]
pluginName = "median"
telemetryType = "median"
`,
Expand All @@ -678,3 +679,42 @@ telemetryType = "median"
})
}
}

type envelope struct {
PluginConfig *validate.OCR2GenericPluginConfig
}

func TestOCR2GenericPluginConfig_Unmarshal(t *testing.T) {
payload := `
[pluginConfig]
pluginName = "median"
telemetryType = "median"
foo = "bar"
[[pluginConfig.pipelines]]
name = "default"
spec = "a spec"
`
tree, err := toml.Load(payload)
require.NoError(t, err)

// Load the toml how we load it in the plugin, i.e. convert to
// map[string]any first, then treat as JSON
o := map[string]any{}
err = tree.Unmarshal(&o)
require.NoError(t, err)

b, err := json.Marshal(o)
require.NoError(t, err)

e := &envelope{}
err = json.Unmarshal(b, e)
require.NoError(t, err)

pc := e.PluginConfig
assert.Equal(t, "bar", pc.PluginConfig["foo"])
assert.Len(t, pc.Pipelines, 1)
assert.Equal(t, validate.PipelineSpec{Name: "default", Spec: "a spec"}, pc.Pipelines[0])
assert.Equal(t, "median", pc.PluginName)
assert.Equal(t, "median", pc.TelemetryType)
}
14 changes: 10 additions & 4 deletions plugins/medianpoc/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,24 @@ type Plugin struct {
reportingplugins.MedianProviderServer
}

type pipelineSpec struct {
Name string `json:"name"`
Spec string `json:"spec"`
}

type jsonConfig struct {
Pipelines map[string]string `json:"pipelines"`
Pipelines []pipelineSpec `json:"pipelines"`
}

func (j jsonConfig) defaultPipeline() (string, error) {
return j.getPipeline("__DEFAULT_PIPELINE__")
}

func (j jsonConfig) getPipeline(key string) (string, error) {
v, ok := j.Pipelines[key]
if ok {
return v, nil
for _, v := range j.Pipelines {
if v.Name == key {
return v.Spec, nil
}
}
return "", fmt.Errorf("no pipeline found for %s", key)
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/medianpoc/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestNewPlugin(t *testing.T) {
juelsPerFeeCoinSpec := "jpfc-spec"
config := types.ReportingPluginServiceConfig{
PluginConfig: fmt.Sprintf(
`{"pipelines": {"__DEFAULT_PIPELINE__": "%s", "juelsPerFeeCoinPipeline": "%s"}}`,
`{"pipelines": [{"name": "__DEFAULT_PIPELINE__", "spec": "%s"},{"name": "juelsPerFeeCoinPipeline", "spec": "%s"}]}`,
defaultSpec,
juelsPerFeeCoinSpec,
),
Expand Down

0 comments on commit 01f8398

Please sign in to comment.