diff --git a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/HasToxiProxy.scala b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/HasToxiProxy.scala index 97a08eeab9..f1e362505b 100644 --- a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/HasToxiProxy.scala +++ b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/HasToxiProxy.scala @@ -6,6 +6,7 @@ import org.testcontainers.containers.ToxiproxyContainer.ContainerProxy import scala.collection.JavaConverters._ trait HasToxiProxy { self: BaseContainersKit => + protected val toxiProxyHostName = s"$networkName-toxiproxy" private val container: ToxiproxyContainer = new ToxiproxyContainer() diff --git a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/DexApi.scala b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/DexApi.scala index ddbc353fbd..eb0797a860 100644 --- a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/DexApi.scala +++ b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/DexApi.scala @@ -80,6 +80,7 @@ trait DexApi[F[_]] extends HasWaitReady[F] { def tryLastOffset: F[Either[MatcherError, Long]] def tryOldestSnapshotOffset: F[Either[MatcherError, Long]] def tryAllSnapshotOffsets: F[Either[MatcherError, Map[AssetPair, Long]]] + def trySaveSnapshots: F[Either[MatcherError, Unit]] // TODO move @@ -326,6 +327,10 @@ object DexApi { override def tryAllSnapshotOffsets: F[Either[MatcherError, Map[AssetPair, Long]]] = tryParseJson(sttp.get(uri"$apiUri/debug/allSnapshotOffsets").headers(apiKeyHeaders)) + override def trySaveSnapshots: F[Either[MatcherError, Unit]] = tryUnit { + sttp.post(uri"$apiUri/debug/saveSnapshots").headers(apiKeyHeaders) + } + override def waitReady: F[Unit] = { def request: F[Boolean] = M.handleErrorWith(tryAllOrderBooks.map(_.isRight)) { case _: SocketException | _: JsResultException => M.pure(false) diff --git a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/DexApiOps.scala b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/DexApiOps.scala index f5b8132781..a1e6c00c18 100644 --- a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/DexApiOps.scala +++ b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/DexApiOps.scala @@ -80,5 +80,6 @@ object DexApiOps { def lastOffset: F[Long] = explicitGet(self.tryLastOffset) def oldestSnapshotOffset: F[Long] = explicitGet(self.tryOldestSnapshotOffset) def allSnapshotOffsets: F[Map[AssetPair, Long]] = explicitGet(self.tryAllSnapshotOffsets) + def saveSnapshots: F[Unit] = explicitGet(self.trySaveSnapshots) } } diff --git a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/HasDex.scala b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/HasDex.scala index 3a310b8b96..21b79917cb 100644 --- a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/HasDex.scala +++ b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/HasDex.scala @@ -36,7 +36,7 @@ trait HasDex { self: BaseContainersKit => } protected def createDex(name: String, runConfig: Config = dexRunConfig, suiteInitialConfig: Config = dexInitialSuiteConfig): DexContainer = - DexContainer(name, networkName, network, getIp(name), dexRunConfig, dexInitialSuiteConfig, localLogsDir) unsafeTap addKnownContainer + DexContainer(name, networkName, network, getIp(name), runConfig, suiteInitialConfig, localLogsDir) unsafeTap addKnownContainer lazy val dex1: DexContainer = createDex("dex-1") diff --git a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/node/HasWavesNode.scala b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/node/HasWavesNode.scala index 18399f2ae7..c787b00a6c 100644 --- a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/node/HasWavesNode.scala +++ b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/node/HasWavesNode.scala @@ -21,7 +21,7 @@ trait HasWavesNode { self: BaseContainersKit => protected def createWavesNode(name: String, runConfig: Config = wavesNodeRunConfig, suiteInitialConfig: Config = wavesNodeInitialSuiteConfig): WavesNodeContainer = - WavesNodeContainer(name, networkName, network, getIp(name), wavesNodeRunConfig, wavesNodeInitialSuiteConfig, localLogsDir) unsafeTap addKnownContainer + WavesNodeContainer(name, networkName, network, getIp(name), runConfig, suiteInitialConfig, localLogsDir) unsafeTap addKnownContainer lazy val wavesNode1: WavesNodeContainer = createWavesNode("waves-1") } diff --git a/dex-it/src/test/scala/com/wavesplatform/it/api/ApiExtensions.scala b/dex-it/src/test/scala/com/wavesplatform/it/api/ApiExtensions.scala index ca706cd011..13bde16bde 100644 --- a/dex-it/src/test/scala/com/wavesplatform/it/api/ApiExtensions.scala +++ b/dex-it/src/test/scala/com/wavesplatform/it/api/ApiExtensions.scala @@ -7,6 +7,7 @@ import com.wavesplatform.dex.domain.order.Order import com.wavesplatform.dex.it.api.dex.DexApi import com.wavesplatform.dex.it.api.node.{NodeApi, NodeApiExtensions} import com.wavesplatform.dex.it.api.responses.dex.{OrderBookHistoryItem, OrderStatus, OrderStatusResponse} +import com.wavesplatform.dex.it.docker.DexContainer import com.wavesplatform.it.{MatcherSuiteBase, api} import com.wavesplatform.wavesj.transactions.ExchangeTransaction import mouse.any._ @@ -15,9 +16,9 @@ import scala.collection.immutable.TreeMap trait ApiExtensions extends NodeApiExtensions { this: MatcherSuiteBase => - protected def placeAndAwaitAtDex(order: Order, expectedStatus: OrderStatus = OrderStatus.Accepted): OrderStatusResponse = { - dex1.api.place(order) - dex1.api.waitForOrderStatus(order, expectedStatus) + protected def placeAndAwaitAtDex(order: Order, expectedStatus: OrderStatus = OrderStatus.Accepted, dex: DexContainer = dex1): OrderStatusResponse = { + dex.api.place(order) + dex.api.waitForOrderStatus(order, expectedStatus) } protected def placeAndAwaitAtNode(order: Order, diff --git a/dex-it/src/test/scala/com/wavesplatform/it/sync/networking/MultipleMatchersOrderCancelTestSuite.scala b/dex-it/src/test/scala/com/wavesplatform/it/sync/networking/MultipleMatchersOrderCancelTestSuite.scala new file mode 100644 index 0000000000..dd30f94e6d --- /dev/null +++ b/dex-it/src/test/scala/com/wavesplatform/it/sync/networking/MultipleMatchersOrderCancelTestSuite.scala @@ -0,0 +1,84 @@ +package com.wavesplatform.it.sync.networking + +import java.nio.charset.StandardCharsets +import java.util.concurrent.ThreadLocalRandom + +import com.typesafe.config.{Config, ConfigFactory} +import com.wavesplatform.dex.domain.account.KeyPair +import com.wavesplatform.dex.domain.asset.Asset +import com.wavesplatform.dex.domain.asset.Asset.Waves +import com.wavesplatform.dex.domain.bytes.ByteStr +import com.wavesplatform.dex.domain.order.OrderType +import com.wavesplatform.dex.it.api.responses.dex.OrderStatus +import com.wavesplatform.dex.it.docker.DexContainer +import com.wavesplatform.it.MatcherSuiteBase +import com.wavesplatform.it.tags.DexItKafkaRequired +import org.scalatest + +@DexItKafkaRequired +class MultipleMatchersOrderCancelTestSuite extends MatcherSuiteBase { + + override protected def dexInitialSuiteConfig: Config = ConfigFactory.parseString(s"""waves.dex.price-assets = [ "$UsdId", "WAVES" ]""".stripMargin) + + protected lazy val dex2: DexContainer = createDex("dex-2") + + override protected def beforeAll(): Unit = { + wavesNode1.start() + broadcastAndAwait(IssueUsdTx) + dex1.start() + dex2.start() + } + + private def createAccountWithBalance(balances: (Long, Asset)*): KeyPair = { + val account = KeyPair(ByteStr(s"account-test-${ThreadLocalRandom.current().nextInt()}".getBytes(StandardCharsets.UTF_8))) + + balances.foreach { + case (balance, asset) => + asset.fold { scalatest.Assertions.succeed } { issuedAsset => + assert( + wavesNode1.api.assetBalance(alice, issuedAsset).balance >= balance, + s"Alice doesn't have enough balance in ${issuedAsset.toString} to make a transfer" + ) + } + broadcastAndAwait { mkTransfer(alice, account, balance, asset, 0.003.waves) } + } + account + } + + /** + * Assumptions: + * 1. DEX-1 is a master, DEX-2 is a slave; + * 2. Latency in direction Kafka -> DEX-1 is too high and is ok in direction Kafka -> DEX-2, or master DEX is much more busy than slave one; + * 3. DEX-1 and DEX-2 are connected to the same Node. + * + * In this case orders on DEX-1 might be cancelled due to balance changing on Node (which were caused by exchange transactions from DEX-2) + */ + "Tricky case when DEX-1 is slower than DEX-2 and it leads to order cancelling on DEX-1" in { + + val acc1 = createAccountWithBalance(15.015.waves -> Waves) + val acc2 = createAccountWithBalance(0.015.waves -> Waves, 15.usd -> usd) + + val sellOrders = (1 to 5).map { amt => + mkOrderDP(acc1, wavesUsdPair, OrderType.SELL, amt.waves, amt) + } + + sellOrders.foreach { placeAndAwaitAtDex(_) } + + // if DEX-1 will work with local queue, it won't receive buy orders placements and + // will cancel remained orders due to balance changes + // (which were caused by exchange transactions from DEX-2) + + dex1.api.saveSnapshots + dex1.restartWithNewSuiteConfig(ConfigFactory.parseString(s"waves.dex.events-queue.type = local").withFallback(dexInitialSuiteConfig)) + + (1 to 3).foreach { amt => + val order = mkOrderDP(acc2, wavesUsdPair, OrderType.BUY, amt.waves, amt) + dex2.api.place(order) + dex2.api.waitForOrderStatus(order, OrderStatus.Filled) + } + + // problem solution should prevent sell orders from cancelling! + dex1.api.waitForOrderStatus(sellOrders(4), OrderStatus.Cancelled) + dex1.api.waitForOrderStatus(sellOrders(3), OrderStatus.Cancelled) + } +}