Skip to content

Commit

Permalink
Merge branch 'develop' into full_metrics_dashboard
Browse files Browse the repository at this point in the history
  • Loading branch information
skudasov committed Nov 30, 2023
2 parents b3f0a8a + 0ae8279 commit f92508d
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 35 deletions.
1 change: 1 addition & 0 deletions core/chains/evm/txmgr/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func TestEthBroadcaster_LoadNextSequenceMapFailure_StartupSuccess(t *testing.T)
// Instance starts without error even if loading next sequence map fails
err := eb.Start(testutils.Context(t))
require.NoError(t, err)
t.Cleanup(func() { assert.NoError(t, eb.Close()) })
}

func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,8 @@ func TestTxm_Reset(t *testing.T) {
ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(nil, nil)
ethClient.On("BatchCallContextAll", mock.Anything, mock.Anything).Return(nil).Maybe()
ethClient.On("PendingSequenceAt", mock.Anything, addr).Return(128, nil).Maybe()
ethClient.On("PendingSequenceAt", mock.Anything, addr2).Return(44, nil).Maybe()
ethClient.On("PendingNonceAt", mock.Anything, addr).Return(uint64(128), nil).Maybe()
ethClient.On("PendingNonceAt", mock.Anything, addr2).Return(uint64(44), nil).Maybe()

estimator := gas.NewEstimator(logger.Test(t), ethClient, cfg.EVM(), cfg.EVM().GasEstimator())
txm, err := makeTestEvmTxm(t, db, ethClient, estimator, cfg.EVM(), cfg.EVM().GasEstimator(), cfg.EVM().Transactions(), cfg.Database(), cfg.Database().Listener(), kst.Eth())
Expand Down
20 changes: 20 additions & 0 deletions core/scripts/gateway/client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# A gateway client script

This script is used to connect to a gateway server and send commands to it.

## Usage

All requests have to be signed on behalf of a user, you need to provide your private key in .env file, e.g.

```
PRIVATE_KEY=1a2b3c...
```

The script will automatically sign the message using the provided private key.
Run the script without arguments to get the list of available commands.

## Example

```
go run . -gateway_url https://01.functions-gateway.chain.link -don_id fun-avalanche-mainnet-2 -method secrets_list -message_id 123
```
23 changes: 19 additions & 4 deletions core/scripts/gateway/client/send_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func main() {
s4SetSlotId := flag.Uint("s4_set_slot_id", 0, "S4 set slot ID")
s4SetVersion := flag.Uint64("s4_set_version", 0, "S4 set version")
s4SetExpirationPeriod := flag.Int64("s4_set_expiration_period", 60*60*1000, "S4 how long until the entry expires from now (in milliseconds)")
s4SetPayload := flag.String("s4_set_payload", "", "S4 set payload")
s4SetPayloadFile := flag.String("s4_set_payload_file", "", "S4 payload file to set secret")
repeat := flag.Bool("repeat", false, "Repeat sending the request every 10 seconds")
flag.Parse()

Expand All @@ -50,14 +50,24 @@ func main() {
}
address := crypto.PubkeyToAddress(key.PublicKey)

var s4SetPayload []byte
if *methodName == functions.MethodSecretsSet {
var err error
s4SetPayload, err = os.ReadFile(*s4SetPayloadFile)
if err != nil {
fmt.Println("error reading S4 payload file", err)
return
}
}

// build payload (if relevant)
var payloadJSON []byte
if *methodName == functions.MethodSecretsSet {
envelope := s4.Envelope{
Address: address.Bytes(),
SlotID: *s4SetSlotId,
Version: *s4SetVersion,
Payload: []byte(*s4SetPayload),
Payload: s4SetPayload,
Expiration: time.Now().UnixMilli() + *s4SetExpirationPeriod,
}
signature, err := envelope.Sign(key)
Expand All @@ -70,7 +80,7 @@ func main() {
SlotID: envelope.SlotID,
Version: envelope.Version,
Expiration: envelope.Expiration,
Payload: []byte(*s4SetPayload),
Payload: s4SetPayload,
Signature: signature,
}

Expand Down Expand Up @@ -131,7 +141,12 @@ func main() {
return
}

fmt.Println(string(body))
var prettyJSON bytes.Buffer
if err := json.Indent(&prettyJSON, body, "", " "); err != nil {
fmt.Println(string(body))
} else {
fmt.Println(prettyJSON.String())
}
}

sendRequest()
Expand Down
41 changes: 25 additions & 16 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,29 +525,33 @@ func (d *Delegate) newServicesGenericPlugin(
) (srvs []job.ServiceCtx, err error) {
spec := jb.OCR2OracleSpec

// NOTE: we don't need to validate this config, since that happens as part of creating the job.
// See: validate/validate.go's `validateSpec`.
p := validate.OCR2GenericPluginConfig{}
err = json.Unmarshal(spec.PluginConfig.Bytes(), &p)
if err != nil {
return nil, err
}
cconf := p.CoreConfig

command := cconf.Command
command := p.Command
if command == "" {
command = defaultPathFromPluginName(cconf.PluginName)
command = defaultPathFromPluginName(p.PluginName)
}

// NOTE: we don't need to validate this config, since that happens as part of creating the job.
// See: validate/validate.go's `validateSpec`.
// Add the default pipeline to the pluginConfig
p.Pipelines = append(
p.Pipelines,
validate.PipelineSpec{Name: "__DEFAULT_PIPELINE__", Spec: jb.Pipeline.Source},
)

rid, err := spec.RelayID()
if err != nil {
return nil, ErrJobSpecNoRelayer{PluginName: cconf.PluginName, Err: err}
return nil, ErrJobSpecNoRelayer{PluginName: p.PluginName, Err: err}
}

relayer, err := d.RelayGetter.Get(rid)
if err != nil {
return nil, ErrRelayNotEnabled{Err: err, Relay: spec.Relay, PluginName: p.CoreConfig.PluginName}
return nil, ErrRelayNotEnabled{Err: err, Relay: spec.Relay, PluginName: p.PluginName}
}

provider, err := relayer.NewPluginProvider(ctx, types.RelayArgs{
Expand All @@ -556,7 +560,7 @@ func (d *Delegate) newServicesGenericPlugin(
ContractID: spec.ContractID,
New: d.isNewlyCreatedJob,
RelayConfig: spec.RelayConfig.Bytes(),
ProviderType: cconf.ProviderType,
ProviderType: p.ProviderType,
}, types.PluginArgs{
TransmitterID: spec.TransmitterID.String,
PluginConfig: spec.PluginConfig.Bytes(),
Expand All @@ -570,7 +574,7 @@ func (d *Delegate) newServicesGenericPlugin(
rid.Network,
rid.ChainID,
spec.ContractID,
synchronization.TelemetryType(cconf.TelemetryType),
synchronization.TelemetryType(p.TelemetryType),
)
oracleArgs := libocr2.OCR2OracleArgs{
BinaryNetworkEndpointFactory: d.peerWrapper.Peer2,
Expand All @@ -586,8 +590,8 @@ func (d *Delegate) newServicesGenericPlugin(
OffchainConfigDigester: provider.OffchainConfigDigester(),
}

pluginLggr := lggr.Named(cconf.PluginName).Named(spec.ContractID).Named(spec.GetID())
cmdFn, grpcOpts, err := d.cfg.RegisterLOOP(fmt.Sprintf("%s-%s-%s", cconf.PluginName, spec.ContractID, spec.GetID()), command)
pluginLggr := lggr.Named(p.PluginName).Named(spec.ContractID).Named(spec.GetID())
cmdFn, grpcOpts, err := d.cfg.RegisterLOOP(fmt.Sprintf("%s-%s-%s", p.PluginName, spec.ContractID, spec.GetID()), command)
if err != nil {
return nil, fmt.Errorf("failed to register loop: %w", err)
}
Expand All @@ -604,7 +608,7 @@ func (d *Delegate) newServicesGenericPlugin(
//TODO: remove this workaround when the EVM relayer is running inside of an LOOPP
d.lggr.Info("provider is not a LOOPP provider, switching to provider server")

ps, err2 := relay.NewProviderServer(provider, types.OCR2PluginType(cconf.ProviderType), d.lggr)
ps, err2 := relay.NewProviderServer(provider, types.OCR2PluginType(p.ProviderType), d.lggr)
if err2 != nil {
return nil, fmt.Errorf("cannot start EVM provider server: %s", err)
}
Expand All @@ -615,12 +619,17 @@ func (d *Delegate) newServicesGenericPlugin(
srvs = append(srvs, ps)
}

pc, err := json.Marshal(p.Config)
if err != nil {
return nil, fmt.Errorf("cannot dump plugin config to string before sending to plugin: %s", err)
}

pluginConfig := types.ReportingPluginServiceConfig{
PluginName: cconf.PluginName,
PluginName: p.PluginName,
Command: command,
ProviderType: cconf.ProviderType,
TelemetryType: cconf.TelemetryType,
PluginConfig: p.PluginConfig,
ProviderType: p.ProviderType,
TelemetryType: p.TelemetryType,
PluginConfig: string(pc),
}

pr := generic.NewPipelineRunnerAdapter(pluginLggr, jb, d.pipelineRunner)
Expand Down
37 changes: 31 additions & 6 deletions core/services/ocr2/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,42 @@ func validateSpec(tree *toml.Tree, spec job.Job) error {
return nil
}

type coreConfig struct {
type PipelineSpec struct {
Name string `json:"name"`
Spec string `json:"spec"`
}

type Config struct {
Pipelines []PipelineSpec `json:"pipelines"`
PluginConfig map[string]any `json:"pluginConfig"`
}

type innerConfig struct {
Command string `json:"command"`
ProviderType string `json:"providerType"`
PluginName string `json:"pluginName"`
TelemetryType string `json:"telemetryType"`
Config
}

type OCR2GenericPluginConfig struct {
CoreConfig coreConfig `json:"coreConfig"`
PluginConfig string
innerConfig
}

func (o *OCR2GenericPluginConfig) UnmarshalJSON(data []byte) error {
err := json.Unmarshal(data, &o.innerConfig)
if err != nil {
return nil
}

m := map[string]any{}
err = json.Unmarshal(data, &m)
if err != nil {
return err
}

o.PluginConfig = m
return nil
}

func validateOCR2GenericPluginSpec(jsonConfig job.JSONConfig) error {
Expand All @@ -144,12 +170,11 @@ func validateOCR2GenericPluginSpec(jsonConfig job.JSONConfig) error {
return err
}

cc := p.CoreConfig
if cc.PluginName == "" {
if p.PluginName == "" {
return errors.New("generic config invalid: must provide plugin name")
}

if cc.TelemetryType == "" {
if p.TelemetryType == "" {
return errors.New("generic config invalid: must provide telemetry type")
}

Expand Down
44 changes: 42 additions & 2 deletions core/services/ocr2/validate/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/manyminds/api2go/jsonapi"
"github.com/pelletier/go-toml"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -627,7 +628,7 @@ transmitterID = "0x74103Cf8b436465870b26aa9Fa2F62AD62b22E35"
[relayConfig]
chainID = 4
[pluginConfig.coreConfig]
[pluginConfig]
pluginName = "median"
`,
assertion: func(t *testing.T, os job.Job, err error) {
Expand Down Expand Up @@ -655,7 +656,7 @@ transmitterID = "0x74103Cf8b436465870b26aa9Fa2F62AD62b22E35"
[relayConfig]
chainID = 4
[pluginConfig.coreConfig]
[pluginConfig]
pluginName = "median"
telemetryType = "median"
`,
Expand All @@ -678,3 +679,42 @@ telemetryType = "median"
})
}
}

type envelope struct {
PluginConfig *validate.OCR2GenericPluginConfig
}

func TestOCR2GenericPluginConfig_Unmarshal(t *testing.T) {
payload := `
[pluginConfig]
pluginName = "median"
telemetryType = "median"
foo = "bar"
[[pluginConfig.pipelines]]
name = "default"
spec = "a spec"
`
tree, err := toml.Load(payload)
require.NoError(t, err)

// Load the toml how we load it in the plugin, i.e. convert to
// map[string]any first, then treat as JSON
o := map[string]any{}
err = tree.Unmarshal(&o)
require.NoError(t, err)

b, err := json.Marshal(o)
require.NoError(t, err)

e := &envelope{}
err = json.Unmarshal(b, e)
require.NoError(t, err)

pc := e.PluginConfig
assert.Equal(t, "bar", pc.PluginConfig["foo"])
assert.Len(t, pc.Pipelines, 1)
assert.Equal(t, validate.PipelineSpec{Name: "default", Spec: "a spec"}, pc.Pipelines[0])
assert.Equal(t, "median", pc.PluginName)
assert.Equal(t, "median", pc.TelemetryType)
}
14 changes: 10 additions & 4 deletions plugins/medianpoc/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,24 @@ type Plugin struct {
reportingplugins.MedianProviderServer
}

type pipelineSpec struct {
Name string `json:"name"`
Spec string `json:"spec"`
}

type jsonConfig struct {
Pipelines map[string]string `json:"pipelines"`
Pipelines []pipelineSpec `json:"pipelines"`
}

func (j jsonConfig) defaultPipeline() (string, error) {
return j.getPipeline("__DEFAULT_PIPELINE__")
}

func (j jsonConfig) getPipeline(key string) (string, error) {
v, ok := j.Pipelines[key]
if ok {
return v, nil
for _, v := range j.Pipelines {
if v.Name == key {
return v.Spec, nil
}
}
return "", fmt.Errorf("no pipeline found for %s", key)
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/medianpoc/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestNewPlugin(t *testing.T) {
juelsPerFeeCoinSpec := "jpfc-spec"
config := types.ReportingPluginServiceConfig{
PluginConfig: fmt.Sprintf(
`{"pipelines": {"__DEFAULT_PIPELINE__": "%s", "juelsPerFeeCoinPipeline": "%s"}}`,
`{"pipelines": [{"name": "__DEFAULT_PIPELINE__", "spec": "%s"},{"name": "juelsPerFeeCoinPipeline", "spec": "%s"}]}`,
defaultSpec,
juelsPerFeeCoinSpec,
),
Expand Down

0 comments on commit f92508d

Please sign in to comment.