diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 706f60f9b53..c99ab0e8176 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -210,7 +210,8 @@ func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository mustReadObject(ctx, t, w, result, written) ow := w.NewObjectWriter(ctx, object.WriterOptions{ - Prefix: content.ManifestContentPrefix, + Prefix: content.ManifestContentPrefix, + MetadataCompressor: "zstd-fastest", }) _, err := ow.Write([]byte{2, 3, 4}) @@ -258,7 +259,7 @@ func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository func mustWriteObject(ctx context.Context, t *testing.T, w repo.RepositoryWriter, data []byte) object.ID { t.Helper() - ow := w.NewObjectWriter(ctx, object.WriterOptions{}) + ow := w.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) _, err := ow.Write(data) require.NoError(t, err) diff --git a/repo/blob/storage_extend_test.go b/repo/blob/storage_extend_test.go index fad91143434..c7cd60c244d 100644 --- a/repo/blob/storage_extend_test.go +++ b/repo/blob/storage_extend_test.go @@ -42,7 +42,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetention(t *testing.T) { nro.RetentionPeriod = period }, }) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) io.WriteString(w, "hello world!") w.Result() w.Close() @@ -103,7 +103,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionUnsupported(t *testing. nro.RetentionMode = "" }, }) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) io.WriteString(w, "hello world!") w.Result() w.Close() diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 2b66b9515fe..a3ff51d53c3 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -1820,6 +1820,16 @@ func (s *contentManagerSuite) TestDisableCompressionOfMetadata(t *testing.T) { info, err := bm.ContentInfo(ctx, contentID) require.NoError(t, err) require.Equal(t, NoCompression, info.CompressionHeaderID) + + contentID1, err1 := bm.WriteContent(ctx, + indirectMetadataContent(), + "x", + NoCompression) + require.NoError(t, err1) + + info1, err1 := bm.ContentInfo(ctx, contentID1) + require.NoError(t, err1) + require.Equal(t, NoCompression, info1.CompressionHeaderID) } func (s *contentManagerSuite) TestCompressionOfMetadata(t *testing.T) { @@ -1845,6 +1855,21 @@ func (s *contentManagerSuite) TestCompressionOfMetadata(t *testing.T) { } else { require.Equal(t, NoCompression, info.CompressionHeaderID) } + + contentID1, err1 := bm.WriteContent(ctx, + indirectMetadataContent(), + "x", + compression.HeaderZstdFastest) + require.NoError(t, err1) + + info1, err1 := bm.ContentInfo(ctx, contentID1) + require.NoError(t, err1) + + if bm.SupportsContentCompression() { + require.Equal(t, compression.HeaderZstdFastest, info1.CompressionHeaderID) + } else { + require.Equal(t, NoCompression, info1.CompressionHeaderID) + } } func (s *contentManagerSuite) TestContentReadAliasing(t *testing.T) { @@ -2704,3 +2729,7 @@ func randRead(b []byte) (n int, err error) { func dirMetadataContent() gather.Bytes { return gather.FromSlice([]byte(`{"stream":"kopia:directory","entries":[{"name":".chglog","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.159239913-07:00","uid":501,"gid":20,"obj":"k18c2fa7d9108a2bf0d9d5b8e7993c48d","summ":{"size":1897,"files":2,"symlinks":0,"dirs":1,"maxTime":"2022-03-22T22:45:22.159499411-07:00","numFailed":0}},{"name":".git","type":"d","mode":"0755","mtime":"2022-04-03T17:47:38.340226306-07:00","uid":501,"gid":20,"obj":"k0ad4214eb961aa78cf06611ec4563086","summ":{"size":88602907,"files":7336,"symlinks":0,"dirs":450,"maxTime":"2022-04-03T17:28:54.030135198-07:00","numFailed":0}},{"name":".github","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.160470238-07:00","uid":501,"gid":20,"obj":"k76bee329054d5574d89a4e87c3f24088","summ":{"size":20043,"files":13,"symlinks":0,"dirs":2,"maxTime":"2022-03-22T22:45:22.162580934-07:00","numFailed":0}},{"name":".logs","type":"d","mode":"0750","mtime":"2021-11-06T13:43:35.082115457-07:00","uid":501,"gid":20,"obj":"k1e7d5bda28d6b684bb180cac16775c1c","summ":{"size":382943352,"files":1823,"symlinks":0,"dirs":122,"maxTime":"2021-11-06T13:43:45.111270118-07:00","numFailed":0}},{"name":".release","type":"d","mode":"0755","mtime":"2021-04-16T06:26:47-07:00","uid":501,"gid":20,"obj":"k0eb539316600015bf2861e593f68e18d","summ":{"size":159711446,"files":19,"symlinks":0,"dirs":1,"maxTime":"2021-04-16T06:26:47-07:00","numFailed":0}},{"name":".screenshots","type":"d","mode":"0755","mtime":"2022-01-29T00:12:29.023594487-08:00","uid":501,"gid":20,"obj":"k97f6dbc82e84c97c955364d12ddc44bd","summ":{"size":6770746,"files":53,"symlinks":0,"dirs":7,"maxTime":"2022-03-19T18:59:51.559099257-07:00","numFailed":0}},{"name":"app","type":"d","mode":"0755","mtime":"2022-03-26T22:28:51.863826565-07:00","uid":501,"gid":20,"obj":"k656b41b8679c2537392b3997648cf43e","summ":{"size":565633611,"files":44812,"symlinks":0,"dirs":7576,"maxTime":"2022-03-26T22:28:51.863946606-07:00","numFailed":0}},{"name":"cli","type":"d","mode":"0755","mtime":"2022-04-03T12:24:52.84319224-07:00","uid":501,"gid":20,"obj":"k04ab4f2a1da96c47f62a51f119dba14d","summ":{"size":468233,"files":164,"symlinks":0,"dirs":1,"maxTime":"2022-04-03T12:24:52.843267824-07:00","numFailed":0}},{"name":"dist","type":"d","mode":"0755","mtime":"2022-03-19T22:46:00.12834831-07:00","uid":501,"gid":20,"obj":"k19fc65da8a47b7702bf6b501b7f3e1b5","summ":{"size":3420732994,"files":315,"symlinks":0,"dirs":321,"maxTime":"2022-03-27T12:10:08.019195221-07:00","numFailed":0}},{"name":"fs","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.194955195-07:00","uid":501,"gid":20,"obj":"k1f0be83e34826450e651f16ba63c5b9c","summ":{"size":80421,"files":21,"symlinks":0,"dirs":6,"maxTime":"2022-03-22T22:45:22.195085778-07:00","numFailed":0}},{"name":"icons","type":"d","mode":"0755","mtime":"2022-01-23T12:06:14.739575928-08:00","uid":501,"gid":20,"obj":"k9e76c283312bdc6e562f66c7d6526396","summ":{"size":361744,"files":13,"symlinks":0,"dirs":1,"maxTime":"2021-03-12T19:28:45-08:00","numFailed":0}},{"name":"internal","type":"d","mode":"0755","mtime":"2022-04-02T18:14:02.459772332-07:00","uid":501,"gid":20,"obj":"k181db968f69045159753f8d6f3f3454f","summ":{"size":778467,"files":198,"symlinks":0,"dirs":56,"maxTime":"2022-04-03T12:24:52.844331708-07:00","numFailed":0}},{"name":"node_modules","type":"d","mode":"0755","mtime":"2021-05-16T15:45:19-07:00","uid":501,"gid":20,"obj":"kf2b636c57a7cc412739d2c10ca7ab0a3","summ":{"size":5061213,"files":361,"symlinks":0,"dirs":69,"maxTime":"2021-05-16T15:45:19-07:00","numFailed":0}},{"name":"repo","type":"d","mode":"0755","mtime":"2022-04-03T12:24:52.844407167-07:00","uid":501,"gid":20,"obj":"kb839dcd04d94a1b568f7f5e8fc809fab","summ":{"size":992877,"files":193,"symlinks":0,"dirs":27,"maxTime":"2022-04-03T17:47:31.211316848-07:00","numFailed":0}},{"name":"site","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.250939688-07:00","uid":501,"gid":20,"obj":"k5d8ce70ca4337c17219502963f0fe6d3","summ":{"size":58225583,"files":11387,"symlinks":0,"dirs":557,"maxTime":"2022-03-22T22:45:22.258280685-07:00","numFailed":0}},{"name":"snapshot","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.265723348-07:00","uid":501,"gid":20,"obj":"k6201166bd99c8fe85d53d742e92c81a6","summ":{"size":316009,"files":66,"symlinks":0,"dirs":6,"maxTime":"2022-03-26T23:04:24.313115653-07:00","numFailed":0}},{"name":"tests","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.2749515-07:00","uid":501,"gid":20,"obj":"k1e20890089f6cbad3c6fe79cbae71e09","summ":{"size":657360,"files":183,"symlinks":0,"dirs":30,"maxTime":"2022-04-02T18:41:02.232496031-07:00","numFailed":0}},{"name":"tools","type":"d","mode":"0755","mtime":"2022-03-22T22:45:22.279094142-07:00","uid":501,"gid":20,"obj":"k6464e940fea5ef916ab86eafdb68b1cd","summ":{"size":889231805,"files":12412,"symlinks":0,"dirs":3405,"maxTime":"2022-03-22T22:45:22.279144141-07:00","numFailed":0}},{"name":".DS_Store","type":"f","mode":"0644","size":14340,"mtime":"2022-02-12T20:06:35.60110891-08:00","uid":501,"gid":20,"obj":"d9295958410ae3b73f68033274cd7a8f"},{"name":".codecov.yml","type":"f","mode":"0644","size":620,"mtime":"2022-03-22T22:45:22.159772743-07:00","uid":501,"gid":20,"obj":"6f81038ca8d7b81804f42031142731ed"},{"name":".gitattributes","type":"f","mode":"0644","size":340,"mtime":"2022-03-22T22:45:22.159870909-07:00","uid":501,"gid":20,"obj":"5608c2d289164627e8bdb468bbee2643"},{"name":".gitignore","type":"f","mode":"0644","size":321,"mtime":"2022-03-22T22:45:22.162843932-07:00","uid":501,"gid":20,"obj":"c43ce513c6371e0838fc553b77f5cdb2"},{"name":".golangci.yml","type":"f","mode":"0644","size":3071,"mtime":"2022-03-22T22:45:22.163100014-07:00","uid":501,"gid":20,"obj":"4289f49e43fba6800fa75462bd2ad43e"},{"name":".gometalinter.json","type":"f","mode":"0644","size":163,"mtime":"2019-05-09T22:33:06-07:00","uid":501,"gid":20,"obj":"fe4fc9d77cfb5f1b062414fdfd121713"},{"name":".goreleaser.yml","type":"f","mode":"0644","size":1736,"mtime":"2022-03-22T22:45:22.163354888-07:00","uid":501,"gid":20,"obj":"91093a462f4f72c619fb9f144702c1bf"},{"name":".linterr.txt","type":"f","mode":"0644","size":425,"mtime":"2021-11-08T22:14:29.315279172-08:00","uid":501,"gid":20,"obj":"f6c165387b84c7fb0ebc26fdc812775d"},{"name":".tmp.integration-tests.json","type":"f","mode":"0644","size":5306553,"mtime":"2022-03-27T12:10:55.035217892-07:00","uid":501,"gid":20,"obj":"Ixbc27b9a704275d05a6505e794ce63e66"},{"name":".tmp.provider-tests.json","type":"f","mode":"0644","size":617740,"mtime":"2022-02-15T21:30:28.579546866-08:00","uid":501,"gid":20,"obj":"e7f69fc0222763628d5b294faf37a6d7"},{"name":".tmp.unit-tests.json","type":"f","mode":"0644","size":200525943,"mtime":"2022-04-03T10:08:51.453180251-07:00","uid":501,"gid":20,"obj":"Ixf5da1bbcdbc267fa123d93aaf90cbd75"},{"name":".wwhrd.yml","type":"f","mode":"0644","size":244,"mtime":"2022-03-22T22:45:22.163564803-07:00","uid":501,"gid":20,"obj":"cea0cac6d19d59dcf2818b08521f46b8"},{"name":"BUILD.md","type":"f","mode":"0644","size":4873,"mtime":"2022-03-22T22:45:22.163818593-07:00","uid":501,"gid":20,"obj":"bcd47eca7b520b3ea88e4799cc0c9fea"},{"name":"CODE_OF_CONDUCT.md","type":"f","mode":"0644","size":5226,"mtime":"2021-03-12T19:28:45-08:00","uid":501,"gid":20,"obj":"270e55b022ec0c7588b2dbb501791b3e"},{"name":"GOVERNANCE.md","type":"f","mode":"0644","size":12477,"mtime":"2020-03-15T23:40:35-07:00","uid":501,"gid":20,"obj":"96674fad8fcf2bdfb96b0583917bb617"},{"name":"LICENSE","type":"f","mode":"0644","size":10763,"mtime":"2019-05-27T15:50:18-07:00","uid":501,"gid":20,"obj":"e751b8a146e1dd5494564e9a8c26dd6a"},{"name":"Makefile","type":"f","mode":"0644","size":17602,"mtime":"2022-03-22T22:45:22.1639718-07:00","uid":501,"gid":20,"obj":"aa9cc80d567e94087ea9be8fef718c1a"},{"name":"README.md","type":"f","mode":"0644","size":3874,"mtime":"2022-03-22T22:45:22.164109925-07:00","uid":501,"gid":20,"obj":"d227c763b9cf476426da5d99e9fff694"},{"name":"a.log","type":"f","mode":"0644","size":3776,"mtime":"2022-03-08T19:19:40.196874627-08:00","uid":501,"gid":20,"obj":"6337190196e804297f92a17805600be7"},{"name":"build_architecture.svg","type":"f","mode":"0644","size":143884,"mtime":"2021-03-12T19:28:45-08:00","uid":501,"gid":20,"obj":"72c0aef8c43498b056236b2d46d7e44a"},{"name":"coverage.txt","type":"f","mode":"0644","size":194996,"mtime":"2022-03-26T07:09:37.533649628-07:00","uid":501,"gid":20,"obj":"fdf1a20cea21d4daf053b99711735d0e"},{"name":"go.mod","type":"f","mode":"0644","size":5447,"mtime":"2022-03-27T09:40:59.78753556-07:00","uid":501,"gid":20,"obj":"71eefc767aeea467b1d1f7ff0ee5c21b"},{"name":"go.sum","type":"f","mode":"0644","size":114899,"mtime":"2022-03-27T09:40:59.788485485-07:00","uid":501,"gid":20,"obj":"2e801e525d9e58208dff3c25bd30f296"},{"name":"main.go","type":"f","mode":"0644","size":2057,"mtime":"2022-03-22T22:45:22.22380977-07:00","uid":501,"gid":20,"obj":"73411f7e340e5cddc43faaa1d1fe5743"}],"summary":{"size":5787582078,"files":79395,"symlinks":0,"dirs":12639,"maxTime":"2022-04-03T17:47:38.340226306-07:00","numFailed":0}}`)) } + +func indirectMetadataContent() gather.Bytes { + return gather.FromSlice([]byte(`{"stream":"kopia:indirect","entries":[{"l":7616808,"o":"a6d555a7070f7e6c1e0c9cf90e8a6cc7"},{"s":7616808,"l":8388608,"o":"7ba10912378095851cff7da5f8083fc0"},{"s":16005416,"l":2642326,"o":"de41b93c1c1ba1f030d32e2cefffa0e9"},{"s":18647742,"l":2556388,"o":"25f391d185c3101006a45553efb67742"},{"s":21204130,"l":3156843,"o":"3b281271f7c0e17f533fe5edc0f79b31"},{"s":24360973,"l":8388608,"o":"4fb9395a4790fb0b6c5f0b91f102e9ab"},{"s":32749581,"l":8388608,"o":"bf0cfa2796354f0c74ee725af7a6824b"},{"s":41138189,"l":5788370,"o":"ecb6672792bfb433886b6e57d055ecd7"},{"s":46926559,"l":3828331,"o":"ac49ad086654c624f1e86a3d46ebdf04"},{"s":50754890,"l":6544699,"o":"951b34fddcc2cc679b23b074dabc7e4e"},{"s":57299589,"l":2523488,"o":"47965162d4ebc46b25a965854d4921d3"},{"s":59823077,"l":3510947,"o":"83d6c1f3ab9695075b93eeab6cc0761c"},{"s":63334024,"l":3239328,"o":"a8aa9f5ed5357520f0c0b04cb65293ec"},{"s":66573352,"l":8388608,"o":"9ca2f0ff2e50219759b4c07971ea4e84"},{"s":74961960,"l":3737528,"o":"5eaddb02c217c1d455078c858ae3ff96"},{"s":78699488,"l":2382189,"o":"513adbee65ed3f13fc6a6a27c1b683d1"},{"s":81081677,"l":3145876,"o":"a5968eb3ad727f4a6b263541a7847c7e"},{"s":84227553,"l":4302790,"o":"58929275a937192f01b1af8526c25cad"},{"s":88530343,"l":3795820,"o":"d2adf1e91029b37450ef988ff88bd861"},{"s":92326163,"l":8388608,"o":"9a14d257b93a9011a8d133ee3cd0c5bc"},{"s":100714771,"l":3885115,"o":"3ce2122c512d00744ab065ef8d782fe6"},{"s":104599886,"l":2109875,"o":"501a69a59ee5f3dd1b2c8add2fdc5cf8"},{"s":106709761,"l":6656155,"o":"6ba38db7fb389339b41dde4e8097e4ab"},{"s":113365916,"l":3789867,"o":"7b594f73ab9e3ad736aede2d1964e4e9"},{"s":117155783,"l":4156979,"o":"7215d07ec33b442aee52bd50234bf03d"},{"s":121312762,"l":4089475,"o":"d1ef2d9e330b11eec9365fefdc5434eb"},{"s":125402237,"l":8388608,"o":"38969b3114caf31a3501b34109063c25"},{"s":133790845,"l":8388608,"o":"cb1cf30e75d0fbbe058db1b8394e6e03"},{"s":142179453,"l":3645601,"o":"975e2cdb9ccbf36e3012a715c2a596de"},{"s":145825054,"l":2546129,"o":"2e2b6b2e98fbfcdc1855f5f36d8c2fb7"},{"s":148371183,"l":2830247,"o":"535dffb5b1df8f5f6f8d9787d961f81e"},{"s":151201430,"l":7158506,"o":"f953277da0845c6fe42d0e115219e6d6"},{"s":158359936,"l":2705426,"o":"83130d0e230071c5a94d38e3e94cf326"},{"s":161065362,"l":7085401,"o":"6b75fb5f5ab5728282bb043cf6d96cd3"},{"s":168150763,"l":5357359,"o":"431c63e39c20b879e517861acf12091f"},{"s":173508122,"l":5426372,"o":"0f329762d79c6948261dcde8fa26b3b8"},{"s":178934494,"l":6322719,"o":"dc8c1d8c09c0ce783e932ae2279c3db5"},{"s":185257213,"l":8388608,"o":"b5cb9fc5464c30f7bacfda0e5381ae91"},{"s":193645821,"l":3711229,"o":"494f1e15cfea3ab09523a391df0fbebc"},{"s":197357050,"l":6853193,"o":"a0c91d2654cfd2b4ca34542bb4b5d926"},{"s":204210243,"l":2645205,"o":"1cfcab6023b83e32c284c8eb1310f34c"},{"s":206855448,"l":5775640,"o":"84baf20ed2f84ba09f317028a366532d"},{"s":212631088,"l":2698898,"o":"7a6746a097f4506956f5e8d56eee6873"},{"s":215329986,"l":3444532,"o":"b11be0bf84341a0cbcd46ca14b6fed6d"},{"s":218774518,"l":5042437,"o":"3bc63ab43d9b7c19b42d51508f449b8b"},{"s":223816955,"l":4407710,"o":"f4cb0dcb6ad0d1d17c52ef7f5654d7b9"},{"s":228224665,"l":3288967,"o":"0a9254bb39e95e9a93c30b10f03e2f2a"},{"s":231513632,"l":6818881,"o":"fa22cfbe6caebb301dc4eae4d8d13a9b"},{"s":238332513,"l":4224104,"o":"29a1316a5157b0a3359b2760cbd0895c"},{"s":242556617,"l":4427385,"o":"0efe5d26d520d4ab114fcddb8d1a1931"},{"s":246984002,"l":3625567,"o":"8e6b4a4e1acc6100a271a9100518ff77"},{"s":250609569,"l":5412145,"o":"d3988a71021a70c0ff69eb0d80dca0c8"},{"s":256021714,"l":8388608,"o":"0b5c245c16e8fb845358f75a2f984585"},{"s":264410322,"l":8388608,"o":"70d149b1ec039dc716ae3b524f1ef0f8"},{"s":272798930,"l":5295221,"o":"a081eb5227d37e8d00343c450bc12117"},{"s":278094151,"l":3320852,"o":"7394c656b6278445ad39189dec6896f8"},{"s":281415003,"l":4569639,"o":"9e80f48dc5aa9378d1c4206d17dc3116"},{"s":285984642,"l":3227911,"o":"bd486cf43401ef78ae1199c6c18cb424"},{"s":289212553,"l":4408113,"o":"f73c366a16745ca5fe823c4074e026b4"},{"s":293620666,"l":5806890,"o":"fba0357b2a79b20ba3b942c0f22d545b"},{"s":299427556,"l":8388608,"o":"6e805d1757fa230794ab8445d377c832"},{"s":307816164,"l":5026069,"o":"88e75d7ba957fbe150e5c49a501540a6"},{"s":312842233,"l":8388608,"o":"17e65917f54e4e0b454c93eb08a8c342"},{"s":321230841,"l":2416356,"o":"e65ce9c2efe34ea01d015c737abc060a"},{"s":323647197,"l":2129020,"o":"b89cb59bb69a32e865d9afbf454d080e"},{"s":325776217,"l":6264283,"o":"6a80f62763f33d2946844ef3a8755517"},{"s":332040500,"l":7998871,"o":"59bce9d16094aef2e07f98098039bd91"},{"s":340039371,"l":3760705,"o":"53b191c6dfb41134b3430343438bf4ae"},{"s":343800076,"l":8388608,"o":"8d8945a17b9a819d03f414a337c2e47d"},{"s":352188684,"l":4370796,"o":"d216de504cdbc7a598c067e49f26c69b"},{"s":356559480,"l":8388608,"o":"e6f7e4cce390627c7030a9774ed885b1"},{"s":364948088,"l":4673010,"o":"32865f3c19fcf194e7fde39ef2e6aa28"},{"s":369621098,"l":8388608,"o":"26139bd21b4581d4b97be682f13005c9"},{"s":378009706,"l":3305716,"o":"5fe7a3d8d80e4dc367021ece1130b203"},{"s":381315422,"l":8388608,"o":"00a029bd5a9a63cde2ba9d25ebea11f7"},{"s":389704030,"l":8388608,"o":"67c10d19567b60a4193ab73bfc77ae99"},{"s":398092638,"l":5533146,"o":"045bcfb7416579d060c10f82946eae1b"},{"s":403625784,"l":8388608,"o":"72cda208c56f5c7bbfc99b65889bfc80"},{"s":412014392,"l":3760763,"o":"6cb3f59c8823c049e222b58c8c155d1e"},{"s":415775155,"l":3552185,"o":"d71b9f954d280b03f54c90db61168fc2"},{"s":419327340,"l":8388608,"o":"66df8620bdd389b079cc0334c4fb0f04"},{"s":427715948,"l":3653017,"o":"796520ac43adcaec6117760fc2699b78"},{"s":431368965,"l":2935638,"o":"01fea89a93279431a0a7f5188ceefed1"},{"s":434304603,"l":2820579,"o":"c9b3a1868f00f55d90cf02aa3c877b05"},{"s":437125182,"l":8388608,"o":"d77d35d2ead1595aedc25a65069e8d88"},{"s":445513790,"l":7407288,"o":"2297b4fb6ca3959a7fb0220e358a9778"},{"s":452921078,"l":7891558,"o":"a2cd30afaafcb844405eb6f048323bbc"},{"s":460812636,"l":3191130,"o":"ba6b77fc177cf223b1d50bf330ebf8ce"},{"s":464003766,"l":7565430,"o":"ea273aa565f457e94beca5e1d20ec068"},{"s":471569196,"l":3419794,"o":"eedd34de4ae36993f04f75ebc3c9a165"},{"s":474988990,"l":3460292,"o":"2a851cea2d84ca661b3eebf72cf0de55"},{"s":478449282,"l":8032042,"o":"b402c287796218ddf5d3fff2e70eb2c7"},{"s":486481324,"l":6320993,"o":"6fec73dd933316685cc3de99b6c0be66"},{"s":492802317,"l":2960958,"o":"386bfb6cf878efc2881aacfef8c8c22d"},{"s":495763275,"l":4043015,"o":"eaa10fc56a85813899e15e87ba458c90"},{"s":499806290,"l":2220895,"o":"94e8e439c139f120d514d248cb1d37b7"},{"s":502027185,"l":2318042,"o":"ccd572f48087ee0dce5af0d1823279cf"},{"s":504345227,"l":3396237,"o":"c1080ad8f97a38eaa3754023d0ff616c"},{"s":507741464,"l":3761426,"o":"abd1cc7cb7332535f1672e1fd0b48967"},{"s":511502890,"l":3313883,"o":"030705ce77d9eb02d3e91fa7a2f5ee16"},{"s":514816773,"l":4643444,"o":"56c1e4ca5e2bc64d1744e6645f16fec2"},{"s":519460217,"l":4877742,"o":"83f88295b8539647b759aab1e7588a5f"},{"s":524337959,"l":2731173,"o":"d3fc29a18a49f05f5320592f043b3898"},{"s":527069132,"l":4388381,"o":"0d206d6e7240945ccc2900814604e55d"},{"s":531457513,"l":4198048,"o":"87c54dab1f99b6b44e4193e4e7cbf6b1"},{"s":535655561,"l":8300001,"o":"d1d2be80c5e1942e8742481df1acc022"},{"s":543955562,"l":2103894,"o":"213b91aeb37f106cd97e29d23306d492"},{"s":546059456,"l":3464612,"o":"0cec1bb256cb1f37b65339ee4df7eaa4"},{"s":549524068,"l":6456134,"o":"5b21a9c34210b23e0d1711ffb467e694"},{"s":555980202,"l":4180529,"o":"f77ebea3c198350bb255bdfc0fdf6a36"},{"s":560160731,"l":8388608,"o":"9893ebd1ef51a280861b1168f9e838af"},{"s":568549339,"l":3672532,"o":"40f3c47adb19bec122d9647e1b7986ad"},{"s":572221871,"l":4686009,"o":"ffa5697af8444e22bdf05cd7f7b4e211"},{"s":576907880,"l":8388608,"o":"3ee328d1cb9f862a928198ecb28ae7b6"},{"s":585296488,"l":3117981,"o":"cbdb5e9e2390e031571567ffaf81ba08"},{"s":588414469,"l":8388608,"o":"9212fbcd5b2c5b09475f387b7a54d25c"},{"s":596803077,"l":8388608,"o":"5f06b16231dd3038abe59ddf17789e89"},{"s":605191685,"l":5345215,"o":"b22a5da98d6a3909d5e171998abfdc13"},{"s":610536900,"l":8388608,"o":"93db1f2b3e5272fffc3d644ec00f1463"},{"s":618925508,"l":7526887,"o":"d2b612202fa49f2fd059f76057183fd9"},{"s":626452395,"l":6650357,"o":"5863fec408b1aa89ccf1c77a1e29061e"},{"s":633102752,"l":8388608,"o":"4295a43614c097a8a4f72bb1f8d3cf3a"},{"s":641491360,"l":2281701,"o":"13e34075d962bcfdb89dcbd5b766aee6"},{"s":643773061,"l":4494718,"o":"b6cc56aba7510b753a3dae94428b62ff"},{"s":648267779,"l":6378335,"o":"9a8a3c3fe94e205523e40b2ed7eb902b"},{"s":654646114,"l":8388608,"o":"2636ee206c0a3c3b099b3f9f2e36eec6"},{"s":663034722,"l":8388608,"o":"e6323f8542eb34ad197099074b08ff55"},{"s":671423330,"l":8388608,"o":"66f6a6485ac08085328996b28ced7452"},{"s":679811938,"l":7119415,"o":"170721a5d1a9728df40deedcb5bde060"},{"s":686931353,"l":2960051,"o":"f52f94fbaf8d101e633c545b5b0cdf24"},{"s":689891404,"l":4571243,"o":"cc47bfaa5b6d54dd863bc714cc607f82"},{"s":694462647,"l":7146332,"o":"331722c804700da0c4fa4c43d04aa56a"},{"s":701608979,"l":5152399,"o":"f4668768e6c15d00b8d02c1d20faecca"},{"s":706761378,"l":8388608,"o":"593addeedf8da213289758348e05567c"},{"s":715149986,"l":8388608,"o":"388715dd8b32f2088572c7703302b596"},{"s":723538594,"l":4120402,"o":"0947e4864bd26230e26406f117b18d4c"},{"s":727658996,"l":8103740,"o":"ae3062a4e74d4a407b944c895dfe1f95"},{"s":735762736,"l":4037896,"o":"2fb24ad127cbe65fc704cfdd15d3e4c2"},{"s":739800632,"l":6316726,"o":"6f21491d81b688d5efbe0ff22e35e05b"},{"s":746117358,"l":3007919,"o":"eaa42376365bad6707f4c11c204d65eb"},{"s":749125277,"l":5262875,"o":"321847ff2d9c62f7f2c6db3914327756"},{"s":754388152,"l":4462123,"o":"c565fa31ef90fc2c196d9cde44095597"},{"s":758850275,"l":5294675,"o":"c6baec6e22d1c604a04d887aeed1fd82"},{"s":764144950,"l":2912994,"o":"1327ac0489a8e76c1fbebe5b561ca6b4"},{"s":767057944,"l":2962702,"o":"97fc763b782a57f9fd542f4ab7657a85"},{"s":770020646,"l":8388608,"o":"1ca3bce935b5d306be767a9c89cf0026"},{"s":778409254,"l":365274,"o":"484b0358354388fdd16d9ea2cfe9260d"}]}`)) +} diff --git a/repo/format/upgrade_lock_test.go b/repo/format/upgrade_lock_test.go index e1a7017862f..fb354b7d8b4 100644 --- a/repo/format/upgrade_lock_test.go +++ b/repo/format/upgrade_lock_test.go @@ -401,7 +401,7 @@ func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) { func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, data []byte, testCaseID string) { t.Helper() - w := rep.NewObjectWriter(ctx, object.WriterOptions{}) + w := rep.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) _, err := w.Write(data) require.NoError(t, err, testCaseID) diff --git a/repo/grpc_repository_client.go b/repo/grpc_repository_client.go index e6af5db8228..dc27ea28ec7 100644 --- a/repo/grpc_repository_client.go +++ b/repo/grpc_repository_client.go @@ -529,9 +529,9 @@ func (r *grpcRepositoryClient) NewWriter(ctx context.Context, opt WriteSessionOp } // ConcatenateObjects creates a concatenated objects from the provided object IDs. -func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) { +func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID, comp compression.Name) (object.ID, error) { //nolint:wrapcheck - return r.omgr.Concatenate(ctx, objectIDs) + return r.omgr.Concatenate(ctx, objectIDs, comp) } // maybeRetry executes the provided callback with or without automatic retries depending on how diff --git a/repo/maintenance/blob_gc_test.go b/repo/maintenance/blob_gc_test.go index e5ba92d5bad..a1b22e05988 100644 --- a/repo/maintenance/blob_gc_test.go +++ b/repo/maintenance/blob_gc_test.go @@ -47,7 +47,7 @@ func (s *formatSpecificTestSuite) TestDeleteUnreferencedBlobs(t *testing.T) { nro.BlockFormat.HMACSecret = testHMACSecret }, }) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) io.WriteString(w, "hello world!") w.Result() w.Close() diff --git a/repo/maintenance/blob_retain_test.go b/repo/maintenance/blob_retain_test.go index d4fe26b1346..58464e6b25d 100644 --- a/repo/maintenance/blob_retain_test.go +++ b/repo/maintenance/blob_retain_test.go @@ -43,7 +43,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTime(t *testing.T) { nro.RetentionPeriod = period }, }) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) io.WriteString(w, "hello world!") w.Result() w.Close() @@ -98,7 +98,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTimeDisabled(t *testing nro.BlockFormat.HMACSecret = testHMACSecret }, }) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) io.WriteString(w, "hello world!") w.Result() w.Close() diff --git a/repo/maintenance/content_rewrite_test.go b/repo/maintenance/content_rewrite_test.go index 2079279bb3a..e7e02b8ceee 100644 --- a/repo/maintenance/content_rewrite_test.go +++ b/repo/maintenance/content_rewrite_test.go @@ -79,7 +79,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) { // run N sessions to create N individual pack blobs for each content prefix for range tc.numPContents { require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error { - ow := w.NewObjectWriter(ctx, object.WriterOptions{}) + ow := w.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) fmt.Fprintf(ow, "%v", uuid.NewString()) _, err := ow.Result() return err @@ -88,7 +88,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) { for range tc.numQContents { require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error { - ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k"}) + ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k", MetadataCompressor: "zstd-fastest"}) fmt.Fprintf(ow, "%v", uuid.NewString()) _, err := ow.Result() return err diff --git a/repo/maintenance/maintenance_safety_test.go b/repo/maintenance/maintenance_safety_test.go index 5a303240129..ccd34c96e94 100644 --- a/repo/maintenance/maintenance_safety_test.go +++ b/repo/maintenance/maintenance_safety_test.go @@ -34,7 +34,7 @@ func (s *formatSpecificTestSuite) TestMaintenanceSafety(t *testing.T) { // create object that's immediately orphaned since nobody refers to it. require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error { - ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"}) + ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y", MetadataCompressor: "zstd-fastest"}) fmt.Fprintf(ow, "hello world") var err error objectID, err = ow.Result() @@ -43,7 +43,7 @@ func (s *formatSpecificTestSuite) TestMaintenanceSafety(t *testing.T) { // create another object in separate pack. require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error { - ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"}) + ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y", MetadataCompressor: "zstd-fastest"}) fmt.Fprintf(ow, "hello universe") _, err := ow.Result() return err diff --git a/repo/object/object_manager.go b/repo/object/object_manager.go index 213069b3d6c..923aa64319a 100644 --- a/repo/object/object_manager.go +++ b/repo/object/object_manager.go @@ -70,6 +70,7 @@ func (om *Manager) NewWriter(ctx context.Context, opt WriterOptions) Writer { w.description = opt.Description w.prefix = opt.Prefix w.compressor = compression.ByName[opt.Compressor] + w.metadataCompressor = compression.ByName[opt.MetadataCompressor] w.totalLength = 0 w.currentPosition = 0 @@ -106,7 +107,7 @@ func (om *Manager) closedWriter(ow *objectWriter) { // in parallel utilizing more CPU cores. Because some split points now start at fixed boundaries and not content-specific, // this causes some slight loss of deduplication at concatenation points (typically 1-2 contents, usually <10MB), // so this method should only be used for very large files where this overhead is relatively small. -func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error) { +func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, metadataComp compression.Name) (ID, error) { if len(objectIDs) == 0 { return EmptyID, errors.Errorf("empty list of objects") } @@ -131,8 +132,10 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error) log(ctx).Debugf("concatenated: %v total: %v", concatenatedEntries, totalLength) w := om.NewWriter(ctx, WriterOptions{ - Prefix: indirectContentPrefix, - Description: "CONCATENATED INDEX", + Prefix: indirectContentPrefix, + Description: "CONCATENATED INDEX", + Compressor: metadataComp, + MetadataCompressor: metadataComp, }) defer w.Close() //nolint:errcheck diff --git a/repo/object/object_manager_test.go b/repo/object/object_manager_test.go index 797c44957eb..e750b3577d6 100644 --- a/repo/object/object_manager_test.go +++ b/repo/object/object_manager_test.go @@ -88,7 +88,7 @@ func (f *fakeContentManager) ContentInfo(ctx context.Context, contentID content. defer f.mu.Unlock() if d, ok := f.data[contentID]; ok { - return content.Info{ContentID: contentID, PackedLength: uint32(len(d))}, nil + return content.Info{ContentID: contentID, PackedLength: uint32(len(d)), CompressionHeaderID: f.compresionIDs[contentID]}, nil } return content.Info{}, blob.ErrBlobNotFound @@ -175,18 +175,43 @@ func TestCompression_ContentCompressionEnabled(t *testing.T) { _, _, om := setupTest(t, cmap) w := om.NewWriter(ctx, WriterOptions{ - Compressor: "gzip", + Compressor: "gzip", + MetadataCompressor: "zstd-fastest", }) w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000)) oid, err := w.Result() require.NoError(t, err) cid, isCompressed, ok := oid.ContentID() + require.True(t, ok) require.False(t, isCompressed) // oid will not indicate compression require.Equal(t, compression.ByName["gzip"].HeaderID(), cmap[cid]) } +func TestCompression_IndirectContentCompressionEnabledMetadata(t *testing.T) { + ctx := testlogging.Context(t) + + cmap := map[content.ID]compression.HeaderID{} + _, _, om := setupTest(t, cmap) + w := om.NewWriter(ctx, WriterOptions{ + Compressor: "gzip", + MetadataCompressor: "zstd-fastest", + }) + w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000000)) + oid, err := w.Result() + require.NoError(t, err) + verifyIndirectBlock(ctx, t, om, oid, compression.HeaderZstdFastest) + + w2 := om.NewWriter(ctx, WriterOptions{ + MetadataCompressor: "none", + }) + w2.Write(bytes.Repeat([]byte{5, 6, 7, 8}, 1000000)) + oid2, err2 := w2.Result() + require.NoError(t, err2) + verifyIndirectBlock(ctx, t, om, oid2, content.NoCompression) +} + func TestCompression_CustomSplitters(t *testing.T) { cases := []struct { wo WriterOptions @@ -244,7 +269,8 @@ func TestCompression_ContentCompressionDisabled(t *testing.T) { _, _, om := setupTest(t, nil) w := om.NewWriter(ctx, WriterOptions{ - Compressor: "gzip", + Compressor: "gzip", + MetadataCompressor: "zstd-fastest", }) w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000)) oid, err := w.Result() @@ -409,7 +435,7 @@ func verifyNoError(t *testing.T, err error) { require.NoError(t, err) } -func verifyIndirectBlock(ctx context.Context, t *testing.T, om *Manager, oid ID) { +func verifyIndirectBlock(ctx context.Context, t *testing.T, om *Manager, oid ID, expectedComp compression.HeaderID) { t.Helper() for indexContentID, isIndirect := oid.IndexObjectID(); isIndirect; indexContentID, isIndirect = indexContentID.IndexObjectID() { @@ -418,6 +444,11 @@ func verifyIndirectBlock(ctx context.Context, t *testing.T, om *Manager, oid ID) if !c.HasPrefix() { t.Errorf("expected base content ID to be prefixed, was %v", c) } + info, err := om.contentMgr.ContentInfo(ctx, c) + if err != nil { + t.Errorf("error getting content info for %v", err.Error()) + } + require.Equal(t, expectedComp, info.CompressionHeaderID) } rd, err := Open(ctx, om.contentMgr, indexContentID) @@ -443,6 +474,7 @@ func TestIndirection(t *testing.T) { dataLength int expectedBlobCount int expectedIndirection int + metadataCompressor compression.Name }{ {dataLength: 200, expectedBlobCount: 1, expectedIndirection: 0}, {dataLength: 1000, expectedBlobCount: 1, expectedIndirection: 0}, @@ -452,15 +484,18 @@ func TestIndirection(t *testing.T) { // 1 blob of 1000 zeros + 1 index blob {dataLength: 4000, expectedBlobCount: 2, expectedIndirection: 1}, // 1 blob of 1000 zeros + 1 index blob - {dataLength: 10000, expectedBlobCount: 2, expectedIndirection: 1}, + {dataLength: 10000, expectedBlobCount: 2, expectedIndirection: 1, metadataCompressor: "none"}, + // 1 blob of 1000 zeros + 1 index blob, enabled metadata compression + {dataLength: 10000, expectedBlobCount: 2, expectedIndirection: 1, metadataCompressor: "zstd-fastest"}, } for _, c := range cases { - data, _, om := setupTest(t, nil) + cmap := map[content.ID]compression.HeaderID{} + data, _, om := setupTest(t, cmap) contentBytes := make([]byte, c.dataLength) - writer := om.NewWriter(ctx, WriterOptions{}) + writer := om.NewWriter(ctx, WriterOptions{MetadataCompressor: c.metadataCompressor}) writer.(*objectWriter).splitter = splitterFactory() if _, err := writer.Write(contentBytes); err != nil { @@ -491,7 +526,11 @@ func TestIndirection(t *testing.T) { t.Errorf("invalid blob count for %v, got %v, wanted %v", result, got, want) } - verifyIndirectBlock(ctx, t, om, result) + expectedCompressor := content.NoCompression + if len(c.metadataCompressor) > 0 && c.metadataCompressor != "none" { + expectedCompressor = compression.ByName[c.metadataCompressor].HeaderID() + } + verifyIndirectBlock(ctx, t, om, result, expectedCompressor) } } @@ -578,7 +617,7 @@ func TestConcatenate(t *testing.T) { } for _, tc := range cases { - concatenatedOID, err := om.Concatenate(ctx, tc.inputs) + concatenatedOID, err := om.Concatenate(ctx, tc.inputs, "zstd-fastest") if err != nil { t.Fatal(err) } @@ -617,7 +656,7 @@ func TestConcatenate(t *testing.T) { } // make sure results of concatenation can be further concatenated. - concatenated3OID, err := om.Concatenate(ctx, []ID{concatenatedOID, concatenatedOID, concatenatedOID}) + concatenated3OID, err := om.Concatenate(ctx, []ID{concatenatedOID, concatenatedOID, concatenatedOID}, "zstd-fastest") if err != nil { t.Fatal(err) } diff --git a/repo/object/object_writer.go b/repo/object/object_writer.go index 5a260f9c3af..ee5661904bb 100644 --- a/repo/object/object_writer.go +++ b/repo/object/object_writer.go @@ -68,7 +68,8 @@ type objectWriter struct { om *Manager - compressor compression.Compressor + compressor compression.Compressor + metadataCompressor compression.Compressor prefix content.IDPrefix buffer gather.WriteBuffer @@ -292,12 +293,13 @@ func (w *objectWriter) checkpointLocked() (ID, error) { } iw := &objectWriter{ - ctx: w.ctx, - om: w.om, - compressor: nil, - description: "LIST(" + w.description + ")", - splitter: w.om.newDefaultSplitter(), - prefix: w.prefix, + ctx: w.ctx, + om: w.om, + compressor: w.metadataCompressor, + metadataCompressor: w.metadataCompressor, + description: "LIST(" + w.description + ")", + splitter: w.om.newDefaultSplitter(), + prefix: w.prefix, } if iw.prefix == "" { @@ -334,9 +336,10 @@ func writeIndirectObject(w io.Writer, entries []IndirectObjectEntry) error { // WriterOptions can be passed to Repository.NewWriter(). type WriterOptions struct { - Description string - Prefix content.IDPrefix // empty string or a single-character ('g'..'z') - Compressor compression.Name - Splitter string // use particular splitter instead of default - AsyncWrites int // allow up to N content writes to be asynchronous + Description string + Prefix content.IDPrefix // empty string or a single-character ('g'..'z') + Compressor compression.Name + MetadataCompressor compression.Name + Splitter string // use particular splitter instead of default + AsyncWrites int // allow up to N content writes to be asynchronous } diff --git a/repo/repo_benchmarks_test.go b/repo/repo_benchmarks_test.go index 13d0105d9d5..abfa90b2a8f 100644 --- a/repo/repo_benchmarks_test.go +++ b/repo/repo_benchmarks_test.go @@ -15,7 +15,7 @@ func BenchmarkWriterDedup1M(b *testing.B) { ctx, env := repotesting.NewEnvironment(b, format.FormatVersion2) dataBuf := make([]byte, 4<<20) - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) writer.Write(dataBuf) _, err := writer.Result() require.NoError(b, err) @@ -25,7 +25,7 @@ func BenchmarkWriterDedup1M(b *testing.B) { for range b.N { // write exactly the same data - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) writer.Write(dataBuf) writer.Result() writer.Close() @@ -45,7 +45,7 @@ func BenchmarkWriterNoDedup1M(b *testing.B) { for i := range b.N { // write exactly the same data - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) if i+chunkSize > len(dataBuf) { chunkSize++ diff --git a/repo/repository.go b/repo/repository.go index e9451eda145..9616073d7b0 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -14,6 +14,7 @@ import ( "github.com/kopia/kopia/internal/metrics" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/blob/throttling" + "github.com/kopia/kopia/repo/compression" "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/content/indexblob" "github.com/kopia/kopia/repo/format" @@ -47,7 +48,7 @@ type RepositoryWriter interface { Repository NewObjectWriter(ctx context.Context, opt object.WriterOptions) object.Writer - ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) + ConcatenateObjects(ctx context.Context, objectIDs []object.ID, comp compression.Name) (object.ID, error) PutManifest(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error) ReplaceManifests(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error) DeleteManifest(ctx context.Context, id manifest.ID) error @@ -180,9 +181,9 @@ func (r *directRepository) NewObjectWriter(ctx context.Context, opt object.Write } // ConcatenateObjects creates a concatenated objects from the provided object IDs. -func (r *directRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) { +func (r *directRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID, comp compression.Name) (object.ID, error) { //nolint:wrapcheck - return r.omgr.Concatenate(ctx, objectIDs) + return r.omgr.Concatenate(ctx, objectIDs, comp) } // DisableIndexRefresh disables index refresh for the duration of the write session. diff --git a/repo/repository_test.go b/repo/repository_test.go index cf8b17cedd2..5e67cc3ed5d 100644 --- a/repo/repository_test.go +++ b/repo/repository_test.go @@ -47,7 +47,7 @@ func (s *formatSpecificTestSuite) TestWriters(t *testing.T) { for _, c := range cases { ctx, env := repotesting.NewEnvironment(t, s.formatVersion) - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) if _, err := writer.Write(c.data); err != nil { t.Fatalf("write error: %v", err) } @@ -74,7 +74,7 @@ func (s *formatSpecificTestSuite) TestWriterCompleteChunkInTwoWrites(t *testing. ctx, env := repotesting.NewEnvironment(t, s.formatVersion) b := make([]byte, 100) - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) writer.Write(b[0:50]) writer.Write(b[0:50]) result, err := writer.Result() @@ -159,7 +159,7 @@ func (s *formatSpecificTestSuite) TestHMAC(t *testing.T) { c := bytes.Repeat([]byte{0xcd}, 50) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) w.Write(c) result, err := w.Result() @@ -185,7 +185,7 @@ func (s *formatSpecificTestSuite) TestReaderStoredBlockNotFound(t *testing.T) { func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, data []byte, testCaseID string) object.ID { t.Helper() - w := rep.NewObjectWriter(ctx, object.WriterOptions{}) + w := rep.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) if _, err := w.Write(data); err != nil { t.Fatalf("can't write object %q - write failed: %v", testCaseID, err) } @@ -275,7 +275,7 @@ func TestFormats(t *testing.T) { for k, v := range c.oids { bytesToWrite := []byte(k) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) w.Write(bytesToWrite) oid, err := w.Result() @@ -555,7 +555,7 @@ func TestObjectWritesWithRetention(t *testing.T) { }, }) - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) _, err := writer.Write([]byte("the quick brown fox jumps over the lazy dog")) require.NoError(t, err) @@ -774,7 +774,8 @@ func TestMetrics_CompressibleData(t *testing.T) { for ensureMapEntry(t, env.RepositoryMetrics().Snapshot(false).Counters, "content_write_duration_nanos") < 5e6 { w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{ - Compressor: "gzip", + Compressor: "gzip", + MetadataCompressor: "zstd-fastest", }) w.Write(inputData) diff --git a/snapshot/snapshotfs/dir_rewriter.go b/snapshot/snapshotfs/dir_rewriter.go index 7e36f342cd3..8117c655e87 100644 --- a/snapshot/snapshotfs/dir_rewriter.go +++ b/snapshot/snapshotfs/dir_rewriter.go @@ -18,6 +18,7 @@ import ( "github.com/kopia/kopia/repo/logging" "github.com/kopia/kopia/repo/object" "github.com/kopia/kopia/snapshot" + "github.com/kopia/kopia/snapshot/policy" ) var dirRewriterLog = logging.Module("dirRewriter") @@ -60,18 +61,18 @@ type DirRewriter struct { } type dirRewriterRequest struct { - ctx context.Context //nolint:containedctx - parentPath string - input *snapshot.DirEntry - result *snapshot.DirEntry - compression compression.Name - err error + ctx context.Context //nolint:containedctx + parentPath string + input *snapshot.DirEntry + result *snapshot.DirEntry + metadataCompression compression.Name + err error } func (rw *DirRewriter) processRequest(pool *workshare.Pool[*dirRewriterRequest], req *dirRewriterRequest) { _ = pool - req.result, req.err = rw.getCachedReplacement(req.ctx, req.parentPath, req.input, req.compression) + req.result, req.err = rw.getCachedReplacement(req.ctx, req.parentPath, req.input, req.metadataCompression) } func (rw *DirRewriter) getCacheKey(input *snapshot.DirEntry) dirRewriterCacheKey { @@ -89,7 +90,7 @@ func (rw *DirRewriter) getCacheKey(input *snapshot.DirEntry) dirRewriterCacheKey return out } -func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath string, input *snapshot.DirEntry, comp compression.Name) (*snapshot.DirEntry, error) { +func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath string, input *snapshot.DirEntry, metadataComp compression.Name) (*snapshot.DirEntry, error) { key := rw.getCacheKey(input) // see if we already processed this exact directory entry @@ -115,7 +116,7 @@ func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath stri // the rewriter returned a directory, we must recursively process it. if result.Type == snapshot.EntryTypeDirectory { - rep2, subdirErr := rw.processDirectory(ctx, parentPath, result, comp) + rep2, subdirErr := rw.processDirectory(ctx, parentPath, result, metadataComp) if rep2 == nil { return nil, errors.Wrap(subdirErr, input.Name) } @@ -133,7 +134,7 @@ func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath stri return result, nil } -func (rw *DirRewriter) processDirectory(ctx context.Context, pathFromRoot string, entry *snapshot.DirEntry, comp compression.Name) (*snapshot.DirEntry, error) { +func (rw *DirRewriter) processDirectory(ctx context.Context, pathFromRoot string, entry *snapshot.DirEntry, metadataComp compression.Name) (*snapshot.DirEntry, error) { dirRewriterLog(ctx).Debugw("processDirectory", "path", pathFromRoot) r, err := rw.rep.OpenObject(ctx, entry.ObjectID) @@ -147,10 +148,10 @@ func (rw *DirRewriter) processDirectory(ctx context.Context, pathFromRoot string return rw.opts.OnDirectoryReadFailure(ctx, pathFromRoot, entry, errors.Wrap(err, "unable to read directory entries")) } - return rw.processDirectoryEntries(ctx, pathFromRoot, entry, entries, comp) + return rw.processDirectoryEntries(ctx, pathFromRoot, entry, entries, metadataComp) } -func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath string, entry *snapshot.DirEntry, entries []*snapshot.DirEntry, comp compression.Name) (*snapshot.DirEntry, error) { +func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath string, entry *snapshot.DirEntry, entries []*snapshot.DirEntry, metadataComp compression.Name) (*snapshot.DirEntry, error) { var ( builder DirManifestBuilder wg workshare.AsyncGroup[*dirRewriterRequest] @@ -167,7 +168,7 @@ func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath s path.Join(parentPath, child.Name), child, nil, - comp, + metadataComp, nil, }) @@ -175,7 +176,7 @@ func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath s } // run in current goroutine - replacement, repErr := rw.getCachedReplacement(ctx, path.Join(parentPath, child.Name), child, comp) + replacement, repErr := rw.getCachedReplacement(ctx, path.Join(parentPath, child.Name), child, metadataComp) if repErr != nil { return nil, errors.Wrap(repErr, child.Name) } @@ -197,7 +198,7 @@ func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath s dm := builder.Build(entry.ModTime, entry.DirSummary.IncompleteReason) - oid, err := writeDirManifest(ctx, rw.rep, entry.ObjectID.String(), dm, comp) + oid, err := writeDirManifest(ctx, rw.rep, entry.ObjectID.String(), dm, metadataComp) if err != nil { return nil, errors.Wrap(err, "unable to write directory manifest") } @@ -222,8 +223,8 @@ func (rw *DirRewriter) equalEntries(e1, e2 *snapshot.DirEntry) bool { } // RewriteSnapshotManifest rewrites the directory tree starting at a given manifest. -func (rw *DirRewriter) RewriteSnapshotManifest(ctx context.Context, man *snapshot.Manifest, comp compression.Name) (bool, error) { - newEntry, err := rw.getCachedReplacement(ctx, ".", man.RootEntry, comp) +func (rw *DirRewriter) RewriteSnapshotManifest(ctx context.Context, man *snapshot.Manifest, metadataComp compression.Name) (bool, error) { + newEntry, err := rw.getCachedReplacement(ctx, ".", man.RootEntry, metadataComp) if err != nil { return false, errors.Wrapf(err, "error processing snapshot %v", man.ID) } @@ -276,7 +277,13 @@ func RewriteAsStub(rep repo.RepositoryWriter) RewriteFailedEntryCallback { return nil, errors.Wrap(err, "error writing stub contents") } - w := rep.NewObjectWriter(ctx, object.WriterOptions{}) + pol, _, _, err := policy.GetEffectivePolicy(ctx, rep, policy.GlobalPolicySourceInfo) + if err != nil { + return nil, errors.Wrap(err, "error getting policy") + + } + metadataCompressor := pol.MetadataCompressionPolicy.MetadataCompressor() + w := rep.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: metadataCompressor}) n, err := buf.WriteTo(w) if err != nil { diff --git a/snapshot/snapshotfs/dir_writer.go b/snapshot/snapshotfs/dir_writer.go index 3e2de73908d..39556547a33 100644 --- a/snapshot/snapshotfs/dir_writer.go +++ b/snapshot/snapshotfs/dir_writer.go @@ -12,11 +12,12 @@ import ( "github.com/kopia/kopia/snapshot" ) -func writeDirManifest(ctx context.Context, rep repo.RepositoryWriter, dirRelativePath string, dirManifest *snapshot.DirManifest, comp compression.Name) (object.ID, error) { +func writeDirManifest(ctx context.Context, rep repo.RepositoryWriter, dirRelativePath string, dirManifest *snapshot.DirManifest, metadataComp compression.Name) (object.ID, error) { writer := rep.NewObjectWriter(ctx, object.WriterOptions{ - Description: "DIR:" + dirRelativePath, - Prefix: objectIDPrefixDirectory, - Compressor: comp, + Description: "DIR:" + dirRelativePath, + Prefix: objectIDPrefixDirectory, + Compressor: metadataComp, + MetadataCompressor: metadataComp, }) defer writer.Close() //nolint:errcheck diff --git a/snapshot/snapshotfs/upload.go b/snapshot/snapshotfs/upload.go index 1c6ff4a3943..d5656cb29a3 100644 --- a/snapshot/snapshotfs/upload.go +++ b/snapshot/snapshotfs/upload.go @@ -161,12 +161,13 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis } comp := pol.CompressionPolicy.CompressorForFile(f) + metadataComp := pol.MetadataCompressionPolicy.MetadataCompressor() splitterName := pol.SplitterPolicy.SplitterForFile(f) chunkSize := pol.UploadPolicy.ParallelUploadAboveSize.OrDefault(-1) if chunkSize < 0 || f.Size() <= chunkSize { // all data fits in 1 full chunks, upload directly - return u.uploadFileData(ctx, parentCheckpointRegistry, f, f.Name(), 0, -1, comp, splitterName) + return u.uploadFileData(ctx, parentCheckpointRegistry, f, f.Name(), 0, -1, comp, metadataComp, splitterName) } // we always have N+1 parts, first N are exactly chunkSize, last one has undetermined length @@ -191,11 +192,11 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis if wg.CanShareWork(u.workerPool) { // another goroutine is available, delegate to them wg.RunAsync(u.workerPool, func(_ *workshare.Pool[*uploadWorkItem], _ *uploadWorkItem) { - parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp, splitterName) + parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp, metadataComp, splitterName) }, nil) } else { // just do the work in the current goroutine - parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp, splitterName) + parts[i], partErrors[i] = u.uploadFileData(ctx, parentCheckpointRegistry, f, uuid.NewString(), offset, length, comp, metadataComp, splitterName) } } @@ -206,10 +207,10 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis return nil, errors.Wrap(err, "error uploading parts") } - return concatenateParts(ctx, u.repo, f.Name(), parts) + return concatenateParts(ctx, u.repo, f.Name(), parts, metadataComp) } -func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name string, parts []*snapshot.DirEntry) (*snapshot.DirEntry, error) { +func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name string, parts []*snapshot.DirEntry, metadataComp compression.Name) (*snapshot.DirEntry, error) { var ( objectIDs []object.ID totalSize int64 @@ -221,7 +222,7 @@ func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name strin objectIDs = append(objectIDs, part.ObjectID) } - resultObject, err := rep.ConcatenateObjects(ctx, objectIDs) + resultObject, err := rep.ConcatenateObjects(ctx, objectIDs, metadataComp) if err != nil { return nil, errors.Wrap(err, "concatenate") } @@ -234,7 +235,7 @@ func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name strin return de, nil } -func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, f fs.File, fname string, offset, length int64, compressor compression.Name, splitterName string) (*snapshot.DirEntry, error) { +func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry *checkpointRegistry, f fs.File, fname string, offset, length int64, compressor, metadataComp compression.Name, splitterName string) (*snapshot.DirEntry, error) { file, err := f.Open(ctx) if err != nil { return nil, errors.Wrap(err, "unable to open file") @@ -242,10 +243,11 @@ func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry defer file.Close() //nolint:errcheck writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{ - Description: "FILE:" + fname, - Compressor: compressor, - Splitter: splitterName, - AsyncWrites: 1, // upload chunk in parallel to writing another chunk + Description: "FILE:" + fname, + Compressor: compressor, + MetadataCompressor: metadataComp, + Splitter: splitterName, + AsyncWrites: 1, // upload chunk in parallel to writing another chunk }) defer writer.Close() //nolint:errcheck @@ -298,7 +300,7 @@ func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry return de, nil } -func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath string, f fs.Symlink) (dirEntry *snapshot.DirEntry, ret error) { +func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath string, f fs.Symlink, metadataComp compression.Name) (dirEntry *snapshot.DirEntry, ret error) { u.Progress.HashingFile(relativePath) defer func() { @@ -312,7 +314,8 @@ func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath strin } writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{ - Description: "SYMLINK:" + f.Name(), + Description: "SYMLINK:" + f.Name(), + MetadataCompressor: metadataComp, }) defer writer.Close() //nolint:errcheck @@ -353,11 +356,13 @@ func (u *Uploader) uploadStreamingFileInternal(ctx context.Context, relativePath }() comp := pol.CompressionPolicy.CompressorForFile(f) + metadataComp := pol.MetadataCompressionPolicy.MetadataCompressor() writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{ - Description: "STREAMFILE:" + f.Name(), - Compressor: comp, - Splitter: pol.SplitterPolicy.SplitterForFile(f), + Description: "STREAMFILE:" + f.Name(), + Compressor: comp, + MetadataCompressor: metadataComp, + Splitter: pol.SplitterPolicy.SplitterForFile(f), }) defer writer.Close() //nolint:errcheck @@ -903,7 +908,8 @@ func (u *Uploader) processSingle( return nil case fs.Symlink: - de, err := u.uploadSymlinkInternal(ctx, entryRelativePath, entry) + childTree := policyTree.Child(entry.Name()) + de, err := u.uploadSymlinkInternal(ctx, entryRelativePath, entry, childTree.EffectivePolicy().MetadataCompressionPolicy.MetadataCompressor()) return u.processEntryUploadResult(ctx, de, err, entryRelativePath, parentDirBuilder, policyTree.EffectivePolicy().ErrorHandlingPolicy.IgnoreFileErrors.OrDefault(false), diff --git a/snapshot/snapshotfs/upload_test.go b/snapshot/snapshotfs/upload_test.go index 934c0a9b4f5..b82325b8e02 100644 --- a/snapshot/snapshotfs/upload_test.go +++ b/snapshot/snapshotfs/upload_test.go @@ -38,6 +38,8 @@ import ( "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/blob/filesystem" bloblogging "github.com/kopia/kopia/repo/blob/logging" + "github.com/kopia/kopia/repo/compression" + "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/logging" "github.com/kopia/kopia/repo/object" "github.com/kopia/kopia/snapshot" @@ -228,6 +230,106 @@ func TestUpload(t *testing.T) { } } +type entry struct { + name string + objectID object.ID +} + +// findAllEntries recursively iterates over all the dirs and returns list of file entries +func findAllEntries(t *testing.T, ctx context.Context, dir fs.Directory) []entry { + entries := []entry{} + fs.IterateEntries(ctx, dir, func(ctx context.Context, e fs.Entry) error { + oid, err := object.ParseID(e.(object.HasObjectID).ObjectID().String()) + require.NoError(t, err) + entries = append(entries, entry{ + name: e.Name(), + objectID: oid, + }) + if e.IsDir() { + entries = append(entries, findAllEntries(t, ctx, e.(fs.Directory))...) + } + return nil + }) + return entries +} + +func verifyMetadataCompressor(t *testing.T, ctx context.Context, rep repo.Repository, entries []entry, comp compression.HeaderID) { + for _, e := range entries { + cid, _, ok := e.objectID.ContentID() + require.True(t, ok) + if !cid.HasPrefix() { + continue + } + info, err := rep.ContentInfo(ctx, cid) + if err != nil { + t.Errorf("failed to get content info: %v", err) + } + require.Equal(t, comp, info.CompressionHeaderID) + } +} + +func TestUploadMetadataCompression(t *testing.T) { + ctx := testlogging.Context(t) + t.Run("default metadata compression", func(t *testing.T) { + th := newUploadTestHarness(ctx, t) + defer th.cleanup() + u := NewUploader(th.repo) + policyTree := policy.BuildTree(nil, policy.DefaultPolicy) + + s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}) + if err != nil { + t.Errorf("Upload error: %v", err) + } + + dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory) + entries := findAllEntries(t, ctx, dir) + verifyMetadataCompressor(t, ctx, th.repo, entries, compression.HeaderZstdFastest) + }) + t.Run("disable metadata compression", func(t *testing.T) { + th := newUploadTestHarness(ctx, t) + defer th.cleanup() + u := NewUploader(th.repo) + // policyTree := policy.BuildTree(nil, policy.DefaultPolicy) + policyTree := policy.BuildTree(map[string]*policy.Policy{ + ".": { + MetadataCompressionPolicy: policy.MetadataCompressionPolicy{ + CompressorName: "none", + }, + }, + }, policy.DefaultPolicy) + + s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}) + if err != nil { + t.Errorf("Upload error: %v", err) + } + + dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory) + entries := findAllEntries(t, ctx, dir) + verifyMetadataCompressor(t, ctx, th.repo, entries, content.NoCompression) + }) + t.Run("set metadata compressor", func(t *testing.T) { + th := newUploadTestHarness(ctx, t) + defer th.cleanup() + u := NewUploader(th.repo) + policyTree := policy.BuildTree(map[string]*policy.Policy{ + ".": { + MetadataCompressionPolicy: policy.MetadataCompressionPolicy{ + CompressorName: "gzip", + }, + }, + }, policy.DefaultPolicy) + + s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}) + if err != nil { + t.Errorf("Upload error: %v", err) + } + + dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory) + entries := findAllEntries(t, ctx, dir) + verifyMetadataCompressor(t, ctx, th.repo, entries, compression.ByName["gzip"].HeaderID()) + }) +} + func TestUpload_TopLevelDirectoryReadFailure(t *testing.T) { ctx := testlogging.Context(t) th := newUploadTestHarness(ctx, t)