From dff0752b6ab6b98d7d93189f448c1bc3d93636d7 Mon Sep 17 00:00:00 2001 From: Prasad Ghangal Date: Tue, 27 Aug 2024 17:25:04 +0530 Subject: [PATCH] Configure compressor for k and x prefixed content Adds metadata compression setting to policy Add support to configure compressor for k and x prefixed content Set zstd-fastest as the default compressor for metadata in the policy Adds support to set and show metadata compression to kopia policy commands Adds metadata compression config to dir writer Signed-off-by: Prasad Ghangal --- cli/command_policy_set.go | 6 + cli/command_policy_set_compression.go | 31 +++++ cli/command_policy_show.go | 13 +++ cli/command_snapshot_fix.go | 10 +- internal/server/grpc_session.go | 2 +- internal/server/server_test.go | 5 +- repo/blob/storage_extend_test.go | 4 +- repo/content/content_manager.go | 5 + repo/content/content_manager_lock_free.go | 4 +- repo/content/content_manager_test.go | 57 ++++++++- repo/format/upgrade_lock_test.go | 2 +- repo/grpc_repository_client.go | 6 +- repo/maintenance/blob_gc_test.go | 2 +- repo/maintenance/blob_retain_test.go | 4 +- repo/maintenance/content_rewrite_test.go | 4 +- repo/maintenance/maintenance_safety_test.go | 4 +- repo/manifest/committed_manifest_manager.go | 4 +- repo/manifest/manifest_manager.go | 8 +- repo/manifest/manifest_manager_test.go | 4 +- repo/object/object_manager.go | 9 +- repo/object/object_manager_test.go | 59 ++++++++-- repo/object/object_writer.go | 34 ++++-- repo/repo_benchmarks_test.go | 6 +- repo/repository.go | 7 +- repo/repository_test.go | 15 +-- snapshot/policy/compression_policy.go | 24 ++++ snapshot/policy/policy.go | 46 ++++---- snapshot/policy/policy_merge.go | 2 + snapshot/policy/policy_tree.go | 22 ++-- snapshot/snapshotfs/dir_rewriter.go | 42 ++++--- snapshot/snapshotfs/dir_writer.go | 9 +- snapshot/snapshotfs/upload.go | 47 +++++--- snapshot/snapshotfs/upload_test.go | 122 ++++++++++++++++++-- snapshot/snapshotgc/gc.go | 3 +- 34 files changed, 474 insertions(+), 148 deletions(-) diff --git a/cli/command_policy_set.go b/cli/command_policy_set.go index f0cde6351ed..78e1773501a 100644 --- a/cli/command_policy_set.go +++ b/cli/command_policy_set.go @@ -19,6 +19,7 @@ type commandPolicySet struct { policyActionFlags policyCompressionFlags + policyMetadataCompressionFlags policySplitterFlags policyErrorFlags policyFilesFlags @@ -36,6 +37,7 @@ func (c *commandPolicySet) setup(svc appServices, parent commandParent) { c.policyActionFlags.setup(cmd) c.policyCompressionFlags.setup(cmd) + c.policyMetadataCompressionFlags.setup(cmd) c.policySplitterFlags.setup(cmd) c.policyErrorFlags.setup(cmd) c.policyFilesFlags.setup(cmd) @@ -108,6 +110,10 @@ func (c *commandPolicySet) setPolicyFromFlags(ctx context.Context, p *policy.Pol return errors.Wrap(err, "compression policy") } + if err := c.setMetadataCompressionPolicyFromFlags(ctx, &p.MetadataCompressionPolicy, changeCount); err != nil { + return errors.Wrap(err, "metadata compression policy") + } + if err := c.setSplitterPolicyFromFlags(ctx, &p.SplitterPolicy, changeCount); err != nil { return errors.Wrap(err, "splitter policy") } diff --git a/cli/command_policy_set_compression.go b/cli/command_policy_set_compression.go index 14866e1c8a9..3f1b3aa39c5 100644 --- a/cli/command_policy_set_compression.go +++ b/cli/command_policy_set_compression.go @@ -24,6 +24,37 @@ type policyCompressionFlags struct { policySetClearNeverCompress bool } +type policyMetadataCompressionFlags struct { + policySetMetadataCompressionAlgorithm string +} + +func (c *policyMetadataCompressionFlags) setup(cmd *kingpin.CmdClause) { + // Name of compression algorithm. + cmd.Flag("metadata-compression", "Metadata Compression algorithm").EnumVar(&c.policySetMetadataCompressionAlgorithm, supportedCompressionAlgorithms()...) +} + +func (c *policyMetadataCompressionFlags) setMetadataCompressionPolicyFromFlags( + ctx context.Context, + p *policy.MetadataCompressionPolicy, + changeCount *int, +) error { //nolint:unparam + if v := c.policySetMetadataCompressionAlgorithm; v != "" { + *changeCount++ + + if v == inheritPolicyString { + log(ctx).Info(" - resetting compression algorithm to default value inherited from parent") + + p.CompressorName = "" + } else { + log(ctx).Infof(" - setting compression algorithm to %v", v) + + p.CompressorName = compression.Name(v) + } + } + + return nil +} + func (c *policyCompressionFlags) setup(cmd *kingpin.CmdClause) { // Name of compression algorithm. cmd.Flag("compression", "Compression algorithm").EnumVar(&c.policySetCompressionAlgorithm, supportedCompressionAlgorithms()...) diff --git a/cli/command_policy_show.go b/cli/command_policy_show.go index 1e314a0ea85..d197b4a6988 100644 --- a/cli/command_policy_show.go +++ b/cli/command_policy_show.go @@ -126,6 +126,8 @@ func printPolicy(out *textOutput, p *policy.Policy, def *policy.Definition) { rows = append(rows, policyTableRow{}) rows = appendCompressionPolicyRows(rows, p, def) rows = append(rows, policyTableRow{}) + rows = appendMetadataCompressionPolicyRows(rows, p, def) + rows = append(rows, policyTableRow{}) rows = appendSplitterPolicyRows(rows, p, def) rows = append(rows, policyTableRow{}) rows = appendActionsPolicyRows(rows, p, def) @@ -388,6 +390,17 @@ func appendCompressionPolicyRows(rows []policyTableRow, p *policy.Policy, def *p return rows } +func appendMetadataCompressionPolicyRows(rows []policyTableRow, p *policy.Policy, def *policy.Definition) []policyTableRow { + if p.MetadataCompressionPolicy.CompressorName == "" || p.MetadataCompressionPolicy.CompressorName == "none" { + rows = append(rows, policyTableRow{"Metadata compression disabled.", "", ""}) + return rows + } + + return append(rows, + policyTableRow{"Metadata compression:", "", ""}, + policyTableRow{" Compressor:", string(p.MetadataCompressionPolicy.CompressorName), definitionPointToString(p.Target(), def.MetadataCompressionPolicy.CompressorName)}) +} + func appendSplitterPolicyRows(rows []policyTableRow, p *policy.Policy, def *policy.Definition) []policyTableRow { algorithm := p.SplitterPolicy.Algorithm if algorithm == "" { diff --git a/cli/command_snapshot_fix.go b/cli/command_snapshot_fix.go index 4bbcc1c221e..f276d9dda02 100644 --- a/cli/command_snapshot_fix.go +++ b/cli/command_snapshot_fix.go @@ -10,6 +10,7 @@ import ( "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/manifest" "github.com/kopia/kopia/snapshot" + "github.com/kopia/kopia/snapshot/policy" "github.com/kopia/kopia/snapshot/snapshotfs" ) @@ -90,12 +91,19 @@ func (c *commonRewriteSnapshots) rewriteMatchingSnapshots(ctx context.Context, r for _, mg := range snapshot.GroupBySource(manifests) { log(ctx).Infof("Processing snapshot %v", mg[0].Source) + policyTree, err := policy.TreeForSource(ctx, rep, mg[0].Source) + if err != nil { + return errors.Wrap(err, "unable to get policy tree") + } + + metadataComp := policyTree.EffectivePolicy().MetadataCompressionPolicy.MetadataCompressor() + for _, man := range snapshot.SortByTime(mg, false) { log(ctx).Debugf(" %v (%v)", formatTimestamp(man.StartTime.ToTime()), man.ID) old := man.Clone() - changed, err := rw.RewriteSnapshotManifest(ctx, man) + changed, err := rw.RewriteSnapshotManifest(ctx, man, metadataComp) if err != nil { return errors.Wrap(err, "error rewriting manifest") } diff --git a/internal/server/grpc_session.go b/internal/server/grpc_session.go index b10fb06a1f9..8ea8363bc0f 100644 --- a/internal/server/grpc_session.go +++ b/internal/server/grpc_session.go @@ -264,7 +264,7 @@ func handleWriteContentRequest(ctx context.Context, dw repo.DirectRepositoryWrit return accessDeniedResponse() } - if strings.HasPrefix(req.GetPrefix(), manifest.ContentPrefix) { + if strings.HasPrefix(req.GetPrefix(), content.ManifestContentPrefix) { // it's not allowed to create contents prefixed with 'm' since those could be mistaken for manifest contents. return accessDeniedResponse() } diff --git a/internal/server/server_test.go b/internal/server/server_test.go index b15ec9bcca1..c99ab0e8176 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -210,7 +210,8 @@ func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository mustReadObject(ctx, t, w, result, written) ow := w.NewObjectWriter(ctx, object.WriterOptions{ - Prefix: manifest.ContentPrefix, + Prefix: content.ManifestContentPrefix, + MetadataCompressor: "zstd-fastest", }) _, err := ow.Write([]byte{2, 3, 4}) @@ -258,7 +259,7 @@ func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository func mustWriteObject(ctx context.Context, t *testing.T, w repo.RepositoryWriter, data []byte) object.ID { t.Helper() - ow := w.NewObjectWriter(ctx, object.WriterOptions{}) + ow := w.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) _, err := ow.Write(data) require.NoError(t, err) diff --git a/repo/blob/storage_extend_test.go b/repo/blob/storage_extend_test.go index fad91143434..c7cd60c244d 100644 --- a/repo/blob/storage_extend_test.go +++ b/repo/blob/storage_extend_test.go @@ -42,7 +42,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetention(t *testing.T) { nro.RetentionPeriod = period }, }) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) io.WriteString(w, "hello world!") w.Result() w.Close() @@ -103,7 +103,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionUnsupported(t *testing. nro.RetentionMode = "" }, }) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) io.WriteString(w, "hello world!") w.Result() w.Close() diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index ff37cf45dab..f6c79a51a3a 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -40,6 +40,11 @@ const ( DefaultIndexVersion = 2 ) +// ManifestContentPrefix is the prefix of the content id for manifests. +const ( + ManifestContentPrefix = "m" +) + var tracer = otel.Tracer("kopia/content") // PackBlobIDPrefixes contains all possible prefixes for pack blobs. diff --git a/repo/content/content_manager_lock_free.go b/repo/content/content_manager_lock_free.go index d43978e02a9..8eec6579e99 100644 --- a/repo/content/content_manager_lock_free.go +++ b/repo/content/content_manager_lock_free.go @@ -31,7 +31,9 @@ func (sm *SharedManager) maybeCompressAndEncryptDataForPacking(data gather.Bytes // If the content is prefixed (which represents Kopia's own metadata as opposed to user data), // and we're on V2 format or greater, enable internal compression even when not requested. - if contentID.HasPrefix() && comp == NoCompression && mp.IndexVersion >= index.Version2 { + // Set compression only for manifest metadata. + // TODO: Remove this check once metadata compression setting is implemented for manifest metadata. + if contentID.HasPrefix() && contentID.Prefix() == ManifestContentPrefix && comp == NoCompression && mp.IndexVersion >= index.Version2 { // 'zstd-fastest' has a good mix of being fast, low memory usage and high compression for JSON. comp = compression.HeaderZstdFastest } diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 4d8bc9cf5ed..9ebb71e624d 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -1804,7 +1804,7 @@ func (s *contentManagerSuite) TestContentWriteAliasing(t *testing.T) { verifyContent(ctx, t, bm, id4, []byte{103, 0, 0}) } -func (s *contentManagerSuite) TestAutoCompressionOfMetadata(t *testing.T) { +func (s *contentManagerSuite) TestDisableCompressionOfMetadata(t *testing.T) { ctx := testlogging.Context(t) data := blobtesting.DataMap{} st := blobtesting.NewMapStorage(data, nil, nil) @@ -1812,11 +1812,41 @@ func (s *contentManagerSuite) TestAutoCompressionOfMetadata(t *testing.T) { //nolint:lll contentID, err := bm.WriteContent(ctx, - gather.FromSlice([]byte(`{"stream":"kopia:directory","entries":[{"name":".chglog","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.159239913-07:00","uid":501,"gid":20,"obj":"k18c2fa7d9108a2bf0d9d5b8e7993c48d","summ":{"size":1897,"files":2,"symlinks":0,"dirs":1,"maxTime":"2022-03-22T22:45:22.159499411-07:00","numFailed":0}},{"name":".git","type":"d","mode":"0755","mtime":"2022-04-03T17:47:38.340226306-07:00","uid":501,"gid":20,"obj":"k0ad4214eb961aa78cf06611ec4563086","summ":{"size":88602907,"files":7336,"symlinks":0,"dirs":450,"maxTime":"2022-04-03T17:28:54.030135198-07:00","numFailed":0}},{"name":".github","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.160470238-07:00","uid":501,"gid":20,"obj":"k76bee329054d5574d89a4e87c3f24088","summ":{"size":20043,"files":13,"symlinks":0,"dirs":2,"maxTime":"2022-03-22T22:45:22.162580934-07:00","numFailed":0}},{"name":".logs","type":"d","mode":"0750","mtime":"2021-11-06T13:43:35.082115457-07:00","uid":501,"gid":20,"obj":"k1e7d5bda28d6b684bb180cac16775c1c","summ":{"size":382943352,"files":1823,"symlinks":0,"dirs":122,"maxTime":"2021-11-06T13:43:45.111270118-07:00","numFailed":0}},{"name":".release","type":"d","mode":"0755","mtime":"2021-04-16T06:26:47-07:00","uid":501,"gid":20,"obj":"k0eb539316600015bf2861e593f68e18d","summ":{"size":159711446,"files":19,"symlinks":0,"dirs":1,"maxTime":"2021-04-16T06:26:47-07:00","numFailed":0}},{"name":".screenshots","type":"d","mode":"0755","mtime":"2022-01-29T00:12:29.023594487-08:00","uid":501,"gid":20,"obj":"k97f6dbc82e84c97c955364d12ddc44bd","summ":{"size":6770746,"files":53,"symlinks":0,"dirs":7,"maxTime":"2022-03-19T18:59:51.559099257-07:00","numFailed":0}},{"name":"app","type":"d","mode":"0755","mtime":"2022-03-26T22:28:51.863826565-07:00","uid":501,"gid":20,"obj":"k656b41b8679c2537392b3997648cf43e","summ":{"size":565633611,"files":44812,"symlinks":0,"dirs":7576,"maxTime":"2022-03-26T22:28:51.863946606-07:00","numFailed":0}},{"name":"cli","type":"d","mode":"0755","mtime":"2022-04-03T12:24:52.84319224-07:00","uid":501,"gid":20,"obj":"k04ab4f2a1da96c47f62a51f119dba14d","summ":{"size":468233,"files":164,"symlinks":0,"dirs":1,"maxTime":"2022-04-03T12:24:52.843267824-07:00","numFailed":0}},{"name":"dist","type":"d","mode":"0755","mtime":"2022-03-19T22:46:00.12834831-07:00","uid":501,"gid":20,"obj":"k19fc65da8a47b7702bf6b501b7f3e1b5","summ":{"size":3420732994,"files":315,"symlinks":0,"dirs":321,"maxTime":"2022-03-27T12:10:08.019195221-07:00","numFailed":0}},{"name":"fs","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.194955195-07:00","uid":501,"gid":20,"obj":"k1f0be83e34826450e651f16ba63c5b9c","summ":{"size":80421,"files":21,"symlinks":0,"dirs":6,"maxTime":"2022-03-22T22:45:22.195085778-07:00","numFailed":0}},{"name":"icons","type":"d","mode":"0755","mtime":"2022-01-23T12:06:14.739575928-08:00","uid":501,"gid":20,"obj":"k9e76c283312bdc6e562f66c7d6526396","summ":{"size":361744,"files":13,"symlinks":0,"dirs":1,"maxTime":"2021-03-12T19:28:45-08:00","numFailed":0}},{"name":"internal","type":"d","mode":"0755","mtime":"2022-04-02T18:14:02.459772332-07:00","uid":501,"gid":20,"obj":"k181db968f69045159753f8d6f3f3454f","summ":{"size":778467,"files":198,"symlinks":0,"dirs":56,"maxTime":"2022-04-03T12:24:52.844331708-07:00","numFailed":0}},{"name":"node_modules","type":"d","mode":"0755","mtime":"2021-05-16T15:45:19-07:00","uid":501,"gid":20,"obj":"kf2b636c57a7cc412739d2c10ca7ab0a3","summ":{"size":5061213,"files":361,"symlinks":0,"dirs":69,"maxTime":"2021-05-16T15:45:19-07:00","numFailed":0}},{"name":"repo","type":"d","mode":"0755","mtime":"2022-04-03T12:24:52.844407167-07:00","uid":501,"gid":20,"obj":"kb839dcd04d94a1b568f7f5e8fc809fab","summ":{"size":992877,"files":193,"symlinks":0,"dirs":27,"maxTime":"2022-04-03T17:47:31.211316848-07:00","numFailed":0}},{"name":"site","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.250939688-07:00","uid":501,"gid":20,"obj":"k5d8ce70ca4337c17219502963f0fe6d3","summ":{"size":58225583,"files":11387,"symlinks":0,"dirs":557,"maxTime":"2022-03-22T22:45:22.258280685-07:00","numFailed":0}},{"name":"snapshot","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.265723348-07:00","uid":501,"gid":20,"obj":"k6201166bd99c8fe85d53d742e92c81a6","summ":{"size":316009,"files":66,"symlinks":0,"dirs":6,"maxTime":"2022-03-26T23:04:24.313115653-07:00","numFailed":0}},{"name":"tests","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.2749515-07:00","uid":501,"gid":20,"obj":"k1e20890089f6cbad3c6fe79cbae71e09","summ":{"size":657360,"files":183,"symlinks":0,"dirs":30,"maxTime":"2022-04-02T18:41:02.232496031-07:00","numFailed":0}},{"name":"tools","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.279094142-07:00","uid":501,"gid":20,"obj":"k6464e940fea5ef916ab86eafdb68b1cd","summ":{"size":889231805,"files":12412,"symlinks":0,"dirs":3405,"maxTime":"2022-03-22T22:45:22.279144141-07:00","numFailed":0}},{"name":".DS_Store","type":"f","mode":"0644","size":14340,"mtime":"2022-02-12T20:06:35.60110891-08:00","uid":501,"gid":20,"obj":"d9295958410ae3b73f68033274cd7a8f"},{"name":".codecov.yml","type":"f","mode":"0644","size":620,"mtime":"2022-03-22T22:45:22.159772743-07:00","uid":501,"gid":20,"obj":"6f81038ca8d7b81804f42031142731ed"},{"name":".gitattributes","type":"f","mode":"0644","size":340,"mtime":"2022-03-22T22:45:22.159870909-07:00","uid":501,"gid":20,"obj":"5608c2d289164627e8bdb468bbee2643"},{"name":".gitignore","type":"f","mode":"0644","size":321,"mtime":"2022-03-22T22:45:22.162843932-07:00","uid":501,"gid":20,"obj":"c43ce513c6371e0838fc553b77f5cdb2"},{"name":".golangci.yml","type":"f","mode":"0644","size":3071,"mtime":"2022-03-22T22:45:22.163100014-07:00","uid":501,"gid":20,"obj":"4289f49e43fba6800fa75462bd2ad43e"},{"name":".gometalinter.json","type":"f","mode":"0644","size":163,"mtime":"2019-05-09T22:33:06-07:00","uid":501,"gid":20,"obj":"fe4fc9d77cfb5f1b062414fdfd121713"},{"name":".goreleaser.yml","type":"f","mode":"0644","size":1736,"mtime":"2022-03-22T22:45:22.163354888-07:00","uid":501,"gid":20,"obj":"91093a462f4f72c619fb9f144702c1bf"},{"name":".linterr.txt","type":"f","mode":"0644","size":425,"mtime":"2021-11-08T22:14:29.315279172-08:00","uid":501,"gid":20,"obj":"f6c165387b84c7fb0ebc26fdc812775d"},{"name":".tmp.integration-tests.json","type":"f","mode":"0644","size":5306553,"mtime":"2022-03-27T12:10:55.035217892-07:00","uid":501,"gid":20,"obj":"Ixbc27b9a704275d05a6505e794ce63e66"},{"name":".tmp.provider-tests.json","type":"f","mode":"0644","size":617740,"mtime":"2022-02-15T21:30:28.579546866-08:00","uid":501,"gid":20,"obj":"e7f69fc0222763628d5b294faf37a6d7"},{"name":".tmp.unit-tests.json","type":"f","mode":"0644","size":200525943,"mtime":"2022-04-03T10:08:51.453180251-07:00","uid":501,"gid":20,"obj":"Ixf5da1bbcdbc267fa123d93aaf90cbd75"},{"name":".wwhrd.yml","type":"f","mode":"0644","size":244,"mtime":"2022-03-22T22:45:22.163564803-07:00","uid":501,"gid":20,"obj":"cea0cac6d19d59dcf2818b08521f46b8"},{"name":"BUILD.md","type":"f","mode":"0644","size":4873,"mtime":"2022-03-22T22:45:22.163818593-07:00","uid":501,"gid":20,"obj":"bcd47eca7b520b3ea88e4799cc0c9fea"},{"name":"CODE_OF_CONDUCT.md","type":"f","mode":"0644","size":5226,"mtime":"2021-03-12T19:28:45-08:00","uid":501,"gid":20,"obj":"270e55b022ec0c7588b2dbb501791b3e"},{"name":"GOVERNANCE.md","type":"f","mode":"0644","size":12477,"mtime":"2020-03-15T23:40:35-07:00","uid":501,"gid":20,"obj":"96674fad8fcf2bdfb96b0583917bb617"},{"name":"LICENSE","type":"f","mode":"0644","size":10763,"mtime":"2019-05-27T15:50:18-07:00","uid":501,"gid":20,"obj":"e751b8a146e1dd5494564e9a8c26dd6a"},{"name":"Makefile","type":"f","mode":"0644","size":17602,"mtime":"2022-03-22T22:45:22.1639718-07:00","uid":501,"gid":20,"obj":"aa9cc80d567e94087ea9be8fef718c1a"},{"name":"README.md","type":"f","mode":"0644","size":3874,"mtime":"2022-03-22T22:45:22.164109925-07:00","uid":501,"gid":20,"obj":"d227c763b9cf476426da5d99e9fff694"},{"name":"a.log","type":"f","mode":"0644","size":3776,"mtime":"2022-03-08T19:19:40.196874627-08:00","uid":501,"gid":20,"obj":"6337190196e804297f92a17805600be7"},{"name":"build_architecture.svg","type":"f","mode":"0644","size":143884,"mtime":"2021-03-12T19:28:45-08:00","uid":501,"gid":20,"obj":"72c0aef8c43498b056236b2d46d7e44a"},{"name":"coverage.txt","type":"f","mode":"0644","size":194996,"mtime":"2022-03-26T07:09:37.533649628-07:00","uid":501,"gid":20,"obj":"fdf1a20cea21d4daf053b99711735d0e"},{"name":"go.mod","type":"f","mode":"0644","size":5447,"mtime":"2022-03-27T09:40:59.78753556-07:00","uid":501,"gid":20,"obj":"71eefc767aeea467b1d1f7ff0ee5c21b"},{"name":"go.sum","type":"f","mode":"0644","size":114899,"mtime":"2022-03-27T09:40:59.788485485-07:00","uid":501,"gid":20,"obj":"2e801e525d9e58208dff3c25bd30f296"},{"name":"main.go","type":"f","mode":"0644","size":2057,"mtime":"2022-03-22T22:45:22.22380977-07:00","uid":501,"gid":20,"obj":"73411f7e340e5cddc43faaa1d1fe5743"}],"summary":{"size":5787582078,"files":79395,"symlinks":0,"dirs":12639,"maxTime":"2022-04-03T17:47:38.340226306-07:00","numFailed":0}}`)), + dirMetadataContent(), "k", NoCompression) require.NoError(t, err) + info, err := bm.ContentInfo(ctx, contentID) + require.NoError(t, err) + require.Equal(t, NoCompression, info.CompressionHeaderID) + + contentID1, err1 := bm.WriteContent(ctx, + indirectMetadataContent(), + "x", + NoCompression) + require.NoError(t, err1) + + info1, err1 := bm.ContentInfo(ctx, contentID1) + require.NoError(t, err1) + require.Equal(t, NoCompression, info1.CompressionHeaderID) +} + +func (s *contentManagerSuite) TestCompressionOfMetadata(t *testing.T) { + ctx := testlogging.Context(t) + data := blobtesting.DataMap{} + st := blobtesting.NewMapStorage(data, nil, nil) + bm := s.newTestContentManagerWithTweaks(t, st, &contentManagerTestTweaks{ + indexVersion: index.Version2, + }) + + //nolint:lll + contentID, err := bm.WriteContent(ctx, + dirMetadataContent(), + "k", + compression.HeaderZstdFastest) + require.NoError(t, err) + info, err := bm.ContentInfo(ctx, contentID) require.NoError(t, err) @@ -1825,6 +1855,21 @@ func (s *contentManagerSuite) TestAutoCompressionOfMetadata(t *testing.T) { } else { require.Equal(t, NoCompression, info.CompressionHeaderID) } + + contentID1, err1 := bm.WriteContent(ctx, + indirectMetadataContent(), + "x", + compression.HeaderZstdFastest) + require.NoError(t, err1) + + info1, err1 := bm.ContentInfo(ctx, contentID1) + require.NoError(t, err1) + + if bm.SupportsContentCompression() { + require.Equal(t, compression.HeaderZstdFastest, info1.CompressionHeaderID) + } else { + require.Equal(t, NoCompression, info1.CompressionHeaderID) + } } func (s *contentManagerSuite) TestContentReadAliasing(t *testing.T) { @@ -2680,3 +2725,11 @@ func randRead(b []byte) (n int, err error) { return } + +func dirMetadataContent() gather.Bytes { + return gather.FromSlice([]byte(`{"stream":"kopia:directory","entries":[{"name":".chglog","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.159239913-07:00","uid":501,"gid":20,"obj":"k18c2fa7d9108a2bf0d9d5b8e7993c48d","summ":{"size":1897,"files":2,"symlinks":0,"dirs":1,"maxTime":"2022-03-22T22:45:22.159499411-07:00","numFailed":0}},{"name":".git","type":"d","mode":"0755","mtime":"2022-04-03T17:47:38.340226306-07:00","uid":501,"gid":20,"obj":"k0ad4214eb961aa78cf06611ec4563086","summ":{"size":88602907,"files":7336,"symlinks":0,"dirs":450,"maxTime":"2022-04-03T17:28:54.030135198-07:00","numFailed":0}},{"name":".github","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.160470238-07:00","uid":501,"gid":20,"obj":"k76bee329054d5574d89a4e87c3f24088","summ":{"size":20043,"files":13,"symlinks":0,"dirs":2,"maxTime":"2022-03-22T22:45:22.162580934-07:00","numFailed":0}},{"name":".logs","type":"d","mode":"0750","mtime":"2021-11-06T13:43:35.082115457-07:00","uid":501,"gid":20,"obj":"k1e7d5bda28d6b684bb180cac16775c1c","summ":{"size":382943352,"files":1823,"symlinks":0,"dirs":122,"maxTime":"2021-11-06T13:43:45.111270118-07:00","numFailed":0}},{"name":".release","type":"d","mode":"0755","mtime":"2021-04-16T06:26:47-07:00","uid":501,"gid":20,"obj":"k0eb539316600015bf2861e593f68e18d","summ":{"size":159711446,"files":19,"symlinks":0,"dirs":1,"maxTime":"2021-04-16T06:26:47-07:00","numFailed":0}},{"name":".screenshots","type":"d","mode":"0755","mtime":"2022-01-29T00:12:29.023594487-08:00","uid":501,"gid":20,"obj":"k97f6dbc82e84c97c955364d12ddc44bd","summ":{"size":6770746,"files":53,"symlinks":0,"dirs":7,"maxTime":"2022-03-19T18:59:51.559099257-07:00","numFailed":0}},{"name":"app","type":"d","mode":"0755","mtime":"2022-03-26T22:28:51.863826565-07:00","uid":501,"gid":20,"obj":"k656b41b8679c2537392b3997648cf43e","summ":{"size":565633611,"files":44812,"symlinks":0,"dirs":7576,"maxTime":"2022-03-26T22:28:51.863946606-07:00","numFailed":0}},{"name":"cli","type":"d","mode":"0755","mtime":"2022-04-03T12:24:52.84319224-07:00","uid":501,"gid":20,"obj":"k04ab4f2a1da96c47f62a51f119dba14d","summ":{"size":468233,"files":164,"symlinks":0,"dirs":1,"maxTime":"2022-04-03T12:24:52.843267824-07:00","numFailed":0}},{"name":"dist","type":"d","mode":"0755","mtime":"2022-03-19T22:46:00.12834831-07:00","uid":501,"gid":20,"obj":"k19fc65da8a47b7702bf6b501b7f3e1b5","summ":{"size":3420732994,"files":315,"symlinks":0,"dirs":321,"maxTime":"2022-03-27T12:10:08.019195221-07:00","numFailed":0}},{"name":"fs","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.194955195-07:00","uid":501,"gid":20,"obj":"k1f0be83e34826450e651f16ba63c5b9c","summ":{"size":80421,"files":21,"symlinks":0,"dirs":6,"maxTime":"2022-03-22T22:45:22.195085778-07:00","numFailed":0}},{"name":"icons","type":"d","mode":"0755","mtime":"2022-01-23T12:06:14.739575928-08:00","uid":501,"gid":20,"obj":"k9e76c283312bdc6e562f66c7d6526396","summ":{"size":361744,"files":13,"symlinks":0,"dirs":1,"maxTime":"2021-03-12T19:28:45-08:00","numFailed":0}},{"name":"internal","type":"d","mode":"0755","mtime":"2022-04-02T18:14:02.459772332-07:00","uid":501,"gid":20,"obj":"k181db968f69045159753f8d6f3f3454f","summ":{"size":778467,"files":198,"symlinks":0,"dirs":56,"maxTime":"2022-04-03T12:24:52.844331708-07:00","numFailed":0}},{"name":"node_modules","type":"d","mode":"0755","mtime":"2021-05-16T15:45:19-07:00","uid":501,"gid":20,"obj":"kf2b636c57a7cc412739d2c10ca7ab0a3","summ":{"size":5061213,"files":361,"symlinks":0,"dirs":69,"maxTime":"2021-05-16T15:45:19-07:00","numFailed":0}},{"name":"repo","type":"d","mode":"0755","mtime":"2022-04-03T12:24:52.844407167-07:00","uid":501,"gid":20,"obj":"kb839dcd04d94a1b568f7f5e8fc809fab","summ":{"size":992877,"files":193,"symlinks":0,"dirs":27,"maxTime":"2022-04-03T17:47:31.211316848-07:00","numFailed":0}},{"name":"site","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.250939688-07:00","uid":501,"gid":20,"obj":"k5d8ce70ca4337c17219502963f0fe6d3","summ":{"size":58225583,"files":11387,"symlinks":0,"dirs":557,"maxTime":"2022-03-22T22:45:22.258280685-07:00","numFailed":0}},{"name":"snapshot","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.265723348-07:00","uid":501,"gid":20,"obj":"k6201166bd99c8fe85d53d742e92c81a6","summ":{"size":316009,"files":66,"symlinks":0,"dirs":6,"maxTime":"2022-03-26T23:04:24.313115653-07:00","numFailed":0}},{"name":"tests","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.2749515-07:00","uid":501,"gid":20,"obj":"k1e20890089f6cbad3c6fe79cbae71e09","summ":{"size":657360,"files":183,"symlinks":0,"dirs":30,"maxTime":"2022-04-02T18:41:02.232496031-07:00","numFailed":0}},{"name":"tools","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.279094142-07:00","uid":501,"gid":20,"obj":"k6464e940fea5ef916ab86eafdb68b1cd","summ":{"size":889231805,"files":12412,"symlinks":0,"dirs":3405,"maxTime":"2022-03-22T22:45:22.279144141-07:00","numFailed":0}},{"name":".DS_Store","type":"f","mode":"0644","size":14340,"mtime":"2022-02-12T20:06:35.60110891-08:00","uid":501,"gid":20,"obj":"d9295958410ae3b73f68033274cd7a8f"},{"name":".codecov.yml","type":"f","mode":"0644","size":620,"mtime":"2022-03-22T22:45:22.159772743-07:00","uid":501,"gid":20,"obj":"6f81038ca8d7b81804f42031142731ed"},{"name":".gitattributes","type":"f","mode":"0644","size":340,"mtime":"2022-03-22T22:45:22.159870909-07:00","uid":501,"gid":20,"obj":"5608c2d289164627e8bdb468bbee2643"},{"name":".gitignore","type":"f","mode":"0644","size":321,"mtime":"2022-03-22T22:45:22.162843932-07:00","uid":501,"gid":20,"obj":"c43ce513c6371e0838fc553b77f5cdb2"},{"name":".golangci.yml","type":"f","mode":"0644","size":3071,"mtime":"2022-03-22T22:45:22.163100014-07:00","uid":501,"gid":20,"obj":"4289f49e43fba6800fa75462bd2ad43e"},{"name":".gometalinter.json","type":"f","mode":"0644","size":163,"mtime":"2019-05-09T22:33:06-07:00","uid":501,"gid":20,"obj":"fe4fc9d77cfb5f1b062414fdfd121713"},{"name":".goreleaser.yml","type":"f","mode":"0644","size":1736,"mtime":"2022-03-22T22:45:22.163354888-07:00","uid":501,"gid":20,"obj":"91093a462f4f72c619fb9f144702c1bf"},{"name":".linterr.txt","type":"f","mode":"0644","size":425,"mtime":"2021-11-08T22:14:29.315279172-08:00","uid":501,"gid":20,"obj":"f6c165387b84c7fb0ebc26fdc812775d"},{"name":".tmp.integration-tests.json","type":"f","mode":"0644","size":5306553,"mtime":"2022-03-27T12:10:55.035217892-07:00","uid":501,"gid":20,"obj":"Ixbc27b9a704275d05a6505e794ce63e66"},{"name":".tmp.provider-tests.json","type":"f","mode":"0644","size":617740,"mtime":"2022-02-15T21:30:28.579546866-08:00","uid":501,"gid":20,"obj":"e7f69fc0222763628d5b294faf37a6d7"},{"name":".tmp.unit-tests.json","type":"f","mode":"0644","size":200525943,"mtime":"2022-04-03T10:08:51.453180251-07:00","uid":501,"gid":20,"obj":"Ixf5da1bbcdbc267fa123d93aaf90cbd75"},{"name":".wwhrd.yml","type":"f","mode":"0644","size":244,"mtime":"2022-03-22T22:45:22.163564803-07:00","uid":501,"gid":20,"obj":"cea0cac6d19d59dcf2818b08521f46b8"},{"name":"BUILD.md","type":"f","mode":"0644","size":4873,"mtime":"2022-03-22T22:45:22.163818593-07:00","uid":501,"gid":20,"obj":"bcd47eca7b520b3ea88e4799cc0c9fea"},{"name":"CODE_OF_CONDUCT.md","type":"f","mode":"0644","size":5226,"mtime":"2021-03-12T19:28:45-08:00","uid":501,"gid":20,"obj":"270e55b022ec0c7588b2dbb501791b3e"},{"name":"GOVERNANCE.md","type":"f","mode":"0644","size":12477,"mtime":"2020-03-15T23:40:35-07:00","uid":501,"gid":20,"obj":"96674fad8fcf2bdfb96b0583917bb617"},{"name":"LICENSE","type":"f","mode":"0644","size":10763,"mtime":"2019-05-27T15:50:18-07:00","uid":501,"gid":20,"obj":"e751b8a146e1dd5494564e9a8c26dd6a"},{"name":"Makefile","type":"f","mode":"0644","size":17602,"mtime":"2022-03-22T22:45:22.1639718-07:00","uid":501,"gid":20,"obj":"aa9cc80d567e94087ea9be8fef718c1a"},{"name":"README.md","type":"f","mode":"0644","size":3874,"mtime":"2022-03-22T22:45:22.164109925-07:00","uid":501,"gid":20,"obj":"d227c763b9cf476426da5d99e9fff694"},{"name":"a.log","type":"f","mode":"0644","size":3776,"mtime":"2022-03-08T19:19:40.196874627-08:00","uid":501,"gid":20,"obj":"6337190196e804297f92a17805600be7"},{"name":"build_architecture.svg","type":"f","mode":"0644","size":143884,"mtime":"2021-03-12T19:28:45-08:00","uid":501,"gid":20,"obj":"72c0aef8c43498b056236b2d46d7e44a"},{"name":"coverage.txt","type":"f","mode":"0644","size":194996,"mtime":"2022-03-26T07:09:37.533649628-07:00","uid":501,"gid":20,"obj":"fdf1a20cea21d4daf053b99711735d0e"},{"name":"go.mod","type":"f","mode":"0644","size":5447,"mtime":"2022-03-27T09:40:59.78753556-07:00","uid":501,"gid":20,"obj":"71eefc767aeea467b1d1f7ff0ee5c21b"},{"name":"go.sum","type":"f","mode":"0644","size":114899,"mtime":"2022-03-27T09:40:59.788485485-07:00","uid":501,"gid":20,"obj":"2e801e525d9e58208dff3c25bd30f296"},{"name":"main.go","type":"f","mode":"0644","size":2057,"mtime":"2022-03-22T22:45:22.22380977-07:00","uid":501,"gid":20,"obj":"73411f7e340e5cddc43faaa1d1fe5743"}],"summary":{"size":5787582078,"files":79395,"symlinks":0,"dirs":12639,"maxTime":"2022-04-03T17:47:38.340226306-07:00","numFailed":0}}`)) //nolint:lll +} + +func indirectMetadataContent() gather.Bytes { + return gather.FromSlice([]byte(`{"stream":"kopia:indirect","entries":[{"l":7616808,"o":"a6d555a7070f7e6c1e0c9cf90e8a6cc7"},{"s":7616808,"l":8388608,"o":"7ba10912378095851cff7da5f8083fc0"},{"s":16005416,"l":2642326,"o":"de41b93c1c1ba1f030d32e2cefffa0e9"},{"s":18647742,"l":2556388,"o":"25f391d185c3101006a45553efb67742"},{"s":21204130,"l":3156843,"o":"3b281271f7c0e17f533fe5edc0f79b31"},{"s":24360973,"l":8388608,"o":"4fb9395a4790fb0b6c5f0b91f102e9ab"},{"s":32749581,"l":8388608,"o":"bf0cfa2796354f0c74ee725af7a6824b"},{"s":41138189,"l":5788370,"o":"ecb6672792bfb433886b6e57d055ecd7"},{"s":46926559,"l":3828331,"o":"ac49ad086654c624f1e86a3d46ebdf04"},{"s":50754890,"l":6544699,"o":"951b34fddcc2cc679b23b074dabc7e4e"},{"s":57299589,"l":2523488,"o":"47965162d4ebc46b25a965854d4921d3"},{"s":59823077,"l":3510947,"o":"83d6c1f3ab9695075b93eeab6cc0761c"},{"s":63334024,"l":3239328,"o":"a8aa9f5ed5357520f0c0b04cb65293ec"},{"s":66573352,"l":8388608,"o":"9ca2f0ff2e50219759b4c07971ea4e84"},{"s":74961960,"l":3737528,"o":"5eaddb02c217c1d455078c858ae3ff96"},{"s":78699488,"l":2382189,"o":"513adbee65ed3f13fc6a6a27c1b683d1"},{"s":81081677,"l":3145876,"o":"a5968eb3ad727f4a6b263541a7847c7e"},{"s":84227553,"l":4302790,"o":"58929275a937192f01b1af8526c25cad"},{"s":88530343,"l":3795820,"o":"d2adf1e91029b37450ef988ff88bd861"},{"s":92326163,"l":8388608,"o":"9a14d257b93a9011a8d133ee3cd0c5bc"},{"s":100714771,"l":3885115,"o":"3ce2122c512d00744ab065ef8d782fe6"},{"s":104599886,"l":2109875,"o":"501a69a59ee5f3dd1b2c8add2fdc5cf8"},{"s":106709761,"l":6656155,"o":"6ba38db7fb389339b41dde4e8097e4ab"},{"s":113365916,"l":3789867,"o":"7b594f73ab9e3ad736aede2d1964e4e9"},{"s":117155783,"l":4156979,"o":"7215d07ec33b442aee52bd50234bf03d"},{"s":121312762,"l":4089475,"o":"d1ef2d9e330b11eec9365fefdc5434eb"},{"s":125402237,"l":8388608,"o":"38969b3114caf31a3501b34109063c25"},{"s":133790845,"l":8388608,"o":"cb1cf30e75d0fbbe058db1b8394e6e03"},{"s":142179453,"l":3645601,"o":"975e2cdb9ccbf36e3012a715c2a596de"},{"s":145825054,"l":2546129,"o":"2e2b6b2e98fbfcdc1855f5f36d8c2fb7"},{"s":148371183,"l":2830247,"o":"535dffb5b1df8f5f6f8d9787d961f81e"},{"s":151201430,"l":7158506,"o":"f953277da0845c6fe42d0e115219e6d6"},{"s":158359936,"l":2705426,"o":"83130d0e230071c5a94d38e3e94cf326"},{"s":161065362,"l":7085401,"o":"6b75fb5f5ab5728282bb043cf6d96cd3"},{"s":168150763,"l":5357359,"o":"431c63e39c20b879e517861acf12091f"},{"s":173508122,"l":5426372,"o":"0f329762d79c6948261dcde8fa26b3b8"},{"s":178934494,"l":6322719,"o":"dc8c1d8c09c0ce783e932ae2279c3db5"},{"s":185257213,"l":8388608,"o":"b5cb9fc5464c30f7bacfda0e5381ae91"},{"s":193645821,"l":3711229,"o":"494f1e15cfea3ab09523a391df0fbebc"},{"s":197357050,"l":6853193,"o":"a0c91d2654cfd2b4ca34542bb4b5d926"},{"s":204210243,"l":2645205,"o":"1cfcab6023b83e32c284c8eb1310f34c"},{"s":206855448,"l":5775640,"o":"84baf20ed2f84ba09f317028a366532d"},{"s":212631088,"l":2698898,"o":"7a6746a097f4506956f5e8d56eee6873"},{"s":215329986,"l":3444532,"o":"b11be0bf84341a0cbcd46ca14b6fed6d"},{"s":218774518,"l":5042437,"o":"3bc63ab43d9b7c19b42d51508f449b8b"},{"s":223816955,"l":4407710,"o":"f4cb0dcb6ad0d1d17c52ef7f5654d7b9"},{"s":228224665,"l":3288967,"o":"0a9254bb39e95e9a93c30b10f03e2f2a"},{"s":231513632,"l":6818881,"o":"fa22cfbe6caebb301dc4eae4d8d13a9b"},{"s":238332513,"l":4224104,"o":"29a1316a5157b0a3359b2760cbd0895c"},{"s":242556617,"l":4427385,"o":"0efe5d26d520d4ab114fcddb8d1a1931"},{"s":246984002,"l":3625567,"o":"8e6b4a4e1acc6100a271a9100518ff77"},{"s":250609569,"l":5412145,"o":"d3988a71021a70c0ff69eb0d80dca0c8"},{"s":256021714,"l":8388608,"o":"0b5c245c16e8fb845358f75a2f984585"},{"s":264410322,"l":8388608,"o":"70d149b1ec039dc716ae3b524f1ef0f8"},{"s":272798930,"l":5295221,"o":"a081eb5227d37e8d00343c450bc12117"},{"s":278094151,"l":3320852,"o":"7394c656b6278445ad39189dec6896f8"},{"s":281415003,"l":4569639,"o":"9e80f48dc5aa9378d1c4206d17dc3116"},{"s":285984642,"l":3227911,"o":"bd486cf43401ef78ae1199c6c18cb424"},{"s":289212553,"l":4408113,"o":"f73c366a16745ca5fe823c4074e026b4"},{"s":293620666,"l":5806890,"o":"fba0357b2a79b20ba3b942c0f22d545b"},{"s":299427556,"l":8388608,"o":"6e805d1757fa230794ab8445d377c832"},{"s":307816164,"l":5026069,"o":"88e75d7ba957fbe150e5c49a501540a6"},{"s":312842233,"l":8388608,"o":"17e65917f54e4e0b454c93eb08a8c342"},{"s":321230841,"l":2416356,"o":"e65ce9c2efe34ea01d015c737abc060a"},{"s":323647197,"l":2129020,"o":"b89cb59bb69a32e865d9afbf454d080e"},{"s":325776217,"l":6264283,"o":"6a80f62763f33d2946844ef3a8755517"},{"s":332040500,"l":7998871,"o":"59bce9d16094aef2e07f98098039bd91"},{"s":340039371,"l":3760705,"o":"53b191c6dfb41134b3430343438bf4ae"},{"s":343800076,"l":8388608,"o":"8d8945a17b9a819d03f414a337c2e47d"},{"s":352188684,"l":4370796,"o":"d216de504cdbc7a598c067e49f26c69b"},{"s":356559480,"l":8388608,"o":"e6f7e4cce390627c7030a9774ed885b1"},{"s":364948088,"l":4673010,"o":"32865f3c19fcf194e7fde39ef2e6aa28"},{"s":369621098,"l":8388608,"o":"26139bd21b4581d4b97be682f13005c9"},{"s":378009706,"l":3305716,"o":"5fe7a3d8d80e4dc367021ece1130b203"},{"s":381315422,"l":8388608,"o":"00a029bd5a9a63cde2ba9d25ebea11f7"},{"s":389704030,"l":8388608,"o":"67c10d19567b60a4193ab73bfc77ae99"},{"s":398092638,"l":5533146,"o":"045bcfb7416579d060c10f82946eae1b"},{"s":403625784,"l":8388608,"o":"72cda208c56f5c7bbfc99b65889bfc80"},{"s":412014392,"l":3760763,"o":"6cb3f59c8823c049e222b58c8c155d1e"},{"s":415775155,"l":3552185,"o":"d71b9f954d280b03f54c90db61168fc2"},{"s":419327340,"l":8388608,"o":"66df8620bdd389b079cc0334c4fb0f04"},{"s":427715948,"l":3653017,"o":"796520ac43adcaec6117760fc2699b78"},{"s":431368965,"l":2935638,"o":"01fea89a93279431a0a7f5188ceefed1"},{"s":434304603,"l":2820579,"o":"c9b3a1868f00f55d90cf02aa3c877b05"},{"s":437125182,"l":8388608,"o":"d77d35d2ead1595aedc25a65069e8d88"},{"s":445513790,"l":7407288,"o":"2297b4fb6ca3959a7fb0220e358a9778"},{"s":452921078,"l":7891558,"o":"a2cd30afaafcb844405eb6f048323bbc"},{"s":460812636,"l":3191130,"o":"ba6b77fc177cf223b1d50bf330ebf8ce"},{"s":464003766,"l":7565430,"o":"ea273aa565f457e94beca5e1d20ec068"},{"s":471569196,"l":3419794,"o":"eedd34de4ae36993f04f75ebc3c9a165"},{"s":474988990,"l":3460292,"o":"2a851cea2d84ca661b3eebf72cf0de55"},{"s":478449282,"l":8032042,"o":"b402c287796218ddf5d3fff2e70eb2c7"},{"s":486481324,"l":6320993,"o":"6fec73dd933316685cc3de99b6c0be66"},{"s":492802317,"l":2960958,"o":"386bfb6cf878efc2881aacfef8c8c22d"},{"s":495763275,"l":4043015,"o":"eaa10fc56a85813899e15e87ba458c90"},{"s":499806290,"l":2220895,"o":"94e8e439c139f120d514d248cb1d37b7"},{"s":502027185,"l":2318042,"o":"ccd572f48087ee0dce5af0d1823279cf"},{"s":504345227,"l":3396237,"o":"c1080ad8f97a38eaa3754023d0ff616c"},{"s":507741464,"l":3761426,"o":"abd1cc7cb7332535f1672e1fd0b48967"},{"s":511502890,"l":3313883,"o":"030705ce77d9eb02d3e91fa7a2f5ee16"},{"s":514816773,"l":4643444,"o":"56c1e4ca5e2bc64d1744e6645f16fec2"},{"s":519460217,"l":4877742,"o":"83f88295b8539647b759aab1e7588a5f"},{"s":524337959,"l":2731173,"o":"d3fc29a18a49f05f5320592f043b3898"},{"s":527069132,"l":4388381,"o":"0d206d6e7240945ccc2900814604e55d"},{"s":531457513,"l":4198048,"o":"87c54dab1f99b6b44e4193e4e7cbf6b1"},{"s":535655561,"l":8300001,"o":"d1d2be80c5e1942e8742481df1acc022"},{"s":543955562,"l":2103894,"o":"213b91aeb37f106cd97e29d23306d492"},{"s":546059456,"l":3464612,"o":"0cec1bb256cb1f37b65339ee4df7eaa4"},{"s":549524068,"l":6456134,"o":"5b21a9c34210b23e0d1711ffb467e694"},{"s":555980202,"l":4180529,"o":"f77ebea3c198350bb255bdfc0fdf6a36"},{"s":560160731,"l":8388608,"o":"9893ebd1ef51a280861b1168f9e838af"},{"s":568549339,"l":3672532,"o":"40f3c47adb19bec122d9647e1b7986ad"},{"s":572221871,"l":4686009,"o":"ffa5697af8444e22bdf05cd7f7b4e211"},{"s":576907880,"l":8388608,"o":"3ee328d1cb9f862a928198ecb28ae7b6"},{"s":585296488,"l":3117981,"o":"cbdb5e9e2390e031571567ffaf81ba08"},{"s":588414469,"l":8388608,"o":"9212fbcd5b2c5b09475f387b7a54d25c"},{"s":596803077,"l":8388608,"o":"5f06b16231dd3038abe59ddf17789e89"},{"s":605191685,"l":5345215,"o":"b22a5da98d6a3909d5e171998abfdc13"},{"s":610536900,"l":8388608,"o":"93db1f2b3e5272fffc3d644ec00f1463"},{"s":618925508,"l":7526887,"o":"d2b612202fa49f2fd059f76057183fd9"},{"s":626452395,"l":6650357,"o":"5863fec408b1aa89ccf1c77a1e29061e"},{"s":633102752,"l":8388608,"o":"4295a43614c097a8a4f72bb1f8d3cf3a"},{"s":641491360,"l":2281701,"o":"13e34075d962bcfdb89dcbd5b766aee6"},{"s":643773061,"l":4494718,"o":"b6cc56aba7510b753a3dae94428b62ff"},{"s":648267779,"l":6378335,"o":"9a8a3c3fe94e205523e40b2ed7eb902b"},{"s":654646114,"l":8388608,"o":"2636ee206c0a3c3b099b3f9f2e36eec6"},{"s":663034722,"l":8388608,"o":"e6323f8542eb34ad197099074b08ff55"},{"s":671423330,"l":8388608,"o":"66f6a6485ac08085328996b28ced7452"},{"s":679811938,"l":7119415,"o":"170721a5d1a9728df40deedcb5bde060"},{"s":686931353,"l":2960051,"o":"f52f94fbaf8d101e633c545b5b0cdf24"},{"s":689891404,"l":4571243,"o":"cc47bfaa5b6d54dd863bc714cc607f82"},{"s":694462647,"l":7146332,"o":"331722c804700da0c4fa4c43d04aa56a"},{"s":701608979,"l":5152399,"o":"f4668768e6c15d00b8d02c1d20faecca"},{"s":706761378,"l":8388608,"o":"593addeedf8da213289758348e05567c"},{"s":715149986,"l":8388608,"o":"388715dd8b32f2088572c7703302b596"},{"s":723538594,"l":4120402,"o":"0947e4864bd26230e26406f117b18d4c"},{"s":727658996,"l":8103740,"o":"ae3062a4e74d4a407b944c895dfe1f95"},{"s":735762736,"l":4037896,"o":"2fb24ad127cbe65fc704cfdd15d3e4c2"},{"s":739800632,"l":6316726,"o":"6f21491d81b688d5efbe0ff22e35e05b"},{"s":746117358,"l":3007919,"o":"eaa42376365bad6707f4c11c204d65eb"},{"s":749125277,"l":5262875,"o":"321847ff2d9c62f7f2c6db3914327756"},{"s":754388152,"l":4462123,"o":"c565fa31ef90fc2c196d9cde44095597"},{"s":758850275,"l":5294675,"o":"c6baec6e22d1c604a04d887aeed1fd82"},{"s":764144950,"l":2912994,"o":"1327ac0489a8e76c1fbebe5b561ca6b4"},{"s":767057944,"l":2962702,"o":"97fc763b782a57f9fd542f4ab7657a85"},{"s":770020646,"l":8388608,"o":"1ca3bce935b5d306be767a9c89cf0026"},{"s":778409254,"l":365274,"o":"484b0358354388fdd16d9ea2cfe9260d"}]}`)) //nolint:lll +} diff --git a/repo/format/upgrade_lock_test.go b/repo/format/upgrade_lock_test.go index e1a7017862f..fb354b7d8b4 100644 --- a/repo/format/upgrade_lock_test.go +++ b/repo/format/upgrade_lock_test.go @@ -401,7 +401,7 @@ func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) { func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, data []byte, testCaseID string) { t.Helper() - w := rep.NewObjectWriter(ctx, object.WriterOptions{}) + w := rep.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) _, err := w.Write(data) require.NoError(t, err, testCaseID) diff --git a/repo/grpc_repository_client.go b/repo/grpc_repository_client.go index e5caa9ca330..dc27ea28ec7 100644 --- a/repo/grpc_repository_client.go +++ b/repo/grpc_repository_client.go @@ -529,9 +529,9 @@ func (r *grpcRepositoryClient) NewWriter(ctx context.Context, opt WriteSessionOp } // ConcatenateObjects creates a concatenated objects from the provided object IDs. -func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) { +func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID, comp compression.Name) (object.ID, error) { //nolint:wrapcheck - return r.omgr.Concatenate(ctx, objectIDs) + return r.omgr.Concatenate(ctx, objectIDs, comp) } // maybeRetry executes the provided callback with or without automatic retries depending on how @@ -721,7 +721,7 @@ func (r *grpcRepositoryClient) WriteContent(ctx context.Context, data gather.Byt } // we will be writing asynchronously and server will reject this write, fail early. - if prefix == manifest.ContentPrefix { + if prefix == content.ManifestContentPrefix { return content.EmptyID, errors.Errorf("writing manifest contents not allowed") } diff --git a/repo/maintenance/blob_gc_test.go b/repo/maintenance/blob_gc_test.go index e5ba92d5bad..a1b22e05988 100644 --- a/repo/maintenance/blob_gc_test.go +++ b/repo/maintenance/blob_gc_test.go @@ -47,7 +47,7 @@ func (s *formatSpecificTestSuite) TestDeleteUnreferencedBlobs(t *testing.T) { nro.BlockFormat.HMACSecret = testHMACSecret }, }) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) io.WriteString(w, "hello world!") w.Result() w.Close() diff --git a/repo/maintenance/blob_retain_test.go b/repo/maintenance/blob_retain_test.go index d4fe26b1346..58464e6b25d 100644 --- a/repo/maintenance/blob_retain_test.go +++ b/repo/maintenance/blob_retain_test.go @@ -43,7 +43,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTime(t *testing.T) { nro.RetentionPeriod = period }, }) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) io.WriteString(w, "hello world!") w.Result() w.Close() @@ -98,7 +98,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTimeDisabled(t *testing nro.BlockFormat.HMACSecret = testHMACSecret }, }) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) io.WriteString(w, "hello world!") w.Result() w.Close() diff --git a/repo/maintenance/content_rewrite_test.go b/repo/maintenance/content_rewrite_test.go index 2079279bb3a..e7e02b8ceee 100644 --- a/repo/maintenance/content_rewrite_test.go +++ b/repo/maintenance/content_rewrite_test.go @@ -79,7 +79,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) { // run N sessions to create N individual pack blobs for each content prefix for range tc.numPContents { require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error { - ow := w.NewObjectWriter(ctx, object.WriterOptions{}) + ow := w.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) fmt.Fprintf(ow, "%v", uuid.NewString()) _, err := ow.Result() return err @@ -88,7 +88,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) { for range tc.numQContents { require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error { - ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k"}) + ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k", MetadataCompressor: "zstd-fastest"}) fmt.Fprintf(ow, "%v", uuid.NewString()) _, err := ow.Result() return err diff --git a/repo/maintenance/maintenance_safety_test.go b/repo/maintenance/maintenance_safety_test.go index 5a303240129..ccd34c96e94 100644 --- a/repo/maintenance/maintenance_safety_test.go +++ b/repo/maintenance/maintenance_safety_test.go @@ -34,7 +34,7 @@ func (s *formatSpecificTestSuite) TestMaintenanceSafety(t *testing.T) { // create object that's immediately orphaned since nobody refers to it. require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error { - ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"}) + ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y", MetadataCompressor: "zstd-fastest"}) fmt.Fprintf(ow, "hello world") var err error objectID, err = ow.Result() @@ -43,7 +43,7 @@ func (s *formatSpecificTestSuite) TestMaintenanceSafety(t *testing.T) { // create another object in separate pack. require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error { - ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"}) + ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y", MetadataCompressor: "zstd-fastest"}) fmt.Fprintf(ow, "hello universe") _, err := ow.Result() return err diff --git a/repo/manifest/committed_manifest_manager.go b/repo/manifest/committed_manifest_manager.go index d955ce724d2..40b6525f4a5 100644 --- a/repo/manifest/committed_manifest_manager.go +++ b/repo/manifest/committed_manifest_manager.go @@ -117,7 +117,7 @@ func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entri mustSucceed(gz.Flush()) mustSucceed(gz.Close()) - contentID, err := m.b.WriteContent(ctx, buf.Bytes(), ContentPrefix, content.NoCompression) + contentID, err := m.b.WriteContent(ctx, buf.Bytes(), content.ManifestContentPrefix, content.NoCompression) if err != nil { return nil, errors.Wrap(err, "unable to write content") } @@ -145,7 +145,7 @@ func (m *committedManifestManager) loadCommittedContentsLocked(ctx context.Conte manifests = map[content.ID]manifest{} err := m.b.IterateContents(ctx, content.IterateOptions{ - Range: index.PrefixRange(ContentPrefix), + Range: index.PrefixRange(content.ManifestContentPrefix), Parallel: manifestLoadParallelism, }, func(ci content.Info) error { man, err := loadManifestContent(ctx, m.b, ci.ContentID) diff --git a/repo/manifest/manifest_manager.go b/repo/manifest/manifest_manager.go index 8bb2f39f433..acf6bb20cc3 100644 --- a/repo/manifest/manifest_manager.go +++ b/repo/manifest/manifest_manager.go @@ -23,6 +23,8 @@ import ( const ( manifestLoadParallelism = 8 manifestIDLength = 16 + + autoCompactionContentCountDefault = 16 ) var log = logging.Module("kopia/manifest") // +checklocksignore @@ -30,12 +32,6 @@ var log = logging.Module("kopia/manifest") // +checklocksignore // ErrNotFound is returned when the metadata item is not found. var ErrNotFound = errors.New("not found") -// ContentPrefix is the prefix of the content id for manifests. -const ( - ContentPrefix = "m" - autoCompactionContentCountDefault = 16 -) - // TypeLabelKey is the label key for manifest type. const TypeLabelKey = "type" diff --git a/repo/manifest/manifest_manager_test.go b/repo/manifest/manifest_manager_test.go index 9c567d8859c..c3312f2c659 100644 --- a/repo/manifest/manifest_manager_test.go +++ b/repo/manifest/manifest_manager_test.go @@ -125,7 +125,7 @@ func TestManifest(t *testing.T) { if err := mgr.b.IterateContents( ctx, - content.IterateOptions{Range: index.PrefixRange(ContentPrefix)}, + content.IterateOptions{Range: index.PrefixRange(content.ManifestContentPrefix)}, func(ci content.Info) error { foundContents++ return nil @@ -447,7 +447,7 @@ func getManifestContentCount(ctx context.Context, t *testing.T, mgr *Manager) in if err := mgr.b.IterateContents( ctx, - content.IterateOptions{Range: index.PrefixRange(ContentPrefix)}, + content.IterateOptions{Range: index.PrefixRange(content.ManifestContentPrefix)}, func(ci content.Info) error { foundContents++ return nil diff --git a/repo/object/object_manager.go b/repo/object/object_manager.go index 213069b3d6c..923aa64319a 100644 --- a/repo/object/object_manager.go +++ b/repo/object/object_manager.go @@ -70,6 +70,7 @@ func (om *Manager) NewWriter(ctx context.Context, opt WriterOptions) Writer { w.description = opt.Description w.prefix = opt.Prefix w.compressor = compression.ByName[opt.Compressor] + w.metadataCompressor = compression.ByName[opt.MetadataCompressor] w.totalLength = 0 w.currentPosition = 0 @@ -106,7 +107,7 @@ func (om *Manager) closedWriter(ow *objectWriter) { // in parallel utilizing more CPU cores. Because some split points now start at fixed boundaries and not content-specific, // this causes some slight loss of deduplication at concatenation points (typically 1-2 contents, usually <10MB), // so this method should only be used for very large files where this overhead is relatively small. -func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error) { +func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, metadataComp compression.Name) (ID, error) { if len(objectIDs) == 0 { return EmptyID, errors.Errorf("empty list of objects") } @@ -131,8 +132,10 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error) log(ctx).Debugf("concatenated: %v total: %v", concatenatedEntries, totalLength) w := om.NewWriter(ctx, WriterOptions{ - Prefix: indirectContentPrefix, - Description: "CONCATENATED INDEX", + Prefix: indirectContentPrefix, + Description: "CONCATENATED INDEX", + Compressor: metadataComp, + MetadataCompressor: metadataComp, }) defer w.Close() //nolint:errcheck diff --git a/repo/object/object_manager_test.go b/repo/object/object_manager_test.go index 797c44957eb..e750b3577d6 100644 --- a/repo/object/object_manager_test.go +++ b/repo/object/object_manager_test.go @@ -88,7 +88,7 @@ func (f *fakeContentManager) ContentInfo(ctx context.Context, contentID content. defer f.mu.Unlock() if d, ok := f.data[contentID]; ok { - return content.Info{ContentID: contentID, PackedLength: uint32(len(d))}, nil + return content.Info{ContentID: contentID, PackedLength: uint32(len(d)), CompressionHeaderID: f.compresionIDs[contentID]}, nil } return content.Info{}, blob.ErrBlobNotFound @@ -175,18 +175,43 @@ func TestCompression_ContentCompressionEnabled(t *testing.T) { _, _, om := setupTest(t, cmap) w := om.NewWriter(ctx, WriterOptions{ - Compressor: "gzip", + Compressor: "gzip", + MetadataCompressor: "zstd-fastest", }) w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000)) oid, err := w.Result() require.NoError(t, err) cid, isCompressed, ok := oid.ContentID() + require.True(t, ok) require.False(t, isCompressed) // oid will not indicate compression require.Equal(t, compression.ByName["gzip"].HeaderID(), cmap[cid]) } +func TestCompression_IndirectContentCompressionEnabledMetadata(t *testing.T) { + ctx := testlogging.Context(t) + + cmap := map[content.ID]compression.HeaderID{} + _, _, om := setupTest(t, cmap) + w := om.NewWriter(ctx, WriterOptions{ + Compressor: "gzip", + MetadataCompressor: "zstd-fastest", + }) + w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000000)) + oid, err := w.Result() + require.NoError(t, err) + verifyIndirectBlock(ctx, t, om, oid, compression.HeaderZstdFastest) + + w2 := om.NewWriter(ctx, WriterOptions{ + MetadataCompressor: "none", + }) + w2.Write(bytes.Repeat([]byte{5, 6, 7, 8}, 1000000)) + oid2, err2 := w2.Result() + require.NoError(t, err2) + verifyIndirectBlock(ctx, t, om, oid2, content.NoCompression) +} + func TestCompression_CustomSplitters(t *testing.T) { cases := []struct { wo WriterOptions @@ -244,7 +269,8 @@ func TestCompression_ContentCompressionDisabled(t *testing.T) { _, _, om := setupTest(t, nil) w := om.NewWriter(ctx, WriterOptions{ - Compressor: "gzip", + Compressor: "gzip", + MetadataCompressor: "zstd-fastest", }) w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000)) oid, err := w.Result() @@ -409,7 +435,7 @@ func verifyNoError(t *testing.T, err error) { require.NoError(t, err) } -func verifyIndirectBlock(ctx context.Context, t *testing.T, om *Manager, oid ID) { +func verifyIndirectBlock(ctx context.Context, t *testing.T, om *Manager, oid ID, expectedComp compression.HeaderID) { t.Helper() for indexContentID, isIndirect := oid.IndexObjectID(); isIndirect; indexContentID, isIndirect = indexContentID.IndexObjectID() { @@ -418,6 +444,11 @@ func verifyIndirectBlock(ctx context.Context, t *testing.T, om *Manager, oid ID) if !c.HasPrefix() { t.Errorf("expected base content ID to be prefixed, was %v", c) } + info, err := om.contentMgr.ContentInfo(ctx, c) + if err != nil { + t.Errorf("error getting content info for %v", err.Error()) + } + require.Equal(t, expectedComp, info.CompressionHeaderID) } rd, err := Open(ctx, om.contentMgr, indexContentID) @@ -443,6 +474,7 @@ func TestIndirection(t *testing.T) { dataLength int expectedBlobCount int expectedIndirection int + metadataCompressor compression.Name }{ {dataLength: 200, expectedBlobCount: 1, expectedIndirection: 0}, {dataLength: 1000, expectedBlobCount: 1, expectedIndirection: 0}, @@ -452,15 +484,18 @@ func TestIndirection(t *testing.T) { // 1 blob of 1000 zeros + 1 index blob {dataLength: 4000, expectedBlobCount: 2, expectedIndirection: 1}, // 1 blob of 1000 zeros + 1 index blob - {dataLength: 10000, expectedBlobCount: 2, expectedIndirection: 1}, + {dataLength: 10000, expectedBlobCount: 2, expectedIndirection: 1, metadataCompressor: "none"}, + // 1 blob of 1000 zeros + 1 index blob, enabled metadata compression + {dataLength: 10000, expectedBlobCount: 2, expectedIndirection: 1, metadataCompressor: "zstd-fastest"}, } for _, c := range cases { - data, _, om := setupTest(t, nil) + cmap := map[content.ID]compression.HeaderID{} + data, _, om := setupTest(t, cmap) contentBytes := make([]byte, c.dataLength) - writer := om.NewWriter(ctx, WriterOptions{}) + writer := om.NewWriter(ctx, WriterOptions{MetadataCompressor: c.metadataCompressor}) writer.(*objectWriter).splitter = splitterFactory() if _, err := writer.Write(contentBytes); err != nil { @@ -491,7 +526,11 @@ func TestIndirection(t *testing.T) { t.Errorf("invalid blob count for %v, got %v, wanted %v", result, got, want) } - verifyIndirectBlock(ctx, t, om, result) + expectedCompressor := content.NoCompression + if len(c.metadataCompressor) > 0 && c.metadataCompressor != "none" { + expectedCompressor = compression.ByName[c.metadataCompressor].HeaderID() + } + verifyIndirectBlock(ctx, t, om, result, expectedCompressor) } } @@ -578,7 +617,7 @@ func TestConcatenate(t *testing.T) { } for _, tc := range cases { - concatenatedOID, err := om.Concatenate(ctx, tc.inputs) + concatenatedOID, err := om.Concatenate(ctx, tc.inputs, "zstd-fastest") if err != nil { t.Fatal(err) } @@ -617,7 +656,7 @@ func TestConcatenate(t *testing.T) { } // make sure results of concatenation can be further concatenated. - concatenated3OID, err := om.Concatenate(ctx, []ID{concatenatedOID, concatenatedOID, concatenatedOID}) + concatenated3OID, err := om.Concatenate(ctx, []ID{concatenatedOID, concatenatedOID, concatenatedOID}, "zstd-fastest") if err != nil { t.Fatal(err) } diff --git a/repo/object/object_writer.go b/repo/object/object_writer.go index 5a260f9c3af..85b51c32896 100644 --- a/repo/object/object_writer.go +++ b/repo/object/object_writer.go @@ -68,7 +68,8 @@ type objectWriter struct { om *Manager - compressor compression.Compressor + compressor compression.Compressor + metadataCompressor compression.Compressor prefix content.IDPrefix buffer gather.WriteBuffer @@ -197,6 +198,13 @@ func (w *objectWriter) prepareAndWriteContentChunk(chunkID int, data gather.Byte objectComp = nil } + // metadata objects are ALWAYS compressed at the content layer, irrespective of the index version (1 or 1+). + // even if a compressor for metadata objects is set by the caller, do not compress the objects at this layer; + // instead, let it be handled at the content layer. + if w.prefix != "" { + objectComp = nil + } + // contentBytes is what we're going to write to the content manager, it potentially uses bytes from b contentBytes, isCompressed, err := maybeCompressedContentBytes(objectComp, data, &b) if err != nil { @@ -292,12 +300,13 @@ func (w *objectWriter) checkpointLocked() (ID, error) { } iw := &objectWriter{ - ctx: w.ctx, - om: w.om, - compressor: nil, - description: "LIST(" + w.description + ")", - splitter: w.om.newDefaultSplitter(), - prefix: w.prefix, + ctx: w.ctx, + om: w.om, + compressor: w.metadataCompressor, + metadataCompressor: w.metadataCompressor, + description: "LIST(" + w.description + ")", + splitter: w.om.newDefaultSplitter(), + prefix: w.prefix, } if iw.prefix == "" { @@ -334,9 +343,10 @@ func writeIndirectObject(w io.Writer, entries []IndirectObjectEntry) error { // WriterOptions can be passed to Repository.NewWriter(). type WriterOptions struct { - Description string - Prefix content.IDPrefix // empty string or a single-character ('g'..'z') - Compressor compression.Name - Splitter string // use particular splitter instead of default - AsyncWrites int // allow up to N content writes to be asynchronous + Description string + Prefix content.IDPrefix // empty string or a single-character ('g'..'z') + Compressor compression.Name + MetadataCompressor compression.Name + Splitter string // use particular splitter instead of default + AsyncWrites int // allow up to N content writes to be asynchronous } diff --git a/repo/repo_benchmarks_test.go b/repo/repo_benchmarks_test.go index 13d0105d9d5..abfa90b2a8f 100644 --- a/repo/repo_benchmarks_test.go +++ b/repo/repo_benchmarks_test.go @@ -15,7 +15,7 @@ func BenchmarkWriterDedup1M(b *testing.B) { ctx, env := repotesting.NewEnvironment(b, format.FormatVersion2) dataBuf := make([]byte, 4<<20) - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) writer.Write(dataBuf) _, err := writer.Result() require.NoError(b, err) @@ -25,7 +25,7 @@ func BenchmarkWriterDedup1M(b *testing.B) { for range b.N { // write exactly the same data - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) writer.Write(dataBuf) writer.Result() writer.Close() @@ -45,7 +45,7 @@ func BenchmarkWriterNoDedup1M(b *testing.B) { for i := range b.N { // write exactly the same data - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) if i+chunkSize > len(dataBuf) { chunkSize++ diff --git a/repo/repository.go b/repo/repository.go index e9451eda145..9616073d7b0 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -14,6 +14,7 @@ import ( "github.com/kopia/kopia/internal/metrics" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/blob/throttling" + "github.com/kopia/kopia/repo/compression" "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/content/indexblob" "github.com/kopia/kopia/repo/format" @@ -47,7 +48,7 @@ type RepositoryWriter interface { Repository NewObjectWriter(ctx context.Context, opt object.WriterOptions) object.Writer - ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) + ConcatenateObjects(ctx context.Context, objectIDs []object.ID, comp compression.Name) (object.ID, error) PutManifest(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error) ReplaceManifests(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error) DeleteManifest(ctx context.Context, id manifest.ID) error @@ -180,9 +181,9 @@ func (r *directRepository) NewObjectWriter(ctx context.Context, opt object.Write } // ConcatenateObjects creates a concatenated objects from the provided object IDs. -func (r *directRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) { +func (r *directRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID, comp compression.Name) (object.ID, error) { //nolint:wrapcheck - return r.omgr.Concatenate(ctx, objectIDs) + return r.omgr.Concatenate(ctx, objectIDs, comp) } // DisableIndexRefresh disables index refresh for the duration of the write session. diff --git a/repo/repository_test.go b/repo/repository_test.go index cf8b17cedd2..5e67cc3ed5d 100644 --- a/repo/repository_test.go +++ b/repo/repository_test.go @@ -47,7 +47,7 @@ func (s *formatSpecificTestSuite) TestWriters(t *testing.T) { for _, c := range cases { ctx, env := repotesting.NewEnvironment(t, s.formatVersion) - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) if _, err := writer.Write(c.data); err != nil { t.Fatalf("write error: %v", err) } @@ -74,7 +74,7 @@ func (s *formatSpecificTestSuite) TestWriterCompleteChunkInTwoWrites(t *testing. ctx, env := repotesting.NewEnvironment(t, s.formatVersion) b := make([]byte, 100) - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) writer.Write(b[0:50]) writer.Write(b[0:50]) result, err := writer.Result() @@ -159,7 +159,7 @@ func (s *formatSpecificTestSuite) TestHMAC(t *testing.T) { c := bytes.Repeat([]byte{0xcd}, 50) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) w.Write(c) result, err := w.Result() @@ -185,7 +185,7 @@ func (s *formatSpecificTestSuite) TestReaderStoredBlockNotFound(t *testing.T) { func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, data []byte, testCaseID string) object.ID { t.Helper() - w := rep.NewObjectWriter(ctx, object.WriterOptions{}) + w := rep.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) if _, err := w.Write(data); err != nil { t.Fatalf("can't write object %q - write failed: %v", testCaseID, err) } @@ -275,7 +275,7 @@ func TestFormats(t *testing.T) { for k, v := range c.oids { bytesToWrite := []byte(k) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) w.Write(bytesToWrite) oid, err := w.Result() @@ -555,7 +555,7 @@ func TestObjectWritesWithRetention(t *testing.T) { }, }) - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) _, err := writer.Write([]byte("the quick brown fox jumps over the lazy dog")) require.NoError(t, err) @@ -774,7 +774,8 @@ func TestMetrics_CompressibleData(t *testing.T) { for ensureMapEntry(t, env.RepositoryMetrics().Snapshot(false).Counters, "content_write_duration_nanos") < 5e6 { w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{ - Compressor: "gzip", + Compressor: "gzip", + MetadataCompressor: "zstd-fastest", }) w.Write(inputData) diff --git a/snapshot/policy/compression_policy.go b/snapshot/policy/compression_policy.go index 4c782272020..22623b77fa0 100644 --- a/snapshot/policy/compression_policy.go +++ b/snapshot/policy/compression_policy.go @@ -20,6 +20,11 @@ type CompressionPolicy struct { MaxSize int64 `json:"maxSize,omitempty"` } +// MetadataCompressionPolicy specifies compression policy for metadata. +type MetadataCompressionPolicy struct { + CompressorName compression.Name `json:"compressorName,omitempty"` +} + // CompressionPolicyDefinition specifies which policy definition provided the value of a particular field. type CompressionPolicyDefinition struct { CompressorName snapshot.SourceInfo `json:"compressorName,omitempty"` @@ -29,6 +34,11 @@ type CompressionPolicyDefinition struct { MaxSize snapshot.SourceInfo `json:"maxSize,omitempty"` } +// MetadataCompressionPolicyDefinition specifies which policy definition provided the value of a particular field. +type MetadataCompressionPolicyDefinition struct { + CompressorName snapshot.SourceInfo `json:"compressorName,omitempty"` +} + // CompressorForFile returns compression name to be used for compressing a given file according to policy, using attributes such as name or size. func (p *CompressionPolicy) CompressorForFile(e fs.Entry) compression.Name { ext := filepath.Ext(e.Name()) @@ -67,6 +77,20 @@ func (p *CompressionPolicy) Merge(src CompressionPolicy, def *CompressionPolicyD mergeStrings(&p.NeverCompress, &p.NoParentNeverCompress, src.NeverCompress, src.NoParentNeverCompress, &def.NeverCompress, si) } +// Merge applies default values from the provided policy. +func (p *MetadataCompressionPolicy) Merge(src MetadataCompressionPolicy, def *MetadataCompressionPolicyDefinition, si snapshot.SourceInfo) { + mergeCompressionName(&p.CompressorName, src.CompressorName, &def.CompressorName, si) +} + +// MetadataCompressor returns compression name to be used for according to policy. +func (p *MetadataCompressionPolicy) MetadataCompressor() compression.Name { + if p.CompressorName == "none" { + return "" + } + + return p.CompressorName +} + func isInSortedSlice(s string, slice []string) bool { x := sort.SearchStrings(slice, s) return x < len(slice) && slice[x] == s diff --git a/snapshot/policy/policy.go b/snapshot/policy/policy.go index c7c82fbbc99..bb26ca309e5 100644 --- a/snapshot/policy/policy.go +++ b/snapshot/policy/policy.go @@ -21,33 +21,35 @@ type TargetWithPolicy struct { // Policy describes snapshot policy for a single source. type Policy struct { - Labels map[string]string `json:"-"` - RetentionPolicy RetentionPolicy `json:"retention,omitempty"` - FilesPolicy FilesPolicy `json:"files,omitempty"` - ErrorHandlingPolicy ErrorHandlingPolicy `json:"errorHandling,omitempty"` - SchedulingPolicy SchedulingPolicy `json:"scheduling,omitempty"` - CompressionPolicy CompressionPolicy `json:"compression,omitempty"` - SplitterPolicy SplitterPolicy `json:"splitter,omitempty"` - Actions ActionsPolicy `json:"actions,omitempty"` - OSSnapshotPolicy OSSnapshotPolicy `json:"osSnapshots,omitempty"` - LoggingPolicy LoggingPolicy `json:"logging,omitempty"` - UploadPolicy UploadPolicy `json:"upload,omitempty"` - NoParent bool `json:"noParent,omitempty"` + Labels map[string]string `json:"-"` + RetentionPolicy RetentionPolicy `json:"retention,omitempty"` + FilesPolicy FilesPolicy `json:"files,omitempty"` + ErrorHandlingPolicy ErrorHandlingPolicy `json:"errorHandling,omitempty"` + SchedulingPolicy SchedulingPolicy `json:"scheduling,omitempty"` + CompressionPolicy CompressionPolicy `json:"compression,omitempty"` + MetadataCompressionPolicy MetadataCompressionPolicy `json:"metadataCompression,omitempty"` + SplitterPolicy SplitterPolicy `json:"splitter,omitempty"` + Actions ActionsPolicy `json:"actions,omitempty"` + OSSnapshotPolicy OSSnapshotPolicy `json:"osSnapshots,omitempty"` + LoggingPolicy LoggingPolicy `json:"logging,omitempty"` + UploadPolicy UploadPolicy `json:"upload,omitempty"` + NoParent bool `json:"noParent,omitempty"` } // Definition corresponds 1:1 to Policy and each field specifies the snapshot.SourceInfo // where a particular policy field was specified. type Definition struct { - RetentionPolicy RetentionPolicyDefinition `json:"retention,omitempty"` - FilesPolicy FilesPolicyDefinition `json:"files,omitempty"` - ErrorHandlingPolicy ErrorHandlingPolicyDefinition `json:"errorHandling,omitempty"` - SchedulingPolicy SchedulingPolicyDefinition `json:"scheduling,omitempty"` - CompressionPolicy CompressionPolicyDefinition `json:"compression,omitempty"` - SplitterPolicy SplitterPolicyDefinition `json:"splitter,omitempty"` - Actions ActionsPolicyDefinition `json:"actions,omitempty"` - OSSnapshotPolicy OSSnapshotPolicyDefinition `json:"osSnapshots,omitempty"` - LoggingPolicy LoggingPolicyDefinition `json:"logging,omitempty"` - UploadPolicy UploadPolicyDefinition `json:"upload,omitempty"` + RetentionPolicy RetentionPolicyDefinition `json:"retention,omitempty"` + FilesPolicy FilesPolicyDefinition `json:"files,omitempty"` + ErrorHandlingPolicy ErrorHandlingPolicyDefinition `json:"errorHandling,omitempty"` + SchedulingPolicy SchedulingPolicyDefinition `json:"scheduling,omitempty"` + CompressionPolicy CompressionPolicyDefinition `json:"compression,omitempty"` + MetadataCompressionPolicy MetadataCompressionPolicyDefinition `json:"metadataCompression,omitempty"` + SplitterPolicy SplitterPolicyDefinition `json:"splitter,omitempty"` + Actions ActionsPolicyDefinition `json:"actions,omitempty"` + OSSnapshotPolicy OSSnapshotPolicyDefinition `json:"osSnapshots,omitempty"` + LoggingPolicy LoggingPolicyDefinition `json:"logging,omitempty"` + UploadPolicy UploadPolicyDefinition `json:"upload,omitempty"` } func (p *Policy) String() string { diff --git a/snapshot/policy/policy_merge.go b/snapshot/policy/policy_merge.go index 32b40dc4cba..d724f6fe9e1 100644 --- a/snapshot/policy/policy_merge.go +++ b/snapshot/policy/policy_merge.go @@ -24,6 +24,7 @@ func MergePolicies(policies []*Policy, si snapshot.SourceInfo) (*Policy, *Defini merged.SchedulingPolicy.Merge(p.SchedulingPolicy, &def.SchedulingPolicy, p.Target()) merged.UploadPolicy.Merge(p.UploadPolicy, &def.UploadPolicy, p.Target()) merged.CompressionPolicy.Merge(p.CompressionPolicy, &def.CompressionPolicy, p.Target()) + merged.MetadataCompressionPolicy.Merge(p.MetadataCompressionPolicy, &def.MetadataCompressionPolicy, p.Target()) merged.SplitterPolicy.Merge(p.SplitterPolicy, &def.SplitterPolicy, p.Target()) merged.Actions.Merge(p.Actions, &def.Actions, p.Target()) merged.OSSnapshotPolicy.Merge(p.OSSnapshotPolicy, &def.OSSnapshotPolicy, p.Target()) @@ -41,6 +42,7 @@ func MergePolicies(policies []*Policy, si snapshot.SourceInfo) (*Policy, *Defini merged.SchedulingPolicy.Merge(defaultSchedulingPolicy, &def.SchedulingPolicy, GlobalPolicySourceInfo) merged.UploadPolicy.Merge(defaultUploadPolicy, &def.UploadPolicy, GlobalPolicySourceInfo) merged.CompressionPolicy.Merge(defaultCompressionPolicy, &def.CompressionPolicy, GlobalPolicySourceInfo) + merged.MetadataCompressionPolicy.Merge(defaultMetadataCompressionPolicy, &def.MetadataCompressionPolicy, GlobalPolicySourceInfo) merged.SplitterPolicy.Merge(defaultSplitterPolicy, &def.SplitterPolicy, GlobalPolicySourceInfo) merged.Actions.Merge(defaultActionsPolicy, &def.Actions, GlobalPolicySourceInfo) merged.OSSnapshotPolicy.Merge(defaultOSSnapshotPolicy, &def.OSSnapshotPolicy, GlobalPolicySourceInfo) diff --git a/snapshot/policy/policy_tree.go b/snapshot/policy/policy_tree.go index 0aa463d6696..ca633635996 100644 --- a/snapshot/policy/policy_tree.go +++ b/snapshot/policy/policy_tree.go @@ -12,6 +12,9 @@ var ( defaultCompressionPolicy = CompressionPolicy{ CompressorName: "none", } + defaultMetadataCompressionPolicy = MetadataCompressionPolicy{ + CompressorName: "zstd-fastest", + } defaultSplitterPolicy = SplitterPolicy{} @@ -71,15 +74,16 @@ var ( // DefaultPolicy is a default policy returned by policy tree in absence of other policies. DefaultPolicy = &Policy{ - FilesPolicy: defaultFilesPolicy, - RetentionPolicy: defaultRetentionPolicy, - CompressionPolicy: defaultCompressionPolicy, - ErrorHandlingPolicy: defaultErrorHandlingPolicy, - SchedulingPolicy: defaultSchedulingPolicy, - LoggingPolicy: defaultLoggingPolicy, - Actions: defaultActionsPolicy, - OSSnapshotPolicy: defaultOSSnapshotPolicy, - UploadPolicy: defaultUploadPolicy, + FilesPolicy: defaultFilesPolicy, + RetentionPolicy: defaultRetentionPolicy, + CompressionPolicy: defaultCompressionPolicy, + MetadataCompressionPolicy: defaultMetadataCompressionPolicy, + ErrorHandlingPolicy: defaultErrorHandlingPolicy, + SchedulingPolicy: defaultSchedulingPolicy, + LoggingPolicy: defaultLoggingPolicy, + Actions: defaultActionsPolicy, + OSSnapshotPolicy: defaultOSSnapshotPolicy, + UploadPolicy: defaultUploadPolicy, } // DefaultDefinition provides the Definition for the default policy. diff --git a/snapshot/snapshotfs/dir_rewriter.go b/snapshot/snapshotfs/dir_rewriter.go index 2594ff68d3f..ab41d463f34 100644 --- a/snapshot/snapshotfs/dir_rewriter.go +++ b/snapshot/snapshotfs/dir_rewriter.go @@ -14,9 +14,11 @@ import ( "github.com/kopia/kopia/internal/impossible" "github.com/kopia/kopia/internal/workshare" "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/compression" "github.com/kopia/kopia/repo/logging" "github.com/kopia/kopia/repo/object" "github.com/kopia/kopia/snapshot" + "github.com/kopia/kopia/snapshot/policy" ) var dirRewriterLog = logging.Module("dirRewriter") @@ -59,17 +61,18 @@ type DirRewriter struct { } type dirRewriterRequest struct { - ctx context.Context //nolint:containedctx - parentPath string - input *snapshot.DirEntry - result *snapshot.DirEntry - err error + ctx context.Context //nolint:containedctx + parentPath string + input *snapshot.DirEntry + result *snapshot.DirEntry + metadataCompression compression.Name + err error } func (rw *DirRewriter) processRequest(pool *workshare.Pool[*dirRewriterRequest], req *dirRewriterRequest) { _ = pool - req.result, req.err = rw.getCachedReplacement(req.ctx, req.parentPath, req.input) + req.result, req.err = rw.getCachedReplacement(req.ctx, req.parentPath, req.input, req.metadataCompression) } func (rw *DirRewriter) getCacheKey(input *snapshot.DirEntry) dirRewriterCacheKey { @@ -87,7 +90,7 @@ func (rw *DirRewriter) getCacheKey(input *snapshot.DirEntry) dirRewriterCacheKey return out } -func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath string, input *snapshot.DirEntry) (*snapshot.DirEntry, error) { +func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath string, input *snapshot.DirEntry, metadataComp compression.Name) (*snapshot.DirEntry, error) { key := rw.getCacheKey(input) // see if we already processed this exact directory entry @@ -113,7 +116,7 @@ func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath stri // the rewriter returned a directory, we must recursively process it. if result.Type == snapshot.EntryTypeDirectory { - rep2, subdirErr := rw.processDirectory(ctx, parentPath, result) + rep2, subdirErr := rw.processDirectory(ctx, parentPath, result, metadataComp) if rep2 == nil { return nil, errors.Wrap(subdirErr, input.Name) } @@ -131,7 +134,7 @@ func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath stri return result, nil } -func (rw *DirRewriter) processDirectory(ctx context.Context, pathFromRoot string, entry *snapshot.DirEntry) (*snapshot.DirEntry, error) { +func (rw *DirRewriter) processDirectory(ctx context.Context, pathFromRoot string, entry *snapshot.DirEntry, metadataComp compression.Name) (*snapshot.DirEntry, error) { dirRewriterLog(ctx).Debugw("processDirectory", "path", pathFromRoot) r, err := rw.rep.OpenObject(ctx, entry.ObjectID) @@ -145,10 +148,10 @@ func (rw *DirRewriter) processDirectory(ctx context.Context, pathFromRoot string return rw.opts.OnDirectoryReadFailure(ctx, pathFromRoot, entry, errors.Wrap(err, "unable to read directory entries")) } - return rw.processDirectoryEntries(ctx, pathFromRoot, entry, entries) + return rw.processDirectoryEntries(ctx, pathFromRoot, entry, entries, metadataComp) } -func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath string, entry *snapshot.DirEntry, entries []*snapshot.DirEntry) (*snapshot.DirEntry, error) { +func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath string, entry *snapshot.DirEntry, entries []*snapshot.DirEntry, metadataComp compression.Name) (*snapshot.DirEntry, error) { var ( builder DirManifestBuilder wg workshare.AsyncGroup[*dirRewriterRequest] @@ -165,6 +168,7 @@ func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath s path.Join(parentPath, child.Name), child, nil, + metadataComp, nil, }) @@ -172,7 +176,7 @@ func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath s } // run in current goroutine - replacement, repErr := rw.getCachedReplacement(ctx, path.Join(parentPath, child.Name), child) + replacement, repErr := rw.getCachedReplacement(ctx, path.Join(parentPath, child.Name), child, metadataComp) if repErr != nil { return nil, errors.Wrap(repErr, child.Name) } @@ -194,7 +198,7 @@ func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath s dm := builder.Build(entry.ModTime, entry.DirSummary.IncompleteReason) - oid, err := writeDirManifest(ctx, rw.rep, entry.ObjectID.String(), dm) + oid, err := writeDirManifest(ctx, rw.rep, entry.ObjectID.String(), dm, metadataComp) if err != nil { return nil, errors.Wrap(err, "unable to write directory manifest") } @@ -219,8 +223,8 @@ func (rw *DirRewriter) equalEntries(e1, e2 *snapshot.DirEntry) bool { } // RewriteSnapshotManifest rewrites the directory tree starting at a given manifest. -func (rw *DirRewriter) RewriteSnapshotManifest(ctx context.Context, man *snapshot.Manifest) (bool, error) { - newEntry, err := rw.getCachedReplacement(ctx, ".", man.RootEntry) +func (rw *DirRewriter) RewriteSnapshotManifest(ctx context.Context, man *snapshot.Manifest, metadataComp compression.Name) (bool, error) { + newEntry, err := rw.getCachedReplacement(ctx, ".", man.RootEntry, metadataComp) if err != nil { return false, errors.Wrapf(err, "error processing snapshot %v", man.ID) } @@ -273,7 +277,13 @@ func RewriteAsStub(rep repo.RepositoryWriter) RewriteFailedEntryCallback { return nil, errors.Wrap(err, "error writing stub contents") } - w := rep.NewObjectWriter(ctx, object.WriterOptions{}) + pol, _, _, err := policy.GetEffectivePolicy(ctx, rep, policy.GlobalPolicySourceInfo) + if err != nil { + return nil, errors.Wrap(err, "error getting policy") + } + + metadataCompressor := pol.MetadataCompressionPolicy.MetadataCompressor() + w := rep.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: metadataCompressor}) n, err := buf.WriteTo(w) if err != nil { diff --git a/snapshot/snapshotfs/dir_writer.go b/snapshot/snapshotfs/dir_writer.go index f1409db13be..39556547a33 100644 --- a/snapshot/snapshotfs/dir_writer.go +++ b/snapshot/snapshotfs/dir_writer.go @@ -7,14 +7,17 @@ import ( "github.com/pkg/errors" "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/compression" "github.com/kopia/kopia/repo/object" "github.com/kopia/kopia/snapshot" ) -func writeDirManifest(ctx context.Context, rep repo.RepositoryWriter, dirRelativePath string, dirManifest *snapshot.DirManifest) (object.ID, error) { +func writeDirManifest(ctx context.Context, rep repo.RepositoryWriter, dirRelativePath string, dirManifest *snapshot.DirManifest, metadataComp compression.Name) (object.ID, error) { writer := rep.NewObjectWriter(ctx, object.WriterOptions{ - Description: "DIR:" + dirRelativePath, - Prefix: objectIDPrefixDirectory, + Description: "DIR:" + dirRelativePath, + Prefix: objectIDPrefixDirectory, + Compressor: metadataComp, + MetadataCompressor: metadataComp, }) defer writer.Close() //nolint:errcheck diff --git a/snapshot/snapshotfs/upload.go b/snapshot/snapshotfs/upload.go index 225a30f93f9..ae3911e4b42 100644 --- a/snapshot/snapshotfs/upload.go +++ b/snapshot/snapshotfs/upload.go @@ -161,12 +161,13 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis } comp := pol.CompressionPolicy.CompressorForFile(f) + metadataComp := pol.MetadataCompressionPolicy.MetadataCompressor() splitterName := pol.SplitterPolicy.SplitterForFile(f) chunkSize := pol.UploadPolicy.ParallelUploadAboveSize.OrDefault(-1) if chunkSize < 0 || f.Size() <= chunkSize { // all data fits in 1 full chunks, upload directly - return u.uploadFileData(ctx, parentCheckpointRegistry, f, f.Name(), 0, -1, comp, splitterName) + return u.uploadFileData(ctx, parentCheckpointRegistry, f, f.Name(), 0, -1, comp, metadataComp, splitterName) } // we always have N+1 parts, first N are exactly chunkSize, last one has undetermined length @@ -191,11 +192,11 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis if wg.CanShareWork(u.workerPool) { // another goroutine is available, delegate to them wg.RunAsync(u.workerPool, func(_ *workshare.Pool[*uploadWorkItem], _ *uploadWorkItem) { - parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp, splitterName) + parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp, metadataComp, splitterName) }, nil) } else { // just do the work in the current goroutine - parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp, splitterName) + parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp, metadataComp, splitterName) } } @@ -206,10 +207,10 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis return nil, errors.Wrap(err, "error uploading parts") } - return concatenateParts(ctx, u.repo, f.Name(), parts) + return concatenateParts(ctx, u.repo, f.Name(), parts, metadataComp) } -func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name string, parts []*snapshot.DirEntry) (*snapshot.DirEntry, error) { +func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name string, parts []*snapshot.DirEntry, metadataComp compression.Name) (*snapshot.DirEntry, error) { var ( objectIDs []object.ID totalSize int64 @@ -221,7 +222,7 @@ func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name strin objectIDs = append(objectIDs, part.ObjectID) } - resultObject, err := rep.ConcatenateObjects(ctx, objectIDs) + resultObject, err := rep.ConcatenateObjects(ctx, objectIDs, metadataComp) if err != nil { return nil, errors.Wrap(err, "concatenate") } @@ -234,7 +235,7 @@ func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name strin return de, nil } -func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, f fs.File, fname string, offset, length int64, compressor compression.Name, splitterName string) (*snapshot.DirEntry, error) { +func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, f fs.File, fname string, offset, length int64, compressor, metadataComp compression.Name, splitterName string) (*snapshot.DirEntry, error) { file, err := f.Open(ctx) if err != nil { return nil, errors.Wrap(err, "unable to open file") @@ -242,10 +243,11 @@ func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry defer file.Close() //nolint:errcheck writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{ - Description: "FILE:" + fname, - Compressor: compressor, - Splitter: splitterName, - AsyncWrites: 1, // upload chunk in parallel to writing another chunk + Description: "FILE:" + fname, + Compressor: compressor, + MetadataCompressor: metadataComp, + Splitter: splitterName, + AsyncWrites: 1, // upload chunk in parallel to writing another chunk }) defer writer.Close() //nolint:errcheck @@ -298,7 +300,7 @@ func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry return de, nil } -func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath string, f fs.Symlink) (dirEntry *snapshot.DirEntry, ret error) { +func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath string, f fs.Symlink, metadataComp compression.Name) (dirEntry *snapshot.DirEntry, ret error) { u.Progress.HashingFile(relativePath) defer func() { @@ -312,7 +314,8 @@ func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath strin } writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{ - Description: "SYMLINK:" + f.Name(), + Description: "SYMLINK:" + f.Name(), + MetadataCompressor: metadataComp, }) defer writer.Close() //nolint:errcheck @@ -353,11 +356,13 @@ func (u *Uploader) uploadStreamingFileInternal(ctx context.Context, relativePath }() comp := pol.CompressionPolicy.CompressorForFile(f) + metadataComp := pol.MetadataCompressionPolicy.MetadataCompressor() writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{ - Description: "STREAMFILE:" + f.Name(), - Compressor: comp, - Splitter: pol.SplitterPolicy.SplitterForFile(f), + Description: "STREAMFILE:" + f.Name(), + Compressor: comp, + MetadataCompressor: metadataComp, + Splitter: pol.SplitterPolicy.SplitterForFile(f), }) defer writer.Close() //nolint:errcheck @@ -903,7 +908,8 @@ func (u *Uploader) processSingle( return nil case fs.Symlink: - de, err := u.uploadSymlinkInternal(ctx, entryRelativePath, entry) + childTree := policyTree.Child(entry.Name()) + de, err := u.uploadSymlinkInternal(ctx, entryRelativePath, entry, childTree.EffectivePolicy().MetadataCompressionPolicy.MetadataCompressor()) return u.processEntryUploadResult(ctx, de, err, entryRelativePath, parentDirBuilder, policyTree.EffectivePolicy().ErrorHandlingPolicy.IgnoreFileErrors.OrDefault(false), @@ -1145,6 +1151,8 @@ func uploadDirInternal( childCheckpointRegistry := &checkpointRegistry{} + metadataComp := policyTree.EffectivePolicy().MetadataCompressionPolicy.MetadataCompressor() + thisCheckpointRegistry.addCheckpointCallback(directory.Name(), func() (*snapshot.DirEntry, error) { // when snapshotting the parent, snapshot all our children and tell them to populate // childCheckpointBuilder @@ -1157,13 +1165,14 @@ func uploadDirInternal( checkpointManifest := thisCheckpointBuilder.Build(fs.UTCTimestampFromTime(directory.ModTime()), IncompleteReasonCheckpoint) - oid, err := writeDirManifest(ctx, u.repo, dirRelativePath, checkpointManifest) + oid, err := writeDirManifest(ctx, u.repo, dirRelativePath, checkpointManifest, metadataComp) if err != nil { return nil, errors.Wrap(err, "error writing dir manifest") } return newDirEntryWithSummary(directory, oid, checkpointManifest.Summary) }) + defer thisCheckpointRegistry.removeCheckpointCallback(directory.Name()) if err := u.processChildren(ctx, childCheckpointRegistry, thisDirBuilder, localDirPathOrEmpty, dirRelativePath, directory, policyTree, uniqueDirectories(previousDirs)); err != nil && !errors.Is(err, errCanceled) { @@ -1172,7 +1181,7 @@ func uploadDirInternal( dirManifest := thisDirBuilder.Build(fs.UTCTimestampFromTime(directory.ModTime()), u.incompleteReason()) - oid, err := writeDirManifest(ctx, u.repo, dirRelativePath, dirManifest) + oid, err := writeDirManifest(ctx, u.repo, dirRelativePath, dirManifest, metadataComp) if err != nil { return nil, errors.Wrapf(err, "error writing dir manifest: %v", directory.Name()) } diff --git a/snapshot/snapshotfs/upload_test.go b/snapshot/snapshotfs/upload_test.go index 279ed8f2e55..f030726f5fb 100644 --- a/snapshot/snapshotfs/upload_test.go +++ b/snapshot/snapshotfs/upload_test.go @@ -38,6 +38,8 @@ import ( "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/blob/filesystem" bloblogging "github.com/kopia/kopia/repo/blob/logging" + "github.com/kopia/kopia/repo/compression" + "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/logging" "github.com/kopia/kopia/repo/object" "github.com/kopia/kopia/snapshot" @@ -228,6 +230,108 @@ func TestUpload(t *testing.T) { } } +type entry struct { + name string + objectID object.ID +} + +// findAllEntries recursively iterates over all the dirs and returns list of file entries. +func findAllEntries(t *testing.T, ctx context.Context, dir fs.Directory) []entry { + t.Helper() + entries := []entry{} + fs.IterateEntries(ctx, dir, func(ctx context.Context, e fs.Entry) error { + oid, err := object.ParseID(e.(object.HasObjectID).ObjectID().String()) + require.NoError(t, err) + entries = append(entries, entry{ + name: e.Name(), + objectID: oid, + }) + if e.IsDir() { + entries = append(entries, findAllEntries(t, ctx, e.(fs.Directory))...) + } + return nil + }) + + return entries +} + +func verifyMetadataCompressor(t *testing.T, ctx context.Context, rep repo.Repository, entries []entry, comp compression.HeaderID) { + t.Helper() + for _, e := range entries { + cid, _, ok := e.objectID.ContentID() + require.True(t, ok) + if !cid.HasPrefix() { + continue + } + info, err := rep.ContentInfo(ctx, cid) + if err != nil { + t.Errorf("failed to get content info: %v", err) + } + require.Equal(t, comp, info.CompressionHeaderID) + } +} + +func TestUploadMetadataCompression(t *testing.T) { + ctx := testlogging.Context(t) + t.Run("default metadata compression", func(t *testing.T) { + th := newUploadTestHarness(ctx, t) + defer th.cleanup() + u := NewUploader(th.repo) + policyTree := policy.BuildTree(nil, policy.DefaultPolicy) + + s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}) + if err != nil { + t.Errorf("Upload error: %v", err) + } + + dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory) + entries := findAllEntries(t, ctx, dir) + verifyMetadataCompressor(t, ctx, th.repo, entries, compression.HeaderZstdFastest) + }) + t.Run("disable metadata compression", func(t *testing.T) { + th := newUploadTestHarness(ctx, t) + defer th.cleanup() + u := NewUploader(th.repo) + policyTree := policy.BuildTree(map[string]*policy.Policy{ + ".": { + MetadataCompressionPolicy: policy.MetadataCompressionPolicy{ + CompressorName: "none", + }, + }, + }, policy.DefaultPolicy) + + s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}) + if err != nil { + t.Errorf("Upload error: %v", err) + } + + dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory) + entries := findAllEntries(t, ctx, dir) + verifyMetadataCompressor(t, ctx, th.repo, entries, content.NoCompression) + }) + t.Run("set metadata compressor", func(t *testing.T) { + th := newUploadTestHarness(ctx, t) + defer th.cleanup() + u := NewUploader(th.repo) + policyTree := policy.BuildTree(map[string]*policy.Policy{ + ".": { + MetadataCompressionPolicy: policy.MetadataCompressionPolicy{ + CompressorName: "gzip", + }, + }, + }, policy.DefaultPolicy) + + s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}) + if err != nil { + t.Errorf("Upload error: %v", err) + } + + dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory) + entries := findAllEntries(t, ctx, dir) + verifyMetadataCompressor(t, ctx, th.repo, entries, compression.ByName["gzip"].HeaderID()) + }) +} + func TestUpload_TopLevelDirectoryReadFailure(t *testing.T) { ctx := testlogging.Context(t) th := newUploadTestHarness(ctx, t) @@ -816,14 +920,14 @@ func TestUpload_VirtualDirectoryWithStreamingFile(t *testing.T) { policyTree := policy.BuildTree(nil, policy.DefaultPolicy) // Create a temporary pipe file with test data - content := []byte("Streaming Temporary file content") + tmpContent := []byte("Streaming Temporary file content") r, w, err := os.Pipe() if err != nil { t.Fatalf("error creating pipe file: %v", err) } - if _, err = w.Write(content); err != nil { + if _, err = w.Write(tmpContent); err != nil { t.Fatalf("error writing to pipe file: %v", err) } @@ -873,8 +977,8 @@ func TestUpload_VirtualDirectoryWithStreamingFile_WithCompression(t *testing.T) // Create a temporary file with test data. Want something compressible but // small so we don't trigger dedupe. - content := []byte(strings.Repeat("a", 4096)) - r := io.NopCloser(bytes.NewReader(content)) + tmpContent := []byte(strings.Repeat("a", 4096)) + r := io.NopCloser(bytes.NewReader(tmpContent)) staticRoot := virtualfs.NewStaticDirectory("rootdir", []fs.Entry{ virtualfs.StreamingFileFromReader("stream-file", r), @@ -895,7 +999,7 @@ func TestUpload_VirtualDirectoryWithStreamingFile_WithCompression(t *testing.T) } func TestUpload_VirtualDirectoryWithStreamingFileWithModTime(t *testing.T) { - content := []byte("Streaming Temporary file content") + tmpContent := []byte("Streaming Temporary file content") mt := time.Date(2021, 1, 2, 3, 4, 5, 0, time.UTC) cases := []struct { @@ -907,7 +1011,7 @@ func TestUpload_VirtualDirectoryWithStreamingFileWithModTime(t *testing.T) { { desc: "CurrentTime", getFile: func() fs.StreamingFile { - return virtualfs.StreamingFileFromReader("a", io.NopCloser(bytes.NewReader(content))) + return virtualfs.StreamingFileFromReader("a", io.NopCloser(bytes.NewReader(tmpContent))) }, cachedFiles: 0, uploadedFiles: 1, @@ -915,7 +1019,7 @@ func TestUpload_VirtualDirectoryWithStreamingFileWithModTime(t *testing.T) { { desc: "FixedTime", getFile: func() fs.StreamingFile { - return virtualfs.StreamingFileWithModTimeFromReader("a", mt, io.NopCloser(bytes.NewReader(content))) + return virtualfs.StreamingFileWithModTimeFromReader("a", mt, io.NopCloser(bytes.NewReader(tmpContent))) }, cachedFiles: 1, uploadedFiles: 0, @@ -944,7 +1048,7 @@ func TestUpload_VirtualDirectoryWithStreamingFileWithModTime(t *testing.T) { require.Equal(t, int32(1), atomic.LoadInt32(&man1.Stats.NonCachedFiles)) require.Equal(t, int32(1), atomic.LoadInt32(&man1.Stats.TotalDirectoryCount)) require.Equal(t, int32(1), atomic.LoadInt32(&man1.Stats.TotalFileCount)) - require.Equal(t, int64(len(content)), atomic.LoadInt64(&man1.Stats.TotalFileSize)) + require.Equal(t, int64(len(tmpContent)), atomic.LoadInt64(&man1.Stats.TotalFileSize)) // wait a little bit to ensure clock moves forward which is not always the case on Windows. time.Sleep(100 * time.Millisecond) @@ -963,7 +1067,7 @@ func TestUpload_VirtualDirectoryWithStreamingFileWithModTime(t *testing.T) { assert.Equal(t, tc.uploadedFiles, atomic.LoadInt32(&man2.Stats.NonCachedFiles)) // Cached files don't count towards the total file count. assert.Equal(t, tc.uploadedFiles, atomic.LoadInt32(&man2.Stats.TotalFileCount)) - require.Equal(t, int64(len(content)), atomic.LoadInt64(&man2.Stats.TotalFileSize)) + require.Equal(t, int64(len(tmpContent)), atomic.LoadInt64(&man2.Stats.TotalFileSize)) }) } } diff --git a/snapshot/snapshotgc/gc.go b/snapshot/snapshotgc/gc.go index 172f4054498..de868746a22 100644 --- a/snapshot/snapshotgc/gc.go +++ b/snapshot/snapshotgc/gc.go @@ -15,7 +15,6 @@ import ( "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/logging" "github.com/kopia/kopia/repo/maintenance" - "github.com/kopia/kopia/repo/manifest" "github.com/kopia/kopia/repo/object" "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/snapshotfs" @@ -116,7 +115,7 @@ func runInternal(ctx context.Context, rep repo.DirectRepositoryWriter, gcDelete // Ensure that the iteration includes deleted contents, so those can be // undeleted (recovered). err := rep.ContentReader().IterateContents(ctx, content.IterateOptions{IncludeDeleted: true}, func(ci content.Info) error { - if manifest.ContentPrefix == ci.ContentID.Prefix() { + if content.ManifestContentPrefix == ci.ContentID.Prefix() { system.Add(int64(ci.PackedLength)) return nil }