Skip to content

Commit

Permalink
[CAPPL] Fixes to registry syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier committed Dec 9, 2024
1 parent 5e5a4cd commit db0c19e
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 114 deletions.
5 changes: 5 additions & 0 deletions core/capabilities/triggers/logevent/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ func newLogEventTrigger(ctx context.Context,
return nil, nil, err
}

err = contractReader.Start(ctx)
if err != nil {
return nil, nil, err
}

// Get current block HEAD/tip of the blockchain to start polling from
latestHead, err := relayer.LatestHead(ctx)
if err != nil {
Expand Down
15 changes: 11 additions & 4 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,19 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
}, eventHandler)

globalLogger.Debugw("Creating WorkflowRegistrySyncer")
wfSyncer := syncer.NewWorkflowRegistry(lggr, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return relayer.NewContractReader(ctx, bytes)
}, cfg.Capabilities().WorkflowRegistry().Address(),
wfSyncer := syncer.NewWorkflowRegistry(
lggr,
func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return relayer.NewContractReader(ctx, bytes)
},
cfg.Capabilities().WorkflowRegistry().Address(),
syncer.WorkflowEventPollerConfig{
QueryCount: 100,
}, eventHandler, loader, workflowDonNotifier)
},
eventHandler,
loader,
workflowDonNotifier,
)

srvcs = append(srvcs, fetcher, wfSyncer)
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/relay/evm/capabilities/testutils/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,5 @@ func (th *EVMBackendTH) NewContractReader(ctx context.Context, t *testing.T, cfg
return nil, err
}

return svc, svc.Start(ctx)
return svc, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"strings"
"testing"
"time"

Expand All @@ -18,13 +18,13 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper"
coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"

Expand Down Expand Up @@ -157,8 +157,6 @@ func Test_SecretsWorker(t *testing.T) {
fetcherFn = func(_ context.Context, _ string) ([]byte, error) {
return []byte(wantContents), nil
}
contractName = syncer.WorkflowRegistryContractName
forceUpdateSecretsEvent = string(syncer.ForceUpdateSecretsEvent)
)

defer giveTicker.Stop()
Expand All @@ -174,38 +172,6 @@ func Test_SecretsWorker(t *testing.T) {
backendTH.Backend.Commit()
require.NoError(t, err)

lggr.Infof("deployed workflow registry at %s\n", wfRegistryAddr.Hex())

// Build the ContractReader config
contractReaderCfg := evmtypes.ChainReaderConfig{
Contracts: map[string]evmtypes.ChainContractReader{
contractName: {
ContractPollingFilter: evmtypes.ContractPollingFilter{
GenericEventNames: []string{forceUpdateSecretsEvent},
},
ContractABI: workflow_registry_wrapper.WorkflowRegistryABI,
Configs: map[string]*evmtypes.ChainReaderDefinition{
forceUpdateSecretsEvent: {
ChainSpecificName: forceUpdateSecretsEvent,
ReadType: evmtypes.Event,
},
syncer.GetWorkflowMetadataListByDONMethodName: {
ChainSpecificName: syncer.GetWorkflowMetadataListByDONMethodName,
},
},
},
},
}

contractReaderCfgBytes, err := json.Marshal(contractReaderCfg)
require.NoError(t, err)

contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes)
require.NoError(t, err)

err = contractReader.Bind(ctx, []types.BoundContract{{Name: contractName, Address: wfRegistryAddr.Hex()}})
require.NoError(t, err)

// Seed the DB
hash, err := crypto.Keccak256(append(backendTH.ContractsOwner.From[:], []byte(giveSecretsURL)...))
require.NoError(t, err)
Expand All @@ -226,17 +192,23 @@ func Test_SecretsWorker(t *testing.T) {
handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
emitter, clockwork.NewFakeClock(), workflowkey.Key{})

