Skip to content

Commit

Permalink
DEX-627 Orders cancelling reproduction test (#179)
Browse files Browse the repository at this point in the history
  • Loading branch information
ngadiyak authored Feb 27, 2020
1 parent 7380f22 commit b74b642
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import org.testcontainers.containers.ToxiproxyContainer.ContainerProxy
import scala.collection.JavaConverters._

trait HasToxiProxy { self: BaseContainersKit =>

protected val toxiProxyHostName = s"$networkName-toxiproxy"

private val container: ToxiproxyContainer = new ToxiproxyContainer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ trait DexApi[F[_]] extends HasWaitReady[F] {
def tryLastOffset: F[Either[MatcherError, Long]]
def tryOldestSnapshotOffset: F[Either[MatcherError, Long]]
def tryAllSnapshotOffsets: F[Either[MatcherError, Map[AssetPair, Long]]]
def trySaveSnapshots: F[Either[MatcherError, Unit]]

// TODO move

Expand Down Expand Up @@ -326,6 +327,10 @@ object DexApi {
override def tryAllSnapshotOffsets: F[Either[MatcherError, Map[AssetPair, Long]]] =
tryParseJson(sttp.get(uri"$apiUri/debug/allSnapshotOffsets").headers(apiKeyHeaders))

override def trySaveSnapshots: F[Either[MatcherError, Unit]] = tryUnit {
sttp.post(uri"$apiUri/debug/saveSnapshots").headers(apiKeyHeaders)
}

override def waitReady: F[Unit] = {
def request: F[Boolean] = M.handleErrorWith(tryAllOrderBooks.map(_.isRight)) {
case _: SocketException | _: JsResultException => M.pure(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,6 @@ object DexApiOps {
def lastOffset: F[Long] = explicitGet(self.tryLastOffset)
def oldestSnapshotOffset: F[Long] = explicitGet(self.tryOldestSnapshotOffset)
def allSnapshotOffsets: F[Map[AssetPair, Long]] = explicitGet(self.tryAllSnapshotOffsets)
def saveSnapshots: F[Unit] = explicitGet(self.trySaveSnapshots)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ trait HasDex { self: BaseContainersKit =>
}

protected def createDex(name: String, runConfig: Config = dexRunConfig, suiteInitialConfig: Config = dexInitialSuiteConfig): DexContainer =
DexContainer(name, networkName, network, getIp(name), dexRunConfig, dexInitialSuiteConfig, localLogsDir) unsafeTap addKnownContainer
DexContainer(name, networkName, network, getIp(name), runConfig, suiteInitialConfig, localLogsDir) unsafeTap addKnownContainer

lazy val dex1: DexContainer = createDex("dex-1")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ trait HasWavesNode { self: BaseContainersKit =>
protected def createWavesNode(name: String,
runConfig: Config = wavesNodeRunConfig,
suiteInitialConfig: Config = wavesNodeInitialSuiteConfig): WavesNodeContainer =
WavesNodeContainer(name, networkName, network, getIp(name), wavesNodeRunConfig, wavesNodeInitialSuiteConfig, localLogsDir) unsafeTap addKnownContainer
WavesNodeContainer(name, networkName, network, getIp(name), runConfig, suiteInitialConfig, localLogsDir) unsafeTap addKnownContainer

lazy val wavesNode1: WavesNodeContainer = createWavesNode("waves-1")
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.wavesplatform.dex.domain.order.Order
import com.wavesplatform.dex.it.api.dex.DexApi
import com.wavesplatform.dex.it.api.node.{NodeApi, NodeApiExtensions}
import com.wavesplatform.dex.it.api.responses.dex.{OrderBookHistoryItem, OrderStatus, OrderStatusResponse}
import com.wavesplatform.dex.it.docker.DexContainer
import com.wavesplatform.it.{MatcherSuiteBase, api}
import com.wavesplatform.wavesj.transactions.ExchangeTransaction
import mouse.any._
Expand All @@ -15,9 +16,9 @@ import scala.collection.immutable.TreeMap

trait ApiExtensions extends NodeApiExtensions { this: MatcherSuiteBase =>

protected def placeAndAwaitAtDex(order: Order, expectedStatus: OrderStatus = OrderStatus.Accepted): OrderStatusResponse = {
dex1.api.place(order)
dex1.api.waitForOrderStatus(order, expectedStatus)
protected def placeAndAwaitAtDex(order: Order, expectedStatus: OrderStatus = OrderStatus.Accepted, dex: DexContainer = dex1): OrderStatusResponse = {
dex.api.place(order)
dex.api.waitForOrderStatus(order, expectedStatus)
}

protected def placeAndAwaitAtNode(order: Order,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.wavesplatform.it.sync.networking

import java.nio.charset.StandardCharsets
import java.util.concurrent.ThreadLocalRandom

import com.typesafe.config.{Config, ConfigFactory}
import com.wavesplatform.dex.domain.account.KeyPair
import com.wavesplatform.dex.domain.asset.Asset
import com.wavesplatform.dex.domain.asset.Asset.Waves
import com.wavesplatform.dex.domain.bytes.ByteStr
import com.wavesplatform.dex.domain.order.OrderType
import com.wavesplatform.dex.it.api.responses.dex.OrderStatus
import com.wavesplatform.dex.it.docker.DexContainer
import com.wavesplatform.it.MatcherSuiteBase
import com.wavesplatform.it.tags.DexItKafkaRequired
import org.scalatest

@DexItKafkaRequired
class MultipleMatchersOrderCancelTestSuite extends MatcherSuiteBase {

override protected def dexInitialSuiteConfig: Config = ConfigFactory.parseString(s"""waves.dex.price-assets = [ "$UsdId", "WAVES" ]""".stripMargin)

protected lazy val dex2: DexContainer = createDex("dex-2")

override protected def beforeAll(): Unit = {
wavesNode1.start()
broadcastAndAwait(IssueUsdTx)
dex1.start()
dex2.start()
}

private def createAccountWithBalance(balances: (Long, Asset)*): KeyPair = {
val account = KeyPair(ByteStr(s"account-test-${ThreadLocalRandom.current().nextInt()}".getBytes(StandardCharsets.UTF_8)))

balances.foreach {
case (balance, asset) =>
asset.fold { scalatest.Assertions.succeed } { issuedAsset =>
assert(
wavesNode1.api.assetBalance(alice, issuedAsset).balance >= balance,
s"Alice doesn't have enough balance in ${issuedAsset.toString} to make a transfer"
)
}
broadcastAndAwait { mkTransfer(alice, account, balance, asset, 0.003.waves) }
}
account
}

/**
* Assumptions:
* 1. DEX-1 is a master, DEX-2 is a slave;
* 2. Latency in direction Kafka -> DEX-1 is too high and is ok in direction Kafka -> DEX-2, or master DEX is much more busy than slave one;
* 3. DEX-1 and DEX-2 are connected to the same Node.
*
* In this case orders on DEX-1 might be cancelled due to balance changing on Node (which were caused by exchange transactions from DEX-2)
*/
"Tricky case when DEX-1 is slower than DEX-2 and it leads to order cancelling on DEX-1" in {

val acc1 = createAccountWithBalance(15.015.waves -> Waves)
val acc2 = createAccountWithBalance(0.015.waves -> Waves, 15.usd -> usd)

val sellOrders = (1 to 5).map { amt =>
mkOrderDP(acc1, wavesUsdPair, OrderType.SELL, amt.waves, amt)
}

sellOrders.foreach { placeAndAwaitAtDex(_) }

// if DEX-1 will work with local queue, it won't receive buy orders placements and
// will cancel remained orders due to balance changes
// (which were caused by exchange transactions from DEX-2)

dex1.api.saveSnapshots
dex1.restartWithNewSuiteConfig(ConfigFactory.parseString(s"waves.dex.events-queue.type = local").withFallback(dexInitialSuiteConfig))

(1 to 3).foreach { amt =>
val order = mkOrderDP(acc2, wavesUsdPair, OrderType.BUY, amt.waves, amt)
dex2.api.place(order)
dex2.api.waitForOrderStatus(order, OrderStatus.Filled)
}

// problem solution should prevent sell orders from cancelling!
dex1.api.waitForOrderStatus(sellOrders(4), OrderStatus.Cancelled)
dex1.api.waitForOrderStatus(sellOrders(3), OrderStatus.Cancelled)
}
}

0 comments on commit b74b642

Please sign in to comment.