diff --git a/core/services/ccipcapability/delegate.go b/core/services/ccipcapability/delegate.go new file mode 100644 index 00000000000..42eda10aea5 --- /dev/null +++ b/core/services/ccipcapability/delegate.go @@ -0,0 +1,199 @@ +package ccipcapability + +import ( + "context" + + "github.com/pkg/errors" + + "github.com/smartcontractkit/chainlink-common/pkg/loop" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/launcher" + "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/oraclecreator" + cctypes "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/chaintype" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ocr2key" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2" + "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/plugins" +) + +type RelayGetter interface { + GetIDToRelayerMap() (map[types.RelayID]loop.Relayer, error) +} + +type Delegate struct { + lggr logger.Logger + registrarConfig plugins.RegistrarConfig + pipelineRunner pipeline.Runner + relayGetter RelayGetter + capRegistry cctypes.CapabilityRegistry + keystore keystore.Master + ds sqlutil.DataSource + peerWrapper *ocrcommon.SingletonPeerWrapper + + isNewlyCreatedJob bool +} + +func NewDelegate( + lggr logger.Logger, + registrarConfig plugins.RegistrarConfig, + pipelineRunner pipeline.Runner, + relayGetter RelayGetter, + registrySyncer cctypes.CapabilityRegistry, + keystore keystore.Master, + ds sqlutil.DataSource, + peerWrapper *ocrcommon.SingletonPeerWrapper, +) *Delegate { + return &Delegate{ + lggr: lggr, + registrarConfig: registrarConfig, + pipelineRunner: pipelineRunner, + relayGetter: relayGetter, + capRegistry: registrySyncer, + ds: ds, + keystore: keystore, + peerWrapper: peerWrapper, + } +} + +func (d *Delegate) JobType() job.Type { + return job.CCIP +} + +func (d *Delegate) BeforeJobCreated(job.Job) { + // This is only called first time the job is created + d.isNewlyCreatedJob = true +} + +func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) (services []job.ServiceCtx, err error) { + // In general there should only be one P2P key but the node may have multiple. + // The job spec should specify the correct P2P key to use. + peerID, err := p2pkey.MakePeerID(spec.CCIPSpec.P2PKeyID) + if err != nil { + return nil, errors.Wrapf(err, "failed to make peer ID from provided spec p2p id: %s", spec.CCIPSpec.P2PKeyID) + } + + p2pID, err := d.keystore.P2P().Get(peerID) + if err != nil { + return nil, errors.Wrap(err, "failed to get all p2p keys") + } + + ocrKeys := make(map[chaintype.ChainType]ocr2key.KeyBundle) + for chainType, bundleAny := range spec.CCIPSpec.OCRKeyBundleIDs { + ct := chaintype.ChainType(chainType) + if !chaintype.IsSupportedChainType(ct) { + return nil, errors.Errorf("unsupported chain type: %s", chainType) + } + + bundleID, ok := bundleAny.(string) + if !ok { + return nil, errors.New("OCRKeyBundleIDs must be a map of chain types to OCR key bundle IDs") + } + + bundle, err := d.keystore.OCR2().Get(bundleID) + if err != nil { + return nil, errors.Wrapf(err, "OCR key bundle with ID %s not found", bundleID) + } + + ocrKeys[ct] = bundle + } + + transmitterKeys := make(map[types.RelayID]string) + for relayIDStr, transmitterIDAny := range spec.CCIPSpec.TransmitterIDs { + var relayID types.RelayID + if err := relayID.UnmarshalString(relayIDStr); err != nil { + return nil, errors.Wrapf(err, "invalid relay ID specified in transmitter ids mapping: %s", relayIDStr) + } + + transmitterID, ok := transmitterIDAny.(string) + if !ok { + return nil, errors.New("transmitter id is not a string") + } + + switch relayID.Network { + case types.NetworkEVM: + ethKey, err := d.keystore.Eth().Get(ctx, transmitterID) + if err != nil { + return nil, errors.Wrapf(err, "eth transmitter key with ID %s not found", transmitterID) + } + + transmitterKeys[relayID] = ethKey.String() + case types.NetworkCosmos: + cosmosKey, err := d.keystore.Cosmos().Get(transmitterID) + if err != nil { + return nil, errors.Wrapf(err, "cosmos transmitter key with ID %s not found", transmitterID) + } + + transmitterKeys[relayID] = cosmosKey.String() + case types.NetworkSolana: + solKey, err := d.keystore.Solana().Get(transmitterID) + if err != nil { + return nil, errors.Wrapf(err, "solana transmitter key with ID %s not found", transmitterID) + } + + transmitterKeys[relayID] = solKey.String() + case types.NetworkStarkNet: + starkKey, err := d.keystore.StarkNet().Get(transmitterID) + if err != nil { + return nil, errors.Wrapf(err, "starknet transmitter key with ID %s not found", transmitterID) + } + + transmitterKeys[relayID] = starkKey.String() + default: + return nil, errors.Errorf("unsupported network: %s", relayID.Network) + } + } + + relayers, err := d.relayGetter.GetIDToRelayerMap() + if err != nil { + return nil, errors.Wrap(err, "failed to get all relayers") + } + + // NOTE: we can use the same DB for all plugin instances, + // since all queries are scoped by config digest. + ocrDB := ocr2.NewDB(d.ds, spec.ID, 0, d.lggr) + + // TODO: implement + hcr := &homeChainReader{} + + oracleCreator := oraclecreator.New( + ocrKeys, + transmitterKeys, + relayers, + d.peerWrapper, + spec.ExternalJobID, + spec.ID, + d.isNewlyCreatedJob, + spec.CCIPSpec.RelayConfigs, + spec.CCIPSpec.PluginConfig, + ocrDB, + ) + + return []job.ServiceCtx{ + hcr, + launcher.New( + spec.CCIPSpec.CapabilityVersion, + spec.CCIPBootstrapSpec.CapabilityLabelledName, + p2pID, + d.capRegistry, + d.lggr, + hcr, + oracleCreator, + ), + }, nil +} + +func (d *Delegate) AfterJobCreated(spec job.Job) {} + +func (d *Delegate) BeforeJobDeleted(spec job.Job) {} + +func (d *Delegate) OnDeleteJob(ctx context.Context, spec job.Job) error { + // TODO: shut down needed services? + return nil +} diff --git a/core/services/ccipcapability/home_chain_reader.go b/core/services/ccipcapability/home_chain_reader.go new file mode 100644 index 00000000000..2833d3dda6d --- /dev/null +++ b/core/services/ccipcapability/home_chain_reader.go @@ -0,0 +1,53 @@ +package ccipcapability + +import ( + "context" + + "github.com/smartcontractkit/chainlink/v2/core/services" + cctypes "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types" +) + +var _ cctypes.HomeChainReader = (*homeChainReader)(nil) +var _ services.ServiceCtx = (*homeChainReader)(nil) + +type homeChainReader struct{} + +// Close implements services.Service. +func (h *homeChainReader) Close() error { + panic("unimplemented") +} + +// HealthReport implements services.Service. +func (h *homeChainReader) HealthReport() map[string]error { + panic("unimplemented") +} + +// Name implements services.Service. +func (h *homeChainReader) Name() string { + panic("unimplemented") +} + +// Ready implements services.Service. +func (h *homeChainReader) Ready() error { + panic("unimplemented") +} + +// Start implements services.Service. +func (h *homeChainReader) Start(context.Context) error { + panic("unimplemented") +} + +// GetAllChainConfigs implements HomeChainReader. +func (h *homeChainReader) GetAllChainConfigs(ctx context.Context) (map[uint64]cctypes.ChainConfig, error) { + panic("unimplemented") +} + +// GetOCRConfigs implements HomeChainReader. +func (h *homeChainReader) GetOCRConfigs(ctx context.Context, donID uint32, pluginType cctypes.PluginType) ([]cctypes.OCRConfig, error) { + panic("unimplemented") +} + +// IsHealthy implements HomeChainReader. +func (h *homeChainReader) IsHealthy() bool { + panic("unimplemented") +} diff --git a/core/services/ccipcapability/launcher/bluegreen.go b/core/services/ccipcapability/launcher/bluegreen.go new file mode 100644 index 00000000000..526198faa48 --- /dev/null +++ b/core/services/ccipcapability/launcher/bluegreen.go @@ -0,0 +1,58 @@ +package launcher + +import ( + cctypes "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types" + "go.uber.org/multierr" +) + +// blueGreenDeployment represents a blue-green deployment of OCR instances. +type blueGreenDeployment struct { + // blue is the blue OCR instance. + // blue must always be present. + blue cctypes.CCIPOracle + + // green is the green OCR instance. + // green may or may not be present. + // green must never be present if blue is not present. + // TODO: should we enforce this invariant somehow? + green cctypes.CCIPOracle +} + +// ccipDeployment represents blue-green deployments of both commit and exec +// OCR instances. +type ccipDeployment struct { + commit blueGreenDeployment + exec blueGreenDeployment +} + +// Shutdown shuts down all OCR instances in the deployment. +func (c *ccipDeployment) Shutdown() error { + var err error + + err = multierr.Append(err, c.commit.blue.Shutdown()) + if c.commit.green != nil { + err = multierr.Append(err, c.commit.green.Shutdown()) + } + + err = multierr.Append(err, c.exec.blue.Shutdown()) + if c.exec.green != nil { + err = multierr.Append(err, c.exec.green.Shutdown()) + } + return err +} + +// NumCommitInstances returns the number of commit OCR instances in the deployment. +func (c *ccipDeployment) NumCommitInstances() int { + if c.commit.green != nil { + return 2 + } + return 1 +} + +// NumExecInstances returns the number of exec OCR instances in the deployment. +func (c *ccipDeployment) NumExecInstances() int { + if c.exec.green != nil { + return 2 + } + return 1 +} diff --git a/core/services/ccipcapability/launcher/bluegreen_test.go b/core/services/ccipcapability/launcher/bluegreen_test.go new file mode 100644 index 00000000000..48401de9689 --- /dev/null +++ b/core/services/ccipcapability/launcher/bluegreen_test.go @@ -0,0 +1,187 @@ +package launcher + +import ( + "errors" + "testing" + + mocktypes "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types/mocks" + "github.com/test-go/testify/require" +) + +func Test_commitExecDeployment_Shutdown(t *testing.T) { + tests := []struct { + name string + commitBlue *mocktypes.CCIPOracle + commitGreen *mocktypes.CCIPOracle + execBlue *mocktypes.CCIPOracle + execGreen *mocktypes.CCIPOracle + expect func(t *testing.T, commitBlue, commitGreen, execBlue, execGreen *mocktypes.CCIPOracle) + asserts func(t *testing.T, commitBlue, commitGreen, execBlue, execGreen *mocktypes.CCIPOracle) + wantErr bool + }{ + { + name: "no errors, blue only", + commitBlue: mocktypes.NewCCIPOracle(t), + commitGreen: nil, + execBlue: mocktypes.NewCCIPOracle(t), + execGreen: nil, + expect: func(t *testing.T, commitBlue, commitGreen, execBlue, execGreen *mocktypes.CCIPOracle) { + commitBlue.On("Shutdown").Return(nil).Once() + execBlue.On("Shutdown").Return(nil).Once() + }, + asserts: func(t *testing.T, commitBlue, commitGreen, execBlue, execGreen *mocktypes.CCIPOracle) { + commitBlue.AssertExpectations(t) + execBlue.AssertExpectations(t) + }, + wantErr: false, + }, + { + name: "no errors, blue and green", + commitBlue: mocktypes.NewCCIPOracle(t), + commitGreen: mocktypes.NewCCIPOracle(t), + execBlue: mocktypes.NewCCIPOracle(t), + execGreen: mocktypes.NewCCIPOracle(t), + expect: func(t *testing.T, commitBlue, commitGreen, execBlue, execGreen *mocktypes.CCIPOracle) { + commitBlue.On("Shutdown").Return(nil).Once() + commitGreen.On("Shutdown").Return(nil).Once() + execBlue.On("Shutdown").Return(nil).Once() + execGreen.On("Shutdown").Return(nil).Once() + }, + asserts: func(t *testing.T, commitBlue, commitGreen, execBlue, execGreen *mocktypes.CCIPOracle) { + commitBlue.AssertExpectations(t) + commitGreen.AssertExpectations(t) + execBlue.AssertExpectations(t) + execGreen.AssertExpectations(t) + }, + wantErr: false, + }, + { + name: "error on commit blue", + commitBlue: mocktypes.NewCCIPOracle(t), + commitGreen: nil, + execBlue: mocktypes.NewCCIPOracle(t), + execGreen: nil, + expect: func(t *testing.T, commitBlue, commitGreen, execBlue, execGreen *mocktypes.CCIPOracle) { + commitBlue.On("Shutdown").Return(errors.New("failed")).Once() + execBlue.On("Shutdown").Return(nil).Once() + }, + asserts: func(t *testing.T, commitBlue, commitGreen, execBlue, execGreen *mocktypes.CCIPOracle) { + commitBlue.AssertExpectations(t) + execBlue.AssertExpectations(t) + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &ccipDeployment{ + commit: blueGreenDeployment{ + blue: tt.commitBlue, + }, + exec: blueGreenDeployment{ + blue: tt.execBlue, + }, + } + if tt.commitGreen != nil { + c.commit.green = tt.commitGreen + } + if tt.execGreen != nil { + c.exec.green = tt.execGreen + } + tt.expect(t, tt.commitBlue, tt.commitGreen, tt.execBlue, tt.execGreen) + defer tt.asserts(t, tt.commitBlue, tt.commitGreen, tt.execBlue, tt.execGreen) + err := c.Shutdown() + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func Test_commitExecDeployment_NumCommitInstances(t *testing.T) { + type fields struct { + commit blueGreenDeployment + exec blueGreenDeployment + } + tests := []struct { + name string + fields fields + want int + }{ + { + name: "only commit blue is present", + fields: fields{ + commit: blueGreenDeployment{ + blue: mocktypes.NewCCIPOracle(t), + }, + }, + want: 1, + }, + { + name: "both commit blue and green are present", + fields: fields{ + commit: blueGreenDeployment{ + blue: mocktypes.NewCCIPOracle(t), + green: mocktypes.NewCCIPOracle(t), + }, + }, + want: 2, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &ccipDeployment{ + commit: tt.fields.commit, + exec: tt.fields.exec, + } + if got := c.NumCommitInstances(); got != tt.want { + t.Errorf("commitExecDeployment.NumCommitInstances() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_commitExecDeployment_NumExecInstances(t *testing.T) { + type fields struct { + commit blueGreenDeployment + exec blueGreenDeployment + } + tests := []struct { + name string + fields fields + want int + }{ + { + name: "only exec blue is present", + fields: fields{ + exec: blueGreenDeployment{ + blue: mocktypes.NewCCIPOracle(t), + }, + }, + want: 1, + }, + { + name: "both exec blue and green are present", + fields: fields{ + exec: blueGreenDeployment{ + blue: mocktypes.NewCCIPOracle(t), + green: mocktypes.NewCCIPOracle(t), + }, + }, + want: 2, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &ccipDeployment{ + commit: tt.fields.commit, + exec: tt.fields.exec, + } + if got := c.NumExecInstances(); got != tt.want { + t.Errorf("commitExecDeployment.NumExecInstances() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/core/services/ccipcapability/launcher/diff.go b/core/services/ccipcapability/launcher/diff.go new file mode 100644 index 00000000000..4dc509147c0 --- /dev/null +++ b/core/services/ccipcapability/launcher/diff.go @@ -0,0 +1,154 @@ +package launcher + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/keystone_capability_registry" + cctypes "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey" +) + +type diffResult struct { + added map[uint32]keystone_capability_registry.CapabilityRegistryDONInfo + removed map[uint32]keystone_capability_registry.CapabilityRegistryDONInfo + updated map[uint32]keystone_capability_registry.CapabilityRegistryDONInfo +} + +func diff( + capabilityVersion, + capabilityLabelledName string, + oldState, + newState cctypes.RegistryState, +) (diffResult, error) { + ccipCapability, err := checkCapabilityPresence(capabilityVersion, capabilityLabelledName, newState) + if err != nil { + return diffResult{}, err + } + + newCCIPDONs, err := filterCCIPDONs(ccipCapability, newState) + if err != nil { + return diffResult{}, err + } + + currCCIPDONs, err := filterCCIPDONs(ccipCapability, oldState) + if err != nil { + return diffResult{}, err + } + + // compare curr with new and launch or update OCR instances as needed + added, removed, updated, err := compareDONs(currCCIPDONs, newCCIPDONs) + if err != nil { + return diffResult{}, err + } + + return diffResult{ + added: added, + removed: removed, + updated: updated, + }, nil +} + +func compareDONs( + currCCIPDONs, + newCCIPDONs map[uint32]keystone_capability_registry.CapabilityRegistryDONInfo, +) ( + added, removed, updated map[uint32]keystone_capability_registry.CapabilityRegistryDONInfo, + err error, +) { + added = make(map[uint32]keystone_capability_registry.CapabilityRegistryDONInfo) + removed = make(map[uint32]keystone_capability_registry.CapabilityRegistryDONInfo) + updated = make(map[uint32]keystone_capability_registry.CapabilityRegistryDONInfo) + + for id, don := range newCCIPDONs { + if currDONState, ok := currCCIPDONs[id]; !ok { + // Not in current state, so mark as added. + added[id] = don + } else { + // If its in the current state and the config count for the DON has changed, mark as updated. + // Since the registry returns the full state we need to compare the config count. + if don.ConfigCount > currDONState.ConfigCount { + updated[id] = don + } + } + } + + for id, don := range currCCIPDONs { + if _, ok := newCCIPDONs[id]; !ok { + // In current state but not in latest registry state, so should remove. + removed[id] = don + } + } + + return added, removed, updated, nil +} + +func filterCCIPDONs( + ccipCapability keystone_capability_registry.CapabilityRegistryCapability, + state cctypes.RegistryState, +) (map[uint32]keystone_capability_registry.CapabilityRegistryDONInfo, error) { + ccipDONs := make(map[uint32]keystone_capability_registry.CapabilityRegistryDONInfo) + for _, don := range state.DONs { + // CCIP DONs should only have one capability, CCIP. + var found bool + for _, donCapabilities := range don.CapabilityConfigurations { + if donCapabilities.CapabilityId == hashedCapabilityId(ccipCapability.Version, ccipCapability.LabelledName) { + ccipDONs[don.Id] = don + found = true + } + } + if found && len(don.CapabilityConfigurations) > 1 { + return nil, fmt.Errorf("found more than one capability (actual: %d) in the CCIP DON %d", + len(don.CapabilityConfigurations), don.Id) + } + } + + return ccipDONs, nil +} + +func checkCapabilityPresence( + capabilityVersion, + capabilityLabelledName string, + state cctypes.RegistryState, +) (keystone_capability_registry.CapabilityRegistryCapability, error) { + // Sanity check to make sure the capability registry has the capability we are looking for. + var ccipCapability keystone_capability_registry.CapabilityRegistryCapability + for _, capability := range state.Capabilities { + if string(capability.LabelledName[:]) == capabilityLabelledName && + string(capability.Version[:]) == capabilityVersion { + ccipCapability = capability + break + } + } + + if ccipCapability.LabelledName == "" { + return keystone_capability_registry.CapabilityRegistryCapability{}, + fmt.Errorf("unable to find capability with name %s and version %s in capability registry state", + capabilityLabelledName, capabilityVersion) + } + + return ccipCapability, nil +} + +func hashedCapabilityId(capabilityVersion, capabilityLabelledName string) (r [32]byte) { + capVersionBytes := []byte(capabilityVersion) + capLabelledNameBytes := []byte(capabilityLabelledName) + var capVersionBytes32, capLabelledNameBytes32 [32]byte + copy(capVersionBytes32[:], capVersionBytes) + copy(capLabelledNameBytes32[:], capLabelledNameBytes) + h := crypto.Keccak256(capVersionBytes32[:], capLabelledNameBytes32[:]) + copy(r[:], h) + return r +} + +// isMemberOfDON returns true if and only if the given p2pID is a member of the given DON. +func isMemberOfDON(don keystone_capability_registry.CapabilityRegistryDONInfo, p2pID p2pkey.KeyV2) bool { + var found bool + for _, node := range don.NodeP2PIds { + if node == p2pID.PeerID() { + found = true + break + } + } + return found +} diff --git a/core/services/ccipcapability/launcher/diff_test.go b/core/services/ccipcapability/launcher/diff_test.go new file mode 100644 index 00000000000..6684a9d5c13 --- /dev/null +++ b/core/services/ccipcapability/launcher/diff_test.go @@ -0,0 +1,171 @@ +package launcher + +import ( + "reflect" + "testing" + + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/keystone_capability_registry" + cctypes "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey" +) + +func Test_diff(t *testing.T) { + type args struct { + capabilityVersion string + capabilityLabelledName string + oldState cctypes.RegistryState + newState cctypes.RegistryState + } + tests := []struct { + name string + args args + want diffResult + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := diff(tt.args.capabilityVersion, tt.args.capabilityLabelledName, tt.args.oldState, tt.args.newState) + if (err != nil) != tt.wantErr { + t.Errorf("diff() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("diff() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_compareDONs(t *testing.T) { + type args struct { + currCCIPDONs map[uint32]keystone_capability_registry.CapabilityRegistryDONInfo + newCCIPDONs map[uint32]keystone_capability_registry.CapabilityRegistryDONInfo + } + tests := []struct { + name string + args args + wantAdded map[uint32]keystone_capability_registry.CapabilityRegistryDONInfo + wantRemoved map[uint32]keystone_capability_registry.CapabilityRegistryDONInfo + wantUpdated map[uint32]keystone_capability_registry.CapabilityRegistryDONInfo + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotAdded, gotRemoved, gotUpdated, err := compareDONs(tt.args.currCCIPDONs, tt.args.newCCIPDONs) + if (err != nil) != tt.wantErr { + t.Errorf("compareDONs() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(gotAdded, tt.wantAdded) { + t.Errorf("compareDONs() gotAdded = %v, want %v", gotAdded, tt.wantAdded) + } + if !reflect.DeepEqual(gotRemoved, tt.wantRemoved) { + t.Errorf("compareDONs() gotRemoved = %v, want %v", gotRemoved, tt.wantRemoved) + } + if !reflect.DeepEqual(gotUpdated, tt.wantUpdated) { + t.Errorf("compareDONs() gotUpdated = %v, want %v", gotUpdated, tt.wantUpdated) + } + }) + } +} + +func Test_filterCCIPDONs(t *testing.T) { + type args struct { + ccipCapability keystone_capability_registry.CapabilityRegistryCapability + state cctypes.RegistryState + } + tests := []struct { + name string + args args + want map[uint32]keystone_capability_registry.CapabilityRegistryDONInfo + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := filterCCIPDONs(tt.args.ccipCapability, tt.args.state) + if (err != nil) != tt.wantErr { + t.Errorf("filterCCIPDONs() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("filterCCIPDONs() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_checkCapabilityPresence(t *testing.T) { + type args struct { + capabilityVersion string + capabilityLabelledName string + state cctypes.RegistryState + } + tests := []struct { + name string + args args + want keystone_capability_registry.CapabilityRegistryCapability + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := checkCapabilityPresence(tt.args.capabilityVersion, tt.args.capabilityLabelledName, tt.args.state) + if (err != nil) != tt.wantErr { + t.Errorf("checkCapabilityPresence() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("checkCapabilityPresence() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_hashedCapabilityId(t *testing.T) { + type args struct { + capabilityVersion string + capabilityLabelledName string + } + tests := []struct { + name string + args args + wantR [32]byte + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if gotR := hashedCapabilityId(tt.args.capabilityVersion, tt.args.capabilityLabelledName); !reflect.DeepEqual(gotR, tt.wantR) { + t.Errorf("hashedCapabilityId() = %v, want %v", gotR, tt.wantR) + } + }) + } +} + +func Test_isMemberOfDON(t *testing.T) { + type args struct { + don keystone_capability_registry.CapabilityRegistryDONInfo + p2pID p2pkey.KeyV2 + } + tests := []struct { + name string + args args + want bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isMemberOfDON(tt.args.don, tt.args.p2pID); got != tt.want { + t.Errorf("isMemberOfDON() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/core/services/ccipcapability/launcher/launcher.go b/core/services/ccipcapability/launcher/launcher.go new file mode 100644 index 00000000000..6abc0f9acdd --- /dev/null +++ b/core/services/ccipcapability/launcher/launcher.go @@ -0,0 +1,340 @@ +package launcher + +import ( + "context" + "fmt" + "time" + + "go.uber.org/multierr" + "golang.org/x/sync/errgroup" + + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/keystone_capability_registry" + "github.com/smartcontractkit/chainlink/v2/core/logger" + cctypes "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey" +) + +var ( + _ job.ServiceCtx = (*launcher)(nil) +) + +const ( + tickInterval = 10 * time.Second +) + +func New( + capabilityVersion, + capabilityLabelledName string, + p2pID p2pkey.KeyV2, + capRegistry cctypes.CapabilityRegistry, + lggr logger.Logger, + homeChainReader cctypes.HomeChainReader, + oracleCreator cctypes.OracleCreator, +) job.ServiceCtx { + return &launcher{ + capabilityVersion: capabilityVersion, + capabilityLabelledName: capabilityLabelledName, + p2pID: p2pID, + capRegistry: capRegistry, + lggr: lggr, + homeChainReader: homeChainReader, + regState: cctypes.RegistryState{}, + oracleCreator: oracleCreator, + dons: make(map[uint32]*ccipDeployment), + } +} + +// launcher manages the lifecycles of the CCIP capability on all chains. +type launcher struct { + capabilityVersion string + capabilityLabelledName string + p2pID p2pkey.KeyV2 + capRegistry cctypes.CapabilityRegistry + lggr logger.Logger + homeChainReader cctypes.HomeChainReader + stopChan chan struct{} + regState cctypes.RegistryState + oracleCreator cctypes.OracleCreator + + // dons is a map of CCIP DON IDs to the OCR instances that are running on them. + // we can have up to two OCR instances per CCIP plugin, since we are running two plugins, + // thats four OCR instances per CCIP DON maximum. + dons map[uint32]*ccipDeployment +} + +// Close implements job.ServiceCtx. +func (l *launcher) Close() error { + // shut down the monitor goroutine. + close(l.stopChan) + + // shut down all running oracles. + var err error + for _, ceDep := range l.dons { + err = multierr.Append(err, ceDep.Shutdown()) + } + + return err +} + +// Start implements job.ServiceCtx. +func (l *launcher) Start(context.Context) error { + l.stopChan = make(chan struct{}) + go l.monitor() + return nil +} + +func (l *launcher) monitor() { + ticker := time.NewTicker(tickInterval) + for { + select { + case <-l.stopChan: + return + case <-ticker.C: + if err := l.tick(); err != nil { + l.lggr.Errorw("Failed to tick", "err", err) + } + } + } +} + +func (l *launcher) tick() error { + // Ensure that the home chain reader is healthy. + // For new jobs it may be possible that the home chain reader is not yet ready + // so we won't be able to fetch configs and start any OCR instances. + if !l.homeChainReader.IsHealthy() { + return fmt.Errorf("home chain reader is unhealthy") + } + + // Fetch the latest state from the capability registry and determine if we need to + // launch or update any OCR instances. + latestState, err := l.capRegistry.LatestState() + if err != nil { + return fmt.Errorf("failed to fetch latest state from capability registry: %w", err) + } + + diffResult, err := diff(l.capabilityVersion, l.capabilityLabelledName, l.regState, latestState) + if err != nil { + return fmt.Errorf("failed to diff capability registry states: %w", err) + } + + err = l.processDiff(diffResult) + if err != nil { + return fmt.Errorf("failed to process diff: %w", err) + } + + // if diff is correctly processed, update latest state. + l.regState = latestState + return nil +} + +// processDiff processes the diff between the current and latest capability registry states. +// for any added OCR instances, it will launch them. +// for any removed OCR instances, it will shut them down. +// for any updated OCR instances, it will restart them with the new configuration. +func (l *launcher) processDiff(diff diffResult) error { + for id := range diff.removed { + if err := l.removeDON(id); err != nil { + return err + } + } + + for _, don := range diff.added { + if err := l.addDON(don); err != nil { + return err + } + } + + for _, don := range diff.updated { + if err := l.updateDON(don); err != nil { + return err + } + } + + return nil +} + +func (l *launcher) removeDON(id uint32) error { + ceDep, ok := l.dons[id] + if !ok { + // not running this particular DON. + return nil + } + + if err := ceDep.Shutdown(); err != nil { + return fmt.Errorf("failed to shutdown oracles for CCIP DON %d: %w", id, err) + } + + // after a successful shutdown we can safely remove the DON deployment from the map. + delete(l.dons, id) + return nil +} + +// updateDON handles the case where a DON in the capability registry has received a new configuration. +// In the case of CCIP, which follows blue-green deployment, we either: +// 1. Create a new oracle (the green instance) and start it. +// 2. Shut down the blue instance, making the green instance the new blue instance. +func (l *launcher) updateDON(don keystone_capability_registry.CapabilityRegistryDONInfo) error { + if !isMemberOfDON(don, l.p2pID) { + l.lggr.Infow("Not a member of this DON, skipping", "donId", don.Id, "p2pId", l.p2pID.ID()) + return nil + } + + ceDep, ok := l.dons[don.Id] + if !ok { + // This should never happen. + return fmt.Errorf("no deployment found for CCIP DON %d", don.Id) + } + + // this should be a retryable error. + commitOCRConfigs, err := l.homeChainReader.GetOCRConfigs(context.Background(), don.Id, cctypes.PluginTypeCCIPCommit) + if err != nil { + return fmt.Errorf("failed to fetch OCR configs for CCIP commit plugin (don id: %d) from home chain config contract: %w", + don.Id, err) + } + + execOCRConfigs, err := l.homeChainReader.GetOCRConfigs(context.Background(), don.Id, cctypes.PluginTypeCCIPExec) + if err != nil { + return fmt.Errorf("failed to fetch OCR configs for CCIP exec plugin (don id: %d) from home chain config contract: %w", + don.Id, err) + } + + // valid cases: + // a) len(commitOCRConfigs) == 2 && ceDep.NumCommitInstances() == 1: this is a new green instance. + // b) len(commitOCRConfigs) == 1 && ceDep.NumCommitInstances() == 2: this is a promotion of green->blue. + // invalid cases (enforced in the config contract): + // a) len(commitOCRConfigs) == 2 && ceDep.NumCommitInstances() == 2: this is an invariant violation. + // b) len(commitOCRConfigs) == 1 && ceDep.NumCommitInstances() == 1: this is an invariant violation. + // same thing applies to exec. + if len(commitOCRConfigs) == 2 && ceDep.NumCommitInstances() == 1 { + // this is a new green instance. + greenOracle, err := l.oracleCreator.CreateCommitOracle(commitOCRConfigs[1]) + if err != nil { + return fmt.Errorf("failed to create CCIP commit oracle: %w", err) + } + + if err := greenOracle.Start(); err != nil { + return fmt.Errorf("failed to start green commit oracle: %w", err) + } + ceDep.commit.green = greenOracle + l.lggr.Infow("Started green commit oracle", + "donId", don.Id, "ocrConfig", commitOCRConfigs[1].String()) + } else if len(commitOCRConfigs) == 1 && ceDep.NumCommitInstances() == 2 { + // this is a promotion of green->blue. + // swap the green oracle with the blue oracle in the ceDep struct. + oldBlue := ceDep.commit.blue + ceDep.commit.blue = ceDep.commit.green + ceDep.commit.green = nil + + // shut down blue oracle. + if err := oldBlue.Shutdown(); err != nil { + // we can't really roll back here, so we just log the error. + l.lggr.Errorw("Failed to shutdown blue oracle", "err", err) + } + } else { + return fmt.Errorf("invariant violation: expected 1 or 2 OCR configs for CCIP commit plugin (don id: %d), got %d", don.Id, len(commitOCRConfigs)) + } + + if len(execOCRConfigs) == 2 && ceDep.NumExecInstances() == 1 { + // this is a new green instance. + greenOracle, err := l.oracleCreator.CreateExecOracle(execOCRConfigs[1]) + if err != nil { + return fmt.Errorf("failed to create CCIP exec oracle: %w", err) + } + + if err := greenOracle.Start(); err != nil { + return fmt.Errorf("failed to start green exec oracle: %w", err) + } + ceDep.exec.green = greenOracle + l.lggr.Infow("Started green exec oracle", + "donId", don.Id, "ocrConfig", execOCRConfigs[1].String()) + } else if len(execOCRConfigs) == 1 && ceDep.NumExecInstances() == 2 { + // this is a promotion of green->blue. + // swap the green oracle with the blue oracle in the ceDep struct. + oldBlue := ceDep.exec.blue + ceDep.exec.blue = ceDep.exec.green + ceDep.exec.green = nil + + // shut down blue oracle. + if err := oldBlue.Shutdown(); err != nil { + // we can't really roll back here, so we just log the error. + l.lggr.Errorw("Failed to shutdown blue oracle", "err", err) + } + } else { + return fmt.Errorf("invariant violation: expected 1 or 2 OCR configs for CCIP exec plugin (don id: %d), got %d", don.Id, len(execOCRConfigs)) + } + + return nil +} + +func (l *launcher) addDON(don keystone_capability_registry.CapabilityRegistryDONInfo) error { + if !isMemberOfDON(don, l.p2pID) { + l.lggr.Infow("Not a member of this DON, skipping", "donId", don.Id, "p2pId", l.p2pID.ID()) + return nil + } + + // this should be a retryable error. + commitOCRConfigs, err := l.homeChainReader.GetOCRConfigs(context.Background(), don.Id, cctypes.PluginTypeCCIPCommit) + if err != nil { + return fmt.Errorf("failed to fetch OCR configs for CCIP commit plugin (don id: %d) from home chain config contract: %w", + don.Id, err) + } + + execOCRConfigs, err := l.homeChainReader.GetOCRConfigs(context.Background(), don.Id, cctypes.PluginTypeCCIPExec) + if err != nil { + return fmt.Errorf("failed to fetch OCR configs for CCIP exec plugin (don id: %d) from home chain config contract: %w", + don.Id, err) + } + + // upon creation we should only have one OCR config per plugin type. + if len(commitOCRConfigs) != 1 { + return fmt.Errorf("expected exactly one OCR config for CCIP commit plugin (don id: %d), got %d", don.Id, len(commitOCRConfigs)) + } + + if len(execOCRConfigs) != 1 { + return fmt.Errorf("expected exactly one OCR config for CCIP exec plugin (don id: %d), got %d", don.Id, len(execOCRConfigs)) + } + + commitOracle, err := l.oracleCreator.CreateCommitOracle(commitOCRConfigs[0]) + if err != nil { + return fmt.Errorf("failed to create CCIP commit oracle: %w", err) + } + + execOracle, err := l.oracleCreator.CreateExecOracle(execOCRConfigs[0]) + if err != nil { + return fmt.Errorf("failed to create CCIP exec oracle: %w", err) + } + + var errGroup errgroup.Group + errGroup.Go(func() error { + return commitOracle.Start() + }) + errGroup.Go(func() error { + return execOracle.Start() + }) + err = errGroup.Wait() + if err != nil { + // shut down the oracles if we failed to start them. + errShutdown := commitOracle.Shutdown() + if errShutdown != nil { + l.lggr.Errorw("Failed to shutdown commit oracle", "err", err) + } + errShutdown = execOracle.Shutdown() + if errShutdown != nil { + l.lggr.Errorw("Failed to shutdown exec oracle", "err", err) + } + return fmt.Errorf("failed to start oracles for CCIP DON %d: %w, err shutdown (could be nil): %w", don.Id, err, errShutdown) + } + + // update the dons map with the new deployment. + l.dons[don.Id] = &ccipDeployment{ + commit: blueGreenDeployment{ + blue: commitOracle, + }, + exec: blueGreenDeployment{ + blue: execOracle, + }, + } + + return nil +} diff --git a/core/services/ccipcapability/launcher/launcher_test.go b/core/services/ccipcapability/launcher/launcher_test.go new file mode 100644 index 00000000000..c7a71ed2eb6 --- /dev/null +++ b/core/services/ccipcapability/launcher/launcher_test.go @@ -0,0 +1,272 @@ +package launcher + +import ( + "testing" + + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/keystone_capability_registry" + "github.com/smartcontractkit/chainlink/v2/core/logger" + cctypes "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey" +) + +func Test_launcher_Close(t *testing.T) { + type fields struct { + capabilityVersion string + capabilityLabelledName string + p2pID p2pkey.KeyV2 + capRegistry cctypes.CapabilityRegistry + lggr logger.Logger + homeChainReader cctypes.HomeChainReader + stopChan chan struct{} + regState cctypes.RegistryState + oracleCreator cctypes.OracleCreator + dons map[uint32]*ccipDeployment + } + tests := []struct { + name string + fields fields + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := &launcher{ + capabilityVersion: tt.fields.capabilityVersion, + capabilityLabelledName: tt.fields.capabilityLabelledName, + p2pID: tt.fields.p2pID, + capRegistry: tt.fields.capRegistry, + lggr: tt.fields.lggr, + homeChainReader: tt.fields.homeChainReader, + stopChan: tt.fields.stopChan, + regState: tt.fields.regState, + oracleCreator: tt.fields.oracleCreator, + dons: tt.fields.dons, + } + if err := l.Close(); (err != nil) != tt.wantErr { + t.Errorf("launcher.Close() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_launcher_tick(t *testing.T) { + type fields struct { + capabilityVersion string + capabilityLabelledName string + p2pID p2pkey.KeyV2 + capRegistry cctypes.CapabilityRegistry + lggr logger.Logger + homeChainReader cctypes.HomeChainReader + stopChan chan struct{} + regState cctypes.RegistryState + oracleCreator cctypes.OracleCreator + dons map[uint32]*ccipDeployment + } + tests := []struct { + name string + fields fields + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := &launcher{ + capabilityVersion: tt.fields.capabilityVersion, + capabilityLabelledName: tt.fields.capabilityLabelledName, + p2pID: tt.fields.p2pID, + capRegistry: tt.fields.capRegistry, + lggr: tt.fields.lggr, + homeChainReader: tt.fields.homeChainReader, + stopChan: tt.fields.stopChan, + regState: tt.fields.regState, + oracleCreator: tt.fields.oracleCreator, + dons: tt.fields.dons, + } + if err := l.tick(); (err != nil) != tt.wantErr { + t.Errorf("launcher.tick() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_launcher_processDiff(t *testing.T) { + type fields struct { + capabilityVersion string + capabilityLabelledName string + p2pID p2pkey.KeyV2 + capRegistry cctypes.CapabilityRegistry + lggr logger.Logger + homeChainReader cctypes.HomeChainReader + stopChan chan struct{} + regState cctypes.RegistryState + oracleCreator cctypes.OracleCreator + dons map[uint32]*ccipDeployment + } + type args struct { + diff diffResult + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := &launcher{ + capabilityVersion: tt.fields.capabilityVersion, + capabilityLabelledName: tt.fields.capabilityLabelledName, + p2pID: tt.fields.p2pID, + capRegistry: tt.fields.capRegistry, + lggr: tt.fields.lggr, + homeChainReader: tt.fields.homeChainReader, + stopChan: tt.fields.stopChan, + regState: tt.fields.regState, + oracleCreator: tt.fields.oracleCreator, + dons: tt.fields.dons, + } + if err := l.processDiff(tt.args.diff); (err != nil) != tt.wantErr { + t.Errorf("launcher.processDiff() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_launcher_removeDON(t *testing.T) { + type fields struct { + capabilityVersion string + capabilityLabelledName string + p2pID p2pkey.KeyV2 + capRegistry cctypes.CapabilityRegistry + lggr logger.Logger + homeChainReader cctypes.HomeChainReader + stopChan chan struct{} + regState cctypes.RegistryState + oracleCreator cctypes.OracleCreator + dons map[uint32]*ccipDeployment + } + type args struct { + id uint32 + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := &launcher{ + capabilityVersion: tt.fields.capabilityVersion, + capabilityLabelledName: tt.fields.capabilityLabelledName, + p2pID: tt.fields.p2pID, + capRegistry: tt.fields.capRegistry, + lggr: tt.fields.lggr, + homeChainReader: tt.fields.homeChainReader, + stopChan: tt.fields.stopChan, + regState: tt.fields.regState, + oracleCreator: tt.fields.oracleCreator, + dons: tt.fields.dons, + } + if err := l.removeDON(tt.args.id); (err != nil) != tt.wantErr { + t.Errorf("launcher.removeDON() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_launcher_updateDON(t *testing.T) { + type fields struct { + capabilityVersion string + capabilityLabelledName string + p2pID p2pkey.KeyV2 + capRegistry cctypes.CapabilityRegistry + lggr logger.Logger + homeChainReader cctypes.HomeChainReader + stopChan chan struct{} + regState cctypes.RegistryState + oracleCreator cctypes.OracleCreator + dons map[uint32]*ccipDeployment + } + type args struct { + don keystone_capability_registry.CapabilityRegistryDONInfo + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := &launcher{ + capabilityVersion: tt.fields.capabilityVersion, + capabilityLabelledName: tt.fields.capabilityLabelledName, + p2pID: tt.fields.p2pID, + capRegistry: tt.fields.capRegistry, + lggr: tt.fields.lggr, + homeChainReader: tt.fields.homeChainReader, + stopChan: tt.fields.stopChan, + regState: tt.fields.regState, + oracleCreator: tt.fields.oracleCreator, + dons: tt.fields.dons, + } + if err := l.updateDON(tt.args.don); (err != nil) != tt.wantErr { + t.Errorf("launcher.updateDON() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_launcher_addDON(t *testing.T) { + type fields struct { + capabilityVersion string + capabilityLabelledName string + p2pID p2pkey.KeyV2 + capRegistry cctypes.CapabilityRegistry + lggr logger.Logger + homeChainReader cctypes.HomeChainReader + stopChan chan struct{} + regState cctypes.RegistryState + oracleCreator cctypes.OracleCreator + dons map[uint32]*ccipDeployment + } + type args struct { + don keystone_capability_registry.CapabilityRegistryDONInfo + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := &launcher{ + capabilityVersion: tt.fields.capabilityVersion, + capabilityLabelledName: tt.fields.capabilityLabelledName, + p2pID: tt.fields.p2pID, + capRegistry: tt.fields.capRegistry, + lggr: tt.fields.lggr, + homeChainReader: tt.fields.homeChainReader, + stopChan: tt.fields.stopChan, + regState: tt.fields.regState, + oracleCreator: tt.fields.oracleCreator, + dons: tt.fields.dons, + } + if err := l.addDON(tt.args.don); (err != nil) != tt.wantErr { + t.Errorf("launcher.addDON() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/core/services/ccipcapability/oraclecreator/oraclecreator.go b/core/services/ccipcapability/oraclecreator/oraclecreator.go new file mode 100644 index 00000000000..e02a76b18c5 --- /dev/null +++ b/core/services/ccipcapability/oraclecreator/oraclecreator.go @@ -0,0 +1,63 @@ +package oraclecreator + +import ( + "github.com/google/uuid" + "github.com/smartcontractkit/chainlink-common/pkg/loop" + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + + cctypes "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/chaintype" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ocr2key" + "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" +) + +type oracleCreator struct { + ocrKeyBundles map[chaintype.ChainType]ocr2key.KeyBundle + transmitters map[types.RelayID]string + relayers map[types.RelayID]loop.Relayer + peerWrapper *ocrcommon.SingletonPeerWrapper + externalJobID uuid.UUID + jobID int32 + isNewlyCreatedJob bool + relayConfigs map[string]job.JSONConfig + pluginConfig job.JSONConfig + db ocr3types.Database +} + +func New( + ocrKeyBundles map[chaintype.ChainType]ocr2key.KeyBundle, + transmitters map[types.RelayID]string, + relayers map[types.RelayID]loop.Relayer, + peerWrapper *ocrcommon.SingletonPeerWrapper, + externalJobID uuid.UUID, + jobID int32, + isNewlyCreatedJob bool, + relayConfigs map[string]job.JSONConfig, + pluginConfig job.JSONConfig, + db ocr3types.Database, +) cctypes.OracleCreator { + return &oracleCreator{ + ocrKeyBundles: ocrKeyBundles, + transmitters: transmitters, + relayers: relayers, + peerWrapper: peerWrapper, + externalJobID: externalJobID, + jobID: jobID, + isNewlyCreatedJob: isNewlyCreatedJob, + relayConfigs: relayConfigs, + pluginConfig: pluginConfig, + db: db, + } +} + +// CreateCommitOracle implements types.OracleCreator. +func (o *oracleCreator) CreateCommitOracle(config cctypes.OCRConfig) (cctypes.CCIPOracle, error) { + panic("unimplemented") +} + +// CreateExecOracle implements types.OracleCreator. +func (o *oracleCreator) CreateExecOracle(config cctypes.OCRConfig) (cctypes.CCIPOracle, error) { + panic("unimplemented") +} diff --git a/core/services/ccipcapability/types/mocks/ccip_oracle.go b/core/services/ccipcapability/types/mocks/ccip_oracle.go new file mode 100644 index 00000000000..08ef31854f4 --- /dev/null +++ b/core/services/ccipcapability/types/mocks/ccip_oracle.go @@ -0,0 +1,60 @@ +// Code generated by mockery v2.42.2. DO NOT EDIT. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// CCIPOracle is an autogenerated mock type for the CCIPOracle type +type CCIPOracle struct { + mock.Mock +} + +// Shutdown provides a mock function with given fields: +func (_m *CCIPOracle) Shutdown() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Shutdown") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Start provides a mock function with given fields: +func (_m *CCIPOracle) Start() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Start") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewCCIPOracle creates a new instance of CCIPOracle. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewCCIPOracle(t interface { + mock.TestingT + Cleanup(func()) +}) *CCIPOracle { + mock := &CCIPOracle{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/ccipcapability/types/mocks/home_chain_reader.go b/core/services/ccipcapability/types/mocks/home_chain_reader.go new file mode 100644 index 00000000000..cd6fcc1e4d7 --- /dev/null +++ b/core/services/ccipcapability/types/mocks/home_chain_reader.go @@ -0,0 +1,107 @@ +// Code generated by mockery v2.42.2. DO NOT EDIT. + +package mocks + +import ( + context "context" + + types "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types" + mock "github.com/stretchr/testify/mock" +) + +// HomeChainReader is an autogenerated mock type for the HomeChainReader type +type HomeChainReader struct { + mock.Mock +} + +// GetAllChainConfigs provides a mock function with given fields: ctx +func (_m *HomeChainReader) GetAllChainConfigs(ctx context.Context) (map[uint64]types.ChainConfig, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetAllChainConfigs") + } + + var r0 map[uint64]types.ChainConfig + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (map[uint64]types.ChainConfig, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) map[uint64]types.ChainConfig); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[uint64]types.ChainConfig) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetOCRConfigs provides a mock function with given fields: ctx, donID, pluginType +func (_m *HomeChainReader) GetOCRConfigs(ctx context.Context, donID uint32, pluginType types.PluginType) ([]types.OCRConfig, error) { + ret := _m.Called(ctx, donID, pluginType) + + if len(ret) == 0 { + panic("no return value specified for GetOCRConfigs") + } + + var r0 []types.OCRConfig + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint32, types.PluginType) ([]types.OCRConfig, error)); ok { + return rf(ctx, donID, pluginType) + } + if rf, ok := ret.Get(0).(func(context.Context, uint32, types.PluginType) []types.OCRConfig); ok { + r0 = rf(ctx, donID, pluginType) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]types.OCRConfig) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint32, types.PluginType) error); ok { + r1 = rf(ctx, donID, pluginType) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// IsHealthy provides a mock function with given fields: +func (_m *HomeChainReader) IsHealthy() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for IsHealthy") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// NewHomeChainReader creates a new instance of HomeChainReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewHomeChainReader(t interface { + mock.TestingT + Cleanup(func()) +}) *HomeChainReader { + mock := &HomeChainReader{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/ccipcapability/types/mocks/ocr_config.go b/core/services/ccipcapability/types/mocks/ocr_config.go new file mode 100644 index 00000000000..bf2b4bcde23 --- /dev/null +++ b/core/services/ccipcapability/types/mocks/ocr_config.go @@ -0,0 +1,195 @@ +// Code generated by mockery v2.42.2. DO NOT EDIT. + +package mocks + +import ( + types "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types" + mock "github.com/stretchr/testify/mock" +) + +// OCRConfig is an autogenerated mock type for the OCRConfig type +type OCRConfig struct { + mock.Mock +} + +// ChainSelector provides a mock function with given fields: +func (_m *OCRConfig) ChainSelector() uint64 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for ChainSelector") + } + + var r0 uint64 + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + return r0 +} + +// F provides a mock function with given fields: +func (_m *OCRConfig) F() uint8 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for F") + } + + var r0 uint8 + if rf, ok := ret.Get(0).(func() uint8); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint8) + } + + return r0 +} + +// OffchainConfig provides a mock function with given fields: +func (_m *OCRConfig) OffchainConfig() []byte { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for OffchainConfig") + } + + var r0 []byte + if rf, ok := ret.Get(0).(func() []byte); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + return r0 +} + +// OffchainConfigVersion provides a mock function with given fields: +func (_m *OCRConfig) OffchainConfigVersion() uint64 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for OffchainConfigVersion") + } + + var r0 uint64 + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + return r0 +} + +// OfframpAddress provides a mock function with given fields: +func (_m *OCRConfig) OfframpAddress() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for OfframpAddress") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// PluginType provides a mock function with given fields: +func (_m *OCRConfig) PluginType() types.PluginType { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for PluginType") + } + + var r0 types.PluginType + if rf, ok := ret.Get(0).(func() types.PluginType); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(types.PluginType) + } + + return r0 +} + +// Signers provides a mock function with given fields: +func (_m *OCRConfig) Signers() [][2][32]byte { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Signers") + } + + var r0 [][2][32]byte + if rf, ok := ret.Get(0).(func() [][2][32]byte); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([][2][32]byte) + } + } + + return r0 +} + +// String provides a mock function with given fields: +func (_m *OCRConfig) String() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for String") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// Transmitters provides a mock function with given fields: +func (_m *OCRConfig) Transmitters() [][2][32]byte { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Transmitters") + } + + var r0 [][2][32]byte + if rf, ok := ret.Get(0).(func() [][2][32]byte); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([][2][32]byte) + } + } + + return r0 +} + +// NewOCRConfig creates a new instance of OCRConfig. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewOCRConfig(t interface { + mock.TestingT + Cleanup(func()) +}) *OCRConfig { + mock := &OCRConfig{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/ccipcapability/types/mocks/oracle_creator.go b/core/services/ccipcapability/types/mocks/oracle_creator.go new file mode 100644 index 00000000000..fcab246b28c --- /dev/null +++ b/core/services/ccipcapability/types/mocks/oracle_creator.go @@ -0,0 +1,87 @@ +// Code generated by mockery v2.42.2. DO NOT EDIT. + +package mocks + +import ( + types "github.com/smartcontractkit/chainlink/v2/core/services/ccipcapability/types" + mock "github.com/stretchr/testify/mock" +) + +// OracleCreator is an autogenerated mock type for the OracleCreator type +type OracleCreator struct { + mock.Mock +} + +// CreateCommitOracle provides a mock function with given fields: config +func (_m *OracleCreator) CreateCommitOracle(config types.OCRConfig) (types.CCIPOracle, error) { + ret := _m.Called(config) + + if len(ret) == 0 { + panic("no return value specified for CreateCommitOracle") + } + + var r0 types.CCIPOracle + var r1 error + if rf, ok := ret.Get(0).(func(types.OCRConfig) (types.CCIPOracle, error)); ok { + return rf(config) + } + if rf, ok := ret.Get(0).(func(types.OCRConfig) types.CCIPOracle); ok { + r0 = rf(config) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(types.CCIPOracle) + } + } + + if rf, ok := ret.Get(1).(func(types.OCRConfig) error); ok { + r1 = rf(config) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// CreateExecOracle provides a mock function with given fields: config +func (_m *OracleCreator) CreateExecOracle(config types.OCRConfig) (types.CCIPOracle, error) { + ret := _m.Called(config) + + if len(ret) == 0 { + panic("no return value specified for CreateExecOracle") + } + + var r0 types.CCIPOracle + var r1 error + if rf, ok := ret.Get(0).(func(types.OCRConfig) (types.CCIPOracle, error)); ok { + return rf(config) + } + if rf, ok := ret.Get(0).(func(types.OCRConfig) types.CCIPOracle); ok { + r0 = rf(config) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(types.CCIPOracle) + } + } + + if rf, ok := ret.Get(1).(func(types.OCRConfig) error); ok { + r1 = rf(config) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewOracleCreator creates a new instance of OracleCreator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewOracleCreator(t interface { + mock.TestingT + Cleanup(func()) +}) *OracleCreator { + mock := &OracleCreator{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/ccipcapability/types/types.go b/core/services/ccipcapability/types/types.go new file mode 100644 index 00000000000..15f530c28b0 --- /dev/null +++ b/core/services/ccipcapability/types/types.go @@ -0,0 +1,78 @@ +package types + +import ( + "context" + + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/keystone_capability_registry" +) + +type RegistryState struct { + DONs []keystone_capability_registry.CapabilityRegistryDONInfo + Capabilities []keystone_capability_registry.CapabilityRegistryCapability + Nodes []keystone_capability_registry.CapabilityRegistryNodeInfo +} + +type CapabilityRegistry interface { + // LatestState returns the latest state of the on-chain capability registry. + LatestState() (RegistryState, error) +} + +type ChainConfig interface { + Readers() [][32]byte + FChain() uint8 +} + +type PluginType uint8 + +const ( + PluginTypeCCIPCommit PluginType = 0 + PluginTypeCCIPExec PluginType = 1 +) + +//go:generate mockery --name OCRConfig --output ./mocks/ --case underscore +type OCRConfig interface { + PluginType() PluginType + ChainSelector() uint64 + F() uint8 + OffchainConfigVersion() uint64 + OfframpAddress() string + Signers() [][2][32]byte + Transmitters() [][2][32]byte + OffchainConfig() []byte + String() string +} + +//go:generate mockery --name CCIPOracle --output ./mocks/ --case underscore +type CCIPOracle interface { + Shutdown() error + Start() error +} + +// OracleCreator is an interface for creating CCIP oracles. +// Whether the oracle uses a LOOPP or not is an implementation detail. +// +//go:generate mockery --name OracleCreator --output ./mocks/ --case underscore +type OracleCreator interface { + // CreateCommitOracle creates a new oracle that will run the CCIP commit plugin. + // The oracle must be returned unstarted. + CreateCommitOracle(config OCRConfig) (CCIPOracle, error) + + // CreateExecOracle creates a new oracle that will run the CCIP exec plugin. + // The oracle must be returned unstarted. + CreateExecOracle(config OCRConfig) (CCIPOracle, error) +} + +// HomeChainReader is an interface for reading CCIP chain and OCR configurations from the home chain. +// +//go:generate mockery --name HomeChainReader --output ./mocks/ --case underscore +type HomeChainReader interface { + // GetAllChainConfigs returns all chain configurations defined on the home chain. + // The key is the chain selector. + GetAllChainConfigs(ctx context.Context) (map[uint64]ChainConfig, error) + + // GetOCRConfigs returns all OCR configurations for a given DON ID and plugin type. + GetOCRConfigs(ctx context.Context, donID uint32, pluginType PluginType) ([]OCRConfig, error) + + // IsHealthy returns true if the home chain reader is healthy. + IsHealthy() bool +} diff --git a/core/services/ccipcapability/validate.go b/core/services/ccipcapability/validate.go new file mode 100644 index 00000000000..cadd5c95032 --- /dev/null +++ b/core/services/ccipcapability/validate.go @@ -0,0 +1,8 @@ +package ccipcapability + +import "github.com/smartcontractkit/chainlink/v2/core/services/job" + +func ValidatedCCIPSpec(tomlString string) (job.Job, error) { + // TODO: implement. + return job.Job{}, nil +} diff --git a/core/services/job/models.go b/core/services/job/models.go index 7b8e5f79174..c809ae40d75 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -50,6 +50,7 @@ const ( Webhook Type = (Type)(pipeline.WebhookJobType) Workflow Type = (Type)(pipeline.WorkflowJobType) StandardCapabilities Type = (Type)(pipeline.StandardCapabilitiesJobType) + CCIP Type = (Type)(pipeline.CCIPJobType) ) //revive:disable:redefines-builtin-id @@ -90,6 +91,7 @@ var ( Webhook: true, Workflow: false, StandardCapabilities: false, + CCIP: false, } supportsAsync = map[Type]bool{ BlockHeaderFeeder: false, @@ -109,6 +111,7 @@ var ( Webhook: true, Workflow: false, StandardCapabilities: false, + CCIP: false, } schemaVersions = map[Type]uint32{ BlockHeaderFeeder: 1, @@ -128,6 +131,7 @@ var ( Webhook: 1, Workflow: 1, StandardCapabilities: 1, + CCIP: 1, } ) @@ -174,7 +178,9 @@ type Job struct { StandardCapabilitiesSpecID *int32 StandardCapabilitiesSpec *StandardCapabilitiesSpec CCIPSpecID *int32 + CCIPSpec *CCIPSpec CCIPBootstrapSpecID *int32 + CCIPBootstrapSpec *CCIPBootstrapSpec JobSpecErrors []SpecError Type Type `toml:"type"` SchemaVersion uint32 `toml:"schemaVersion"` @@ -916,3 +922,76 @@ func (w *StandardCapabilitiesSpec) SetID(value string) error { w.ID = int32(ID) return nil } + +type CCIPSpec struct { + ID int32 + CreatedAt time.Time `toml:"-"` + UpdatedAt time.Time `toml:"-"` + + // P2PV2Bootstrappers is a list of "peer_id@ip_address:port" strings that are used to + // identify the bootstrap nodes of the P2P network. + // These bootstrappers will be used to bootstrap all CCIP DONs. + P2PV2Bootstrappers pq.StringArray `toml:"p2pv2Bootstrappers" db:"p2pv2_bootstrappers"` + + // CapabilityVersion is the semantic version of the CCIP capability. + // This capability version must exist in the onchain capability registry. + CapabilityVersion string `toml:"capabilityVersion" db:"capability_version"` + + // CapabilityLabelledName is the labelled name of the CCIP capability. + // Corresponds to the labelled name of the capability in the onchain capability registry. + CapabilityLabelledName string `toml:"capabilityLabelledName" db:"capability_labelled_name"` + + // OCRKeyBundleIDs is a mapping from chain type to OCR key bundle ID. + // These are explicitly specified here so that we don't run into strange errors auto-detecting + // the valid bundle, since nops can create as many bundles as they want. + // This means that the job spec must be updated for every new chain family supported by CCIP. + // {"evm": "evm_key_bundle_id", "solana": "solana_key_bundle_id", ... } + OCRKeyBundleIDs JSONConfig `toml:"ocrKeyBundleIDs" db:"ocr_key_bundle_ids"` + + // P2PKeyID is the ID of the P2P key of the node. + // This must be present in the capability registry otherwise the job will not start correctly. + P2PKeyID string `toml:"p2pKeyID" db:"p2p_key_id"` + + // TransmitterIDs is a mapping from relay id to transmitter id. + // Safest thing to do is to whitelist them in the job spec. + // This means that the job spec must be updated for every new chain added to CCIP. + TransmitterIDs JSONConfig `toml:"transmitterIDs" db:"transmitter_ids"` + + // RelayConfig consists of relay specific configuration. + // Chain reader configurations are stored here, and are defined on a chain family basis, e.g + // we will have one chain reader config for EVM, one for solana, starknet, etc. + // Chain writer configurations are also stored here, and are also defined on a chain family basis, + // e.g we will have one chain writer config for EVM, one for solana, starknet, etc. + // See tests for examples of relay configs in TOML. + // { "evm": {"chainReader": {...}, "chainWriter": {...}}, "solana": {...}, ... } + // Relay config structure + // see core/services/relay/evm/types/types.go + RelayConfigs map[string]JSONConfig `toml:"relayConfigs" db:"relay_configs"` + + // PluginConfig contains plugin-specific config, like token price pipelines. + // The job spec will have to be updated once a new token needs its price posted. + // TODO: might get axed in favor of workflows' ccip price posting. + PluginConfig JSONConfig `toml:"pluginConfig"` +} + +// CCIPBootstrapSpec is the spec for the CCIP role don bootstrap job. +// This job launches a bootstrap node for each new CCIP role DON OCR instance. +type CCIPBootstrapSpec struct { + ID int32 + CreatedAt time.Time `toml:"-"` + UpdatedAt time.Time `toml:"-"` + + // CapabilityVersion is the semantic version of the CCIP capability. + // This capability version must exist in the onchain capability registry. + // Bootstrap nodes will be launched based on DONs with this capability version. + CapabilityVersion string `toml:"capabilityVersion" db:"capability_version"` + + // CapabilityLabelledName is the labelled name of the CCIP capability. + // Corresponds to the labelled name of the capability in the onchain capability registry. + // Bootstrap nodes will be launched based on DONs with this capability labelled name. + CapabilityLabelledName string `toml:"capabilityLabelledName" db:"capability_labelled_name"` + + // RelayConfig defines the chain reader configurations for the bootstrap node. + // Note that it must be EVM, since home chain is EVM. + RelayConfig JSONConfig `toml:"relayConfig" db:"relay_config"` +} diff --git a/core/services/job/orm.go b/core/services/job/orm.go index 4dc5478d112..2bffe3080f1 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -426,7 +426,15 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error { return errors.Wrap(err, "failed to create StandardCapabilities for jobSpec") } jb.StandardCapabilitiesSpecID = &specID - + case CCIP: + sql := `INSERT INTO ccip_specs (capability_version, ocr_key_bundle_ids, transmitter_ids, relay_configs, plugin_config, created_at, updated_at) + VALUES (:capability_version, :capability_labelled_name, :ocr_key_bundle_ids, :transmitter_ids, :relay_configs, :plugin_config, NOW(), NOW()) + RETURNING id;` + specID, err := tx.prepareQuerySpecID(ctx, sql, jb.CCIPSpec) + if err != nil { + return errors.Wrap(err, "failed to create CCIPSpec for jobSpec") + } + jb.CCIPSpecID = &specID default: o.lggr.Panicf("Unsupported jb.Type: %v", jb.Type) } @@ -639,19 +647,19 @@ func (o *orm) InsertJob(ctx context.Context, job *Job) error { // if job has id, emplace otherwise insert with a new id. if job.ID == 0 { query = `INSERT INTO jobs (name, stream_id, schema_version, type, max_task_duration, ocr_oracle_spec_id, ocr2_oracle_spec_id, direct_request_spec_id, flux_monitor_spec_id, - keeper_spec_id, cron_spec_id, vrf_spec_id, webhook_spec_id, blockhash_store_spec_id, bootstrap_spec_id, block_header_feeder_spec_id, gateway_spec_id, - legacy_gas_station_server_spec_id, legacy_gas_station_sidecar_spec_id, workflow_spec_id, standard_capabilities_spec_id, external_job_id, gas_limit, forwarding_allowed, created_at) + keeper_spec_id, cron_spec_id, vrf_spec_id, webhook_spec_id, blockhash_store_spec_id, bootstrap_spec_id, block_header_feeder_spec_id, gateway_spec_id, + legacy_gas_station_server_spec_id, legacy_gas_station_sidecar_spec_id, workflow_spec_id, standard_capabilities_spec_id, ccip_spec_id, external_job_id, gas_limit, forwarding_allowed, created_at) VALUES (:name, :stream_id, :schema_version, :type, :max_task_duration, :ocr_oracle_spec_id, :ocr2_oracle_spec_id, :direct_request_spec_id, :flux_monitor_spec_id, - :keeper_spec_id, :cron_spec_id, :vrf_spec_id, :webhook_spec_id, :blockhash_store_spec_id, :bootstrap_spec_id, :block_header_feeder_spec_id, :gateway_spec_id, - :legacy_gas_station_server_spec_id, :legacy_gas_station_sidecar_spec_id, :workflow_spec_id, :standard_capabilities_spec_id, :external_job_id, :gas_limit, :forwarding_allowed, NOW()) + :keeper_spec_id, :cron_spec_id, :vrf_spec_id, :webhook_spec_id, :blockhash_store_spec_id, :bootstrap_spec_id, :block_header_feeder_spec_id, :gateway_spec_id, + :legacy_gas_station_server_spec_id, :legacy_gas_station_sidecar_spec_id, :workflow_spec_id, :standard_capabilities_spec_id, :ccip_spec_id, :external_job_id, :gas_limit, :forwarding_allowed, NOW()) RETURNING *;` } else { query = `INSERT INTO jobs (id, name, stream_id, schema_version, type, max_task_duration, ocr_oracle_spec_id, ocr2_oracle_spec_id, direct_request_spec_id, flux_monitor_spec_id, - keeper_spec_id, cron_spec_id, vrf_spec_id, webhook_spec_id, blockhash_store_spec_id, bootstrap_spec_id, block_header_feeder_spec_id, gateway_spec_id, - legacy_gas_station_server_spec_id, legacy_gas_station_sidecar_spec_id, workflow_spec_id, standard_capabilities_spec_id, external_job_id, gas_limit, forwarding_allowed, created_at) + keeper_spec_id, cron_spec_id, vrf_spec_id, webhook_spec_id, blockhash_store_spec_id, bootstrap_spec_id, block_header_feeder_spec_id, gateway_spec_id, + legacy_gas_station_server_spec_id, legacy_gas_station_sidecar_spec_id, workflow_spec_id, standard_capabilities_spec_id, ccip_spec_id, external_job_id, gas_limit, forwarding_allowed, created_at) VALUES (:id, :name, :stream_id, :schema_version, :type, :max_task_duration, :ocr_oracle_spec_id, :ocr2_oracle_spec_id, :direct_request_spec_id, :flux_monitor_spec_id, - :keeper_spec_id, :cron_spec_id, :vrf_spec_id, :webhook_spec_id, :blockhash_store_spec_id, :bootstrap_spec_id, :block_header_feeder_spec_id, :gateway_spec_id, - :legacy_gas_station_server_spec_id, :legacy_gas_station_sidecar_spec_id, :workflow_spec_id, :standard_capabilities_spec_id, :external_job_id, :gas_limit, :forwarding_allowed, NOW()) + :keeper_spec_id, :cron_spec_id, :vrf_spec_id, :webhook_spec_id, :blockhash_store_spec_id, :bootstrap_spec_id, :block_header_feeder_spec_id, :gateway_spec_id, + :legacy_gas_station_server_spec_id, :legacy_gas_station_sidecar_spec_id, :workflow_spec_id, :standard_capabilities_spec_id, :ccip_spec_id, :external_job_id, :gas_limit, :forwarding_allowed, NOW()) RETURNING *;` } query, args, err := tx.ds.BindNamed(query, job) @@ -695,7 +703,8 @@ func (o *orm) DeleteJob(ctx context.Context, id int32) error { block_header_feeder_spec_id, gateway_spec_id, workflow_spec_id, - standard_capabilities_spec_id + standard_capabilities_spec_id, + ccip_spec_id ), deleted_oracle_specs AS ( DELETE FROM ocr_oracle_specs WHERE id IN (SELECT ocr_oracle_spec_id FROM deleted_jobs) @@ -738,7 +747,10 @@ func (o *orm) DeleteJob(ctx context.Context, id int32) error { ), deleted_standardcapabilities_specs AS ( DELETE FROM standardcapabilities_specs WHERE id in (SELECT standard_capabilities_spec_id FROM deleted_jobs) - ), + ), + deleted_ccip_specs AS ( + DELETE FROM ccip_specs WHERE id in (SELECT ccip_spec_id FROM deleted_jobs) + ), deleted_job_pipeline_specs AS ( DELETE FROM job_pipeline_specs WHERE job_id IN (SELECT id FROM deleted_jobs) RETURNING pipeline_spec_id ) @@ -812,7 +824,7 @@ func (o *orm) FindJobs(ctx context.Context, offset, limit int) (jobs []Job, coun return fmt.Errorf("failed to query jobs count: %w", err) } - sql = `SELECT jobs.*, job_pipeline_specs.pipeline_spec_id as pipeline_spec_id + sql = `SELECT jobs.*, job_pipeline_specs.pipeline_spec_id as pipeline_spec_id FROM jobs JOIN job_pipeline_specs ON (jobs.id = job_pipeline_specs.job_id) ORDER BY jobs.created_at DESC, jobs.id DESC OFFSET $1 LIMIT $2;` @@ -1026,8 +1038,8 @@ func (o *orm) findJob(ctx context.Context, jb *Job, col string, arg interface{}) } func (o *orm) FindJobIDsWithBridge(ctx context.Context, name string) (jids []int32, err error) { - query := `SELECT - jobs.id, pipeline_specs.dot_dag_source + query := `SELECT + jobs.id, pipeline_specs.dot_dag_source FROM jobs JOIN job_pipeline_specs ON job_pipeline_specs.job_id = jobs.id JOIN pipeline_specs ON pipeline_specs.id = job_pipeline_specs.pipeline_spec_id @@ -1074,7 +1086,7 @@ func (o *orm) FindJobIDsWithBridge(ctx context.Context, name string) (jids []int func (o *orm) FindJobIDByWorkflow(ctx context.Context, spec WorkflowSpec) (jobID int32, err error) { stmt := ` SELECT jobs.id FROM jobs -INNER JOIN workflow_specs ws on jobs.workflow_spec_id = ws.id AND ws.workflow_owner = $1 AND ws.workflow_name = $2 +INNER JOIN workflow_specs ws on jobs.workflow_spec_id = ws.id AND ws.workflow_owner = $1 AND ws.workflow_name = $2 ` err = o.ds.GetContext(ctx, &jobID, stmt, spec.WorkflowOwner, spec.WorkflowName) if err != nil { @@ -1387,6 +1399,7 @@ func (o *orm) loadAllJobTypes(ctx context.Context, job *Job) error { o.loadJobType(ctx, job, "GatewaySpec", "gateway_specs", job.GatewaySpecID), o.loadJobType(ctx, job, "WorkflowSpec", "workflow_specs", job.WorkflowSpecID), o.loadJobType(ctx, job, "StandardCapabilitiesSpec", "standardcapabilities_specs", job.StandardCapabilitiesSpecID), + o.loadJobType(ctx, job, "CCIPSpec", "ccip_specs", job.CCIPSpecID), ) } @@ -1424,7 +1437,7 @@ func (o *orm) loadJobPipelineSpec(ctx context.Context, job *Job, id *int32) erro ctx, pipelineSpecRow, `SELECT pipeline_specs.*, job_pipeline_specs.job_id as job_id - FROM pipeline_specs + FROM pipeline_specs JOIN job_pipeline_specs ON(pipeline_specs.id = job_pipeline_specs.pipeline_spec_id) WHERE job_pipeline_specs.job_id = $1 AND job_pipeline_specs.pipeline_spec_id = $2`, job.ID, *id, diff --git a/core/services/pipeline/common.go b/core/services/pipeline/common.go index 1e2c52dad66..92011ef1bf3 100644 --- a/core/services/pipeline/common.go +++ b/core/services/pipeline/common.go @@ -41,6 +41,7 @@ const ( WebhookJobType string = "webhook" WorkflowJobType string = "workflow" StandardCapabilitiesJobType string = "standardcapabilities" + CCIPJobType string = "ccip" ) //go:generate mockery --quiet --name Config --output ./mocks/ --case=underscore