Skip to content

Commit

Permalink
covermerger: add more control options and spanner integration
Browse files Browse the repository at this point in the history
  • Loading branch information
tarasmadan committed Jun 26, 2024
1 parent 6271cc7 commit 5c045c0
Show file tree
Hide file tree
Showing 9 changed files with 547 additions and 173 deletions.
164 changes: 102 additions & 62 deletions pkg/covermerger/covermerger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,19 @@ import (
"encoding/csv"
"fmt"
"io"
"log"
"strconv"

"golang.org/x/exp/maps"
)

const (
keyKernelRepo = "kernel_repo"
keyKernelBranch = "kernel_branch"
keyKernelCommit = "kernel_commit"
keyFilePath = "file_path"
keyStartLine = "sl"
keyStartCol = "sc"
keyEndLine = "el"
keyEndCol = "ec"
keyHitCount = "hit_count"
keyArch = "arch"
KeyKernelRepo = "kernel_repo"
KeyKernelBranch = "kernel_branch"
KeyKernelCommit = "kernel_commit"
KeyFilePath = "file_path"
KeyStartLine = "sl"
KeyHitCount = "hit_count"
)

type FileRecord map[string]string
Expand All @@ -35,9 +32,9 @@ type RepoBranchCommit struct {

func (fr FileRecord) RepoBranchCommit() RepoBranchCommit {
return RepoBranchCommit{
fr[keyKernelRepo],
fr[keyKernelBranch],
fr[keyKernelCommit],
fr[KeyKernelRepo],
fr[KeyKernelBranch],
fr[KeyKernelCommit],
}
}

Expand All @@ -48,65 +45,69 @@ type Frame struct {
EndCol int
}

func (fr FileRecord) Frame() Frame {
f := Frame{}
func (fr FileRecord) Frame() (*Frame, error) {
f := &Frame{}
var err error
if f.StartCol, err = strconv.Atoi(fr[keyStartCol]); err != nil {
panic(fmt.Sprintf("failed to Atoi(%s)", fr[keyStartCol]))
if f.StartLine, err = strconv.Atoi(fr[KeyStartLine]); err != nil {
return nil, fmt.Errorf("failed to Atoi(%s): %w", fr[KeyStartLine], err)
}
if f.StartLine, err = strconv.Atoi(fr[keyStartLine]); err != nil {
panic(fmt.Sprintf("failed to Atoi(%s)", fr[keyStartLine]))
}
if f.EndCol, err = strconv.Atoi(fr[keyEndCol]); err != nil {
panic(fmt.Sprintf("failed to Atoi(%s)", fr[keyEndCol]))
}
if f.EndLine, err = strconv.Atoi(fr[keyEndLine]); err != nil {
panic(fmt.Sprintf("failed to Atoi(%s)", fr[keyEndLine]))
}
return f
return f, nil
}

func (fr FileRecord) HitCount() int {
if hitCount, err := strconv.Atoi(fr[keyHitCount]); err != nil {
panic(fmt.Sprintf("failed to Atoi(%s)", fr[keyHitCount]))
func (fr FileRecord) HitCount() (int, error) {
if hitCount, err := strconv.Atoi(fr[KeyHitCount]); err != nil {
return 0, fmt.Errorf("failed to Atoi(%s): %w", fr[KeyHitCount], err)
} else {
return hitCount
return hitCount, nil
}
}

func (fr FileRecord) Arch() string {
return fr[keyArch]
}

// MergeResult is the outcome of merging coverage records for a single file
// across all repo/branch/commit versions onto the base revision.
type MergeResult struct {
	// HitCounts maps a base-file line number to its accumulated hit count.
	HitCounts map[int]int
	// FileExists reports whether the file is present in the base revision
	// (false when the file was deleted — see DeletedFileLineMerger).
	FileExists bool
	// LostFrames counts records with coverage that could not be attributed
	// to the base file, keyed by the repo/branch/commit they came from.
	LostFrames map[RepoBranchCommit]int64
}

type FileCoverageMerger interface {
AddRecord(rbc RepoBranchCommit, arch string, f Frame, hitCount int)
AddRecord(rbc RepoBranchCommit, f *Frame, hitCount int)
Result() *MergeResult
}

func batchFileData(c *Config, targetFilePath string, records FileRecords, base RepoBranchCommit,
func batchFileData(c *Config, targetFilePath string, records FileRecords, processedFiles map[string]struct{},
) (*MergeResult, error) {
log.Printf("processing %d records for %s", len(records), targetFilePath)
if _, exists := processedFiles[targetFilePath]; exists {
return nil, fmt.Errorf("file was already processed, check the input ordering")
}
processedFiles[targetFilePath] = struct{}{}
repoBranchCommitsMap := make(map[RepoBranchCommit]bool)
for _, record := range records {
repoBranchCommitsMap[record.RepoBranchCommit()] = true
}
repoBranchCommitsMap[base] = true
repoBranchCommitsMap[c.Base] = true
repoBranchCommits := maps.Keys(repoBranchCommitsMap)
fileVersions, err := getFileVersions(c, targetFilePath, repoBranchCommits)
getFiles := getFileVersions
if c.getFileVersionsMock != nil {
getFiles = c.getFileVersionsMock
}
fvs, err := getFiles(c, targetFilePath, repoBranchCommits)
if err != nil {
return nil, fmt.Errorf("failed to getFileVersions: %w", err)
}
merger := makeFileLineCoverMerger(fileVersions, base)
merger := makeFileLineCoverMerger(fvs, c.Base)
for _, record := range records {
var f *Frame
if f, err = record.Frame(); err != nil {
return nil, fmt.Errorf("error parsing records: %w", err)
}
var hitCount int
if hitCount, err = record.HitCount(); err != nil {
return nil, fmt.Errorf("error parsing records: %w", err)
}
merger.AddRecord(
record.RepoBranchCommit(),
record.Arch(),
record.Frame(),
record.HitCount())
f,
hitCount)
}
return merger.Result(), nil
}
Expand All @@ -124,37 +125,76 @@ func makeRecord(fields, schema []string) FileRecord {
}

type Config struct {
Workdir string
skipRepoClone bool
Workdir string
skipRepoClone bool
Base RepoBranchCommit
getFileVersionsMock func(*Config, string, []RepoBranchCommit) (fileVersions, error)
repoCache repoCache
}

func AggregateStreamData(c *Config, stream io.Reader, base RepoBranchCommit,
) (map[string]*MergeResult, error) {
stat := make(map[string]*MergeResult)
// isSchema reports whether fields is element-wise identical to schema.
// It is used to detect (and skip) repeated header rows when several CSV
// files with the same schema were concatenated into one input stream.
func isSchema(fields, schema []string) bool {
	if len(fields) != len(schema) {
		return false
	}
	for i, want := range schema {
		if fields[i] != want {
			return false
		}
	}
	return true
}

func MergeCSVData(config *Config, reader io.Reader) (map[string]*MergeResult, error) {
var schema []string
targetFile := ""
var records FileRecords
csvReader := csv.NewReader(stream)
csvReader := csv.NewReader(reader)
if fields, err := csvReader.Read(); err != nil {
return nil, fmt.Errorf("failed to read schema: %w", err)
} else {
schema = fields
}
for {
fields, err := csvReader.Read()
if err == io.EOF {
break
}
if err != nil {
return nil, fmt.Errorf("failed to read CSV line: %w", err)
errStdinReadChan := make(chan error, 1)
recordsChan := make(chan FileRecord)
go func() {
defer close(recordsChan)
for {
fields, err := csvReader.Read()
if err == io.EOF {
break
}
if err != nil {
errStdinReadChan <- fmt.Errorf("failed to read CSV line: %w", err)
return
}
if isSchema(fields, schema) {
// The input may be the merged CSV files with multiple schemas.
continue
}
recordsChan <- makeRecord(fields, schema)
}
record := makeRecord(fields, schema)
curTargetFile := record[keyFilePath]
errStdinReadChan <- nil
}()
mergeResult, errMerging := mergeChanData(config, recordsChan)
errStdinRead := <-errStdinReadChan
if errMerging != nil || errStdinRead != nil {
return nil, fmt.Errorf("errors merging stdin data:\nmerger err: %w\nstdin reader err: %w",
errMerging, errStdinRead)
}
return mergeResult, nil
}

func mergeChanData(c *Config, recordsChan <-chan FileRecord) (map[string]*MergeResult, error) {
stat := make(map[string]*MergeResult)
targetFile := ""
var records []FileRecord
processedFiles := map[string]struct{}{}
for record := range recordsChan {
curTargetFile := record[KeyFilePath]
if targetFile == "" {
targetFile = curTargetFile
}
if curTargetFile != targetFile {
if stat[targetFile], err = batchFileData(c, targetFile, records, base); err != nil {
var err error
if stat[targetFile], err = batchFileData(c, targetFile, records, processedFiles); err != nil {
return nil, fmt.Errorf("failed to batchFileData(%s): %w", targetFile, err)
}
records = nil
Expand All @@ -164,7 +204,7 @@ func AggregateStreamData(c *Config, stream io.Reader, base RepoBranchCommit,
}
if records != nil {
var err error
if stat[targetFile], err = batchFileData(c, targetFile, records, base); err != nil {
if stat[targetFile], err = batchFileData(c, targetFile, records, processedFiles); err != nil {
return nil, fmt.Errorf("failed to batchFileData(%s): %w", targetFile, err)
}
}
Expand Down
34 changes: 25 additions & 9 deletions pkg/covermerger/covermerger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,25 +119,41 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
aggregation, err := AggregateStreamData(
aggregation, err := MergeCSVData(
&Config{
Workdir: test.workdir,
skipRepoClone: true,
Base: RepoBranchCommit{
Repo: test.baseRepo,
Branch: test.baseBranch,
Commit: test.baseCommit,
},
getFileVersionsMock: mockGetFileVersions,
},
strings.NewReader(test.bqTable),
RepoBranchCommit{
Repo: test.baseRepo,
Branch: test.baseBranch,
Commit: test.baseCommit,
})
)
assert.Nil(t, err)
var simpleAggregationJSON map[string]*MergeResult
assert.Nil(t, json.Unmarshal([]byte(test.simpleAggregation), &simpleAggregationJSON))
assert.Equal(t, simpleAggregationJSON, aggregation)
var expectedAggregation map[string]*MergeResult
assert.Nil(t, json.Unmarshal([]byte(test.simpleAggregation), &expectedAggregation))
assert.Equal(t, expectedAggregation, aggregation)
})
}
}

// mockGetFileVersions stands in for the real getFileVersions in tests:
// it reads file content from the test workdir layout
// <Workdir>/repos/<commit>/<targetFilePath> instead of querying a repository.
// Versions whose file is missing on disk are simply omitted from the result;
// the error result is always nil.
func mockGetFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit,
) (fileVersions, error) {
	res := make(fileVersions)
	for _, rbc := range rbcs {
		path := c.Workdir + "/repos/" + rbc.Commit + "/" + targetFilePath
		content, readErr := os.ReadFile(path)
		if readErr != nil {
			continue // this version does not contain the file
		}
		res[rbc] = fileVersion{content: string(content)}
	}
	return res, nil
}

func readFileOrFail(t *testing.T, path string) string {
absPath, err := filepath.Abs(path)
assert.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/covermerger/deleted_file_merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package covermerger
type DeletedFileLineMerger struct {
}

func (a *DeletedFileLineMerger) AddRecord(RepoBranchCommit, string, Frame, int) {
// AddRecord is intentionally a no-op: the target file is absent from the
// base revision, so there are no base lines to attribute hits to.
func (a *DeletedFileLineMerger) AddRecord(RepoBranchCommit, *Frame, int) {
}

func (a *DeletedFileLineMerger) Result() *MergeResult {
Expand Down
41 changes: 27 additions & 14 deletions pkg/covermerger/file_line_merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
package covermerger

func makeFileLineCoverMerger(
fileVersions map[RepoBranchCommit]string, base RepoBranchCommit) FileCoverageMerger {
fvs fileVersions, base RepoBranchCommit) FileCoverageMerger {
baseFile := ""
baseFileExists := false
for rbc, fileContent := range fileVersions {
for rbc, fv := range fvs {
if rbc == base {
baseFile = fileContent
baseFile = fv.content
baseFileExists = true
break
}
Expand All @@ -18,33 +18,46 @@ func makeFileLineCoverMerger(
return &DeletedFileLineMerger{}
}
a := &FileLineCoverMerger{
rbcToFile: fileVersions,
baseFile: baseFile,
hitCounts: make(map[int]int),
matchers: make(map[RepoBranchCommit]*LineToLineMatcher),
rbcToFile: fvs,
baseFile: baseFile,
hitCounts: make(map[int]int),
matchers: make(map[RepoBranchCommit]*LineToLineMatcher),
lostFrames: map[RepoBranchCommit]int64{},
}
for rbc, fileVersion := range fileVersions {
a.matchers[rbc] = makeLineToLineMatcher(fileVersion, baseFile)
for rbc, fv := range fvs {
a.matchers[rbc] = makeLineToLineMatcher(fv.content, baseFile)
}
return a
}

type FileLineCoverMerger struct {
rbcToFile map[RepoBranchCommit]string
baseFile string
hitCounts map[int]int
matchers map[RepoBranchCommit]*LineToLineMatcher
rbcToFile fileVersions
baseFile string
hitCounts map[int]int
matchers map[RepoBranchCommit]*LineToLineMatcher
lostFrames map[RepoBranchCommit]int64
}

func (a *FileLineCoverMerger) AddRecord(rbc RepoBranchCommit, arch string, f Frame, hitCount int) {
// AddRecord folds one coverage record from the given repo/branch/commit
// into the per-line hit counts of the base file.
// Records from versions without a line matcher (file content was
// unavailable) are counted as lost frames when they carry coverage.
func (a *FileLineCoverMerger) AddRecord(rbc RepoBranchCommit, f *Frame, hitCount int) {
	matcher := a.matchers[rbc]
	if matcher == nil {
		if hitCount > 0 {
			a.lostFrames[rbc]++
		}
		return
	}
	// Bug fix: attribute the hit to the matched line in the BASE file
	// (targetLine), not to the source-version line number f.StartLine.
	// The two differ whenever lines were inserted or removed between the
	// record's version and the base version; the original code computed
	// targetLine but never used it beyond the -1 sentinel check.
	if targetLine := matcher.SameLinePos(f.StartLine); targetLine != -1 {
		a.hitCounts[targetLine] += hitCount
	}
}

// Result assembles the final merge outcome for the file.
// An empty lost-frames map is normalized to nil so that serialized output
// (and test comparisons against JSON fixtures) omit the field entirely.
func (a *FileLineCoverMerger) Result() *MergeResult {
	var lost map[RepoBranchCommit]int64
	if len(a.lostFrames) > 0 {
		lost = a.lostFrames
	}
	return &MergeResult{
		HitCounts:  a.hitCounts,
		FileExists: true,
		LostFrames: lost,
	}
}
Loading

0 comments on commit 5c045c0

Please sign in to comment.