worker := syncer.NewWorkflowRegistry(lggr, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return contractReader, nil
}, wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{
QueryCount: 20,
}, handler, &testWorkflowRegistryContractLoader{}, &testDonNotifier{
worker := syncer.NewWorkflowRegistry(
lggr,
func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return backendTH.NewContractReader(ctx, t, bytes)
},
wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{QueryCount: 20},
handler,
&testWorkflowRegistryContractLoader{},
&testDonNotifier{
don: capabilities.DON{
ID: donID,
},
err: nil,
}, syncer.WithTicker(giveTicker.C))
},
syncer.WithTicker(giveTicker.C),
)

// setup contract state to allow the secrets to be updated
updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true)
Expand All @@ -257,6 +229,80 @@ func Test_SecretsWorker(t *testing.T) {
}, 15*time.Second, time.Second)
}

func Test_RegistrySyncer_EventHandling(t *testing.T) {
var (
ctx = coretestutils.Context(t)
lggr = logger.TestLogger(t)
emitter = custmsg.NewLabeler()
backendTH = testutils.NewEVMBackendTH(t)
db = pgtest.NewSqlxDB(t)
orm = syncer.NewWorkflowRegistryDS(db, lggr)

giveTicker = time.NewTicker(500 * time.Millisecond)
giveBinaryURL = "https://original-url.com"
donID = uint32(1)
giveWorkflow = RegisterWorkflowCMD{
Name: "test-wf",
DonID: donID,
Status: uint8(1),
BinaryURL: giveBinaryURL,
}
wantContents = "updated contents"
fetcherFn = func(_ context.Context, _ string) ([]byte, error) {
return []byte(wantContents), nil
}
)

defer giveTicker.Stop()

// Deploy a test workflow_registry
wfRegistryAddr, _, wfRegistryC, err := workflow_registry_wrapper.DeployWorkflowRegistry(backendTH.ContractsOwner, backendTH.Backend.Client())
backendTH.Backend.Commit()
require.NoError(t, err)

from := [20]byte(backendTH.ContractsOwner.From)
id, err := workflows.GenerateWorkflowID(from[:], []byte(wantContents), []byte(""), "")
require.NoError(t, err)
giveWorkflow.ID = id

handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
emitter, clockwork.NewFakeClock(), workflowkey.Key{})

worker := syncer.NewWorkflowRegistry(
lggr,
func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return backendTH.NewContractReader(ctx, t, bytes)
},
wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{QueryCount: 20},
handler,
&testWorkflowRegistryContractLoader{},
&testDonNotifier{
don: capabilities.DON{
ID: donID,
},
err: nil,
},
syncer.WithTicker(giveTicker.C),
)

// setup contract state to allow the secrets to be updated
updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true)
updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true)

servicetest.Run(t, worker)

// generate a log event
registerWorkflow(t, backendTH, wfRegistryC, giveWorkflow)

// Require the secrets contents to eventually be updated
require.Eventually(t, func() bool {
owner := strings.ToLower(backendTH.ContractsOwner.From.Hex()[2:])
_, err := orm.GetWorkflowSpec(ctx, owner, "test-wf")
return err == nil
}, 15*time.Second, time.Second)
}

