Skip to content

Commit

Permalink
Merge branch 'refs/heads/develop' into ecPablo/link-transfer-mcms
Browse files Browse the repository at this point in the history
# Conflicts:
#	deployment/common/changeset/state.go
#	deployment/common/changeset/transfer_to_mcms_with_timelock_test.go
  • Loading branch information
ecPablo committed Dec 12, 2024
2 parents 6fd93b9 + dde1751 commit f6596c4
Show file tree
Hide file tree
Showing 38 changed files with 540 additions and 200 deletions.
1 change: 1 addition & 0 deletions core/services/ocr3/promwrapper/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (r ReportingPluginFactory[RI]) NewReportingPlugin(ctx context.Context, conf
config.ConfigDigest.String(),
promOCR3ReportsGenerated,
promOCR3Durations,
promOCR3Sizes,
promOCR3PluginStatus,
)
return wrapped, info, err
Expand Down
20 changes: 18 additions & 2 deletions core/services/ocr3/promwrapper/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type reportingPlugin[RI any] struct {
// Prometheus components for tracking metrics
reportsGenerated *prometheus.CounterVec
durations *prometheus.HistogramVec
sizes *prometheus.CounterVec
status *prometheus.GaugeVec
}

Expand All @@ -31,6 +32,7 @@ func newReportingPlugin[RI any](
configDigest string,
reportsGenerated *prometheus.CounterVec,
durations *prometheus.HistogramVec,
sizes *prometheus.CounterVec,
status *prometheus.GaugeVec,
) *reportingPlugin[RI] {
return &reportingPlugin[RI]{
Expand All @@ -40,6 +42,7 @@ func newReportingPlugin[RI any](
configDigest: configDigest,
reportsGenerated: reportsGenerated,
durations: durations,
sizes: sizes,
status: status,
}
}
Expand All @@ -51,9 +54,11 @@ func (p *reportingPlugin[RI]) Query(ctx context.Context, outctx ocr3types.Outcom
}

func (p *reportingPlugin[RI]) Observation(ctx context.Context, outctx ocr3types.OutcomeContext, query ocrtypes.Query) (ocrtypes.Observation, error) {
return withObservedExecution(p, observation, func() (ocrtypes.Observation, error) {
result, err := withObservedExecution(p, observation, func() (ocrtypes.Observation, error) {
return p.ReportingPlugin.Observation(ctx, outctx, query)
})
p.trackSize(observation, len(result), err)
return result, err
}

func (p *reportingPlugin[RI]) ValidateObservation(ctx context.Context, outctx ocr3types.OutcomeContext, query ocrtypes.Query, ao ocrtypes.AttributedObservation) error {
Expand All @@ -65,9 +70,11 @@ func (p *reportingPlugin[RI]) ValidateObservation(ctx context.Context, outctx oc
}

func (p *reportingPlugin[RI]) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, query ocrtypes.Query, aos []ocrtypes.AttributedObservation) (ocr3types.Outcome, error) {
return withObservedExecution(p, outcome, func() (ocr3types.Outcome, error) {
result, err := withObservedExecution(p, outcome, func() (ocr3types.Outcome, error) {
return p.ReportingPlugin.Outcome(ctx, outctx, query, aos)
})
p.trackSize(outcome, len(result), err)
return result, err
}

func (p *reportingPlugin[RI]) Reports(ctx context.Context, seqNr uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportPlus[RI], error) {
Expand Down Expand Up @@ -111,6 +118,15 @@ func (p *reportingPlugin[RI]) updateStatus(status bool) {
Set(float64(boolToInt(status)))
}

func (p *reportingPlugin[RI]) trackSize(function functionType, size int, err error) {
if err != nil {
return
}
p.sizes.
WithLabelValues(p.chainID, p.plugin, string(function)).
Add(float64(size))
}

func boolToInt(arg bool) int {
if arg {
return 1
Expand Down
40 changes: 29 additions & 11 deletions core/services/ocr3/promwrapper/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ import (
)

func Test_ReportsGeneratedGauge(t *testing.T) {
pluginObservationSize := 5
pluginOutcomeSize := 3

plugin1 := newReportingPlugin(
fakePlugin[uint]{reports: make([]ocr3types.ReportPlus[uint], 2)},
"123", "empty", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3PluginStatus,
"123", "empty", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3Sizes, promOCR3PluginStatus,
)
plugin2 := newReportingPlugin(
fakePlugin[bool]{reports: make([]ocr3types.ReportPlus[bool], 10)},
"solana", "different_plugin", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3PluginStatus,
fakePlugin[bool]{reports: make([]ocr3types.ReportPlus[bool], 10), observationSize: pluginObservationSize, outcomeSize: pluginOutcomeSize},
"solana", "different_plugin", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3Sizes, promOCR3PluginStatus,
)
plugin3 := newReportingPlugin(
fakePlugin[string]{err: errors.New("error")},
"1234", "empty", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3PluginStatus,
"1234", "empty", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3Sizes, promOCR3PluginStatus,
)

r1, err := plugin1.Reports(tests.Context(t), 1, nil)
Expand Down Expand Up @@ -64,20 +67,33 @@ func Test_ReportsGeneratedGauge(t *testing.T) {
require.NoError(t, plugin1.Close())
pluginHealth = testutil.ToFloat64(promOCR3PluginStatus.WithLabelValues("123", "empty", "abc"))
require.Equal(t, 0, int(pluginHealth))

iterations := 10
for i := 0; i < iterations; i++ {
_, err1 := plugin2.Outcome(tests.Context(t), ocr3types.OutcomeContext{}, nil, nil)
require.NoError(t, err1)
}
_, err1 := plugin2.Observation(tests.Context(t), ocr3types.OutcomeContext{}, nil)
require.NoError(t, err1)

outcomesLen := testutil.ToFloat64(promOCR3Sizes.WithLabelValues("solana", "different_plugin", "outcome"))
require.Equal(t, pluginOutcomeSize*iterations, int(outcomesLen))
observationLen := testutil.ToFloat64(promOCR3Sizes.WithLabelValues("solana", "different_plugin", "observation"))
require.Equal(t, pluginObservationSize, int(observationLen))
}

func Test_DurationHistograms(t *testing.T) {
plugin1 := newReportingPlugin(
fakePlugin[uint]{},
"123", "empty", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3PluginStatus,
"123", "empty", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3Sizes, promOCR3PluginStatus,
)
plugin2 := newReportingPlugin(
fakePlugin[uint]{err: errors.New("error")},
"123", "empty", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3PluginStatus,
"123", "empty", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3Sizes, promOCR3PluginStatus,
)
plugin3 := newReportingPlugin(
fakePlugin[uint]{},
"solana", "commit", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3PluginStatus,
"solana", "commit", "abc", promOCR3ReportsGenerated, promOCR3Durations, promOCR3Sizes, promOCR3PluginStatus,
)

for _, p := range []*reportingPlugin[uint]{plugin1, plugin2, plugin3} {
Expand All @@ -102,8 +118,10 @@ func Test_DurationHistograms(t *testing.T) {
}

type fakePlugin[RI any] struct {
reports []ocr3types.ReportPlus[RI]
err error
reports []ocr3types.ReportPlus[RI]
observationSize int
outcomeSize int
err error
}

func (f fakePlugin[RI]) Query(context.Context, ocr3types.OutcomeContext) (ocrtypes.Query, error) {
Expand All @@ -117,7 +135,7 @@ func (f fakePlugin[RI]) Observation(context.Context, ocr3types.OutcomeContext, o
if f.err != nil {
return nil, f.err
}
return ocrtypes.Observation{}, nil
return make([]byte, f.observationSize), nil
}

func (f fakePlugin[RI]) ValidateObservation(context.Context, ocr3types.OutcomeContext, ocrtypes.Query, ocrtypes.AttributedObservation) error {
Expand All @@ -132,7 +150,7 @@ func (f fakePlugin[RI]) Outcome(context.Context, ocr3types.OutcomeContext, ocrty
if f.err != nil {
return nil, f.err
}
return ocr3types.Outcome{}, nil
return make([]byte, f.outcomeSize), nil
}

func (f fakePlugin[RI]) Reports(context.Context, uint64, ocr3types.Outcome) ([]ocr3types.ReportPlus[RI], error) {
Expand Down
7 changes: 7 additions & 0 deletions core/services/ocr3/promwrapper/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ var (
},
[]string{"chainID", "plugin", "function", "success"},
)
promOCR3Sizes = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "ocr3_reporting_plugin_data_sizes",
Help: "Tracks the size of the data produced by OCR3 plugin in bytes (e.g. reports, observations etc.)",
},
[]string{"chainID", "plugin", "function"},
)
promOCR3PluginStatus = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "ocr3_reporting_plugin_status",
Expand Down
7 changes: 5 additions & 2 deletions core/services/workflows/syncer/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,13 @@ func (orm *orm) UpsertWorkflowSpecWithSecrets(
status = EXCLUDED.status,
binary_url = EXCLUDED.binary_url,
config_url = EXCLUDED.config_url,
secrets_id = EXCLUDED.secrets_id,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
spec_type = EXCLUDED.spec_type
spec_type = EXCLUDED.spec_type,
secrets_id = CASE
WHEN workflow_specs.secrets_id IS NULL THEN EXCLUDED.secrets_id
ELSE workflow_specs.secrets_id
END
RETURNING id
`

Expand Down
83 changes: 83 additions & 0 deletions core/services/workflows/syncer/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,86 @@ func Test_GetContentsByWorkflowID_SecretsProvidedButEmpty(t *testing.T) {
_, _, err = orm.GetContentsByWorkflowID(ctx, workflowID)
require.ErrorIs(t, err, ErrEmptySecrets)
}

func Test_UpsertWorkflowSpecWithSecrets(t *testing.T) {
db := pgtest.NewSqlxDB(t)
ctx := testutils.Context(t)
lggr := logger.TestLogger(t)
orm := &orm{ds: db, lggr: lggr}

t.Run("inserts new spec and new secrets", func(t *testing.T) {
giveURL := "https://example.com"
giveBytes, err := crypto.Keccak256([]byte(giveURL))
require.NoError(t, err)
giveHash := hex.EncodeToString(giveBytes)
giveContent := "some contents"

spec := &job.WorkflowSpec{
Workflow: "test_workflow",
Config: "test_config",
WorkflowID: "cid-123",
WorkflowOwner: "owner-123",
WorkflowName: "Test Workflow",
Status: job.WorkflowSpecStatusActive,
BinaryURL: "http://example.com/binary",
ConfigURL: "http://example.com/config",
CreatedAt: time.Now(),
SpecType: job.WASMFile,
}

_, err = orm.UpsertWorkflowSpecWithSecrets(ctx, spec, giveURL, giveHash, giveContent)
require.NoError(t, err)

// Verify the record exists in the database
var dbSpec job.WorkflowSpec
err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2`, spec.WorkflowOwner, spec.WorkflowName)
require.NoError(t, err)
require.Equal(t, spec.Workflow, dbSpec.Workflow)

// Verify the secrets exists in the database
contents, err := orm.GetContents(ctx, giveURL)
require.NoError(t, err)
require.Equal(t, giveContent, contents)
})

t.Run("updates existing spec and secrets", func(t *testing.T) {
giveURL := "https://example.com"
giveBytes, err := crypto.Keccak256([]byte(giveURL))
require.NoError(t, err)
giveHash := hex.EncodeToString(giveBytes)
giveContent := "some contents"

spec := &job.WorkflowSpec{
Workflow: "test_workflow",
Config: "test_config",
WorkflowID: "cid-123",
WorkflowOwner: "owner-123",
WorkflowName: "Test Workflow",
Status: job.WorkflowSpecStatusActive,
BinaryURL: "http://example.com/binary",
ConfigURL: "http://example.com/config",
CreatedAt: time.Now(),
SpecType: job.WASMFile,
}

_, err = orm.UpsertWorkflowSpecWithSecrets(ctx, spec, giveURL, giveHash, giveContent)
require.NoError(t, err)

// Update the status
spec.Status = job.WorkflowSpecStatusPaused

_, err = orm.UpsertWorkflowSpecWithSecrets(ctx, spec, giveURL, giveHash, "new contents")
require.NoError(t, err)

// Verify the record is updated in the database
var dbSpec job.WorkflowSpec
err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2`, spec.WorkflowOwner, spec.WorkflowName)
require.NoError(t, err)
require.Equal(t, spec.Config, dbSpec.Config)

// Verify the secrets is updated in the database
contents, err := orm.GetContents(ctx, giveURL)
require.NoError(t, err)
require.Equal(t, "new contents", contents)
})
}
8 changes: 5 additions & 3 deletions deployment/ccip/changeset/accept_ownership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"golang.org/x/exp/maps"

commonchangeset "github.com/smartcontractkit/chainlink/deployment/common/changeset"
"github.com/smartcontractkit/chainlink/deployment/common/proposalutils"
)

func Test_NewAcceptOwnershipChangeset(t *testing.T) {
t.Parallel()
e := NewMemoryEnvironment(t)
state, err := LoadOnchainState(e.Env)
require.NoError(t, err)
Expand All @@ -20,12 +22,12 @@ func Test_NewAcceptOwnershipChangeset(t *testing.T) {
source := allChains[0]
dest := allChains[1]

timelockContracts := map[uint64]*commonchangeset.TimelockExecutionContracts{
source: &commonchangeset.TimelockExecutionContracts{
timelockContracts := map[uint64]*proposalutils.TimelockExecutionContracts{
source: &proposalutils.TimelockExecutionContracts{
Timelock: state.Chains[source].Timelock,
CallProxy: state.Chains[source].CallProxy,
},
dest: &commonchangeset.TimelockExecutionContracts{
dest: &proposalutils.TimelockExecutionContracts{
Timelock: state.Chains[dest].Timelock,
CallProxy: state.Chains[dest].CallProxy,
},
Expand Down
14 changes: 5 additions & 9 deletions deployment/ccip/changeset/cs_add_chain_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package changeset

import (
"math/big"
"testing"
"time"

"github.com/smartcontractkit/chainlink/deployment/ccip/changeset/internal"
commonchangeset "github.com/smartcontractkit/chainlink/deployment/common/changeset"
"github.com/smartcontractkit/chainlink/deployment/common/proposalutils"
commontypes "github.com/smartcontractkit/chainlink/deployment/common/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/types"

Expand All @@ -30,6 +30,7 @@ import (
)

func TestAddChainInbound(t *testing.T) {
t.Parallel()
// 4 chains where the 4th is added after initial deployment.
e := NewMemoryEnvironment(t,
WithChains(4),
Expand All @@ -46,12 +47,7 @@ func TestAddChainInbound(t *testing.T) {
require.NoError(t, err)
require.NoError(t, e.Env.ExistingAddresses.Merge(newAddresses))

cfg := commontypes.MCMSWithTimelockConfig{
Canceller: commonchangeset.SingleGroupMCMS(t),
Bypasser: commonchangeset.SingleGroupMCMS(t),
Proposer: commonchangeset.SingleGroupMCMS(t),
TimelockMinDelay: big.NewInt(0),
}
cfg := proposalutils.SingleGroupTimelockConfig(t)
e.Env, err = commonchangeset.ApplyChangesets(t, e.Env, nil, []commonchangeset.ChangesetApplication{
{
Changeset: commonchangeset.WrapChangeSet(commonchangeset.DeployLinkToken),
Expand Down Expand Up @@ -152,7 +148,7 @@ func TestAddChainInbound(t *testing.T) {
}

// transfer ownership to timelock
_, err = commonchangeset.ApplyChangesets(t, e.Env, map[uint64]*commonchangeset.TimelockExecutionContracts{
_, err = commonchangeset.ApplyChangesets(t, e.Env, map[uint64]*proposalutils.TimelockExecutionContracts{
initialDeploy[0]: {
Timelock: state.Chains[initialDeploy[0]].Timelock,
CallProxy: state.Chains[initialDeploy[0]].CallProxy,
Expand Down Expand Up @@ -194,7 +190,7 @@ func TestAddChainInbound(t *testing.T) {
nodeIDs = append(nodeIDs, node.NodeID)
}

_, err = commonchangeset.ApplyChangesets(t, e.Env, map[uint64]*commonchangeset.TimelockExecutionContracts{
_, err = commonchangeset.ApplyChangesets(t, e.Env, map[uint64]*proposalutils.TimelockExecutionContracts{
e.HomeChainSel: {
Timelock: state.Chains[e.HomeChainSel].Timelock,
CallProxy: state.Chains[e.HomeChainSel].CallProxy,
Expand Down
1 change: 1 addition & 0 deletions deployment/ccip/changeset/cs_add_lane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
)

func TestAddLanesWithTestRouter(t *testing.T) {
t.Parallel()
e := NewMemoryEnvironment(t)
// Here we have CR + nodes set up, but no CCIP contracts deployed.
state, err := LoadOnchainState(e.Env)
Expand Down
Loading

0 comments on commit f6596c4

Please sign in to comment.