From 78ca65ba2385b4336472d39b2e1afd75af653ce6 Mon Sep 17 00:00:00 2001 From: pingke Date: Tue, 28 Nov 2023 22:48:15 +0800 Subject: [PATCH 1/7] Change the order in which peers are assigned to tasks --- ethstorage/p2p/protocol/syncclient.go | 16 ++++++++++++++-- ethstorage/p2p/protocol/task.go | 11 ++++++----- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/ethstorage/p2p/protocol/syncclient.go b/ethstorage/p2p/protocol/syncclient.go index c02bfb9c..ec81e406 100644 --- a/ethstorage/p2p/protocol/syncclient.go +++ b/ethstorage/p2p/protocol/syncclient.go @@ -305,6 +305,7 @@ func (s *SyncClient) createTask(sid uint64, lastKvIndex uint64) *task { task := task{ Contract: s.storageManager.ContractAddress(), ShardId: sid, + lastSubTaskIdx: 0, statelessPeers: make(map[peer.ID]struct{}), peers: make(map[peer.ID]struct{}), } @@ -420,6 +421,9 @@ func (s *SyncClient) cleanTasks() { t.SubTasks[i].First = min if t.SubTasks[i].done && !exist { t.SubTasks = append(t.SubTasks[:i], t.SubTasks[i+1:]...) + if t.lastSubTaskIdx >= i { + t.lastSubTaskIdx-- + } i-- } } @@ -618,8 +622,12 @@ func (s *SyncClient) assignBlobRangeTasks() { // Iterate over all the tasks and try to find a pending one for _, t := range s.tasks { maxRange := s.syncerParams.MaxRequestSize / ethstorage.ContractToShardManager[t.Contract].MaxKvSize() * 2 - for _, stask := range t.SubTasks { - st := stask + nextIdx := t.lastSubTaskIdx + 1 + subTaskCount := len(t.SubTasks) + for idx := 0; idx < subTaskCount; idx++ { + nextIdx = nextIdx % subTaskCount + st := t.SubTasks[nextIdx] + nextIdx++ if st.done { continue } @@ -627,6 +635,9 @@ func (s *SyncClient) assignBlobRangeTasks() { if st.isRunning { continue } + if len(s.idlerPeers) == 0 { + break + } pr := s.getIdlePeerForTask(t) if pr == nil { continue @@ -689,6 +700,7 @@ func (s *SyncClient) assignBlobRangeTasks() { s.OnBlobsByRange(res) }(pr.id) } + t.lastSubTaskIdx = nextIdx - 1 } } diff --git a/ethstorage/p2p/protocol/task.go b/ethstorage/p2p/protocol/task.go index f2084a27..8afebb3e 100644 --- a/ethstorage/p2p/protocol/task.go +++ b/ethstorage/p2p/protocol/task.go @@ -13,11 +13,12 @@ import ( // task represents the sync task for a storage shard. type task struct { // These fields get serialized to leveldb on shutdown - Contract common.Address // Contract address - ShardId uint64 // ShardId - SubTasks []*subTask - healTask *healTask - SubEmptyTasks []*subEmptyTask + Contract common.Address // Contract address + ShardId uint64 // ShardId + SubTasks []*subTask + lastSubTaskIdx int + healTask *healTask + SubEmptyTasks []*subEmptyTask // TODO: consider whether we need to retry those stateless peers or disconnect the peer statelessPeers map[peer.ID]struct{} // Peers that failed to deliver kv Data From fdf21fa0a0de96630ee311bd0f4a1b523bfc5549 Mon Sep 17 00:00:00 2001 From: pingke Date: Thu, 30 Nov 2023 11:50:42 +0800 Subject: [PATCH 2/7] expose miningInfo Attributes to other package --- ethstorage/miner/l1_mining_api.go | 6 +++--- ethstorage/miner/miner.go | 12 ++++++------ ethstorage/miner/worker.go | 6 +++--- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/ethstorage/miner/l1_mining_api.go b/ethstorage/miner/l1_mining_api.go index 35b07033..1c38dd78 100644 --- a/ethstorage/miner/l1_mining_api.go +++ b/ethstorage/miner/l1_mining_api.go @@ -52,9 +52,9 @@ func (m *l1MiningAPI) GetMiningInfo(ctx context.Context, contract common.Address return nil, err } mi := &miningInfo{ - lastMineTime: res[0].(*big.Int).Uint64(), - difficulty: res[1].(*big.Int), - blockMined: res[2].(*big.Int), + LastMineTime: res[0].(*big.Int).Uint64(), + Difficulty: res[1].(*big.Int), + BlockMined: res[2].(*big.Int), } return mi, nil } diff --git a/ethstorage/miner/miner.go b/ethstorage/miner/miner.go index ab73a321..71580b48 100644 --- a/ethstorage/miner/miner.go +++ b/ethstorage/miner/miner.go @@ -31,17 +31,17 @@ type MiningProver interface { } type miningInfo struct { - lastMineTime uint64 - difficulty *big.Int - blockMined *big.Int + LastMineTime uint64 + Difficulty *big.Int + BlockMined *big.Int } func (a *miningInfo) String() string { return fmt.Sprintf( "LastMineTime: %d, Difficulty: %s, BlockMined: %s", - a.lastMineTime, - a.difficulty.String(), - a.blockMined.String(), + a.LastMineTime, + a.Difficulty.String(), + a.BlockMined.String(), ) } diff --git a/ethstorage/miner/worker.go b/ethstorage/miner/worker.go index 37607a0f..371e617c 100644 --- a/ethstorage/miner/worker.go +++ b/ethstorage/miner/worker.go @@ -235,11 +235,11 @@ func (w *worker) updateDifficulty(shardIdx, blockTime uint64) (*big.Int, error) w.lg.Warn("Failed to get es mining info", "error", err.Error()) return nil, err } - w.lg.Info("Mining info retrieved", "shard", shardIdx, "lastMineTime", info.lastMineTime, "difficulty", info.difficulty, "proofsSubmitted", info.blockMined) + w.lg.Info("Mining info retrieved", "shard", shardIdx, "LastMineTime", info.LastMineTime, "Difficulty", info.Difficulty, "proofsSubmitted", info.BlockMined) reqDiff := new(big.Int).Div(maxUint256, expectedDiff( - info.lastMineTime, + info.LastMineTime, blockTime, - info.difficulty, + info.Difficulty, w.config.Cutoff, w.config.DiffAdjDivisor, w.config.MinimumDiff, From dcc4d1507b2fb3b7a3d121975ef4c4ddf1f1a31b Mon Sep 17 00:00:00 2001 From: pingke Date: Thu, 30 Nov 2023 12:24:32 +0800 Subject: [PATCH 3/7] format code --- ethstorage/data_file.go | 2 +- ethstorage/eth/beacon_client.go | 24 ++++++++++++------------ ethstorage/p2p/host.go | 3 +-- ethstorage/p2p/peer_gater.go | 4 ++-- ethstorage/p2p/peer_params.go | 2 +- ethstorage/p2p/peer_scorer.go | 4 ++-- ethstorage/p2p/protocol/sync_test.go | 2 +- ethstorage/p2p/topic_params.go | 2 +- ethstorage/prover/kzg_prover_test.go | 2 +- ethstorage/prover/zk_prover.go | 2 +- ethstorage/prover/zk_prover_test.go | 4 ++-- ethstorage/storage_manager.go | 4 ++-- 12 files changed, 27 insertions(+), 28 deletions(-) diff --git a/ethstorage/data_file.go b/ethstorage/data_file.go index 57d678bf..ab3417e6 100644 --- a/ethstorage/data_file.go +++ b/ethstorage/data_file.go @@ -156,7 +156,7 @@ func (df *DataFile) Read(chunkIdx uint64, len int) ([]byte, error) { return nil, fmt.Errorf("chunk not found") } if len > int(df.chunkSize) { - return nil, fmt.Errorf(("read too large")) + return nil, fmt.Errorf("read too large") } md := make([]byte, len) n, err := df.file.ReadAt(md, HEADER_SIZE+int64(chunkIdx-df.chunkIdxStart)*int64(df.chunkSize)) diff --git a/ethstorage/eth/beacon_client.go b/ethstorage/eth/beacon_client.go index 074563ee..4605e863 100644 --- a/ethstorage/eth/beacon_client.go +++ b/ethstorage/eth/beacon_client.go @@ -14,15 +14,15 @@ import ( ) type BeaconClient struct { - beaconURL string - basedTime uint64 - basedSlot uint64 - slotTime uint64 + beaconURL string + basedTime uint64 + basedSlot uint64 + slotTime uint64 } type Blob struct { - VersionedHash common.Hash - Data []byte + VersionedHash common.Hash + Data []byte } type beaconBlobs struct { @@ -45,13 +45,13 @@ func NewBeaconClient(url string, basedTime uint64, basedSlot uint64, slotTime ui beaconURL: url, basedTime: basedTime, basedSlot: basedSlot, - slotTime: slotTime, + slotTime: slotTime, } return res } func (c *BeaconClient) Timestamp2Slot(time uint64) uint64 { - return (time - c.basedTime) / c.slotTime + c.basedSlot + return (time-c.basedTime)/c.slotTime + c.basedSlot } func (c *BeaconClient) DownloadBlobs(slot uint64) (map[common.Hash]Blob, error) { @@ -82,19 +82,19 @@ func (c *BeaconClient) DownloadBlobs(slot uint64) (map[common.Hash]Blob, error) if err != nil { return nil, err } - res[hash] = Blob{VersionedHash: hash, Data:asciiBytes} + res[hash] = Blob{VersionedHash: hash, Data: asciiBytes} } return res, nil } func kzgToVersionedHash(commit string) (common.Hash, error) { - b, err := hex.DecodeString(commit[2:]); + b, err := hex.DecodeString(commit[2:]) if err != nil { return common.Hash{}, err } c := [48]byte{} - copy(c[:], b[:]) + copy(c[:], b[:]) return common.Hash(eth.KZGToVersionedHash(c)), nil -} \ No newline at end of file +} diff --git a/ethstorage/p2p/host.go b/ethstorage/p2p/host.go index 27689bfa..359e7880 100644 --- a/ethstorage/p2p/host.go +++ b/ethstorage/p2p/host.go @@ -7,7 +7,7 @@ import ( "sync" "time" - libp2p "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p" lconf "github.com/libp2p/go-libp2p/config" "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/host" @@ -247,7 +247,6 @@ func YamuxC() libp2p.Option { return libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport) } - func NoiseC() libp2p.Option { return libp2p.Security(noise.ID, noise.New) } diff --git a/ethstorage/p2p/peer_gater.go b/ethstorage/p2p/peer_gater.go index da606190..e3efe832 100644 --- a/ethstorage/p2p/peer_gater.go +++ b/ethstorage/p2p/peer_gater.go @@ -1,8 +1,8 @@ package p2p import ( - log "github.com/ethereum/go-ethereum/log" - peer "github.com/libp2p/go-libp2p/core/peer" + "github.com/ethereum/go-ethereum/log" + "github.com/libp2p/go-libp2p/core/peer" ) // ConnectionFactor is the factor by which we multiply the connection score. diff --git a/ethstorage/p2p/peer_params.go b/ethstorage/p2p/peer_params.go index 0e70d119..d61cfb91 100644 --- a/ethstorage/p2p/peer_params.go +++ b/ethstorage/p2p/peer_params.go @@ -86,7 +86,7 @@ var DisabledPeerScoreParams = func(blockTime uint64) pubsub.PeerScoreParams { } // PeerScoreParamsByName is a map of name to function that returns a [pubsub.PeerScoreParams] based on the provided [rollup.Config]. -var PeerScoreParamsByName = map[string](func(blockTime uint64) pubsub.PeerScoreParams){ +var PeerScoreParamsByName = map[string]func(blockTime uint64) pubsub.PeerScoreParams{ "light": LightPeerScoreParams, "none": DisabledPeerScoreParams, } diff --git a/ethstorage/p2p/peer_scorer.go b/ethstorage/p2p/peer_scorer.go index 630775a4..64582a35 100644 --- a/ethstorage/p2p/peer_scorer.go +++ b/ethstorage/p2p/peer_scorer.go @@ -6,9 +6,9 @@ import ( "strconv" "strings" - log "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/log" pubsub "github.com/libp2p/go-libp2p-pubsub" - peer "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peer" ) type scorer struct { diff --git a/ethstorage/p2p/protocol/sync_test.go b/ethstorage/p2p/protocol/sync_test.go index aaf9d98e..ed335bca 100644 --- a/ethstorage/p2p/protocol/sync_test.go +++ b/ethstorage/p2p/protocol/sync_test.go @@ -18,7 +18,7 @@ import ( "time" "github.com/detailyang/go-fallocate" - ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" diff --git a/ethstorage/p2p/topic_params.go b/ethstorage/p2p/topic_params.go index cee5b2fe..950832f2 100644 --- a/ethstorage/p2p/topic_params.go +++ b/ethstorage/p2p/topic_params.go @@ -86,7 +86,7 @@ var DisabledTopicScoreParams = func(blockTime uint64) pubsub.TopicScoreParams { } // TopicScoreParamsByName is a map of name to [pubsub.TopicScoreParams]. -var TopicScoreParamsByName = map[string](func(blockTime uint64) pubsub.TopicScoreParams){ +var TopicScoreParamsByName = map[string]func(blockTime uint64) pubsub.TopicScoreParams{ "light": LightTopicScoreParams, "none": DisabledTopicScoreParams, } diff --git a/ethstorage/prover/kzg_prover_test.go b/ethstorage/prover/kzg_prover_test.go index 7f516d88..16189827 100644 --- a/ethstorage/prover/kzg_prover_test.go +++ b/ethstorage/prover/kzg_prover_test.go @@ -130,7 +130,7 @@ func verifyInclusive(trunkIdx uint64, peInput []byte) error { dataHash := common.Hash{} copy(dataHash[:], peInput[:24]) - index := new(big.Int).SetInt64((int64(trunkIdx))) + index := new(big.Int).SetInt64(int64(trunkIdx)) decodedData := new(big.Int).SetBytes(peInput[64:96]) h := crypto.Keccak256Hash([]byte("checkInclusive(bytes32,uint256,uint256,bytes)")) diff --git a/ethstorage/prover/zk_prover.go b/ethstorage/prover/zk_prover.go index d16f6d50..c9381df5 100644 --- a/ethstorage/prover/zk_prover.go +++ b/ethstorage/prover/zk_prover.go @@ -173,7 +173,7 @@ func readMask(publicFile string) (common.Hash, error) { } defer f.Close() var output []string - var decoder *json.Decoder = json.NewDecoder(f) + var decoder = json.NewDecoder(f) err = decoder.Decode(&output) if err != nil { return common.Hash{}, err diff --git a/ethstorage/prover/zk_prover_test.go b/ethstorage/prover/zk_prover_test.go index 6f54c0cf..8ae4b58b 100644 --- a/ethstorage/prover/zk_prover_test.go +++ b/ethstorage/prover/zk_prover_test.go @@ -131,7 +131,7 @@ func readXIn(buildDir string) (string, error) { } defer f.Close() var input InputPair - var decoder *json.Decoder = json.NewDecoder(f) + var decoder = json.NewDecoder(f) err = decoder.Decode(&input) if err != nil { return "", err @@ -162,7 +162,7 @@ func verifyDecodeSample(proof ZKProof, trunkIdx uint64, encodingKey, mask common defer client.Close() encodingKeyBN := new(big.Int).SetBytes(encodingKey[:]) - indexBN := new(big.Int).SetInt64((int64(trunkIdx))) + indexBN := new(big.Int).SetInt64(int64(trunkIdx)) maskBN := new(big.Int).SetBytes(mask[:]) h := crypto.Keccak256Hash([]byte("decodeSample(((uint256,uint256),(uint256[2],uint256[2]),(uint256,uint256)),uint256,uint256,uint256)")) diff --git a/ethstorage/storage_manager.go b/ethstorage/storage_manager.go index 9a96c6f3..52a713c7 100644 --- a/ethstorage/storage_manager.go +++ b/ethstorage/storage_manager.go @@ -102,8 +102,8 @@ func (s *StorageManager) DownloadFinished(newL1 int64, kvIndices []uint64, blobs wg.Wait() for i := 0; i < taskIdx; i++ { - res := <- chanRes - if (res != nil) { + res := <-chanRes + if res != nil { return res } } From 8fcf5a8c716d745434747f09fd4dbac42ce5259a Mon Sep 17 00:00:00 2001 From: pingke Date: Thu, 30 Nov 2023 20:45:28 +0800 Subject: [PATCH 4/7] remove expose miningInfo change --- ethstorage/miner/l1_mining_api.go | 6 +++--- ethstorage/miner/miner.go | 14 +++++++------- ethstorage/miner/worker.go | 6 +++--- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/ethstorage/miner/l1_mining_api.go b/ethstorage/miner/l1_mining_api.go index 1c38dd78..35b07033 100644 --- a/ethstorage/miner/l1_mining_api.go +++ b/ethstorage/miner/l1_mining_api.go @@ -52,9 +52,9 @@ func (m *l1MiningAPI) GetMiningInfo(ctx context.Context, contract common.Address return nil, err } mi := &miningInfo{ - LastMineTime: res[0].(*big.Int).Uint64(), - Difficulty: res[1].(*big.Int), - BlockMined: res[2].(*big.Int), + lastMineTime: res[0].(*big.Int).Uint64(), + difficulty: res[1].(*big.Int), + blockMined: res[2].(*big.Int), } return mi, nil } diff --git a/ethstorage/miner/miner.go b/ethstorage/miner/miner.go index 71580b48..22f3efe0 100644 --- a/ethstorage/miner/miner.go +++ b/ethstorage/miner/miner.go @@ -31,17 +31,17 @@ type MiningProver interface { } type miningInfo struct { - LastMineTime uint64 - Difficulty *big.Int - BlockMined *big.Int + lastMineTime uint64 + difficulty *big.Int + blockMined *big.Int } func (a *miningInfo) String() string { return fmt.Sprintf( - "LastMineTime: %d, Difficulty: %s, BlockMined: %s", - a.LastMineTime, - a.Difficulty.String(), - a.BlockMined.String(), + "lastMineTime: %d, difficulty: %s, blockMined: %s", + a.lastMineTime, + a.difficulty.String(), + a.blockMined.String(), ) } diff --git a/ethstorage/miner/worker.go b/ethstorage/miner/worker.go index 371e617c..37607a0f 100644 --- a/ethstorage/miner/worker.go +++ b/ethstorage/miner/worker.go @@ -235,11 +235,11 @@ func (w *worker) updateDifficulty(shardIdx, blockTime uint64) (*big.Int, error) w.lg.Warn("Failed to get es mining info", "error", err.Error()) return nil, err } - w.lg.Info("Mining info retrieved", "shard", shardIdx, "LastMineTime", info.LastMineTime, "Difficulty", info.Difficulty, "proofsSubmitted", info.BlockMined) + w.lg.Info("Mining info retrieved", "shard", shardIdx, "lastMineTime", info.lastMineTime, "difficulty", info.difficulty, "proofsSubmitted", info.blockMined) reqDiff := new(big.Int).Div(maxUint256, expectedDiff( - info.LastMineTime, + info.lastMineTime, blockTime, - info.Difficulty, + info.difficulty, w.config.Cutoff, w.config.DiffAdjDivisor, w.config.MinimumDiff, From c277fbcab8290e77124eca0b8c0289abf5af4c9c Mon Sep 17 00:00:00 2001 From: pingke Date: Thu, 30 Nov 2023 20:47:45 +0800 Subject: [PATCH 5/7] remove useless change --- ethstorage/miner/miner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethstorage/miner/miner.go b/ethstorage/miner/miner.go index 22f3efe0..ab73a321 100644 --- a/ethstorage/miner/miner.go +++ b/ethstorage/miner/miner.go @@ -38,7 +38,7 @@ type miningInfo struct { func (a *miningInfo) String() string { return fmt.Sprintf( - "lastMineTime: %d, difficulty: %s, blockMined: %s", + "LastMineTime: %d, Difficulty: %s, BlockMined: %s", a.lastMineTime, a.difficulty.String(), a.blockMined.String(), From e9c43ba1e9e31bec56361f48715b085ae82256fd Mon Sep 17 00:00:00 2001 From: pingke Date: Fri, 1 Dec 2023 09:59:56 +0800 Subject: [PATCH 6/7] resolve comments --- ethstorage/p2p/protocol/syncclient.go | 14 ++++++-------- ethstorage/p2p/protocol/task.go | 12 ++++++------ 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/ethstorage/p2p/protocol/syncclient.go b/ethstorage/p2p/protocol/syncclient.go index ec81e406..79b6b5a7 100644 --- a/ethstorage/p2p/protocol/syncclient.go +++ b/ethstorage/p2p/protocol/syncclient.go @@ -305,7 +305,7 @@ func (s *SyncClient) createTask(sid uint64, lastKvIndex uint64) *task { task := task{ Contract: s.storageManager.ContractAddress(), ShardId: sid, - lastSubTaskIdx: 0, + nextIdx: 0, statelessPeers: make(map[peer.ID]struct{}), peers: make(map[peer.ID]struct{}), } @@ -421,8 +421,8 @@ func (s *SyncClient) cleanTasks() { t.SubTasks[i].First = min if t.SubTasks[i].done && !exist { t.SubTasks = append(t.SubTasks[:i], t.SubTasks[i+1:]...) - if t.lastSubTaskIdx >= i { - t.lastSubTaskIdx-- + if t.nextIdx > i { + t.nextIdx-- } i-- } @@ -622,12 +622,11 @@ func (s *SyncClient) assignBlobRangeTasks() { // Iterate over all the tasks and try to find a pending one for _, t := range s.tasks { maxRange := s.syncerParams.MaxRequestSize / ethstorage.ContractToShardManager[t.Contract].MaxKvSize() * 2 - nextIdx := t.lastSubTaskIdx + 1 subTaskCount := len(t.SubTasks) for idx := 0; idx < subTaskCount; idx++ { - nextIdx = nextIdx % subTaskCount - st := t.SubTasks[nextIdx] - nextIdx++ + t.nextIdx = t.nextIdx % subTaskCount + st := t.SubTasks[t.nextIdx] + t.nextIdx++ if st.done { continue } @@ -700,7 +699,6 @@ func (s *SyncClient) assignBlobRangeTasks() { s.OnBlobsByRange(res) }(pr.id) } - t.lastSubTaskIdx = nextIdx - 1 } } diff --git a/ethstorage/p2p/protocol/task.go b/ethstorage/p2p/protocol/task.go index 8afebb3e..ae92fa08 100644 --- a/ethstorage/p2p/protocol/task.go +++ b/ethstorage/p2p/protocol/task.go @@ -13,12 +13,12 @@ import ( // task represents the sync task for a storage shard. type task struct { // These fields get serialized to leveldb on shutdown - Contract common.Address // Contract address - ShardId uint64 // ShardId - SubTasks []*subTask - lastSubTaskIdx int - healTask *healTask - SubEmptyTasks []*subEmptyTask + Contract common.Address // Contract address + ShardId uint64 // ShardId + SubTasks []*subTask + nextIdx int + healTask *healTask + SubEmptyTasks []*subEmptyTask // TODO: consider whether we need to retry those stateless peers or disconnect the peer statelessPeers map[peer.ID]struct{} // Peers that failed to deliver kv Data From 74cd27cff64d8898bd0173842ed451e9061d6501 Mon Sep 17 00:00:00 2001 From: pingke Date: Tue, 5 Dec 2023 16:00:32 +0800 Subject: [PATCH 7/7] resolve comments --- ethstorage/p2p/protocol/syncclient.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/ethstorage/p2p/protocol/syncclient.go b/ethstorage/p2p/protocol/syncclient.go index fd304081..1d9774da 100644 --- a/ethstorage/p2p/protocol/syncclient.go +++ b/ethstorage/p2p/protocol/syncclient.go @@ -633,6 +633,10 @@ func (s *SyncClient) assignBlobRangeTasks() { maxRange := s.syncerParams.MaxRequestSize / ethstorage.ContractToShardManager[t.Contract].MaxKvSize() * 2 subTaskCount := len(t.SubTasks) for idx := 0; idx < subTaskCount; idx++ { + pr := s.getIdlePeerForTask(t) + if pr == nil { + break + } t.nextIdx = t.nextIdx % subTaskCount st := t.SubTasks[t.nextIdx] t.nextIdx++ @@ -643,13 +647,6 @@ func (s *SyncClient) assignBlobRangeTasks() { if st.isRunning { continue } - if len(s.idlerPeers) == 0 { - break - } - pr := s.getIdlePeerForTask(t) - if pr == nil { - continue - } last := st.next + maxRange if last > st.Last {