Skip to content

Commit

Permalink
Managed to sync ~200MB data
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowosie committed Sep 11, 2024
1 parent 407c6a7 commit 75678cc
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 27 deletions.
39 changes: 24 additions & 15 deletions p2p/snap_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (b *snapServer) GetClassRange(request *spec.ClassRangeRequest) (iter.Seq[pr

stateRoot := p2p2core.AdaptHash(request.Root)
startAddr := p2p2core.AdaptHash(request.Start)
b.log.Debugw("GetClassRange", "root", stateRoot.String(), "start", startAddr.String(), "chunks", request.ChunksPerProof)
b.log.Debugw("GetClassRange", "root", stateRoot, "start", startAddr, "chunks", request.ChunksPerProof)

return func(yield yieldFunc) {
s, err := b.blockchain.GetStateForStateRoot(stateRoot)
Expand Down Expand Up @@ -131,7 +131,7 @@ func (b *snapServer) GetClassRange(request *spec.ClassRangeRequest) (iter.Seq[pr
}

classkeys := []*felt.Felt{}
proofs, finished, err := iterateWithLimit(ctrie, startAddr, limitAddr, determineMaxNodes(request.ChunksPerProof),
proofs, finished, err := iterateWithLimit(ctrie, startAddr, limitAddr, determineMaxNodes(request.ChunksPerProof), b.log,
func(key, value *felt.Felt) error {
classkeys = append(classkeys, key)
return nil
Expand Down Expand Up @@ -186,7 +186,7 @@ func (b *snapServer) GetContractRange(request *spec.ContractRangeRequest) (iter.
}
stateRoot := p2p2core.AdaptHash(request.StateRoot)
startAddr := p2p2core.AdaptAddress(request.Start)
b.log.Debugw("GetContractRange", "root", stateRoot.String(), "start", startAddr.String(), "chunks", request.ChunksPerProof)
b.log.Debugw("GetContractRange", "root", stateRoot, "start", startAddr, "chunks", request.ChunksPerProof)

return func(yield yieldFunc) {
s, err := b.blockchain.GetStateForStateRoot(stateRoot)
Expand All @@ -213,7 +213,7 @@ func (b *snapServer) GetContractRange(request *spec.ContractRangeRequest) (iter.
states := []*spec.ContractState{}

for {
proofs, finished, err := iterateWithLimit(strie, startAddr, limitAddr, determineMaxNodes(request.ChunksPerProof),
proofs, finished, err := iterateWithLimit(strie, startAddr, limitAddr, determineMaxNodes(request.ChunksPerProof), b.log,
func(key, value *felt.Felt) error {
classHash, err := s.ContractClassHash(key)
if err != nil {
Expand Down Expand Up @@ -280,7 +280,9 @@ func (b *snapServer) GetStorageRange(request *spec.ContractStorageRequest) (iter
var finMsg proto.Message = &spec.ContractStorageResponse{
Responses: &spec.ContractStorageResponse_Fin{},
}
b.log.Debugw("GetStorageRange", "root", request.StateRoot.String(), "start[0]", request.Query[0].Start.Key.String())
stateRoot := p2p2core.AdaptHash(request.StateRoot)
startKey := p2p2core.AdaptFelt(request.Query[0].Start.Key)
b.log.Debugw("GetStorageRange", "root", stateRoot, "start[0]", startKey)

return func(yield yieldFunc) {
stateRoot := p2p2core.AdaptHash(request.StateRoot)
Expand All @@ -304,7 +306,7 @@ func (b *snapServer) GetStorageRange(request *spec.ContractStorageRequest) (iter
return
}

handled, err := b.handleStorageRangeRequest(strie, query, request.ChunksPerProof, contractLimit,
handled, err := b.handleStorageRangeRequest(strie, query, request.ChunksPerProof, contractLimit, b.log,
func(values []*spec.ContractStoredValue, proofs []trie.ProofNode) bool {
stoMsg := &spec.ContractStorageResponse{
StateRoot: request.StateRoot,
Expand Down Expand Up @@ -378,6 +380,7 @@ func (b *snapServer) handleStorageRangeRequest(
stTrie *trie.Trie,
request *spec.StorageRangeQuery,
maxChunkPerProof, nodeLimit uint32,
logger utils.SimpleLogger,
yield func([]*spec.ContractStoredValue, []trie.ProofNode) bool,
) (uint32, error) {
totalSent := 0
Expand All @@ -396,15 +399,16 @@ func (b *snapServer) handleStorageRangeRequest(
limit = nodeLimit
}

proofs, finish, err := iterateWithLimit(stTrie, startAddr, endAddr, limit, func(key, value *felt.Felt) error {
response = append(response, &spec.ContractStoredValue{
Key: core2p2p.AdaptFelt(key),
Value: core2p2p.AdaptFelt(value),
})
proofs, finish, err := iterateWithLimit(stTrie, startAddr, endAddr, limit, logger,
func(key, value *felt.Felt) error {
response = append(response, &spec.ContractStoredValue{
Key: core2p2p.AdaptFelt(key),
Value: core2p2p.AdaptFelt(value),
})

startAddr = key
return nil
})
startAddr = key
return nil
})
finished = finish

if err != nil {
Expand Down Expand Up @@ -435,12 +439,13 @@ func iterateWithLimit(
startAddr *felt.Felt,
limitAddr *felt.Felt,
maxNodes uint32,
logger utils.SimpleLogger,
consumer func(key, value *felt.Felt) error,
) ([]trie.ProofNode, bool, error) {
pathes := make([]*felt.Felt, 0)
hashes := make([]*felt.Felt, 0)

// TODO: Verify class trie
logger.Infow("entering IterateAndGenerateProof", "startAddr", startAddr, "endAddr", limitAddr, "maxNodes", maxNodes)
count := uint32(0)
proof, finished, err := srcTrie.IterateAndGenerateProof(startAddr, func(key *felt.Felt, value *felt.Felt) (bool, error) {
// Need at least one.
Expand All @@ -453,19 +458,23 @@ func iterateWithLimit(

err := consumer(key, value)
if err != nil {
logger.Errorw("error from consumer function", "err", err)
return false, err
}

count++
if count >= maxNodes {
logger.Infow("Max nodes reached", "count", count)
return false, nil
}
return true, nil
})
if err != nil {
logger.Errorw("IterateAndGenerateProof", "err", err, "finished", finished)
return nil, finished, err
}

logger.Infow("exiting IterateAndGenerateProof", "len(proof)", len(proof), "finished", finished, "err", err)
return proof, finished, err
}

Expand Down
23 changes: 17 additions & 6 deletions p2p/snap_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (

func TestClassRange(t *testing.T) {
// Note: set to true to make test super long to complete
shouldFetchAllClasses := false
shouldFetchAllClasses := true
var d db.DB
t.Skip("DB snapshot is needed for this test")
//t.Skip("DB snapshot is needed for this test")
d, _ = pebble.NewWithOptions("/Users/pnowosie/juno/snapshots/juno-sepolia", 128000000, 128, false)
defer func() { _ = d.Close() }()
bc := blockchain.New(d, &utils.Sepolia) // Needed because class loader need encoder to be registered
Expand All @@ -32,8 +32,9 @@ func TestClassRange(t *testing.T) {
fmt.Printf("headblock %d\n", b.Number)

stateRoot := b.GlobalStateRoot

logger, _ := utils.NewZapLogger(utils.DEBUG, false)
server := &snapServer{
log: logger,
blockchain: bc,
}

Expand All @@ -42,6 +43,10 @@ func TestClassRange(t *testing.T) {
chunksReceived := 0

chunksPerProof := 150
if shouldFetchAllClasses {
// decrease iteration count and hence speed up a bit
chunksPerProof *= 4
}
iter, err := server.GetClassRange(
&spec.ClassRangeRequest{
Root: core2p2p.AdaptHash(stateRoot),
Expand Down Expand Up @@ -86,7 +91,7 @@ func TestClassRange(t *testing.T) {

func TestContractRange(t *testing.T) {
var d db.DB
t.Skip("DB snapshot is needed for this test")
//t.Skip("DB snapshot is needed for this test")
d, _ = pebble.NewWithOptions("/Users/pnowosie/juno/snapshots/juno-sepolia", 128000000, 128, false)
defer func() { _ = d.Close() }()
bc := blockchain.New(d, &utils.Sepolia) // Needed because class loader need encoder to be registered
Expand All @@ -98,7 +103,9 @@ func TestContractRange(t *testing.T) {

stateRoot := b.GlobalStateRoot

logger, _ := utils.NewZapLogger(utils.DEBUG, false)
server := &snapServer{
log: logger,
blockchain: bc,
}

Expand Down Expand Up @@ -173,7 +180,7 @@ func TestContractRange_FinMsg_Received(t *testing.T) {

func TestContractStorageRange(t *testing.T) {
var d db.DB
t.Skip("DB snapshot is needed for this test")
//t.Skip("DB snapshot is needed for this test")
d, _ = pebble.NewWithOptions("/Users/pnowosie/juno/snapshots/juno-sepolia", 128000000, 128, false)
defer func() { _ = d.Close() }()
bc := blockchain.New(d, &utils.Sepolia) // Needed because class loader need encoder to be registered
Expand All @@ -185,7 +192,9 @@ func TestContractStorageRange(t *testing.T) {

stateRoot := b.GlobalStateRoot

logger, _ := utils.NewZapLogger(utils.DEBUG, false)
server := &snapServer{
log: logger,
blockchain: bc,
}

Expand Down Expand Up @@ -268,7 +277,7 @@ func TestContractStorageRange(t *testing.T) {

func TestGetClassesByHash(t *testing.T) {
var d db.DB
t.Skip("DB snapshot is needed for this test")
//t.Skip("DB snapshot is needed for this test")
d, _ = pebble.NewWithOptions("/Users/pnowosie/juno/snapshots/juno-sepolia", 128000000, 128, false)
defer func() { _ = d.Close() }()
bc := blockchain.New(d, &utils.Sepolia) // Needed because class loader need encoder to be registered
Expand All @@ -278,7 +287,9 @@ func TestGetClassesByHash(t *testing.T) {

fmt.Printf("headblock %d\n", b.Number)

logger, _ := utils.NewZapLogger(utils.DEBUG, false)
server := &snapServer{
log: logger,
blockchain: bc,
}

Expand Down
6 changes: 3 additions & 3 deletions p2p/snap_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ func (s *SnapSyncher) runFetchClassJob(ctx context.Context) error {
continue
}

classes := make([]*spec.Class, len(keyBatches))
classes := make([]*spec.Class, 0, len(keyBatches))
for response := range classIter {
if response == nil {
s.log.Errorw("class by keys respond with nil response")
Expand All @@ -554,14 +554,14 @@ func (s *SnapSyncher) runFetchClassJob(ctx context.Context) error {
s.log.Warnw("Unexpected ClassMessage from getClasses", "v", v)
}
}
s.log.Infow("class fetch job received response", "classes", len(classes))
s.log.Infow("class fetch job received response", "classes", len(classes), "asked keys", len(keyBatches))

processedClasses := map[felt.Felt]bool{}
newClasses := map[felt.Felt]core.Class{}
classHashes := map[felt.Felt]*felt.Felt{}
for i, class := range classes {
if class == nil {
s.log.Infow("class empty", "key", keyBatches[i])
s.log.Infow("class empty", "hash", keyBatches[i])
continue
}

Expand Down
3 changes: 2 additions & 1 deletion p2p/starknet/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (

const (
unmarshalMaxSize = 15 * utils.Megabyte
readTimeout = 5 * time.Second
// TODO: allow a looot and tweak it later if needed, was 5 seconds before
readTimeout = 60 * time.Second
)

type NewStreamFunc func(ctx context.Context, pids ...protocol.ID) (network.Stream, error)
Expand Down
4 changes: 2 additions & 2 deletions p2p/starknetdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ func (m MockStarkData) BlockByNumber(ctx context.Context, blockNumber uint64) (*

func (m MockStarkData) BlockLatest(ctx context.Context) (*core.Block, error) {
// This is snapshot I have
root, _ := (&felt.Felt{}).SetString("0x472e84b65d387c9364b5117f4afaba3fb88897db1f28867b398506e2af89f25")
root, _ := (&felt.Felt{}).SetString("0x6df37678051ab529c243a5ae08e95eea4ddb40b874b4c537e2e6a9a459e2548")

return &core.Block{
Header: &core.Header{
Number: uint64(66477),
Number: uint64(66489),
GlobalStateRoot: root,
},
}, nil
Expand Down
1 change: 1 addition & 0 deletions utils/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (l *ZapLogger) Tracew(msg string, keysAndValues ...interface{}) {
}

var _ Logger = (*ZapLogger)(nil)
var _ SimpleLogger = (*ZapLogger)(nil)

func NewNopZapLogger() *ZapLogger {
return &ZapLogger{zap.NewNop().Sugar()}
Expand Down

0 comments on commit 75678cc

Please sign in to comment.