Skip to content

Commit

Permalink
Merge pull request onflow#6032 from onflow/leo/add-check-to-checkpoin…
Browse files Browse the repository at this point in the history
…t-importing

Check checkpoint has only 1 trie before importing
  • Loading branch information
zhangchiqing authored Jun 5, 2024
2 parents b605dd6 + ed6d835 commit 0b1be6e
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 19 deletions.
14 changes: 13 additions & 1 deletion ledger/complete/wal/checkpoint_v6_leaf_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,24 @@ func nodeToLeaf(leaf *node.Node) *LeafNode {
// OpenAndReadLeafNodesFromCheckpointV6 takes a channel for pushing the leaf nodes that are read from
// the given checkpoint file specified by dir and fileName.
// It returns when finish reading the checkpoint file and the input channel can be closed.
func OpenAndReadLeafNodesFromCheckpointV6(allLeafNodesCh chan<- *LeafNode, dir string, fileName string, logger zerolog.Logger) (errToReturn error) {
// It requires the checkpoint file only has one trie.
func OpenAndReadLeafNodesFromCheckpointV6(
allLeafNodesCh chan<- *LeafNode,
dir string,
fileName string,
expectedRootHash ledger.RootHash,
logger zerolog.Logger) (
errToReturn error) {
// we are the only sender of the channel, closing it after done
defer func() {
close(allLeafNodesCh)
}()

err := checkpointHasSingleRootHash(logger, dir, fileName, expectedRootHash)
if err != nil {
return fmt.Errorf("fail to check checkpoint has single root hash: %w", err)
}

filepath := filePathCheckpointHeader(dir, fileName)

f, err := os.Open(filepath)
Expand Down
18 changes: 18 additions & 0 deletions ledger/complete/wal/checkpoint_v6_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func ReadTriesRootHash(logger zerolog.Logger, dir string, fileName string) (
}

var CheckpointHasRootHash = checkpointHasRootHash
var CheckpointHasSingleRootHash = checkpointHasSingleRootHash

// readCheckpointV6 reads checkpoint file from a main file and 17 file parts.
// the main file stores:
Expand Down Expand Up @@ -724,6 +725,23 @@ func checkpointHasRootHash(logger zerolog.Logger, bootstrapDir, filename string,
return fmt.Errorf("could not find expected root hash %v in checkpoint file which contains: %v ", expectedRootHash, roots)
}

func checkpointHasSingleRootHash(logger zerolog.Logger, bootstrapDir, filename string, expectedRootHash ledger.RootHash) error {
roots, err := ReadTriesRootHash(logger, bootstrapDir, filename)
if err != nil {
return fmt.Errorf("could not read checkpoint root hash: %w", err)
}

if len(roots) != 1 {
return fmt.Errorf("expected 1 root hash in checkpoint file, but got %v", len(roots))
}

if roots[0] != expectedRootHash {
return fmt.Errorf("expected root hash %v, but got %v", expectedRootHash, roots[0])
}

return nil
}

func readFileHeader(reader io.Reader) (uint16, uint16, error) {
bytes := make([]byte, encMagicSize+encVersionSize)
_, err := io.ReadFull(reader, bytes)
Expand Down
56 changes: 46 additions & 10 deletions ledger/complete/wal/checkpoint_v6_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -81,7 +82,7 @@ func createSimpleTrie(t *testing.T) []*trie.MTrie {

updatedTrie, _, err := trie.NewTrieWithUpdatedRegisters(emptyTrie, paths, payloads, true)
require.NoError(t, err)
tries := []*trie.MTrie{emptyTrie, updatedTrie}
tries := []*trie.MTrie{updatedTrie}
return tries
}

Expand Down Expand Up @@ -322,7 +323,7 @@ func TestWriteAndReadCheckpointV6LeafEmptyTrie(t *testing.T) {
bufSize := 10
leafNodesCh := make(chan *LeafNode, bufSize)
go func() {
err := OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, logger)
err := OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, tries[0].RootHash(), logger)
require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName)
}()
for range leafNodesCh {
Expand All @@ -339,8 +340,9 @@ func TestWriteAndReadCheckpointV6LeafSimpleTrie(t *testing.T) {
require.NoErrorf(t, StoreCheckpointV6Concurrently(tries, dir, fileName, logger), "fail to store checkpoint")
bufSize := 1
leafNodesCh := make(chan *LeafNode, bufSize)

go func() {
err := OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, logger)
err := OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, tries[0].RootHash(), logger)
require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName)
}()
resultPayloads := make([]*ledger.Payload, 0)
Expand All @@ -350,27 +352,61 @@ func TestWriteAndReadCheckpointV6LeafSimpleTrie(t *testing.T) {
resultPayloads = append(resultPayloads, leafNode.Payload)
}
}
require.EqualValues(t, tries[1].AllPayloads(), resultPayloads)
require.EqualValues(t, tries[0].AllPayloads(), resultPayloads)
})
}

