Skip to content

Commit

Permalink
Updated WebSocket reading logic to eliminate large arrays allocation (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
xperiandri committed Mar 24, 2024
1 parent 9a2a107 commit 4cab5b4
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 28 deletions.
1 change: 1 addition & 0 deletions Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<FAKEVersion>6.*</FAKEVersion>
</PropertyGroup>
<ItemGroup Label="Common">
<PackageReference Update="Collections.Pooled" Version="1.0.*" />
<PackageReference Update="FParsec" Version="1.1.1" />
<PackageReference Include="FSharp.Core" Version="$(FSharpCoreVersion)">
<ExcludeAssets>contentFiles</ExcludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Collections.Pooled" />
<PackageReference Include="FsToolkit.ErrorHandling" />
<PackageReference Include="FsToolkit.ErrorHandling.TaskResult" />
<PackageReference Include="Giraffe" />
Expand Down
2 changes: 2 additions & 0 deletions src/FSharp.Data.GraphQL.Server.AspNetCore/GraphQLOptions.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type IGraphQLOptions =
type GraphQLOptions<'Root> = {
SchemaExecutor : Executor<'Root>
RootFactory : HttpContext -> 'Root
/// The minimum rented array size to read a message from WebSocket
ReadBufferSize : int
SerializerOptions : JsonSerializerOptions
WebsocketOptions : GraphQLTransportWSOptions
} with
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
namespace FSharp.Data.GraphQL.Server.AspNetCore

open System
open System.Buffers
open System.Collections.Generic
open System.Diagnostics
open System.Linq
open System.Net.WebSockets
open System.Text.Json
open System.Text.Json.Serialization
Expand All @@ -11,6 +14,8 @@ open Microsoft.AspNetCore.Http
open Microsoft.Extensions.Hosting
open Microsoft.Extensions.Logging
open Microsoft.Extensions.Options

open Collections.Pooled
open FsToolkit.ErrorHandling

open FSharp.Data.GraphQL
Expand Down Expand Up @@ -47,9 +52,9 @@ type GraphQLWebSocketMiddleware<'Root>
static let invalidJsonInClientMessageError =
Result.Error <| InvalidMessage (4400, "Invalid json in client message")

let deserializeClientMessage (serializerOptions : JsonSerializerOptions) (msg : string) = taskResult {
let deserializeClientMessage (serializerOptions : JsonSerializerOptions) (msg : IReadOnlyPooledList<byte>) = taskResult {
try
return JsonSerializer.Deserialize<ClientMessage> (msg, serializerOptions)
return JsonSerializer.Deserialize<ClientMessage> (msg.Span, serializerOptions)
with
| :? InvalidWebsocketMessageException as ex ->
logger.LogError(ex, "Invalid websocket message:\n{payload}", msg)
Expand All @@ -75,31 +80,35 @@ type GraphQLWebSocketMiddleware<'Root>
&& not (theSocket.State = WebSocketState.Closed)

let receiveMessageViaSocket (cancellationToken : CancellationToken) (serializerOptions : JsonSerializerOptions) (socket : WebSocket) = taskResult {
let buffer = Array.zeroCreate 4096
let completeMessage = new List<byte> ()
let mutable segmentResponse : WebSocketReceiveResult = null
while (not cancellationToken.IsCancellationRequested)
&& socket |> isSocketOpen
&& ((segmentResponse = null)
|| (not segmentResponse.EndOfMessage)) do
try
let! r = socket.ReceiveAsync (new ArraySegment<byte> (buffer), cancellationToken)
segmentResponse <- r
completeMessage.AddRange (new ArraySegment<byte> (buffer, 0, r.Count))
with :? OperationCanceledException ->
()

// TODO: Allocate string only if a debugger is attached
let message =
completeMessage
|> Seq.filter (fun x -> x > 0uy)
|> Array.ofSeq
|> System.Text.Encoding.UTF8.GetString
if String.IsNullOrWhiteSpace message then
return ValueNone
else
let! result = message |> deserializeClientMessage serializerOptions
return ValueSome result
let buffer = ArrayPool.Shared.Rent options.ReadBufferSize
try
let completeMessage = new PooledList<byte> ()
let mutable segmentResponse : WebSocketReceiveResult = null
while (not cancellationToken.IsCancellationRequested)
&& socket |> isSocketOpen
&& ((segmentResponse = null)
|| (not segmentResponse.EndOfMessage)) do
try
let! r = socket.ReceiveAsync (new ArraySegment<byte> (buffer), cancellationToken)
segmentResponse <- r
completeMessage.AddRange (new ArraySegment<byte> (buffer, 0, r.Count))
with :? OperationCanceledException ->
()

if Debugger.IsAttached then
let message =
completeMessage
|> Seq.filter (fun x -> x > 0uy)
|> Array.ofSeq
|> System.Text.Encoding.UTF8.GetString
logger.LogInformation ("-> Request: {request}", message)
if completeMessage.All(fun b -> b = 0uy) then
return ValueNone
else
let! result = deserializeClientMessage serializerOptions completeMessage
return ValueSome result
finally
ArrayPool.Shared.Return buffer
}

let sendMessageViaSocket (jsonSerializerOptions) (socket : WebSocket) (message : ServerMessage) : Task = task {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ open System.Runtime.CompilerServices
open Microsoft.AspNetCore.Builder
open Microsoft.AspNetCore.Http
open Microsoft.Extensions.DependencyInjection
open FSharp.Data.GraphQL
open Microsoft.Extensions.Options
open FSharp.Data.GraphQL

[<AutoOpen; Extension>]
module ServiceCollectionExtensions =

let createStandardOptions executor rootFactory endpointUrl = {
SchemaExecutor = executor
RootFactory = rootFactory
ReadBufferSize = 4096
SerializerOptions = Json.serializerOptions
WebsocketOptions = {
EndpointUrl = endpointUrl
Expand Down

0 comments on commit 4cab5b4

Please sign in to comment.