Skip to content

Commit

Permalink
Merge pull request onflow#6105 from onflow/fxamacker/optimize-diff-st…
Browse files Browse the repository at this point in the history
…ates-util-by-splitting-storage-domain

Optimize diff-states to diff storage elems in parallel for large accounts
  • Loading branch information
fxamacker authored Jun 17, 2024
2 parents 7e79743 + 64bd555 commit b219b03
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 13 deletions.
2 changes: 1 addition & 1 deletion cmd/util/cmd/diff-states/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func diffAccount(
chainID,
rw,
true,
flagNWorker,
flagNWorker/2,
).DiffStates(
accountRegisters1,
accountRegisters2,
Expand Down
165 changes: 153 additions & 12 deletions cmd/util/ledger/migrations/cadence_value_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package migrations

import (
"fmt"
"time"

"github.com/onflow/cadence/runtime"
"github.com/onflow/cadence/runtime/common"
"github.com/onflow/cadence/runtime/interpreter"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"

"github.com/onflow/flow-go/cmd/util/ledger/reporters"
Expand Down Expand Up @@ -74,7 +76,7 @@ type difference struct {
NewValueStaticType string `json:",omitempty"`
}

const minLargeAccountRegisterCount = 1_000
const minLargeAccountRegisterCount = 1_000_000

type CadenceValueDiffReporter struct {
address common.Address
Expand Down Expand Up @@ -156,7 +158,7 @@ func (dr *CadenceValueDiffReporter) DiffStates(oldRegs, newRegs registers.Regist
return fmt.Errorf("failed to create runtime for new registers: %s", err)
}

dr.diffStorageDomain(oldRuntime, newRuntime, common.PathDomainStorage.Identifier())
dr.diffDomain(oldRuntime, newRuntime, common.PathDomainStorage.Identifier())
return nil
})

Expand All @@ -174,7 +176,7 @@ func (dr *CadenceValueDiffReporter) DiffStates(oldRegs, newRegs registers.Regist

for _, domain := range domains {
if domain != common.PathDomainStorage.Identifier() {
dr.diffStorageDomain(oldRuntime, newRuntime, domain)
dr.diffDomain(oldRuntime, newRuntime, domain)
}
}
return nil
Expand Down Expand Up @@ -217,11 +219,11 @@ func (dr *CadenceValueDiffReporter) DiffStates(oldRegs, newRegs registers.Regist
}

for _, domain := range domains {
dr.diffStorageDomain(oldRuntime, newRuntime, domain)
dr.diffDomain(oldRuntime, newRuntime, domain)
}
}

func (dr *CadenceValueDiffReporter) diffStorageDomain(
func (dr *CadenceValueDiffReporter) diffDomain(
oldRuntime *readonlyStorageRuntime,
newRuntime *readonlyStorageRuntime,
domain string,
Expand Down Expand Up @@ -317,8 +319,11 @@ func (dr *CadenceValueDiffReporter) diffStorageDomain(
})
}

// Compare elements present in both storage maps
for _, key := range sharedKeys {
if len(sharedKeys) == 0 {
return
}

getValues := func(key any) (interpreter.Value, interpreter.Value, *util.Trace, bool) {

trace := util.NewTrace(fmt.Sprintf("%s[%v]", domain, key))

Expand Down Expand Up @@ -350,17 +355,27 @@ func (dr *CadenceValueDiffReporter) diffStorageDomain(
key,
),
})
continue
return nil, nil, nil, false
}

oldValue := oldStorageMap.ReadValue(nil, mapKey)

newValue := newStorageMap.ReadValue(nil, mapKey)

return oldValue, newValue, trace, true
}

diffValues := func(
oldInterpreter *interpreter.Interpreter,
oldValue interpreter.Value,
newInterpreter *interpreter.Interpreter,
newValue interpreter.Value,
trace *util.Trace,
) {
hasDifference := dr.diffValues(
oldRuntime.Interpreter,
oldInterpreter,
oldValue,
newRuntime.Interpreter,
newInterpreter,
newValue,
domain,
trace,
Expand All @@ -377,13 +392,139 @@ func (dr *CadenceValueDiffReporter) diffStorageDomain(
Trace: trace.String(),
OldValue: oldValue.String(),
NewValue: newValue.String(),
OldValueStaticType: oldValue.StaticType(oldRuntime.Interpreter).String(),
NewValueStaticType: newValue.StaticType(newRuntime.Interpreter).String(),
OldValueStaticType: oldValue.StaticType(oldInterpreter).String(),
NewValueStaticType: newValue.StaticType(newInterpreter).String(),
})
}
}
}

// Skip goroutine overhead for non-storage domain and small accounts.
if domain != common.PathDomainStorage.Identifier() ||
oldRuntime.PayloadCount < minLargeAccountRegisterCount ||
len(sharedKeys) == 1 {

for _, key := range sharedKeys {
oldValue, newValue, trace, canDiff := getValues(key)
if canDiff {
diffValues(
oldRuntime.Interpreter,
oldValue,
newRuntime.Interpreter,
newValue,
trace,
)
}
}
return
}

startTime := time.Now()

log.Info().Msgf(
"Diffing %x storage domain containing %d elements (%d payloads) ...",
dr.address[:],
len(sharedKeys),
oldRuntime.PayloadCount,
)

// Diffing storage domain in large account

type job struct {
oldValue interpreter.Value
newValue interpreter.Value
trace *util.Trace
}

nWorkers := dr.nWorkers
if len(sharedKeys) < nWorkers {
nWorkers = len(sharedKeys)
}

jobs := make(chan job, nWorkers)

var g errgroup.Group

for i := 0; i < nWorkers; i++ {

g.Go(func() error {
oldInterpreter, err := interpreter.NewInterpreter(
nil,
nil,
&interpreter.Config{
Storage: oldRuntime.Storage,
},
)
if err != nil {
dr.reportWriter.Write(
diffError{
Address: dr.address.Hex(),
Kind: diffErrorKindString[abortErrorKind],
Msg: fmt.Sprintf("failed to create interpreter for old registers: %s", err),
})
return nil
}

newInterpreter, err := interpreter.NewInterpreter(
nil,
nil,
&interpreter.Config{
Storage: newRuntime.Storage,
},
)
if err != nil {
dr.reportWriter.Write(
diffError{
Address: dr.address.Hex(),
Kind: diffErrorKindString[abortErrorKind],
Msg: fmt.Sprintf("failed to create interpreter for new registers: %s", err),
})
return nil
}

for job := range jobs {
diffValues(oldInterpreter, job.oldValue, newInterpreter, job.newValue, job.trace)
}

return nil
})
}

// Launch goroutine to send account registers to jobs channel
go func() {
defer close(jobs)

for _, key := range sharedKeys {
oldValue, newValue, trace, canDiff := getValues(key)
if canDiff {
jobs <- job{
oldValue: oldValue,
newValue: newValue,
trace: trace,
}
}
}
}()

// Wait for workers
err := g.Wait()
if err != nil {
dr.reportWriter.Write(
diffError{
Address: dr.address.Hex(),
Kind: diffErrorKindString[abortErrorKind],
Msg: fmt.Sprintf("failed to diff domain %s: %s", domain, err),
})
}

log.Info().
Msgf(
"Finished diffing %x storage domain containing %d elements (%d payloads) in %s",
dr.address[:],
len(sharedKeys),
oldRuntime.PayloadCount,
time.Since(startTime),
)
}

func (dr *CadenceValueDiffReporter) diffValues(
Expand Down

0 comments on commit b219b03

Please sign in to comment.