Skip to content

Commit

Permalink
Merge pull request #2723 from nirs/parallel-downloads
Browse files Browse the repository at this point in the history
Fix parallel downloads of the same image
  • Loading branch information
AkihiroSuda authored Oct 11, 2024
2 parents 5d41ca2 + f2fb061 commit a07b7f9
Showing 1 changed file with 43 additions and 13 deletions.
56 changes: 43 additions & 13 deletions pkg/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os/exec"
"path"
"path/filepath"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -262,14 +263,11 @@ func Download(ctx context.Context, local, remote string, opts ...Opt) (*Result,
return res, nil
}
}
if err := os.RemoveAll(shad); err != nil {
return nil, err
}
if err := os.MkdirAll(shad, 0o700); err != nil {
return nil, err
}
shadURL := filepath.Join(shad, "url")
if err := os.WriteFile(shadURL, []byte(remote), 0o644); err != nil {
if err := atomicWrite(shadURL, []byte(remote), 0o644); err != nil {
return nil, err
}
if err := downloadHTTP(ctx, shadData, shadTime, shadType, remote, o.description, o.expectedDigest); err != nil {
Expand All @@ -280,7 +278,7 @@ func Download(ctx context.Context, local, remote string, opts ...Opt) (*Result,
return nil, err
}
if shadDigest != "" && o.expectedDigest != "" {
if err := os.WriteFile(shadDigest, []byte(o.expectedDigest.String()), 0o644); err != nil {
if err := atomicWrite(shadDigest, []byte(o.expectedDigest.String()), 0o644); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -600,10 +598,8 @@ func downloadHTTP(ctx context.Context, localPath, lastModified, contentType, url
return fmt.Errorf("downloadHTTP: got empty localPath")
}
logrus.Debugf("downloading %q into %q", url, localPath)
localPathTmp := localPath + ".tmp"
if err := os.RemoveAll(localPathTmp); err != nil {
return err
}

localPathTmp := perProcessTempfile(localPath)
fileWriter, err := os.Create(localPathTmp)
if err != nil {
return err
Expand All @@ -616,13 +612,13 @@ func downloadHTTP(ctx context.Context, localPath, lastModified, contentType, url
}
if lastModified != "" {
lm := resp.Header.Get("Last-Modified")
if err := os.WriteFile(lastModified, []byte(lm), 0o644); err != nil {
if err := atomicWrite(lastModified, []byte(lm), 0o644); err != nil {
return err
}
}
if contentType != "" {
ct := resp.Header.Get("Content-Type")
if err := os.WriteFile(contentType, []byte(ct), 0o644); err != nil {
if err := atomicWrite(contentType, []byte(ct), 0o644); err != nil {
return err
}
}
Expand Down Expand Up @@ -674,10 +670,44 @@ func downloadHTTP(ctx context.Context, localPath, lastModified, contentType, url
if err := fileWriter.Close(); err != nil {
return err
}
if err := os.RemoveAll(localPath); err != nil {

return os.Rename(localPathTmp, localPath)
}

// To allow parallel download we use a per-process unique suffix for tempoary
// files. Renaming the temporary file to the final file is safe without
// synchronization on posix.
// https://github.com/lima-vm/lima/issues/2722
func perProcessTempfile(path string) string {
return path + ".tmp." + strconv.FormatInt(int64(os.Getpid()), 10)
}

// atomicWrite writes data to path, creating a new file or replacing existing
// one. Multiple processess can write to the same path safely. Safe on posix and
// likely safe on windows when using NTFS.
func atomicWrite(path string, data []byte, perm os.FileMode) error {
tmpPath := perProcessTempfile(path)
tmp, err := os.OpenFile(tmpPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}
defer func() {
if err != nil {
tmp.Close()
os.RemoveAll(tmpPath)
}
}()
if _, err = tmp.Write(data); err != nil {
return err
}
return os.Rename(localPathTmp, localPath)
if err = tmp.Sync(); err != nil {
return err
}
if err = tmp.Close(); err != nil {
return err
}
err = os.Rename(tmpPath, path)
return err
}

// CacheEntries returns a map of cache entries.
Expand Down

0 comments on commit a07b7f9

Please sign in to comment.