Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug when renaming multiple objects #1607

Draft
wants to merge 13 commits into
base: api-breakers
Choose a base branch
from
56 changes: 51 additions & 5 deletions internal/sql/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ import (
)

type (
Migration struct {
ID string
Migrate func(tx Tx) error
}

// Migrator is an interface for defining database-specific helper methods
// required during migrations
Migrator interface {
Expand All @@ -29,8 +24,19 @@ type (
MainMigrator interface {
Migrator
MakeDirsForPath(ctx context.Context, tx Tx, path string) (int64, error)
ObjectsWithCorruptedDirectoryID(ctx context.Context, tx Tx) ([]Object, error)
UpdateSetting(ctx context.Context, tx Tx, key, value string) error
}

Migration struct {
ID string
Migrate func(tx Tx) error
}

Object struct {
ID uint
ObjectID string
}
)

var (
Expand Down Expand Up @@ -255,6 +261,46 @@ var (
return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00023_key_prefix", log)
},
},
{
ID: "00024_fix_directories",
Migrate: func(tx Tx) error {
log.Info("performing main migration '00024_fix_directories'")

// fetch corrupted objects
objects, err := m.ObjectsWithCorruptedDirectoryID(ctx, tx)
if err != nil {
return err
}
log.Infof("found %d objects with a corrupted path", len(objects))

// prepare update stmt
updateStmt, err := tx.Prepare(ctx, "UPDATE objects SET db_directory_id = ? WHERE id = ?")
if err != nil {
return fmt.Errorf("failed to prepare update statement, %w", err)
}
defer updateStmt.Close()

// loop every object and re-insert its directory
for _, o := range objects {
log.Debugf("re-inserting directory for object %v (%v)", o.ObjectID, o.ID)

// recreate dirs
dirID, err := m.MakeDirsForPath(ctx, tx, o.ObjectID)
if err != nil {
return fmt.Errorf("failed to create directory %s: %w", o.ObjectID, err)
}

// update object
_, err = updateStmt.Exec(ctx, dirID, o.ID)
if err != nil {
return fmt.Errorf("failed to execute update statement, %w", err)
}
}

log.Info("migration '00024_fix_directories' complete")
return nil
},
},
}
}
MetricsMigrations = func(ctx context.Context, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration {
Expand Down
2 changes: 1 addition & 1 deletion internal/test/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func TestObjectsWithDelimiterSlash(t *testing.T) {
// assert mime type
isDir := strings.HasSuffix(entries[i].Key, "/") && entries[i].Key != "//double/" // double is a file
if (isDir && entries[i].MimeType != "") || (!isDir && entries[i].MimeType == "") {
t.Fatal("unexpected mime type", entries[i].MimeType)
t.Fatal("unexpected mime type", entries[i], entries[i].MimeType)
}
entries[i].MimeType = ""

Expand Down
26 changes: 26 additions & 0 deletions object/path.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package object

import "strings"

// Directories returns the directories for the given path. When explicit is
// true, the returned directories do not include the path itself should it be a
// directory. The root path ('/') is always excluded.
func Directories(path string, explicit bool) (dirs []string) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ChrisSchinnerl I'm not a big fan of the naming here, I went with explicit because we have this test called TestObjectsExplicitDir re-reading it now makes me think I have to inverse it actually. Holding off on that for now until we review the entire thing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tbh I'm not a fan of having the bool. It feels a bit too forced. The way I understand it is that it's only required for InsertDirectoriesForRename. Looking at that method it seems very verbose. I'm pretty sure your first instinct was to just update the directory names instead of doing the whole mapping thing but you ran into issues due to the fact that the directories table doesn't contain the bucket.

I think it might be worth adding the bucket id to the directories table and extending the unique key to cover that.
Then simplify InsertDirectoriesForRename and finally get rid of the explicit bool since we should be able to just update all directories of the bucket that we rename within. My guess is that that's also a lot faster than looping and inserting directories.

That makes the migration more complicated but if we ever decide to use the directories table as a cache for fields like size and health of contained objects, we will need to introduce buckets to that table anyway.

if explicit {
path = strings.TrimSuffix(path, "/")
}
if path == "/" {
return nil
}
for i, r := range path {
if r != '/' {
continue
}
dir := path[:i+1]
if dir == "/" {
continue
}
dirs = append(dirs, dir)
}
return
}
28 changes: 28 additions & 0 deletions object/path_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package object

import (
"reflect"
"testing"
)

func TestDirectories(t *testing.T) {
cases := []struct {
path string
explicit bool
dirs []string
}{
{"/", true, nil},
{"/", false, nil},
{"/foo", true, nil},
{"/foo", false, nil},
{"/foo/bar", true, []string{"/foo/"}},
{"/foo/bar", false, []string{"/foo/"}},
{"/foo/bar/", true, []string{"/foo/"}},
{"/foo/bar/", false, []string{"/foo/", "/foo/bar/"}},
}
for _, c := range cases {
if got := Directories(c.path, c.explicit); !reflect.DeepEqual(got, c.dirs) {
t.Fatalf("unexpected dirs for path %v (explicit %t), %v != %v", c.path, c.explicit, got, c.dirs)
}
}
}
8 changes: 4 additions & 4 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (s *SQLStore) RecordContractSpending(ctx context.Context, records []api.Con
func (s *SQLStore) RenameObject(ctx context.Context, bucket, keyOld, keyNew string, force bool) error {
return s.db.Transaction(ctx, func(tx sql.DatabaseTx) error {
// create new dir
dirID, err := tx.MakeDirsForPath(ctx, keyNew)
dirID, err := tx.InsertDirectories(ctx, object.Directories(keyNew, true))
if err != nil {
return err
}
Expand All @@ -418,10 +418,10 @@ func (s *SQLStore) RenameObject(ctx context.Context, bucket, keyOld, keyNew stri
func (s *SQLStore) RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, force bool) error {
return s.db.Transaction(ctx, func(tx sql.DatabaseTx) error {
// create new dir
dirID, err := tx.MakeDirsForPath(ctx, prefixNew)
dirID, renamedIDs, err := tx.InsertDirectoriesForRename(ctx, prefixOld, prefixNew)
if err != nil {
return fmt.Errorf("RenameObjects: failed to create new directory: %w", err)
} else if err := tx.RenameObjects(ctx, bucket, prefixOld, prefixNew, dirID, force); err != nil {
} else if err := tx.RenameObjects(ctx, bucket, prefixOld, prefixNew, dirID, renamedIDs, force); err != nil {
return err
}
// prune old dirs
Expand Down Expand Up @@ -491,7 +491,7 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, key, contractSet, e
}

// create the dir
dirID, err := tx.MakeDirsForPath(ctx, key)
dirID, err := tx.InsertDirectories(ctx, object.Directories(key, true))
if err != nil {
return fmt.Errorf("failed to create directories for key '%s': %w", key, err)
}
Expand Down
Loading
Loading