From 13a97eb728b4db9cc04a46b9051e60dd85139499 Mon Sep 17 00:00:00 2001
From: Taras Madan
Date: Wed, 26 Jun 2024 16:11:08 +0200
Subject: [PATCH] covermerger: parallelize

---
 pkg/covermerger/covermerger.go           | 53 +++++++++++++++++++-----
 pkg/covermerger/repos.go                 | 27 +++++++++++-
 tools/syz-bq.sh                          | 13 ++++--
 tools/syz-covermerger/syz_covermerger.go |  7 ++++
 4 files changed, 85 insertions(+), 15 deletions(-)

diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go
index b5bc8cbdb33e..afe7a6330cf3 100644
--- a/pkg/covermerger/covermerger.go
+++ b/pkg/covermerger/covermerger.go
@@ -4,13 +4,17 @@ package covermerger
 
 import (
+	"context"
 	"encoding/csv"
+	"errors"
 	"fmt"
 	"io"
 	"log"
 	"strconv"
+	"sync"
 
 	"golang.org/x/exp/maps"
+	"golang.org/x/sync/semaphore"
 )
 
 const (
@@ -73,13 +77,8 @@ type FileCoverageMerger interface {
 	Result() *MergeResult
 }
 
-func batchFileData(c *Config, targetFilePath string, records FileRecords, processedFiles map[string]struct{},
-) (*MergeResult, error) {
+func batchFileData(c *Config, targetFilePath string, records FileRecords) (*MergeResult, error) {
 	log.Printf("processing %d records for %s", len(records), targetFilePath)
-	if _, exists := processedFiles[targetFilePath]; exists {
-		return nil, fmt.Errorf("file was already processed, check the input ordering")
-	}
-	processedFiles[targetFilePath] = struct{}{}
 	repoBranchCommitsMap := make(map[RepoBranchCommit]bool)
 	for _, record := range records {
 		repoBranchCommitsMap[record.RepoBranchCommit()] = true
@@ -125,6 +124,7 @@
 }
 
 type Config struct {
+	Jobs          int64
 	Workdir       string
 	skipRepoClone bool
 	Base          RepoBranchCommit
@@ -187,24 +187,55 @@ func mergeChanData(c *Config, recordsChan <-chan FileRecord) (map[string]*MergeR
 	targetFile := ""
 	var records []FileRecord
 	processedFiles := map[string]struct{}{}
+	var mu sync.Mutex
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	var err error
+	var batchesRunning sync.WaitGroup
+	sem := semaphore.NewWeighted(c.Jobs)
 	for record := range recordsChan {
 		curTargetFile := record[KeyFilePath]
 		if targetFile == "" {
 			targetFile = curTargetFile
 		}
 		if curTargetFile != targetFile {
-			var err error
-			if stat[targetFile], err = batchFileData(c, targetFile, records, processedFiles); err != nil {
-				return nil, fmt.Errorf("failed to batchFileData(%s): %w", targetFile, err)
+			if sem.Acquire(ctx, 1) != nil {
+				break // context was cancelled
 			}
+			batchesRunning.Add(1)
+			go func(records []FileRecord, targetFile string) {
+				defer batchesRunning.Done()
+				defer sem.Release(1)
+				var mr *MergeResult
+				mr, err = batchFileData(c, targetFile, records)
+				mu.Lock()
+				{
+					if _, exists := processedFiles[targetFile]; exists {
+						err = errors.New("file was already processed, check the input ordering")
+					}
+					processedFiles[targetFile] = struct{}{}
+					if err != nil {
+						cancel()
+						err = fmt.Errorf("failed to batchFileData(%s): %w", targetFile, err)
+					} else {
+						stat[targetFile] = mr
+					}
+				}
+				mu.Unlock()
+
+			}(records, targetFile)
 			records = nil
 			targetFile = curTargetFile
 		}
 		records = append(records, record)
 	}
+	batchesRunning.Wait()
 	if records != nil {
-		var err error
-		if stat[targetFile], err = batchFileData(c, targetFile, records, processedFiles); err != nil {
+		stat[targetFile], err = batchFileData(c, targetFile, records)
+		if _, exists := processedFiles[targetFile]; exists {
+			err = errors.New("file was already processed, check the input ordering")
+		}
+		if err != nil {
 			return nil, fmt.Errorf("failed to batchFileData(%s): %w", targetFile, err)
 		}
 	}
diff --git a/pkg/covermerger/repos.go b/pkg/covermerger/repos.go
index 2c63e2d5d8a5..161be970720a 100644
--- a/pkg/covermerger/repos.go
+++ b/pkg/covermerger/repos.go
@@ -6,6 +6,7 @@ package covermerger
 import (
 	"fmt"
 	"log"
+	"sync"
 
 	"github.com/google/syzkaller/pkg/vcs"
 	"github.com/google/syzkaller/sys/targets"
@@ -17,13 +18,16 @@ type fileVersion struct {
 }
 
 type fileVersions map[RepoBranchCommit]fileVersion
 
+var gitMutex sync.RWMutex
+
 func getFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit,
 ) (fileVersions, error) {
 	repos, err := cloneRepos(c, rbcs)
+	gitMutex.RLock()
+	defer gitMutex.RUnlock()
 	if err != nil {
 		return nil, fmt.Errorf("failed to clone repos: %w", err)
 	}
-
 	res := make(fileVersions)
 	for _, rbc := range rbcs {
 		fileBytes, err := repos[rbc].Object(targetFilePath, rbc.Commit)
@@ -40,10 +44,25 @@
 
 type repoCache struct {
 	cache map[RepoBranchCommit]vcs.Repo
+	mu    sync.RWMutex
+}
+
+func (rc *repoCache) allCached(rbcs []RepoBranchCommit) bool {
+	rc.mu.RLock()
+	defer rc.mu.RUnlock()
+	for _, rbc := range rbcs {
+		rbc.Commit = ""
+		if _, ok := rc.cache[rbc]; !ok {
+			return false
+		}
+	}
+	return true
 }
 
 func (rc *repoCache) get(rbc RepoBranchCommit) vcs.Repo {
 	rbc.Commit = ""
+	rc.mu.RLock()
+	defer rc.mu.RUnlock()
 	if repo, ok := rc.cache[rbc]; ok {
 		return repo
 	}
@@ -52,6 +71,8 @@
 
 func (rc *repoCache) put(rbc RepoBranchCommit, repo vcs.Repo) {
 	rbc.Commit = ""
+	rc.mu.Lock()
+	defer rc.mu.Unlock()
 	if rc.cache == nil {
 		rc.cache = map[RepoBranchCommit]vcs.Repo{}
 	}
@@ -61,6 +82,10 @@
 func cloneRepos(c *Config, rbcs []RepoBranchCommit) (map[RepoBranchCommit]vcs.Repo, error) {
 	cache := &c.repoCache
 	repos := make(map[RepoBranchCommit]vcs.Repo)
+	if !cache.allCached(rbcs) {
+		gitMutex.Lock()
+		defer gitMutex.Unlock()
+	}
 	for _, rbc := range rbcs {
 		repos[rbc] = cache.get(rbc)
 		if repos[rbc] != nil {
diff --git a/tools/syz-bq.sh b/tools/syz-bq.sh
index 343d6c81e778..bf657793f6e4 100755
--- a/tools/syz-bq.sh
+++ b/tools/syz-bq.sh
@@ -5,7 +5,7 @@
 set -e # exit on any problem
 set -o pipefail
 
-while getopts w:d:t:n:r:b: option
+while getopts w:d:t:n:r:b:j: option
 do
 case "${option}"
 in
@@ -15,6 +15,7 @@ do
 n)namespace=${OPTARG};;
 r)repo=${OPTARG};;
 b)branch=${OPTARG};;
+j)jobs=${OPTARG};;
 esac
 done
 
@@ -49,6 +50,11 @@ then
 exit
 fi
 
+if [ -z "$jobs" ]
+then
+  jobs=1
+fi
+
 # it also allows to early check gcloud credentials
 echo "making sure spanner table 'files' exists"
 create_table=$( echo -n '
@@ -121,14 +127,15 @@ sessionDir="$workdir/sessions/$sessionID"
 mkdir -p $sessionDir
 gcloud storage cp $gsURI $sessionDir
 cat $sessionDir/*.csv.gz | gunzip | \
-go run ./tools/syz-covermerger/ -workdir $workdir \
+go run -ldflags="-s=false" ./tools/syz-covermerger/ -workdir $workdir \
+  -jobs $jobs \
   -repo $repo \
   -branch $branch \
   -commit $base_commit \
   -save-to-spanner true \
   -namespace $namespace \
   -duration $duration \
-  -date-to $to_date
+  -date-to $to_date &> /dev/null
 
 echo Cleanup
 rm -rf $sessionDir
diff --git a/tools/syz-covermerger/syz_covermerger.go b/tools/syz-covermerger/syz_covermerger.go
index afa518487fc1..935064fb7bb3 100644
--- a/tools/syz-covermerger/syz_covermerger.go
+++ b/tools/syz-covermerger/syz_covermerger.go
@@ -8,6 +8,8 @@ import (
 	"flag"
 	"fmt"
 	"log"
+	"net/http"
+	_ "net/http/pprof"
 	"os"
 	"sort"
 
@@ -27,11 +29,16 @@ var (
 	flagDateTo        = flag.String("date-to", "", "[optional] used to mark DB records")
flag.String("date-to", "", "[optional] used to mark DB records") flagSaveToSpanner = flag.String("save-to-spanner", "", "[optional] save aggregation to spanner") flagProjectID = flag.String("project-id", "syzkaller", "[optional] target spanner db project") + flagJobs = flag.Int64("jobs", 1, "[optional] jobs count") ) func main() { + go func() { + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() flag.Parse() config := &covermerger.Config{ + Jobs: *flagJobs, Workdir: *flagWorkdir, Base: covermerger.RepoBranchCommit{ Repo: *flagRepo,