Skip to content

Commit

Permalink
Core/migrate library (#80)
Browse files Browse the repository at this point in the history
* move migrate create and migrate up to importable pkg/core

* document options

* wire up verbose to migrate up

* move remaining migrate function to core
  • Loading branch information
RoryQ authored Aug 2, 2024
1 parent cf9383c commit 002f47b
Show file tree
Hide file tree
Showing 5 changed files with 352 additions and 175 deletions.
22 changes: 0 additions & 22 deletions cmd/export_test.go

This file was deleted.

171 changes: 21 additions & 150 deletions cmd/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,15 @@ import (
"context"
"errors"
"fmt"
"math"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"text/tabwriter"

"github.com/kennygrant/sanitize"
"github.com/spf13/cobra"

"github.com/roryq/wrench/pkg/core"
"github.com/roryq/wrench/pkg/spanner"
)

Expand Down Expand Up @@ -118,7 +116,7 @@ func migrateCreate(c *cobra.Command, args []string) error {
}
}

filename, err := createMigrationFile(dir, name, 6)
filename, err := core.CreateMigrationFile(dir, name, core.WithInterval(sequenceInterval), core.WithZeroPrefixLength(6))
if err != nil {
return &Error{
cmd: c,
Expand Down Expand Up @@ -173,69 +171,23 @@ func migrateUp(c *cobra.Command, args []string) error {
}
defer client.Close()

lock, err := client.GetMigrationLock(ctx, migrationLockTable, lockIdentifier)
defer lock.Release()
if err != nil {
return &Error{
cmd: c,
err: err,
}
}
if !lock.Success {
return &Error{
cmd: c,
err: fmt.Errorf("lock taken by another process %s which expires %v", lock.LockIdentifier, lock.Expiry),
}
}

dir := filepath.Join(c.Flag(flagNameDirectory).Value.String(), migrationsDirName)
migrations, err := spanner.LoadMigrations(dir, toSkip, detectPartitionedDML)
if err != nil {
return &Error{
cmd: c,
err: err,
}
}

if err = client.EnsureMigrationTable(ctx, migrationTableName); err != nil {
return &Error{
cmd: c,
err: err,
}
}

status, err := client.DetermineUpgradeStatus(ctx, migrationTableName)
migrationsDir := filepath.Join(c.Flag(flagNameDirectory).Value.String(), migrationsDirName)
err = core.MigrateUp(ctx, client, migrationsDir,
core.WithLimit(limit),
core.WithSkipVersions(toSkip),
core.WithLockIdentifier(lockIdentifier),
core.WithVersionTable(migrationTableName),
core.WithLockTable(migrationLockTable),
core.WithPartitionedDMLConcurrency(partitionedDMLConcurrency),
core.WithDetectPartitionedDML(detectPartitionedDML),
core.WithPrintRowsAffected(verbose),
)
if err != nil {
return &Error{
cmd: c,
err: err,
}
}

concurrency := int(partitionedDMLConcurrency)

var migrationsOutput spanner.MigrationsOutput
switch status {
case spanner.ExistingMigrationsUpgradeStarted:
migrationsOutput, err = client.UpgradeExecuteMigrations(ctx, migrations, limit, migrationTableName)
if err != nil {
return err
}
case spanner.ExistingMigrationsUpgradeCompleted:
migrationsOutput, err = client.ExecuteMigrations(ctx, migrations, limit, migrationTableName, concurrency)
if err != nil {
return err
}
default:
return &Error{
cmd: c,
err: errors.New("migration in undetermined state"),
}
}
if verbose {
fmt.Print(migrationsOutput.String())
}

return nil
}

Expand Down Expand Up @@ -282,36 +234,13 @@ func migrateHistory(c *cobra.Command, args []string) error {
}
defer client.Close()

lock, err := client.GetMigrationLock(ctx, migrationLockTable, lockIdentifier)
defer lock.Release()
err = core.MigrateHistory(ctx, client, core.WithLockTable(migrationLockTable), core.WithLockIdentifier(lockIdentifier))
if err != nil {
return &Error{
cmd: c,
err: err,
}
}
if !lock.Success {
return &Error{
cmd: c,
err: fmt.Errorf("lock taken by another process %s which expires %v", lock.LockIdentifier, lock.Expiry),
}
}

history, err := client.GetMigrationHistory(ctx, migrationTableName)
if err != nil {
return err
}
sort.SliceStable(history, func(i, j int) bool {
return history[i].Created.Before(history[j].Created) // order by Created
})

writer := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', tabwriter.AlignRight)
fmt.Fprintln(writer, "Version\tDirty\tCreated\tModified")
for i := range history {
h := history[i]
fmt.Fprintf(writer, "%d\t%v\t%v\t%v\n", h.Version, h.Dirty, h.Created, h.Modified)
}
writer.Flush()

return nil
}
Expand All @@ -324,35 +253,18 @@ func migrateRepair(c *cobra.Command, args []string) error {
return err
}
defer client.Close()
lock, err := client.GetMigrationLock(ctx, migrationLockTable, lockIdentifier)
defer lock.Release()
if err != nil {
return &Error{
cmd: c,
err: err,
}
}
if !lock.Success {
return &Error{
cmd: c,
err: fmt.Errorf("lock taken by another process %s which expires %v", lock.LockIdentifier, lock.Expiry),
}
}

