Skip to content

Commit

Permalink
DEX-1332 Extend order WS information (#615)
Browse files Browse the repository at this point in the history
DEX-1332 extended order WS information with txMatchInfo
  • Loading branch information
JennaWestenra authored Aug 16, 2021
1 parent 90ebdd2 commit 2f0fba0
Show file tree
Hide file tree
Showing 27 changed files with 496 additions and 132 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,13 @@ dex/runMain com.wavesplatform.dex.cli.WavesDexCli here-your-arguments
Example:

```bash
# If installed package:
waves-dex-cli create-account-storage --address-scheme W --seed-format base64 --account-nonce 3 --output-directory /var/lib/waves-dex

# With Docker (an image is not available on Docker Hub, you should built it yourself):
docker run --rm --name matcher-cli -it -e MATCHER_HEAP_SIZE=512M -v ${PWD}/files:/var/lib/waves-dex/files \
--entrypoint /usr/share/waves-dex/bin/waves-dex-cli wavesplatform/matcher-server:latest \
create-account-storage --address-scheme W --seed-format base64 --account-nonce 3 --output-directory /var/lib/waves-dex /var/lib/waves-dex/files
```

here:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package com.wavesplatform.dex.it.api.websockets
import akka.actor.ActorSystem
import akka.http.scaladsl.model.Uri
import akka.stream.Materializer
import com.softwaremill.diffx.{Derived, Diff}
import com.wavesplatform.dex.api.ws.connection.{WsConnection, WsConnectionOps}
import com.wavesplatform.dex.api.ws.entities.{WsAddressBalancesFilter, WsBalances, WsOrder}
import com.wavesplatform.dex.api.ws.entities.{WsAddressBalancesFilter, WsBalances, WsMatchTransactionInfo, WsOrder}
import com.wavesplatform.dex.api.ws.protocol._
import com.wavesplatform.dex.domain.account.KeyPair
import com.wavesplatform.dex.domain.asset.{Asset, AssetPair}
import com.wavesplatform.dex.domain.bytes.ByteStr
import com.wavesplatform.dex.error.ErrorFormatterContext
import com.wavesplatform.dex.fp.MapImplicits.MapOps
import com.wavesplatform.dex.it.config.PredefinedAssets
Expand All @@ -29,6 +31,12 @@ trait HasWebSockets extends BeforeAndAfterAll with BeforeAndAfterEach with HasJw
implicit protected val materializer: Materializer = Materializer.matFromSystem(system)
implicit protected val efc: ErrorFormatterContext = ErrorFormatterContext.from(assetDecimalsMap)

implicit protected val wsMatchTransactionInfoDiff: Derived[Diff[WsMatchTransactionInfo]] = Derived(
Diff.gen[WsMatchTransactionInfo].value
.ignore[WsMatchTransactionInfo, ByteStr](_.txId)
.ignore[WsMatchTransactionInfo, Long](_.timestamp)
)

protected def getWsStreamUri(dex: DexContainer, query: Map[String, String] = Map.empty): Uri =
Uri
.parseAbsolute(s"ws://127.0.0.1:${dex.restApiAddress.getPort}/ws/v0")
Expand Down Expand Up @@ -126,7 +134,8 @@ trait HasWebSockets extends BeforeAndAfterAll with BeforeAndAfterEach with HasJw
filledAmount = diff.filledAmount.orElse(orig.filledAmount),
filledFee = diff.filledFee.orElse(orig.filledFee),
avgWeighedPrice = diff.avgWeighedPrice.orElse(orig.avgWeighedPrice),
totalExecutedPriceAssets = diff.totalExecutedPriceAssets.orElse(orig.totalExecutedPriceAssets)
totalExecutedPriceAssets = diff.totalExecutedPriceAssets.orElse(orig.totalExecutedPriceAssets),
matchInfo = (orig.matchInfo ++ diff.matchInfo).distinct
)

