Skip to content

Commit

Permalink
Merge pull request moby#2344 from tonistiigi/zstd
Browse files Browse the repository at this point in the history
exporter: support creating blobs with zstd compression
  • Loading branch information
AkihiroSuda authored Sep 7, 2021
2 parents 9b010e7 + a5e0b86 commit ea773f6
Show file tree
Hide file tree
Showing 13 changed files with 350 additions and 247 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ Keys supported by image output:
* `unpack=true`: unpack image after creation (for use with containerd)
* `dangling-name-prefix=[value]`: name image with `prefix@<digest>` , used for anonymous images
* `name-canonical=true`: add additional canonical name `name@<digest>`
* `compression=[uncompressed,gzip,estargz]`: choose compression type for layers newly created and cached, gzip is default value. estargz should be used with `oci-mediatypes=true`.
* `compression=[uncompressed,gzip,estargz,zstd]`: choose compression type for layers newly created and cached, gzip is default value. estargz should be used with `oci-mediatypes=true`.
* `force-compression=true`: forcefully apply `compression` option to all layers (including already existing layers).

If credentials are required, `buildctl` will attempt to read Docker configuration file `$DOCKER_CONFIG/config.json`.
Expand Down
8 changes: 8 additions & 0 deletions cache/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/mount"
"github.com/klauspost/compress/zstd"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/flightcontrol"
Expand Down Expand Up @@ -79,6 +80,9 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
case compression.EStargz:
compressorFunc, finalize = writeEStargz()
mediaType = ocispecs.MediaTypeImageLayerGzip
case compression.Zstd:
compressorFunc = zstdWriter
mediaType = ocispecs.MediaTypeImageLayer + "+zstd"
default:
return nil, errors.Errorf("unknown layer compression type: %q", compressionType)
}
Expand Down Expand Up @@ -350,3 +354,7 @@ func ensureCompression(ctx context.Context, ref *immutableRef, compressionType c
})
return err
}

func zstdWriter(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) {
return zstd.NewWriter(dest)
}
4 changes: 3 additions & 1 deletion cache/blobs_linux.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build linux
// +build linux

package cache
Expand Down Expand Up @@ -34,7 +35,6 @@ var emptyDesc = ocispecs.Descriptor{}
// be computed (e.g. because the mounts aren't overlayfs), it returns
// an error.
func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper []mount.Mount, mediaType string, ref string, compressorFunc compressor) (_ ocispecs.Descriptor, ok bool, err error) {

// Get upperdir location if mounts are overlayfs that can be processed by this differ.
upperdir, err := getOverlayUpperdir(lower, upper)
if err != nil {
Expand All @@ -50,6 +50,8 @@ func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper
compressorFunc = func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) {
return ctdcompression.CompressStream(dest, ctdcompression.Gzip)
}
case ocispecs.MediaTypeImageLayer + "+zstd":
compressorFunc = zstdWriter
default:
return emptyDesc, false, errors.Errorf("unsupported diff media type: %v", mediaType)
}
Expand Down
199 changes: 110 additions & 89 deletions cache/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"fmt"
"io"

