From a76e519cbe87b65f738132c4a52370e2449ea09c Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Fri, 14 Jun 2024 15:14:36 -0500 Subject: [PATCH 1/4] Optimize diff-states to diff storage elems in parallel This adds additional parallelism to diff-states command to the concurrency added by PR #6097, which was limited to: - parallel diff of separate accounts - parallel diff of separate domains within large accounts Elements stored in storage domain can be deeply nested data, which takes time to compare. This commit compares elements in storage domain in parallel for large accounts. This requires creating Cadence interpreter in each worker goroutine for value comparison. Also added duration logging when diffing large accounts. --- cmd/util/cmd/diff-states/cmd.go | 2 +- .../ledger/migrations/cadence_value_diff.go | 157 ++++++++++++++++-- 2 files changed, 146 insertions(+), 13 deletions(-) diff --git a/cmd/util/cmd/diff-states/cmd.go b/cmd/util/cmd/diff-states/cmd.go index d58fbe2001e..ed1d13ef30a 100644 --- a/cmd/util/cmd/diff-states/cmd.go +++ b/cmd/util/cmd/diff-states/cmd.go @@ -333,7 +333,7 @@ func diffAccount( chainID, rw, true, - flagNWorker, + flagNWorker/2, ).DiffStates( accountRegisters1, accountRegisters2, diff --git a/cmd/util/ledger/migrations/cadence_value_diff.go b/cmd/util/ledger/migrations/cadence_value_diff.go index 90d5172277d..9eb892b127c 100644 --- a/cmd/util/ledger/migrations/cadence_value_diff.go +++ b/cmd/util/ledger/migrations/cadence_value_diff.go @@ -2,10 +2,12 @@ package migrations import ( "fmt" + "time" "github.com/onflow/cadence/runtime" "github.com/onflow/cadence/runtime/common" "github.com/onflow/cadence/runtime/interpreter" + "github.com/rs/zerolog/log" "golang.org/x/sync/errgroup" "github.com/onflow/flow-go/cmd/util/ledger/reporters" @@ -74,7 +76,7 @@ type difference struct { NewValueStaticType string `json:",omitempty"` } -const minLargeAccountRegisterCount = 1_000 +const minLargeAccountRegisterCount = 50_000 type CadenceValueDiffReporter struct { address common.Address @@ -156,7 +158,7 @@ func (dr *CadenceValueDiffReporter) DiffStates(oldRegs, newRegs registers.Regist return fmt.Errorf("failed to create runtime for new registers: %s", err) } - dr.diffStorageDomain(oldRuntime, newRuntime, common.PathDomainStorage.Identifier()) + dr.diffDomain(oldRuntime, newRuntime, common.PathDomainStorage.Identifier()) return nil }) @@ -174,7 +176,7 @@ func (dr *CadenceValueDiffReporter) DiffStates(oldRegs, newRegs registers.Regist for _, domain := range domains { if domain != common.PathDomainStorage.Identifier() { - dr.diffStorageDomain(oldRuntime, newRuntime, domain) + dr.diffDomain(oldRuntime, newRuntime, domain) } } return nil @@ -217,11 +219,11 @@ func (dr *CadenceValueDiffReporter) DiffStates(oldRegs, newRegs registers.Regist } for _, domain := range domains { - dr.diffStorageDomain(oldRuntime, newRuntime, domain) + dr.diffDomain(oldRuntime, newRuntime, domain) } } -func (dr *CadenceValueDiffReporter) diffStorageDomain( +func (dr *CadenceValueDiffReporter) diffDomain( oldRuntime *readonlyStorageRuntime, newRuntime *readonlyStorageRuntime, domain string, @@ -317,8 +319,11 @@ func (dr *CadenceValueDiffReporter) diffStorageDomain( }) } - // Compare elements present in both storage maps - for _, key := range sharedKeys { + if len(sharedKeys) == 0 { + return + } + + getValues := func(key any) (interpreter.Value, interpreter.Value, *util.Trace, bool) { trace := util.NewTrace(fmt.Sprintf("%s[%v]", domain, key)) @@ -350,17 +355,27 @@ func (dr *CadenceValueDiffReporter) diffStorageDomain( key, ), }) - continue + return nil, nil, nil, false } oldValue := oldStorageMap.ReadValue(nil, mapKey) newValue := newStorageMap.ReadValue(nil, mapKey) + return oldValue, newValue, trace, true + } + + diffValues := func( + oldInterpreter *interpreter.Interpreter, + oldValue interpreter.Value, + newInterpreter *interpreter.Interpreter, + newValue interpreter.Value, + trace *util.Trace, + ) { hasDifference := dr.diffValues( - oldRuntime.Interpreter, + oldInterpreter, oldValue, - newRuntime.Interpreter, + newInterpreter, newValue, domain, trace, @@ -377,13 +392,131 @@ func (dr *CadenceValueDiffReporter) diffStorageDomain( Trace: trace.String(), OldValue: oldValue.String(), NewValue: newValue.String(), - OldValueStaticType: oldValue.StaticType(oldRuntime.Interpreter).String(), - NewValueStaticType: newValue.StaticType(newRuntime.Interpreter).String(), + OldValueStaticType: oldValue.StaticType(oldInterpreter).String(), + NewValueStaticType: newValue.StaticType(newInterpreter).String(), }) } } + } + + // Skip goroutine overhead for non-storage domain and small accounts. + if domain != common.PathDomainStorage.Identifier() || + oldRuntime.PayloadCount < minLargeAccountRegisterCount || + len(sharedKeys) == 1 { + + for _, key := range sharedKeys { + oldValue, newValue, trace, canDiff := getValues(key) + if canDiff { + diffValues( + oldRuntime.Interpreter, + oldValue, + newRuntime.Interpreter, + newValue, + trace, + ) + } + } + return + } + + startTime := time.Now() + + log.Info().Msgf( + "Diffing %x storage domain containing %d elements (%d payloads) ...", + dr.address[:], + len(sharedKeys), + oldRuntime.PayloadCount, + ) + + // Diffing storage domain in large account + + type job struct { + oldValue interpreter.Value + newValue interpreter.Value + trace *util.Trace + } + nWorkers := dr.nWorkers + if len(sharedKeys) < nWorkers { + nWorkers = len(sharedKeys) } + + jobs := make(chan job, nWorkers) + + var g errgroup.Group + + for i := 0; i < nWorkers; i++ { + + g.Go(func() error { + oldInterpreter, err := interpreter.NewInterpreter( + nil, + nil, + &interpreter.Config{ + Storage: oldRuntime.Storage, + }, + ) + if err != nil { + dr.reportWriter.Write( + diffError{ + Address: dr.address.Hex(), + Kind: diffErrorKindString[abortErrorKind], + Msg: fmt.Sprintf("failed to create interpreter for old registers: %s", err), + }) + return nil + } + + newInterpreter, err := interpreter.NewInterpreter( + nil, + nil, + &interpreter.Config{ + Storage: newRuntime.Storage, + }, + ) + if err != nil { + dr.reportWriter.Write( + diffError{ + Address: dr.address.Hex(), + Kind: diffErrorKindString[abortErrorKind], + Msg: fmt.Sprintf("failed to create interpreter for new registers: %s", err), + }) + return nil + } + + for job := range jobs { + diffValues(oldInterpreter, job.oldValue, newInterpreter, job.newValue, job.trace) + } + + return nil + }) + } + + // Launch goroutine to send account registers to jobs channel + go func() { + defer close(jobs) + + for _, key := range sharedKeys { + oldValue, newValue, trace, canDiff := getValues(key) + if canDiff { + jobs <- job{ + oldValue: oldValue, + newValue: newValue, + trace: trace, + } + } + } + }() + + // Wait for workers + _ = g.Wait() + + log.Info(). + Msgf( + "Finished diffing %x storage domain containing %d elements (%d payloads) in %s", + dr.address[:], + len(sharedKeys), + oldRuntime.PayloadCount, + time.Since(startTime), + ) } func (dr *CadenceValueDiffReporter) diffValues( From a7a3c4bcdadc36bfb943d6ed6610146f9282a4b4 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Mon, 17 Jun 2024 07:20:47 -0500 Subject: [PATCH 2/4] Increase threshold of large account in diff-states This commit increases the minimum payload count threshold for large accounts to use extra goroutines. So only the largest accounts that would benefit most from extra goroutines use them. The minimum payload count was 50,000 and was increased to 1 million. --- cmd/util/ledger/migrations/cadence_value_diff.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/util/ledger/migrations/cadence_value_diff.go b/cmd/util/ledger/migrations/cadence_value_diff.go index 1a8ed656cb1..34c372ca603 100644 --- a/cmd/util/ledger/migrations/cadence_value_diff.go +++ b/cmd/util/ledger/migrations/cadence_value_diff.go @@ -76,7 +76,7 @@ type difference struct { NewValueStaticType string `json:",omitempty"` } -const minLargeAccountRegisterCount = 50_000 +const minLargeAccountRegisterCount = 1_000_000 type CadenceValueDiffReporter struct { address common.Address From bfb066da37e8189da78ac896d648731bc5c3b97a Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Mon, 17 Jun 2024 07:53:31 -0500 Subject: [PATCH 3/4] Optimize diff-states by removing old bottleneck Currently, diffCadenceValues() is used in diffCadenceDictionaryValue() to compare dict keys in old and new dict values. The complexity is O(n^2) where n is number of keys. This commit removes explicit key comparisons in diffCadenceDictionaryValue() and detects not-found keys when retrieving elements for comparisons. The complexity is O(n) where n is number of keys. Speedup for 1 account using 10 goroutines (from diff-states logs on gcp): - before: 1 hour 9 minutes - after: 1 minute 18 seconds --- .../ledger/migrations/cadence_value_diff.go | 130 +++++++----------- 1 file changed, 52 insertions(+), 78 deletions(-) diff --git a/cmd/util/ledger/migrations/cadence_value_diff.go b/cmd/util/ledger/migrations/cadence_value_diff.go index 68fd2c2598b..b06765e1d61 100644 --- a/cmd/util/ledger/migrations/cadence_value_diff.go +++ b/cmd/util/ledger/migrations/cadence_value_diff.go @@ -858,9 +858,36 @@ func (dr *CadenceValueDiffReporter) diffCadenceDictionaryValue( return true }) - onlyOldKeys, onlyNewKeys, sharedKeys := diffCadenceValues(vInterpreter, oldKeys, newKeys) + onlyOldKeys := make([]interpreter.Value, 0, len(oldKeys)) + + // Compare elements in both dict values + + for _, key := range oldKeys { + valueTrace := trace.Append(fmt.Sprintf("[%v]", key)) + + oldValue, _ := v.Get(vInterpreter, interpreter.EmptyLocationRange, key) + + newValue, found := otherDictionary.Get(otherInterpreter, interpreter.EmptyLocationRange, key) + if !found { + onlyOldKeys = append(onlyOldKeys, key) + continue + } + + elementHasDifference := dr.diffValues( + vInterpreter, + oldValue, + otherInterpreter, + newValue, + domain, + valueTrace, + ) + if elementHasDifference { + hasDifference = true + } + } // Log keys only present in old dict value + if len(onlyOldKeys) > 0 { hasDifference = true @@ -878,42 +905,34 @@ func (dr *CadenceValueDiffReporter) diffCadenceDictionaryValue( }) } - // Log field names only present in new composite value - if len(onlyNewKeys) > 0 { - hasDifference = true - - dr.reportWriter.Write( - difference{ - Address: dr.address.Hex(), - Domain: domain, - Kind: diffKindString[cadenceValueDiffKind], - Trace: trace.String(), - Msg: fmt.Sprintf( - "new dict value has %d elements with keys %v, that are not present in old dict value", - len(onlyNewKeys), - onlyNewKeys, - ), - }) - } - - // Compare elements in both dict values - for _, key := range sharedKeys { - valueTrace := trace.Append(fmt.Sprintf("[%v]", key)) + // Log keys only present in new dict value - oldValue, _ := v.Get(vInterpreter, interpreter.EmptyLocationRange, key) + if len(oldKeys) != len(newKeys) || len(onlyOldKeys) > 0 { + onlyNewKeys := make([]interpreter.Value, 0, len(newKeys)) - newValue, _ := otherDictionary.Get(otherInterpreter, interpreter.EmptyLocationRange, key) + // find keys only present in new dict + for _, key := range newKeys { + found := v.ContainsKey(vInterpreter, interpreter.EmptyLocationRange, key) + if !found { + onlyNewKeys = append(onlyNewKeys, key) + } + } - elementHasDifference := dr.diffValues( - vInterpreter, - oldValue, - otherInterpreter, - newValue, - domain, - valueTrace, - ) - if elementHasDifference { + if len(onlyNewKeys) > 0 { hasDifference = true + + dr.reportWriter.Write( + difference{ + Address: dr.address.Hex(), + Domain: domain, + Kind: diffKindString[cadenceValueDiffKind], + Trace: trace.String(), + Msg: fmt.Sprintf( + "new dict value has %d elements with keys %v, that are not present in old dict value", + len(onlyNewKeys), + onlyNewKeys, + ), + }) } } @@ -968,51 +987,6 @@ func diff[T comparable](old, new []T) (onlyOld, onlyNew, shared []T) { return } -func diffCadenceValues(oldInterpreter *interpreter.Interpreter, old, new []interpreter.Value) (onlyOld, onlyNew, shared []interpreter.Value) { - onlyOld = make([]interpreter.Value, 0, len(old)) - onlyNew = make([]interpreter.Value, 0, len(new)) - shared = make([]interpreter.Value, 0, min(len(old), len(new))) - - sharedNew := make([]bool, len(new)) - - for _, o := range old { - found := false - - for i, n := range new { - foundShared := false - - if ev, ok := o.(interpreter.EquatableValue); ok { - if ev.Equal(oldInterpreter, interpreter.EmptyLocationRange, n) { - foundShared = true - } - } else { - if o == n { - foundShared = true - } - } - - if foundShared { - shared = append(shared, o) - found = true - sharedNew[i] = true - break - } - } - - if !found { - onlyOld = append(onlyOld, o) - } - } - - for i, shared := range sharedNew { - if !shared { - onlyNew = append(onlyNew, new[i]) - } - } - - return -} - func min(a, b int) int { if a <= b { return a From 27b974725b290cb34b85203da100c44b126dd79e Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Mon, 17 Jun 2024 12:18:50 -0500 Subject: [PATCH 4/4] Check non-nil error (currently always nil) in diff-states This commit checks for error returned from a goroutine which currently does not return any error. This check is done as a precaution in case the goroutine is modified in the future to return error. --- cmd/util/ledger/migrations/cadence_value_diff.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cmd/util/ledger/migrations/cadence_value_diff.go b/cmd/util/ledger/migrations/cadence_value_diff.go index 34c372ca603..706fa4f71a9 100644 --- a/cmd/util/ledger/migrations/cadence_value_diff.go +++ b/cmd/util/ledger/migrations/cadence_value_diff.go @@ -507,7 +507,15 @@ func (dr *CadenceValueDiffReporter) diffDomain( }() // Wait for workers - _ = g.Wait() + err := g.Wait() + if err != nil { + dr.reportWriter.Write( + diffError{ + Address: dr.address.Hex(), + Kind: diffErrorKindString[abortErrorKind], + Msg: fmt.Sprintf("failed to diff domain %s: %s", domain, err), + }) + } log.Info(). Msgf(