Skip to content

Commit

Permalink
Test parallel downloads
Browse files Browse the repository at this point in the history
Add sub tests for parallel downloads with "with cache" and "caching-only
mode". The tests run 10 concurrent downloads with the current test http
server and ensure that the downloads succeeded.

To make it possible to test using the same process, we now include a
counter in the name returned by perProcessTempfile. The file name is now:

    {filename}.tmp.{pid}.{count}

The test revealed an issue with clonefile, not seen when testing 3
parallel `limactl create`. When the data file is replaced during a
clonefile syscall, it may fail with:

    clonefile failed: no such file or directory

This smells like a darwin bug, but we can avoid this issue by using the
first download result. When a download finishes, we check if data file
exists, and return success if it does. We take a lock for the very
short time needed to check and rename the temporary data file to the
target file.

Tested using:

    % go test ./pkg/downloader -run TestDownloadRemote/with_cache/parallel -count=1000 -timeout=0
    ok  	github.com/lima-vm/lima/pkg/downloader	487.422s

Signed-off-by: Nir Soffer <[email protected]>
  • Loading branch information
nirs committed Oct 13, 2024
1 parent 11eb750 commit c7ff03e
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 32 deletions.
29 changes: 22 additions & 7 deletions pkg/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ import (
"os/exec"
"path"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/cheggaaa/pb/v3"
"github.com/containerd/continuity/fs"
"github.com/lima-vm/lima/pkg/httpclientutil"
"github.com/lima-vm/lima/pkg/localpathutil"
"github.com/lima-vm/lima/pkg/lockutil"
"github.com/lima-vm/lima/pkg/progressbar"
"github.com/opencontainers/go-digest"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -273,15 +274,15 @@ func Download(ctx context.Context, local, remote string, opts ...Opt) (*Result,
if err := downloadHTTP(ctx, shadData, shadTime, shadType, remote, o.description, o.expectedDigest); err != nil {
return nil, err
}
// no need to pass the digest to copyLocal(), as we already verified the digest
if err := copyLocal(ctx, localPath, shadData, ext, o.decompress, "", ""); err != nil {
return nil, err
}
if shadDigest != "" && o.expectedDigest != "" {
if err := atomicWrite(shadDigest, []byte(o.expectedDigest.String()), 0o644); err != nil {
return nil, err
}
}
// no need to pass the digest to copyLocal(), as we already verified the digest
if err := copyLocal(ctx, localPath, shadData, ext, o.decompress, "", ""); err != nil {
return nil, err
}
res := &Result{
Status: StatusDownloaded,
CachePath: shadData,
Expand Down Expand Up @@ -672,15 +673,29 @@ func downloadHTTP(ctx context.Context, localPath, lastModified, contentType, url
return err
}

return os.Rename(localPathTmp, localPath)
// If localPath was created by a parallel download keep it. Replacing it
// while another process is copying it to the destination may fail the
// clonefile syscall. We use a lock to ensure that only one process updates
// data, and when we return data file exists.

return lockutil.WithDirLock(filepath.Dir(localPath), func() error {
if _, err := os.Stat(localPath); err == nil {
return nil
}
return os.Rename(localPathTmp, localPath)
})
}

// tempfileCount numbers the temporary file names handed out by this process.
var tempfileCount atomic.Uint64

// perProcessTempfile returns a temporary file name derived from path, in the
// form "{path}.tmp.{pid}.{count}".
//
// To allow parallel download we use a per-process unique suffix for temporary
// files. Renaming the temporary file to the final file is safe without
// synchronization on posix.
// To make it easy to test we also include a counter ensuring that each
// temporary file is unique in the same process.
// https://github.com/lima-vm/lima/issues/2722
func perProcessTempfile(path string) string {
	// Add returns the incremented value, so every call gets a distinct count.
	return fmt.Sprintf("%s.tmp.%d.%d", path, os.Getpid(), tempfileCount.Add(1))
}

// atomicWrite writes data to path, creating a new file or replacing existing
Expand Down
117 changes: 92 additions & 25 deletions pkg/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/exec"
"path/filepath"
"runtime"
"slices"
"strings"
"testing"
"time"
Expand All @@ -21,6 +22,20 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

// downloadResult pairs the two values returned by a single Download call so
// that they can be sent together over a channel from a goroutine.
type downloadResult struct {
// r is the *Result returned by Download.
r *Result
// err is the error returned by Download.
err error
}

// We expect only a few parallel downloads. Testing with a larger number to find
// races quicker. 20 parallel downloads take about 0.5 seconds on M1 Pro.
const parallelDownloads = 20

// parallelStatus lists the statuses accepted from a parallel download.
// When downloading in parallel, usually all downloads complete with
// StatusDownloaded, but some may be delayed and find the data file when they
// start. Can be reproduced locally using 100 parallel downloads.
var parallelStatus = []Status{StatusDownloaded, StatusUsedCache}

Check failure on line 37 in pkg/downloader/downloader_test.go

View workflow job for this annotation

GitHub Actions / Lints

var-declaration: should omit type []Status from declaration of var parallelStatus; it will be inferred from the right-hand side (revive)

func TestDownloadRemote(t *testing.T) {
ts := httptest.NewServer(http.FileServer(http.Dir("testdata")))
t.Cleanup(ts.Close)
Expand Down Expand Up @@ -57,38 +72,90 @@ func TestDownloadRemote(t *testing.T) {
})
})
t.Run("with cache", func(t *testing.T) {
cacheDir := filepath.Join(t.TempDir(), "cache")
localPath := filepath.Join(t.TempDir(), t.Name())
r, err := Download(context.Background(), localPath, dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
assert.NilError(t, err)
assert.Equal(t, StatusDownloaded, r.Status)
t.Run("serial", func(t *testing.T) {
cacheDir := filepath.Join(t.TempDir(), "cache")
localPath := filepath.Join(t.TempDir(), t.Name())
r, err := Download(context.Background(), localPath, dummyRemoteFileURL,
WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
assert.NilError(t, err)
assert.Equal(t, StatusDownloaded, r.Status)

r, err = Download(context.Background(), localPath, dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
assert.NilError(t, err)
assert.Equal(t, StatusSkipped, r.Status)
r, err = Download(context.Background(), localPath, dummyRemoteFileURL,
WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
assert.NilError(t, err)
assert.Equal(t, StatusSkipped, r.Status)

localPath2 := localPath + "-2"
r, err = Download(context.Background(), localPath2, dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
assert.NilError(t, err)
assert.Equal(t, StatusUsedCache, r.Status)
localPath2 := localPath + "-2"
r, err = Download(context.Background(), localPath2, dummyRemoteFileURL,
WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
assert.NilError(t, err)
assert.Equal(t, StatusUsedCache, r.Status)
})
t.Run("parallel", func(t *testing.T) {
cacheDir := filepath.Join(t.TempDir(), "cache")
results := make(chan downloadResult, parallelDownloads)
for i := 0; i < parallelDownloads; i++ {
go func() {
// Parallel download is supported only for different instances with unique localPath.
localPath := filepath.Join(t.TempDir(), t.Name())
r, err := Download(context.Background(), localPath, dummyRemoteFileURL,
WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
results <- downloadResult{r, err}
}()
}
// We must process all results before cleanup.
for i := 0; i < parallelDownloads; i++ {
result := <-results
if result.err != nil {
t.Errorf("Download failed: %s", result.err)
} else if !slices.Contains(parallelStatus, result.r.Status) {
t.Errorf("Expected download status %s, got %s", parallelStatus, result.r.Status)
}
}
})
})
t.Run("caching-only mode", func(t *testing.T) {
_, err := Download(context.Background(), "", dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest))
assert.ErrorContains(t, err, "cache directory to be specified")
t.Run("serial", func(t *testing.T) {
_, err := Download(context.Background(), "", dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest))
assert.ErrorContains(t, err, "cache directory to be specified")

cacheDir := filepath.Join(t.TempDir(), "cache")
r, err := Download(context.Background(), "", dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
assert.NilError(t, err)
assert.Equal(t, StatusDownloaded, r.Status)
cacheDir := filepath.Join(t.TempDir(), "cache")
r, err := Download(context.Background(), "", dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest),
WithCacheDir(cacheDir))
assert.NilError(t, err)
assert.Equal(t, StatusDownloaded, r.Status)

r, err = Download(context.Background(), "", dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
assert.NilError(t, err)
assert.Equal(t, StatusUsedCache, r.Status)
r, err = Download(context.Background(), "", dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest),
WithCacheDir(cacheDir))
assert.NilError(t, err)
assert.Equal(t, StatusUsedCache, r.Status)

localPath := filepath.Join(t.TempDir(), t.Name())
r, err = Download(context.Background(), localPath, dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
assert.NilError(t, err)
assert.Equal(t, StatusUsedCache, r.Status)
localPath := filepath.Join(t.TempDir(), t.Name())
r, err = Download(context.Background(), localPath, dummyRemoteFileURL,
WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
assert.NilError(t, err)
assert.Equal(t, StatusUsedCache, r.Status)
})
t.Run("parallel", func(t *testing.T) {
cacheDir := filepath.Join(t.TempDir(), "cache")
results := make(chan downloadResult, parallelDownloads)
for i := 0; i < parallelDownloads; i++ {
go func() {
r, err := Download(context.Background(), "", dummyRemoteFileURL,
WithExpectedDigest(dummyRemoteFileDigest), WithCacheDir(cacheDir))
results <- downloadResult{r, err}
}()
}
// We must process all results before cleanup.
for i := 0; i < parallelDownloads; i++ {
result := <-results
if result.err != nil {
t.Errorf("Download failed: %s", result.err)
} else if !slices.Contains(parallelStatus, result.r.Status) {
t.Errorf("Expected download status %s, got %s", parallelStatus, result.r.Status)
}
}
})
})
t.Run("cached", func(t *testing.T) {
_, err := Cached(dummyRemoteFileURL, WithExpectedDigest(dummyRemoteFileDigest))
Expand Down

0 comments on commit c7ff03e

Please sign in to comment.