Skip to content

Commit

Permalink
Merge branch 'wip-check-parent-header'
Browse files Browse the repository at this point in the history
  • Loading branch information
dcaoyuan committed Aug 13, 2018
2 parents 9bd9355 + 7bea47e commit fe8a4db
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import akka.pattern.ask
import akka.pattern.pipe
import akka.util.ByteString
import java.math.BigInteger
import java.util.concurrent.ThreadLocalRandom
import khipu.Hash
import khipu.blockchain.sync
import khipu.blockchain.sync.HandshakedPeersService.BlacklistPeer
Expand Down Expand Up @@ -141,7 +142,7 @@ trait FastSyncService { _: SyncService =>
// res22: scala.concurrent.Future[List[Int]] = Success(List(2, 0))
Future.sequence(peersUsedToChooseTarget map {
case (peer, PeerInfo(Status(protocolVersion, networkId, totalDifficulty, bestHash, genesisHash), _, _, _)) =>
requestingHeaders(peer, Right(bestHash), 1, 0, reverse = false) map {
requestingHeaders(peer, None, Right(bestHash), 1, 0, reverse = false) map {
case Some(BlockHeadersResponse(peerId, Seq(header), true)) =>
log.debug(s"Got BlockHeadersResponse with 1 header number=${header.number} from ${peer.id}")
Some(peer -> header)
Expand Down Expand Up @@ -193,7 +194,7 @@ trait FastSyncService { _: SyncService =>
log.info(s"Fetching block headers of target $targetBlockNumber")

val fs = Future.sequence(receivedHeaders.toSeq.map(_._1) map { peer =>
requestingHeaders(peer, Left(targetBlockNumber), 1, 0, reverse = false) transform {
requestingHeaders(peer, None, Left(targetBlockNumber), 1, 0, reverse = false) transform {
case Success(Some(BlockHeadersResponse(peerId, headers, true))) =>
log.debug(s"Got BlockHeadersResponse with 1 header from ${peer.id}")
headers.find(_.number == targetBlockNumber) match {
Expand Down Expand Up @@ -383,7 +384,7 @@ trait FastSyncService { _: SyncService =>

if (syncState.pendingNonMptNodes.nonEmpty || syncState.pendingMptNodes.nonEmpty) {
val blockchainOnlys = blockchainOnlyPeers.values.toSet
val nodeWorks = (unassignedPeers -- blockchainOnlys)
val nodeWorks = unassignedPeers.filterNot(blockchainOnlys.contains)
.take(maxConcurrentRequests - workingPeers.size)
.toSeq.sortBy(_.id)
.foldLeft(Vector[(Peer, List[NodeHash])]()) {
Expand Down Expand Up @@ -411,7 +412,6 @@ trait FastSyncService { _: SyncService =>
if (syncState.pendingReceipts.nonEmpty) {
val receiptWorks = unassignedPeers
.take(maxConcurrentRequests - workingPeers.size)
.toSeq.sortBy(_.id)
.foldLeft(Vector[(Peer, List[Hash])]()) {
case (acc, peer) =>
if (syncState.pendingReceipts.nonEmpty) {
Expand All @@ -434,7 +434,6 @@ trait FastSyncService { _: SyncService =>
if (syncState.pendingBlockBodies.nonEmpty) {
val bodyWorks = unassignedPeers
.take(maxConcurrentRequests - workingPeers.size)
.toSeq.sortBy(_.id)
.foldLeft(Vector[(Peer, List[Hash])]()) {
case (acc, peer) =>
if (syncState.pendingBlockBodies.nonEmpty) {
Expand All @@ -455,28 +454,45 @@ trait FastSyncService { _: SyncService =>
}

if (isThereHeaderToDownload && headerWorkingPeer.isEmpty) {
val headerWorks = unassignedPeers
val candicates = unassignedPeers
.take(maxConcurrentRequests - workingPeers.size)
.toSeq.sortBy(_.id)
.headOption flatMap { peer =>
if (isThereHeaderToDownload && headerWorkingPeer.isEmpty) {
headerWorkingPeer = Some(peer.id)
workingPeers += peer.id

Some(peer)
} else {
None
}
.toSeq.sortBy(_.id).take(3)

val headerWorks = nextPeer(candicates) flatMap { peer =>
if (isThereHeaderToDownload && headerWorkingPeer.isEmpty) {
headerWorkingPeer = Some(peer.id)
workingPeers += peer.id

Some(peer)
} else {
None
}
}

headerWorks foreach { requestBlockHeaders }
}
}
}

private def nextPeer(candicates: Seq[Peer]): Option[Peer] = {
if (candicates.nonEmpty) {
Some(candicates(nextCandicate(0, candicates.length)))
} else {
None
}
}

private def nextCandicate(low: Int, high: Int) = { // >= low and < high
val rnd = ThreadLocalRandom.current()
rnd.nextInt(high - low) + low
}

private def unassignedPeers: Set[Peer] =
peersToDownloadFrom.keySet filterNot { peer => workingPeers.contains(peer.id) }
private def unassignedPeers: List[Peer] = {
val peerToUse = peersToDownloadFrom collect {
case (peer, PeerInfo(_, totalDifficulty, true, _)) if !workingPeers.contains(peer.id) => (peer, totalDifficulty)
}
peerToUse.toList.sortBy { _._2.negate } map (_._1)
}

private def isFullySynced =
!isAnythingToDownload && workingPeers.isEmpty
Expand All @@ -499,28 +515,33 @@ trait FastSyncService { _: SyncService =>

val limit = math.min(blockHeadersPerRequest, syncState.targetBlockNumber - syncState.bestBlockHeaderNumber)
//log.debug(s"Request block headers: ${request.message.block.fold(n => n, _.hexString)}, bestBlockHeaderNumber is ${syncState.bestBlockHeaderNumber}, target is ${syncState.targetBlockNumber}")
val future = requestingHeaders(peer, Left(syncState.bestBlockHeaderNumber + 1), limit, skip = 0, reverse = false) map {
case Some(BlockHeadersResponse(peerId, headers, true)) =>
log.debug(s"Got block headers ${headers.size} from ${peer.id} in ${System.currentTimeMillis - start}ms")
insertHeaders(headers)
val blockHashes = headers.map(_.hash)
self ! EnqueueBlockBodies(blockHashes)
self ! EnqueueReceipts(blockHashes)
blockchain.getBlockHeaderByNumber(syncState.bestBlockHeaderNumber) match {
case Some(parentHeader) =>
val future = requestingHeaders(peer, Some(parentHeader), Left(syncState.bestBlockHeaderNumber + 1), limit, skip = 0, reverse = false) map {
case Some(BlockHeadersResponse(peerId, headers, true)) =>
log.debug(s"Got block headers ${headers.size} from ${peer.id} in ${System.currentTimeMillis - start}ms")
val blockHeaderObtained = insertHeaders(headers)
val blockHashes = blockHeaderObtained.map(_.hash)
self ! EnqueueBlockBodies(blockHashes)
self ! EnqueueReceipts(blockHashes)

case Some(BlockHeadersResponse(peerId, headers, false)) =>
self ! BlacklistPeer(peer.id, s"Got block headers non-consistent for requested: ${syncState.bestBlockHeaderNumber + 1}")
case Some(BlockHeadersResponse(peerId, headers, false)) =>
self ! BlacklistPeer(peer.id, s"Got block headers non-consistent for requested: ${syncState.bestBlockHeaderNumber + 1}")

case None =>
self ! BlacklistPeer(peer.id, s"Got block headers empty for known header: ${syncState.bestBlockHeaderNumber + 1}")
case None =>
self ! BlacklistPeer(peer.id, s"Got block headers empty for known header: ${syncState.bestBlockHeaderNumber + 1}")

} andThen {
case Failure(e: AskTimeoutException) =>
self ! BlacklistPeer(peer.id, s"${e.getMessage}")
case Failure(e) =>
self ! BlacklistPeer(peer.id, s"${e.getMessage}")
case _ =>
} andThen {
case _ => self ! HeaderWorkDone(peer.id)
} andThen {
case Failure(e: AskTimeoutException) =>
self ! BlacklistPeer(peer.id, s"${e.getMessage}")
case Failure(e) =>
self ! BlacklistPeer(peer.id, s"${e.getMessage}")
case _ =>
} andThen {
case _ => self ! HeaderWorkDone(peer.id)
}
case None => // TODO
log.error(s"previous best block ${syncState.bestBlockHeaderNumber} does not exist yet, something wrong !!!")
}
}

Expand Down Expand Up @@ -651,7 +672,7 @@ trait FastSyncService { _: SyncService =>
} getOrElse (Valid)
}

private def insertHeaders(headers: List[BlockHeader]) {
private def insertHeaders(headers: List[BlockHeader]): List[BlockHeader] = {
val blockHeadersObtained = headers.takeWhile { header =>
blockchain.getTotalDifficultyByHash(header.parentHash) match {
case Some(parentTotalDifficulty) =>
Expand All @@ -669,6 +690,8 @@ trait FastSyncService { _: SyncService =>
syncState.bestBlockHeaderNumber = lastHeader.number
}
}

blockHeadersObtained
}

private def reportStatus() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ trait RegularSyncService { _: SyncService =>

log.debug(s"Request block headers beginning at $nextBlockNumber via best peer $peer")

requestingHeaders(peer, Left(nextBlockNumber), blockHeadersPerRequest, skip = 0, reverse = false)(syncRequestTimeout) map {
requestingHeaders(peer, None, Left(nextBlockNumber), blockHeadersPerRequest, skip = 0, reverse = false)(syncRequestTimeout) map {
case Some(sync.BlockHeadersResponse(peerId, headers, true)) =>
log.debug(s"Got block headers from $peer")
if (lookbackFromBlock.isDefined) {
Expand Down Expand Up @@ -196,7 +196,7 @@ trait RegularSyncService { _: SyncService =>
} else {
log.info(s"[sync] Received branch block ${headers.head.number} from ${peer.id}, resolving fork ...")

requestingHeaders(peer, Right(headers.head.parentHash), blockResolveDepth, skip = 0, reverse = true)(syncRequestTimeout) map {
requestingHeaders(peer, None, Right(headers.head.parentHash), blockResolveDepth, skip = 0, reverse = true)(syncRequestTimeout) map {
case Some(sync.BlockHeadersResponse(peerId, headers, true)) =>
self ! ProcessBlockHeaders(peer, headers)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import khipu.Hash
import khipu.Stop
import khipu.crypto
import khipu.domain.Block
import khipu.domain.BlockHeader
import khipu.network.p2p.Message
import khipu.network.p2p.messages.PV62
import khipu.network.p2p.messages.PV63
Expand Down Expand Up @@ -142,9 +143,9 @@ class SyncService() extends FastSyncService with RegularSyncService with Handsha
case Stop => context stop self
}

protected def requestingHeaders(peer: Peer, blockNumberOrHash: Either[Long, Hash], maxHeaders: Long, skip: Long, reverse: Boolean)(implicit timeout: FiniteDuration) = {
protected def requestingHeaders(peer: Peer, parentHeader: Option[BlockHeader], blockNumberOrHash: Either[Long, Hash], maxHeaders: Long, skip: Long, reverse: Boolean)(implicit timeout: FiniteDuration) = {
isRequesting = true
val request = BlockHeadersRequest(peer.id, PV62.GetBlockHeaders(blockNumberOrHash, maxHeaders, skip, reverse))
val request = BlockHeadersRequest(peer.id, parentHeader, PV62.GetBlockHeaders(blockNumberOrHash, maxHeaders, skip, reverse))
(peer.entity ? request)(timeout).mapTo[Option[BlockHeadersResponse]] andThen {
case _ => isRequesting = false
}
Expand Down
4 changes: 2 additions & 2 deletions khipu-eth/src/main/scala/khipu/blockchain/sync/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,14 @@ package object sync {
}

final case class BlockHeadersResponse(peerId: String, headers: List[BlockHeader], isConsistent: Boolean) extends PeerResponse
final case class BlockHeadersRequest(peerId: String, message: PV62.GetBlockHeaders) extends RequestToPeer[PV62.BlockHeaders, BlockHeadersResponse] {
final case class BlockHeadersRequest(peerId: String, parentHeader: Option[BlockHeader], message: PV62.GetBlockHeaders) extends RequestToPeer[PV62.BlockHeaders, BlockHeadersResponse] {
def messageToSend = message

def processResponse(blockHeaders: PV62.BlockHeaders) = {
val headers = (if (message.reverse) blockHeaders.headers.reverse else blockHeaders.headers).toList

if (headers.nonEmpty) {
if (isHeadersConsistent(headers)) {
if (parentHeader.fold(true) { prevHeader => headers.head.parentHash == prevHeader.hash } && isHeadersConsistent(headers)) {
Some(BlockHeadersResponse(peerId, headers, true))
} else {
Some(BlockHeadersResponse(peerId, List(), false))
Expand Down

0 comments on commit fe8a4db

Please sign in to comment.