Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(repository): Add support to configure compressor for metadata content #560

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cli/command_policy_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type commandPolicySet struct {

policyActionFlags
policyCompressionFlags
policyMetadataCompressionFlags
policySplitterFlags
policyErrorFlags
policyFilesFlags
Expand All @@ -36,6 +37,7 @@ func (c *commandPolicySet) setup(svc appServices, parent commandParent) {

c.policyActionFlags.setup(cmd)
c.policyCompressionFlags.setup(cmd)
c.policyMetadataCompressionFlags.setup(cmd)
c.policySplitterFlags.setup(cmd)
c.policyErrorFlags.setup(cmd)
c.policyFilesFlags.setup(cmd)
Expand Down Expand Up @@ -108,6 +110,10 @@ func (c *commandPolicySet) setPolicyFromFlags(ctx context.Context, p *policy.Pol
return errors.Wrap(err, "compression policy")
}

if err := c.setMetadataCompressionPolicyFromFlags(ctx, &p.MetadataCompressionPolicy, changeCount); err != nil {
return errors.Wrap(err, "metadata compression policy")
}

if err := c.setSplitterPolicyFromFlags(ctx, &p.SplitterPolicy, changeCount); err != nil {
return errors.Wrap(err, "splitter policy")
}
Expand Down
31 changes: 31 additions & 0 deletions cli/command_policy_set_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,37 @@ type policyCompressionFlags struct {
policySetClearNeverCompress bool
}

// policyMetadataCompressionFlags holds the command-line flag values that control
// the metadata compression section of a policy. It is embedded in commandPolicySet.
type policyMetadataCompressionFlags struct {
// policySetMetadataCompressionAlgorithm receives the value of the
// "metadata-compression" flag; an empty string means the flag was not provided.
policySetMetadataCompressionAlgorithm string
}

// setup registers the "metadata-compression" flag on cmd and binds its value
// to the receiver so it can later be applied to a policy.
func (c *policyMetadataCompressionFlags) setup(cmd *kingpin.CmdClause) {
	// The flag is an enum: only names returned by supportedCompressionAlgorithms are accepted.
	algorithmFlag := cmd.Flag("metadata-compression", "Metadata Compression algorithm")
	algorithmFlag.EnumVar(&c.policySetMetadataCompressionAlgorithm, supportedCompressionAlgorithms()...)
}

// setMetadataCompressionPolicyFromFlags applies the "metadata-compression" flag
// value, if one was provided, to the metadata compression policy p.
//
// When the flag is empty the policy is left untouched and changeCount is not
// modified. Otherwise changeCount is incremented and p.CompressorName is either
// cleared (when the value equals inheritPolicyString, so that the value is
// inherited from the parent policy) or set to the requested algorithm name.
//
// The function currently always returns nil; the error result is kept so the
// signature matches the other set*PolicyFromFlags helpers.
func (c *policyMetadataCompressionFlags) setMetadataCompressionPolicyFromFlags(
	ctx context.Context,
	p *policy.MetadataCompressionPolicy,
	changeCount *int,
) error { //nolint:unparam
	v := c.policySetMetadataCompressionAlgorithm
	if v == "" {
		// Flag not specified on the command line: nothing to change.
		return nil
	}

	*changeCount++

	if v == inheritPolicyString {
		// The message names "metadata compression" explicitly so it cannot be
		// confused with the output produced for the data "compression" flag.
		log(ctx).Info(" - resetting metadata compression algorithm to default value inherited from parent")

		p.CompressorName = ""

		return nil
	}

	log(ctx).Infof(" - setting metadata compression algorithm to %v", v)

	p.CompressorName = compression.Name(v)

	return nil
}

func (c *policyCompressionFlags) setup(cmd *kingpin.CmdClause) {
// Name of compression algorithm.
cmd.Flag("compression", "Compression algorithm").EnumVar(&c.policySetCompressionAlgorithm, supportedCompressionAlgorithms()...)
Expand Down
13 changes: 13 additions & 0 deletions cli/command_policy_show.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ func printPolicy(out *textOutput, p *policy.Policy, def *policy.Definition) {
rows = append(rows, policyTableRow{})
rows = appendCompressionPolicyRows(rows, p, def)
rows = append(rows, policyTableRow{})
rows = appendMetadataCompressionPolicyRows(rows, p, def)
rows = append(rows, policyTableRow{})
rows = appendSplitterPolicyRows(rows, p, def)
rows = append(rows, policyTableRow{})
rows = appendActionsPolicyRows(rows, p, def)
Expand Down Expand Up @@ -388,6 +390,17 @@ func appendCompressionPolicyRows(rows []policyTableRow, p *policy.Policy, def *p
return rows
}

// appendMetadataCompressionPolicyRows appends the rows describing the metadata
// compression section of policy p to rows and returns the extended slice.
//
// When no compressor is configured (empty name or "none") a single
// "disabled" row is added; otherwise a header row is followed by a row showing
// the compressor name and the definition point reported for that setting.
func appendMetadataCompressionPolicyRows(rows []policyTableRow, p *policy.Policy, def *policy.Definition) []policyTableRow {
	compressor := p.MetadataCompressionPolicy.CompressorName

	switch compressor {
	case "", "none":
		return append(rows, policyTableRow{"Metadata compression disabled.", "", ""})
	}

	header := policyTableRow{"Metadata compression:", "", ""}
	detail := policyTableRow{
		" Compressor:",
		string(compressor),
		definitionPointToString(p.Target(), def.MetadataCompressionPolicy.CompressorName),
	}

	return append(rows, header, detail)
}

func appendSplitterPolicyRows(rows []policyTableRow, p *policy.Policy, def *policy.Definition) []policyTableRow {
algorithm := p.SplitterPolicy.Algorithm
if algorithm == "" {
Expand Down
10 changes: 9 additions & 1 deletion cli/command_snapshot_fix.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
)

Expand Down Expand Up @@ -90,12 +91,19 @@ func (c *commonRewriteSnapshots) rewriteMatchingSnapshots(ctx context.Context, r
for _, mg := range snapshot.GroupBySource(manifests) {
log(ctx).Infof("Processing snapshot %v", mg[0].Source)

policyTree, err := policy.TreeForSource(ctx, rep, mg[0].Source)
if err != nil {
return errors.Wrap(err, "unable to get policy tree")
}

metadataComp := policyTree.EffectivePolicy().MetadataCompressionPolicy.MetadataCompressor()

for _, man := range snapshot.SortByTime(mg, false) {
log(ctx).Debugf(" %v (%v)", formatTimestamp(man.StartTime.ToTime()), man.ID)

old := man.Clone()

changed, err := rw.RewriteSnapshotManifest(ctx, man)
changed, err := rw.RewriteSnapshotManifest(ctx, man, metadataComp)
if err != nil {
return errors.Wrap(err, "error rewriting manifest")
}
Expand Down
2 changes: 1 addition & 1 deletion internal/server/grpc_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func handleWriteContentRequest(ctx context.Context, dw repo.DirectRepositoryWrit
return accessDeniedResponse()
}

if strings.HasPrefix(req.GetPrefix(), manifest.ContentPrefix) {
if strings.HasPrefix(req.GetPrefix(), content.ManifestContentPrefix) {
// it's not allowed to create contents prefixed with 'm' since those could be mistaken for manifest contents.
return accessDeniedResponse()
}
Expand Down
5 changes: 3 additions & 2 deletions internal/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository
mustReadObject(ctx, t, w, result, written)

ow := w.NewObjectWriter(ctx, object.WriterOptions{
Prefix: manifest.ContentPrefix,
Prefix: content.ManifestContentPrefix,
MetadataCompressor: "zstd-fastest",
})

_, err := ow.Write([]byte{2, 3, 4})
Expand Down Expand Up @@ -258,7 +259,7 @@ func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository
func mustWriteObject(ctx context.Context, t *testing.T, w repo.RepositoryWriter, data []byte) object.ID {
t.Helper()

ow := w.NewObjectWriter(ctx, object.WriterOptions{})
ow := w.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})

_, err := ow.Write(data)
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions repo/blob/storage_extend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetention(t *testing.T) {
nro.RetentionPeriod = period
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
Expand Down Expand Up @@ -103,7 +103,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionUnsupported(t *testing.
nro.RetentionMode = ""
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
Expand Down
5 changes: 5 additions & 0 deletions repo/content/content_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const (
DefaultIndexVersion = 2
)

// ManifestContentPrefix is the content ID prefix that marks a content as
// holding manifest data.
const ManifestContentPrefix = "m"

var tracer = otel.Tracer("kopia/content")

// PackBlobIDPrefixes contains all possible prefixes for pack blobs.
Expand Down
4 changes: 3 additions & 1 deletion repo/content/content_manager_lock_free.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ func (sm *SharedManager) maybeCompressAndEncryptDataForPacking(data gather.Bytes

// If the content is prefixed (which represents Kopia's own metadata as opposed to user data),
// and we're on V2 format or greater, enable internal compression even when not requested.
if contentID.HasPrefix() && comp == NoCompression && mp.IndexVersion >= index.Version2 {
// Set compression only for manifest metadata.
// TODO: Remove this check once metadata compression setting is implemented for manifest metadata.
if contentID.HasPrefix() && contentID.Prefix() == ManifestContentPrefix && comp == NoCompression && mp.IndexVersion >= index.Version2 {
// 'zstd-fastest' has a good mix of being fast, low memory usage and high compression for JSON.
comp = compression.HeaderZstdFastest
}
Expand Down
57 changes: 55 additions & 2 deletions repo/content/content_manager_test.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion repo/format/upgrade_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) {
func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, data []byte, testCaseID string) {
t.Helper()

w := rep.NewObjectWriter(ctx, object.WriterOptions{})
w := rep.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})

_, err := w.Write(data)
require.NoError(t, err, testCaseID)
Expand Down
6 changes: 3 additions & 3 deletions repo/grpc_repository_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,9 @@ func (r *grpcRepositoryClient) NewWriter(ctx context.Context, opt WriteSessionOp
}

// ConcatenateObjects creates a concatenated object from the provided object IDs.
func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) {
func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID, comp compression.Name) (object.ID, error) {
//nolint:wrapcheck
return r.omgr.Concatenate(ctx, objectIDs)
return r.omgr.Concatenate(ctx, objectIDs, comp)
}

// maybeRetry executes the provided callback with or without automatic retries depending on how
Expand Down Expand Up @@ -721,7 +721,7 @@ func (r *grpcRepositoryClient) WriteContent(ctx context.Context, data gather.Byt
}

// we will be writing asynchronously and server will reject this write, fail early.
if prefix == manifest.ContentPrefix {
if prefix == content.ManifestContentPrefix {
return content.EmptyID, errors.Errorf("writing manifest contents not allowed")
}

Expand Down
2 changes: 1 addition & 1 deletion repo/maintenance/blob_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *formatSpecificTestSuite) TestDeleteUnreferencedBlobs(t *testing.T) {
nro.BlockFormat.HMACSecret = testHMACSecret
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
Expand Down
4 changes: 2 additions & 2 deletions repo/maintenance/blob_retain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTime(t *testing.T) {
nro.RetentionPeriod = period
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
Expand Down Expand Up @@ -98,7 +98,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTimeDisabled(t *testing
nro.BlockFormat.HMACSecret = testHMACSecret
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
Expand Down
4 changes: 2 additions & 2 deletions repo/maintenance/content_rewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {
// run N sessions to create N individual pack blobs for each content prefix
for range tc.numPContents {
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{})
ow := w.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "%v", uuid.NewString())
_, err := ow.Result()
return err
Expand All @@ -88,7 +88,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {

for range tc.numQContents {
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k"})
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k", MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "%v", uuid.NewString())
_, err := ow.Result()
return err
Expand Down
4 changes: 2 additions & 2 deletions repo/maintenance/maintenance_safety_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (s *formatSpecificTestSuite) TestMaintenanceSafety(t *testing.T) {

// create object that's immediately orphaned since nobody refers to it.
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"})
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y", MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "hello world")
var err error
objectID, err = ow.Result()
Expand All @@ -43,7 +43,7 @@ func (s *formatSpecificTestSuite) TestMaintenanceSafety(t *testing.T) {

// create another object in separate pack.
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"})
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y", MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "hello universe")
_, err := ow.Result()
return err
Expand Down
4 changes: 2 additions & 2 deletions repo/manifest/committed_manifest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entri
mustSucceed(gz.Flush())
mustSucceed(gz.Close())

contentID, err := m.b.WriteContent(ctx, buf.Bytes(), ContentPrefix, content.NoCompression)
contentID, err := m.b.WriteContent(ctx, buf.Bytes(), content.ManifestContentPrefix, content.NoCompression)
if err != nil {
return nil, errors.Wrap(err, "unable to write content")
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func (m *committedManifestManager) loadCommittedContentsLocked(ctx context.Conte
manifests = map[content.ID]manifest{}

err := m.b.IterateContents(ctx, content.IterateOptions{
Range: index.PrefixRange(ContentPrefix),
Range: index.PrefixRange(content.ManifestContentPrefix),
Parallel: manifestLoadParallelism,
}, func(ci content.Info) error {
man, err := loadManifestContent(ctx, m.b, ci.ContentID)
Expand Down
8 changes: 2 additions & 6 deletions repo/manifest/manifest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,15 @@ import (
const (
manifestLoadParallelism = 8
manifestIDLength = 16

autoCompactionContentCountDefault = 16
)

var log = logging.Module("kopia/manifest") // +checklocksignore

// ErrNotFound is returned when the metadata item is not found.
var ErrNotFound = errors.New("not found")

// ContentPrefix is the prefix of the content id for manifests.
const (
ContentPrefix = "m"
autoCompactionContentCountDefault = 16
)

// TypeLabelKey is the label key for manifest type.
const TypeLabelKey = "type"

Expand Down
4 changes: 2 additions & 2 deletions repo/manifest/manifest_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestManifest(t *testing.T) {

if err := mgr.b.IterateContents(
ctx,
content.IterateOptions{Range: index.PrefixRange(ContentPrefix)},
content.IterateOptions{Range: index.PrefixRange(content.ManifestContentPrefix)},
func(ci content.Info) error {
foundContents++
return nil
Expand Down Expand Up @@ -447,7 +447,7 @@ func getManifestContentCount(ctx context.Context, t *testing.T, mgr *Manager) in

if err := mgr.b.IterateContents(
ctx,
content.IterateOptions{Range: index.PrefixRange(ContentPrefix)},
content.IterateOptions{Range: index.PrefixRange(content.ManifestContentPrefix)},
func(ci content.Info) error {
foundContents++
return nil
Expand Down
9 changes: 6 additions & 3 deletions repo/object/object_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (om *Manager) NewWriter(ctx context.Context, opt WriterOptions) Writer {
w.description = opt.Description
w.prefix = opt.Prefix
w.compressor = compression.ByName[opt.Compressor]
w.metadataCompressor = compression.ByName[opt.MetadataCompressor]
w.totalLength = 0
w.currentPosition = 0

Expand Down Expand Up @@ -106,7 +107,7 @@ func (om *Manager) closedWriter(ow *objectWriter) {
// in parallel utilizing more CPU cores. Because some split points now start at fixed boundaries and not content-specific,
// this causes some slight loss of deduplication at concatenation points (typically 1-2 contents, usually <10MB),
// so this method should only be used for very large files where this overhead is relatively small.
func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error) {
func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, metadataComp compression.Name) (ID, error) {
if len(objectIDs) == 0 {
return EmptyID, errors.Errorf("empty list of objects")
}
Expand All @@ -131,8 +132,10 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error)
log(ctx).Debugf("concatenated: %v total: %v", concatenatedEntries, totalLength)

w := om.NewWriter(ctx, WriterOptions{
Prefix: indirectContentPrefix,
Description: "CONCATENATED INDEX",
Prefix: indirectContentPrefix,
Description: "CONCATENATED INDEX",
Compressor: metadataComp,
MetadataCompressor: metadataComp,
})
defer w.Close() //nolint:errcheck

Expand Down
Loading
Loading