if err = client.EnsureMigrationTable(ctx, migrationTableName); err != nil {
return &Error{
cmd: c,
err: err,
}
}

if err := client.RepairMigration(ctx, migrationTableName); err != nil {
err = core.MigrateRepair(ctx, client,
core.WithLockTable(migrationLockTable),
core.WithLockIdentifier(lockIdentifier),
core.WithVersionTable(migrationTableName),
)
if err != nil {
return &Error{
cmd: c,
err: err,
}
}

return nil
}

Expand All @@ -365,7 +277,7 @@ func migrateLocker(c *cobra.Command, args []string) error {
}
defer client.Close()

if err := client.SetupMigrationLock(ctx, migrationLockTable); err != nil {
if err := core.MigrateSetupLock(ctx, client, core.WithLockTable(migrationLockTable)); err != nil {
return &Error{
cmd: c,
err: err,
Expand All @@ -374,10 +286,6 @@ func migrateLocker(c *cobra.Command, args []string) error {
return nil
}

func roundNext(n, next uint) uint {
return uint(math.Round(float64(n)/float64(next)))*next + next
}

func promptDescription() string {
fmt.Print("Please enter a short description for the migration file. Or press Enter to skip.\n>")
scanner := bufio.NewScanner(os.Stdin)
Expand All @@ -388,40 +296,3 @@ func promptDescription() string {
}
return strings.ReplaceAll(clean, ".", "-") // Dot should separate .up.sql or .sql only
}

func createMigrationFile(dir string, name string, digits int) (string, error) {
if name != "" && !spanner.MigrationNameRegex.MatchString(name) {
return "", errors.New("Invalid migration file name.")
}

ms, err := spanner.LoadMigrations(dir, nil, false)
if err != nil {
return "", err
}

var v uint = 1
if len(ms) > 0 {
v = roundNext(ms[len(ms)-1].Version, uint(sequenceInterval))
}
vStr := fmt.Sprint(v)

padding := digits - len(vStr)
if padding > 0 {
vStr = strings.Repeat("0", padding) + vStr
}

var filename string
if name == "" {
filename = filepath.Join(dir, fmt.Sprintf("%s.sql", vStr))
} else {
filename = filepath.Join(dir, fmt.Sprintf("%s_%s.sql", vStr, name))
}

fp, err := os.Create(filename)
if err != nil {
return "", err
}
fp.Close()

return filename, nil
}
6 changes: 3 additions & 3 deletions cmd/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"path/filepath"
"testing"

"github.com/roryq/wrench/cmd"
"github.com/roryq/wrench/pkg/core"
)

func TestCreateMigrationFile(t *testing.T) {
Expand All @@ -49,7 +49,7 @@ func TestCreateMigrationFile(t *testing.T) {

for _, tc := range testcases {
t.Run(tc.filename, func(t *testing.T) {
filename, err := cmd.CreateMigrationFile(testdatadir, tc.filename, tc.digits)
filename, err := core.CreateMigrationFile(testdatadir, tc.filename, core.WithInterval(1), core.WithZeroPrefixLength(tc.digits))
if err != nil {
t.Fatal(err)
}
Expand All @@ -64,7 +64,7 @@ func TestCreateMigrationFile(t *testing.T) {
}

t.Run("invalid name", func(t *testing.T) {
_, err := cmd.CreateMigrationFile(testdatadir, "あああ", 6)
_, err := core.CreateMigrationFile(testdatadir, "あああ")
if err.Error() != "Invalid migration file name." {
t.Errorf("err want `invalid name`, but got `%v`", err)
}
Expand Down
Loading

0 comments on commit 002f47b

Please sign in to comment.