From 77870103ccf58d6983e2bf16cab5c53d619111f0 Mon Sep 17 00:00:00 2001 From: Cedric Cordenier Date: Fri, 6 Dec 2024 16:03:20 +0000 Subject: [PATCH] WIP --- core/chains/evm/logpoller/orm.go | 1 + core/services/chainlink/application.go | 15 +- .../workflows/syncer/workflow_syncer_test.go | 128 ++++++++++++------ core/services/workflows/syncer/handler.go | 20 ++- .../workflows/syncer/workflow_registry.go | 113 ++++++++++++++-- 5 files changed, 216 insertions(+), 61 deletions(-) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 4d7cf33ebec..b4032cbf9dd 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -1136,6 +1136,7 @@ func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, lim } var logs []Log + o.lggr.Debugw("FilteredLogs", "query", query) if err = o.ds.SelectContext(ctx, &logs, query, sqlArgs...); err != nil { return nil, err } diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 29473c4d932..d8b9777cb5a 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -312,12 +312,19 @@ func NewApplication(opts ApplicationOpts) (Application, error) { }, eventHandler) globalLogger.Debugw("Creating WorkflowRegistrySyncer") - wfSyncer := syncer.NewWorkflowRegistry(lggr, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { - return relayer.NewContractReader(ctx, bytes) - }, cfg.Capabilities().WorkflowRegistry().Address(), + wfSyncer := syncer.NewWorkflowRegistry( + lggr, + func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return relayer.NewContractReader(ctx, bytes) + }, + cfg.Capabilities().WorkflowRegistry().Address(), syncer.WorkflowEventPollerConfig{ QueryCount: 100, - }, eventHandler, loader, workflowDonNotifier) + }, + eventHandler, + loader, + workflowDonNotifier, + ) srvcs = append(srvcs, fetcher, wfSyncer) } diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 3bcf8164a7b..37b85c67a41 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -4,7 +4,6 @@ import ( "context" "crypto/rand" "encoding/hex" - "encoding/json" "fmt" "testing" "time" @@ -24,7 +23,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils" - evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" @@ -157,8 +155,6 @@ func Test_SecretsWorker(t *testing.T) { fetcherFn = func(_ context.Context, _ string) ([]byte, error) { return []byte(wantContents), nil } - contractName = syncer.WorkflowRegistryContractName - forceUpdateSecretsEvent = string(syncer.ForceUpdateSecretsEvent) ) defer giveTicker.Stop() @@ -176,36 +172,6 @@ func Test_SecretsWorker(t *testing.T) { lggr.Infof("deployed workflow registry at %s\n", wfRegistryAddr.Hex()) - // Build the ContractReader config - contractReaderCfg := evmtypes.ChainReaderConfig{ - Contracts: map[string]evmtypes.ChainContractReader{ - contractName: { - ContractPollingFilter: evmtypes.ContractPollingFilter{ - GenericEventNames: []string{forceUpdateSecretsEvent}, - }, - ContractABI: workflow_registry_wrapper.WorkflowRegistryABI, - Configs: map[string]*evmtypes.ChainReaderDefinition{ - forceUpdateSecretsEvent: { - ChainSpecificName: forceUpdateSecretsEvent, - ReadType: evmtypes.Event, - }, - syncer.GetWorkflowMetadataListByDONMethodName: { - ChainSpecificName: syncer.GetWorkflowMetadataListByDONMethodName, - }, - }, - }, - }, - } - - contractReaderCfgBytes, err := json.Marshal(contractReaderCfg) - require.NoError(t, err) - - contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes) - require.NoError(t, err) - - err = contractReader.Bind(ctx, []types.BoundContract{{Name: contractName, Address: wfRegistryAddr.Hex()}}) - require.NoError(t, err) - // Seed the DB hash, err := crypto.Keccak256(append(backendTH.ContractsOwner.From[:], []byte(giveSecretsURL)...)) require.NoError(t, err) @@ -226,17 +192,23 @@ func Test_SecretsWorker(t *testing.T) { handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}) - worker := syncer.NewWorkflowRegistry(lggr, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { - return contractReader, nil - }, wfRegistryAddr.Hex(), - syncer.WorkflowEventPollerConfig{ - QueryCount: 20, - }, handler, &testWorkflowRegistryContractLoader{}, &testDonNotifier{ + worker := syncer.NewWorkflowRegistry( + lggr, + func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return backendTH.NewContractReader(ctx, t, bytes) + }, + wfRegistryAddr.Hex(), + syncer.WorkflowEventPollerConfig{QueryCount: 20}, + handler, + &testWorkflowRegistryContractLoader{}, + &testDonNotifier{ don: capabilities.DON{ ID: donID, }, err: nil, - }, syncer.WithTicker(giveTicker.C)) + }, + syncer.WithTicker(giveTicker.C), + ) // setup contract state to allow the secrets to be updated updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true) @@ -257,6 +229,80 @@ func Test_SecretsWorker(t *testing.T) { }, 15*time.Second, time.Second) } +func Test_RegistrySyncer_EventHandling(t *testing.T) { + var ( + ctx = coretestutils.Context(t) + lggr = logger.TestLogger(t) + emitter = custmsg.NewLabeler() + backendTH = testutils.NewEVMBackendTH(t) + db = pgtest.NewSqlxDB(t) + orm = syncer.NewWorkflowRegistryDS(db, lggr) + + giveTicker = time.NewTicker(500 * time.Millisecond) + giveBinaryURL = "https://original-url.com" + donID = uint32(1) + giveWorkflow = RegisterWorkflowCMD{ + Name: "test-wf", + DonID: donID, + Status: uint8(1), + BinaryURL: giveBinaryURL, + } + wantContents = "updated contents" + fetcherFn = func(_ context.Context, _ string) ([]byte, error) { + return []byte(wantContents), nil + } + ) + + defer giveTicker.Stop() + + id, err := hex.DecodeString("bc9b363332ecaebbf5e476bbcce29a6ee89856f64d345e967dbdaaf880c520cd") + require.NoError(t, err) + giveWorkflow.ID = [32]byte(id) + + // Deploy a test workflow_registry + wfRegistryAddr, _, wfRegistryC, err := workflow_registry_wrapper.DeployWorkflowRegistry(backendTH.ContractsOwner, backendTH.Backend.Client()) + backendTH.Backend.Commit() + require.NoError(t, err) + + lggr.Infof("deployed workflow registry at %s\n", wfRegistryAddr.Hex()) + + handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil, + emitter, clockwork.NewFakeClock(), workflowkey.Key{}) + + worker := syncer.NewWorkflowRegistry( + lggr, + func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return backendTH.NewContractReader(ctx, t, bytes) + }, + wfRegistryAddr.Hex(), + syncer.WorkflowEventPollerConfig{QueryCount: 20}, + handler, + &testWorkflowRegistryContractLoader{}, + &testDonNotifier{ + don: capabilities.DON{ + ID: donID, + }, + err: nil, + }, + syncer.WithTicker(giveTicker.C), + ) + + // setup contract state to allow the secrets to be updated + updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true) + updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true) + + servicetest.Run(t, worker) + + // generate a log event + registerWorkflow(t, backendTH, wfRegistryC, giveWorkflow) + + // Require the secrets contents to eventually be updated + require.Eventually(t, func() bool { + _, err := orm.GetWorkflowSpec(ctx, backendTH.ContractsOwner.From.Hex(), "test-wf") + return err == nil + }, 15*time.Second, time.Second) +} + func updateAuthorizedAddress( t *testing.T, th *testutils.EVMBackendTH, diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 46dcd21ed90..8871aef46aa 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -383,14 +383,22 @@ func (h *eventHandler) workflowRegisteredEvent( return fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err) } - config, err := h.fetcher(ctx, payload.ConfigURL) - if err != nil { - return fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err) + var config []byte + if payload.ConfigURL != "" { + c, err := h.fetcher(ctx, payload.ConfigURL) + if err != nil { + return fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err) + } + config = c } - secrets, err := h.fetcher(ctx, payload.SecretsURL) - if err != nil { - return fmt.Errorf("failed to fetch secrets from %s : %w", payload.SecretsURL, err) + var secrets []byte + if payload.SecretsURL != "" { + s, err := h.fetcher(ctx, payload.SecretsURL) + if err != nil { + return fmt.Errorf("failed to fetch secrets from %s : %w", payload.SecretsURL, err) + } + secrets = s } // Calculate the hash of the binary and config files diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 024975539af..bf84aa29a1f 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -85,6 +85,8 @@ type ContractReaderFactory interface { // ContractReader is a subset of types.ContractReader defined locally to enable mocking. type ContractReader interface { + Start(ctx context.Context) error + Close() error Bind(context.Context, []types.BoundContract) error QueryKey(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error) GetLatestValueWithHeadData(ctx context.Context, readName string, confidenceLevel primitives.ConfidenceLevel, params any, returnVal any) (head *types.Head, err error) @@ -171,7 +173,14 @@ func NewWorkflowRegistry( workflowDonNotifier donNotifier, opts ...func(*workflowRegistry), ) *workflowRegistry { - ets := []WorkflowRegistryEventType{ForceUpdateSecretsEvent} + ets := []WorkflowRegistryEventType{ + ForceUpdateSecretsEvent, + WorkflowActivatedEvent, + WorkflowDeletedEvent, + WorkflowPausedEvent, + WorkflowRegisteredEvent, + WorkflowUpdatedEvent, + } wr := &workflowRegistry{ lggr: lggr, newContractReaderFn: newContractReaderFn, @@ -265,14 +274,14 @@ func (w *workflowRegistry) handlerLoop(ctx context.Context) { } if resp.Err != nil || resp.Event == nil { - w.lggr.Errorf("failed to handle event: %+v", resp.Err) + w.lggr.Errorf("failed to handle event", "err", resp.Err) continue } event := resp.Event w.lggr.Debugf("handling event: %+v", event) if err := w.handler.Handle(ctx, *event); err != nil { - w.lggr.Errorf("failed to handle event: %+v", event) + w.lggr.Errorw("failed to handle event", "event", event, "err", err) continue } } @@ -372,6 +381,7 @@ func (w *workflowRegistry) orderAndSend( return case batch := <-batchCh: for _, response := range batch { + fmt.Printf("response: %+v", response) w.heap.Push(response) } batchCount-- @@ -438,12 +448,11 @@ func queryEvent( ) { // create query var ( - responseBatch []WorkflowRegistryEventResponse - logData values.Value - cursor = "" - limitAndSort = query.LimitAndSort{ + logData values.Value + cursor = "" + limitAndSort = query.LimitAndSort{ SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: cfg.QueryCount}, + Limit: query.Limit{Count: 100000}, } bc = types.BoundContract{ Name: cfg.ContractName, @@ -457,6 +466,8 @@ func queryEvent( case <-ctx.Done(): return case <-ticker: + responseBatch := []WorkflowRegistryEventResponse{} + if cursor != "" { limitAndSort.Limit = query.CursorLimit(cursor, query.CursorFollowing, cfg.QueryCount) } @@ -468,12 +479,17 @@ func queryEvent( Key: string(et), Expressions: []query.Expression{ query.Confidence(primitives.Finalized), - query.Block(lastReadBlockNumber, primitives.Gt), + query.Block(lastReadBlockNumber, primitives.Gte), }, }, limitAndSort, &logData, ) + lcursor := cursor + if lcursor == "" { + lcursor = "empty" + } + lggr.Debugw("QueryKeys called", "logs", len(logs), "eventType", et, "lastReadBlockNumber", lastReadBlockNumber, "logCursor", lcursor) if err != nil { lggr.Errorw("QueryKey failure", "err", err) @@ -497,6 +513,7 @@ func queryEvent( responseBatch = append(responseBatch, toWorkflowRegistryEventResponse(log, et, lggr)) cursor = log.Cursor } + lggr.Debug("composed response batch", "responseBatch", responseBatch) batchCh <- responseBatch } } @@ -511,7 +528,14 @@ func getWorkflowRegistryEventReader( Contracts: map[string]evmtypes.ChainContractReader{ WorkflowRegistryContractName: { ContractPollingFilter: evmtypes.ContractPollingFilter{ - GenericEventNames: []string{string(ForceUpdateSecretsEvent)}, + GenericEventNames: []string{ + string(ForceUpdateSecretsEvent), + string(WorkflowActivatedEvent), + string(WorkflowDeletedEvent), + string(WorkflowPausedEvent), + string(WorkflowRegisteredEvent), + string(WorkflowUpdatedEvent), + }, }, ContractABI: workflow_registry_wrapper.WorkflowRegistryABI, Configs: map[string]*evmtypes.ChainReaderDefinition{ @@ -519,6 +543,26 @@ func getWorkflowRegistryEventReader( ChainSpecificName: string(ForceUpdateSecretsEvent), ReadType: evmtypes.Event, }, + string(WorkflowActivatedEvent): { + ChainSpecificName: string(WorkflowActivatedEvent), + ReadType: evmtypes.Event, + }, + string(WorkflowDeletedEvent): { + ChainSpecificName: string(WorkflowDeletedEvent), + ReadType: evmtypes.Event, + }, + string(WorkflowPausedEvent): { + ChainSpecificName: string(WorkflowPausedEvent), + ReadType: evmtypes.Event, + }, + string(WorkflowRegisteredEvent): { + ChainSpecificName: string(WorkflowRegisteredEvent), + ReadType: evmtypes.Event, + }, + string(WorkflowUpdatedEvent): { + ChainSpecificName: string(WorkflowUpdatedEvent), + ReadType: evmtypes.Event, + }, }, }, }, @@ -539,6 +583,10 @@ func getWorkflowRegistryEventReader( return nil, err } + if err := reader.Start(ctx); err != nil { + return nil, err + } + return reader, nil } @@ -682,6 +730,51 @@ func toWorkflowRegistryEventResponse( return resp } resp.Event.Data = data + case WorkflowRegisteredEvent: + var data WorkflowRegistryWorkflowRegisteredV1 + if err := dataAsValuesMap.UnwrapTo(&data); err != nil { + lggr.Errorf("failed to unwrap data: %+v", log.Data) + resp.Event = nil + resp.Err = err + return resp + } + resp.Event.Data = data + case WorkflowUpdatedEvent: + var data WorkflowRegistryWorkflowUpdatedV1 + if err := dataAsValuesMap.UnwrapTo(&data); err != nil { + lggr.Errorf("failed to unwrap data: %+v", log.Data) + resp.Event = nil + resp.Err = err + return resp + } + resp.Event.Data = data + case WorkflowPausedEvent: + var data WorkflowRegistryWorkflowPausedV1 + if err := dataAsValuesMap.UnwrapTo(&data); err != nil { + lggr.Errorf("failed to unwrap data: %+v", log.Data) + resp.Event = nil + resp.Err = err + return resp + } + resp.Event.Data = data + case WorkflowActivatedEvent: + var data WorkflowRegistryWorkflowActivatedV1 + if err := dataAsValuesMap.UnwrapTo(&data); err != nil { + lggr.Errorf("failed to unwrap data: %+v", log.Data) + resp.Event = nil + resp.Err = err + return resp + } + resp.Event.Data = data + case WorkflowDeletedEvent: + var data WorkflowRegistryWorkflowDeletedV1 + if err := dataAsValuesMap.UnwrapTo(&data); err != nil { + lggr.Errorf("failed to unwrap data: %+v", log.Data) + resp.Event = nil + resp.Err = err + return resp + } + resp.Event.Data = data default: lggr.Errorf("unknown event type: %s", evt) resp.Event = nil