diff --git a/dex/src/main/resources/application.conf b/dex/src/main/resources/application.conf index dd4c287605..9f4c656a08 100644 --- a/dex/src/main/resources/application.conf +++ b/dex/src/main/resources/application.conf @@ -116,7 +116,7 @@ waves.dex { max-retry-attempts = 30 # Sets whether keepalive will be performed when there are no outstanding RPC on a connection - keep-alive-without-calls = true + keep-alive-without-calls = false # Sets the time without read activity before sending a keepalive ping. # Can't be less than 10s, see io.grpc.internal.KeepAliveManager diff --git a/dex/src/test/scala/com/wavesplatform/dex/settings/MatcherSettingsSpecification.scala b/dex/src/test/scala/com/wavesplatform/dex/settings/MatcherSettingsSpecification.scala index 7c0422c8dc..55ee75cdc1 100644 --- a/dex/src/test/scala/com/wavesplatform/dex/settings/MatcherSettingsSpecification.scala +++ b/dex/src/test/scala/com/wavesplatform/dex/settings/MatcherSettingsSpecification.scala @@ -109,6 +109,7 @@ class MatcherSettingsSpecification extends BaseSettingsSpecification with Matche settings.eventsQueue.kafka.consumer.maxBufferSize shouldBe 777 settings.eventsQueue.kafka.consumer.client.getInt("foo") shouldBe 2 settings.eventsQueue.kafka.producer.client.getInt("bar") shouldBe 3 + settings.eventsQueue.kafka.producer.enable shouldBe false settings.eventsQueue.circuitBreaker should matchTo( CircuitBreakerSettings( maxFailures = 999, diff --git a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/combined/CombinedStream.scala b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/combined/CombinedStream.scala index 5237ae8503..b244b0cb30 100644 --- a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/combined/CombinedStream.scala +++ b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/combined/CombinedStream.scala @@ -72,8 +72,8 @@ class CombinedStream( case (orig, Left(evt)) => utxEventsTransitions(orig, evt).tap(updated => log.info(s"utx: $orig + $evt -> $updated")) case (orig, Right(evt)) => blockchainEventsTransitions(orig, evt).tap(updated => log.info(s"bu: $orig + $evt -> $updated")) } - .doOnComplete(Task(log.info(s"lastStatus completed"))) - .doOnError(e => Task(log.error(s"lastStatus failed", e))) + .doOnComplete(Task(log.info("lastStatus completed"))) + .doOnError(e => Task(log.error("lastStatus failed", e))) .lastL .runToFuture @@ -102,18 +102,18 @@ class CombinedStream( // TODO DEX-1034 def startFrom(height: Int): Unit = { log.info(s"Starting from $height") - updateProcessedHeight(height) + updateProcessedHeight(height - 1) // Consider we processed the previous utxEvents.start() } - def updateProcessedHeight(height: Int): Unit = - processedHeight = height - + def updateProcessedHeight(height: Int): Unit = processedHeight = height def currentProcessedHeight: Int = processedHeight def restart(): Unit = { log.info("Restarting") - // Self-healed above + // Self-healed, + // see blockchainEventsTransitions: ? + Stopped + // then utxEventsTransitions: Stopping + Stopped blockchainUpdates.stop() } @@ -130,7 +130,7 @@ class CombinedStream( if (origStatus.utxEvents) ignore() else if (origStatus.oneDone) Status.Working else { - blockchainUpdates.startFrom(processedHeight) + blockchainUpdates.startFrom(processedHeight + 1) origStatus.copy(utxEvents = true) } @@ -267,12 +267,15 @@ class CombinedStream( } } + // Recovering after stopped streams private def recover(): Unit = { - // Failed to process the new block and haven't yet updated processedHeight. - // So drop the last processed block. - processedHeight = math.max(1, processedHeight - 1) - // And rollback to the previous, because we will download a block at processedHeight - internalStream.onNext(WavesNodeEvent.RolledBack(WavesNodeEvent.RolledBack.To.Height(processedHeight - 1))) + // We don't know, did we process the last block. It could be liquid and we could not receive all micro blocks. + // But we processed the previous block without doubt. + val updatedProcessedHeight = math.max(1, processedHeight - 1) + internalStream.onNext(WavesNodeEvent.RolledBack(WavesNodeEvent.RolledBack.To.Height(updatedProcessedHeight))) + // We need to updated this too, because RolledBack could not be processed + // before we call startFrom in utxEventsTransitions: Starting + BecameReady + updateProcessedHeight(updatedProcessedHeight) runWithDelay(() => utxEvents.start()) } diff --git a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusTransitions.scala b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusTransitions.scala index 08d2909c45..166460918f 100644 --- a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusTransitions.scala +++ b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusTransitions.scala @@ -26,17 +26,17 @@ object StatusTransitions extends ScorexLogging { updatedLastBlockHeight = LastBlockHeight.RestartRequired ) - case Right(updatedFork) => + case Right(updatedChain) => StatusUpdate( - newStatus = Normal(updatedFork), + newStatus = Normal(updatedChain), updatedBalances = block.changes, - updatedLastBlockHeight = LastBlockHeight.Updated(block.ref.height), + updatedLastBlockHeight = LastBlockHeight.Updated(updatedChain.height), utxUpdate = UtxUpdate(confirmedTxs = block.confirmedTxs), requestNextBlockchainEvent = true ) } - case RolledBack(to) => // This could happen during an appending of a new key block too + case RolledBack(to) => val initFork = WavesFork(origStatus.main, origStatus.main) val updatedFork = to match { case To.CommonBlockRef(ref) => initFork.rollbackTo(ref) @@ -47,6 +47,7 @@ object StatusTransitions extends ScorexLogging { fork = updatedFork, utxUpdate = Monoid.empty[UtxUpdate] ), + updatedLastBlockHeight = LastBlockHeight.Updated(updatedFork.height), requestNextBlockchainEvent = true ) @@ -83,7 +84,7 @@ object StatusTransitions extends ScorexLogging { StatusUpdate( newStatus = Normal(resolved.activeChain), updatedBalances = resolved.newChanges, - updatedLastBlockHeight = LastBlockHeight.Updated(block.ref.height), + updatedLastBlockHeight = LastBlockHeight.Updated(resolved.activeChain.height), utxUpdate = finalUtxUpdate, requestNextBlockchainEvent = true ) @@ -94,7 +95,7 @@ object StatusTransitions extends ScorexLogging { stashChanges = resolved.newChanges, utxUpdate = finalUtxUpdate ), - updatedLastBlockHeight = LastBlockHeight.Updated(block.ref.height), + updatedLastBlockHeight = LastBlockHeight.Updated(resolved.activeChain.height), requestBalances = resolved.lostDiffIndex // requestNextBlockchainEvent = true // Because we are waiting for DataReceived ) @@ -102,6 +103,7 @@ object StatusTransitions extends ScorexLogging { case Status.NotResolved(updatedFork) => StatusUpdate( newStatus = origStatus.copy(fork = updatedFork), + updatedLastBlockHeight = LastBlockHeight.Updated(updatedFork.height), requestNextBlockchainEvent = true ) @@ -120,6 +122,7 @@ object StatusTransitions extends ScorexLogging { } StatusUpdate( newStatus = origStatus.copy(fork = updatedFork), + updatedLastBlockHeight = LastBlockHeight.Updated(updatedFork.height), requestNextBlockchainEvent = true ) diff --git a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusUpdate.scala b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusUpdate.scala index b5d74e5270..4fb45a6bb5 100644 --- a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusUpdate.scala +++ b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusUpdate.scala @@ -23,7 +23,11 @@ object StatusUpdate { object LastBlockHeight { case object NotChanged extends LastBlockHeight + + // We confirm the latest event from the blockchain updates stream case class Updated(to: Int) extends LastBlockHeight + + // Something is broken, so we need to rollback and reconnect case object RestartRequired extends LastBlockHeight } diff --git a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/WavesNodeEvent.scala b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/WavesNodeEvent.scala index 52fffd4d25..a63366053d 100644 --- a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/WavesNodeEvent.scala +++ b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/WavesNodeEvent.scala @@ -23,10 +23,13 @@ object WavesNodeEvent { object To { + // Only from the blockchain updates stream. + // Either rollback to a block, or to a micro block (micro fork). case class CommonBlockRef(ref: BlockRef) extends To { override def toString: String = s"CommonBlockRef(${ref.height}, ${ref.id})" } + // Only during issues: a wrong block, disconnects case class Height(h: Int) extends To } diff --git a/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/combined/CombinedStreamTestSuite.scala b/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/combined/CombinedStreamTestSuite.scala index 6d051409eb..f6b15452c9 100644 --- a/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/combined/CombinedStreamTestSuite.scala +++ b/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/combined/CombinedStreamTestSuite.scala @@ -51,7 +51,7 @@ class CombinedStreamTestSuite extends WavesIntegrationSuiteBase with Eventually "affects the recovery height hint" in { val t = mk() t.cs.startFrom(10) - t.cs.currentProcessedHeight shouldBe 10 + t.cs.currentProcessedHeight shouldBe 9 } } @@ -63,7 +63,7 @@ class CombinedStreamTestSuite extends WavesIntegrationSuiteBase with Eventually } } - "restartFrom" - { + "restart" - { "stop blockchainUpdates" in { val t = mk() t.cs.startFrom(10) @@ -81,8 +81,10 @@ class CombinedStreamTestSuite extends WavesIntegrationSuiteBase with Eventually "doesn't affect the recovery height" in { val t = mk() t.cs.startFrom(10) + t.cs.currentProcessedHeight shouldBe 9 + t.cs.restart() - t.cs.currentProcessedHeight shouldBe 10 + t.cs.currentProcessedHeight shouldBe 9 } } diff --git a/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusTransitionsTestSuite.scala b/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusTransitionsTestSuite.scala index 6f3319cf7e..dee633f1a1 100644 --- a/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusTransitionsTestSuite.scala +++ b/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusTransitionsTestSuite.scala @@ -153,7 +153,8 @@ class StatusTransitionsTestSuite extends WavesIntegrationSuiteBase { fork = WavesFork(init.main, mkChain(Vector(block1), 99)), utxUpdate = Monoid.empty[UtxUpdate] ), - requestNextBlockchainEvent = true + requestNextBlockchainEvent = true, + updatedLastBlockHeight = LastBlockHeight.Updated(1) )) "Height" in test(RolledBack.To.Height(1)) @@ -269,7 +270,8 @@ class StatusTransitionsTestSuite extends WavesIntegrationSuiteBase { fork = WavesFork(init.fork.origChain, mkChain(Vector(block2B, block1), 98)), utxUpdate = init.utxUpdate ), - requestNextBlockchainEvent = true + requestNextBlockchainEvent = true, + updatedLastBlockHeight = LastBlockHeight.Updated(2) )) } @@ -361,7 +363,8 @@ class StatusTransitionsTestSuite extends WavesIntegrationSuiteBase { ), utxUpdate = init.utxUpdate ), - requestNextBlockchainEvent = true + requestNextBlockchainEvent = true, + updatedLastBlockHeight = LastBlockHeight.Updated(1) )) "Height" in test(RolledBack.To.Height(1))