From 04e8f6dc96533a353a6746c4434a4f3f41b7ef8c Mon Sep 17 00:00:00 2001 From: webwarrior Date: Thu, 8 Feb 2024 14:22:36 +0100 Subject: [PATCH] Add GrpcModels project for server/client interop Added GrpcModels project for server/client interop which includes message models (that were in WebSocketApp), as well as Marshalling module (code from RIM). --- Directory.Packages.props | 1 + FX.sln | 6 ++ src/FX.GrpcClient/FX.GrpcClient.csproj | 4 ++ src/FX.GrpcClient/Instance.cs | 9 +++ src/FX.GrpcModels/FX.GrpcModels.fsproj | 17 ++++++ src/FX.GrpcModels/Marshalling.fs | 72 ++++++++++++++++++++++++ src/FX.GrpcModels/Models.fs | 27 +++++++++ src/FX.GrpcService/FX.GrpcService.csproj | 6 ++ src/FX.GrpcService/Services/FXService.cs | 46 ++++++++++++++- src/FX.Tests/E2ETests.cs | 4 +- src/FX.Tests/FX.Tests.csproj | 1 + 11 files changed, 190 insertions(+), 3 deletions(-) create mode 100644 src/FX.GrpcModels/FX.GrpcModels.fsproj create mode 100644 src/FX.GrpcModels/Marshalling.fs create mode 100644 src/FX.GrpcModels/Models.fs diff --git a/Directory.Packages.props b/Directory.Packages.props index 5486585..9b71626 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -12,6 +12,7 @@ + diff --git a/FX.sln b/FX.sln index 4ea8f49..9546a1a 100644 --- a/FX.sln +++ b/FX.sln @@ -12,6 +12,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FX.GrpcService", "src\FX.Gr EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FX.GrpcClient", "src\FX.GrpcClient\FX.GrpcClient.csproj", "{578D4048-175B-41BC-8EC3-FC83FF137139}" EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FX.GrpcModels", "src\FX.GrpcModels\FX.GrpcModels.fsproj", "{11B2E30C-FCFE-41EB-A76D-CF9E95A844C4}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -38,6 +40,10 @@ Global {578D4048-175B-41BC-8EC3-FC83FF137139}.Debug|Any CPU.Build.0 = Debug|Any CPU {578D4048-175B-41BC-8EC3-FC83FF137139}.Release|Any CPU.ActiveCfg = Release|Any CPU {578D4048-175B-41BC-8EC3-FC83FF137139}.Release|Any CPU.Build.0 = Release|Any CPU + {11B2E30C-FCFE-41EB-A76D-CF9E95A844C4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {11B2E30C-FCFE-41EB-A76D-CF9E95A844C4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {11B2E30C-FCFE-41EB-A76D-CF9E95A844C4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {11B2E30C-FCFE-41EB-A76D-CF9E95A844C4}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/FX.GrpcClient/FX.GrpcClient.csproj b/src/FX.GrpcClient/FX.GrpcClient.csproj index 5a3f93a..5a2f887 100644 --- a/src/FX.GrpcClient/FX.GrpcClient.csproj +++ b/src/FX.GrpcClient/FX.GrpcClient.csproj @@ -16,6 +16,10 @@ + + + + Protos\fx.proto diff --git a/src/FX.GrpcClient/Instance.cs b/src/FX.GrpcClient/Instance.cs index d49bec9..d98fd72 100644 --- a/src/FX.GrpcClient/Instance.cs +++ b/src/FX.GrpcClient/Instance.cs @@ -6,6 +6,8 @@ using GrpcService; +using GrpcModels; + namespace GrpcClient { public class Instance @@ -32,5 +34,12 @@ public async Task SendMessage(string message) Console.WriteLine($"Got response: {reply.MsgOut}"); return reply.MsgOut; } + + public async Task SendMessage(TMessage message) + { + var text = Marshaller.Serialize(message); + var answer = await SendMessage(text); + return new Message(answer); + } } } diff --git a/src/FX.GrpcModels/FX.GrpcModels.fsproj b/src/FX.GrpcModels/FX.GrpcModels.fsproj new file mode 100644 index 0000000..1c0e5ba --- /dev/null +++ b/src/FX.GrpcModels/FX.GrpcModels.fsproj @@ -0,0 +1,17 @@ + + + + netstandard2.0 + true + + + + + + + + + + + + diff --git a/src/FX.GrpcModels/Marshalling.fs b/src/FX.GrpcModels/Marshalling.fs new file mode 100644 index 0000000..81af74e --- /dev/null +++ b/src/FX.GrpcModels/Marshalling.fs @@ -0,0 +1,72 @@ +namespace GrpcModels + +open System +open System.Reflection +open System.Text.Json + +module VersionHelper = + let CURRENT_VERSION = + Assembly + .GetExecutingAssembly() + .GetName() + .Version.ToString() + +type IMarshallingWrapper = + abstract member Value: obj + +type MarshallingWrapper<'T> = + { + Version: string + TypeName: string + Value: 'T + } + + static member New(value: 'T) = + { + Value = value + Version = VersionHelper.CURRENT_VERSION + TypeName = typeof<'T>.FullName + } + + interface IMarshallingWrapper with + member this.Value = this.Value :> obj + +module Marshaller = + + let ExtractMetadata(json: string) : Type * Version = + let wrapper = JsonSerializer.Deserialize> json + let typ = Type.GetType wrapper.TypeName + let version = Version wrapper.Version + typ, version + + let Serialize<'T>(object: 'T) : string = + let wrapper = MarshallingWrapper.New object + JsonSerializer.Serialize wrapper + + let Deserialize<'T>(json: string) : 'T = + if isNull json then + raise <| ArgumentNullException "json" + + let wrapper = JsonSerializer.Deserialize> json + wrapper.Value + + let DeserializeAbstract (json: string) (targetType: Type) : obj = + if isNull json then + raise <| ArgumentNullException "json" + + let wrapperGenericType = typedefof> + + let wrapperType = + wrapperGenericType.MakeGenericType(Array.singleton targetType) + + let wrapperObj = JsonSerializer.Deserialize(json, wrapperType) + + if isNull wrapperObj then + failwith "Deserialization failed: result is null" + elif wrapperObj.GetType() <> wrapperType then + failwithf + "Deserialization failed, resulting type: %s" + (wrapperObj.GetType().FullName) + + let wrapper = wrapperObj :?> IMarshallingWrapper + wrapper.Value diff --git a/src/FX.GrpcModels/Models.fs b/src/FX.GrpcModels/Models.fs new file mode 100644 index 0000000..89223f7 --- /dev/null +++ b/src/FX.GrpcModels/Models.fs @@ -0,0 +1,27 @@ +namespace GrpcModels + +open System + +type Message = + { + Text : string + } + +type LimitOrder = + { + Price: decimal + Side: string + Quantity: decimal + } + +type MarketOrder = + { + Side: string + Quantity: decimal + } + +type CancelOrderRequest = + { + OrderId: Guid + // TODO: add Market + } diff --git a/src/FX.GrpcService/FX.GrpcService.csproj b/src/FX.GrpcService/FX.GrpcService.csproj index ca52704..ed8feb5 100644 --- a/src/FX.GrpcService/FX.GrpcService.csproj +++ b/src/FX.GrpcService/FX.GrpcService.csproj @@ -12,6 +12,12 @@ + + + + + + diff --git a/src/FX.GrpcService/Services/FXService.cs b/src/FX.GrpcService/Services/FXService.cs index 63e8177..220a59c 100644 --- a/src/FX.GrpcService/Services/FXService.cs +++ b/src/FX.GrpcService/Services/FXService.cs @@ -5,15 +5,59 @@ using Grpc.Core; +using FsharpExchangeDotNetStandard; +using System.IO; + namespace GrpcService.Services { public class FXService : FXGrpcService.FXGrpcServiceBase { + private readonly Exchange exchange = new Exchange(Persistence.Redis); + public override async Task GenericMethod(GenericInputParam request, ServerCallContext context) { Console.WriteLine($"Received {request.MsgIn}"); - return await Task.FromResult(new GenericOutputParam { MsgOut = "received " + request.MsgIn }); + var (type, _version) = GrpcModels.Marshaller.ExtractMetadata(request.MsgIn); + + var deserializedRequest = GrpcModels.Marshaller.DeserializeAbstract(request.MsgIn, type); + + if (deserializedRequest is GrpcModels.Message message) + { + return await Task.FromResult(new GenericOutputParam { MsgOut = "received " + message.Text }); + } + else if (deserializedRequest is GrpcModels.LimitOrder { } limitOrder) + { + var orderInfo = new OrderInfo(Guid.NewGuid(), Side.Parse(limitOrder.Side), limitOrder.Quantity); + var exchangeLimitOrder = new LimitOrder(orderInfo, limitOrder.Price); + var limitOrderReq = new LimitOrderRequest(exchangeLimitOrder, LimitOrderRequestType.Normal); + var marketForLimitOrder = new Market(Currency.BTC, Currency.USD); + + // TODO: make async + var matchType = exchange.SendLimitOrder(limitOrderReq, marketForLimitOrder); + return await Task.FromResult(new GenericOutputParam { MsgOut = GrpcModels.Marshaller.Serialize(matchType) }); + } + else if (deserializedRequest is GrpcModels.MarketOrder { } marketOrder) + { + var orderInfo = new OrderInfo(Guid.NewGuid(), Side.Parse(marketOrder.Side), marketOrder.Quantity); + var marketForMarketOrder = new Market(Currency.BTC, Currency.USD); + + // TODO: make async + exchange.SendMarketOrder(orderInfo, marketForMarketOrder); + // return empty string? + return await Task.FromResult(new GenericOutputParam { MsgOut = String.Empty }); + } + else if (deserializedRequest is GrpcModels.CancelOrderRequest { } cancelOrderRequest) + { + // TODO: make async + exchange.CancelLimitOrder(cancelOrderRequest.OrderId); + // return empty string? + return await Task.FromResult(new GenericOutputParam { MsgOut = String.Empty }); + } + else + { + throw new InvalidOperationException("Unable to deserialize request: " + request.MsgIn); + } } public override async Task GenericStreamOutputMethod(GenericInputParam request, IServerStreamWriter responseStream, ServerCallContext context) diff --git a/src/FX.Tests/E2ETests.cs b/src/FX.Tests/E2ETests.cs index 0a25042..5e2e806 100644 --- a/src/FX.Tests/E2ETests.cs +++ b/src/FX.Tests/E2ETests.cs @@ -49,9 +49,9 @@ async public Task GrpcE2ETest() client.Connect(); var message = "hello"; - var response = await client.SendMessage(message); + var response = await client.SendMessage(new GrpcModels.Message{ Text = message }); - Assert.That(response, Is.EqualTo("received " + message)); + Assert.That(response.Text, Is.EqualTo("received " + message)); } } } diff --git a/src/FX.Tests/FX.Tests.csproj b/src/FX.Tests/FX.Tests.csproj index 0800f09..64d8226 100644 --- a/src/FX.Tests/FX.Tests.csproj +++ b/src/FX.Tests/FX.Tests.csproj @@ -18,5 +18,6 @@ +