Skip to content

Commit

Permalink
covermerger: parallelize
Browse files Browse the repository at this point in the history
  • Loading branch information
tarasmadan committed Jun 26, 2024
1 parent 5c045c0 commit 13a97eb
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 15 deletions.
53 changes: 42 additions & 11 deletions pkg/covermerger/covermerger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
package covermerger

import (
"context"
"encoding/csv"
"errors"
"fmt"
"io"
"log"
"strconv"
"sync"

"golang.org/x/exp/maps"
"golang.org/x/sync/semaphore"
)

const (
Expand Down Expand Up @@ -73,13 +77,8 @@ type FileCoverageMerger interface {
Result() *MergeResult
}

func batchFileData(c *Config, targetFilePath string, records FileRecords, processedFiles map[string]struct{},
) (*MergeResult, error) {
func batchFileData(c *Config, targetFilePath string, records FileRecords) (*MergeResult, error) {
log.Printf("processing %d records for %s", len(records), targetFilePath)
if _, exists := processedFiles[targetFilePath]; exists {
return nil, fmt.Errorf("file was already processed, check the input ordering")
}
processedFiles[targetFilePath] = struct{}{}
repoBranchCommitsMap := make(map[RepoBranchCommit]bool)
for _, record := range records {
repoBranchCommitsMap[record.RepoBranchCommit()] = true
Expand Down Expand Up @@ -125,6 +124,7 @@ func makeRecord(fields, schema []string) FileRecord {
}

type Config struct {
Jobs int64
Workdir string
skipRepoClone bool
Base RepoBranchCommit
Expand Down Expand Up @@ -187,24 +187,55 @@ func mergeChanData(c *Config, recordsChan <-chan FileRecord) (map[string]*MergeR
targetFile := ""
var records []FileRecord
processedFiles := map[string]struct{}{}
var mu sync.Mutex
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var err error
var batchesRunning sync.WaitGroup
sem := semaphore.NewWeighted(c.Jobs)
for record := range recordsChan {
curTargetFile := record[KeyFilePath]
if targetFile == "" {
targetFile = curTargetFile
}
if curTargetFile != targetFile {
var err error
if stat[targetFile], err = batchFileData(c, targetFile, records, processedFiles); err != nil {
return nil, fmt.Errorf("failed to batchFileData(%s): %w", targetFile, err)
if sem.Acquire(ctx, 1) != nil {
break // context was cancelled
}
batchesRunning.Add(1)
go func(records []FileRecord, targetFile string) {
defer batchesRunning.Done()
defer sem.Release(1)
var mr *MergeResult
mr, err = batchFileData(c, targetFile, records)
mu.Lock()
{
if _, exists := processedFiles[targetFile]; exists {
err = errors.New("file was already processed, check the input ordering")
}
processedFiles[targetFile] = struct{}{}
if err != nil {
cancel()
err = fmt.Errorf("failed to batchFileData(%s): %w", targetFile, err)
} else {
stat[targetFile] = mr
}
}
mu.Unlock()

}(records, targetFile)
records = nil
targetFile = curTargetFile
}
records = append(records, record)
}
batchesRunning.Wait()
if records != nil {
var err error
if stat[targetFile], err = batchFileData(c, targetFile, records, processedFiles); err != nil {
stat[targetFile], err = batchFileData(c, targetFile, records)
if _, exists := processedFiles[targetFile]; exists {
err = errors.New("file was already processed, check the input ordering")
}
if err != nil {
return nil, fmt.Errorf("failed to batchFileData(%s): %w", targetFile, err)
}
}
Expand Down
27 changes: 26 additions & 1 deletion pkg/covermerger/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package covermerger
import (
"fmt"
"log"
"sync"

"github.com/google/syzkaller/pkg/vcs"
"github.com/google/syzkaller/sys/targets"
Expand All @@ -17,13 +18,16 @@ type fileVersion struct {

// fileVersions holds one fileVersion per RepoBranchCommit.
type fileVersions map[RepoBranchCommit]fileVersion

// gitMutex coordinates repo access between concurrent merge jobs:
// cloneRepos takes the write lock when at least one requested repo is not
// cached yet, while getFileVersions holds the read lock while it reads file
// contents from the repos.
var gitMutex sync.RWMutex

func getFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit,
) (fileVersions, error) {
repos, err := cloneRepos(c, rbcs)
gitMutex.RLock()
defer gitMutex.RUnlock()
if err != nil {
return nil, fmt.Errorf("failed to clone repos: %w", err)
}

res := make(fileVersions)
for _, rbc := range rbcs {
fileBytes, err := repos[rbc].Object(targetFilePath, rbc.Commit)
Expand All @@ -40,10 +44,25 @@ func getFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit,

// repoCache stores vcs.Repo handles for reuse across calls.
// Its methods clear the Commit field of the key before every lookup and
// insert, so entries do not depend on the commit.
type repoCache struct {
// cache is the underlying storage; it is lazily allocated by put.
cache map[RepoBranchCommit]vcs.Repo
// mu guards cache: readers take RLock, put takes Lock.
mu sync.RWMutex
}

// allCached reports whether the cache already has an entry for every element
// of rbcs. The Commit field is ignored: each key is looked up with Commit
// cleared. An empty rbcs yields true. The read lock is held for the whole scan.
func (rc *repoCache) allCached(rbcs []RepoBranchCommit) bool {
	rc.mu.RLock()
	defer rc.mu.RUnlock()
	for i := range rbcs {
		// Work on a copy so the caller's slice element keeps its Commit.
		key := rbcs[i]
		key.Commit = ""
		_, found := rc.cache[key]
		if !found {
			return false
		}
	}
	return true
}

func (rc *repoCache) get(rbc RepoBranchCommit) vcs.Repo {
rbc.Commit = ""
rc.mu.RLock()
defer rc.mu.RUnlock()
if repo, ok := rc.cache[rbc]; ok {
return repo
}
Expand All @@ -52,6 +71,8 @@ func (rc *repoCache) get(rbc RepoBranchCommit) vcs.Repo {

func (rc *repoCache) put(rbc RepoBranchCommit, repo vcs.Repo) {
rbc.Commit = ""
rc.mu.Lock()
defer rc.mu.Unlock()
if rc.cache == nil {
rc.cache = map[RepoBranchCommit]vcs.Repo{}
}
Expand All @@ -61,6 +82,10 @@ func (rc *repoCache) put(rbc RepoBranchCommit, repo vcs.Repo) {
func cloneRepos(c *Config, rbcs []RepoBranchCommit) (map[RepoBranchCommit]vcs.Repo, error) {
cache := &c.repoCache
repos := make(map[RepoBranchCommit]vcs.Repo)
if !cache.allCached(rbcs) {
gitMutex.Lock()
defer gitMutex.Unlock()
}
for _, rbc := range rbcs {
repos[rbc] = cache.get(rbc)
if repos[rbc] != nil {
Expand Down
13 changes: 10 additions & 3 deletions tools/syz-bq.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
set -e # exit on any problem
set -o pipefail

while getopts w:d:t:n:r:b: option
while getopts w:d:t:n:r:b:j: option
do
case "${option}"
in
Expand All @@ -15,6 +15,7 @@ do
n)namespace=${OPTARG};;
r)repo=${OPTARG};;
b)branch=${OPTARG};;
j)jobs=${OPTARG};;
esac
done

Expand Down Expand Up @@ -49,6 +50,11 @@ then
exit
fi

# Fall back to a single job when -j was not passed (or was passed empty).
if [ -z "$jobs" ]
then
jobs=1
fi

# it also allows to early check gcloud credentials
echo "making sure spanner table 'files' exists"
create_table=$( echo -n '
Expand Down Expand Up @@ -121,14 +127,15 @@ sessionDir="$workdir/sessions/$sessionID"
mkdir -p $sessionDir
gcloud storage cp $gsURI $sessionDir
cat $sessionDir/*.csv.gz | gunzip | \
go run ./tools/syz-covermerger/ -workdir $workdir \
go run -ldflags="-s=false" ./tools/syz-covermerger/ -workdir $workdir \
-jobs $jobs \
-repo $repo \
-branch $branch \
-commit $base_commit \
-save-to-spanner true \
-namespace $namespace \
-duration $duration \
-date-to $to_date
-date-to $to_date &> /dev/null

echo Cleanup
rm -rf $sessionDir
7 changes: 7 additions & 0 deletions tools/syz-covermerger/syz_covermerger.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"flag"
"fmt"
"log"
"net/http"
_ "net/http/pprof"
"os"
"sort"

Expand All @@ -27,11 +29,16 @@ var (
flagDateTo = flag.String("date-to", "", "[optional] used to mark DB records")
flagSaveToSpanner = flag.String("save-to-spanner", "", "[optional] save aggregation to spanner")
flagProjectID = flag.String("project-id", "syzkaller", "[optional] target spanner db project")
flagJobs = flag.Int64("jobs", 1, "[optional] jobs count")
)

func main() {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
flag.Parse()
config := &covermerger.Config{
Jobs: *flagJobs,
Workdir: *flagWorkdir,
Base: covermerger.RepoBranchCommit{
Repo: *flagRepo,
Expand Down

0 comments on commit 13a97eb

Please sign in to comment.