From 13d6bb71839e02a1cebd81e428cf62cac3783ba2 Mon Sep 17 00:00:00 2001
From: Amir Y
Date: Mon, 8 Apr 2024 15:33:25 +0300
Subject: [PATCH] Scale observation building with massive amount of results (#317)

* Option 4: change random seed every x round

* extract sorter and add unit tests
---
 pkg/v3/plugin/hooks/add_from_staging.go      |  70 ++++++++---
 pkg/v3/plugin/hooks/add_from_staging_test.go | 117 +++++++++++++++++++
 pkg/v3/plugin/ocr3.go                        |   6 +-
 3 files changed, 177 insertions(+), 16 deletions(-)

diff --git a/pkg/v3/plugin/hooks/add_from_staging.go b/pkg/v3/plugin/hooks/add_from_staging.go
index 6655e016..7b6bfad0 100644
--- a/pkg/v3/plugin/hooks/add_from_staging.go
+++ b/pkg/v3/plugin/hooks/add_from_staging.go
@@ -1,9 +1,13 @@
 package hooks
 
 import (
+    "bytes"
     "fmt"
     "log"
     "sort"
+    "sync"
+
+    "github.com/smartcontractkit/chainlink-common/pkg/types/automation"
 
     ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3"
     "github.com/smartcontractkit/chainlink-automation/pkg/v3/random"
@@ -11,20 +15,24 @@ import (
     "github.com/smartcontractkit/chainlink-automation/pkg/v3/types"
 )
 
+type AddFromStagingHook struct {
+    store  types.ResultStore
+    logger *log.Logger
+    coord  types.Coordinator
+    sorter stagedResultSorter
+}
+
 func NewAddFromStagingHook(store types.ResultStore, coord types.Coordinator, logger *log.Logger) AddFromStagingHook {
     return AddFromStagingHook{
         store:  store,
         coord:  coord,
         logger: log.New(logger.Writer(), fmt.Sprintf("[%s | build hook:add-from-staging]", telemetry.ServiceName), telemetry.LogPkgStdFlags),
+        sorter: stagedResultSorter{
+            shuffledIDs: make(map[string]string),
+        },
     }
 }
 
-type AddFromStagingHook struct {
-    store  types.ResultStore
-    logger *log.Logger
-    coord  types.Coordinator
-}
-
 // RunHook adds results from the store to the observation.
 // It sorts by a shuffled workID. workID for all items is shuffled using a pseudorandom source
 // that is the same across all nodes for a given round. This ensures that all nodes try to
@@ -39,20 +47,52 @@ func (hook *AddFromStagingHook) RunHook(obs *ocr2keepersv3.AutomationObservation
     if err != nil {
         return err
     }
-    // creating a map to hold the shuffled workIDs
-    shuffledIDs := make(map[string]string, len(results))
-    for _, result := range results {
-        shuffledIDs[result.WorkID] = random.ShuffleString(result.WorkID, rSrc)
+
+    results = hook.sorter.orderResults(results, rSrc)
+    if n := len(results); n > limit {
+        results = results[:limit]
+        hook.logger.Printf("skipped %d available results in staging", n-limit)
     }
+    hook.logger.Printf("adding %d results to observation", len(results))
+    obs.Performable = append(obs.Performable, results...)
+
+    return nil
+}
+
+type stagedResultSorter struct {
+    lastRandSrc [16]byte
+    shuffledIDs map[string]string
+    lock        sync.Mutex
+}
+
+// orderResults orders the results by the shuffled workID
+func (sorter *stagedResultSorter) orderResults(results []automation.CheckResult, rSrc [16]byte) []automation.CheckResult {
+    sorter.lock.Lock()
+    defer sorter.lock.Unlock()
+
+    shuffledIDs := sorter.updateShuffledIDs(results, rSrc)
     // sort by the shuffled workID
     sort.Slice(results, func(i, j int) bool {
         return shuffledIDs[results[i].WorkID] < shuffledIDs[results[j].WorkID]
     })
-    if len(results) > limit {
-        results = results[:limit]
+
+    return results
+}
+
+// updateShuffledIDs updates the shuffledIDs cache with the new random source or items.
+// NOTE: this function is not thread-safe and should be called with the lock held
+func (sorter *stagedResultSorter) updateShuffledIDs(results []automation.CheckResult, rSrc [16]byte) map[string]string {
+    // once the random source changes, the workIDs need to be shuffled again with the new source
+    if !bytes.Equal(sorter.lastRandSrc[:], rSrc[:]) {
+        sorter.lastRandSrc = rSrc
+        sorter.shuffledIDs = make(map[string]string)
     }
-    hook.logger.Printf("adding %d results to observation", len(results))
-    obs.Performable = append(obs.Performable, results...)
-    return nil
+    for _, result := range results {
+        if _, ok := sorter.shuffledIDs[result.WorkID]; !ok {
+            sorter.shuffledIDs[result.WorkID] = random.ShuffleString(result.WorkID, rSrc)
+        }
+    }
+
+    return sorter.shuffledIDs
 }
diff --git a/pkg/v3/plugin/hooks/add_from_staging_test.go b/pkg/v3/plugin/hooks/add_from_staging_test.go
index a24cfd9d..7bd723cb 100644
--- a/pkg/v3/plugin/hooks/add_from_staging_test.go
+++ b/pkg/v3/plugin/hooks/add_from_staging_test.go
@@ -10,6 +10,7 @@ import (
     "github.com/stretchr/testify/mock"
 
     ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3"
+    "github.com/smartcontractkit/chainlink-automation/pkg/v3/random"
     "github.com/smartcontractkit/chainlink-automation/pkg/v3/types/mocks"
     types "github.com/smartcontractkit/chainlink-common/pkg/types/automation"
 )
@@ -235,6 +236,122 @@ func TestAddFromStagingHook_RunHook_Limits(t *testing.T) {
     }
 }
 
+func TestAddFromStagingHook_stagedResultSorter(t *testing.T) {
+    tests := []struct {
+        name                string
+        cached              []types.CheckResult
+        lastRandSrc         [16]byte
+        input               []types.CheckResult
+        rSrc                [16]byte
+        expected            []types.CheckResult
+        expectedCache       map[string]string
+        expectedLastRandSrc [16]byte
+    }{
+        {
+            name:                "empty results",
+            cached:              []types.CheckResult{},
+            input:               []types.CheckResult{},
+            rSrc:                [16]byte{1},
+            expected:            []types.CheckResult{},
+            expectedLastRandSrc: [16]byte{1},
+        },
+        {
+            name: "happy path",
+            input: []types.CheckResult{
+                {UpkeepID: [32]byte{3}, WorkID: "30a"},
+                {UpkeepID: [32]byte{1}, WorkID: "10c"},
+                {UpkeepID: [32]byte{2}, WorkID: "20b"},
+            },
+            rSrc: [16]byte{1},
+            expected: []types.CheckResult{
+                {UpkeepID: [32]byte{1}, WorkID: "10c"},
+                {UpkeepID: [32]byte{2}, WorkID: "20b"},
+                {UpkeepID: [32]byte{3}, WorkID: "30a"},
+            },
+            expectedCache: map[string]string{
+                "10c": "1c0",
+                "20b": "2b0",
+                "30a": "3a0",
+            },
+            expectedLastRandSrc: [16]byte{1},
+        },
+        {
+            name: "with cached results",
+            cached: []types.CheckResult{
+                {UpkeepID: [32]byte{1}, WorkID: "10c"},
+                {UpkeepID: [32]byte{2}, WorkID: "20b"},
+            },
+            lastRandSrc: [16]byte{1},
+            input: []types.CheckResult{
+                {UpkeepID: [32]byte{3}, WorkID: "30a"},
+                {UpkeepID: [32]byte{2}, WorkID: "20b"},
+            },
+            rSrc: [16]byte{1},
+            expected: []types.CheckResult{
+                {UpkeepID: [32]byte{2}, WorkID: "20b"},
+                {UpkeepID: [32]byte{3}, WorkID: "30a"},
+            },
+            expectedCache: map[string]string{
+                "10c": "1c0",
+                "20b": "2b0",
+                "30a": "3a0",
+            },
+            expectedLastRandSrc: [16]byte{1},
+        },
+        {
+            name: "with cached results of different rand src",
+            cached: []types.CheckResult{
+                {UpkeepID: [32]byte{1}, WorkID: "10c"},
+                {UpkeepID: [32]byte{2}, WorkID: "20b"},
+            },
+            lastRandSrc: [16]byte{1},
+            input: []types.CheckResult{
+                {UpkeepID: [32]byte{3}, WorkID: "30a"},
+                {UpkeepID: [32]byte{2}, WorkID: "20b"},
+            },
+            rSrc: [16]byte{2},
+            expected: []types.CheckResult{
+                {UpkeepID: [32]byte{2}, WorkID: "20b"},
+                {UpkeepID: [32]byte{3}, WorkID: "30a"},
+            },
+            expectedCache: map[string]string{
+                "20b": "02b",
+                "30a": "03a",
+            },
+            expectedLastRandSrc: [16]byte{2},
+        },
+    }
+
+    for _, tc := range tests {
+        t.Run(tc.name, func(t *testing.T) {
+            sorter := stagedResultSorter{
+                shuffledIDs: make(map[string]string),
+            }
+
+            if len(tc.cached) > 0 {
+                sorter.shuffledIDs = make(map[string]string)
+                for _, r := range tc.cached {
+                    sorter.shuffledIDs[r.WorkID] = random.ShuffleString(r.WorkID, tc.lastRandSrc)
+                }
+                sorter.lastRandSrc = tc.lastRandSrc
+            }
+
+            results := sorter.orderResults(tc.input, tc.rSrc)
+            assert.Equal(t, len(tc.expected), len(results))
+            for i := range results {
+                assert.Equal(t, tc.expected[i].WorkID, results[i].WorkID)
+            }
+            sorter.lock.Lock()
+            defer sorter.lock.Unlock()
+            assert.Equal(t, tc.expectedLastRandSrc, sorter.lastRandSrc)
+            assert.Equal(t, len(tc.expectedCache), len(sorter.shuffledIDs))
+            for k, v := range tc.expectedCache {
+                assert.Equal(t, v, sorter.shuffledIDs[k])
+            }
+        })
+    }
+}
+
 func getMocks(n int) (*mocks.MockResultStore, *mocks.MockCoordinator) {
     mockResults := make([]types.CheckResult, n)
     for i := 0; i < n; i++ {
diff --git a/pkg/v3/plugin/ocr3.go b/pkg/v3/plugin/ocr3.go
index e577b3c1..564c4edd 100644
--- a/pkg/v3/plugin/ocr3.go
+++ b/pkg/v3/plugin/ocr3.go
@@ -65,7 +65,11 @@ func (plugin *ocr3Plugin) Observation(ctx context.Context, outctx ocr3types.Outc
 
     plugin.AddBlockHistoryHook.RunHook(&observation, ocr2keepersv3.ObservationBlockHistoryLimit)
 
-    if err := plugin.AddFromStagingHook.RunHook(&observation, ocr2keepersv3.ObservationPerformablesLimit, getRandomKeySource(plugin.ConfigDigest, outctx.SeqNr)); err != nil {
+    // using the OCR sequence number as the source of randomness for the performables ordering.
+    // changing the random source every round makes the ordering expensive, so we reduce
+    // how often it changes by dividing the sequence number by 10
+    randSrcSeq := outctx.SeqNr / 10
+    if err := plugin.AddFromStagingHook.RunHook(&observation, ocr2keepersv3.ObservationPerformablesLimit, getRandomKeySource(plugin.ConfigDigest, randSrcSeq)); err != nil {
         return nil, err
     }
     prommetrics.AutomationPluginPerformables.WithLabelValues(prommetrics.PluginStepResultStore).Set(float64(len(observation.Performable)))
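
Note on the SeqNr/10 bucketing in ocr3.go: a minimal sketch (not part of the patch) of why it keeps the new sorter cheap. deriveKeySource below is a hypothetical stand-in for getRandomKeySource, whose implementation is not shown in this diff; the only property relied on is that its output is a deterministic function of the config digest and SeqNr/10, so the random source stays constant for 10 consecutive rounds and stagedResultSorter only rebuilds its shuffledIDs cache when that bucket changes.

package main

import (
    "crypto/sha256"
    "encoding/binary"
    "fmt"
)

// deriveKeySource is a hypothetical stand-in for getRandomKeySource: a deterministic
// hash of the config digest and the round bucket (seqNr / 10).
func deriveKeySource(configDigest [32]byte, bucket uint64) [16]byte {
    buf := make([]byte, 8)
    binary.BigEndian.PutUint64(buf, bucket)
    sum := sha256.Sum256(append(configDigest[:], buf...))
    var src [16]byte
    copy(src[:], sum[:16])
    return src
}

func main() {
    var configDigest [32]byte // zero digest, for illustration only

    prev := deriveKeySource(configDigest, 0)
    for seqNr := uint64(1); seqNr <= 30; seqNr++ {
        src := deriveKeySource(configDigest, seqNr/10)
        if src != prev {
            // the source only changes when seqNr crosses a multiple of 10, so the
            // sorter's shuffledIDs cache is rebuilt once per 10 rounds rather than
            // on every round.
            fmt.Printf("round %d: random source changed, shuffledIDs cache rebuilt\n", seqNr)
            prev = src
        }
    }
}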