diff --git a/core/capabilities/streams/codec.go b/core/capabilities/streams/codec.go index 21784cdcf40..af574f98fa7 100644 --- a/core/capabilities/streams/codec.go +++ b/core/capabilities/streams/codec.go @@ -23,8 +23,7 @@ func (c *codec) UnwrapValid(wrapped values.Value, allowedSigners [][]byte, minRe for _, signer := range allowedSigners { signersMap[common.BytesToAddress(signer)] = struct{}{} } - dest := []datastreams.FeedReport{} - err := wrapped.UnwrapTo(&dest) + dest, err := datastreams.UnwrapFeedReportList(wrapped) if err != nil { return nil, fmt.Errorf("failed to unwrap: %v", err) } @@ -44,6 +43,9 @@ func (c *codec) UnwrapValid(wrapped values.Value, allowedSigners [][]byte, minRe continue } validated[signerAddr] = struct{}{} + if len(validated) >= minRequiredSignatures { + break // early exit + } } if len(validated) < minRequiredSignatures { return nil, fmt.Errorf("not enough valid signatures %d, needed %d", len(validated), minRequiredSignatures) diff --git a/core/capabilities/streams/consensus_agg_test.go b/core/capabilities/streams/consensus_agg_test.go new file mode 100644 index 00000000000..506ad26f86f --- /dev/null +++ b/core/capabilities/streams/consensus_agg_test.go @@ -0,0 +1,123 @@ +package streams_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/libocr/commontypes" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/datafeeds" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +// Integration/load test that combines Data Feeds Consensus Aggregator and Streams Codec. +// For more meaningful measurements, increase the values of parameters P and T. +func TestStreamsConsensusAggregator(t *testing.T) { + Nt := 10 // trigger DON nodes + Ft := 3 // trigger DON faulty nodes + Nw := 10 // workflow DON nodes + Fw := 3 // workflow DON faulty nodes + P := 40 // feeds + T := 2 // test iterations + + triggerNodes := newNodes(t, Nt) + feeds := newFeedsWithSignedReports(t, triggerNodes, Nt, P, 1) + allowedSigners := make([][]byte, Nt) + for i := 0; i < Nt; i++ { + allowedSigners[i] = triggerNodes[i].bundle.PublicKey() // bad name - see comment on evmKeyring.PublicKey + } + + config := newAggConfig(t, feeds) + lggr := logger.TestLogger(t) + codec := streams.NewCodec(lggr) + agg, err := datafeeds.NewDataFeedsAggregator(*config, codec, lggr) + require.NoError(t, err) + + // init round - empty previous Outcome, empty observations + outcome, err := agg.Aggregate(nil, map[commontypes.OracleID][]values.Value{}, Fw) + require.NoError(t, err) + require.False(t, outcome.ShouldReport) + + // validate metadata + newState := &datafeeds.DataFeedsOutcomeMetadata{} + err = proto.Unmarshal(outcome.Metadata, newState) + require.NoError(t, err) + require.Equal(t, P, len(newState.FeedInfo)) + + // run test aggregations + startTs := time.Now().UnixMilli() + processingTime := int64(0) + for c := 0; c < T; c++ { + obs := newObservations(t, Nw, feeds, Ft+1, allowedSigners) + processingStart := time.Now().UnixMilli() + outcome, err = agg.Aggregate(outcome, obs, Fw) + processingTime += time.Now().UnixMilli() - processingStart + require.NoError(t, err) + } + totalTime := time.Now().UnixMilli() - startTs + lggr.Infow("elapsed", "totalMs", totalTime, "processingMs", processingTime) +} + +func newAggConfig(t *testing.T, feeds []feed) *values.Map { + feedMap := map[string]any{} + for _, feed := range feeds { + feedMap[feed.feedIDStr] = map[string]any{ + "deviation": "0.1", + "heartbeat": 1, + "remappedID": feed.feedIDStr, + } + } + unwrappedConfig := map[string]any{ + "feeds": feedMap, + "allowedPartialStaleness": "0.2", + } + config, err := values.NewMap(unwrappedConfig) + require.NoError(t, err) + return config +} + +func newObservations(t *testing.T, nNodes int, feeds []feed, minRequiredSignatures int, allowedSigners [][]byte) map[commontypes.OracleID][]values.Value { + observations := map[commontypes.OracleID][]values.Value{} + for i := 0; i < nNodes; i++ { + reportList := []datastreams.FeedReport{} + for j := 0; j < len(feeds); j++ { + reportIdx := 0 + signedStreamsReport := datastreams.FeedReport{ + FeedID: feeds[j].feedIDStr, + FullReport: feeds[j].reports[reportIdx].rawReport, + ReportContext: feeds[j].reports[reportIdx].reportCtx, + Signatures: feeds[j].reports[reportIdx].signatures, + } + reportList = append(reportList, signedStreamsReport) + } + + payloadVal, err := values.Wrap(reportList) + require.NoError(t, err) + + meta := datastreams.SignersMetadata{ + Signers: allowedSigners, + MinRequiredSignatures: minRequiredSignatures, + } + metaVal, err := values.Wrap(meta) + require.NoError(t, err) + + triggerEvent := capabilities.TriggerEvent{ + TriggerType: triggerID, + ID: "unused", + Timestamp: "1234", + Metadata: metaVal, + Payload: payloadVal, + } + wrappedEvent, err := values.Wrap(triggerEvent) + require.NoError(t, err) + observations[commontypes.OracleID(i)] = []values.Value{wrappedEvent} + } + return observations +} diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 3adf2183999..4bbd5467664 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -24,7 +24,7 @@ require ( github.com/prometheus/client_golang v1.17.0 github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627134843-36665f8f9f40 + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627145530-c769d7129f16 github.com/smartcontractkit/chainlink-vrf v0.0.0-20240222010609-cd67d123c772 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20240419185742-fd3cab206b2c diff --git a/core/scripts/go.sum b/core/scripts/go.sum index d15cf62a754..21d3207fdd4 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1212,8 +1212,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627134843-36665f8f9f40 h1:ewTiUC5thYTuoLCW67VwQzM7DXnz4XQU4GOk4IRYM/o= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627134843-36665f8f9f40/go.mod h1:EWvSuqIJUYXZLEHewC7WCaPylM2jyjF3Q36BZPS4MoI= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627145530-c769d7129f16 h1:YTE+iBNaRNZplGiiTZFYlnUrVYq1pyQr8Yiz3V3gsN8= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627145530-c769d7129f16/go.mod h1:EWvSuqIJUYXZLEHewC7WCaPylM2jyjF3Q36BZPS4MoI= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141 h1:TMOoYaeSDkkI3jkCH7lKHOZaLkeDuxFTNC+XblD6M0M= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/go.mod b/go.mod index 5f2a2c175db..02624a20c61 100644 --- a/go.mod +++ b/go.mod @@ -72,7 +72,7 @@ require ( github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chain-selectors v1.0.10 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627134843-36665f8f9f40 + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627145530-c769d7129f16 github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141 github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 github.com/smartcontractkit/chainlink-feeds v0.0.0-20240522213638-159fb2d99917 diff --git a/go.sum b/go.sum index da0951445f7..dbcf086fdcb 100644 --- a/go.sum +++ b/go.sum @@ -1169,8 +1169,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627134843-36665f8f9f40 h1:ewTiUC5thYTuoLCW67VwQzM7DXnz4XQU4GOk4IRYM/o= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627134843-36665f8f9f40/go.mod h1:EWvSuqIJUYXZLEHewC7WCaPylM2jyjF3Q36BZPS4MoI= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627145530-c769d7129f16 h1:YTE+iBNaRNZplGiiTZFYlnUrVYq1pyQr8Yiz3V3gsN8= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627145530-c769d7129f16/go.mod h1:EWvSuqIJUYXZLEHewC7WCaPylM2jyjF3Q36BZPS4MoI= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141 h1:TMOoYaeSDkkI3jkCH7lKHOZaLkeDuxFTNC+XblD6M0M= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 96c004d492a..123811ebe26 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -28,7 +28,7 @@ require ( github.com/shopspring/decimal v1.3.1 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627134843-36665f8f9f40 + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627145530-c769d7129f16 github.com/smartcontractkit/chainlink-testing-framework v1.31.7 github.com/smartcontractkit/chainlink-testing-framework/grafana v0.0.0-20240405215812-5a72bc9af239 github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index d76b199515d..b31b5487eb1 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1513,8 +1513,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627134843-36665f8f9f40 h1:ewTiUC5thYTuoLCW67VwQzM7DXnz4XQU4GOk4IRYM/o= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627134843-36665f8f9f40/go.mod h1:EWvSuqIJUYXZLEHewC7WCaPylM2jyjF3Q36BZPS4MoI= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627145530-c769d7129f16 h1:YTE+iBNaRNZplGiiTZFYlnUrVYq1pyQr8Yiz3V3gsN8= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627145530-c769d7129f16/go.mod h1:EWvSuqIJUYXZLEHewC7WCaPylM2jyjF3Q36BZPS4MoI= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141 h1:TMOoYaeSDkkI3jkCH7lKHOZaLkeDuxFTNC+XblD6M0M= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 9437ca3c98b..800f514722e 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -16,7 +16,7 @@ require ( github.com/rs/zerolog v1.31.0 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627134843-36665f8f9f40 + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627145530-c769d7129f16 github.com/smartcontractkit/chainlink-testing-framework v1.31.7 github.com/smartcontractkit/chainlink/integration-tests v0.0.0-20240214231432-4ad5eb95178c github.com/smartcontractkit/chainlink/v2 v2.9.0-beta0.0.20240216210048-da02459ddad8 diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 3ad51a7ec0d..d284b8484b2 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1503,8 +1503,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627134843-36665f8f9f40 h1:ewTiUC5thYTuoLCW67VwQzM7DXnz4XQU4GOk4IRYM/o= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627134843-36665f8f9f40/go.mod h1:EWvSuqIJUYXZLEHewC7WCaPylM2jyjF3Q36BZPS4MoI= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627145530-c769d7129f16 h1:YTE+iBNaRNZplGiiTZFYlnUrVYq1pyQr8Yiz3V3gsN8= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240627145530-c769d7129f16/go.mod h1:EWvSuqIJUYXZLEHewC7WCaPylM2jyjF3Q36BZPS4MoI= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141 h1:TMOoYaeSDkkI3jkCH7lKHOZaLkeDuxFTNC+XblD6M0M= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240621143432-85370a54b141/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=