Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP #15547

Closed
wants to merge 1 commit into from
Closed

WIP #15547

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,7 @@ func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, lim
}

var logs []Log
o.lggr.Debugw("FilteredLogs", "query", query)
if err = o.ds.SelectContext(ctx, &logs, query, sqlArgs...); err != nil {
return nil, err
}
Expand Down
15 changes: 11 additions & 4 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,19 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
}, eventHandler)

globalLogger.Debugw("Creating WorkflowRegistrySyncer")
wfSyncer := syncer.NewWorkflowRegistry(lggr, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return relayer.NewContractReader(ctx, bytes)
}, cfg.Capabilities().WorkflowRegistry().Address(),
wfSyncer := syncer.NewWorkflowRegistry(
lggr,
func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return relayer.NewContractReader(ctx, bytes)
},
cfg.Capabilities().WorkflowRegistry().Address(),
syncer.WorkflowEventPollerConfig{
QueryCount: 100,
}, eventHandler, loader, workflowDonNotifier)
},
eventHandler,
loader,
workflowDonNotifier,
)

srvcs = append(srvcs, fetcher, wfSyncer)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"testing"
"time"
Expand All @@ -24,7 +23,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"

Expand Down Expand Up @@ -157,8 +155,6 @@ func Test_SecretsWorker(t *testing.T) {
fetcherFn = func(_ context.Context, _ string) ([]byte, error) {
return []byte(wantContents), nil
}
contractName = syncer.WorkflowRegistryContractName
forceUpdateSecretsEvent = string(syncer.ForceUpdateSecretsEvent)
)

defer giveTicker.Stop()
Expand All @@ -176,36 +172,6 @@ func Test_SecretsWorker(t *testing.T) {

lggr.Infof("deployed workflow registry at %s\n", wfRegistryAddr.Hex())

// Build the ContractReader config
contractReaderCfg := evmtypes.ChainReaderConfig{
Contracts: map[string]evmtypes.ChainContractReader{
contractName: {
ContractPollingFilter: evmtypes.ContractPollingFilter{
GenericEventNames: []string{forceUpdateSecretsEvent},
},
ContractABI: workflow_registry_wrapper.WorkflowRegistryABI,
Configs: map[string]*evmtypes.ChainReaderDefinition{
forceUpdateSecretsEvent: {
ChainSpecificName: forceUpdateSecretsEvent,
ReadType: evmtypes.Event,
},
syncer.GetWorkflowMetadataListByDONMethodName: {
ChainSpecificName: syncer.GetWorkflowMetadataListByDONMethodName,
},
},
},
},
}

contractReaderCfgBytes, err := json.Marshal(contractReaderCfg)
require.NoError(t, err)

contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes)
require.NoError(t, err)

err = contractReader.Bind(ctx, []types.BoundContract{{Name: contractName, Address: wfRegistryAddr.Hex()}})
require.NoError(t, err)

// Seed the DB
hash, err := crypto.Keccak256(append(backendTH.ContractsOwner.From[:], []byte(giveSecretsURL)...))
require.NoError(t, err)
Expand All @@ -226,17 +192,23 @@ func Test_SecretsWorker(t *testing.T) {
handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
emitter, clockwork.NewFakeClock(), workflowkey.Key{})

worker := syncer.NewWorkflowRegistry(lggr, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return contractReader, nil
}, wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{
QueryCount: 20,
}, handler, &testWorkflowRegistryContractLoader{}, &testDonNotifier{
worker := syncer.NewWorkflowRegistry(
lggr,
func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return backendTH.NewContractReader(ctx, t, bytes)
},
wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{QueryCount: 20},
handler,
&testWorkflowRegistryContractLoader{},
&testDonNotifier{
don: capabilities.DON{
ID: donID,
},
err: nil,
}, syncer.WithTicker(giveTicker.C))
},
syncer.WithTicker(giveTicker.C),
)

// setup contract state to allow the secrets to be updated
updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true)
Expand All @@ -257,6 +229,80 @@ func Test_SecretsWorker(t *testing.T) {
}, 15*time.Second, time.Second)
}

func Test_RegistrySyncer_EventHandling(t *testing.T) {
var (
ctx = coretestutils.Context(t)
lggr = logger.TestLogger(t)
emitter = custmsg.NewLabeler()
backendTH = testutils.NewEVMBackendTH(t)
db = pgtest.NewSqlxDB(t)
orm = syncer.NewWorkflowRegistryDS(db, lggr)

giveTicker = time.NewTicker(500 * time.Millisecond)
giveBinaryURL = "https://original-url.com"
donID = uint32(1)
giveWorkflow = RegisterWorkflowCMD{
Name: "test-wf",
DonID: donID,
Status: uint8(1),
BinaryURL: giveBinaryURL,
}
wantContents = "updated contents"
fetcherFn = func(_ context.Context, _ string) ([]byte, error) {
return []byte(wantContents), nil
}
)

defer giveTicker.Stop()

id, err := hex.DecodeString("bc9b363332ecaebbf5e476bbcce29a6ee89856f64d345e967dbdaaf880c520cd")
require.NoError(t, err)
giveWorkflow.ID = [32]byte(id)

// Deploy a test workflow_registry
wfRegistryAddr, _, wfRegistryC, err := workflow_registry_wrapper.DeployWorkflowRegistry(backendTH.ContractsOwner, backendTH.Backend.Client())
backendTH.Backend.Commit()
require.NoError(t, err)

lggr.Infof("deployed workflow registry at %s\n", wfRegistryAddr.Hex())

handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
emitter, clockwork.NewFakeClock(), workflowkey.Key{})

worker := syncer.NewWorkflowRegistry(
lggr,
func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return backendTH.NewContractReader(ctx, t, bytes)
},
wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{QueryCount: 20},
handler,
&testWorkflowRegistryContractLoader{},
&testDonNotifier{
don: capabilities.DON{
ID: donID,
},
err: nil,
},
syncer.WithTicker(giveTicker.C),
)

// setup contract state to allow the secrets to be updated
updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true)
updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true)

servicetest.Run(t, worker)

// generate a log event
registerWorkflow(t, backendTH, wfRegistryC, giveWorkflow)

// Require the secrets contents to eventually be updated
require.Eventually(t, func() bool {
_, err := orm.GetWorkflowSpec(ctx, backendTH.ContractsOwner.From.Hex(), "test-wf")
return err == nil
}, 15*time.Second, time.Second)
}

func updateAuthorizedAddress(
t *testing.T,
th *testutils.EVMBackendTH,
Expand Down
20 changes: 14 additions & 6 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,14 +383,22 @@ func (h *eventHandler) workflowRegisteredEvent(
return fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err)
}

config, err := h.fetcher(ctx, payload.ConfigURL)
if err != nil {
return fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err)
var config []byte
if payload.ConfigURL != "" {
c, err := h.fetcher(ctx, payload.ConfigURL)
if err != nil {
return fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err)
}
config = c
}

secrets, err := h.fetcher(ctx, payload.SecretsURL)
if err != nil {
return fmt.Errorf("failed to fetch secrets from %s : %w", payload.SecretsURL, err)
var secrets []byte
if payload.SecretsURL != "" {
s, err := h.fetcher(ctx, payload.SecretsURL)
if err != nil {
return fmt.Errorf("failed to fetch secrets from %s : %w", payload.SecretsURL, err)
}
secrets = s
}

// Calculate the hash of the binary and config files
Expand Down
Loading
Loading