protected def mergeAddressChanges(orig: WsAddressChanges, diff: WsAddressChanges): WsAddressChanges = WsAddressChanges(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@ trait WsMessageOps {
def squashed: Map[Order.Id, WsOrder] =
self
.groupBy(_.id)
.map {
.flatMap {
case (id, orderChanges) =>
id -> orderChanges.foldLeft(orderChanges.head) {
orderChanges.reduceLeftOption[WsOrder] {
case (acc, oc) =>
acc.copy(
status = oc.status,
filledAmount = oc.filledAmount,
filledFee = oc.filledFee,
avgWeighedPrice = oc.avgWeighedPrice,
totalExecutedPriceAssets = oc.totalExecutedPriceAssets
totalExecutedPriceAssets = oc.totalExecutedPriceAssets,
matchInfo = acc.matchInfo ++ oc.matchInfo
)
}
}.map(c => id -> c)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import com.wavesplatform.dex.domain.crypto
import com.wavesplatform.dex.domain.model.Normalization
import com.wavesplatform.dex.domain.order.{Order, OrderType}
import com.wavesplatform.dex.domain.transaction.{ExchangeTransaction, ExchangeTransactionV2}
import com.wavesplatform.dex.domain.utils.EitherExt2
import com.wavesplatform.dex.it.config.GenesisConfig
import com.wavesplatform.dex.it.config.PredefinedAccounts.matcher
import com.wavesplatform.dex.it.waves.Implicits._
Expand Down Expand Up @@ -268,8 +267,7 @@ trait MkWavesEntities {
sellMatcherFee = sellOrder.matcherFee,
fee = matcherFee,
timestamp = timestamp
)
.explicitGet()
).transaction
}

def mkBurn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ class BouncingBalancesTestSuite extends WsSuiteBase {

override def wavesNodeInitialSuiteConfig: Config = ConfigFactory.parseString(s"""waves.miner.enable = false""".stripMargin)

private val minerNodeSuiteConfig: Config = ConfigFactory.parseString("""waves.miner {
enable = true
micro-block-interval = 1s
min-micro-block-age = 1s
}""")
private val minerNodeSuiteConfig: Config = ConfigFactory.parseString(
s"""waves.miner {
| enable = true
| micro-block-interval = 1s
| min-micro-block-age = 1s
|}""".stripMargin
)

lazy val wavesMinerNode: WavesNodeContainer = createWavesNode("waves-2", suiteInitialConfig = minerNodeSuiteConfig, netAlias = None)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import cats.syntax.option._
import com.typesafe.config.{Config, ConfigFactory}
import com.wavesplatform.dex.Implicits.releasable
import com.wavesplatform.dex.api.http.entities.HttpOrderStatus
import com.wavesplatform.dex.api.http.entities.HttpOrderStatus.Status
import com.wavesplatform.dex.api.ws.connection.WsConnection
import com.wavesplatform.dex.api.ws.entities.{WsAddressBalancesFilter, WsBalances, WsOrder}
import com.wavesplatform.dex.api.ws.entities.{WsAddressBalancesFilter, WsBalances, WsMatchTransactionInfo, WsOrder}
import com.wavesplatform.dex.api.ws.protocol.{WsAddressChanges, WsAddressSubscribe, WsError, WsUnsubscribe}
import com.wavesplatform.dex.domain.account.KeyPair
import com.wavesplatform.dex.domain.account.KeyPair.toAddress
import com.wavesplatform.dex.domain.asset.Asset
import com.wavesplatform.dex.domain.asset.Asset.Waves
import com.wavesplatform.dex.domain.bytes.ByteStr
import com.wavesplatform.dex.domain.model.Denormalization
import com.wavesplatform.dex.domain.order.OrderType
import com.wavesplatform.dex.domain.order.OrderType.{BUY, SELL}
import com.wavesplatform.dex.error.{AddressAndPublicKeyAreIncompatible, SubscriptionTokenExpired, SubscriptionsLimitReached}
import com.wavesplatform.dex.it.test.Scripts
Expand Down Expand Up @@ -189,23 +192,26 @@ class WsAddressStreamTestSuite extends WsSuiteBase with TableDrivenPropertyCheck
filledAmount = 15.0,
filledFee = 0.0009,
avgWeighedPrice = 1.2,
totalExecutedPriceAssets = 18
totalExecutedPriceAssets = 18,
matchInfo = WsMatchTransactionInfo(ByteStr.empty, 0L, 1.2, 15.0, 18.0)
),
WsOrder(
mo.id,
status = OrderStatus.PartiallyFilled.name,
filledAmount = 40.0,
filledFee = 0.0024,
avgWeighedPrice = 1.1375,
totalExecutedPriceAssets = 45.5
totalExecutedPriceAssets = 45.5,
matchInfo = WsMatchTransactionInfo(ByteStr.empty, 0L, 1.1, 25.0, 27.5)
),
WsOrder(
mo.id,
status = OrderStatus.Filled.name,
filledAmount = 50.0,
filledFee = 0.003,
avgWeighedPrice = 1.11,
totalExecutedPriceAssets = 55.5
totalExecutedPriceAssets = 55.5,
matchInfo = WsMatchTransactionInfo(ByteStr.empty, 0L, 1.0, 10.0, 10.0)
)
)
}
Expand Down Expand Up @@ -237,7 +243,8 @@ class WsAddressStreamTestSuite extends WsSuiteBase with TableDrivenPropertyCheck
filledAmount = 10.0,
filledFee = 0.003,
avgWeighedPrice = 1.0,
totalExecutedPriceAssets = 10
totalExecutedPriceAssets = 10,
matchInfo = WsMatchTransactionInfo(ByteStr.empty, 0L, 1.0, 10.0, 10.0)
)
)
}
Expand Down Expand Up @@ -278,7 +285,8 @@ class WsAddressStreamTestSuite extends WsSuiteBase with TableDrivenPropertyCheck
filledAmount = 5.0.some,
filledFee = 0.0015.some,
avgWeighedPrice = 1.0.some,
totalExecutedPriceAssets = 5.0.some
totalExecutedPriceAssets = 5.0.some,
matchInfo = Seq(WsMatchTransactionInfo(ByteStr.empty, 0L, 1.0, 5.0, 5.0))
)
)
)
Expand All @@ -303,7 +311,6 @@ class WsAddressStreamTestSuite extends WsSuiteBase with TableDrivenPropertyCheck
assertChanges(wsc, squash = false)(Map(Waves -> WsBalances(10, 0), usd -> WsBalances(10, 0)))()
broadcastAndAwait(mkTransfer(acc, alice.toAddress, 2.usd, usd, feeAmount = 1.waves))
assertChanges(wsc)(Map(Waves -> WsBalances(9, 0), usd -> WsBalances(8, 0)))()

}
}