func updateAuthorizedAddress(
t *testing.T,
th *testutils.EVMBackendTH,
Expand Down
58 changes: 29 additions & 29 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ type WorkflowRegistryForceUpdateSecretsRequestedV1 struct {
}

type WorkflowRegistryWorkflowRegisteredV1 struct {
WorkflowID [32]byte
Owner []byte
DonID uint32
Status uint8
WorkflowName string
BinaryURL string
ConfigURL string
SecretsURL string
WorkflowID [32]byte
WorkflowOwner []byte
DonID uint32
Status uint8
WorkflowName string
BinaryURL string
ConfigURL string
SecretsURL string
}

type WorkflowRegistryWorkflowUpdatedV1 struct {
Expand Down Expand Up @@ -276,7 +276,7 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
cma := h.emitter.With(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner),
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowRegisteredEvent(ctx, payload); err != nil {
Expand Down Expand Up @@ -394,7 +394,7 @@ func (h *eventHandler) workflowRegisteredEvent(
}

// Calculate the hash of the binary and config files
hash, err := pkgworkflows.GenerateWorkflowID(payload.Owner, binary, config, payload.SecretsURL)
hash, err := pkgworkflows.GenerateWorkflowID(payload.WorkflowOwner, binary, config, payload.SecretsURL)
if err != nil {
return fmt.Errorf("failed to generate workflow id: %w", err)
}
Expand All @@ -405,7 +405,7 @@ func (h *eventHandler) workflowRegisteredEvent(
}

// Save the workflow secrets
urlHash, err := h.orm.GetSecretsURLHash(payload.Owner, []byte(payload.SecretsURL))
urlHash, err := h.orm.GetSecretsURLHash(payload.WorkflowOwner, []byte(payload.SecretsURL))
if err != nil {
return fmt.Errorf("failed to get secrets URL hash: %w", err)
}
Expand All @@ -422,7 +422,7 @@ func (h *eventHandler) workflowRegisteredEvent(
Config: string(config),
WorkflowID: wfID,
Status: status,
WorkflowOwner: hex.EncodeToString(payload.Owner),
WorkflowOwner: hex.EncodeToString(payload.WorkflowOwner),
WorkflowName: payload.WorkflowName,
SpecType: job.WASMFile,
BinaryURL: payload.BinaryURL,
Expand All @@ -448,7 +448,7 @@ func (h *eventHandler) workflowRegisteredEvent(
Lggr: h.lggr,
Workflow: *sdkSpec,
WorkflowID: wfID,
WorkflowOwner: string(payload.Owner), // this gets hex encoded in the engine.
WorkflowOwner: string(payload.WorkflowOwner), // this gets hex encoded in the engine.
WorkflowName: payload.WorkflowName,
Registry: h.capRegistry,
Store: h.workflowStore,
Expand Down Expand Up @@ -483,14 +483,14 @@ func (h *eventHandler) workflowUpdatedEvent(
}

registeredEvent := WorkflowRegistryWorkflowRegisteredV1{
WorkflowID: payload.NewWorkflowID,
Owner: payload.WorkflowOwner,
DonID: payload.DonID,
Status: 0,
WorkflowName: payload.WorkflowName,
BinaryURL: payload.BinaryURL,
ConfigURL: payload.ConfigURL,
SecretsURL: payload.SecretsURL,
WorkflowID: payload.NewWorkflowID,
WorkflowOwner: payload.WorkflowOwner,
DonID: payload.DonID,
Status: 0,
WorkflowName: payload.WorkflowName,
BinaryURL: payload.BinaryURL,
ConfigURL: payload.ConfigURL,
SecretsURL: payload.SecretsURL,
}

return h.workflowRegisteredEvent(ctx, registeredEvent)
Expand Down Expand Up @@ -545,14 +545,14 @@ func (h *eventHandler) workflowActivatedEvent(

// start a new workflow engine
registeredEvent := WorkflowRegistryWorkflowRegisteredV1{
WorkflowID: payload.WorkflowID,
Owner: payload.WorkflowOwner,
DonID: payload.DonID,
Status: 0,
WorkflowName: payload.WorkflowName,
BinaryURL: spec.BinaryURL,
ConfigURL: spec.ConfigURL,
SecretsURL: secretsURL,
WorkflowID: payload.WorkflowID,
WorkflowOwner: payload.WorkflowOwner,
DonID: payload.DonID,
Status: 0,
WorkflowName: payload.WorkflowName,
BinaryURL: spec.BinaryURL,
ConfigURL: spec.ConfigURL,
SecretsURL: secretsURL,
}

return h.workflowRegisteredEvent(ctx, registeredEvent)
Expand Down
Loading

0 comments on commit db0c19e

Please sign in to comment.