diff --git a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/BaseContainersKit.scala b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/BaseContainersKit.scala index 1be77abcb1..48c4c6d0b7 100644 --- a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/BaseContainersKit.scala +++ b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/BaseContainersKit.scala @@ -1,29 +1,27 @@ package com.wavesplatform.dex.it.api import cats.implicits.catsStdInstancesForTry - -import java.net.InetAddress -import java.nio.file.{Files, Path, Paths} -import java.time.LocalDateTime -import java.time.format.DateTimeFormatter -import java.util.concurrent.Executors -import java.util.concurrent.atomic.AtomicReference +import cats.instances.future._ import com.github.dockerjava.api.command.CreateNetworkCmd import com.github.dockerjava.api.model.Network.Ipam import com.google.common.primitives.Ints.toByteArray import com.google.common.util.concurrent.ThreadFactoryBuilder -import sttp.client3._ import com.wavesplatform.dex.domain.utils.ScorexLogging import com.wavesplatform.dex.it.docker.BaseContainer import com.wavesplatform.dex.it.sttp.LoggingSttpBackend -import mouse.any._ import org.asynchttpclient.DefaultAsyncHttpClientConfig import org.testcontainers.containers.Network -import org.testcontainers.containers.Network.NetworkImpl +import sttp.client3._ import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend -import cats.instances.future._ +import java.net.InetAddress +import java.nio.file.{Files, Path, Paths} +import java.time.LocalDateTime +import java.time.format.DateTimeFormatter +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicReference import scala.concurrent.{ExecutionContext, Future} +import scala.util.chaining._ import scala.util.{Random, Try} trait BaseContainersKit extends ScorexLogging { @@ -36,7 +34,7 @@ trait BaseContainersKit extends ScorexLogging { protected val networkName = s"waves-${Random.nextInt(Int.MaxValue)}" - protected val network: NetworkImpl = + protected val network: Network = Network .builder() .createNetworkCmdModifier { cmd: CreateNetworkCmd => @@ -110,7 +108,7 @@ trait BaseContainersKit extends ScorexLogging { Option(System.getProperty("waves.it.logging.dir")) .map(Paths get _) .getOrElse(defaultDir) - .unsafeTap(Files.createDirectories(_)) + .tap(Files.createDirectories(_)) } protected def stopBaseContainers(): Unit = { diff --git a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/HasDex.scala b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/HasDex.scala index 457e60fe59..8b48253cb2 100644 --- a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/HasDex.scala +++ b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/dex/HasDex.scala @@ -22,20 +22,21 @@ trait HasDex { self: BaseContainersKit => protected def dexInitialSuiteConfig: Config = ConfigFactory.empty() - protected lazy val dexRunConfig: Config = dexQueueConfig(ThreadLocalRandom.current.nextInt(0, Int.MaxValue)) + protected val kafkaTopicName = s"dex-${ThreadLocalRandom.current.nextInt(0, Int.MaxValue)}" + + protected lazy val dexRunConfig: Config = kafkaServer.fold(ConfigFactory.empty())(dexQueueConfig(_, kafkaTopicName)) protected def kafkaServer: Option[String] = Option(System.getenv("KAFKA_SERVER")) - protected def dexQueueConfig(queueId: Int): Config = - kafkaServer.fold(ConfigFactory.empty()) { kafkaServer => - ConfigFactory.parseString(s"""waves.dex.events-queue { - | type = kafka - | kafka { - | servers = "$kafkaServer" - | topic = "dex-$queueId" - | } - |}""".stripMargin) - } + protected def dexQueueConfig(kafkaServer: String, topicName: String): Config = ConfigFactory.parseString( + s"""waves.dex.events-queue { + | type = kafka + | kafka { + | servers = "$kafkaServer" + | topic = "$topicName" + | } + |}""".stripMargin + ) protected def createDex( name: String, @@ -45,7 +46,7 @@ trait HasDex { self: BaseContainersKit => ): DexContainer = DexContainer(name, networkName, network, getIp(name), runConfig, suiteInitialConfig, localLogsDir, image) unsafeTap addKnownContainer - lazy val dex1: DexContainer = createDex("dex-1") + protected lazy val dex1: DexContainer = createDex("dex-1") protected def createKafkaTopic(name: String, server: Option[String] = kafkaServer): Unit = server.foreach { server => val adminClient = mkKafkaAdminClient(server) diff --git a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/node/HasWavesNode.scala b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/node/HasWavesNode.scala index adbe951e46..9438c101ab 100644 --- a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/node/HasWavesNode.scala +++ b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/api/node/HasWavesNode.scala @@ -38,5 +38,5 @@ trait HasWavesNode { self: BaseContainersKit => netAlias ) unsafeTap addKnownContainer - lazy val wavesNode1: WavesNodeContainer = createWavesNode("waves-1") + protected lazy val wavesNode1: WavesNodeContainer = createWavesNode("waves-1") } diff --git a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/docker/BaseContainer.scala b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/docker/BaseContainer.scala index 7e61f48645..ee2070427b 100644 --- a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/docker/BaseContainer.scala +++ b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/docker/BaseContainer.scala @@ -157,6 +157,11 @@ abstract class BaseContainer(protected val baseContainerPath: String, private va } override def start(): Unit = { + beginStart() + endStart() + } + + def beginStart(): Unit = { Option(underlying.containerId).fold(super.start())(_ => sendStartCmd()) Iterator @@ -167,7 +172,9 @@ abstract class BaseContainer(protected val baseContainerPath: String, private va .zipWithIndex .find { case (state, attempt) => state.getRunning || attempt == 20 } .fold(log.warn(s"Can't start ${underlying.containerId}"))(_ => ()) + } + def endStart(): Unit = { invalidateCaches() logExposedPortsInfo("Started container ") waitReady() diff --git a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/docker/DexContainer.scala b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/docker/DexContainer.scala index f3592b2857..dd07335fb5 100644 --- a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/docker/DexContainer.scala +++ b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/docker/DexContainer.scala @@ -16,15 +16,13 @@ import com.wavesplatform.dex.it.fp.CanRepeat import com.wavesplatform.dex.it.resources.getRawContentFromResource import com.wavesplatform.dex.it.sttp.LoggingSttpBackend import com.wavesplatform.dex.settings.utils.ConfigOps.ConfigOps -import org.testcontainers.containers.BindMode -import org.testcontainers.containers.Network.NetworkImpl +import org.testcontainers.containers.{BindMode, Network} import java.net.InetSocketAddress import java.nio.file.{Path, Paths} import scala.concurrent.{ExecutionContext, Future} -import scala.util.Try - import scala.jdk.CollectionConverters._ +import scala.util.Try final case class DexContainer private (override val internalIp: String, underlying: GenericContainer)( implicit @@ -116,7 +114,7 @@ object DexContainer extends ScorexLogging { def apply( name: String, networkName: String, - network: NetworkImpl, + network: Network, internalIp: String, runConfig: Config, suiteInitialConfig: Config, diff --git a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/docker/WavesNodeContainer.scala b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/docker/WavesNodeContainer.scala index a9e2526950..aed5ae0881 100644 --- a/dex-it-common/src/main/scala/com/wavesplatform/dex/it/docker/WavesNodeContainer.scala +++ b/dex-it-common/src/main/scala/com/wavesplatform/dex/it/docker/WavesNodeContainer.scala @@ -1,7 +1,5 @@ package com.wavesplatform.dex.it.docker -import java.net.InetSocketAddress -import java.nio.file.{Path, Paths} import cats.tagless.FunctorK import com.dimafeng.testcontainers.GenericContainer import com.typesafe.config.Config @@ -13,10 +11,11 @@ import com.wavesplatform.dex.it.cache.CachedData import com.wavesplatform.dex.it.resources.getRawContentFromResource import com.wavesplatform.dex.it.sttp.LoggingSttpBackend import com.wavesplatform.dex.settings.utils.ConfigOps.ConfigOps -import org.testcontainers.containers.BindMode -import org.testcontainers.containers.Network.NetworkImpl +import org.testcontainers.containers.{BindMode, Network} import sttp.model.StatusCode +import java.net.InetSocketAddress +import java.nio.file.{Path, Paths} import scala.concurrent.{ExecutionContext, Future} import scala.util.Try @@ -100,7 +99,7 @@ object WavesNodeContainer extends ScorexLogging { def apply( name: String, networkName: String, - network: NetworkImpl, + network: Network, internalIp: String, runConfig: Config, suiteInitialConfig: Config, diff --git a/dex-it/src/test/resources/dex-servers/logback-container.xml b/dex-it/src/test/resources/dex-servers/logback-container.xml index 8934159c93..5e46a97487 100644 --- a/dex-it/src/test/resources/dex-servers/logback-container.xml +++ b/dex-it/src/test/resources/dex-servers/logback-container.xml @@ -1,10 +1,8 @@ - - - - + + ${logback.brief.fullPath} @@ -14,7 +12,7 @@ TRACE - ${logback.common.pattern} + @@ -26,7 +24,7 @@ TRACE - ${logback.common.pattern} + diff --git a/dex-it/src/test/resources/logback-test.xml b/dex-it/src/test/resources/logback-test.xml index 3522642aa4..c02e849b8d 100644 --- a/dex-it/src/test/resources/logback-test.xml +++ b/dex-it/src/test/resources/logback-test.xml @@ -27,6 +27,7 @@ + diff --git a/dex-it/src/test/scala/com/wavesplatform/it/sync/networking/MultipleMatchersOrderCancelTestSuite.scala b/dex-it/src/test/scala/com/wavesplatform/it/sync/networking/MultipleMatchersOrderCancelTestSuite.scala index 69a010d3a9..d55f2dcb7a 100644 --- a/dex-it/src/test/scala/com/wavesplatform/it/sync/networking/MultipleMatchersOrderCancelTestSuite.scala +++ b/dex-it/src/test/scala/com/wavesplatform/it/sync/networking/MultipleMatchersOrderCancelTestSuite.scala @@ -1,26 +1,69 @@ package com.wavesplatform.it.sync.networking +import com.github.dockerjava.api.command.CreateNetworkCmd +import com.github.dockerjava.api.model.ContainerNetwork import com.typesafe.config.{Config, ConfigFactory} import com.wavesplatform.dex.api.http.entities.HttpOrderStatus.Status import com.wavesplatform.dex.domain.asset.Asset.Waves import com.wavesplatform.dex.domain.order.OrderType -import com.wavesplatform.dex.it.docker.DexContainer +import com.wavesplatform.dex.it.docker.{DexContainer, WavesNodeContainer} import com.wavesplatform.it.MatcherSuiteBase import com.wavesplatform.it.tags.DexItExternalKafkaRequired +import org.testcontainers.containers.Network + +import java.util.concurrent.ThreadLocalRandom +import scala.jdk.CollectionConverters.MapHasAsScala @DexItExternalKafkaRequired class MultipleMatchersOrderCancelTestSuite extends MatcherSuiteBase { - override protected def dexInitialSuiteConfig: Config = - ConfigFactory.parseString(s"""waves.dex.price-assets = [ "$UsdId", "WAVES" ]""".stripMargin) + override protected def dexInitialSuiteConfig: Config = ConfigFactory.parseString( + s"""waves.dex { + | price-assets = [ "$UsdId", "WAVES" ] + |}""".stripMargin + ) + + private lazy val internalNetwork = + Network + .builder() + .createNetworkCmdModifier { cmd: CreateNetworkCmd => + cmd + .withName(s"MultipleMatchersOrderCancelTestSuite${ThreadLocalRandom.current().nextInt()}") + .withInternal(true) // Disable internet, thus break Kafka connection + } + .build() + // Matchers will connect to this Node through the internalNetwork + override protected lazy val wavesNode1: WavesNodeContainer = createWavesNode("waves-1", netAlias = None) protected lazy val dex2: DexContainer = createDex("dex-2") override protected def beforeAll(): Unit = { wavesNode1.start() + val containerNetwork = new ContainerNetwork().withNetworkID(internalNetwork.getId).withAliases(WavesNodeContainer.wavesNodeNetAlias) + + dex1.dockerClient + .connectToNetworkCmd() + .withContainerId(wavesNode1.containerId) + .withContainerNetwork(containerNetwork) + .exec() + broadcastAndAwait(IssueUsdTx, IssueEthTx) - dex1.start() - dex2.start() + + dex1.beginStart() + dex1.dockerClient + .connectToNetworkCmd() + .withContainerId(dex1.containerId) + .withContainerNetwork(new ContainerNetwork().withNetworkID(internalNetwork.getId)) + .exec() + dex1.endStart() + + dex2.beginStart() + dex2.dockerClient + .connectToNetworkCmd() + .withContainerId(dex2.containerId) + .withContainerNetwork(new ContainerNetwork().withNetworkID(internalNetwork.getId)) + .exec() + dex2.endStart() } /** @@ -35,10 +78,10 @@ class MultipleMatchersOrderCancelTestSuite extends MatcherSuiteBase { val acc1 = mkAccountWithBalance(15.015.waves -> Waves) val acc2 = mkAccountWithBalance(0.015.waves -> Waves, 15.usd -> usd) - val acc3 = mkAccountWithBalance(1.waves -> Waves, 10.eth -> eth) // Account for fake orders val ts = System.currentTimeMillis() - val sellOrders = (1 to 5).map { amt => + val sellOrders = (0 to 4).map { i => + val amt = i + 1 mkOrderDP(acc1, wavesUsdPair, OrderType.SELL, amt.waves, amt, ts = ts + amt) // To cancel latest first } @@ -48,27 +91,59 @@ class MultipleMatchersOrderCancelTestSuite extends MatcherSuiteBase { // will cancel remained orders due to balance changes // (which were caused by exchange transactions from DEX-2) - dex1.api.saveSnapshots - dex1.restartWithNewSuiteConfig(ConfigFactory.parseString(s"waves.dex.events-queue.type = local").withFallback(dexInitialSuiteConfig)) - // HACK: Because we switched the queue, we need to place 5 orders to move offset of queue. - // If we don't do this, internal cancels will be ignored by order books. - (1 to 5).foreach { _ => - dex1.api.place(mkOrderDP(acc3, ethWavesPair, OrderType.SELL, 1.eth, 1)) - } + val inspect = dex1.dockerClient + .inspectContainerCmd(dex1.containerId) + .exec() - val submittedOrders = (1 to 3).map { amt => + val (defaultNetworkName, defaultNetworkId) = inspect.getNetworkSettings.getNetworks.asScala + .collectFirst { + case (name, network) if name.startsWith("waves-") => (name, network.getNetworkID) + } + .getOrElse(throw new RuntimeException("Can't find network")) + + step(s"dex1: disconnecting from $defaultNetworkName") + + dex1.dockerClient + .disconnectFromNetworkCmd() + .withContainerId(dex1.container.getContainerId) + .withNetworkId(defaultNetworkId) + .exec() + + Iterator + .continually { + Thread.sleep(1000) + log.info(s"dex1: checking disconnected from $defaultNetworkName/$defaultNetworkId") + dex1.dockerClient + .inspectContainerCmd(dex1.containerId) + .exec() + } + .dropWhile(_.getNetworkSettings.getNetworks.containsKey(defaultNetworkName)) + .take(1) + .foreach { _ => + step("dex1: disconnected") + } + + val submittedOrders = (0 to 2).map { i => + val amt = i + 1 mkOrderDP(acc2, wavesUsdPair, OrderType.BUY, amt.waves, amt) } submittedOrders.foreach(placeAndAwaitAtDex(_, Status.Filled, dex2)) submittedOrders.foreach(waitForOrderAtNode(_, dex2.api)) + step("Orders placed") + + // Enough to find exchange transactions in UTX and cancel orders #3-4 + Thread.sleep(5000) + (0 to 2).foreach { i => - dex1.api.waitForOrderStatus(sellOrders(i), Status.Accepted) + dex2.api.orderStatusByAssetPairAndId(sellOrders(i)).status shouldBe Status.Filled } - // Matcher should prevent sell orders from cancelling! - (3 to 4).foreach { i => - dex1.api.waitForOrderStatus(sellOrders(i), Status.Accepted) + // Matcher should not cancel rest orders! + withClue("Matcher should not cancel rest orders: ") { + (3 to 4).foreach { i => + dex2.api.orderStatusByAssetPairAndId(sellOrders(i)).status shouldBe Status.Accepted + } } } } diff --git a/dex-ws-load/src/test/resources/logback.xml b/dex-ws-load/src/test/resources/logback.xml index 58156846c2..94a00dc8e8 100644 --- a/dex-ws-load/src/test/resources/logback.xml +++ b/dex-ws-load/src/test/resources/logback.xml @@ -1,19 +1,19 @@  - - - %d{HH:mm:ss.SSS} [%-5level] %logger{15} - %msg%n%rEx - - false - + + + %d{HH:mm:ss.SSS} [%-5level] %logger{15} - %msg%n%rEx + + false + - - - - + + + + - - - + + + diff --git a/dex/src/main/scala/com/wavesplatform/dex/time/NTP.scala b/dex/src/main/scala/com/wavesplatform/dex/time/NTP.scala index 097272bb04..4ac81810f1 100644 --- a/dex/src/main/scala/com/wavesplatform/dex/time/NTP.scala +++ b/dex/src/main/scala/com/wavesplatform/dex/time/NTP.scala @@ -49,7 +49,7 @@ class NTP(ntpServer: String) extends Time with ScorexLogging with AutoCloseable case _: SocketTimeoutException => None case t: Throwable => - log.warn("Problems with NTP: ", t) + log.warn(s"Problems with NTP: ${Option(t.getMessage).getOrElse(t.getClass.getName)}") None } } diff --git a/dex/src/main/scala/kamon/instrumentation/logback/tools/CompactSpanIDConverter.scala b/dex/src/main/scala/kamon/instrumentation/logback/tools/CompactSpanIDConverter.scala new file mode 100644 index 0000000000..3065fc4d2c --- /dev/null +++ b/dex/src/main/scala/kamon/instrumentation/logback/tools/CompactSpanIDConverter.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2013-2021 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kamon.instrumentation.logback.tools + +import ch.qos.logback.classic.pattern.ClassicConverter +import ch.qos.logback.classic.spi.ILoggingEvent +import kamon.Kamon +import kamon.trace.Identifier + +class CompactSpanIDConverter extends ClassicConverter { + + override def convert(event: ILoggingEvent): String = { + val currentSpan = Kamon.currentSpan() + val spanID = currentSpan.id + + if (spanID == Identifier.Empty) "" + else s"[${spanID.string}] " + } + +} diff --git a/dex/src/main/scala/kamon/instrumentation/logback/tools/CompactTraceIDConverter.scala b/dex/src/main/scala/kamon/instrumentation/logback/tools/CompactTraceIDConverter.scala new file mode 100644 index 0000000000..8db50df2af --- /dev/null +++ b/dex/src/main/scala/kamon/instrumentation/logback/tools/CompactTraceIDConverter.scala @@ -0,0 +1,18 @@ +package kamon.instrumentation.logback.tools + +import ch.qos.logback.classic.pattern.ClassicConverter +import ch.qos.logback.classic.spi.ILoggingEvent +import kamon.Kamon +import kamon.trace.Identifier + +class CompactTraceIDConverter extends ClassicConverter { + + override def convert(event: ILoggingEvent): String = { + val currentSpan = Kamon.currentSpan() + val traceID = currentSpan.trace.id + + if (traceID == Identifier.Empty) "" + else s"[${traceID.string}] " + } + +}