Skip to content

Commit

Permalink
performables limit: use a heuristic to calculate number of perfomables
Browse files Browse the repository at this point in the history
- doing encoding once upon init
- includes a large buffer as we use max values for all fields
  • Loading branch information
amirylm committed Jan 24, 2024
1 parent d6adf47 commit 5ee9f33
Show file tree
Hide file tree
Showing 11 changed files with 274 additions and 188 deletions.
7 changes: 3 additions & 4 deletions pkg/v3/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
// as different nodes would upgrade at different times and would need to
// adhere to each others' limits
const (
ObservationPerformablesLimit = 50
ObservationLogRecoveryProposalsLimit = 5
ObservationConditionalsProposalsLimit = 5
ObservationBlockHistoryLimit = 256
Expand Down Expand Up @@ -48,6 +47,9 @@ func (observation AutomationObservation) Encode() ([]byte, error) {
}

func DecodeAutomationObservation(data []byte, utg types.UpkeepTypeGetter, wg types.WorkIDGenerator) (AutomationObservation, error) {
if len(data) > MaxObservationLength {
return AutomationObservation{}, fmt.Errorf("observation length cannot be greater than %d", MaxObservationLength)
}
ao := AutomationObservation{}
err := json.Unmarshal(data, &ao)
if err != nil {
Expand Down Expand Up @@ -75,9 +77,6 @@ func validateAutomationObservation(o AutomationObservation, utg types.UpkeepType
}

// Validate Performables
if (len(o.Performable)) > ObservationPerformablesLimit {
return fmt.Errorf("performable length cannot be greater than %d", ObservationPerformablesLimit)
}
seenPerformables := make(map[string]bool)
for _, res := range o.Performable {
if err := validateCheckResult(res, utg, wg); err != nil {
Expand Down
69 changes: 14 additions & 55 deletions pkg/v3/observation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,26 +152,33 @@ func TestDuplicateBlockHistory(t *testing.T) {
assert.ErrorContains(t, err, "block history cannot have duplicate block numbers")
}

func TestLargePerformable(t *testing.T) {
func TestLargeObservation(t *testing.T) {
ao := AutomationObservation{
Performable: []commontypes.CheckResult{},
UpkeepProposals: []commontypes.CoordinatedBlockProposal{validConditionalProposal, validLogProposal},
BlockHistory: validBlockHistory,
}
for i := 0; i < ObservationPerformablesLimit+1; i++ {
newConditionalResult := validConditionalResult
size := types.BlockHistorySize(ao.BlockHistory)
size += types.CoordinatedBlockProposalSize(len(ao.UpkeepProposals))

for i := 0; size < MaxObservationLength*2; i++ {
uid := commontypes.UpkeepIdentifier{}
uid.FromBigInt(big.NewInt(int64(i + 1)))
newConditionalResult.UpkeepID = uid
newConditionalResult.WorkID = mockWorkIDGenerator(newConditionalResult.UpkeepID, newConditionalResult.Trigger)
ao.Performable = append(ao.Performable, newConditionalResult)
newResult := validLogResult
if mockUpkeepTypeGetter(uid) == types.ConditionTrigger {
newResult = validConditionalResult
}
newResult.UpkeepID = uid
newResult.WorkID = mockWorkIDGenerator(newResult.UpkeepID, newResult.Trigger)
ao.Performable = append(ao.Performable, newResult)
size += types.CheckResultSize(newResult)
}
encoded, err := ao.Encode()
assert.NoError(t, err, "no error in encoding valid automation observation")

_, err = DecodeAutomationObservation(encoded, mockUpkeepTypeGetter, mockWorkIDGenerator)
assert.Error(t, err)
assert.ErrorContains(t, err, "performable length cannot be greater than")
assert.ErrorContains(t, err, "observation length cannot be greater than")
}

func TestDuplicatePerformable(t *testing.T) {
Expand Down Expand Up @@ -564,54 +571,6 @@ func TestInvalidLogProposal(t *testing.T) {
assert.ErrorContains(t, err, "log trigger extension cannot be empty for log upkeep")
}

func TestLargeObservationSize(t *testing.T) {
ao := AutomationObservation{
Performable: []commontypes.CheckResult{},
UpkeepProposals: []commontypes.CoordinatedBlockProposal{},
BlockHistory: commontypes.BlockHistory{},
}
for i := 0; i < ObservationBlockHistoryLimit; i++ {
ao.BlockHistory = append(ao.BlockHistory, commontypes.BlockKey{
Number: commontypes.BlockNumber(i + 1),
Hash: [32]byte{1},
})
}
largePerformData := [10001]byte{}
for i := 0; i < ObservationPerformablesLimit; i++ {
newResult := validLogResult
uid := commontypes.UpkeepIdentifier{}
uid.FromBigInt(big.NewInt(int64(i + 10001)))
newResult.UpkeepID = uid
newResult.WorkID = mockWorkIDGenerator(newResult.UpkeepID, newResult.Trigger)
newResult.PerformData = largePerformData[:]
ao.Performable = append(ao.Performable, newResult)
}
for i := 0; i < ObservationConditionalsProposalsLimit; i++ {
newProposal := validConditionalProposal
uid := commontypes.UpkeepIdentifier{}
uid.FromBigInt(big.NewInt(int64(i + 1)))
newProposal.UpkeepID = uid
newProposal.WorkID = mockWorkIDGenerator(newProposal.UpkeepID, newProposal.Trigger)
ao.UpkeepProposals = append(ao.UpkeepProposals, newProposal)
}
for i := 0; i < ObservationLogRecoveryProposalsLimit; i++ {
newProposal := validLogProposal
uid := commontypes.UpkeepIdentifier{}
uid.FromBigInt(big.NewInt(int64(i + 1001)))
newProposal.UpkeepID = uid
newProposal.WorkID = mockWorkIDGenerator(newProposal.UpkeepID, newProposal.Trigger)
ao.UpkeepProposals = append(ao.UpkeepProposals, newProposal)
}
encoded, err := ao.Encode()
assert.NoError(t, err, "no error in encoding valid automation observation")

decoded, err := DecodeAutomationObservation(encoded, mockUpkeepTypeGetter, mockWorkIDGenerator)
assert.NoError(t, err, "no error in decoding valid automation observation")

assert.Equal(t, ao, decoded, "final result from encoding and decoding should match")
assert.Less(t, len(encoded), MaxObservationLength, "encoded observation should be less than maxObservationSize")
}

func mockUpkeepTypeGetter(id commontypes.UpkeepIdentifier) types.UpkeepType {
if id == conditionalUpkeepID {
return types.ConditionTrigger
Expand Down
5 changes: 4 additions & 1 deletion pkg/v3/outcome.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type AutomationOutcome struct {
func validateAutomationOutcome(o AutomationOutcome, utg types.UpkeepTypeGetter, wg types.WorkIDGenerator) error {
// Validate AgreedPerformables
if (len(o.AgreedPerformables)) > OutcomeAgreedPerformablesLimit {
return fmt.Errorf("outcome performable length cannot be greater than %d", OutcomeAgreedPerformablesLimit)
return fmt.Errorf("outcome length cannot be greater than %d", OutcomeAgreedPerformablesLimit)
}
seenPerformables := make(map[string]bool)
for _, res := range o.AgreedPerformables {
Expand Down Expand Up @@ -100,6 +100,9 @@ func (outcome AutomationOutcome) Encode() ([]byte, error) {
// DecodeAutomationOutcome decodes an AutomationOutcome from an encoded array
// of bytes. Possible errors come from the encoding/json package
func DecodeAutomationOutcome(data []byte, utg types.UpkeepTypeGetter, wg types.WorkIDGenerator) (AutomationOutcome, error) {
if len(data) > MaxOutcomeLength {
return AutomationOutcome{}, fmt.Errorf("outcome length cannot be greater than %d", MaxOutcomeLength)
}
ao := AutomationOutcome{}
err := json.Unmarshal(data, &ao)
if err != nil {
Expand Down
63 changes: 36 additions & 27 deletions pkg/v3/outcome_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (

"github.com/stretchr/testify/assert"

types "github.com/smartcontractkit/chainlink-common/pkg/types/automation"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/types"
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types/automation"
)

var validOutcome = AutomationOutcome{
AgreedPerformables: []types.CheckResult{validConditionalResult, validLogResult},
SurfacedProposals: [][]types.CoordinatedBlockProposal{{validConditionalProposal, validLogProposal}},
AgreedPerformables: []commontypes.CheckResult{validConditionalResult, validLogResult},
SurfacedProposals: [][]commontypes.CoordinatedBlockProposal{{validConditionalProposal, validLogProposal}},
}
var expectedEncodedOutcome []byte

Expand Down Expand Up @@ -54,27 +55,35 @@ func TestAutomationOutcomeEncodeBackwardsCompatibility(t *testing.T) {

func TestLargeAgreedPerformables(t *testing.T) {
ao := AutomationOutcome{
AgreedPerformables: []types.CheckResult{},
SurfacedProposals: [][]types.CoordinatedBlockProposal{{validConditionalProposal, validLogProposal}},
AgreedPerformables: []commontypes.CheckResult{},
SurfacedProposals: [][]commontypes.CoordinatedBlockProposal{{validConditionalProposal, validLogProposal}},
}
for i := 0; i < OutcomeAgreedPerformablesLimit+1; i++ {
newConditionalResult := validConditionalResult
newConditionalResult.Trigger.BlockNumber = types.BlockNumber(i + 1)
newConditionalResult.WorkID = mockWorkIDGenerator(newConditionalResult.UpkeepID, newConditionalResult.Trigger)
size := 0
for _, r := range ao.SurfacedProposals {
size += types.CoordinatedBlockProposalSize(len(r))
}
for i := 0; size < MaxOutcomeLength; i++ {
newResult := validLogResult
uid := commontypes.UpkeepIdentifier{}
uid.FromBigInt(big.NewInt(int64(i + 10001)))
newResult.UpkeepID = uid
newResult.Trigger.BlockNumber = commontypes.BlockNumber(i + 1)
newResult.WorkID = mockWorkIDGenerator(newResult.UpkeepID, newResult.Trigger)
ao.AgreedPerformables = append(ao.AgreedPerformables, validConditionalResult)
size += types.CheckResultSize(newResult)
}
encoded, err := ao.Encode()
assert.NoError(t, err, "no error in encoding valid automation outcome")

_, err = DecodeAutomationOutcome(encoded, mockUpkeepTypeGetter, mockWorkIDGenerator)
assert.Error(t, err)
assert.ErrorContains(t, err, "outcome performable length cannot be greater than")
assert.ErrorContains(t, err, "outcome length cannot be greater than")
}

func TestDuplicateAgreedPerformables(t *testing.T) {
ao := AutomationOutcome{
AgreedPerformables: []types.CheckResult{},
SurfacedProposals: [][]types.CoordinatedBlockProposal{{validConditionalProposal, validLogProposal}},
AgreedPerformables: []commontypes.CheckResult{},
SurfacedProposals: [][]commontypes.CoordinatedBlockProposal{{validConditionalProposal, validLogProposal}},
}
for i := 0; i < 2; i++ {
ao.AgreedPerformables = append(ao.AgreedPerformables, validConditionalResult)
Expand All @@ -89,16 +98,16 @@ func TestDuplicateAgreedPerformables(t *testing.T) {

func TestLargeProposalHistory(t *testing.T) {
ao := AutomationOutcome{
AgreedPerformables: []types.CheckResult{validConditionalResult, validLogResult},
SurfacedProposals: [][]types.CoordinatedBlockProposal{},
AgreedPerformables: []commontypes.CheckResult{validConditionalResult, validLogResult},
SurfacedProposals: [][]commontypes.CoordinatedBlockProposal{},
}
for i := 0; i < OutcomeSurfacedProposalsRoundHistoryLimit+1; i++ {
newProposal := validConditionalProposal
uid := types.UpkeepIdentifier{}
uid := commontypes.UpkeepIdentifier{}
uid.FromBigInt(big.NewInt(int64(i + 1)))
newProposal.UpkeepID = uid
newProposal.WorkID = mockWorkIDGenerator(newProposal.UpkeepID, newProposal.Trigger)
ao.SurfacedProposals = append(ao.SurfacedProposals, []types.CoordinatedBlockProposal{newProposal})
ao.SurfacedProposals = append(ao.SurfacedProposals, []commontypes.CoordinatedBlockProposal{newProposal})
}
encoded, err := ao.Encode()
assert.NoError(t, err, "no error in encoding valid automation outcome")
Expand All @@ -110,12 +119,12 @@ func TestLargeProposalHistory(t *testing.T) {

func TestLargeSurfacedProposalInSingleRound(t *testing.T) {
ao := AutomationOutcome{
AgreedPerformables: []types.CheckResult{validConditionalResult, validLogResult},
SurfacedProposals: [][]types.CoordinatedBlockProposal{{}},
AgreedPerformables: []commontypes.CheckResult{validConditionalResult, validLogResult},
SurfacedProposals: [][]commontypes.CoordinatedBlockProposal{{}},
}
for i := 0; i < OutcomeSurfacedProposalsLimit+1; i++ {
newProposal := validConditionalProposal
uid := types.UpkeepIdentifier{}
uid := commontypes.UpkeepIdentifier{}
uid.FromBigInt(big.NewInt(int64(i + 1)))
newProposal.UpkeepID = uid
newProposal.WorkID = mockWorkIDGenerator(newProposal.UpkeepID, newProposal.Trigger)
Expand All @@ -131,11 +140,11 @@ func TestLargeSurfacedProposalInSingleRound(t *testing.T) {

func TestDuplicateSurfaced(t *testing.T) {
ao := AutomationOutcome{
AgreedPerformables: []types.CheckResult{validConditionalResult, validLogResult},
SurfacedProposals: [][]types.CoordinatedBlockProposal{{}},
AgreedPerformables: []commontypes.CheckResult{validConditionalResult, validLogResult},
SurfacedProposals: [][]commontypes.CoordinatedBlockProposal{{}},
}
for i := 0; i < 2; i++ {
ao.SurfacedProposals = append(ao.SurfacedProposals, []types.CoordinatedBlockProposal{validConditionalProposal})
ao.SurfacedProposals = append(ao.SurfacedProposals, []commontypes.CoordinatedBlockProposal{validConditionalProposal})
}
encoded, err := ao.Encode()
assert.NoError(t, err, "no error in encoding valid automation outcome")
Expand All @@ -147,24 +156,24 @@ func TestDuplicateSurfaced(t *testing.T) {

func TestLargeOutcomeSize(t *testing.T) {
ao := AutomationOutcome{
AgreedPerformables: []types.CheckResult{},
SurfacedProposals: [][]types.CoordinatedBlockProposal{},
AgreedPerformables: []commontypes.CheckResult{},
SurfacedProposals: [][]commontypes.CoordinatedBlockProposal{},
}
largePerformData := [10001]byte{}
for i := 0; i < OutcomeAgreedPerformablesLimit; i++ {
newResult := validLogResult
uid := types.UpkeepIdentifier{}
uid := commontypes.UpkeepIdentifier{}
uid.FromBigInt(big.NewInt(int64(i + 10001)))
newResult.UpkeepID = uid
newResult.WorkID = mockWorkIDGenerator(newResult.UpkeepID, newResult.Trigger)
newResult.PerformData = largePerformData[:]
ao.AgreedPerformables = append(ao.AgreedPerformables, newResult)
}
for i := 0; i < OutcomeSurfacedProposalsRoundHistoryLimit; i++ {
round := []types.CoordinatedBlockProposal{}
round := []commontypes.CoordinatedBlockProposal{}
for j := 0; j < OutcomeSurfacedProposalsLimit; j++ {
newProposal := validLogProposal
uid := types.UpkeepIdentifier{}
uid := commontypes.UpkeepIdentifier{}
uid.FromBigInt(big.NewInt(int64(i*OutcomeSurfacedProposalsLimit + j + 1001)))
newProposal.UpkeepID = uid
newProposal.WorkID = mockWorkIDGenerator(newProposal.UpkeepID, newProposal.Trigger)
Expand Down
23 changes: 18 additions & 5 deletions pkg/v3/plugin/hooks/add_from_staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type AddFromStagingHook struct {
// that is the same across all nodes for a given round. This ensures that all nodes try to
// send the same subset of workIDs if they are available, while giving different priority
// to workIDs in different rounds.
func (hook *AddFromStagingHook) RunHook(obs *ocr2keepersv3.AutomationObservation, limit int, rSrc [16]byte) error {
func (hook *AddFromStagingHook) RunHook(obs *ocr2keepersv3.AutomationObservation, sizeLimit int, rSrc [16]byte) error {
results, err := hook.store.View()
if err != nil {
return err
Expand All @@ -48,11 +48,24 @@ func (hook *AddFromStagingHook) RunHook(obs *ocr2keepersv3.AutomationObservation
sort.Slice(results, func(i, j int) bool {
return shuffledIDs[results[i].WorkID] < shuffledIDs[results[j].WorkID]
})
if len(results) > limit {
results = results[:limit]

observationSize := types.BlockHistorySize(obs.BlockHistory)
observationSize += types.CoordinatedBlockProposalSize(len(obs.UpkeepProposals))
for _, result := range obs.Performable {
observationSize += types.CheckResultSize(result)
}
// add results to observation until size limit is reached
added := 0
for _, result := range results {
observationSize += types.CheckResultSize(result)
if observationSize > sizeLimit {
break
}
obs.Performable = append(obs.Performable, result)
added++
}
hook.logger.Printf("adding %d results to observation", len(results))
obs.Performable = append(obs.Performable, results...)

hook.logger.Printf("adding %d results to observation", added)

return nil
}
Loading

0 comments on commit 5ee9f33

Please sign in to comment.