Skip to content

Commit

Permalink
feat: allow no migration of commitment (cosmos#20181)
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope authored May 3, 2024
1 parent 43764cf commit 14f3ca0
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 60 deletions.
14 changes: 6 additions & 8 deletions store/v2/commitment/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (c *CommitStore) Restore(version uint64, format uint32, protoReader protoio
var (
importer Importer
snapshotItem snapshotstypes.SnapshotItem
storeKey string
storeKey []byte
)

loop:
Expand All @@ -402,10 +402,10 @@ loop:
importer.Close()
}

storeKey = item.Store.Name
storeKey = []byte(item.Store.Name)
tree := c.multiTrees[item.Store.Name]
if tree == nil {
return snapshotstypes.SnapshotItem{}, fmt.Errorf("store %s not found", storeKey)
return snapshotstypes.SnapshotItem{}, fmt.Errorf("store %s not found", item.Store.Name)
}
importer, err = tree.Import(version)
if err != nil {
Expand All @@ -432,15 +432,13 @@ loop:
node.Value = []byte{}
}

key := []byte(storeKey)
// If the node is a leaf node, it will be written to the storage.
chStorage <- &corestore.StateChanges{
Actor: key,
Actor: storeKey,
StateChanges: []corestore.KVPair{
{
Key: node.Key,
Value: node.Value,
Remove: false,
Key: node.Key,
Value: node.Value,
},
},
}
Expand Down
69 changes: 60 additions & 9 deletions store/v2/migration/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package migration

import (
"encoding/binary"
"errors"
"fmt"
"io"
"sync"
"time"

Expand All @@ -14,6 +16,7 @@ import (
"cosmossdk.io/store/v2/commitment"
"cosmossdk.io/store/v2/internal/encoding"
"cosmossdk.io/store/v2/snapshots"
snapshotstypes "cosmossdk.io/store/v2/snapshots/types"
"cosmossdk.io/store/v2/storage"
)

Expand Down Expand Up @@ -49,6 +52,8 @@ type Manager struct {
}

// NewManager returns a new Manager.
//
// NOTE: `sc` can be `nil` if don't want to migrate the commitment.
func NewManager(db store.RawDB, sm *snapshots.Manager, ss *storage.StorageStore, sc *commitment.CommitStore, logger log.Logger) *Manager {
return &Manager{
logger: logger,
Expand Down Expand Up @@ -106,8 +111,51 @@ func (m *Manager) Migrate(height uint64) error {
})
eg.Go(func() error {
defer close(chStorage)
_, err := m.stateCommitment.Restore(height, 0, ms, chStorage)
return err
if m.stateCommitment != nil {
if _, err := m.stateCommitment.Restore(height, 0, ms, chStorage); err != nil {
return err
}
} else { // there is no commitment migration, just consume the stream to restore the state storage
var storeKey []byte
loop:
for {
snapshotItem := snapshotstypes.SnapshotItem{}
err := ms.ReadMsg(&snapshotItem)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return fmt.Errorf("failed to read snapshot item: %w", err)
}
switch item := snapshotItem.Item.(type) {
case *snapshotstypes.SnapshotItem_Store:
storeKey = []byte(item.Store.Name)
case *snapshotstypes.SnapshotItem_IAVL:
if item.IAVL.Height == 0 { // only restore the leaf nodes
key := item.IAVL.Key
if key == nil {
key = []byte{}
}
value := item.IAVL.Value
if value == nil {
value = []byte{}
}
chStorage <- &corestore.StateChanges{
Actor: storeKey,
StateChanges: []corestore.KVPair{
{
Key: key,
Value: value,
},
},
}
}
default:
break loop
}
}
}
return nil
})

if err := eg.Wait(); err != nil {
Expand Down Expand Up @@ -186,12 +234,13 @@ func (m *Manager) Sync() error {
if err := encoding.UnmarshalChangeset(cs, csBytes); err != nil {
return fmt.Errorf("failed to unmarshal changeset: %w", err)
}

if err := m.stateCommitment.WriteBatch(cs); err != nil {
return fmt.Errorf("failed to write changeset to commitment: %w", err)
}
if _, err := m.stateCommitment.Commit(version); err != nil {
return fmt.Errorf("failed to commit changeset to commitment: %w", err)
if m.stateCommitment != nil {
if err := m.stateCommitment.WriteBatch(cs); err != nil {
return fmt.Errorf("failed to write changeset to commitment: %w", err)
}
if _, err := m.stateCommitment.Commit(version); err != nil {
return fmt.Errorf("failed to commit changeset to commitment: %w", err)
}
}
if err := m.stateStorage.ApplyChangeset(version, cs); err != nil {
return fmt.Errorf("failed to write changeset to storage: %w", err)
Expand All @@ -212,7 +261,9 @@ func (m *Manager) Close() error {
if err := m.db.Close(); err != nil {
return fmt.Errorf("failed to close db: %w", err)
}
m.snapshotsManager.EndMigration(m.stateCommitment)
if m.stateCommitment != nil {
m.snapshotsManager.EndMigration(m.stateCommitment)
}

return nil
}
93 changes: 51 additions & 42 deletions store/v2/migration/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

var storeKeys = []string{"store1", "store2"}

func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) {
func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitment.CommitStore) {
t.Helper()

db := dbm.NewMemDB()
Expand Down Expand Up @@ -49,57 +49,66 @@ func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) {

newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, nil, log.NewNopLogger()) // for store/v2
require.NoError(t, err)
if noCommitStore {
newCommitStore = nil
}

return NewManager(db, snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore
}

func TestMigrateState(t *testing.T) {
m, orgCommitStore := setupMigrationManager(t)

// apply changeset
toVersion := uint64(100)
keyCount := 10
for version := uint64(1); version <= toVersion; version++ {
cs := corestore.NewChangeset()
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false)
for _, noCommitStore := range []bool{false, true} {
t.Run(fmt.Sprintf("Migrate noCommitStore=%v", noCommitStore), func(t *testing.T) {
m, orgCommitStore := setupMigrationManager(t, noCommitStore)

// apply changeset
toVersion := uint64(100)
keyCount := 10
for version := uint64(1); version <= toVersion; version++ {
cs := corestore.NewChangeset()
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false)
}
}
require.NoError(t, orgCommitStore.WriteBatch(cs))
_, err := orgCommitStore.Commit(version)
require.NoError(t, err)
}
}
require.NoError(t, orgCommitStore.WriteBatch(cs))
_, err := orgCommitStore.Commit(version)
require.NoError(t, err)
}

err := m.Migrate(toVersion - 1)
require.NoError(t, err)

// check the migrated state
for version := uint64(1); version < toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
val, err := m.stateCommitment.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
err := m.Migrate(toVersion - 1)
require.NoError(t, err)

if m.stateCommitment != nil {
// check the migrated state
for version := uint64(1); version < toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
val, err := m.stateCommitment.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
require.NoError(t, err)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
}
}
}
// check the latest state
val, err := m.stateCommitment.Get([]byte("store1"), toVersion-1, []byte("key-100-1"))
require.NoError(t, err)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
require.Nil(t, val)
val, err = m.stateCommitment.Get([]byte("store2"), toVersion-1, []byte("key-100-0"))
require.NoError(t, err)
require.Nil(t, val)
}
}
}
// check the latest state
val, err := m.stateCommitment.Get([]byte("store1"), toVersion-1, []byte("key-100-1"))
require.NoError(t, err)
require.Nil(t, val)
val, err = m.stateCommitment.Get([]byte("store2"), toVersion-1, []byte("key-100-0"))
require.NoError(t, err)
require.Nil(t, val)

// check the storage
for version := uint64(1); version < toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
val, err := m.stateStorage.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
require.NoError(t, err)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
// check the storage
for version := uint64(1); version < toVersion; version++ {
for _, storeKey := range storeKeys {
for i := 0; i < keyCount; i++ {
val, err := m.stateStorage.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i)))
require.NoError(t, err)
require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val)
}
}
}
}
})
}
}
5 changes: 4 additions & 1 deletion store/v2/root/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,10 @@ func (s *Store) writeSC(cs *corestore.Changeset) error {
if err := s.stateCommitment.Close(); err != nil {
return fmt.Errorf("failed to close the old SC store: %w", err)
}
s.stateCommitment = s.migrationManager.GetStateCommitment()
newStateCommitment := s.migrationManager.GetStateCommitment()
if newStateCommitment != nil {
s.stateCommitment = newStateCommitment
}
if err := s.migrationManager.Close(); err != nil {
return fmt.Errorf("failed to close migration manager: %w", err)
}
Expand Down

0 comments on commit 14f3ca0

Please sign in to comment.