Skip to content

Commit

Permalink
DEX-1344 Prepare release 2.3.5 (#619)
Browse files Browse the repository at this point in the history
* Fixed logging issues with PARSER_ERROR;
* Rewriting MultipleMatchersOrderCancelTestSuite.
  • Loading branch information
vsuharnikov authored Aug 17, 2021
1 parent 2f0fba0 commit b7e6e7f
Show file tree
Hide file tree
Showing 13 changed files with 204 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
package com.wavesplatform.dex.it.api

import cats.implicits.catsStdInstancesForTry

import java.net.InetAddress
import java.nio.file.{Files, Path, Paths}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference
import cats.instances.future._
import com.github.dockerjava.api.command.CreateNetworkCmd
import com.github.dockerjava.api.model.Network.Ipam
import com.google.common.primitives.Ints.toByteArray
import com.google.common.util.concurrent.ThreadFactoryBuilder
import sttp.client3._
import com.wavesplatform.dex.domain.utils.ScorexLogging
import com.wavesplatform.dex.it.docker.BaseContainer
import com.wavesplatform.dex.it.sttp.LoggingSttpBackend
import mouse.any._
import org.asynchttpclient.DefaultAsyncHttpClientConfig
import org.testcontainers.containers.Network
import org.testcontainers.containers.Network.NetworkImpl
import sttp.client3._
import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend
import cats.instances.future._

import java.net.InetAddress
import java.nio.file.{Files, Path, Paths}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, Future}
import scala.util.chaining._
import scala.util.{Random, Try}