cdcompression "github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/converter"
"github.com/containerd/containerd/images/converter/uncompress"
"github.com/containerd/containerd/labels"
"github.com/klauspost/compress/zstd"
"github.com/moby/buildkit/util/compression"
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
Expand All @@ -23,11 +24,15 @@ import (
func needsConversion(mediaType string, compressionType compression.Type) (bool, error) {
switch compressionType {
case compression.Uncompressed:
if !images.IsLayerType(mediaType) || uncompress.IsUncompressedType(mediaType) {
if !images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Uncompressed {
return false, nil
}
case compression.Gzip:
if !images.IsLayerType(mediaType) || isGzipCompressedType(mediaType) {
if !images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Gzip {
return false, nil
}
case compression.Zstd:
if !images.IsLayerType(mediaType) || compression.FromMediaType(mediaType) == compression.Zstd {
return false, nil
}
case compression.EStargz:
Expand All @@ -49,113 +54,129 @@ func getConverter(desc ocispecs.Descriptor, compressionType compression.Type) (c
// No conversion. No need to return an error here.
return nil, nil
}

c := conversion{target: compressionType}

from := compression.FromMediaType(desc.MediaType)
switch from {
case compression.Uncompressed:
case compression.Gzip, compression.Zstd:
c.decompress = cdcompression.DecompressStream
default:
return nil, errors.Errorf("unsupported source compression type %q from mediatype %q", from, desc.MediaType)
}

switch compressionType {
case compression.Uncompressed:
return uncompress.LayerConvertFunc, nil
case compression.Gzip:
convertFunc := func(w io.Writer) (io.WriteCloser, error) { return gzip.NewWriter(w), nil }
return gzipLayerConvertFunc(compressionType, convertFunc, nil), nil
c.compress = func(w io.Writer) (io.WriteCloser, error) {
return gzip.NewWriter(w), nil
}
case compression.Zstd:
c.compress = func(w io.Writer) (io.WriteCloser, error) {
return zstd.NewWriter(w)
}
case compression.EStargz:
compressorFunc, finalize := writeEStargz()
convertFunc := func(w io.Writer) (io.WriteCloser, error) { return compressorFunc(w, ocispecs.MediaTypeImageLayerGzip) }
return gzipLayerConvertFunc(compressionType, convertFunc, finalize), nil
c.compress = func(w io.Writer) (io.WriteCloser, error) {
return compressorFunc(w, ocispecs.MediaTypeImageLayerGzip)
}
c.finalize = finalize
default:
return nil, fmt.Errorf("unknown compression type during conversion: %q", compressionType)
return nil, errors.Errorf("unknown target compression type during conversion: %q", compressionType)
}

return (&c).convert, nil
}

func gzipLayerConvertFunc(compressionType compression.Type, convertFunc func(w io.Writer) (io.WriteCloser, error), finalize func(context.Context, content.Store) (map[string]string, error)) converter.ConvertFunc {
return func(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (*ocispecs.Descriptor, error) {
// prepare the source and destination
info, err := cs.Info(ctx, desc.Digest)
if err != nil {
return nil, err
}
labelz := info.Labels
if labelz == nil {
labelz = make(map[string]string)
}
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
defer ra.Close()
ref := fmt.Sprintf("convert-from-%s-to-%s", desc.Digest, compressionType.String())
w, err := cs.Writer(ctx, content.WithRef(ref))
if err != nil {
return nil, err
}
defer w.Close()
if err := w.Truncate(0); err != nil { // Old written data possibly remains
return nil, err
}
zw, err := convertFunc(w)
type conversion struct {
target compression.Type
decompress func(io.Reader) (cdcompression.DecompressReadCloser, error)
compress func(w io.Writer) (io.WriteCloser, error)
finalize func(context.Context, content.Store) (map[string]string, error)
}

func (c *conversion) convert(ctx context.Context, cs content.Store, desc ocispecs.Descriptor) (*ocispecs.Descriptor, error) {
// prepare the source and destination
info, err := cs.Info(ctx, desc.Digest)
if err != nil {
return nil, err
}
labelz := info.Labels
if labelz == nil {
labelz = make(map[string]string)
}
ra, err := cs.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
defer ra.Close()
ref := fmt.Sprintf("convert-from-%s-to-%s", desc.Digest, c.target.String())
w, err := cs.Writer(ctx, content.WithRef(ref))
if err != nil {
return nil, err
}
defer w.Close()
if err := w.Truncate(0); err != nil { // Old written data possibly remains
return nil, err
}
var zw io.WriteCloser = w
var compress io.WriteCloser
if c.compress != nil {
zw, err = c.compress(zw)
if err != nil {
return nil, err
}
defer zw.Close()
compress = zw
}

// convert this layer
diffID := digest.Canonical.Digester()
if _, err := io.Copy(zw, io.TeeReader(io.NewSectionReader(ra, 0, ra.Size()), diffID.Hash())); err != nil {
return nil, err
}
if err := zw.Close(); err != nil { // Flush the writer
return nil, err
}
labelz[labels.LabelUncompressed] = diffID.Digest().String() // update diffID label
if err = w.Commit(ctx, 0, "", content.WithLabels(labelz)); err != nil && !errdefs.IsAlreadyExists(err) {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
info, err = cs.Info(ctx, w.Digest())
// convert this layer
diffID := digest.Canonical.Digester()
var rdr io.Reader = io.NewSectionReader(ra, 0, ra.Size())
if c.decompress != nil {
rc, err := c.decompress(rdr)
if err != nil {
return nil, err
}

newDesc := desc
newDesc.MediaType = convertMediaTypeToGzip(desc.MediaType)
newDesc.Digest = info.Digest
newDesc.Size = info.Size
if finalize != nil {
a, err := finalize(ctx, cs)
if err != nil {
return nil, errors.Wrapf(err, "failed finalize compression")
}
for k, v := range a {
if newDesc.Annotations == nil {
newDesc.Annotations = make(map[string]string)
}
newDesc.Annotations[k] = v
}
defer rc.Close()
rdr = rc
}
if _, err := io.Copy(zw, io.TeeReader(rdr, diffID.Hash())); err != nil {
return nil, err
}
if compress != nil {
if err := compress.Close(); err != nil { // Flush the writer
return nil, err
}
return &newDesc, nil
}
}

func isGzipCompressedType(mt string) bool {
switch mt {
case
images.MediaTypeDockerSchema2LayerGzip,
images.MediaTypeDockerSchema2LayerForeignGzip,
ocispecs.MediaTypeImageLayerGzip,
ocispecs.MediaTypeImageLayerNonDistributableGzip:
return true
default:
return false
labelz[labels.LabelUncompressed] = diffID.Digest().String() // update diffID label
if err = w.Commit(ctx, 0, "", content.WithLabels(labelz)); err != nil && !errdefs.IsAlreadyExists(err) {
return nil, err
}
if err := w.Close(); err != nil {
return nil, err
}
info, err = cs.Info(ctx, w.Digest())
if err != nil {
return nil, err
}
}

func convertMediaTypeToGzip(mt string) string {
if uncompress.IsUncompressedType(mt) {
if images.IsDockerType(mt) {
mt += ".gzip"
} else {
mt += "+gzip"
newDesc := desc
newDesc.MediaType = c.target.DefaultMediaType()
newDesc.Digest = info.Digest
newDesc.Size = info.Size
if c.finalize != nil {
a, err := c.finalize(ctx, cs)
if err != nil {
return nil, errors.Wrapf(err, "failed finalize compression")
}
for k, v := range a {
if newDesc.Annotations == nil {
newDesc.Annotations = make(map[string]string)
}
newDesc.Annotations[k] = v
}
return mt
}
return mt
return &newDesc, nil
}
9 changes: 5 additions & 4 deletions cache/estargz.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"io"
"sync"

"github.com/containerd/containerd/archive/compression"
cdcompression "github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/content"
"github.com/containerd/stargz-snapshotter/estargz"
"github.com/moby/buildkit/util/compression"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
Expand All @@ -22,8 +23,8 @@ func writeEStargz() (compressorFunc compressor, finalize func(context.Context, c
var bInfo blobInfo
var mu sync.Mutex
return func(dest io.Writer, requiredMediaType string) (io.WriteCloser, error) {
if !isGzipCompressedType(requiredMediaType) {
return nil, fmt.Errorf("unsupported media type for estargz compressor %q", requiredMediaType)
if compression.FromMediaType(requiredMediaType) != compression.Gzip {
return nil, errors.Errorf("unsupported media type for estargz compressor %q", requiredMediaType)
}
done := make(chan struct{})
pr, pw := io.Pipe()
Expand Down Expand Up @@ -127,7 +128,7 @@ func calculateBlob() (io.WriteCloser, chan blobInfo) {
c := new(counter)
dgstr := digest.Canonical.Digester()
diffID := digest.Canonical.Digester()
decompressR, err := compression.DecompressStream(io.TeeReader(pr, dgstr.Hash()))
decompressR, err := cdcompression.DecompressStream(io.TeeReader(pr, dgstr.Hash()))
if err != nil {
pr.CloseWithError(err)
return
Expand Down
Loading

0 comments on commit ea773f6

Please sign in to comment.