diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 93b1093e795..26e78344b93 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -161,6 +161,7 @@ func TestEthBroadcaster_LoadNextSequenceMapFailure_StartupSuccess(t *testing.T) // Instance starts without error even if loading next sequence map fails err := eb.Start(testutils.Context(t)) require.NoError(t, err) + t.Cleanup(func() { assert.NoError(t, eb.Close()) }) } func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index 524c85b108b..745623ed77e 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -534,8 +534,8 @@ func TestTxm_Reset(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(nil, nil) ethClient.On("BatchCallContextAll", mock.Anything, mock.Anything).Return(nil).Maybe() - ethClient.On("PendingSequenceAt", mock.Anything, addr).Return(128, nil).Maybe() - ethClient.On("PendingSequenceAt", mock.Anything, addr2).Return(44, nil).Maybe() + ethClient.On("PendingNonceAt", mock.Anything, addr).Return(uint64(128), nil).Maybe() + ethClient.On("PendingNonceAt", mock.Anything, addr2).Return(uint64(44), nil).Maybe() estimator := gas.NewEstimator(logger.Test(t), ethClient, cfg.EVM(), cfg.EVM().GasEstimator()) txm, err := makeTestEvmTxm(t, db, ethClient, estimator, cfg.EVM(), cfg.EVM().GasEstimator(), cfg.EVM().Transactions(), cfg.Database(), cfg.Database().Listener(), kst.Eth()) diff --git a/core/scripts/gateway/client/README.md b/core/scripts/gateway/client/README.md new file mode 100644 index 00000000000..8c257753e6a --- /dev/null +++ b/core/scripts/gateway/client/README.md @@ -0,0 +1,20 @@ +# A gateway client script + +This script is used to connect to a gateway server and send commands to it. + +## Usage + +All requests have to be signed on behalf of a user, you need to provide your private key in .env file, e.g. + +``` +PRIVATE_KEY=1a2b3c... +``` + +The script will automatically sign the message using the provided private key. +Run the script without arguments to get the list of available commands. + +## Example + +``` +go run . -gateway_url https://01.functions-gateway.chain.link -don_id fun-avalanche-mainnet-2 -method secrets_list -message_id 123 +``` diff --git a/core/scripts/gateway/client/send_request.go b/core/scripts/gateway/client/send_request.go index 94ff6fb17da..8ab4e8bce79 100644 --- a/core/scripts/gateway/client/send_request.go +++ b/core/scripts/gateway/client/send_request.go @@ -28,7 +28,7 @@ func main() { s4SetSlotId := flag.Uint("s4_set_slot_id", 0, "S4 set slot ID") s4SetVersion := flag.Uint64("s4_set_version", 0, "S4 set version") s4SetExpirationPeriod := flag.Int64("s4_set_expiration_period", 60*60*1000, "S4 how long until the entry expires from now (in milliseconds)") - s4SetPayload := flag.String("s4_set_payload", "", "S4 set payload") + s4SetPayloadFile := flag.String("s4_set_payload_file", "", "S4 payload file to set secret") repeat := flag.Bool("repeat", false, "Repeat sending the request every 10 seconds") flag.Parse() @@ -50,6 +50,16 @@ func main() { } address := crypto.PubkeyToAddress(key.PublicKey) + var s4SetPayload []byte + if *methodName == functions.MethodSecretsSet { + var err error + s4SetPayload, err = os.ReadFile(*s4SetPayloadFile) + if err != nil { + fmt.Println("error reading S4 payload file", err) + return + } + } + // build payload (if relevant) var payloadJSON []byte if *methodName == functions.MethodSecretsSet { @@ -57,7 +67,7 @@ func main() { Address: address.Bytes(), SlotID: *s4SetSlotId, Version: *s4SetVersion, - Payload: []byte(*s4SetPayload), + Payload: s4SetPayload, Expiration: time.Now().UnixMilli() + *s4SetExpirationPeriod, } signature, err := envelope.Sign(key) @@ -70,7 +80,7 @@ func main() { SlotID: envelope.SlotID, Version: envelope.Version, Expiration: envelope.Expiration, - Payload: []byte(*s4SetPayload), + Payload: s4SetPayload, Signature: signature, } @@ -131,7 +141,12 @@ func main() { return } - fmt.Println(string(body)) + var prettyJSON bytes.Buffer + if err := json.Indent(&prettyJSON, body, "", " "); err != nil { + fmt.Println(string(body)) + } else { + fmt.Println(prettyJSON.String()) + } } sendRequest() diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index eebc95903ed..af56c1e7a99 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -525,29 +525,33 @@ func (d *Delegate) newServicesGenericPlugin( ) (srvs []job.ServiceCtx, err error) { spec := jb.OCR2OracleSpec + // NOTE: we don't need to validate this config, since that happens as part of creating the job. + // See: validate/validate.go's `validateSpec`. p := validate.OCR2GenericPluginConfig{} err = json.Unmarshal(spec.PluginConfig.Bytes(), &p) if err != nil { return nil, err } - cconf := p.CoreConfig - command := cconf.Command + command := p.Command if command == "" { - command = defaultPathFromPluginName(cconf.PluginName) + command = defaultPathFromPluginName(p.PluginName) } - // NOTE: we don't need to validate this config, since that happens as part of creating the job. - // See: validate/validate.go's `validateSpec`. + // Add the default pipeline to the pluginConfig + p.Pipelines = append( + p.Pipelines, + validate.PipelineSpec{Name: "__DEFAULT_PIPELINE__", Spec: jb.Pipeline.Source}, + ) rid, err := spec.RelayID() if err != nil { - return nil, ErrJobSpecNoRelayer{PluginName: cconf.PluginName, Err: err} + return nil, ErrJobSpecNoRelayer{PluginName: p.PluginName, Err: err} } relayer, err := d.RelayGetter.Get(rid) if err != nil { - return nil, ErrRelayNotEnabled{Err: err, Relay: spec.Relay, PluginName: p.CoreConfig.PluginName} + return nil, ErrRelayNotEnabled{Err: err, Relay: spec.Relay, PluginName: p.PluginName} } provider, err := relayer.NewPluginProvider(ctx, types.RelayArgs{ @@ -556,7 +560,7 @@ func (d *Delegate) newServicesGenericPlugin( ContractID: spec.ContractID, New: d.isNewlyCreatedJob, RelayConfig: spec.RelayConfig.Bytes(), - ProviderType: cconf.ProviderType, + ProviderType: p.ProviderType, }, types.PluginArgs{ TransmitterID: spec.TransmitterID.String, PluginConfig: spec.PluginConfig.Bytes(), @@ -570,7 +574,7 @@ func (d *Delegate) newServicesGenericPlugin( rid.Network, rid.ChainID, spec.ContractID, - synchronization.TelemetryType(cconf.TelemetryType), + synchronization.TelemetryType(p.TelemetryType), ) oracleArgs := libocr2.OCR2OracleArgs{ BinaryNetworkEndpointFactory: d.peerWrapper.Peer2, @@ -586,8 +590,8 @@ func (d *Delegate) newServicesGenericPlugin( OffchainConfigDigester: provider.OffchainConfigDigester(), } - pluginLggr := lggr.Named(cconf.PluginName).Named(spec.ContractID).Named(spec.GetID()) - cmdFn, grpcOpts, err := d.cfg.RegisterLOOP(fmt.Sprintf("%s-%s-%s", cconf.PluginName, spec.ContractID, spec.GetID()), command) + pluginLggr := lggr.Named(p.PluginName).Named(spec.ContractID).Named(spec.GetID()) + cmdFn, grpcOpts, err := d.cfg.RegisterLOOP(fmt.Sprintf("%s-%s-%s", p.PluginName, spec.ContractID, spec.GetID()), command) if err != nil { return nil, fmt.Errorf("failed to register loop: %w", err) } @@ -604,7 +608,7 @@ func (d *Delegate) newServicesGenericPlugin( //TODO: remove this workaround when the EVM relayer is running inside of an LOOPP d.lggr.Info("provider is not a LOOPP provider, switching to provider server") - ps, err2 := relay.NewProviderServer(provider, types.OCR2PluginType(cconf.ProviderType), d.lggr) + ps, err2 := relay.NewProviderServer(provider, types.OCR2PluginType(p.ProviderType), d.lggr) if err2 != nil { return nil, fmt.Errorf("cannot start EVM provider server: %s", err) } @@ -615,12 +619,17 @@ func (d *Delegate) newServicesGenericPlugin( srvs = append(srvs, ps) } + pc, err := json.Marshal(p.Config) + if err != nil { + return nil, fmt.Errorf("cannot dump plugin config to string before sending to plugin: %s", err) + } + pluginConfig := types.ReportingPluginServiceConfig{ - PluginName: cconf.PluginName, + PluginName: p.PluginName, Command: command, - ProviderType: cconf.ProviderType, - TelemetryType: cconf.TelemetryType, - PluginConfig: p.PluginConfig, + ProviderType: p.ProviderType, + TelemetryType: p.TelemetryType, + PluginConfig: string(pc), } pr := generic.NewPipelineRunnerAdapter(pluginLggr, jb, d.pipelineRunner) diff --git a/core/services/ocr2/validate/validate.go b/core/services/ocr2/validate/validate.go index c97d23dca09..bb9bb03a8ac 100644 --- a/core/services/ocr2/validate/validate.go +++ b/core/services/ocr2/validate/validate.go @@ -125,16 +125,42 @@ func validateSpec(tree *toml.Tree, spec job.Job) error { return nil } -type coreConfig struct { +type PipelineSpec struct { + Name string `json:"name"` + Spec string `json:"spec"` +} + +type Config struct { + Pipelines []PipelineSpec `json:"pipelines"` + PluginConfig map[string]any `json:"pluginConfig"` +} + +type innerConfig struct { Command string `json:"command"` ProviderType string `json:"providerType"` PluginName string `json:"pluginName"` TelemetryType string `json:"telemetryType"` + Config } type OCR2GenericPluginConfig struct { - CoreConfig coreConfig `json:"coreConfig"` - PluginConfig string + innerConfig +} + +func (o *OCR2GenericPluginConfig) UnmarshalJSON(data []byte) error { + err := json.Unmarshal(data, &o.innerConfig) + if err != nil { + return nil + } + + m := map[string]any{} + err = json.Unmarshal(data, &m) + if err != nil { + return err + } + + o.PluginConfig = m + return nil } func validateOCR2GenericPluginSpec(jsonConfig job.JSONConfig) error { @@ -144,12 +170,11 @@ func validateOCR2GenericPluginSpec(jsonConfig job.JSONConfig) error { return err } - cc := p.CoreConfig - if cc.PluginName == "" { + if p.PluginName == "" { return errors.New("generic config invalid: must provide plugin name") } - if cc.TelemetryType == "" { + if p.TelemetryType == "" { return errors.New("generic config invalid: must provide telemetry type") } diff --git a/core/services/ocr2/validate/validate_test.go b/core/services/ocr2/validate/validate_test.go index 5b40224a4bf..b03f08f6b08 100644 --- a/core/services/ocr2/validate/validate_test.go +++ b/core/services/ocr2/validate/validate_test.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/manyminds/api2go/jsonapi" + "github.com/pelletier/go-toml" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -627,7 +628,7 @@ transmitterID = "0x74103Cf8b436465870b26aa9Fa2F62AD62b22E35" [relayConfig] chainID = 4 -[pluginConfig.coreConfig] +[pluginConfig] pluginName = "median" `, assertion: func(t *testing.T, os job.Job, err error) { @@ -655,7 +656,7 @@ transmitterID = "0x74103Cf8b436465870b26aa9Fa2F62AD62b22E35" [relayConfig] chainID = 4 -[pluginConfig.coreConfig] +[pluginConfig] pluginName = "median" telemetryType = "median" `, @@ -678,3 +679,42 @@ telemetryType = "median" }) } } + +type envelope struct { + PluginConfig *validate.OCR2GenericPluginConfig +} + +func TestOCR2GenericPluginConfig_Unmarshal(t *testing.T) { + payload := ` +[pluginConfig] +pluginName = "median" +telemetryType = "median" +foo = "bar" + +[[pluginConfig.pipelines]] +name = "default" +spec = "a spec" +` + tree, err := toml.Load(payload) + require.NoError(t, err) + + // Load the toml how we load it in the plugin, i.e. convert to + // map[string]any first, then treat as JSON + o := map[string]any{} + err = tree.Unmarshal(&o) + require.NoError(t, err) + + b, err := json.Marshal(o) + require.NoError(t, err) + + e := &envelope{} + err = json.Unmarshal(b, e) + require.NoError(t, err) + + pc := e.PluginConfig + assert.Equal(t, "bar", pc.PluginConfig["foo"]) + assert.Len(t, pc.Pipelines, 1) + assert.Equal(t, validate.PipelineSpec{Name: "default", Spec: "a spec"}, pc.Pipelines[0]) + assert.Equal(t, "median", pc.PluginName) + assert.Equal(t, "median", pc.TelemetryType) +} diff --git a/plugins/medianpoc/plugin.go b/plugins/medianpoc/plugin.go index af4ec41ab8f..fdf409b588f 100644 --- a/plugins/medianpoc/plugin.go +++ b/plugins/medianpoc/plugin.go @@ -30,8 +30,13 @@ type Plugin struct { reportingplugins.MedianProviderServer } +type pipelineSpec struct { + Name string `json:"name"` + Spec string `json:"spec"` +} + type jsonConfig struct { - Pipelines map[string]string `json:"pipelines"` + Pipelines []pipelineSpec `json:"pipelines"` } func (j jsonConfig) defaultPipeline() (string, error) { @@ -39,9 +44,10 @@ func (j jsonConfig) defaultPipeline() (string, error) { } func (j jsonConfig) getPipeline(key string) (string, error) { - v, ok := j.Pipelines[key] - if ok { - return v, nil + for _, v := range j.Pipelines { + if v.Name == key { + return v.Spec, nil + } } return "", fmt.Errorf("no pipeline found for %s", key) } diff --git a/plugins/medianpoc/plugin_test.go b/plugins/medianpoc/plugin_test.go index bc6af7ae5d3..0d6c0360e43 100644 --- a/plugins/medianpoc/plugin_test.go +++ b/plugins/medianpoc/plugin_test.go @@ -80,7 +80,7 @@ func TestNewPlugin(t *testing.T) { juelsPerFeeCoinSpec := "jpfc-spec" config := types.ReportingPluginServiceConfig{ PluginConfig: fmt.Sprintf( - `{"pipelines": {"__DEFAULT_PIPELINE__": "%s", "juelsPerFeeCoinPipeline": "%s"}}`, + `{"pipelines": [{"name": "__DEFAULT_PIPELINE__", "spec": "%s"},{"name": "juelsPerFeeCoinPipeline", "spec": "%s"}]}`, defaultSpec, juelsPerFeeCoinSpec, ),