Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into jtibs/heap-trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
jtibshirani committed Oct 29, 2024
2 parents 5686b76 + bfd8ee8 commit 8bc4f26
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 78 deletions.
26 changes: 24 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,32 @@ jobs:
steps:
- name: checkout
uses: actions/checkout@v3

- name: add dependencies
run: apk add go git
run: apk add go git tar

- name: Cache ctags
uses: actions/cache@v3
with:
path: /usr/local/bin/universal-ctags
key: ${{ runner.os }}-ctags-${{ hashFiles('install-ctags-alpine.sh') }}

- name: Cache Go modules
uses: actions/cache@v3
with:
path: |
~/.cache/go-build
~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: install ctags
run: ./install-ctags-alpine.sh
run: |
if [ ! -f /usr/local/bin/universal-ctags ]; then
./install-ctags-alpine.sh
fi
- name: test
run: go test ./...

Expand Down
27 changes: 16 additions & 11 deletions cmd/zoekt-merge-index/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@ import (
"github.com/sourcegraph/zoekt"
)

func merge(dstDir string, names []string) error {
// merge merges the input shards into a compound shard in dstDir. It returns the
// full path to the compound shard. The input shards are removed on success.
func merge(dstDir string, names []string) (string, error) {
var files []zoekt.IndexFile
for _, fn := range names {
f, err := os.Open(fn)
if err != nil {
return err
return "", nil
}
defer f.Close()

indexFile, err := zoekt.NewIndexFile(f)
if err != nil {
return err
return "", err
}
defer indexFile.Close()

Expand All @@ -31,39 +33,40 @@ func merge(dstDir string, names []string) error {

tmpName, dstName, err := zoekt.Merge(dstDir, files...)
if err != nil {
return err
return "", err
}

// Delete input shards.
for _, name := range names {
paths, err := zoekt.IndexFilePaths(name)
if err != nil {
return fmt.Errorf("zoekt-merge-index: %w", err)
return "", fmt.Errorf("zoekt-merge-index: %w", err)
}
for _, p := range paths {
if err := os.Remove(p); err != nil {
return fmt.Errorf("zoekt-merge-index: failed to remove simple shard: %w", err)
return "", fmt.Errorf("zoekt-merge-index: failed to remove simple shard: %w", err)
}
}
}

// We only rename the compound shard if all simple shards could be deleted in the
// previous step. This guarantees we won't have duplicate indexes.
if err := os.Rename(tmpName, dstName); err != nil {
return fmt.Errorf("zoekt-merge-index: failed to rename compound shard: %w", err)
return "", fmt.Errorf("zoekt-merge-index: failed to rename compound shard: %w", err)
}
return nil

return dstName, nil
}

func mergeCmd(paths []string) error {
func mergeCmd(paths []string) (string, error) {
if paths[0] == "-" {
paths = []string{}
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
paths = append(paths, strings.TrimSpace(scanner.Text()))
}
if err := scanner.Err(); err != nil {
return err
return "", err
}
log.Printf("merging %d paths from stdin", len(paths))
}
Expand Down Expand Up @@ -129,9 +132,11 @@ func explodeCmd(path string) error {
func main() {
switch subCommand := os.Args[1]; subCommand {
case "merge":
if err := mergeCmd(os.Args[2:]); err != nil {
compoundShardPath, err := mergeCmd(os.Args[2:])
if err != nil {
log.Fatal(err)
}
fmt.Println(compoundShardPath)
case "explode":
if err := explodeCmd(os.Args[2]); err != nil {
log.Fatal(err)
Expand Down
82 changes: 26 additions & 56 deletions cmd/zoekt-merge-index/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,104 +8,80 @@ import (
"sort"
"testing"

"github.com/stretchr/testify/require"

"github.com/sourcegraph/zoekt"
"github.com/sourcegraph/zoekt/query"
"github.com/sourcegraph/zoekt/shards"
)

func TestMerge(t *testing.T) {
v16Shards, err := filepath.Glob("../../testdata/shards/*_v16.*.zoekt")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
sort.Strings(v16Shards)

testShards, err := copyTestShards(t.TempDir(), v16Shards)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
t.Log(testShards)

dir := t.TempDir()
err = merge(dir, testShards)
if err != nil {
t.Fatal(err)
}
cs, err := merge(dir, testShards)
require.NoError(t, err)
// The name of the compound shard is based on the merged repos, so it should be
// stable
require.Equal(t, filepath.Base(cs), "compound-ea9613e2ffba7d7361856aebfca75fb714856509_v17.00000.zoekt")

ss, err := shards.NewDirectorySearcher(dir)
if err != nil {
t.Fatalf("NewDirectorySearcher(%s): %v", dir, err)
}
require.NoError(t, err)
defer ss.Close()

q, err := query.Parse("hello")
if err != nil {
t.Fatalf("Parse(hello): %v", err)
}
require.NoError(t, err)

var sOpts zoekt.SearchOptions
ctx := context.Background()
result, err := ss.Search(ctx, q, &sOpts)
if err != nil {
t.Fatalf("Search(%v): %v", q, err)
}
require.NoError(t, err)

// we are merging the same shard twice, so we expect the same file twice.
if len(result.Files) != 2 {
t.Errorf("got %v, want 2 files.", result.Files)
}
require.Len(t, result.Files, 2)
}

// Merge 2 simple shards and then explode them.
func TestExplode(t *testing.T) {
v16Shards, err := filepath.Glob("../../testdata/shards/repo*_v16.*.zoekt")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
sort.Strings(v16Shards)

testShards, err := copyTestShards(t.TempDir(), v16Shards)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
t.Log(testShards)

dir := t.TempDir()
err = merge(dir, testShards)
if err != nil {
t.Fatal(err)
}
_, err = merge(dir, testShards)
require.NoError(t, err)

cs, err := filepath.Glob(filepath.Join(dir, "compound-*.zoekt"))
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
err = explode(dir, cs[0])
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

cs, err = filepath.Glob(filepath.Join(dir, "compound-*.zoekt"))
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

if len(cs) != 0 {
t.Fatalf("explode should have deleted the compound shard if it returned without error")
}

exploded, err := filepath.Glob(filepath.Join(dir, "*.zoekt"))
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

if len(exploded) != len(testShards) {
t.Fatalf("the number of simple shards before %d and after %d should be the same", len(testShards), len(exploded))
}

ss, err := shards.NewDirectorySearcher(dir)
if err != nil {
t.Fatalf("NewDirectorySearcher(%s): %v", dir, err)
}
require.NoError(t, err)
defer ss.Close()

var sOpts zoekt.SearchOptions
Expand All @@ -132,16 +108,10 @@ func TestExplode(t *testing.T) {
for _, c := range cases {
t.Run(c.searchLiteral, func(t *testing.T) {
q, err := query.Parse(c.searchLiteral)
if err != nil {
t.Fatalf("Parse(%s): %v", c.searchLiteral, err)
}
require.NoError(t, err)
result, err := ss.Search(ctx, q, &sOpts)
if err != nil {
t.Fatalf("Search(%v): %v", q, err)
}
if got := len(result.Files); got != c.wantResults {
t.Fatalf("wanted %d results, got %d", c.wantResults, got)
}
require.NoError(t, err)
require.Len(t, result.Files, c.wantResults)
})
}
}
Expand Down
10 changes: 10 additions & 0 deletions cmd/zoekt-mirror-gitlab/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

"github.com/sourcegraph/zoekt/gitindex"
gitlab "github.com/xanzy/go-gitlab"
Expand All @@ -49,6 +50,7 @@ func main() {
excludeUserRepos := flag.Bool("exclude_user", false, "exclude user repos")
namePattern := flag.String("name", "", "only clone repos whose name matches the given regexp.")
excludePattern := flag.String("exclude", "", "don't mirror repos whose names match this regexp.")
lastActivityAfter := flag.String("last_activity_after", "", "only mirror repos that have been active since this date (format: 2006-01-02).")
flag.Parse()

if *dest == "" {
Expand Down Expand Up @@ -90,6 +92,14 @@ func main() {
opt.Visibility = gitlab.Visibility(gitlab.PublicVisibility)
}

if *lastActivityAfter != "" {
targetDate, err := time.Parse("2006-01-02", *lastActivityAfter)
if err != nil {
log.Fatal(err)
}
opt.LastActivityAfter = gitlab.Time(targetDate)
}

var gitlabProjects []*gitlab.Project
for {
projects, _, err := client.Projects.ListProjects(opt)
Expand Down
7 changes: 4 additions & 3 deletions cmd/zoekt-sourcegraph-indexserver/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (s *Server) vacuum() {
path := filepath.Join(s.IndexDir, fn)
info, err := os.Stat(path)
if err != nil {
debug.Printf("vacuum stat failed: %v", err)
log.Printf("vacuum stat failed: %v", err)
continue
}

Expand All @@ -422,8 +422,9 @@ func (s *Server) vacuum() {
})

if err != nil {
debug.Printf("failed to explode compound shard %s: %s", path, string(b))
log.Printf("failed to explode compound shard: shard=%s out=%s err=%s", path, string(b), err)
}
log.Printf("exploded compound shard: shard=%s", path)
continue
}

Expand All @@ -432,7 +433,7 @@ func (s *Server) vacuum() {
})

if err != nil {
debug.Printf("error while removing tombstones in %s: %s", fn, err)
log.Printf("error while removing tombstones in %s: %s", path, err)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/zoekt-sourcegraph-indexserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ var (
metricIndexingDelay = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "index_indexing_delay_seconds",
Help: "A histogram of durations from when an index job is added to the queue, to the time it completes.",
Buckets: prometheus.ExponentialBuckets(60, 2, 12), // 1m -> ~3 days
Buckets: prometheus.ExponentialBuckets(60, 2, 14), // 1 minute -> 5.5 days
}, []string{
"state", // state is an indexState
"name", // the name of the repository that was indexed
Expand Down Expand Up @@ -1264,8 +1264,8 @@ func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) {
fs.BoolVar(&rc.disableShardMerging, "shard_merging", getEnvWithDefaultBool("SRC_DISABLE_SHARD_MERGING", false), "disable shard merging")
fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", getEnvWithDefaultDuration("SRC_VACUUM_INTERVAL", 24*time.Hour), "run vacuum this often")
fs.DurationVar(&rc.mergeInterval, "merge_interval", getEnvWithDefaultDuration("SRC_MERGE_INTERVAL", 8*time.Hour), "run merge this often")
fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 2000), "the target size of compound shards in MiB")
fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB")
fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_MERGE_TARGET_SIZE", 1000), "the target size of compound shards in MiB")
fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MERGE_MIN_SIZE", 800), "the minimum size of a compound shard in MiB")
fs.IntVar(&rc.minAgeDays, "merge_min_age", getEnvWithDefaultInt("SRC_MERGE_MIN_AGE", 7), "the time since the last commit in days. Shards with newer commits are excluded from merging.")
}

Expand Down
19 changes: 16 additions & 3 deletions cmd/zoekt-sourcegraph-indexserver/merge.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"log"
"os"
"os/exec"
Expand Down Expand Up @@ -88,14 +89,26 @@ func (s *Server) merge(mergeCmd func(args ...string) *exec.Cmd) {
}

start := time.Now()
out, err := mergeCmd(paths...).CombinedOutput()

metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(time.Since(start).Seconds())
cmd := mergeCmd(paths...)

// zoekt-merge-index writes the full path of the new compound shard to stdout.
stdoutBuf := &bytes.Buffer{}
stderrBuf := &bytes.Buffer{}
cmd.Stdout = stdoutBuf
cmd.Stderr = stderrBuf

err := cmd.Run()

durationSeconds := time.Since(start).Seconds()
metricShardMergingDuration.WithLabelValues(strconv.FormatBool(err != nil)).Observe(durationSeconds)
if err != nil {
log.Printf("mergeCmd: out=%s, err=%s", out, err)
log.Printf("error merging shards: stdout=%s, stderr=%s, durationSeconds=%.2f err=%s", stdoutBuf.String(), stderrBuf.String(), durationSeconds, err)
return
}

log.Printf("finished merging: shard=%s durationSeconds=%.2f", stdoutBuf.String(), durationSeconds)

next = true
})
}
Expand Down

0 comments on commit 8bc4f26

Please sign in to comment.