Skip to content

Commit

Permalink
feat(statesync): implement statesync spec for the new approach (#663)
Browse files Browse the repository at this point in the history
* feat: add ordered_map.go

* feat: introduce a new approach of a state sync

* refactor: modify kvstore to be compatible with a new statesync approach
  • Loading branch information
shotonoff authored Jul 19, 2023
1 parent 6cf43f7 commit 51b15af
Show file tree
Hide file tree
Showing 25 changed files with 2,355 additions and 2,168 deletions.
15 changes: 10 additions & 5 deletions abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (app *Application) LoadSnapshotChunk(_ context.Context, req *abci.RequestLo
app.mu.Lock()
defer app.mu.Unlock()

chunk, err := app.snapshots.LoadChunk(req.Height, req.Format, req.Chunk)
chunk, err := app.snapshots.LoadChunk(req.Height, req.Version, req.ChunkId)
if err != nil {
return &abci.ResponseLoadSnapshotChunk{}, err
}
Expand Down Expand Up @@ -523,7 +523,11 @@ func (app *Application) ApplySnapshotChunk(_ context.Context, req *abci.RequestA
if app.offerSnapshot == nil {
return &abci.ResponseApplySnapshotChunk{}, fmt.Errorf("no restore in progress")
}
app.offerSnapshot.addChunk(int(req.Index), req.Chunk)

resp := &abci.ResponseApplySnapshotChunk{
Result: abci.ResponseApplySnapshotChunk_ACCEPT,
NextChunks: app.offerSnapshot.addChunk(req.ChunkId, req.Chunk),
}

if app.offerSnapshot.isFull() {
chunks := app.offerSnapshot.bytes()
Expand All @@ -538,11 +542,10 @@ func (app *Application) ApplySnapshotChunk(_ context.Context, req *abci.RequestA
"snapshot_height", app.offerSnapshot.snapshot.Height,
"snapshot_apphash", app.offerSnapshot.appHash,
)
resp.Result = abci.ResponseApplySnapshotChunk_COMPLETE_SNAPSHOT
app.offerSnapshot = nil
}

resp := &abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}

app.logger.Debug("ApplySnapshotChunk", "resp", resp)
return resp, nil
}
Expand All @@ -556,7 +559,9 @@ func (app *Application) createSnapshot() error {
if err != nil {
return fmt.Errorf("create snapshot: %w", err)
}
app.logger.Info("created state sync snapshot", "height", height, "apphash", app.LastCommittedState.GetAppHash())
app.logger.Info("created state sync snapshot",
"height", height,
"apphash", app.LastCommittedState.GetAppHash())
err = app.snapshots.Prune(maxSnapshotCount)
if err != nil {
return fmt.Errorf("prune snapshots: %w", err)
Expand Down
30 changes: 13 additions & 17 deletions abci/example/kvstore/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,24 +493,20 @@ func TestSnapshots(t *testing.T) {
})
require.NoError(t, err)
assert.Equal(t, types.ResponseOfferSnapshot_ACCEPT, respOffer.Result)
loaded, err := app.LoadSnapshotChunk(ctx, &types.RequestLoadSnapshotChunk{
Height: recentSnapshot.Height,
ChunkId: recentSnapshot.Hash,
Version: recentSnapshot.Version,
})
require.NoError(t, err)

for chunk := uint32(0); chunk < recentSnapshot.Chunks; chunk++ {
loaded, err := app.LoadSnapshotChunk(ctx, &types.RequestLoadSnapshotChunk{
Height: recentSnapshot.Height,
Chunk: chunk,
Format: recentSnapshot.Format,
})
require.NoError(t, err)

applied, err := dstApp.ApplySnapshotChunk(ctx, &types.RequestApplySnapshotChunk{
Index: chunk,
Chunk: loaded.Chunk,
Sender: "app",
})
require.NoError(t, err)
assert.Equal(t, types.ResponseApplySnapshotChunk_ACCEPT, applied.Result)
}

applied, err := dstApp.ApplySnapshotChunk(ctx, &types.RequestApplySnapshotChunk{
ChunkId: recentSnapshot.Hash,
Chunk: loaded.Chunk,
Sender: "app",
})
require.NoError(t, err)
assert.Equal(t, types.ResponseApplySnapshotChunk_COMPLETE_SNAPSHOT, applied.Result)
infoResp, err := dstApp.Info(ctx, &types.RequestInfo{})
require.NoError(t, err)
assertRespInfo(t, int64(recentSnapshot.Height), appHashes[snapshotHeight], *infoResp)
Expand Down
113 changes: 73 additions & 40 deletions abci/example/kvstore/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package kvstore

import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math"
"os"
"path/filepath"

Expand All @@ -15,6 +15,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/ds"
)

const (
Expand All @@ -27,11 +28,17 @@ const (
// SnapshotStore stores state sync snapshots. Snapshots are stored simply as
// JSON files, and chunks are generated on-the-fly by splitting the JSON data
// into fixed-size chunks.
type SnapshotStore struct {
sync.RWMutex
dir string
metadata []abci.Snapshot
}
type (
SnapshotStore struct {
sync.RWMutex
dir string
metadata []abci.Snapshot
}
chunkItem struct {
Data []byte `json:"data"`
NextChunkIDs [][]byte `json:"nextChunkIDs"`
}
)

// NewSnapshotStore creates a new snapshot store.
func NewSnapshotStore(dir string) (*SnapshotStore, error) {
Expand All @@ -49,7 +56,7 @@ func NewSnapshotStore(dir string) (*SnapshotStore, error) {
// called internally on construction.
func (s *SnapshotStore) loadMetadata() error {
file := filepath.Join(s.dir, "metadata.json")
metadata := []abci.Snapshot{}
var metadata []abci.Snapshot

bz, err := os.ReadFile(file)
switch {
Expand Down Expand Up @@ -96,10 +103,9 @@ func (s *SnapshotStore) Create(state State) (abci.Snapshot, error) {
}
height := state.GetHeight()
snapshot := abci.Snapshot{
Height: uint64(height),
Format: 1,
Hash: crypto.Checksum(bz),
Chunks: byteChunks(bz),
Height: uint64(height),
Version: 1,
Hash: crypto.Checksum(bz),
}
err = os.WriteFile(filepath.Join(s.dir, fmt.Sprintf("%v.json", height)), bz, 0644)
if err != nil {
Expand Down Expand Up @@ -152,16 +158,18 @@ func (s *SnapshotStore) List() ([]*abci.Snapshot, error) {
}

// LoadChunk loads a snapshot chunk.
func (s *SnapshotStore) LoadChunk(height uint64, format uint32, chunk uint32) ([]byte, error) {
func (s *SnapshotStore) LoadChunk(height uint64, version uint32, chunkID []byte) ([]byte, error) {
s.RLock()
defer s.RUnlock()
for _, snapshot := range s.metadata {
if snapshot.Height == height && snapshot.Format == format {
bz, err := os.ReadFile(filepath.Join(s.dir, fmt.Sprintf("%v.json", height)))
if snapshot.Height == height && snapshot.Version == version {
bz, err := os.ReadFile(filepath.Join(s.dir, fmt.Sprintf("%d.json", height)))
if err != nil {
return nil, err
}
return byteChunk(bz, chunk), nil
chunks := makeChunks(bz, snapshotChunkSize)
item := makeChunkItem(chunks, chunkID)
return json.Marshal(item)
}
}
return nil, nil
Expand All @@ -170,54 +178,79 @@ func (s *SnapshotStore) LoadChunk(height uint64, format uint32, chunk uint32) ([
type offerSnapshot struct {
snapshot *abci.Snapshot
appHash tmbytes.HexBytes
chunks [][]byte
chunkCnt int
chunks *ds.OrderedMap[string, []byte]
}

func newOfferSnapshot(snapshot *abci.Snapshot, appHash tmbytes.HexBytes) *offerSnapshot {
return &offerSnapshot{
snapshot: snapshot,
appHash: appHash,
chunks: make([][]byte, snapshot.Chunks),
chunkCnt: 0,
chunks: ds.NewOrderedMap[string, []byte](),
}
}

func (s *offerSnapshot) addChunk(index int, chunk []byte) {
if s.chunks[index] != nil {
return
func (s *offerSnapshot) addChunk(chunkID tmbytes.HexBytes, data []byte) [][]byte {
chunkIDStr := chunkID.String()
if s.chunks.Has(chunkIDStr) {
return nil
}
s.chunks[index] = chunk
s.chunkCnt++
var item chunkItem
err := json.Unmarshal(data, &item)
if err != nil {
panic("failed to decode a chunk data: " + err.Error())
}
s.chunks.Put(chunkIDStr, item.Data)
return item.NextChunkIDs
}

func (s *offerSnapshot) isFull() bool {
return s.chunkCnt == int(s.snapshot.Chunks)
return bytes.Equal(crypto.Checksum(s.bytes()), s.snapshot.Hash)
}

func (s *offerSnapshot) bytes() []byte {
chunks := s.chunks.Values()
buf := bytes.NewBuffer(nil)
for _, chunk := range s.chunks {
for _, chunk := range chunks {
buf.Write(chunk)
}
return buf.Bytes()
}

// byteChunk returns the chunk at a given index from the full byte slice.
func byteChunk(bz []byte, index uint32) []byte {
start := int(index * snapshotChunkSize)
end := int((index + 1) * snapshotChunkSize)
switch {
case start >= len(bz):
return nil
case end >= len(bz):
return bz[start:]
default:
return bz[start:end]
// makeChunkItem returns the chunk at a given index from the full byte slice.
func makeChunkItem(chunks *ds.OrderedMap[string, []byte], chunkID []byte) chunkItem {
chunkIDStr := hex.EncodeToString(chunkID)
val, ok := chunks.Get(chunkIDStr)
if !ok {
panic("chunk not found")
}
chunkIDs := chunks.Keys()
ci := chunkItem{Data: val}
i := 0
for ; i < len(chunkIDs) && chunkIDs[i] != chunkIDStr; i++ {
}
if i+1 < len(chunkIDs) {
data, err := hex.DecodeString(chunkIDs[i+1])
if err != nil {
panic(err)
}
ci.NextChunkIDs = [][]byte{data}
}
return ci
}

// byteChunks calculates the number of chunks in the byte slice.
func byteChunks(bz []byte) uint32 {
return uint32(math.Ceil(float64(len(bz)) / snapshotChunkSize))
func makeChunks(bz []byte, chunkSize int) *ds.OrderedMap[string, []byte] {
chunks := ds.NewOrderedMap[string, []byte]()
totalHash := hex.EncodeToString(crypto.Checksum(bz))
key := totalHash
for i := 0; i < len(bz); i += chunkSize {
j := i + chunkSize
if j > len(bz) {
j = len(bz)
}
if i > 1 {
key = hex.EncodeToString(crypto.Checksum(bz[i:j]))
}
chunks.Put(key, append([]byte(nil), bz[i:j]...))
}
return chunks
}
37 changes: 37 additions & 0 deletions abci/example/kvstore/snapshots_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package kvstore

import (
"encoding/hex"
"math/rand"
"testing"

"github.com/stretchr/testify/require"
)

func TestChunkItem(t *testing.T) {
const size = 64
chunks := makeChunks(makeBytes(1032), size)
keys := chunks.Keys()
values := chunks.Values()
for i, key := range keys {
chunkID, err := hex.DecodeString(key)
require.NoError(t, err)
item := makeChunkItem(chunks, chunkID)
require.Equal(t, values[i], item.Data)
if i+1 < len(keys) {
nextChunkID, err := hex.DecodeString(keys[i+1])
require.NoError(t, err)
require.Equal(t, [][]byte{nextChunkID}, item.NextChunkIDs)
} else {
require.Nil(t, item.NextChunkIDs)
}
}
}

func makeBytes(size int) []byte {
bz := make([]byte, size)
for i := 0; i < size; i++ {
bz[i] = byte(rand.Int63n(256))
}
return bz
}
Loading

0 comments on commit 51b15af

Please sign in to comment.