func TestWriteAndReadCheckpointV6LeafMultipleTries(t *testing.T) {
func TestWriteAndReadCheckpointV6LeafMultipleTriesFail(t *testing.T) {
unittest.RunWithTempDir(t, func(dir string) {
fileName := "checkpoint-multi-leaf-file"
tries := createMultipleRandomTriesMini(t)
logger := unittest.Logger()
require.NoErrorf(t, StoreCheckpointV6Concurrently(tries, dir, fileName, logger), "fail to store checkpoint")
bufSize := 5
leafNodesCh := make(chan *LeafNode, bufSize)

// verify it should fail because the checkpoint has multiple trie
require.Error(t, OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, tries[0].RootHash(), logger))
})
}

func TestWriteAndReadCheckpointV6LeafMultipleTriesOK(t *testing.T) {
unittest.RunWithTempDir(t, func(dir string) {
fileName := "checkpoint-multi-leaf-file"
multi := createMultipleRandomTriesMini(t)

tries := multi[1:2]

logger := unittest.Logger()
require.NoErrorf(t, StoreCheckpointV6Concurrently(tries, dir, fileName, logger), "fail to store checkpoint")
bufSize := 5
leafNodesCh := make(chan *LeafNode, bufSize)

go func() {
err := OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, logger)
err := OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, tries[0].RootHash(), logger)
require.NoErrorf(t, err, "fail to read checkpoint %v/%v", dir, fileName)
}()
resultPayloads := make([]ledger.Payload, 0)

allPayloads := tries[0].AllPayloads()
payloadMap := make(map[string]ledger.Payload, len(allPayloads))
for _, payload := range allPayloads {
key := payload.EncodedKey()

payloadMap[hex.EncodeToString(key)] = *payload
}

for leafNode := range leafNodesCh {
resultPayloads = append(resultPayloads, *leafNode.Payload)
// avoid dummy payload from empty trie
if leafNode.Payload != nil {
key := hex.EncodeToString(leafNode.Payload.EncodedKey())
expected, ok := payloadMap[key]
require.True(t, ok, "payload not found")
require.Equal(t, expected, *leafNode.Payload, "payload not equal")
delete(payloadMap, key)
}
}
require.NotEmpty(t, resultPayloads)

require.Empty(t, payloadMap, fmt.Sprintf("not all payloads are read: %v", len(payloadMap)))
})
}

Expand Down Expand Up @@ -541,7 +577,7 @@ func TestAllPartFileExistLeafReader(t *testing.T) {

bufSize := 10
leafNodesCh := make(chan *LeafNode, bufSize)
err = OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, logger)
err = OpenAndReadLeafNodesFromCheckpointV6(leafNodesCh, dir, fileName, tries[0].RootHash(), logger)
require.ErrorIs(t, err, os.ErrNotExist, "wrong error type returned")
}
})
Expand Down
8 changes: 1 addition & 7 deletions storage/pebble/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,6 @@ func (b *RegisterBootstrap) IndexCheckpointFile(ctx context.Context, workerCount
cct, cancel := context.WithCancel(ctx)
defer cancel()

// validate the checkpoint has correct root hash
err := wal.CheckpointHasRootHash(b.log, b.checkpointDir, b.checkpointFileName, b.rootHash)
if err != nil {
return fmt.Errorf("the root checkpoint to have the trie root hash %v does not match with the root state commitment: %w", b.rootHash, err)
}

g, gCtx := errgroup.WithContext(cct)

start := time.Now()
Expand All @@ -151,7 +145,7 @@ func (b *RegisterBootstrap) IndexCheckpointFile(ctx context.Context, workerCount
})
}

err = wal.OpenAndReadLeafNodesFromCheckpointV6(b.leafNodeChan, b.checkpointDir, b.checkpointFileName, b.log)
err := wal.OpenAndReadLeafNodesFromCheckpointV6(b.leafNodeChan, b.checkpointDir, b.checkpointFileName, b.rootHash, b.log)
if err != nil {
return fmt.Errorf("error reading leaf node: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion storage/pebble/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_Happy(t *testing.T) {
rootHeight := uint64(10000)
unittest.RunWithTempDir(t, func(dir string) {
tries, registerIDs := simpleTrieWithValidRegisterIDs(t)
// exclude the empty trie
rootHash := tries[0].RootHash()
fileName := "simple-checkpoint"
require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint")
Expand Down Expand Up @@ -221,7 +222,7 @@ func trieWithValidRegisterIDs(t *testing.T, n uint16) ([]*trie.MTrie, []*flow.Re
// make sure it has at least 1 leaf node
require.GreaterOrEqual(t, depth, uint16(1))
require.NoError(t, err)
resultTries := []*trie.MTrie{emptyTrie, populatedTrie}
resultTries := []*trie.MTrie{populatedTrie}
return resultTries, resultRegisterIDs
}

Expand Down
1 change: 1 addition & 0 deletions utils/grpcutils/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func DefaultClientTLSConfig(publicKey crypto.PublicKey) (*tls.Config, error) {
config := &tls.Config{
MinVersion: tls.VersionTLS13,
// This is not insecure here. We will verify the cert chain ourselves.
// nolint
InsecureSkipVerify: true,
ClientAuth: tls.RequireAnyClientCert,
}
Expand Down

0 comments on commit 0b1be6e

Please sign in to comment.