Skip to content

Commit

Permalink
DEX-1096 Unnecessary reconnects to blockchain updates stream (#474)
Browse files Browse the repository at this point in the history
  • Loading branch information
vsuharnikov authored Mar 5, 2021
1 parent 8ecd26c commit b805dca
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 26 deletions.
2 changes: 1 addition & 1 deletion dex/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ waves.dex {
max-retry-attempts = 30

# Sets whether keepalive will be performed when there are no outstanding RPC on a connection
keep-alive-without-calls = true
keep-alive-without-calls = false

# Sets the time without read activity before sending a keepalive ping.
# Can't be less than 10s, see io.grpc.internal.KeepAliveManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class MatcherSettingsSpecification extends BaseSettingsSpecification with Matche
settings.eventsQueue.kafka.consumer.maxBufferSize shouldBe 777
settings.eventsQueue.kafka.consumer.client.getInt("foo") shouldBe 2
settings.eventsQueue.kafka.producer.client.getInt("bar") shouldBe 3
settings.eventsQueue.kafka.producer.enable shouldBe false
settings.eventsQueue.circuitBreaker should matchTo(
CircuitBreakerSettings(
maxFailures = 999,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ class CombinedStream(
case (orig, Left(evt)) => utxEventsTransitions(orig, evt).tap(updated => log.info(s"utx: $orig + $evt -> $updated"))
case (orig, Right(evt)) => blockchainEventsTransitions(orig, evt).tap(updated => log.info(s"bu: $orig + $evt -> $updated"))
}
.doOnComplete(Task(log.info(s"lastStatus completed")))
.doOnError(e => Task(log.error(s"lastStatus failed", e)))
.doOnComplete(Task(log.info("lastStatus completed")))
.doOnError(e => Task(log.error("lastStatus failed", e)))
.lastL
.runToFuture

Expand Down Expand Up @@ -102,18 +102,18 @@ class CombinedStream(
// TODO DEX-1034
def startFrom(height: Int): Unit = {
log.info(s"Starting from $height")
updateProcessedHeight(height)
updateProcessedHeight(height - 1) // Consider we processed the previous
utxEvents.start()
}

def updateProcessedHeight(height: Int): Unit =
processedHeight = height

def updateProcessedHeight(height: Int): Unit = processedHeight = height
def currentProcessedHeight: Int = processedHeight

def restart(): Unit = {
log.info("Restarting")
// Self-healed above
// Self-healed,
// see blockchainEventsTransitions: ? + Stopped
// then utxEventsTransitions: Stopping + Stopped
blockchainUpdates.stop()
}

Expand All @@ -130,7 +130,7 @@ class CombinedStream(
if (origStatus.utxEvents) ignore()
else if (origStatus.oneDone) Status.Working
else {
blockchainUpdates.startFrom(processedHeight)
blockchainUpdates.startFrom(processedHeight + 1)
origStatus.copy(utxEvents = true)
}

Expand Down Expand Up @@ -267,12 +267,15 @@ class CombinedStream(
}
}

// Recovering after stopped streams
private def recover(): Unit = {
// Failed to process the new block and haven't yet updated processedHeight.
// So drop the last processed block.
processedHeight = math.max(1, processedHeight - 1)
// And rollback to the previous, because we will download a block at processedHeight
internalStream.onNext(WavesNodeEvent.RolledBack(WavesNodeEvent.RolledBack.To.Height(processedHeight - 1)))
// We don't know, did we process the last block. It could be liquid and we could not receive all micro blocks.
// But we processed the previous block without doubt.
val updatedProcessedHeight = math.max(1, processedHeight - 1)
internalStream.onNext(WavesNodeEvent.RolledBack(WavesNodeEvent.RolledBack.To.Height(updatedProcessedHeight)))
// We need to updated this too, because RolledBack could not be processed
// before we call startFrom in utxEventsTransitions: Starting + BecameReady
updateProcessedHeight(updatedProcessedHeight)
runWithDelay(() => utxEvents.start())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ object StatusTransitions extends ScorexLogging {
updatedLastBlockHeight = LastBlockHeight.RestartRequired
)

case Right(updatedFork) =>
case Right(updatedChain) =>
StatusUpdate(
newStatus = Normal(updatedFork),
newStatus = Normal(updatedChain),
updatedBalances = block.changes,
updatedLastBlockHeight = LastBlockHeight.Updated(block.ref.height),
updatedLastBlockHeight = LastBlockHeight.Updated(updatedChain.height),
utxUpdate = UtxUpdate(confirmedTxs = block.confirmedTxs),
requestNextBlockchainEvent = true
)
}

case RolledBack(to) => // This could happen during an appending of a new key block too
case RolledBack(to) =>
val initFork = WavesFork(origStatus.main, origStatus.main)
val updatedFork = to match {
case To.CommonBlockRef(ref) => initFork.rollbackTo(ref)
Expand All @@ -47,6 +47,7 @@ object StatusTransitions extends ScorexLogging {
fork = updatedFork,
utxUpdate = Monoid.empty[UtxUpdate]
),
updatedLastBlockHeight = LastBlockHeight.Updated(updatedFork.height),
requestNextBlockchainEvent = true
)

Expand Down Expand Up @@ -83,7 +84,7 @@ object StatusTransitions extends ScorexLogging {
StatusUpdate(
newStatus = Normal(resolved.activeChain),
updatedBalances = resolved.newChanges,
updatedLastBlockHeight = LastBlockHeight.Updated(block.ref.height),
updatedLastBlockHeight = LastBlockHeight.Updated(resolved.activeChain.height),
utxUpdate = finalUtxUpdate,
requestNextBlockchainEvent = true
)
Expand All @@ -94,14 +95,15 @@ object StatusTransitions extends ScorexLogging {
stashChanges = resolved.newChanges,
utxUpdate = finalUtxUpdate
),
updatedLastBlockHeight = LastBlockHeight.Updated(block.ref.height),
updatedLastBlockHeight = LastBlockHeight.Updated(resolved.activeChain.height),
requestBalances = resolved.lostDiffIndex
// requestNextBlockchainEvent = true // Because we are waiting for DataReceived
)

case Status.NotResolved(updatedFork) =>
StatusUpdate(
newStatus = origStatus.copy(fork = updatedFork),
updatedLastBlockHeight = LastBlockHeight.Updated(updatedFork.height),
requestNextBlockchainEvent = true
)

Expand All @@ -120,6 +122,7 @@ object StatusTransitions extends ScorexLogging {
}
StatusUpdate(
newStatus = origStatus.copy(fork = updatedFork),
updatedLastBlockHeight = LastBlockHeight.Updated(updatedFork.height),
requestNextBlockchainEvent = true
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ object StatusUpdate {

object LastBlockHeight {
case object NotChanged extends LastBlockHeight

// We confirm the latest event from the blockchain updates stream
case class Updated(to: Int) extends LastBlockHeight

// Something is broken, so we need to rollback and reconnect
case object RestartRequired extends LastBlockHeight
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ object WavesNodeEvent {

object To {

// Only from the blockchain updates stream.
// Either rollback to a block, or to a micro block (micro fork).
case class CommonBlockRef(ref: BlockRef) extends To {
override def toString: String = s"CommonBlockRef(${ref.height}, ${ref.id})"
}

// Only during issues: a wrong block, disconnects
case class Height(h: Int) extends To

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class CombinedStreamTestSuite extends WavesIntegrationSuiteBase with Eventually
"affects the recovery height hint" in {
val t = mk()
t.cs.startFrom(10)
t.cs.currentProcessedHeight shouldBe 10
t.cs.currentProcessedHeight shouldBe 9
}
}

Expand All @@ -63,7 +63,7 @@ class CombinedStreamTestSuite extends WavesIntegrationSuiteBase with Eventually
}
}

"restartFrom" - {
"restart" - {
"stop blockchainUpdates" in {
val t = mk()
t.cs.startFrom(10)
Expand All @@ -81,8 +81,10 @@ class CombinedStreamTestSuite extends WavesIntegrationSuiteBase with Eventually
"doesn't affect the recovery height" in {
val t = mk()
t.cs.startFrom(10)
t.cs.currentProcessedHeight shouldBe 9

t.cs.restart()
t.cs.currentProcessedHeight shouldBe 10
t.cs.currentProcessedHeight shouldBe 9
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ class StatusTransitionsTestSuite extends WavesIntegrationSuiteBase {
fork = WavesFork(init.main, mkChain(Vector(block1), 99)),
utxUpdate = Monoid.empty[UtxUpdate]
),
requestNextBlockchainEvent = true
requestNextBlockchainEvent = true,
updatedLastBlockHeight = LastBlockHeight.Updated(1)
))

"Height" in test(RolledBack.To.Height(1))
Expand Down Expand Up @@ -269,7 +270,8 @@ class StatusTransitionsTestSuite extends WavesIntegrationSuiteBase {
fork = WavesFork(init.fork.origChain, mkChain(Vector(block2B, block1), 98)),
utxUpdate = init.utxUpdate
),
requestNextBlockchainEvent = true
requestNextBlockchainEvent = true,
updatedLastBlockHeight = LastBlockHeight.Updated(2)
))
}

Expand Down Expand Up @@ -361,7 +363,8 @@ class StatusTransitionsTestSuite extends WavesIntegrationSuiteBase {
),
utxUpdate = init.utxUpdate
),
requestNextBlockchainEvent = true
requestNextBlockchainEvent = true,
updatedLastBlockHeight = LastBlockHeight.Updated(1)
))

"Height" in test(RolledBack.To.Height(1))
Expand Down

0 comments on commit b805dca

Please sign in to comment.