Expand Down Expand Up @@ -503,6 +510,140 @@ class WsAddressStreamTestSuite extends WsSuiteBase with TableDrivenPropertyCheck
}
}
}

"send matchTxInfo on order executions" - {

"when order partially filled" in {
val acc = mkAccountWithBalance(100.waves -> Waves, 50.usd -> usd)
Using(mkWsAddressConnection(acc)) { wsc =>
assertChanges(wsc, squash = false)(Map(usd -> WsBalances(50, 0), Waves -> WsBalances(100, 0)))()

val order = mkOrder(acc, wavesUsdPair, OrderType.SELL, 10.waves, 1.usd)
placeAndAwaitAtDex(order)

val firstCounterOrder = mkOrder(alice, wavesUsdPair, OrderType.BUY, 4.5.waves, 1.usd)
placeAndAwaitAtDex(firstCounterOrder, Status.Filled)

eventually(wsc.orderChanges should matchTo(List(
WsOrder.fromDomain(LimitOrder(order)),
WsOrder(
order.id(),
status = OrderStatus.PartiallyFilled.name,
filledAmount = 4.5,
filledFee = 0.00135,
avgWeighedPrice = 1.0,
totalExecutedPriceAssets = 4.5,
matchInfo = WsMatchTransactionInfo(ByteStr.empty, 0L, 1.0, 4.5, 4.5)
)
)))
wsc.clearMessages()

val secondCounterOrder = mkOrder(alice, wavesUsdPair, OrderType.BUY, 4.waves, 1.usd)
placeAndAwaitAtDex(secondCounterOrder, Status.Filled)

eventually(wsc.orderChanges should matchTo(List(WsOrder(
order.id(),
status = OrderStatus.PartiallyFilled.name,
filledAmount = 8.5,
filledFee = 0.00255,
avgWeighedPrice = 1.0,
totalExecutedPriceAssets = 8.5,
WsMatchTransactionInfo(ByteStr.empty, 0L, 1, 4.0, 4.0)
))))
}
dex1.api.cancelAllOrdersWithSig(acc)
}

"when order filled" in {
val acc = mkAccountWithBalance(100.waves -> Waves, 50.usd -> usd)
Using(mkWsAddressConnection(acc)) { wsc =>
val order = mkOrder(acc, wavesUsdPair, OrderType.SELL, 10.waves, 1.usd)
placeAndAwaitAtDex(order)

eventually(wsc.orderChanges should matchTo(List(WsOrder.fromDomain(LimitOrder(order)))))
wsc.clearMessages()

val counterOrder = mkOrder(alice, wavesUsdPair, OrderType.BUY, 10.waves, 1.usd)
placeAndAwaitAtDex(counterOrder, Status.Filled)

eventually(wsc.orderChanges should matchTo(List(WsOrder(
order.id(),
status = OrderStatus.Filled.name,
filledAmount = 10.0,
filledFee = 0.003,
avgWeighedPrice = 1.0,
totalExecutedPriceAssets = 10.0,
WsMatchTransactionInfo(ByteStr.empty, 0L, 1, 10.0, 10.0)
))))
}
dex1.api.cancelAllOrdersWithSig(acc)
}

"when filling market order" in {
val acc = mkAccountWithBalance(100.waves -> Waves, 150.usd -> usd)
Using(mkWsAddressConnection(acc)) { wsc =>
val aliceOrders = Seq(
mkOrder(acc, wavesUsdPair, OrderType.SELL, 10.waves, 1.2.usd),
mkOrder(acc, wavesUsdPair, OrderType.SELL, 20.waves, 1.1.usd),
mkOrder(acc, wavesUsdPair, OrderType.SELL, 30.waves, 1.3.usd)
)
val accountsOrder = mkOrder(acc, wavesUsdPair, OrderType.BUY, 50.waves, 1.3.usd)

aliceOrders.foreach(dex1.api.place)
aliceOrders.foreach(order => dex1.api.waitForOrderStatus(order, Status.Accepted))
dex1.api.placeMarket(accountsOrder)
dex1.api.waitForOrderStatus(accountsOrder, Status.Filled)

eventually(wsc.orderChanges.squashed(accountsOrder.id()) should matchTo(WsOrder.fromDomain(MarketOrder(
accountsOrder,
Long.MaxValue
)).copy(
status = OrderStatus.Filled.name.some,
filledAmount = 50.0.some,
filledFee = 0.003.some,
avgWeighedPrice = 1.2.some,
totalExecutedPriceAssets = 60.0.some,
matchInfo = Seq(
WsMatchTransactionInfo(ByteStr.empty, 0L, 1.1, 20.0, 22.0),
WsMatchTransactionInfo(ByteStr.empty, 0L, 1.2, 10.0, 12.0),
WsMatchTransactionInfo(ByteStr.empty, 0L, 1.3, 20.0, 26.0)
)
)))
}
}

"when trading with itself" in {
def copyWithCommonPart(wsOrder: WsOrder): WsOrder = wsOrder.copy(
status = OrderStatus.Filled.name.some,
filledAmount = 10.0.some,
filledFee = 0.003.some,
avgWeighedPrice = 1.0.some,
totalExecutedPriceAssets = 10.0.some,
matchInfo = Seq(WsMatchTransactionInfo(ByteStr.empty, 0L, 1, 10.0, 10.0))
)

val acc = mkAccountWithBalance(100.usd -> usd, 50.waves -> Waves)
Using(mkWsAddressConnection(acc)) { wsc =>
val order1 = mkOrder(acc, wavesUsdPair, OrderType.SELL, 10.waves, 1.usd)
placeAndAwaitAtDex(order1)

val order2 = mkOrder(acc, wavesUsdPair, OrderType.BUY, 10.waves, 1.usd)
placeAndAwaitAtDex(order2, Status.Filled)

eventually {
val orderChanges = wsc.orderChanges.squashed
orderChanges(order1.id()) should matchTo(
copyWithCommonPart(WsOrder.fromDomain(LimitOrder(order1)))
)
orderChanges(order2.id()) should matchTo(
copyWithCommonPart(WsOrder.fromDomain(LimitOrder(order2)))
)

orderChanges(order1.id()).matchInfo.head.txId shouldBe orderChanges(order2.id()).matchInfo.head.txId
}
}
}
}
}

"Second connection should get the actual data" in {
Expand Down Expand Up @@ -562,7 +703,6 @@ class WsAddressStreamTestSuite extends WsSuiteBase with TableDrivenPropertyCheck
}

"Subscription should be cancelled after jwt expiration" in {

val acc = mkAccountWithBalance(10.waves -> Waves)
Using.resource(mkWsAddressConnection(acc, dex1, subscriptionLifetime = 3.seconds)) { wsc =>
wsc.receiveAtLeastN[WsAddressChanges](1) // snapshot
Expand Down
Loading

0 comments on commit 2f0fba0

Please sign in to comment.