From 21a9e67750096f0f2161350ab53dbbc7322731df Mon Sep 17 00:00:00 2001 From: Vyatcheslav Suharnikov Date: Sun, 7 Mar 2021 23:28:59 +0300 Subject: [PATCH] DEX-1099 Tradable balance issues during rollbacks (#476) See StatusTransitions for more information. --- .../it/sync/BouncingBalancesTestSuite.scala | 47 +++++++++++--- .../clients/domain/StatusTransitions.scala | 15 +++-- .../clients/domain/WavesFork.scala | 9 ++- .../domain/StatusTransitionsTestSuite.scala | 28 +++++++-- .../clients/domain/WavesForkTestSuite.scala | 63 +++++++++++++++++-- 5 files changed, 139 insertions(+), 23 deletions(-) diff --git a/dex-it/src/test/scala/com/wavesplatform/it/sync/BouncingBalancesTestSuite.scala b/dex-it/src/test/scala/com/wavesplatform/it/sync/BouncingBalancesTestSuite.scala index 514430e671..196d791f36 100644 --- a/dex-it/src/test/scala/com/wavesplatform/it/sync/BouncingBalancesTestSuite.scala +++ b/dex-it/src/test/scala/com/wavesplatform/it/sync/BouncingBalancesTestSuite.scala @@ -106,17 +106,27 @@ class BouncingBalancesTestSuite extends WsSuiteBase { dex1.api.getOrderStatus(bobOrder).status shouldBe Status.Accepted - withClue(s"After rollback order should not be cancelled and balances should not be decreased\n") { - wavesNode1.api.rollback(heightInitial, returnTransactionsToUtx = false) - wavesNode1.api.currentHeight shouldBe heightInitial + withClue("After rollback order should not be cancelled and balances should not be decreased\n") { + wavesNode1.asyncApi.rollback(heightInitial, returnTransactionsToUtx = true) // true as on Node + wavesMinerNode.api.rollback(heightInitial, returnTransactionsToUtx = true) // true as on Node + eventually { + wavesNode1.api.currentHeight shouldBe >=(heightInitial) + } wavesNode1.api.waitForHeight(heightSecondTransfer) + wavesNode1.api.waitForHeightArise() // See WavesFork + Thread.sleep(3000) // TODO We need an API to see the state on Matcher dex1.api.getOrderStatus(bobOrder).status shouldBe Status.Accepted - wsc.messages.filter { - case _: WsPingOrPong => false - case _ => true - } shouldBe empty + // wsc.messages.filter { + // case _: WsPingOrPong => false + // case _ => true + // } shouldBe empty + + // Relates DEX-1099 + dex1.api.getTradableBalance(bob, AssetPair(doggyCoin, Waves)) should matchTo(Map[Asset, Long]( + Waves -> 494994999700000L + )) } wsc.close() @@ -126,6 +136,8 @@ class BouncingBalancesTestSuite extends WsSuiteBase { val aliceWsc = mkWsAddressConnection(alice, dex1) val bobWsc = mkWsAddressConnection(bob, dex1) + val heightInitial = wavesNode1.api.currentHeight + val now = System.currentTimeMillis() val counterOrders = (1 to 25).map(i => mkOrderDP(alice, wavesUsdPair, OrderType.BUY, 1.waves, 10, ts = now + i)) val submittedOrders = (1 to 50).map(i => mkOrderDP(bob, wavesUsdPair, OrderType.SELL, 0.5.waves, 10, ts = now + i)) @@ -158,6 +170,27 @@ class BouncingBalancesTestSuite extends WsSuiteBase { val bobWavesChanges = collectTradableBalanceChanges(bobWsc, Waves) checkOrdering("bob Waves", bobWavesChanges)(_ shouldBe >=(_)) + submittedOrders.foreach(waitForOrderAtNode(_)) + val finalHeight = wavesNode1.api.waitForHeightArise() + val bobBalanceBefore = dex1.api.getTradableBalance(bob, wavesUsdPair) + + step("Doing a rollback") + wavesNode1.asyncApi.rollback(heightInitial, returnTransactionsToUtx = true) // true as on Node + wavesMinerNode.api.rollback(heightInitial, returnTransactionsToUtx = true) // true as on Node + eventually { + wavesNode1.api.currentHeight shouldBe >=(heightInitial) + } + + step("Wait for a height to be restored") + wavesNode1.api.waitForHeight(finalHeight) + wavesNode1.api.waitForHeightArise() // See WavesFork + Thread.sleep(3000) + + // Relates DEX-1099 + eventually { + dex1.api.getTradableBalance(bob, wavesUsdPair) should matchTo(bobBalanceBefore) + } + aliceWsc.close() bobWsc.close() } diff --git a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusTransitions.scala b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusTransitions.scala index 166460918f..b81a51e1a9 100644 --- a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusTransitions.scala +++ b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusTransitions.scala @@ -75,10 +75,17 @@ object StatusTransitions extends ScorexLogging { case Appended(block) => origStatus.fork.withBlock(block) match { case resolved: Status.Resolved => - val finalUtxUpdate = origStatus.utxUpdate |+| UtxUpdate( - confirmedTxs = resolved.confirmedTxs, - failedTxs = Map.empty // resolved.lostTxIds - ) + val finalUtxUpdate = { + val x = origStatus.utxUpdate |+| UtxUpdate( + confirmedTxs = resolved.newConfirmedTxs, + failedTxs = Map.empty // resolved.lostTxIds + ) + + // This solves a situation when rolled back transactions are moved to UTX Pool + // and then confirmed in a micro block. + // Relates DEX-1099 + x.copy(unconfirmedTxs = x.unconfirmedTxs.filterNot(x => resolved.commonTxIds.contains(x.id))) + } if (resolved.lostDiffIndex.isEmpty) StatusUpdate( diff --git a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/WavesFork.scala b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/WavesFork.scala index 1bf5ee384c..3ea7893724 100644 --- a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/WavesFork.scala +++ b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/domain/WavesFork.scala @@ -41,7 +41,8 @@ case class WavesFork private[domain] (origChain: WavesChain, forkChain: WavesCha newChanges = updatedForkAllChanges, // TODO DEX-1011 lostDiffIndex = origForkDiffIndex.without(updatedForkDiffIndex), lostTxIds = origTxs -- forkTxs.keys, - confirmedTxs = forkTxs -- origTxs.keys + newConfirmedTxs = forkTxs -- origTxs.keys, + commonTxIds = forkTxs.keySet.intersect(origTxs.keySet) ) } else Status.NotResolved(copy(forkChain = updatedForkChain)) } @@ -58,12 +59,16 @@ object WavesFork { object Status { + /** + * @param commonTxIds Common on a forked part of chain + */ case class Resolved( activeChain: WavesChain, newChanges: BlockchainBalance, lostDiffIndex: DiffIndex, lostTxIds: Map[ByteString, TransactionWithChanges], // Will be used in the future - confirmedTxs: Map[ByteString, TransactionWithChanges] + newConfirmedTxs: Map[ByteString, TransactionWithChanges], + commonTxIds: Set[ByteString] ) extends Status case class NotResolved(updatedFork: WavesFork) extends Status diff --git a/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusTransitionsTestSuite.scala b/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusTransitionsTestSuite.scala index dee633f1a1..95ed3aabfa 100644 --- a/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusTransitionsTestSuite.scala +++ b/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/domain/StatusTransitionsTestSuite.scala @@ -1,7 +1,7 @@ package com.wavesplatform.dex.grpc.integration.clients.domain import cats.Monoid -import cats.syntax.semigroup._ +import cats.implicits._ import com.google.protobuf.UnsafeByteOperations import com.wavesplatform.dex.WavesIntegrationSuiteBase import com.wavesplatform.dex.domain.account.KeyPair @@ -210,7 +210,14 @@ class StatusTransitionsTestSuite extends WavesIntegrationSuiteBase { origChain = mkChain(Vector(block2A, block1), 98), forkChain = mkChain(Vector(block2B, block1), 98) ), - utxUpdate = UtxUpdate(failedTxs = mkUtxTransactionMap(30)) + utxUpdate = UtxUpdate( + unconfirmedTxs = List( + 2, // from block2A to UTX Pool during a rollback + 31, + 32 + ).foldMapK(mkUtxTransactionMap).values.toSeq, + failedTxs = mkUtxTransactionMap(30) + ) ) "Appended ->" - { @@ -223,7 +230,11 @@ class StatusTransitionsTestSuite extends WavesIntegrationSuiteBase { outgoingLeasing = Map(bob -> 10) ), tpe = WavesBlock.Type.MicroBlock, - confirmedTxs = mkTransactionWithChangesMap(10) + confirmedTxs = List( + 2, // The tx migrates from block2A to a new micro block, relates DEX-1099 + 10, + 31 // init.utxUpdate + ).foldMapK(mkTransactionWithChangesMap) ) val event = Appended(microBlock) @@ -234,9 +245,14 @@ class StatusTransitionsTestSuite extends WavesIntegrationSuiteBase { ), updatedBalances = block2B.changes |+| microBlock.changes, updatedLastBlockHeight = StatusUpdate.LastBlockHeight.Updated(2), - utxUpdate = init.utxUpdate |+| UtxUpdate( - confirmedTxs = block2B.confirmedTxs ++ microBlock.confirmedTxs, - failedTxs = Map.empty // It doesn't affect now + utxUpdate = UtxUpdate( + unconfirmedTxs = mkUtxTransactionMap(32).values.toSeq, // init.utxUpdate, 31 is gone, because confirmed + confirmedTxs = List( + 3, // block2B + 10, // microBlock + 31 // microBlock + ).foldMapK(mkTransactionWithChangesMap), + failedTxs = mkUtxTransactionMap(30) // init.utxUpdate ), requestNextBlockchainEvent = true )) diff --git a/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/domain/WavesForkTestSuite.scala b/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/domain/WavesForkTestSuite.scala index 7b41be8d35..0989915f30 100644 --- a/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/domain/WavesForkTestSuite.scala +++ b/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/domain/WavesForkTestSuite.scala @@ -3,6 +3,7 @@ package com.wavesplatform.dex.grpc.integration.clients.domain import java.nio.charset.StandardCharsets import cats.Monoid +import cats.implicits._ import com.wavesplatform.dex.domain.account.KeyPair import com.wavesplatform.dex.domain.asset.Asset.{IssuedAsset, Waves} import com.wavesplatform.dex.domain.bytes.ByteStr @@ -142,7 +143,7 @@ class WavesForkTestSuite extends WavesIntegrationSuiteBase with ScalaCheckDriven } "Resolved" - { - "on a key block with a higher height" in { + "on a block with a higher height" in { val block4 = WavesBlock( ref = BlockRef(height = 4, id = ByteStr(Array[Byte](98, 1, 1, 1, 0))), reference = block3.ref.id, @@ -167,7 +168,8 @@ class WavesForkTestSuite extends WavesIntegrationSuiteBase with ScalaCheckDriven ), lostDiffIndex = Monoid.empty[DiffIndex], lostTxIds = Map.empty, - confirmedTxs = block3.confirmedTxs ++ block4.confirmedTxs + newConfirmedTxs = block3.confirmedTxs ++ block4.confirmedTxs, + commonTxIds = Set.empty )) } @@ -197,7 +199,59 @@ class WavesForkTestSuite extends WavesIntegrationSuiteBase with ScalaCheckDriven ), lostDiffIndex = Monoid.empty[DiffIndex], lostTxIds = Map.empty, - confirmedTxs = microBlock.confirmedTxs + newConfirmedTxs = microBlock.confirmedTxs, + commonTxIds = Set.empty + )) + } + + "after the key block with the higher height" in { + val microBlock1 = WavesBlock( + ref = BlockRef(height = 1, id = ByteStr(Array[Byte](98, 2))), + reference = block1.ref.id, + changes = BlockchainBalance( + regular = Map(alice -> Map(usd -> 69)), + outgoingLeasing = Map(bob -> 2L) + ), + tpe = WavesBlock.Type.MicroBlock, + confirmedTxs = (10 to 12).toList.foldMapK(mkTransactionWithChangesMap) + ) + + val block2B = WavesBlock( + ref = BlockRef(height = 2, id = ByteStr(Array[Byte](98, 1, 0))), + reference = block1.ref.id, + changes = BlockchainBalance( + regular = Map(bob -> Map(usd -> 35)), + outgoingLeasing = Map.empty + ), + tpe = WavesBlock.Type.FullBlock, + confirmedTxs = mkTransactionWithChangesMap(2) ++ + mkTransactionWithChangesMap(11) // migrated from microBlock1 + ) + + val microBlock2 = WavesBlock( + ref = BlockRef(height = 2, id = ByteStr(Array[Byte](98, 1, 1))), + reference = block2B.ref.id, + changes = BlockchainBalance( + regular = Map(bob -> Map(Waves -> 11)), + outgoingLeasing = Map(alice -> 9L) + ), + tpe = WavesBlock.Type.MicroBlock, + confirmedTxs = mkTransactionWithChangesMap(100) ++ + mkTransactionWithChangesMap(12) // migrated from microBlock1 + ) + + val fork = WavesFork( + mkChain(Vector(microBlock1, block1), 99), + mkChain(Vector(block2B, block1), 98) + ) + + fork.withBlock(microBlock2) should matchTo[Status](Status.Resolved( + activeChain = mkChain(Vector(microBlock2, block2B, block1), 98), + newChanges = block2B.changes |+| microBlock2.changes, + lostDiffIndex = microBlock1.diffIndex, + lostTxIds = mkTransactionWithChangesMap(10), // from microBlock1 + newConfirmedTxs = List(2, 100).foldMapK(mkTransactionWithChangesMap), + commonTxIds = Set(mkTxId(11), mkTxId(12)) )) } @@ -234,7 +288,8 @@ class WavesForkTestSuite extends WavesIntegrationSuiteBase with ScalaCheckDriven newChanges = microBlock2.changes, lostDiffIndex = Monoid.empty[DiffIndex], lostTxIds = Map.empty, - confirmedTxs = microBlock2.confirmedTxs + newConfirmedTxs = microBlock2.confirmedTxs, + commonTxIds = Set.empty )) } }