Metadata compression config support for indirect content
Signed-off-by: Prasad Ghangal <[email protected]>
PrasadG193 committed Jul 23, 2024
1 parent 6f98674 commit 4b790e2
Showing 7 changed files with 43 additions and 31 deletions.
4 changes: 2 additions & 2 deletions repo/grpc_repository_client.go
@@ -529,9 +529,9 @@ func (r *grpcRepositoryClient) NewWriter(ctx context.Context, opt WriteSessionOp
}

// ConcatenateObjects creates a concatenated object from the provided object IDs.
-func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) {
+func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID, comp compression.Name) (object.ID, error) {
//nolint:wrapcheck
-return r.omgr.Concatenate(ctx, objectIDs)
+return r.omgr.Concatenate(ctx, objectIDs, comp)
}

// maybeRetry executes the provided callback with or without automatic retries depending on how
5 changes: 3 additions & 2 deletions repo/object/object_manager.go
@@ -106,7 +106,7 @@ func (om *Manager) closedWriter(ow *objectWriter) {
// in parallel utilizing more CPU cores. Because some split points now fall at fixed boundaries rather than at
// content-specific ones, there is a slight loss of deduplication at concatenation points (typically 1-2 contents,
// usually <10MB), so this method should only be used for very large files where this overhead is relatively small.
-func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error) {
+func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, comp compression.Name) (ID, error) {
if len(objectIDs) == 0 {
return EmptyID, errors.Errorf("empty list of objects")
}
@@ -133,14 +133,15 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error)
w := om.NewWriter(ctx, WriterOptions{
Prefix: indirectContentPrefix,
Description: "CONCATENATED INDEX",
+Compressor: comp,
})
defer w.Close() //nolint:errcheck

if werr := writeIndirectObject(w, concatenatedEntries); werr != nil {
return EmptyID, werr
}

-concatID, err := w.Result()
+concatID, err := w.Result(comp)
if err != nil {
return EmptyID, errors.Wrap(err, "error writing concatenated index")
}
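
As an aside, a minimal sketch of how a caller might drive the new Concatenate signature; the helper name and the "zstd" compression name are assumptions for illustration, not part of this commit:

package example

import (
	"context"

	"github.com/kopia/kopia/repo/compression"
	"github.com/kopia/kopia/repo/object"
)

// concatParts stitches previously written parts into one object. The new
// comp argument selects the compressor for the resulting "CONCATENATED
// INDEX" indirect object only; the underlying data contents keep whatever
// compression they were originally written with.
func concatParts(ctx context.Context, om *object.Manager, parts []object.ID) (object.ID, error) {
	return om.Concatenate(ctx, parts, compression.Name("zstd"))
}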
18 changes: 9 additions & 9 deletions repo/object/object_writer.go
@@ -27,10 +27,10 @@ type Writer interface {
// Checkpoint returns the ID of an object consisting of all contents written to storage so far.
// This may not include some data buffered in the writer.
// In case nothing has been written yet, it returns the empty object ID.
-Checkpoint() (ID, error)
+Checkpoint(comp compression.Name) (ID, error)

// Result returns object ID representing all bytes written to the writer.
-Result() (ID, error)
+Result(comp compression.Name) (ID, error)
}

type contentIDTracker struct {
@@ -251,7 +251,7 @@ func maybeCompressedContentBytes(comp compression.Compressor, input gather.Bytes
return input, false, nil
}

-func (w *objectWriter) Result() (ID, error) {
+func (w *objectWriter) Result(comp compression.Name) (ID, error) {
w.mu.Lock()
defer w.mu.Unlock()

@@ -263,19 +263,19 @@ func (w *objectWriter) Result() (ID, error) {
}
}

-return w.checkpointLocked()
+return w.checkpointLocked(comp)
}

// Checkpoint returns the object ID representing the portion of the object that has already been written.
// The result may be an empty object ID if nothing has been flushed yet.
-func (w *objectWriter) Checkpoint() (ID, error) {
+func (w *objectWriter) Checkpoint(comp compression.Name) (ID, error) {
w.mu.Lock()
defer w.mu.Unlock()

-return w.checkpointLocked()
+return w.checkpointLocked(comp)
}

-func (w *objectWriter) checkpointLocked() (ID, error) {
+func (w *objectWriter) checkpointLocked(comp compression.Name) (ID, error) {
// wait for any in-flight asynchronous writes to finish
w.asyncWritesWG.Wait()

@@ -294,7 +294,7 @@ func (w *objectWriter) checkpointLocked() (ID, error) {
iw := &objectWriter{
ctx: w.ctx,
om: w.om,
-compressor: nil,
+compressor: compression.ByName[comp],
description: "LIST(" + w.description + ")",
splitter: w.om.newDefaultSplitter(),
prefix: w.prefix,
@@ -311,7 +311,7 @@ func (w *objectWriter) checkpointLocked() (ID, error) {
return EmptyID, err
}

-oid, err := iw.Result()
+oid, err := iw.Result(comp)
if err != nil {
return EmptyID, err
}
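
To show the new call shape end to end, here is a hypothetical caller; the helper and its metadataComp parameter are illustrative assumptions. Every Checkpoint and Result call now names the compressor to apply to any indirect index the writer needs to emit:

package example

import (
	"context"

	"github.com/kopia/kopia/repo/compression"
	"github.com/kopia/kopia/repo/object"
)

// writeWithCheckpoint writes data and takes one mid-write checkpoint.
// metadataComp would typically come from the effective policy's
// MetadataCompressionPolicy.MetadataCompressor().
func writeWithCheckpoint(ctx context.Context, om *object.Manager, data []byte, metadataComp compression.Name) (object.ID, error) {
	w := om.NewWriter(ctx, object.WriterOptions{Description: "EXAMPLE"})
	defer w.Close() //nolint:errcheck

	if _, err := w.Write(data); err != nil {
		return object.EmptyID, err
	}

	// Checkpoint may return EmptyID if nothing has been flushed yet.
	if _, err := w.Checkpoint(metadataComp); err != nil {
		return object.EmptyID, err
	}

	return w.Result(metadataComp)
}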
7 changes: 4 additions & 3 deletions repo/repository.go
@@ -14,6 +14,7 @@ import (
"github.com/kopia/kopia/internal/metrics"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/throttling"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/indexblob"
"github.com/kopia/kopia/repo/format"
@@ -47,7 +48,7 @@ type RepositoryWriter interface {
Repository

NewObjectWriter(ctx context.Context, opt object.WriterOptions) object.Writer
-ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error)
+ConcatenateObjects(ctx context.Context, objectIDs []object.ID, comp compression.Name) (object.ID, error)
PutManifest(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error)
ReplaceManifests(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error)
DeleteManifest(ctx context.Context, id manifest.ID) error
@@ -180,9 +181,9 @@ func (r *directRepository) NewObjectWriter(ctx context.Context, opt object.Write
}

// ConcatenateObjects creates a concatenated object from the provided object IDs.
-func (r *directRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) {
+func (r *directRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID, comp compression.Name) (object.ID, error) {
//nolint:wrapcheck
-return r.omgr.Concatenate(ctx, objectIDs)
+return r.omgr.Concatenate(ctx, objectIDs, comp)
}

// DisableIndexRefresh disables index refresh for the duration of the write session.
9 changes: 8 additions & 1 deletion snapshot/snapshotfs/dir_rewriter.go
@@ -18,6 +18,7 @@ import (
"github.com/kopia/kopia/repo/logging"
"github.com/kopia/kopia/repo/object"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
)

var dirRewriterLog = logging.Module("dirRewriter")
@@ -283,7 +284,13 @@ func RewriteAsStub(rep repo.RepositoryWriter) RewriteFailedEntryCallback {
return nil, errors.Wrap(err, "error writing stub")
}

-oid, err := w.Result()
+// TODO: Check if source info can be made available here
+pol, _, _, err := policy.GetEffectivePolicy(ctx, rep, policy.GlobalPolicySourceInfo)
+if err != nil {
+return nil, errors.Wrap(err, "error getting policy")
+}
+oid, err := w.Result(pol.MetadataCompressionPolicy.MetadataCompressor())
if err != nil {
return nil, errors.Wrap(err, "error writing stub")
}
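
The TODO above exists because RewriteAsStub has no snapshot source at hand, so the commit falls back to the global policy. A sketch of the per-source lookup the TODO points toward, assuming a snapshot.SourceInfo were available (this helper is hypothetical):

package example

import (
	"context"

	"github.com/kopia/kopia/repo"
	"github.com/kopia/kopia/repo/compression"
	"github.com/kopia/kopia/snapshot"
	"github.com/kopia/kopia/snapshot/policy"
)

// metadataCompressorFor resolves the metadata compressor from the policy
// effective for a specific source rather than from the global policy.
func metadataCompressorFor(ctx context.Context, rep repo.Repository, si snapshot.SourceInfo) (compression.Name, error) {
	pol, _, _, err := policy.GetEffectivePolicy(ctx, rep, si)
	if err != nil {
		return "", err
	}

	return pol.MetadataCompressionPolicy.MetadataCompressor(), nil
}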
2 changes: 1 addition & 1 deletion snapshot/snapshotfs/dir_writer.go
@@ -25,7 +25,7 @@ func writeDirManifest(ctx context.Context, rep repo.RepositoryWriter, dirRelativ
return object.EmptyID, errors.Wrap(err, "unable to encode directory JSON")
}

-oid, err := writer.Result()
+oid, err := writer.Result(comp)
if err != nil {
return object.EmptyID, errors.Wrap(err, "unable to write directory")
}
29 changes: 16 additions & 13 deletions snapshot/snapshotfs/upload.go
@@ -161,12 +161,13 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis
}

comp := pol.CompressionPolicy.CompressorForFile(f)
+metadataComp := pol.MetadataCompressionPolicy.MetadataCompressor()
splitterName := pol.SplitterPolicy.SplitterForFile(f)

chunkSize := pol.UploadPolicy.ParallelUploadAboveSize.OrDefault(-1)
if chunkSize < 0 || f.Size() <= chunkSize {
// all data fits in one full chunk, upload directly
-return u.uploadFileData(ctx, parentCheckpointRegistry, f, f.Name(), 0, -1, comp, splitterName)
+return u.uploadFileData(ctx, parentCheckpointRegistry, f, f.Name(), 0, -1, comp, metadataComp, splitterName)
}

// we always have N+1 parts, first N are exactly chunkSize, last one has undetermined length
@@ -191,11 +192,11 @@
if wg.CanShareWork(u.workerPool) {
// another goroutine is available, delegate to them
wg.RunAsync(u.workerPool, func(_ *workshare.Pool[*uploadWorkItem], _ *uploadWorkItem) {
-parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp, splitterName)
+parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp, metadataComp, splitterName)
}, nil)
} else {
// just do the work in the current goroutine
-parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp, splitterName)
+parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp, metadataComp, splitterName)
}
}

@@ -206,10 +207,10 @@
return nil, errors.Wrap(err, "error uploading parts")
}

-return concatenateParts(ctx, u.repo, f.Name(), parts)
+return concatenateParts(ctx, u.repo, f.Name(), parts, metadataComp)
}

-func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name string, parts []*snapshot.DirEntry) (*snapshot.DirEntry, error) {
+func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name string, parts []*snapshot.DirEntry, comp compression.Name) (*snapshot.DirEntry, error) {
var (
objectIDs []object.ID
totalSize int64
@@ -221,7 +222,7 @@
objectIDs = append(objectIDs, part.ObjectID)
}

-resultObject, err := rep.ConcatenateObjects(ctx, objectIDs)
+resultObject, err := rep.ConcatenateObjects(ctx, objectIDs, comp)
if err != nil {
return nil, errors.Wrap(err, "concatenate")
}
@@ -234,7 +235,7 @@
return de, nil
}

-func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, f fs.File, fname string, offset, length int64, compressor compression.Name, splitterName string) (*snapshot.DirEntry, error) {
+func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, f fs.File, fname string, offset, length int64, compressor, metadataComp compression.Name, splitterName string) (*snapshot.DirEntry, error) {
file, err := f.Open(ctx)
if err != nil {
return nil, errors.Wrap(err, "unable to open file")
@@ -250,7 +251,7 @@
defer writer.Close() //nolint:errcheck

parentCheckpointRegistry.addCheckpointCallback(fname, func() (*snapshot.DirEntry, error) {
-checkpointID, err := writer.Checkpoint()
+checkpointID, err := writer.Checkpoint(metadataComp)
if err != nil {
return nil, errors.Wrap(err, "checkpoint error")
}
@@ -280,7 +281,7 @@
return nil, err
}

-r, err := writer.Result()
+r, err := writer.Result(metadataComp)
if err != nil {
return nil, errors.Wrap(err, "unable to get result")
}
@@ -298,7 +299,7 @@
return de, nil
}

-func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath string, f fs.Symlink) (dirEntry *snapshot.DirEntry, ret error) {
+func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath string, f fs.Symlink, metadataComp compression.Name) (dirEntry *snapshot.DirEntry, ret error) {
u.Progress.HashingFile(relativePath)

defer func() {
@@ -321,7 +322,7 @@
return nil, err
}

-r, err := writer.Result()
+r, err := writer.Result(metadataComp)
if err != nil {
return nil, errors.Wrap(err, "unable to get result")
}
@@ -353,6 +354,7 @@
}()

comp := pol.CompressionPolicy.CompressorForFile(f)
+metadataComp := pol.MetadataCompressionPolicy.MetadataCompressor()

writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{
Description: "STREAMFILE:" + f.Name(),
@@ -367,7 +369,7 @@
return nil, err
}

-r, err := writer.Result()
+r, err := writer.Result(metadataComp)
if err != nil {
return nil, errors.Wrap(err, "unable to get result")
}
@@ -903,7 +905,8 @@ func (u *Uploader) processSingle(
return nil

case fs.Symlink:
-de, err := u.uploadSymlinkInternal(ctx, entryRelativePath, entry)
+childTree := policyTree.Child(entry.Name())
+de, err := u.uploadSymlinkInternal(ctx, entryRelativePath, entry, childTree.EffectivePolicy().MetadataCompressionPolicy.MetadataCompressor())

return u.processEntryUploadResult(ctx, de, err, entryRelativePath, parentDirBuilder,
policyTree.EffectivePolicy().ErrorHandlingPolicy.IgnoreFileErrors.OrDefault(false),
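
Putting the upload.go hunks together: large files are split into fixed-size parts, each part is uploaded as its own object, and the parts are then joined with ConcatenateObjects, whose indirect index is now compressed with the policy's metadata compressor. Below is a simplified sequential sketch of that flow, with the worker pool, checkpoint registry, and reader plumbing elided; the helper name and chunk handling are assumptions:

package example

import (
	"context"

	"github.com/kopia/kopia/repo"
	"github.com/kopia/kopia/repo/compression"
	"github.com/kopia/kopia/repo/object"
)

// uploadInFixedChunks writes each chunk as a separate object and then
// builds one concatenated object; dataComp applies to the file data,
// while metadataComp compresses the indirect index objects.
func uploadInFixedChunks(ctx context.Context, rep repo.RepositoryWriter, chunks [][]byte, dataComp, metadataComp compression.Name) (object.ID, error) {
	parts := make([]object.ID, 0, len(chunks))

	for _, chunk := range chunks {
		w := rep.NewObjectWriter(ctx, object.WriterOptions{
			Description: "FILE PART",
			Compressor:  dataComp,
		})

		if _, err := w.Write(chunk); err != nil {
			w.Close() //nolint:errcheck
			return object.EmptyID, err
		}

		oid, err := w.Result(metadataComp)
		w.Close() //nolint:errcheck

		if err != nil {
			return object.EmptyID, err
		}

		parts = append(parts, oid)
	}

	return rep.ConcatenateObjects(ctx, parts, metadataComp)
}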
