diff --git a/internal/sql/migrations.go b/internal/sql/migrations.go index 167e60cb5..0d521fef3 100644 --- a/internal/sql/migrations.go +++ b/internal/sql/migrations.go @@ -13,11 +13,6 @@ import ( ) type ( - Migration struct { - ID string - Migrate func(tx Tx) error - } - // Migrator is an interface for defining database-specific helper methods // required during migrations Migrator interface { @@ -29,8 +24,19 @@ type ( MainMigrator interface { Migrator MakeDirsForPath(ctx context.Context, tx Tx, path string) (int64, error) + ObjectsWithCorruptedDirectoryID(ctx context.Context, tx Tx) ([]Object, error) UpdateSetting(ctx context.Context, tx Tx, key, value string) error } + + Migration struct { + ID string + Migrate func(tx Tx) error + } + + Object struct { + ID uint + ObjectID string + } ) var ( @@ -255,6 +261,46 @@ var ( return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00023_key_prefix", log) }, }, + { + ID: "00024_fix_directories", + Migrate: func(tx Tx) error { + log.Info("performing main migration '00024_fix_directories'") + + // fetch corrupted objects + objects, err := m.ObjectsWithCorruptedDirectoryID(ctx, tx) + if err != nil { + return err + } + log.Infof("found %d objects with a corrupted path", len(objects)) + + // prepare update stmt + updateStmt, err := tx.Prepare(ctx, "UPDATE objects SET db_directory_id = ? 
WHERE id = ?") + if err != nil { + return fmt.Errorf("failed to prepare update statement, %w", err) + } + defer updateStmt.Close() + + // loop every object and re-insert its directory + for _, o := range objects { + log.Debugf("re-inserting directory for object %v (%v)", o.ObjectID, o.ID) + + // recreate dirs + dirID, err := m.MakeDirsForPath(ctx, tx, o.ObjectID) + if err != nil { + return fmt.Errorf("failed to create directory %s: %w", o.ObjectID, err) + } + + // update object + _, err = updateStmt.Exec(ctx, dirID, o.ID) + if err != nil { + return fmt.Errorf("failed to execute update statement, %w", err) + } + } + + log.Info("migration '00024_fix_directories' complete") + return nil + }, + }, } } MetricsMigrations = func(ctx context.Context, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration { diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index 32164f75c..36240717c 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -454,7 +454,7 @@ func TestObjectsWithDelimiterSlash(t *testing.T) { // assert mime type isDir := strings.HasSuffix(entries[i].Key, "/") && entries[i].Key != "//double/" // double is a file if (isDir && entries[i].MimeType != "") || (!isDir && entries[i].MimeType == "") { - t.Fatal("unexpected mime type", entries[i].MimeType) + t.Fatal("unexpected mime type", entries[i], entries[i].MimeType) } entries[i].MimeType = "" diff --git a/object/path.go b/object/path.go new file mode 100644 index 000000000..c43a6fafa --- /dev/null +++ b/object/path.go @@ -0,0 +1,26 @@ +package object + +import "strings" + +// Directories returns the directories for the given path. When explicit is +// true, the returned directories do not include the path itself should it be a +// directory. The root path ('/') is always excluded. 
+func Directories(path string, explicit bool) (dirs []string) { + if explicit { + path = strings.TrimSuffix(path, "/") + } + if path == "/" { + return nil + } + for i, r := range path { + if r != '/' { + continue + } + dir := path[:i+1] + if dir == "/" { + continue + } + dirs = append(dirs, dir) + } + return +} diff --git a/object/path_test.go b/object/path_test.go new file mode 100644 index 000000000..cdaeba2b9 --- /dev/null +++ b/object/path_test.go @@ -0,0 +1,28 @@ +package object + +import ( + "reflect" + "testing" +) + +func TestDirectories(t *testing.T) { + cases := []struct { + path string + explicit bool + dirs []string + }{ + {"/", true, nil}, + {"/", false, nil}, + {"/foo", true, nil}, + {"/foo", false, nil}, + {"/foo/bar", true, []string{"/foo/"}}, + {"/foo/bar", false, []string{"/foo/"}}, + {"/foo/bar/", true, []string{"/foo/"}}, + {"/foo/bar/", false, []string{"/foo/", "/foo/bar/"}}, + } + for _, c := range cases { + if got := Directories(c.path, c.explicit); !reflect.DeepEqual(got, c.dirs) { + t.Fatalf("unexpected dirs for path %v (explicit %t), %v != %v", c.path, c.explicit, got, c.dirs) + } + } +} diff --git a/stores/metadata.go b/stores/metadata.go index 0feb77bb5..7e6d5d3af 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -400,7 +400,7 @@ func (s *SQLStore) RecordContractSpending(ctx context.Context, records []api.Con func (s *SQLStore) RenameObject(ctx context.Context, bucket, keyOld, keyNew string, force bool) error { return s.db.Transaction(ctx, func(tx sql.DatabaseTx) error { // create new dir - dirID, err := tx.MakeDirsForPath(ctx, keyNew) + dirID, err := tx.InsertDirectories(ctx, object.Directories(keyNew, true)) if err != nil { return err } @@ -418,10 +418,10 @@ func (s *SQLStore) RenameObject(ctx context.Context, bucket, keyOld, keyNew stri func (s *SQLStore) RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, force bool) error { return s.db.Transaction(ctx, func(tx sql.DatabaseTx) error { // create new dir - 
dirID, err := tx.MakeDirsForPath(ctx, prefixNew) + dirID, renamedIDs, err := tx.InsertDirectoriesForRename(ctx, prefixOld, prefixNew) if err != nil { return fmt.Errorf("RenameObjects: failed to create new directory: %w", err) - } else if err := tx.RenameObjects(ctx, bucket, prefixOld, prefixNew, dirID, force); err != nil { + } else if err := tx.RenameObjects(ctx, bucket, prefixOld, prefixNew, dirID, renamedIDs, force); err != nil { return err } // prune old dirs @@ -491,7 +491,7 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, key, contractSet, e } // create the dir - dirID, err := tx.MakeDirsForPath(ctx, key) + dirID, err := tx.InsertDirectories(ctx, object.Directories(key, true)) if err != nil { return fmt.Errorf("failed to create directories for key '%s': %w", key, err) } diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 9bd9e3b35..1c63af125 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -2200,6 +2200,8 @@ func TestRenameObjects(t *testing.T) { "/fileś/dir/1b", "/fileś/dir/2b", "/fileś/dir/3b", + "/folder/file1", + "/folder/foo/file2", "/foo", "/bar", "/baz", @@ -2222,6 +2224,9 @@ func TestRenameObjects(t *testing.T) { } // Perform some renames. + if err := ss.RenameObjectsBlocking(ctx, testBucket, "/folder/", "/fileś/", false); err != nil { + t.Fatal(err) + } if err := ss.RenameObjectsBlocking(ctx, testBucket, "/fileś/dir/", "/fileś/", false); err != nil { t.Fatal(err) } @@ -2250,6 +2255,8 @@ func TestRenameObjects(t *testing.T) { // Paths after. objectsAfter := []string{ + "/fileś/file1", + "/fileś/foo/file2", "/fileś/1a", "/fileś/2a", "/fileś/3a", @@ -2267,68 +2274,58 @@ func TestRenameObjects(t *testing.T) { objectsAfterMap[path] = struct{}{} } - // Assert that number of objects matches. - resp, err := ss.Objects(ctx, testBucket, "", "/", "", "", "", "", 100, object.EncryptionKey{}) - if err != nil { + // Assert that number of objects matches and paths are correct. 
+ if resp, err := ss.Objects(ctx, testBucket, "", "", "", "", "", "", 100, object.EncryptionKey{}); err != nil { t.Fatal(err) - } - if len(resp.Objects) != len(objectsAfter) { + } else if len(resp.Objects) != len(objectsAfter) { t.Fatal("unexpected number of objects", len(resp.Objects), len(objectsAfter)) - } - - // Assert paths are correct. - for _, obj := range resp.Objects { - if _, exists := objectsAfterMap[obj.Key]; !exists { - t.Fatal("unexpected path", obj.Key) + } else { + for _, obj := range resp.Objects { + if _, exists := objectsAfterMap[obj.Key]; !exists { + t.Fatal("unexpected path", obj.Key) + } } } - // Assert directories are correct - expectedDirs := []struct { - id int64 - parentID int64 - name string - }{ - { - id: 1, - parentID: 0, - name: "/", - }, - { - id: 2, - parentID: 1, - name: "/fileś/", - }, + // Assert everything is under one folder in the root directory + if resp, err := ss.Objects(ctx, testBucket, "", "", "/", "", "", "", 100, object.EncryptionKey{}); err != nil { + t.Fatal(err) + } else if len(resp.Objects) != 1 { + t.Fatal("unexpected number of objects", len(resp.Objects)) + } else if resp.Objects[0].Key != "/fileś/" { + t.Fatal("unexpected folder", resp.Objects[0]) } - var n int64 - if err := ss.DB().QueryRow(ctx, "SELECT COUNT(*) FROM directories").Scan(&n); err != nil { + // Assert file2's parent dir id has not been updated and is still under the foo directory + if resp, err := ss.Objects(ctx, testBucket, "/fileś/foo/", "", "/", "", "", "", 100, object.EncryptionKey{}); err != nil { t.Fatal(err) - } else if n != int64(len(expectedDirs)) { - t.Fatalf("unexpected number of directories, %v != %v", n, len(expectedDirs)) + } else if len(resp.Objects) != 1 { + t.Fatal("unexpected number of objects", len(resp.Objects)) + } else if resp.Objects[0].Key != "/fileś/foo/file2" { + t.Fatal("unexpected folder", resp.Objects[0]) } - type row struct { - ID int64 - ParentID int64 - Name string + // Assert directories are correct + expectedDirs 
:= map[string]string{ + "/": "NULL", + "/fileś/": "/", + "/fileś/foo/": "/fileś/", } - rows, err := ss.DB().Query(context.Background(), "SELECT id, COALESCE(db_parent_id, 0), name FROM directories ORDER BY id ASC") + + rows, err := ss.DB().Query(context.Background(), "SELECT d1.name, COALESCE(d2.name, 'NULL') as parent FROM directories d1 LEFT JOIN directories d2 ON d1.db_parent_id = d2.id ") if err != nil { t.Fatal(err) } defer rows.Close() var i int for rows.Next() { - var dir row - if err := rows.Scan(&dir.ID, &dir.ParentID, &dir.Name); err != nil { + var dir, parent string + if err := rows.Scan(&dir, &parent); err != nil { t.Fatal(err) - } else if dir.ID != expectedDirs[i].id { - t.Fatalf("unexpected directory id, %v != %v", dir.ID, expectedDirs[i].id) - } else if dir.ParentID != expectedDirs[i].parentID { - t.Fatalf("unexpected directory parent id, %v != %v", dir.ParentID, expectedDirs[i].parentID) - } else if dir.Name != expectedDirs[i].name { - t.Fatalf("unexpected directory name, %v != %v", dir.Name, expectedDirs[i].name) + } else if expectedParent, ok := expectedDirs[dir]; !ok { + t.Fatalf("unexpected directory %v", dir) + } else if parent != expectedParent { + t.Fatalf("unexpected parent, %v != %v", parent, expectedParent) } i++ } @@ -2337,6 +2334,83 @@ func TestRenameObjects(t *testing.T) { } } +func TestRenameObjectsRegression(t *testing.T) { + ss := newTestSQLStore(t, defaultTestSQLStoreConfig) + defer ss.Close() + + // define directory structure + objects := []string{ + "/firefly/s1/", + "/firefly/s2/", + "/suits/s1/", + "/lost/", + "/movie", + + "/firefly/trailer", + "/firefly/s1/ep1", + "/firefly/s1/ep2", + "/firefly/s2/ep1", + } + + // define a helper to assert the number of objects with given prefix + ctx := context.Background() + assertNumObjects := func(prefix, delimiter string, n int) { + t.Helper() + if resp, err := ss.Objects(ctx, testBucket, prefix, "", delimiter, "", "", "", -1, object.EncryptionKey{}); err != nil { + t.Fatal(err) + } else 
if len(resp.Objects) != n { + t.Fatalf("unexpected number of objects %d != %d, objects:\n%+v", len(resp.Objects), n, resp.Objects) + } + } + + // persist the structure + for _, path := range objects { + var s int + if !strings.HasSuffix(path, "/") { + s = 1 + } + if _, err := ss.addTestObject(path, newTestObject(s)); err != nil { + t.Fatal(err) + } + } + + // assert the structure + assertNumObjects("/", "/", 4) + assertNumObjects("/firefly", "", 6) + assertNumObjects("/firefly/", "/", 3) + assertNumObjects("/firefly/s1/", "/", 2) + assertNumObjects("/firefly/s2/", "/", 1) + assertNumObjects("/suits/", "/", 1) + assertNumObjects("/lost/", "/", 0) + + // assert we can't rename to an already existing directory without force + if err := ss.RenameObjects(ctx, testBucket, "/firefly/s1/", "/firefly/s2/", false); !errors.Is(err, api.ErrObjectExists) { + t.Fatal("unexpected error", err) + } + + // assert we can forcefully rename it + if err := ss.RenameObjects(ctx, testBucket, "/firefly/s1/", "/firefly/s2/", true); err != nil { + t.Fatal(err) + } + assertNumObjects("/firefly/s2/", "/", 2) + + // assert we can rename it and its children still point to the right directory + if err := ss.RenameObjects(ctx, testBucket, "/firefly/s2/", "/firefly/s02/", false); err != nil { + t.Fatal(err) + } + assertNumObjects("/firefly/s2/", "/", 0) + assertNumObjects("/firefly/s02/", "/", 2) + + // assert we rename a grand parent and all children remain intact + if err := ss.RenameObjects(ctx, testBucket, "/firefly/", "/gotham/", true); err != nil { + t.Fatal(err) + } + + assertNumObjects("/gotham/", "/", 2) + assertNumObjects("/gotham/s02/", "/", 2) + assertNumObjects("/", "/", 4) +} + // TestObjectsStats is a unit test for ObjectsStats. 
func TestObjectsStats(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) @@ -4009,7 +4083,7 @@ func TestSlabCleanup(t *testing.T) { var dirID int64 err = ss.db.Transaction(context.Background(), func(tx sql.DatabaseTx) error { var err error - dirID, err = tx.MakeDirsForPath(context.Background(), "1") + dirID, err = tx.InsertDirectories(context.Background(), object.Directories("1", true)) return err }) if err != nil { @@ -4550,20 +4624,20 @@ func TestDirectories(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) defer ss.Close() - objects := []string{ + paths := []string{ "/foo", - "/bar/baz", + "/fileś/baz", "///somefile", "/dir/fakedir/", "/", - "/bar/fileinsamedirasbefore", + "/fileś/fileinsamedirasbefore", } - for _, o := range objects { + for _, o := range paths { var dirID int64 err := ss.db.Transaction(context.Background(), func(tx sql.DatabaseTx) error { var err error - dirID, err = tx.MakeDirsForPath(context.Background(), o) + dirID, err = tx.InsertDirectories(context.Background(), object.Directories(o, true)) return err }) if err != nil { @@ -4584,7 +4658,7 @@ func TestDirectories(t *testing.T) { parentID: 0, }, { - name: "/bar/", + name: "/fileś/", id: 2, parentID: 1, }, diff --git a/stores/sql/database.go b/stores/sql/database.go index 717fc39e6..186c3ec7d 100644 --- a/stores/sql/database.go +++ b/stores/sql/database.go @@ -177,6 +177,16 @@ type ( // that was created. InsertBufferedSlab(ctx context.Context, fileName string, contractSetID int64, ec object.EncryptionKey, minShards, totalShards uint8) (int64, error) + // InsertDirectories inserts the given directories and returns the ID of + // the child directory. + InsertDirectories(ctx context.Context, dirs []string) (int64, error) + + // InsertDirectoriesForRename will insert directories for the new prefix + // and return the ID of the child directory as well as a mapping for the + // old directories to the new. 
This is necessary so the corresponding + // directory IDs on the objects can be updated accordingly. + InsertDirectoriesForRename(ctx context.Context, prefixOld, prefixNew string) (int64, []int64, error) + // InsertMultipartUpload creates a new multipart upload and returns a // unique upload ID. InsertMultipartUpload(ctx context.Context, bucket, key string, ec object.EncryptionKey, mimeType string, metadata api.ObjectUserMetadata) (string, error) @@ -188,9 +198,6 @@ type ( // are associated with any of the provided contracts. InvalidateSlabHealthByFCID(ctx context.Context, fcids []types.FileContractID, limit int64) (int64, error) - // MakeDirsForPath creates all directories for a given object's path. - MakeDirsForPath(ctx context.Context, path string) (int64, error) - // MarkPackedSlabUploaded marks the packed slab as uploaded in the // database, causing the provided shards to be associated with the slab. // The returned string contains the filename of the slab buffer on disk. @@ -285,7 +292,7 @@ type ( // `api.ErrOBjectNotFound` is returned. If 'force' is false and an // object already exists with the new prefix, `api.ErrObjectExists` is // returned. - RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, dirID int64, force bool) error + RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, dirID int64, renamedIDs []int64, force bool) error // RenewedContract returns the metadata of the contract that was renewed // from the specified contract or ErrContractNotFound otherwise. 
diff --git a/stores/sql/main.go b/stores/sql/main.go index 314249a8a..4fd587722 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -1603,6 +1603,28 @@ func ObjectsStats(ctx context.Context, tx sql.Tx, opts api.ObjectsStatsOpts) (ap }, nil } +func ObjectsWithCorruptedDirectoryID(ctx context.Context, tx sql.Tx) ([]sql.Object, error) { + rows, err := tx.Query(ctx, ` +SELECT o.id, o.object_id +FROM objects o +INNER JOIN directories d ON o.db_directory_id = d.id +WHERE INSTR(REPLACE(o.object_id, d.name, ""), "/") > 0`) + if err != nil { + return nil, fmt.Errorf("failed to fetch corrupted objects, %w", err) + } + defer rows.Close() + + var objects []sql.Object + for rows.Next() { + var o sql.Object + if err := rows.Scan(&o.ID, &o.ObjectID); err != nil { + return nil, fmt.Errorf("failed to scan object row, %w", err) + } + objects = append(objects, o) + } + return objects, nil +} + func PeerBanned(ctx context.Context, tx sql.Tx, addr string) (bool, error) { // normalize the address to a CIDR netCIDR, err := NormalizePeer(addr) @@ -2097,6 +2119,19 @@ WHERE fcid = ?`, return nil } +func UpdateObjectDirectorIdExpr(dirIds []int64) string { + if len(dirIds) == 0 { + return "db_directory_id = ?" + } + + expr := "db_directory_id = CASE " + for i := 0; i < len(dirIds); i += 2 { + expr += "WHEN db_directory_id = ? THEN ? " + } + expr += "ELSE CASE WHEN size = 0 THEN db_directory_id ELSE ? 
END END" + return expr +} + func UpdatePeerInfo(ctx context.Context, tx sql.Tx, addr string, fn func(*syncer.PeerInfo)) error { info, err := PeerInfo(ctx, tx, addr) if err != nil { diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index b097e28cb..bcaf0188a 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -73,13 +73,17 @@ func (b *MainDatabase) LoadSlabBuffers(ctx context.Context) ([]ssql.LoadedSlabBu func (b *MainDatabase) MakeDirsForPath(ctx context.Context, tx sql.Tx, path string) (int64, error) { mtx := b.wrapTxn(tx) - return mtx.MakeDirsForPath(ctx, path) + return mtx.InsertDirectories(ctx, object.Directories(path, true)) } func (b *MainDatabase) Migrate(ctx context.Context) error { return sql.PerformMigrations(ctx, b, migrationsFs, "main", sql.MainMigrations(ctx, b, migrationsFs, b.log)) } +func (b *MainDatabase) ObjectsWithCorruptedDirectoryID(ctx context.Context, tx sql.Tx) ([]sql.Object, error) { + return ssql.ObjectsWithCorruptedDirectoryID(ctx, tx) +} + func (b *MainDatabase) Transaction(ctx context.Context, fn func(tx ssql.DatabaseTx) error) error { return b.db.Transaction(ctx, func(tx sql.Tx) error { return fn(b.wrapTxn(tx)) @@ -231,7 +235,7 @@ func (tx *MainDatabaseTx) CompleteMultipartUpload(ctx context.Context, bucket, k } // create the directory. - dirID, err := tx.MakeDirsForPath(ctx, key) + dirID, err := tx.InsertDirectories(ctx, object.Directories(key, true)) if err != nil { return "", fmt.Errorf("failed to create directory for key %s: %w", key, err) } @@ -474,33 +478,21 @@ func (tx *MainDatabaseTx) InvalidateSlabHealthByFCID(ctx context.Context, fcids return res.RowsAffected() } -func (tx *MainDatabaseTx) MakeDirsForPath(ctx context.Context, path string) (int64, error) { - // Create root dir. 
+func (tx *MainDatabaseTx) InsertDirectories(ctx context.Context, dirs []string) (int64, error) { + // create root dir dirID := int64(sql.DirectoriesRootID) if _, err := tx.Exec(ctx, "INSERT IGNORE INTO directories (id, name, db_parent_id) VALUES (?, '/', NULL)", dirID); err != nil { return 0, fmt.Errorf("failed to create root directory: %w", err) } - path = strings.TrimSuffix(path, "/") - if path == "/" { - return dirID, nil - } - - // Create remaining directories. + // create remaining directories insertDirStmt, err := tx.Prepare(ctx, "INSERT INTO directories (name, db_parent_id) VALUES (?, ?) ON DUPLICATE KEY UPDATE id = last_insert_id(id)") if err != nil { return 0, fmt.Errorf("failed to prepare statement to insert dir: %w", err) } defer insertDirStmt.Close() - for i := 0; i < utf8.RuneCountInString(path); i++ { - if path[i] != '/' { - continue - } - dir := path[:i+1] - if dir == "/" { - continue - } + for _, dir := range dirs { if res, err := insertDirStmt.Exec(ctx, dir, dirID); err != nil { return 0, fmt.Errorf("failed to create directory %v: %w", dir, err) } else if dirID, err = res.LastInsertId(); err != nil { @@ -510,6 +502,71 @@ func (tx *MainDatabaseTx) MakeDirsForPath(ctx context.Context, path string) (int return dirID, nil } +func (tx *MainDatabaseTx) InsertDirectoriesForRename(ctx context.Context, prefixOld, prefixNew string) (int64, []int64, error) { + // prepare statement + insertDirStmt, err := tx.Prepare(ctx, "INSERT INTO directories (name, db_parent_id) VALUES (?, ?) 
ON DUPLICATE KEY UPDATE id = last_insert_id(id)") + if err != nil { + return 0, nil, fmt.Errorf("failed to prepare statement to insert dir: %w", err) + } + defer insertDirStmt.Close() + + // prepare a helper that inserts all directories for given path + insertDirectories := func(path string) (int64, error) { + dirID := int64(sql.DirectoriesRootID) + for _, dir := range object.Directories(path, false) { + if res, err := insertDirStmt.Exec(ctx, dir, dirID); err != nil { + return 0, fmt.Errorf("failed to create directory %v: %w", dir, err) + } else if childID, err := res.LastInsertId(); err != nil { + return 0, fmt.Errorf("failed to fetch last inserted dir id %v: %w", dir, err) + } else { + dirID = childID + } + } + return dirID, nil + } + + // create root dir + dirID := int64(sql.DirectoriesRootID) + if _, err := tx.Exec(ctx, "INSERT IGNORE INTO directories (id, name, db_parent_id) VALUES (?, '/', NULL)", dirID); err != nil { + return 0, nil, fmt.Errorf("failed to create root directory: %w", err) + } + + // fetch directories with given prefix + rows, err := tx.Query(ctx, "SELECT id, name FROM directories WHERE name LIKE ?", prefixOld+"%") + if err != nil { + return 0, nil, fmt.Errorf("failed to fetch existing directories: %w", err) + } + defer rows.Close() + + dirs := make(map[string]int64) + for rows.Next() { + var id int64 + var name string + if err := rows.Scan(&id, &name); err != nil { + return 0, nil, fmt.Errorf("failed to scan directory: %w", err) + } + dirs[name] = id + } + + // create a mapping of existing directories to renamed directories + var mapping []int64 + for name, id := range dirs { + renamedDirID, err := insertDirectories(strings.Replace(name, prefixOld, prefixNew, 1)) + if err != nil { + return 0, nil, err + } + mapping = append(mapping, id, renamedDirID) + } + + // create directories for the new prefix + childID, err := insertDirectories(prefixNew) + if err != nil { + return 0, nil, err + } + + return childID, mapping, nil +} + func (tx 
*MainDatabaseTx) MarkPackedSlabUploaded(ctx context.Context, slab api.UploadedPackedSlab) (string, error) { return ssql.MarkPackedSlabUploaded(ctx, tx, slab) } @@ -750,7 +807,7 @@ func (tx *MainDatabaseTx) RenameObject(ctx context.Context, bucket, keyOld, keyN return nil } -func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, dirID int64, force bool) error { +func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, dirID int64, renamedIDs []int64, force bool) error { if force { _, err := tx.Exec(ctx, ` DELETE @@ -761,27 +818,42 @@ func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld, SELECT CONCAT(?, SUBSTR(object_id, ?)) FROM objects WHERE object_id LIKE ? + AND SUBSTR(object_id, 1, ?) = ? AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?) ) as i - )`, + ) OR (object_id = ? AND size = 0)`, prefixNew, utf8.RuneCountInString(prefixOld)+1, prefixOld+"%", - bucket) + utf8.RuneCountInString(prefixOld), prefixOld, + bucket, + prefixNew) if err != nil { return err } } - resp, err := tx.Exec(ctx, ` + + // build query + query := fmt.Sprintf(` UPDATE objects SET object_id = CONCAT(?, SUBSTR(object_id, ?)), - db_directory_id = ? + %s WHERE object_id LIKE ? - AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)`, - prefixNew, utf8.RuneCountInString(prefixOld)+1, + AND SUBSTR(object_id, 1, ?) = ? + AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)`, ssql.UpdateObjectDirectorIdExpr(renamedIDs)) + + // build arguments + args := []any{prefixNew, utf8.RuneCountInString(prefixOld) + 1} + for _, id := range renamedIDs { + args = append(args, id) + } + args = append(args, dirID, prefixOld+"%", - bucket) + utf8.RuneCountInString(prefixOld), prefixOld, + bucket, + ) + resp, err := tx.Exec(ctx, query, args...) 
if err != nil && strings.Contains(err.Error(), "Duplicate entry") { return api.ErrObjectExists } else if err != nil { diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index dd2d91eb7..5aa828f81 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -72,13 +72,17 @@ func (b *MainDatabase) LoadSlabBuffers(ctx context.Context) ([]ssql.LoadedSlabBu func (b *MainDatabase) MakeDirsForPath(ctx context.Context, tx sql.Tx, path string) (int64, error) { mtx := b.wrapTxn(tx) - return mtx.MakeDirsForPath(ctx, path) + return mtx.InsertDirectories(ctx, object.Directories(path, true)) } func (b *MainDatabase) Migrate(ctx context.Context) error { return sql.PerformMigrations(ctx, b, migrationsFs, "main", sql.MainMigrations(ctx, b, migrationsFs, b.log)) } +func (b *MainDatabase) ObjectsWithCorruptedDirectoryID(ctx context.Context, tx sql.Tx) ([]sql.Object, error) { + return ssql.ObjectsWithCorruptedDirectoryID(ctx, tx) +} + func (b *MainDatabase) Transaction(ctx context.Context, fn func(tx ssql.DatabaseTx) error) error { return b.db.Transaction(ctx, func(tx sql.Tx) error { return fn(b.wrapTxn(tx)) @@ -230,7 +234,7 @@ func (tx *MainDatabaseTx) CompleteMultipartUpload(ctx context.Context, bucket, k } // create the directory. - dirID, err := tx.MakeDirsForPath(ctx, key) + dirID, err := tx.InsertDirectories(ctx, object.Directories(key, true)) if err != nil { return "", fmt.Errorf("failed to create directory for key %s: %w", key, err) } @@ -460,7 +464,8 @@ func (tx *MainDatabaseTx) InvalidateSlabHealthByFCID(ctx context.Context, fcids return res.RowsAffected() } -func (tx *MainDatabaseTx) MakeDirsForPath(ctx context.Context, path string) (int64, error) { +func (tx *MainDatabaseTx) InsertDirectories(ctx context.Context, dirs []string) (int64, error) { + // prepare statements insertDirStmt, err := tx.Prepare(ctx, "INSERT INTO directories (name, db_parent_id) VALUES (?, ?) 
ON CONFLICT(name) DO NOTHING") if err != nil { return 0, fmt.Errorf("failed to prepare statement: %w", err) @@ -473,25 +478,14 @@ func (tx *MainDatabaseTx) MakeDirsForPath(ctx context.Context, path string) (int } defer queryDirStmt.Close() - // Create root dir. + // create root directory dirID := int64(sql.DirectoriesRootID) if _, err := tx.Exec(ctx, "INSERT INTO directories (id, name, db_parent_id) VALUES (?, '/', NULL) ON CONFLICT(id) DO NOTHING", dirID); err != nil { return 0, fmt.Errorf("failed to create root directory: %w", err) } - // Create remaining directories. - path = strings.TrimSuffix(path, "/") - if path == "/" { - return dirID, nil - } - for i := 0; i < utf8.RuneCountInString(path); i++ { - if path[i] != '/' { - continue - } - dir := path[:i+1] - if dir == "/" { - continue - } + // create remaining directories + for _, dir := range dirs { if _, err := insertDirStmt.Exec(ctx, dir, dirID); err != nil { return 0, fmt.Errorf("failed to create directory %v: %w", dir, err) } @@ -506,6 +500,80 @@ func (tx *MainDatabaseTx) MakeDirsForPath(ctx context.Context, path string) (int return dirID, nil } +func (tx *MainDatabaseTx) InsertDirectoriesForRename(ctx context.Context, prefixOld, prefixNew string) (int64, []int64, error) { + // prepare statements + insertDirStmt, err := tx.Prepare(ctx, "INSERT INTO directories (name, db_parent_id) VALUES (?, ?) 
ON CONFLICT(name) DO NOTHING") + if err != nil { + return 0, nil, fmt.Errorf("failed to prepare statement: %w", err) + } + defer insertDirStmt.Close() + + queryDirStmt, err := tx.Prepare(ctx, "SELECT id FROM directories WHERE name = ?") + if err != nil { + return 0, nil, fmt.Errorf("failed to prepare statement: %w", err) + } + defer queryDirStmt.Close() + + // prepare a helper that inserts all directories for given path + insertDirectories := func(path string) (int64, error) { + dirID := int64(sql.DirectoriesRootID) + for _, dir := range object.Directories(path, false) { + if _, err := insertDirStmt.Exec(ctx, dir, dirID); err != nil { + return 0, fmt.Errorf("failed to create directory %v: %w", dir, err) + } + var childID int64 + if err := queryDirStmt.QueryRow(ctx, dir).Scan(&childID); err != nil { + return 0, fmt.Errorf("failed to fetch directory id %v: %w", dir, err) + } else if childID == 0 { + return 0, fmt.Errorf("dir we just created doesn't exist - shouldn't happen") + } + dirID = childID + } + return dirID, nil + } + + // create root directory + dirID := int64(sql.DirectoriesRootID) + if _, err := tx.Exec(ctx, "INSERT INTO directories (id, name, db_parent_id) VALUES (?, '/', NULL) ON CONFLICT(id) DO NOTHING", dirID); err != nil { + return 0, nil, fmt.Errorf("failed to create root directory: %w", err) + } + + // fetch directories with given prefix + rows, err := tx.Query(ctx, "SELECT id, name FROM directories WHERE name LIKE ?", prefixOld+"%") + if err != nil { + return 0, nil, fmt.Errorf("failed to fetch existing directories: %w", err) + } + defer rows.Close() + + dirs := make(map[string]int64) + for rows.Next() { + var id int64 + var name string + if err := rows.Scan(&id, &name); err != nil { + return 0, nil, fmt.Errorf("failed to scan directory: %w", err) + } + dirs[name] = id + } + + // create a mapping of existing directories to renamed directories + var mapping []int64 + for name, id := range dirs { + renamedDirID, err := 
insertDirectories(strings.Replace(name, prefixOld, prefixNew, 1)) + if err != nil { + return 0, nil, err + } + mapping = append(mapping, id, renamedDirID) + } + + // create directories for the new prefix + childID, err := insertDirectories(prefixNew) + if err != nil { + return 0, nil, err + } + + return childID, mapping, nil +} + func (tx *MainDatabaseTx) MarkPackedSlabUploaded(ctx context.Context, slab api.UploadedPackedSlab) (string, error) { return ssql.MarkPackedSlabUploaded(ctx, tx, slab) } @@ -760,39 +828,51 @@ func (tx *MainDatabaseTx) RenameObject(ctx context.Context, bucket, keyOld, keyN return nil } -func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, dirID int64, force bool) error { +func (tx *MainDatabaseTx) RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, dirID int64, renamedIDs []int64, force bool) error { if force { _, err := tx.Exec(ctx, ` DELETE FROM objects WHERE object_id IN ( - SELECT CONCAT(?, SUBSTR(object_id, ?)) + SELECT ? || SUBSTR(object_id, ?) FROM objects WHERE object_id LIKE ? AND SUBSTR(object_id, 1, ?) = ? AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?) - )`, + ) OR (object_id = ? AND size = 0)`, prefixNew, utf8.RuneCountInString(prefixOld)+1, prefixOld+"%", utf8.RuneCountInString(prefixOld), prefixOld, - bucket) + bucket, + prefixNew) if err != nil { return err } } - resp, err := tx.Exec(ctx, ` - UPDATE objects - SET object_id = ? || SUBSTR(object_id, ?), - db_directory_id = ? - WHERE object_id LIKE ? - AND SUBSTR(object_id, 1, ?) = ? - AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)`, - prefixNew, utf8.RuneCountInString(prefixOld)+1, + + // build query + query := fmt.Sprintf(` +UPDATE objects +SET object_id = ? || SUBSTR(object_id, ?), +%s +WHERE object_id LIKE ? +AND SUBSTR(object_id, 1, ?) = ? 
+AND db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)`, ssql.UpdateObjectDirectorIdExpr(renamedIDs)) + + // build arguments + args := []any{prefixNew, utf8.RuneCountInString(prefixOld) + 1} + for _, id := range renamedIDs { + args = append(args, id) + } + args = append(args, dirID, prefixOld+"%", utf8.RuneCountInString(prefixOld), prefixOld, - bucket) + bucket, + ) + + resp, err := tx.Exec(ctx, query, args...) if err != nil && strings.Contains(err.Error(), "UNIQUE constraint failed") { return api.ErrObjectExists } else if err != nil {