Skip to content

Commit

Permalink
DEX-1099 Tradable balance issues during rollbacks (#476)
Browse files Browse the repository at this point in the history
See StatusTransitions for more information.
  • Loading branch information
vsuharnikov authored Mar 7, 2021
1 parent b805dca commit 21a9e67
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,27 @@ class BouncingBalancesTestSuite extends WsSuiteBase {

dex1.api.getOrderStatus(bobOrder).status shouldBe Status.Accepted

withClue(s"After rollback order should not be cancelled and balances should not be decreased\n") {
wavesNode1.api.rollback(heightInitial, returnTransactionsToUtx = false)
wavesNode1.api.currentHeight shouldBe heightInitial
withClue("After rollback order should not be cancelled and balances should not be decreased\n") {
wavesNode1.asyncApi.rollback(heightInitial, returnTransactionsToUtx = true) // true as on Node
wavesMinerNode.api.rollback(heightInitial, returnTransactionsToUtx = true) // true as on Node
eventually {
wavesNode1.api.currentHeight shouldBe >=(heightInitial)
}

wavesNode1.api.waitForHeight(heightSecondTransfer)
wavesNode1.api.waitForHeightArise() // See WavesFork
Thread.sleep(3000) // TODO We need an API to see the state on Matcher
dex1.api.getOrderStatus(bobOrder).status shouldBe Status.Accepted

wsc.messages.filter {
case _: WsPingOrPong => false
case _ => true
} shouldBe empty
// wsc.messages.filter {
// case _: WsPingOrPong => false
// case _ => true
// } shouldBe empty

// Relates DEX-1099
dex1.api.getTradableBalance(bob, AssetPair(doggyCoin, Waves)) should matchTo(Map[Asset, Long](
Waves -> 494994999700000L
))
}

wsc.close()
Expand All @@ -126,6 +136,8 @@ class BouncingBalancesTestSuite extends WsSuiteBase {
val aliceWsc = mkWsAddressConnection(alice, dex1)
val bobWsc = mkWsAddressConnection(bob, dex1)

val heightInitial = wavesNode1.api.currentHeight

val now = System.currentTimeMillis()
val counterOrders = (1 to 25).map(i => mkOrderDP(alice, wavesUsdPair, OrderType.BUY, 1.waves, 10, ts = now + i))
val submittedOrders = (1 to 50).map(i => mkOrderDP(bob, wavesUsdPair, OrderType.SELL, 0.5.waves, 10, ts = now + i))
Expand Down Expand Up @@ -158,6 +170,27 @@ class BouncingBalancesTestSuite extends WsSuiteBase {
val bobWavesChanges = collectTradableBalanceChanges(bobWsc, Waves)
checkOrdering("bob Waves", bobWavesChanges)(_ shouldBe >=(_))

submittedOrders.foreach(waitForOrderAtNode(_))
val finalHeight = wavesNode1.api.waitForHeightArise()
val bobBalanceBefore = dex1.api.getTradableBalance(bob, wavesUsdPair)

step("Doing a rollback")
wavesNode1.asyncApi.rollback(heightInitial, returnTransactionsToUtx = true) // true as on Node
wavesMinerNode.api.rollback(heightInitial, returnTransactionsToUtx = true) // true as on Node
eventually {
wavesNode1.api.currentHeight shouldBe >=(heightInitial)
}

step("Wait for a height to be restored")
wavesNode1.api.waitForHeight(finalHeight)
wavesNode1.api.waitForHeightArise() // See WavesFork
Thread.sleep(3000)

// Relates DEX-1099
eventually {
dex1.api.getTradableBalance(bob, wavesUsdPair) should matchTo(bobBalanceBefore)
}

aliceWsc.close()
bobWsc.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,17 @@ object StatusTransitions extends ScorexLogging {
case Appended(block) =>
origStatus.fork.withBlock(block) match {
case resolved: Status.Resolved =>
val finalUtxUpdate = origStatus.utxUpdate |+| UtxUpdate(
confirmedTxs = resolved.confirmedTxs,
failedTxs = Map.empty // resolved.lostTxIds
)
val finalUtxUpdate = {
val x = origStatus.utxUpdate |+| UtxUpdate(
confirmedTxs = resolved.newConfirmedTxs,
failedTxs = Map.empty // resolved.lostTxIds
)

// This solves a situation when rolled back transactions are moved to UTX Pool
// and then confirmed in a micro block.
// Relates DEX-1099
x.copy(unconfirmedTxs = x.unconfirmedTxs.filterNot(x => resolved.commonTxIds.contains(x.id)))
}

if (resolved.lostDiffIndex.isEmpty)
StatusUpdate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ case class WavesFork private[domain] (origChain: WavesChain, forkChain: WavesCha
newChanges = updatedForkAllChanges, // TODO DEX-1011
lostDiffIndex = origForkDiffIndex.without(updatedForkDiffIndex),
lostTxIds = origTxs -- forkTxs.keys,
confirmedTxs = forkTxs -- origTxs.keys
newConfirmedTxs = forkTxs -- origTxs.keys,
commonTxIds = forkTxs.keySet.intersect(origTxs.keySet)
)
} else Status.NotResolved(copy(forkChain = updatedForkChain))
}
Expand All @@ -58,12 +59,16 @@ object WavesFork {

object Status {

/**
* @param commonTxIds Common on a forked part of chain
*/
case class Resolved(
activeChain: WavesChain,
newChanges: BlockchainBalance,
lostDiffIndex: DiffIndex,
lostTxIds: Map[ByteString, TransactionWithChanges], // Will be used in the future
confirmedTxs: Map[ByteString, TransactionWithChanges]
newConfirmedTxs: Map[ByteString, TransactionWithChanges],
commonTxIds: Set[ByteString]
) extends Status

case class NotResolved(updatedFork: WavesFork) extends Status
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.wavesplatform.dex.grpc.integration.clients.domain

import cats.Monoid
import cats.syntax.semigroup._
import cats.implicits._
import com.google.protobuf.UnsafeByteOperations
import com.wavesplatform.dex.WavesIntegrationSuiteBase
import com.wavesplatform.dex.domain.account.KeyPair
Expand Down Expand Up @@ -210,7 +210,14 @@ class StatusTransitionsTestSuite extends WavesIntegrationSuiteBase {
origChain = mkChain(Vector(block2A, block1), 98),
forkChain = mkChain(Vector(block2B, block1), 98)
),
utxUpdate = UtxUpdate(failedTxs = mkUtxTransactionMap(30))
utxUpdate = UtxUpdate(
unconfirmedTxs = List(
2, // from block2A to UTX Pool during a rollback
31,
32
).foldMapK(mkUtxTransactionMap).values.toSeq,
failedTxs = mkUtxTransactionMap(30)
)
)

"Appended ->" - {
Expand All @@ -223,7 +230,11 @@ class StatusTransitionsTestSuite extends WavesIntegrationSuiteBase {
outgoingLeasing = Map(bob -> 10)
),
tpe = WavesBlock.Type.MicroBlock,
confirmedTxs = mkTransactionWithChangesMap(10)
confirmedTxs = List(
2, // The tx migrates from block2A to a new micro block, relates DEX-1099
10,
31 // init.utxUpdate
).foldMapK(mkTransactionWithChangesMap)
)

val event = Appended(microBlock)
Expand All @@ -234,9 +245,14 @@ class StatusTransitionsTestSuite extends WavesIntegrationSuiteBase {
),
updatedBalances = block2B.changes |+| microBlock.changes,
updatedLastBlockHeight = StatusUpdate.LastBlockHeight.Updated(2),
utxUpdate = init.utxUpdate |+| UtxUpdate(
confirmedTxs = block2B.confirmedTxs ++ microBlock.confirmedTxs,
failedTxs = Map.empty // It doesn't affect now
utxUpdate = UtxUpdate(
unconfirmedTxs = mkUtxTransactionMap(32).values.toSeq, // init.utxUpdate, 31 is gone, because confirmed
confirmedTxs = List(
3, // block2B
10, // microBlock
31 // microBlock
).foldMapK(mkTransactionWithChangesMap),
failedTxs = mkUtxTransactionMap(30) // init.utxUpdate
),
requestNextBlockchainEvent = true
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.wavesplatform.dex.grpc.integration.clients.domain
import java.nio.charset.StandardCharsets

import cats.Monoid
import cats.implicits._
import com.wavesplatform.dex.domain.account.KeyPair
import com.wavesplatform.dex.domain.asset.Asset.{IssuedAsset, Waves}
import com.wavesplatform.dex.domain.bytes.ByteStr
Expand Down Expand Up @@ -142,7 +143,7 @@ class WavesForkTestSuite extends WavesIntegrationSuiteBase with ScalaCheckDriven
}

"Resolved" - {
"on a key block with a higher height" in {
"on a block with a higher height" in {
val block4 = WavesBlock(
ref = BlockRef(height = 4, id = ByteStr(Array[Byte](98, 1, 1, 1, 0))),
reference = block3.ref.id,
Expand All @@ -167,7 +168,8 @@ class WavesForkTestSuite extends WavesIntegrationSuiteBase with ScalaCheckDriven
),
lostDiffIndex = Monoid.empty[DiffIndex],
lostTxIds = Map.empty,
confirmedTxs = block3.confirmedTxs ++ block4.confirmedTxs
newConfirmedTxs = block3.confirmedTxs ++ block4.confirmedTxs,
commonTxIds = Set.empty
))
}

Expand Down Expand Up @@ -197,7 +199,59 @@ class WavesForkTestSuite extends WavesIntegrationSuiteBase with ScalaCheckDriven
),
lostDiffIndex = Monoid.empty[DiffIndex],
lostTxIds = Map.empty,
confirmedTxs = microBlock.confirmedTxs
newConfirmedTxs = microBlock.confirmedTxs,
commonTxIds = Set.empty
))
}

"after the key block with the higher height" in {
val microBlock1 = WavesBlock(
ref = BlockRef(height = 1, id = ByteStr(Array[Byte](98, 2))),
reference = block1.ref.id,
changes = BlockchainBalance(
regular = Map(alice -> Map(usd -> 69)),
outgoingLeasing = Map(bob -> 2L)
),
tpe = WavesBlock.Type.MicroBlock,
confirmedTxs = (10 to 12).toList.foldMapK(mkTransactionWithChangesMap)
)

val block2B = WavesBlock(
ref = BlockRef(height = 2, id = ByteStr(Array[Byte](98, 1, 0))),
reference = block1.ref.id,
changes = BlockchainBalance(
regular = Map(bob -> Map(usd -> 35)),
outgoingLeasing = Map.empty
),
tpe = WavesBlock.Type.FullBlock,
confirmedTxs = mkTransactionWithChangesMap(2) ++
mkTransactionWithChangesMap(11) // migrated from microBlock1
)

val microBlock2 = WavesBlock(
ref = BlockRef(height = 2, id = ByteStr(Array[Byte](98, 1, 1))),
reference = block2B.ref.id,
changes = BlockchainBalance(
regular = Map(bob -> Map(Waves -> 11)),
outgoingLeasing = Map(alice -> 9L)
),
tpe = WavesBlock.Type.MicroBlock,
confirmedTxs = mkTransactionWithChangesMap(100) ++
mkTransactionWithChangesMap(12) // migrated from microBlock1
)

val fork = WavesFork(
mkChain(Vector(microBlock1, block1), 99),
mkChain(Vector(block2B, block1), 98)
)

fork.withBlock(microBlock2) should matchTo[Status](Status.Resolved(
activeChain = mkChain(Vector(microBlock2, block2B, block1), 98),
newChanges = block2B.changes |+| microBlock2.changes,
lostDiffIndex = microBlock1.diffIndex,
lostTxIds = mkTransactionWithChangesMap(10), // from microBlock1
newConfirmedTxs = List(2, 100).foldMapK(mkTransactionWithChangesMap),
commonTxIds = Set(mkTxId(11), mkTxId(12))
))
}

Expand Down Expand Up @@ -234,7 +288,8 @@ class WavesForkTestSuite extends WavesIntegrationSuiteBase with ScalaCheckDriven
newChanges = microBlock2.changes,
lostDiffIndex = Monoid.empty[DiffIndex],
lostTxIds = Map.empty,
confirmedTxs = microBlock2.confirmedTxs
newConfirmedTxs = microBlock2.confirmedTxs,
commonTxIds = Set.empty
))
}
}
Expand Down

0 comments on commit 21a9e67

Please sign in to comment.