Skip to content

Commit

Permalink
Use ctx in BucketMigrator (#1521)
Browse files Browse the repository at this point in the history
  • Loading branch information
kirugan authored Jul 4, 2024
1 parent f8f0b48 commit aaad8a7
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 11 deletions.
36 changes: 35 additions & 1 deletion migration/bucket_migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package migration
import (
"bytes"
"context"
"errors"
"os"
"os/signal"
"syscall"

"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/utils"
Expand Down Expand Up @@ -72,14 +76,43 @@ func (m *BucketMigrator) Before(_ []byte) error {
return nil
}

func (m *BucketMigrator) Migrate(_ context.Context, txn db.Transaction, network *utils.Network) ([]byte, error) {
func (m *BucketMigrator) Migrate(ctx context.Context, txn db.Transaction, network *utils.Network, log utils.SimpleLogger) ([]byte, error) {
remainingInBatch := m.batchSize
iterator, err := txn.NewIterator()
if err != nil {
return nil, err
}

var (
firstInterrupt = ctx.Done()
secondInterrupt chan os.Signal // initially nil
)
for iterator.Seek(m.startFrom); iterator.Valid(); iterator.Next() {
select {
case <-firstInterrupt:
if errors.Is(ctx.Err(), context.Canceled) {
msg := "WARNING: Migration is in progress, but you tried to interrupt it.\n" +
"Database may be in an inconsistent state.\n" +
"To force cancellation and potentially corrupt data, send interrupt signal again.\n" +
"Otherwise, please allow the migration to complete."
log.Warnw(msg)

// after context canceled on upper level there is no way to check how many interrupts were made from ctx.Done()
// but we can Initialise additional channel to receive the signals, they will be copied by runtime and provided
// to all callers (i.e. here and on upper level)
secondInterrupt = make(chan os.Signal, 1)
signal.Notify(secondInterrupt, os.Interrupt, syscall.SIGTERM)
// if we don't set firstInterrupt to nil this case may be fired all the time because
// execution order of cases in select is not guaranteed and selecting from nil channel is blocked operation
firstInterrupt = nil
}
case <-secondInterrupt:
err := errors.New("migration interrupt")
return nil, utils.RunAndWrapOnError(iterator.Close, err)
default:
// keep going
}

key := iterator.Key()
if !bytes.HasPrefix(key, m.target.Key()) {
break
Expand All @@ -104,6 +137,7 @@ func (m *BucketMigrator) Migrate(_ context.Context, txn db.Transaction, network
}
}
}
signal.Stop(secondInterrupt)

return nil, iterator.Close()
}
4 changes: 2 additions & 2 deletions migration/bucket_migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ func TestBucketMover(t *testing.T) {
err error
)
err = testDB.Update(func(txn db.Transaction) error {
intermediateState, err = mover.Migrate(context.Background(), txn, &utils.Mainnet)
intermediateState, err = mover.Migrate(context.Background(), txn, &utils.Mainnet, nil)
require.ErrorIs(t, err, migration.ErrCallWithNewTransaction)
return nil
})
require.NoError(t, err)
err = testDB.Update(func(txn db.Transaction) error {
intermediateState, err = mover.Migrate(context.Background(), txn, &utils.Mainnet)
intermediateState, err = mover.Migrate(context.Background(), txn, &utils.Mainnet, nil)
require.NoError(t, err)
return nil
})
Expand Down
8 changes: 4 additions & 4 deletions migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ type schemaMetadata struct {
type Migration interface {
Before(intermediateState []byte) error
// Migration should return intermediate state whenever it requests new txn or detects cancelled ctx.
Migrate(context.Context, db.Transaction, *utils.Network) ([]byte, error)
Migrate(context.Context, db.Transaction, *utils.Network, utils.SimpleLogger) ([]byte, error)
}

type MigrationFunc func(db.Transaction, *utils.Network) error

// Migrate returns f(txn).
func (f MigrationFunc) Migrate(_ context.Context, txn db.Transaction, network *utils.Network) ([]byte, error) {
func (f MigrationFunc) Migrate(_ context.Context, txn db.Transaction, network *utils.Network, _ utils.SimpleLogger) ([]byte, error) {
return nil, f(txn, network)
}

Expand Down Expand Up @@ -112,7 +112,7 @@ func migrateIfNeeded(ctx context.Context, targetDB db.DB, network *utils.Network
for {
callWithNewTransaction := false
if dbErr := targetDB.Update(func(txn db.Transaction) error {
metadata.IntermediateState, err = migration.Migrate(ctx, txn, network)
metadata.IntermediateState, err = migration.Migrate(ctx, txn, network, log)
switch {
case err == nil || errors.Is(err, ctx.Err()):
if metadata.IntermediateState == nil {
Expand Down Expand Up @@ -366,7 +366,7 @@ func (n *node) _UnmarshalBinary(data []byte) error {
return err
}

func (m *changeTrieNodeEncoding) Migrate(_ context.Context, txn db.Transaction, _ *utils.Network) ([]byte, error) {
func (m *changeTrieNodeEncoding) Migrate(_ context.Context, txn db.Transaction, _ *utils.Network, _ utils.SimpleLogger) ([]byte, error) {
// If we made n a trie.Node, the encoder would fall back to the custom encoding methods.
// We instead define a cutom struct to force the encoder to use the default encoding.
var n node
Expand Down
8 changes: 4 additions & 4 deletions migration/migration_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestChangeTrieNodeEncoding(t *testing.T) {
m := new(changeTrieNodeEncoding)
require.NoError(t, m.Before(nil))
require.NoError(t, testdb.Update(func(txn db.Transaction) error {
_, err := m.Migrate(context.Background(), txn, &utils.Mainnet)
_, err := m.Migrate(context.Background(), txn, &utils.Mainnet, nil)
return err
}))

Expand Down Expand Up @@ -427,7 +427,7 @@ type testMigration struct {
before func([]byte) error
}

func (f testMigration) Migrate(ctx context.Context, txn db.Transaction, network *utils.Network) ([]byte, error) {
func (f testMigration) Migrate(ctx context.Context, txn db.Transaction, network *utils.Network, _ utils.SimpleLogger) ([]byte, error) {
return f.exec(ctx, txn, network)
}

Expand Down Expand Up @@ -507,7 +507,7 @@ func TestChangeStateDiffStructEmptyDB(t *testing.T) {
require.NoError(t, testdb.Update(func(txn db.Transaction) error {
migrator := NewBucketMigrator(db.StateUpdatesByBlockNumber, changeStateDiffStruct)
require.NoError(t, migrator.Before(nil))
intermediateState, err := migrator.Migrate(context.Background(), txn, &utils.Mainnet)
intermediateState, err := migrator.Migrate(context.Background(), txn, &utils.Mainnet, nil)
require.NoError(t, err)
require.Nil(t, intermediateState)

Expand Down Expand Up @@ -584,7 +584,7 @@ func TestChangeStateDiffStruct(t *testing.T) {
require.NoError(t, testdb.Update(func(txn db.Transaction) error {
migrator := NewBucketMigrator(db.StateUpdatesByBlockNumber, changeStateDiffStruct)
require.NoError(t, migrator.Before(nil))
intermediateState, err := migrator.Migrate(context.Background(), txn, &utils.Mainnet)
intermediateState, err := migrator.Migrate(context.Background(), txn, &utils.Mainnet, nil)
require.NoError(t, err)
require.Nil(t, intermediateState)
return nil
Expand Down

0 comments on commit aaad8a7

Please sign in to comment.