diff --git a/store/v2/commitment/store.go b/store/v2/commitment/store.go index 4b2756fec206..7a15ff48a2f0 100644 --- a/store/v2/commitment/store.go +++ b/store/v2/commitment/store.go @@ -380,7 +380,7 @@ func (c *CommitStore) Restore(version uint64, format uint32, protoReader protoio var ( importer Importer snapshotItem snapshotstypes.SnapshotItem - storeKey string + storeKey []byte ) loop: @@ -402,10 +402,10 @@ loop: importer.Close() } - storeKey = item.Store.Name + storeKey = []byte(item.Store.Name) tree := c.multiTrees[item.Store.Name] if tree == nil { - return snapshotstypes.SnapshotItem{}, fmt.Errorf("store %s not found", storeKey) + return snapshotstypes.SnapshotItem{}, fmt.Errorf("store %s not found", item.Store.Name) } importer, err = tree.Import(version) if err != nil { @@ -432,15 +432,13 @@ loop: node.Value = []byte{} } - key := []byte(storeKey) // If the node is a leaf node, it will be written to the storage. chStorage <- &corestore.StateChanges{ - Actor: key, + Actor: storeKey, StateChanges: []corestore.KVPair{ { - Key: node.Key, - Value: node.Value, - Remove: false, + Key: node.Key, + Value: node.Value, }, }, } diff --git a/store/v2/migration/manager.go b/store/v2/migration/manager.go index 43dc7dffc033..bae585b1e503 100644 --- a/store/v2/migration/manager.go +++ b/store/v2/migration/manager.go @@ -2,7 +2,9 @@ package migration import ( "encoding/binary" + "errors" "fmt" + "io" "sync" "time" @@ -14,6 +16,7 @@ import ( "cosmossdk.io/store/v2/commitment" "cosmossdk.io/store/v2/internal/encoding" "cosmossdk.io/store/v2/snapshots" + snapshotstypes "cosmossdk.io/store/v2/snapshots/types" "cosmossdk.io/store/v2/storage" ) @@ -49,6 +52,8 @@ type Manager struct { } // NewManager returns a new Manager. +// +// NOTE: `sc` can be `nil` if don't want to migrate the commitment. func NewManager(db store.RawDB, sm *snapshots.Manager, ss *storage.StorageStore, sc *commitment.CommitStore, logger log.Logger) *Manager { return &Manager{ logger: logger, @@ -106,8 +111,51 @@ func (m *Manager) Migrate(height uint64) error { }) eg.Go(func() error { defer close(chStorage) - _, err := m.stateCommitment.Restore(height, 0, ms, chStorage) - return err + if m.stateCommitment != nil { + if _, err := m.stateCommitment.Restore(height, 0, ms, chStorage); err != nil { + return err + } + } else { // there is no commitment migration, just consume the stream to restore the state storage + var storeKey []byte + loop: + for { + snapshotItem := snapshotstypes.SnapshotItem{} + err := ms.ReadMsg(&snapshotItem) + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return fmt.Errorf("failed to read snapshot item: %w", err) + } + switch item := snapshotItem.Item.(type) { + case *snapshotstypes.SnapshotItem_Store: + storeKey = []byte(item.Store.Name) + case *snapshotstypes.SnapshotItem_IAVL: + if item.IAVL.Height == 0 { // only restore the leaf nodes + key := item.IAVL.Key + if key == nil { + key = []byte{} + } + value := item.IAVL.Value + if value == nil { + value = []byte{} + } + chStorage <- &corestore.StateChanges{ + Actor: storeKey, + StateChanges: []corestore.KVPair{ + { + Key: key, + Value: value, + }, + }, + } + } + default: + break loop + } + } + } + return nil }) if err := eg.Wait(); err != nil { @@ -186,12 +234,13 @@ func (m *Manager) Sync() error { if err := encoding.UnmarshalChangeset(cs, csBytes); err != nil { return fmt.Errorf("failed to unmarshal changeset: %w", err) } - - if err := m.stateCommitment.WriteBatch(cs); err != nil { - return fmt.Errorf("failed to write changeset to commitment: %w", err) - } - if _, err := m.stateCommitment.Commit(version); err != nil { - return fmt.Errorf("failed to commit changeset to commitment: %w", err) + if m.stateCommitment != nil { + if err := m.stateCommitment.WriteBatch(cs); err != nil { + return fmt.Errorf("failed to write changeset to commitment: %w", err) + } + if _, err := m.stateCommitment.Commit(version); err != nil { + return fmt.Errorf("failed to commit changeset to commitment: %w", err) + } } if err := m.stateStorage.ApplyChangeset(version, cs); err != nil { return fmt.Errorf("failed to write changeset to storage: %w", err) @@ -212,7 +261,9 @@ func (m *Manager) Close() error { if err := m.db.Close(); err != nil { return fmt.Errorf("failed to close db: %w", err) } - m.snapshotsManager.EndMigration(m.stateCommitment) + if m.stateCommitment != nil { + m.snapshotsManager.EndMigration(m.stateCommitment) + } return nil } diff --git a/store/v2/migration/manager_test.go b/store/v2/migration/manager_test.go index 0985dd561550..164ebca0b388 100644 --- a/store/v2/migration/manager_test.go +++ b/store/v2/migration/manager_test.go @@ -18,7 +18,7 @@ import ( var storeKeys = []string{"store1", "store2"} -func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) { +func setupMigrationManager(t *testing.T, noCommitStore bool) (*Manager, *commitment.CommitStore) { t.Helper() db := dbm.NewMemDB() @@ -49,57 +49,66 @@ func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) { newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, nil, log.NewNopLogger()) // for store/v2 require.NoError(t, err) + if noCommitStore { + newCommitStore = nil + } return NewManager(db, snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore } func TestMigrateState(t *testing.T) { - m, orgCommitStore := setupMigrationManager(t) - - // apply changeset - toVersion := uint64(100) - keyCount := 10 - for version := uint64(1); version <= toVersion; version++ { - cs := corestore.NewChangeset() - for _, storeKey := range storeKeys { - for i := 0; i < keyCount; i++ { - cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false) + for _, noCommitStore := range []bool{false, true} { + t.Run(fmt.Sprintf("Migrate noCommitStore=%v", noCommitStore), func(t *testing.T) { + m, orgCommitStore := setupMigrationManager(t, noCommitStore) + + // apply changeset + toVersion := uint64(100) + keyCount := 10 + for version := uint64(1); version <= toVersion; version++ { + cs := corestore.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false) + } + } + require.NoError(t, orgCommitStore.WriteBatch(cs)) + _, err := orgCommitStore.Commit(version) + require.NoError(t, err) } - } - require.NoError(t, orgCommitStore.WriteBatch(cs)) - _, err := orgCommitStore.Commit(version) - require.NoError(t, err) - } - - err := m.Migrate(toVersion - 1) - require.NoError(t, err) - // check the migrated state - for version := uint64(1); version < toVersion; version++ { - for _, storeKey := range storeKeys { - for i := 0; i < keyCount; i++ { - val, err := m.stateCommitment.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) + err := m.Migrate(toVersion - 1) + require.NoError(t, err) + + if m.stateCommitment != nil { + // check the migrated state + for version := uint64(1); version < toVersion; version++ { + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + val, err := m.stateCommitment.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) + require.NoError(t, err) + require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val) + } + } + } + // check the latest state + val, err := m.stateCommitment.Get([]byte("store1"), toVersion-1, []byte("key-100-1")) require.NoError(t, err) - require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val) + require.Nil(t, val) + val, err = m.stateCommitment.Get([]byte("store2"), toVersion-1, []byte("key-100-0")) + require.NoError(t, err) + require.Nil(t, val) } - } - } - // check the latest state - val, err := m.stateCommitment.Get([]byte("store1"), toVersion-1, []byte("key-100-1")) - require.NoError(t, err) - require.Nil(t, val) - val, err = m.stateCommitment.Get([]byte("store2"), toVersion-1, []byte("key-100-0")) - require.NoError(t, err) - require.Nil(t, val) - // check the storage - for version := uint64(1); version < toVersion; version++ { - for _, storeKey := range storeKeys { - for i := 0; i < keyCount; i++ { - val, err := m.stateStorage.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) - require.NoError(t, err) - require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val) + // check the storage + for version := uint64(1); version < toVersion; version++ { + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + val, err := m.stateStorage.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) + require.NoError(t, err) + require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val) + } + } } - } + }) } } diff --git a/store/v2/root/store.go b/store/v2/root/store.go index 91d007bd7dbb..8bbcc712498d 100644 --- a/store/v2/root/store.go +++ b/store/v2/root/store.go @@ -395,7 +395,10 @@ func (s *Store) writeSC(cs *corestore.Changeset) error { if err := s.stateCommitment.Close(); err != nil { return fmt.Errorf("failed to close the old SC store: %w", err) } - s.stateCommitment = s.migrationManager.GetStateCommitment() + newStateCommitment := s.migrationManager.GetStateCommitment() + if newStateCommitment != nil { + s.stateCommitment = newStateCommitment + } if err := s.migrationManager.Close(); err != nil { return fmt.Errorf("failed to close migration manager: %w", err) }