diff --git a/internal/server/server_test.go b/internal/server/server_test.go index ecdee16e903..c99ab0e8176 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -210,13 +210,14 @@ func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository mustReadObject(ctx, t, w, result, written) ow := w.NewObjectWriter(ctx, object.WriterOptions{ - Prefix: content.ManifestContentPrefix, + Prefix: content.ManifestContentPrefix, + MetadataCompressor: "zstd-fastest", }) _, err := ow.Write([]byte{2, 3, 4}) require.NoError(t, err) - _, err = ow.Result("zstd-fastest") + _, err = ow.Result() if err == nil { return errors.Errorf("unexpected success writing object with 'm' prefix") } @@ -258,12 +259,12 @@ func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository func mustWriteObject(ctx context.Context, t *testing.T, w repo.RepositoryWriter, data []byte) object.ID { t.Helper() - ow := w.NewObjectWriter(ctx, object.WriterOptions{}) + ow := w.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) _, err := ow.Write(data) require.NoError(t, err) - result, err := ow.Result("zstd-fastest") + result, err := ow.Result() require.NoError(t, err) return result diff --git a/repo/blob/storage_extend_test.go b/repo/blob/storage_extend_test.go index c33f450e62e..c7cd60c244d 100644 --- a/repo/blob/storage_extend_test.go +++ b/repo/blob/storage_extend_test.go @@ -42,9 +42,9 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetention(t *testing.T) { nro.RetentionPeriod = period }, }) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) io.WriteString(w, "hello world!") - w.Result("zstd-fastest") + w.Result() w.Close() env.RepositoryWriter.Flush(ctx) @@ -103,9 +103,9 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionUnsupported(t *testing. nro.RetentionMode = "" }, }) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) io.WriteString(w, "hello world!") - w.Result("zstd-fastest") + w.Result() w.Close() env.RepositoryWriter.Flush(ctx) diff --git a/repo/format/upgrade_lock_test.go b/repo/format/upgrade_lock_test.go index 56aa31c95dc..fb354b7d8b4 100644 --- a/repo/format/upgrade_lock_test.go +++ b/repo/format/upgrade_lock_test.go @@ -401,11 +401,11 @@ func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) { func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, data []byte, testCaseID string) { t.Helper() - w := rep.NewObjectWriter(ctx, object.WriterOptions{}) + w := rep.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) _, err := w.Write(data) require.NoError(t, err, testCaseID) - _, err = w.Result("zstd-fastest") + _, err = w.Result() require.NoError(t, err, testCaseID) } diff --git a/repo/maintenance/blob_gc_test.go b/repo/maintenance/blob_gc_test.go index af7247ac51f..a1b22e05988 100644 --- a/repo/maintenance/blob_gc_test.go +++ b/repo/maintenance/blob_gc_test.go @@ -47,9 +47,9 @@ func (s *formatSpecificTestSuite) TestDeleteUnreferencedBlobs(t *testing.T) { nro.BlockFormat.HMACSecret = testHMACSecret }, }) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) io.WriteString(w, "hello world!") - w.Result("zstd-fastest") + w.Result() w.Close() env.RepositoryWriter.Flush(ctx) diff --git a/repo/maintenance/blob_retain_test.go b/repo/maintenance/blob_retain_test.go index 7a7bf307fbf..58464e6b25d 100644 --- a/repo/maintenance/blob_retain_test.go +++ b/repo/maintenance/blob_retain_test.go @@ -43,9 +43,9 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTime(t *testing.T) { nro.RetentionPeriod = period }, }) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) io.WriteString(w, "hello world!") - w.Result("zstd-fastest") + w.Result() w.Close() env.RepositoryWriter.Flush(ctx) @@ -98,9 +98,9 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTimeDisabled(t *testing nro.BlockFormat.HMACSecret = testHMACSecret }, }) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) io.WriteString(w, "hello world!") - w.Result("zstd-fastest") + w.Result() w.Close() env.RepositoryWriter.Flush(ctx) diff --git a/repo/maintenance/content_rewrite_test.go b/repo/maintenance/content_rewrite_test.go index 4e20de032fd..e7e02b8ceee 100644 --- a/repo/maintenance/content_rewrite_test.go +++ b/repo/maintenance/content_rewrite_test.go @@ -79,18 +79,18 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) { // run N sessions to create N individual pack blobs for each content prefix for range tc.numPContents { require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error { - ow := w.NewObjectWriter(ctx, object.WriterOptions{}) + ow := w.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) fmt.Fprintf(ow, "%v", uuid.NewString()) - _, err := ow.Result("zstd-fastest") + _, err := ow.Result() return err })) } for range tc.numQContents { require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error { - ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k"}) + ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k", MetadataCompressor: "zstd-fastest"}) fmt.Fprintf(ow, "%v", uuid.NewString()) - _, err := ow.Result("zstd-fastest") + _, err := ow.Result() return err })) } diff --git a/repo/maintenance/maintenance_safety_test.go b/repo/maintenance/maintenance_safety_test.go index cc7ee98f008..ccd34c96e94 100644 --- a/repo/maintenance/maintenance_safety_test.go +++ b/repo/maintenance/maintenance_safety_test.go @@ -34,18 +34,18 @@ func (s *formatSpecificTestSuite) TestMaintenanceSafety(t *testing.T) { // create object that's immediately orphaned since nobody refers to it. require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error { - ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"}) + ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y", MetadataCompressor: "zstd-fastest"}) fmt.Fprintf(ow, "hello world") var err error - objectID, err = ow.Result("zstd-fastest") + objectID, err = ow.Result() return err })) // create another object in separate pack. require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error { - ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"}) + ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y", MetadataCompressor: "zstd-fastest"}) fmt.Fprintf(ow, "hello universe") - _, err := ow.Result("zstd-fastest") + _, err := ow.Result() return err })) diff --git a/repo/object/object_manager.go b/repo/object/object_manager.go index 7d29ebf79dd..923aa64319a 100644 --- a/repo/object/object_manager.go +++ b/repo/object/object_manager.go @@ -70,6 +70,7 @@ func (om *Manager) NewWriter(ctx context.Context, opt WriterOptions) Writer { w.description = opt.Description w.prefix = opt.Prefix w.compressor = compression.ByName[opt.Compressor] + w.metadataCompressor = compression.ByName[opt.MetadataCompressor] w.totalLength = 0 w.currentPosition = 0 @@ -106,7 +107,7 @@ func (om *Manager) closedWriter(ow *objectWriter) { // in parallel utilizing more CPU cores. Because some split points now start at fixed boundaries and not content-specific, // this causes some slight loss of deduplication at concatenation points (typically 1-2 contents, usually <10MB), // so this method should only be used for very large files where this overhead is relatively small. -func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, comp compression.Name) (ID, error) { +func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, metadataComp compression.Name) (ID, error) { if len(objectIDs) == 0 { return EmptyID, errors.Errorf("empty list of objects") } @@ -131,9 +132,10 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, comp compres log(ctx).Debugf("concatenated: %v total: %v", concatenatedEntries, totalLength) w := om.NewWriter(ctx, WriterOptions{ - Prefix: indirectContentPrefix, - Description: "CONCATENATED INDEX", - Compressor: comp, + Prefix: indirectContentPrefix, + Description: "CONCATENATED INDEX", + Compressor: metadataComp, + MetadataCompressor: metadataComp, }) defer w.Close() //nolint:errcheck @@ -141,7 +143,7 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, comp compres return EmptyID, werr } - concatID, err := w.Result(comp) + concatID, err := w.Result() if err != nil { return EmptyID, errors.Wrap(err, "error writing concatenated index") } diff --git a/repo/object/object_manager_test.go b/repo/object/object_manager_test.go index 320d0734c64..7a46e8f1a94 100644 --- a/repo/object/object_manager_test.go +++ b/repo/object/object_manager_test.go @@ -141,7 +141,7 @@ func TestWriters(t *testing.T) { t.Errorf("write error: %v", err) } - result, err := writer.Result("zstd-fastest") + result, err := writer.Result() if err != nil { t.Errorf("error getting writer results for %v, expected: %v", c.data, c.objectID.String()) continue @@ -175,13 +175,15 @@ func TestCompression_ContentCompressionEnabled(t *testing.T) { _, _, om := setupTest(t, cmap) w := om.NewWriter(ctx, WriterOptions{ - Compressor: "gzip", + Compressor: "gzip", + MetadataCompressor: "zstd-fastest", }) w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000)) - oid, err := w.Result("zstd-fastest") + oid, err := w.Result() require.NoError(t, err) cid, isCompressed, ok := oid.ContentID() + require.True(t, ok) require.False(t, isCompressed) // oid will not indicate compression require.Equal(t, compression.ByName["gzip"].HeaderID(), cmap[cid]) @@ -219,7 +221,7 @@ func TestCompression_CustomSplitters(t *testing.T) { w := om.NewWriter(ctx, tc.wo) w.Write(bytes.Repeat([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, 128<<10)) - oid, err := w.Result("zstd-fastest") + oid, err := w.Result() require.NoError(t, err) ndx, ok := oid.IndexObjectID() @@ -244,10 +246,11 @@ func TestCompression_ContentCompressionDisabled(t *testing.T) { _, _, om := setupTest(t, nil) w := om.NewWriter(ctx, WriterOptions{ - Compressor: "gzip", + Compressor: "gzip", + MetadataCompressor: "zstd-fastest", }) w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000)) - oid, err := w.Result("zstd-fastest") + oid, err := w.Result() require.NoError(t, err) _, isCompressed, ok := oid.ContentID() @@ -263,7 +266,7 @@ func TestWriterCompleteChunkInTwoWrites(t *testing.T) { writer := om.NewWriter(ctx, WriterOptions{}) writer.Write(b[0:50]) writer.Write(b[0:50]) - result, err := writer.Result("zstd-fastest") + result, err := writer.Result() if !objectIDsEqual(result, mustParseID(t, "cd00e292c5970d3c5e2f0ffa5171e555bc46bfc4faddfb4a418b6840b86e79a3")) { t.Errorf("unexpected result: %v err: %v", result, err) @@ -280,25 +283,25 @@ func TestCheckpointing(t *testing.T) { allZeroes := make([]byte, 1<<20) // empty file, nothing flushed - checkpoint1, err := writer.Checkpoint("zstd-fastest") + checkpoint1, err := writer.Checkpoint() verifyNoError(t, err) // write some bytes, but not enough to flush. writer.Write(allZeroes[0:50]) - checkpoint2, err := writer.Checkpoint("zstd-fastest") + checkpoint2, err := writer.Checkpoint() verifyNoError(t, err) // write enough to flush first content. writer.Write(allZeroes) - checkpoint3, err := writer.Checkpoint("zstd-fastest") + checkpoint3, err := writer.Checkpoint() verifyNoError(t, err) // write enough to flush second content. writer.Write(allZeroes) - checkpoint4, err := writer.Checkpoint("zstd-fastest") + checkpoint4, err := writer.Checkpoint() verifyNoError(t, err) - result, err := writer.Result("zstd-fastest") + result, err := writer.Result() verifyNoError(t, err) if !objectIDsEqual(checkpoint1, EmptyID) { @@ -309,7 +312,7 @@ func TestCheckpointing(t *testing.T) { t.Errorf("unexpected checkpoint2: %v err: %v", checkpoint2, err) } - result2, err := writer.Checkpoint("zstd-fastest") + result2, err := writer.Checkpoint() verifyNoError(t, err) if result2 != result { @@ -354,13 +357,13 @@ func TestObjectWriterRaceBetweenCheckpointAndResult(t *testing.T) { var eg errgroup.Group eg.Go(func() error { - _, rerr := w.Result("zstd-fastest") + _, rerr := w.Result() return rerr }) eg.Go(func() error { - cpID, cperr := w.Checkpoint("zstd-fastest") + cpID, cperr := w.Checkpoint() if cperr == nil && cpID != EmptyID { ids, verr := VerifyObject(ctx, om.contentMgr, cpID) if verr != nil { @@ -467,7 +470,8 @@ func TestIndirection(t *testing.T) { t.Errorf("write error: %v", err) } - result, err := writer.Result("zstd-fastest") + // Disable metadata compression here + result, err := writer.Result() if err != nil { t.Errorf("error getting writer results: %v", err) } @@ -512,7 +516,7 @@ func TestHMAC(t *testing.T) { w := om.NewWriter(ctx, WriterOptions{}) w.Write(c) - result, err := w.Result("zstd-fastest") + result, err := w.Result() if result.String() != "cad29ff89951a3c085c86cb7ed22b82b51f7bdfda24f932c7f9601f51d5975ba" { t.Errorf("unexpected result: %v err: %v", result.String(), err) @@ -645,7 +649,7 @@ func mustWriteObject(t *testing.T, om *Manager, data []byte, compressor compress _, err := w.Write(data) require.NoError(t, err) - oid, err := w.Result("zstd-fastest") + oid, err := w.Result() require.NoError(t, err) return oid @@ -728,7 +732,7 @@ func TestEndToEndReadAndSeek(t *testing.T) { t.Errorf("write error: %v", err) } - objectID, err := writer.Result("zstd-fastest") + objectID, err := writer.Result() t.Logf("oid: %v", objectID) writer.Close() @@ -784,7 +788,7 @@ func TestEndToEndReadAndSeekWithCompression(t *testing.T) { totalBytesWritten += size - objectID, err := writer.Result("zstd-fastest") + objectID, err := writer.Result() writer.Close() @@ -876,7 +880,7 @@ func TestSeek(t *testing.T) { t.Errorf("write error: %v", err) } - objectID, err := writer.Result("zstd-fastest") + objectID, err := writer.Result() if err != nil { t.Fatalf("unable to write: %v", err) } @@ -938,7 +942,7 @@ func TestWriterFlushFailure_OnFlush(t *testing.T) { fcm.writeContentError = errSomeError - _, err = w.Result("zstd-fastest") + _, err = w.Result() require.ErrorIs(t, err, errSomeError) } @@ -951,7 +955,7 @@ func TestWriterFlushFailure_OnCheckpoint(t *testing.T) { w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1e6)) fcm.writeContentError = errSomeError - _, err := w.Checkpoint("zstd-fastest") + _, err := w.Checkpoint() require.ErrorIs(t, err, errSomeError) } @@ -970,7 +974,7 @@ func TestWriterFlushFailure_OnAsyncWrite(t *testing.T) { require.NotErrorIs(t, err, errSomeError) require.Equal(t, 4000000, n) - _, err = w.Result("zstd-fastest") + _, err = w.Result() require.ErrorIs(t, err, errSomeError) } diff --git a/repo/object/object_writer.go b/repo/object/object_writer.go index 8b8cd47ee91..ee5661904bb 100644 --- a/repo/object/object_writer.go +++ b/repo/object/object_writer.go @@ -27,10 +27,10 @@ type Writer interface { // Checkpoint returns ID of an object consisting of all contents written to storage so far. // This may not include some data buffered in the writer. // In case nothing has been written yet, returns empty object ID. - Checkpoint(comp compression.Name) (ID, error) + Checkpoint() (ID, error) // Result returns object ID representing all bytes written to the writer. - Result(comp compression.Name) (ID, error) + Result() (ID, error) } type contentIDTracker struct { @@ -68,7 +68,8 @@ type objectWriter struct { om *Manager - compressor compression.Compressor + compressor compression.Compressor + metadataCompressor compression.Compressor prefix content.IDPrefix buffer gather.WriteBuffer @@ -251,7 +252,7 @@ func maybeCompressedContentBytes(comp compression.Compressor, input gather.Bytes return input, false, nil } -func (w *objectWriter) Result(comp compression.Name) (ID, error) { +func (w *objectWriter) Result() (ID, error) { w.mu.Lock() defer w.mu.Unlock() @@ -263,19 +264,19 @@ func (w *objectWriter) Result(comp compression.Name) (ID, error) { } } - return w.checkpointLocked(comp) + return w.checkpointLocked() } // Checkpoint returns object ID which represents portion of the object that has already been written. // The result may be an empty object ID if nothing has been flushed yet. -func (w *objectWriter) Checkpoint(comp compression.Name) (ID, error) { +func (w *objectWriter) Checkpoint() (ID, error) { w.mu.Lock() defer w.mu.Unlock() - return w.checkpointLocked(comp) + return w.checkpointLocked() } -func (w *objectWriter) checkpointLocked(comp compression.Name) (ID, error) { +func (w *objectWriter) checkpointLocked() (ID, error) { // wait for any in-flight asynchronous writes to finish w.asyncWritesWG.Wait() @@ -292,12 +293,13 @@ func (w *objectWriter) checkpointLocked(comp compression.Name) (ID, error) { } iw := &objectWriter{ - ctx: w.ctx, - om: w.om, - compressor: compression.ByName[comp], - description: "LIST(" + w.description + ")", - splitter: w.om.newDefaultSplitter(), - prefix: w.prefix, + ctx: w.ctx, + om: w.om, + compressor: w.metadataCompressor, + metadataCompressor: w.metadataCompressor, + description: "LIST(" + w.description + ")", + splitter: w.om.newDefaultSplitter(), + prefix: w.prefix, } if iw.prefix == "" { @@ -311,7 +313,7 @@ func (w *objectWriter) checkpointLocked(comp compression.Name) (ID, error) { return EmptyID, err } - oid, err := iw.Result(comp) + oid, err := iw.Result() if err != nil { return EmptyID, err } @@ -334,9 +336,10 @@ func writeIndirectObject(w io.Writer, entries []IndirectObjectEntry) error { // WriterOptions can be passed to Repository.NewWriter(). type WriterOptions struct { - Description string - Prefix content.IDPrefix // empty string or a single-character ('g'..'z') - Compressor compression.Name - Splitter string // use particular splitter instead of default - AsyncWrites int // allow up to N content writes to be asynchronous + Description string + Prefix content.IDPrefix // empty string or a single-character ('g'..'z') + Compressor compression.Name + MetadataCompressor compression.Name + Splitter string // use particular splitter instead of default + AsyncWrites int // allow up to N content writes to be asynchronous } diff --git a/repo/repo_benchmarks_test.go b/repo/repo_benchmarks_test.go index e444e62f351..abfa90b2a8f 100644 --- a/repo/repo_benchmarks_test.go +++ b/repo/repo_benchmarks_test.go @@ -15,9 +15,9 @@ func BenchmarkWriterDedup1M(b *testing.B) { ctx, env := repotesting.NewEnvironment(b, format.FormatVersion2) dataBuf := make([]byte, 4<<20) - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) writer.Write(dataBuf) - _, err := writer.Result("zstd-fastest") + _, err := writer.Result() require.NoError(b, err) writer.Close() @@ -25,9 +25,9 @@ func BenchmarkWriterDedup1M(b *testing.B) { for range b.N { // write exactly the same data - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) writer.Write(dataBuf) - writer.Result("zstd-fastest") + writer.Result() writer.Close() } } @@ -45,7 +45,7 @@ func BenchmarkWriterNoDedup1M(b *testing.B) { for i := range b.N { // write exactly the same data - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) if i+chunkSize > len(dataBuf) { chunkSize++ @@ -54,7 +54,7 @@ func BenchmarkWriterNoDedup1M(b *testing.B) { } writer.Write(dataBuf[offset : offset+chunkSize]) - writer.Result("zstd-fastest") + writer.Result() writer.Close() offset++ diff --git a/repo/repository_test.go b/repo/repository_test.go index 23393de1f69..5e67cc3ed5d 100644 --- a/repo/repository_test.go +++ b/repo/repository_test.go @@ -47,12 +47,12 @@ func (s *formatSpecificTestSuite) TestWriters(t *testing.T) { for _, c := range cases { ctx, env := repotesting.NewEnvironment(t, s.formatVersion) - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) if _, err := writer.Write(c.data); err != nil { t.Fatalf("write error: %v", err) } - result, err := writer.Result("zstd-fastest") + result, err := writer.Result() if err != nil { t.Errorf("error getting writer results for %v, expected: %v", c.data, c.objectID.String()) continue @@ -74,10 +74,10 @@ func (s *formatSpecificTestSuite) TestWriterCompleteChunkInTwoWrites(t *testing. ctx, env := repotesting.NewEnvironment(t, s.formatVersion) b := make([]byte, 100) - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) writer.Write(b[0:50]) writer.Write(b[0:50]) - result, err := writer.Result("zstd-fastest") + result, err := writer.Result() if result != mustParseObjectID(t, "bfa2b4b9421671ab2b5bfa8c90ee33607784a27e452b08556509ef9bd47a37c6") { t.Errorf("unexpected result: %v err: %v", result, err) @@ -159,9 +159,9 @@ func (s *formatSpecificTestSuite) TestHMAC(t *testing.T) { c := bytes.Repeat([]byte{0xcd}, 50) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) w.Write(c) - result, err := w.Result("zstd-fastest") + result, err := w.Result() if result.String() != "e37e93ba74e074ad1366ee2f032ee9c3a5b81ec82c140b053c1a4e6673d5d9d9" { t.Errorf("unexpected result: %v err: %v", result.String(), err) @@ -185,12 +185,12 @@ func (s *formatSpecificTestSuite) TestReaderStoredBlockNotFound(t *testing.T) { func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, data []byte, testCaseID string) object.ID { t.Helper() - w := rep.NewObjectWriter(ctx, object.WriterOptions{}) + w := rep.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) if _, err := w.Write(data); err != nil { t.Fatalf("can't write object %q - write failed: %v", testCaseID, err) } - oid, err := w.Result("zstd-fastest") + oid, err := w.Result() if err != nil { t.Fatalf("can't write object %q - result failed: %v", testCaseID, err) } @@ -275,10 +275,10 @@ func TestFormats(t *testing.T) { for k, v := range c.oids { bytesToWrite := []byte(k) - w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) w.Write(bytesToWrite) - oid, err := w.Result("zstd-fastest") + oid, err := w.Result() if err != nil { t.Errorf("error: %v", err) } @@ -555,11 +555,11 @@ func TestObjectWritesWithRetention(t *testing.T) { }, }) - writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"}) _, err := writer.Write([]byte("the quick brown fox jumps over the lazy dog")) require.NoError(t, err) - _, err = writer.Result("zstd-fastest") + _, err = writer.Result() require.NoError(t, err) env.RepositoryWriter.ContentManager().Flush(ctx) @@ -774,13 +774,14 @@ func TestMetrics_CompressibleData(t *testing.T) { for ensureMapEntry(t, env.RepositoryMetrics().Snapshot(false).Counters, "content_write_duration_nanos") < 5e6 { w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{ - Compressor: "gzip", + Compressor: "gzip", + MetadataCompressor: "zstd-fastest", }) w.Write(inputData) var err error - oid, err = w.Result("zstd-fastest") + oid, err = w.Result() require.NoError(t, err) count++ diff --git a/snapshot/snapshotfs/dir_rewriter.go b/snapshot/snapshotfs/dir_rewriter.go index 489b4c16c1c..8117c655e87 100644 --- a/snapshot/snapshotfs/dir_rewriter.go +++ b/snapshot/snapshotfs/dir_rewriter.go @@ -61,18 +61,18 @@ type DirRewriter struct { } type dirRewriterRequest struct { - ctx context.Context //nolint:containedctx - parentPath string - input *snapshot.DirEntry - result *snapshot.DirEntry - compression compression.Name - err error + ctx context.Context //nolint:containedctx + parentPath string + input *snapshot.DirEntry + result *snapshot.DirEntry + metadataCompression compression.Name + err error } func (rw *DirRewriter) processRequest(pool *workshare.Pool[*dirRewriterRequest], req *dirRewriterRequest) { _ = pool - req.result, req.err = rw.getCachedReplacement(req.ctx, req.parentPath, req.input, req.compression) + req.result, req.err = rw.getCachedReplacement(req.ctx, req.parentPath, req.input, req.metadataCompression) } func (rw *DirRewriter) getCacheKey(input *snapshot.DirEntry) dirRewriterCacheKey { @@ -90,7 +90,7 @@ func (rw *DirRewriter) getCacheKey(input *snapshot.DirEntry) dirRewriterCacheKey return out } -func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath string, input *snapshot.DirEntry, comp compression.Name) (*snapshot.DirEntry, error) { +func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath string, input *snapshot.DirEntry, metadataComp compression.Name) (*snapshot.DirEntry, error) { key := rw.getCacheKey(input) // see if we already processed this exact directory entry @@ -116,7 +116,7 @@ func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath stri // the rewriter returned a directory, we must recursively process it. if result.Type == snapshot.EntryTypeDirectory { - rep2, subdirErr := rw.processDirectory(ctx, parentPath, result, comp) + rep2, subdirErr := rw.processDirectory(ctx, parentPath, result, metadataComp) if rep2 == nil { return nil, errors.Wrap(subdirErr, input.Name) } @@ -134,7 +134,7 @@ func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath stri return result, nil } -func (rw *DirRewriter) processDirectory(ctx context.Context, pathFromRoot string, entry *snapshot.DirEntry, comp compression.Name) (*snapshot.DirEntry, error) { +func (rw *DirRewriter) processDirectory(ctx context.Context, pathFromRoot string, entry *snapshot.DirEntry, metadataComp compression.Name) (*snapshot.DirEntry, error) { dirRewriterLog(ctx).Debugw("processDirectory", "path", pathFromRoot) r, err := rw.rep.OpenObject(ctx, entry.ObjectID) @@ -148,10 +148,10 @@ func (rw *DirRewriter) processDirectory(ctx context.Context, pathFromRoot string return rw.opts.OnDirectoryReadFailure(ctx, pathFromRoot, entry, errors.Wrap(err, "unable to read directory entries")) } - return rw.processDirectoryEntries(ctx, pathFromRoot, entry, entries, comp) + return rw.processDirectoryEntries(ctx, pathFromRoot, entry, entries, metadataComp) } -func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath string, entry *snapshot.DirEntry, entries []*snapshot.DirEntry, comp compression.Name) (*snapshot.DirEntry, error) { +func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath string, entry *snapshot.DirEntry, entries []*snapshot.DirEntry, metadataComp compression.Name) (*snapshot.DirEntry, error) { var ( builder DirManifestBuilder wg workshare.AsyncGroup[*dirRewriterRequest] @@ -168,7 +168,7 @@ func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath s path.Join(parentPath, child.Name), child, nil, - comp, + metadataComp, nil, }) @@ -176,7 +176,7 @@ func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath s } // run in current goroutine - replacement, repErr := rw.getCachedReplacement(ctx, path.Join(parentPath, child.Name), child, comp) + replacement, repErr := rw.getCachedReplacement(ctx, path.Join(parentPath, child.Name), child, metadataComp) if repErr != nil { return nil, errors.Wrap(repErr, child.Name) } @@ -198,7 +198,7 @@ func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath s dm := builder.Build(entry.ModTime, entry.DirSummary.IncompleteReason) - oid, err := writeDirManifest(ctx, rw.rep, entry.ObjectID.String(), dm, comp) + oid, err := writeDirManifest(ctx, rw.rep, entry.ObjectID.String(), dm, metadataComp) if err != nil { return nil, errors.Wrap(err, "unable to write directory manifest") } @@ -223,8 +223,8 @@ func (rw *DirRewriter) equalEntries(e1, e2 *snapshot.DirEntry) bool { } // RewriteSnapshotManifest rewrites the directory tree starting at a given manifest. -func (rw *DirRewriter) RewriteSnapshotManifest(ctx context.Context, man *snapshot.Manifest, comp compression.Name) (bool, error) { - newEntry, err := rw.getCachedReplacement(ctx, ".", man.RootEntry, comp) +func (rw *DirRewriter) RewriteSnapshotManifest(ctx context.Context, man *snapshot.Manifest, metadataComp compression.Name) (bool, error) { + newEntry, err := rw.getCachedReplacement(ctx, ".", man.RootEntry, metadataComp) if err != nil { return false, errors.Wrapf(err, "error processing snapshot %v", man.ID) } @@ -277,19 +277,20 @@ func RewriteAsStub(rep repo.RepositoryWriter) RewriteFailedEntryCallback { return nil, errors.Wrap(err, "error writing stub contents") } - w := rep.NewObjectWriter(ctx, object.WriterOptions{}) + pol, _, _, err := policy.GetEffectivePolicy(ctx, rep, policy.GlobalPolicySourceInfo) + if err != nil { + return nil, errors.Wrap(err, "error getting policy") + + } + metadataCompressor := pol.MetadataCompressionPolicy.MetadataCompressor() + w := rep.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: metadataCompressor}) n, err := buf.WriteTo(w) if err != nil { return nil, errors.Wrap(err, "error writing stub") } - pol, _, _, err := policy.GetEffectivePolicy(ctx, rep, policy.GlobalPolicySourceInfo) - if err != nil { - return nil, errors.Wrap(err, "error getting policy") - - } - oid, err := w.Result(pol.MetadataCompressionPolicy.MetadataCompressor()) + oid, err := w.Result() if err != nil { return nil, errors.Wrap(err, "error writing stub") } diff --git a/snapshot/snapshotfs/dir_writer.go b/snapshot/snapshotfs/dir_writer.go index 86c403fd341..39556547a33 100644 --- a/snapshot/snapshotfs/dir_writer.go +++ b/snapshot/snapshotfs/dir_writer.go @@ -12,11 +12,12 @@ import ( "github.com/kopia/kopia/snapshot" ) -func writeDirManifest(ctx context.Context, rep repo.RepositoryWriter, dirRelativePath string, dirManifest *snapshot.DirManifest, comp compression.Name) (object.ID, error) { +func writeDirManifest(ctx context.Context, rep repo.RepositoryWriter, dirRelativePath string, dirManifest *snapshot.DirManifest, metadataComp compression.Name) (object.ID, error) { writer := rep.NewObjectWriter(ctx, object.WriterOptions{ - Description: "DIR:" + dirRelativePath, - Prefix: objectIDPrefixDirectory, - Compressor: comp, + Description: "DIR:" + dirRelativePath, + Prefix: objectIDPrefixDirectory, + Compressor: metadataComp, + MetadataCompressor: metadataComp, }) defer writer.Close() //nolint:errcheck @@ -25,7 +26,7 @@ func writeDirManifest(ctx context.Context, rep repo.RepositoryWriter, dirRelativ return object.EmptyID, errors.Wrap(err, "unable to encode directory JSON") } - oid, err := writer.Result(comp) + oid, err := writer.Result() if err != nil { return object.EmptyID, errors.Wrap(err, "unable to write directory") } diff --git a/snapshot/snapshotfs/upload.go b/snapshot/snapshotfs/upload.go index 0160d7451a4..d5656cb29a3 100644 --- a/snapshot/snapshotfs/upload.go +++ b/snapshot/snapshotfs/upload.go @@ -210,7 +210,7 @@ func (u *Uploader) uploadFileInternal(ctx context.Context, parentCheckpointRegis return concatenateParts(ctx, u.repo, f.Name(), parts, metadataComp) } -func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name string, parts []*snapshot.DirEntry, comp compression.Name) (*snapshot.DirEntry, error) { +func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name string, parts []*snapshot.DirEntry, metadataComp compression.Name) (*snapshot.DirEntry, error) { var ( objectIDs []object.ID totalSize int64 @@ -222,7 +222,7 @@ func concatenateParts(ctx context.Context, rep repo.RepositoryWriter, name strin objectIDs = append(objectIDs, part.ObjectID) } - resultObject, err := rep.ConcatenateObjects(ctx, objectIDs, comp) + resultObject, err := rep.ConcatenateObjects(ctx, objectIDs, metadataComp) if err != nil { return nil, errors.Wrap(err, "concatenate") } @@ -243,15 +243,16 @@ func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry defer file.Close() //nolint:errcheck writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{ - Description: "FILE:" + fname, - Compressor: compressor, - Splitter: splitterName, - AsyncWrites: 1, // upload chunk in parallel to writing another chunk + Description: "FILE:" + fname, + Compressor: compressor, + MetadataCompressor: metadataComp, + Splitter: splitterName, + AsyncWrites: 1, // upload chunk in parallel to writing another chunk }) defer writer.Close() //nolint:errcheck parentCheckpointRegistry.addCheckpointCallback(fname, func() (*snapshot.DirEntry, error) { - checkpointID, err := writer.Checkpoint(metadataComp) + checkpointID, err := writer.Checkpoint() if err != nil { return nil, errors.Wrap(err, "checkpoint error") } @@ -281,7 +282,7 @@ func (u *Uploader) uploadFileData(ctx context.Context, parentCheckpointRegistry return nil, err } - r, err := writer.Result(metadataComp) + r, err := writer.Result() if err != nil { return nil, errors.Wrap(err, "unable to get result") } @@ -313,7 +314,8 @@ func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath strin } writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{ - Description: "SYMLINK:" + f.Name(), + Description: "SYMLINK:" + f.Name(), + MetadataCompressor: metadataComp, }) defer writer.Close() //nolint:errcheck @@ -322,7 +324,7 @@ func (u *Uploader) uploadSymlinkInternal(ctx context.Context, relativePath strin return nil, err } - r, err := writer.Result(metadataComp) + r, err := writer.Result() if err != nil { return nil, errors.Wrap(err, "unable to get result") } @@ -357,9 +359,10 @@ func (u *Uploader) uploadStreamingFileInternal(ctx context.Context, relativePath metadataComp := pol.MetadataCompressionPolicy.MetadataCompressor() writer := u.repo.NewObjectWriter(ctx, object.WriterOptions{ - Description: "STREAMFILE:" + f.Name(), - Compressor: comp, - Splitter: pol.SplitterPolicy.SplitterForFile(f), + Description: "STREAMFILE:" + f.Name(), + Compressor: comp, + MetadataCompressor: metadataComp, + Splitter: pol.SplitterPolicy.SplitterForFile(f), }) defer writer.Close() //nolint:errcheck @@ -369,7 +372,7 @@ func (u *Uploader) uploadStreamingFileInternal(ctx context.Context, relativePath return nil, err } - r, err := writer.Result(metadataComp) + r, err := writer.Result() if err != nil { return nil, errors.Wrap(err, "unable to get result") } diff --git a/snapshot/snapshotmaintenance/snapshotmaintenance_test.go b/snapshot/snapshotmaintenance/snapshotmaintenance_test.go index dff3274dacc..bb787fb5888 100644 --- a/snapshot/snapshotmaintenance/snapshotmaintenance_test.go +++ b/snapshot/snapshotmaintenance/snapshotmaintenance_test.go @@ -388,7 +388,7 @@ func create4ByteObjects(t *testing.T, r repo.Repository, base, count int) []obje _, err := w.Write(b[:]) require.NoError(t, err) - oid, err := w.Result("zstd-fastest") + oid, err := w.Result() require.NoError(t, err) require.NoError(t, w.Close())