trait BaseContainersKit extends ScorexLogging {
Expand All @@ -36,7 +34,7 @@ trait BaseContainersKit extends ScorexLogging {

protected val networkName = s"waves-${Random.nextInt(Int.MaxValue)}"

protected val network: NetworkImpl =
protected val network: Network =
Network
.builder()
.createNetworkCmdModifier { cmd: CreateNetworkCmd =>
Expand Down Expand Up @@ -110,7 +108,7 @@ trait BaseContainersKit extends ScorexLogging {
Option(System.getProperty("waves.it.logging.dir"))
.map(Paths get _)
.getOrElse(defaultDir)
.unsafeTap(Files.createDirectories(_))
.tap(Files.createDirectories(_))
}

protected def stopBaseContainers(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,21 @@ trait HasDex { self: BaseContainersKit =>

protected def dexInitialSuiteConfig: Config = ConfigFactory.empty()

protected lazy val dexRunConfig: Config = dexQueueConfig(ThreadLocalRandom.current.nextInt(0, Int.MaxValue))
protected val kafkaTopicName = s"dex-${ThreadLocalRandom.current.nextInt(0, Int.MaxValue)}"

protected lazy val dexRunConfig: Config = kafkaServer.fold(ConfigFactory.empty())(dexQueueConfig(_, kafkaTopicName))

protected def kafkaServer: Option[String] = Option(System.getenv("KAFKA_SERVER"))

protected def dexQueueConfig(queueId: Int): Config =
kafkaServer.fold(ConfigFactory.empty()) { kafkaServer =>
ConfigFactory.parseString(s"""waves.dex.events-queue {
| type = kafka
| kafka {
| servers = "$kafkaServer"
| topic = "dex-$queueId"
| }
|}""".stripMargin)
}
protected def dexQueueConfig(kafkaServer: String, topicName: String): Config = ConfigFactory.parseString(
s"""waves.dex.events-queue {
| type = kafka
| kafka {
| servers = "$kafkaServer"
| topic = "$topicName"
| }
|}""".stripMargin
)

protected def createDex(
name: String,
Expand All @@ -45,7 +46,7 @@ trait HasDex { self: BaseContainersKit =>
): DexContainer =
DexContainer(name, networkName, network, getIp(name), runConfig, suiteInitialConfig, localLogsDir, image) unsafeTap addKnownContainer

lazy val dex1: DexContainer = createDex("dex-1")
protected lazy val dex1: DexContainer = createDex("dex-1")

protected def createKafkaTopic(name: String, server: Option[String] = kafkaServer): Unit = server.foreach { server =>
val adminClient = mkKafkaAdminClient(server)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ trait HasWavesNode { self: BaseContainersKit =>
netAlias
) unsafeTap addKnownContainer

lazy val wavesNode1: WavesNodeContainer = createWavesNode("waves-1")
protected lazy val wavesNode1: WavesNodeContainer = createWavesNode("waves-1")
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ abstract class BaseContainer(protected val baseContainerPath: String, private va
}

override def start(): Unit = {
beginStart()
endStart()
}

def beginStart(): Unit = {
Option(underlying.containerId).fold(super.start())(_ => sendStartCmd())

Iterator
Expand All @@ -167,7 +172,9 @@ abstract class BaseContainer(protected val baseContainerPath: String, private va
.zipWithIndex
.find { case (state, attempt) => state.getRunning || attempt == 20 }
.fold(log.warn(s"Can't start ${underlying.containerId}"))(_ => ())
}

def endStart(): Unit = {
invalidateCaches()
logExposedPortsInfo("Started container ")
waitReady()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@ import com.wavesplatform.dex.it.fp.CanRepeat
import com.wavesplatform.dex.it.resources.getRawContentFromResource
import com.wavesplatform.dex.it.sttp.LoggingSttpBackend
import com.wavesplatform.dex.settings.utils.ConfigOps.ConfigOps
import org.testcontainers.containers.BindMode
import org.testcontainers.containers.Network.NetworkImpl
import org.testcontainers.containers.{BindMode, Network}

import java.net.InetSocketAddress
import java.nio.file.{Path, Paths}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

import scala.jdk.CollectionConverters._
import scala.util.Try

final case class DexContainer private (override val internalIp: String, underlying: GenericContainer)(
implicit
Expand Down Expand Up @@ -116,7 +114,7 @@ object DexContainer extends ScorexLogging {
def apply(
name: String,
networkName: String,
network: NetworkImpl,
network: Network,
internalIp: String,
runConfig: Config,
suiteInitialConfig: Config,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.wavesplatform.dex.it.docker

import java.net.InetSocketAddress
import java.nio.file.{Path, Paths}
import cats.tagless.FunctorK
import com.dimafeng.testcontainers.GenericContainer
import com.typesafe.config.Config
Expand All @@ -13,10 +11,11 @@ import com.wavesplatform.dex.it.cache.CachedData
import com.wavesplatform.dex.it.resources.getRawContentFromResource
import com.wavesplatform.dex.it.sttp.LoggingSttpBackend
import com.wavesplatform.dex.settings.utils.ConfigOps.ConfigOps
import org.testcontainers.containers.BindMode
import org.testcontainers.containers.Network.NetworkImpl
import org.testcontainers.containers.{BindMode, Network}
import sttp.model.StatusCode

import java.net.InetSocketAddress
import java.nio.file.{Path, Paths}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

Expand Down Expand Up @@ -100,7 +99,7 @@ object WavesNodeContainer extends ScorexLogging {
def apply(
name: String,
networkName: String,
network: NetworkImpl,
network: Network,
internalIp: String,
runConfig: Config,
suiteInitialConfig: Config,
Expand Down
10 changes: 4 additions & 6 deletions dex-it/src/test/resources/dex-servers/logback-container.xml
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- And additional file. The base configuration is dex/src/package/doc/logback.xml -->
<included>
<conversionRule conversionWord="traceID" converterClass="kamon.instrumentation.logback.tools.TraceIDConverter" />
<conversionRule conversionWord="spanID" converterClass="kamon.instrumentation.logback.tools.SpanIDConverter" />

<property name="logback.common.pattern" value="${%date{HH:mm:ss.SSS,UTC} %-5level [%.25thread] %logger{26} [%traceID] [%spanID] - %msg%n%rEx}"/>
<conversionRule conversionWord="traceID" converterClass="kamon.instrumentation.logback.tools.CompactTraceIDConverter" />
<conversionRule conversionWord="spanID" converterClass="kamon.instrumentation.logback.tools.CompactSpanIDConverter" />

<appender name="BRIEF" class="ch.qos.logback.core.FileAppender">
<file>${logback.brief.fullPath}</file>
Expand All @@ -14,7 +12,7 @@
<level>TRACE</level>
</filter>
<encoder>
<pattern>${logback.common.pattern}</pattern>
<pattern><![CDATA[%date{HH:mm:ss.SSS,UTC} %-5level [%.25thread] %logger{26} %traceID%spanID{}- %msg%n%rEx]]></pattern>
</encoder>
</appender>

Expand All @@ -26,7 +24,7 @@
<level>TRACE</level>
</filter>
<encoder>
<pattern>${logback.common.pattern}</pattern>
<pattern><![CDATA[%date{HH:mm:ss.SSS,UTC} %-5level [%.25thread] %logger{26} %traceID%spanID{}- %msg%n%rEx]]></pattern>
</encoder>
</appender>

Expand Down
1 change: 1 addition & 0 deletions dex-it/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
<logger name="io.netty" level="INFO"/>
<logger name="io.swagger" level="OFF"/>
<logger name="com.github.dockerjava.zerodep.shaded.org.apache" level="INFO"/>
<logger name="com.github.dockerjava.httpclient5" level="INFO"/>

<root level="TRACE">
<appender-ref ref="${waves.it.logging.appender:-STDOUT}"/>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,69 @@
package com.wavesplatform.it.sync.networking

import com.github.dockerjava.api.command.CreateNetworkCmd
import com.github.dockerjava.api.model.ContainerNetwork
import com.typesafe.config.{Config, ConfigFactory}
import com.wavesplatform.dex.api.http.entities.HttpOrderStatus.Status
import com.wavesplatform.dex.domain.asset.Asset.Waves
import com.wavesplatform.dex.domain.order.OrderType
import com.wavesplatform.dex.it.docker.DexContainer
import com.wavesplatform.dex.it.docker.{DexContainer, WavesNodeContainer}
import com.wavesplatform.it.MatcherSuiteBase
import com.wavesplatform.it.tags.DexItExternalKafkaRequired
import org.testcontainers.containers.Network

import java.util.concurrent.ThreadLocalRandom
import scala.jdk.CollectionConverters.MapHasAsScala

@DexItExternalKafkaRequired
class MultipleMatchersOrderCancelTestSuite extends MatcherSuiteBase {

override protected def dexInitialSuiteConfig: Config =
ConfigFactory.parseString(s"""waves.dex.price-assets = [ "$UsdId", "WAVES" ]""".stripMargin)
override protected def dexInitialSuiteConfig: Config = ConfigFactory.parseString(
s"""waves.dex {
| price-assets = [ "$UsdId", "WAVES" ]
|}""".stripMargin
)

private lazy val internalNetwork =
Network
.builder()
.createNetworkCmdModifier { cmd: CreateNetworkCmd =>
cmd
.withName(s"MultipleMatchersOrderCancelTestSuite${ThreadLocalRandom.current().nextInt()}")
.withInternal(true) // Disable internet, thus break Kafka connection
}
.build()

// Matchers will connect to this Node through the internalNetwork
override protected lazy val wavesNode1: WavesNodeContainer = createWavesNode("waves-1", netAlias = None)
protected lazy val dex2: DexContainer = createDex("dex-2")

override protected def beforeAll(): Unit = {
wavesNode1.start()
val containerNetwork = new ContainerNetwork().withNetworkID(internalNetwork.getId).withAliases(WavesNodeContainer.wavesNodeNetAlias)

dex1.dockerClient
.connectToNetworkCmd()
.withContainerId(wavesNode1.containerId)
.withContainerNetwork(containerNetwork)
.exec()

broadcastAndAwait(IssueUsdTx, IssueEthTx)
dex1.start()
dex2.start()

dex1.beginStart()
dex1.dockerClient
.connectToNetworkCmd()
.withContainerId(dex1.containerId)
.withContainerNetwork(new ContainerNetwork().withNetworkID(internalNetwork.getId))
.exec()
dex1.endStart()

dex2.beginStart()
dex2.dockerClient
.connectToNetworkCmd()
.withContainerId(dex2.containerId)
.withContainerNetwork(new ContainerNetwork().withNetworkID(internalNetwork.getId))
.exec()
dex2.endStart()
}

/**
Expand All @@ -35,10 +78,10 @@ class MultipleMatchersOrderCancelTestSuite extends MatcherSuiteBase {

val acc1 = mkAccountWithBalance(15.015.waves -> Waves)
val acc2 = mkAccountWithBalance(0.015.waves -> Waves, 15.usd -> usd)
val acc3 = mkAccountWithBalance(1.waves -> Waves, 10.eth -> eth) // Account for fake orders

val ts = System.currentTimeMillis()
val sellOrders = (1 to 5).map { amt =>
val sellOrders = (0 to 4).map { i =>
val amt = i + 1
mkOrderDP(acc1, wavesUsdPair, OrderType.SELL, amt.waves, amt, ts = ts + amt) // To cancel latest first
}

Expand All @@ -48,27 +91,59 @@ class MultipleMatchersOrderCancelTestSuite extends MatcherSuiteBase {
// will cancel remained orders due to balance changes
// (which were caused by exchange transactions from DEX-2)

dex1.api.saveSnapshots
dex1.restartWithNewSuiteConfig(ConfigFactory.parseString(s"waves.dex.events-queue.type = local").withFallback(dexInitialSuiteConfig))
// HACK: Because we switched the queue, we need to place 5 orders to move offset of queue.
// If we don't do this, internal cancels will be ignored by order books.
(1 to 5).foreach { _ =>
dex1.api.place(mkOrderDP(acc3, ethWavesPair, OrderType.SELL, 1.eth, 1))
}
val inspect = dex1.dockerClient
.inspectContainerCmd(dex1.containerId)
.exec()

val submittedOrders = (1 to 3).map { amt =>
val (defaultNetworkName, defaultNetworkId) = inspect.getNetworkSettings.getNetworks.asScala
.collectFirst {
case (name, network) if name.startsWith("waves-") => (name, network.getNetworkID)
}
.getOrElse(throw new RuntimeException("Can't find network"))

step(s"dex1: disconnecting from $defaultNetworkName")

dex1.dockerClient
.disconnectFromNetworkCmd()
.withContainerId(dex1.container.getContainerId)
.withNetworkId(defaultNetworkId)
.exec()

Iterator
.continually {
Thread.sleep(1000)
log.info(s"dex1: checking disconnected from $defaultNetworkName/$defaultNetworkId")
dex1.dockerClient
.inspectContainerCmd(dex1.containerId)
.exec()
}
.dropWhile(_.getNetworkSettings.getNetworks.containsKey(defaultNetworkName))
.take(1)
.foreach { _ =>
step("dex1: disconnected")
}

val submittedOrders = (0 to 2).map { i =>
val amt = i + 1
mkOrderDP(acc2, wavesUsdPair, OrderType.BUY, amt.waves, amt)
}
submittedOrders.foreach(placeAndAwaitAtDex(_, Status.Filled, dex2))
submittedOrders.foreach(waitForOrderAtNode(_, dex2.api))

step("Orders placed")

// Enough to find exchange transactions in UTX and cancel orders #3-4
Thread.sleep(5000)

(0 to 2).foreach { i =>
dex1.api.waitForOrderStatus(sellOrders(i), Status.Accepted)
dex2.api.orderStatusByAssetPairAndId(sellOrders(i)).status shouldBe Status.Filled
}

// Matcher should prevent sell orders from cancelling!
(3 to 4).foreach { i =>
dex1.api.waitForOrderStatus(sellOrders(i), Status.Accepted)
// Matcher should not cancel rest orders!
withClue("Matcher should not cancel rest orders: ") {
(3 to 4).foreach { i =>
dex2.api.orderStatusByAssetPairAndId(sellOrders(i)).status shouldBe Status.Accepted
}
}
}
}
Loading

0 comments on commit b7e6e7f

Please sign in to comment.