Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Platform Networking #3412

Merged
merged 35 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
0513a4f
Implement a SyncSocketProvider based on managed websockets
fealebenpae Aug 4, 2023
ddc3c1c
Merge remote-tracking branch 'origin/main' into yg/websocket
fealebenpae Aug 4, 2023
f29e7d8
forgotten stuff
fealebenpae Aug 4, 2023
45bba6a
small fixes
fealebenpae Aug 8, 2023
0d85c3a
don’t close if already closed
fealebenpae Aug 8, 2023
fc61000
more error logging
fealebenpae Aug 8, 2023
b59669d
remove test
fealebenpae Aug 9, 2023
3735878
Ignore websockets nuget on Unity
fealebenpae Aug 9, 2023
8c201eb
error handling and reconnects
fealebenpae Aug 10, 2023
0b3ed53
unity dependencies
fealebenpae Aug 10, 2023
410708d
disable Maui tests
fealebenpae Aug 11, 2023
415eff0
address some code review comments
fealebenpae Aug 11, 2023
877efce
extract callback deletion
fealebenpae Aug 13, 2023
5c6cb46
fixes for iOS
fealebenpae Aug 13, 2023
beb2b4a
Revert "disable Maui tests"
fealebenpae Aug 13, 2023
1ed614d
adjust sync worker thread creation for Mono
fealebenpae Aug 13, 2023
c4d7dd1
bring back websocketexception
fealebenpae Aug 13, 2023
6f0282d
attempt fixes
fealebenpae Aug 14, 2023
4eb5a63
enable release assertions
fealebenpae Aug 14, 2023
9bddaf6
rework
fealebenpae Aug 14, 2023
3d23531
stuff
fealebenpae Aug 14, 2023
daaeac4
disable coverlet
fealebenpae Aug 14, 2023
8c532d4
enable crash reporting
fealebenpae Aug 14, 2023
01442cc
fix bad P/Invoke
fealebenpae Aug 14, 2023
6440e78
enable crash reporting for all .NET Core tests
fealebenpae Aug 14, 2023
a194522
add websocket header to CMakeLists.txt
fealebenpae Aug 14, 2023
104da2c
rethrow event loop exceptions
fealebenpae Aug 15, 2023
bf2af73
use file-scoped namespaces
fealebenpae Aug 15, 2023
b5a21cd
increase AWS Device Farm timeout
fealebenpae Aug 15, 2023
ac40766
Revert "increase AWS Device Farm timeout"
fealebenpae Aug 15, 2023
092b8ef
add AppConfiguration.UseManagedWebSockets opt-in
fealebenpae Aug 15, 2023
e3e50ba
changelog
fealebenpae Aug 15, 2023
55376df
update todo
fealebenpae Aug 15, 2023
9b14717
Merge remote-tracking branch 'origin/main' into yg/websocket
fealebenpae Aug 15, 2023
771d029
remove trailing whitespace
fealebenpae Aug 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Realm/Realm/Native/AppConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ internal MetadataPersistenceMode? MetadataPersistence

internal IntPtr managed_http_client;

internal IntPtr managed_websocket_provider;

internal UInt64 sync_connect_timeout_ms;

internal UInt64 sync_connection_linger_time_ms;
Expand Down
14 changes: 7 additions & 7 deletions Realm/Realm/Native/PrimitiveValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,6 @@ public readonly bool TryGetObjectHandle(Realm realm, [NotNullWhen(true)] out Obj
return false;
}

[StructLayout(LayoutKind.Sequential)]
private struct BinaryValue
{
public byte* data;
public IntPtr size;
}

[StructLayout(LayoutKind.Sequential)]
private struct LinkValue
{
Expand Down Expand Up @@ -348,4 +341,11 @@ public static StringValue AllocateFrom(string? value, Arena arena)

public static implicit operator string?(in StringValue value) => !value ? null : Encoding.UTF8.GetString(value.data, (int)value.size);
}

[StructLayout(LayoutKind.Sequential)]
internal unsafe struct BinaryValue
{
public byte* data;
public IntPtr size;
}
}
94 changes: 94 additions & 0 deletions Realm/Realm/Native/SyncSocketProvider.EventLoop.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2023 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////

using System;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace Realms.Native
{
internal partial class SyncSocketProvider
{
private class Timer
{
private readonly CancellationTokenSource _cts = new();

internal Timer(TimeSpan delay, IntPtr nativeCallback, ChannelWriter<IWork> workQueue)
{
Task.Delay(delay).ContinueWith(t =>
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
{
var status = Status.OK;
if (t.IsCanceled)
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
{
status = new() { code = NativeMethods.ErrorCode.OperationAborted, reason = "Timer canceled" };
}

return workQueue.WriteAsync(new EventLoopWork(nativeCallback, status));
});
}

internal void Cancel()
{
_cts.Cancel();
_cts.Dispose();
}
}

private class EventLoopWork : IWork
{
private readonly IntPtr _nativeCallback;
private readonly Status _status;

public EventLoopWork(IntPtr nativeCallback, Status status)
{
_nativeCallback = nativeCallback;
_status = status;
}

public unsafe void Execute()
{
if (!string.IsNullOrEmpty(_status.reason))
{
var bytes = Encoding.UTF8.GetBytes(_status.reason);
fixed (byte* data = bytes)
{
var reason = new StringValue { data = data, size = bytes.Length };
NativeMethods.run_callback(_nativeCallback, _status.code, reason);
}
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
NativeMethods.run_callback(_nativeCallback, _status.code, new());
}
}
}

private partial async Task WorkThread()
{
while (await _workQueue.Reader.WaitToReadAsync())
{
while (_workQueue.Reader.TryRead(out var work))
{
work.Execute();
}
}
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
203 changes: 203 additions & 0 deletions Realm/Realm/Native/SyncSocketProvider.WebSocket.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2023 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////

using System;
using System.Buffers;
using System.IO;
using System.Net.WebSockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace Realms.Native
{
internal partial class SyncSocketProvider
{
private class Socket : IDisposable
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly ClientWebSocket _webSocket;
private readonly IntPtr _observer;

Check warning on line 35 in Realm/Realm/Native/SyncSocketProvider.WebSocket.cs

View workflow job for this annotation

GitHub Actions / Test / Weaver (macos-latest, osx-x64)

Field 'SyncSocketProvider.Socket._observer' is never assigned to, and will always have its default value [/Users/runner/work/realm-dotnet/realm-dotnet/Realm/Realm/Realm.csproj::TargetFramework=netstandard2.0]

Check warning on line 35 in Realm/Realm/Native/SyncSocketProvider.WebSocket.cs

View workflow job for this annotation

GitHub Actions / Test / Weaver (windows-latest, win-x64)

Field 'SyncSocketProvider.Socket._observer' is never assigned to, and will always have its default value [D:\a\realm-dotnet\realm-dotnet\Realm\Realm\Realm.csproj::TargetFramework=netstandard2.0]

Check warning on line 35 in Realm/Realm/Native/SyncSocketProvider.WebSocket.cs

View workflow job for this annotation

GitHub Actions / Package / NuGet

Field 'SyncSocketProvider.Socket._observer' is never assigned to, and will always have its default value [D:\a\realm-dotnet\realm-dotnet\Realm\Realm\Realm.csproj::TargetFramework=netstandard2.0]
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
private readonly ChannelWriter<IWork> _workQueue;

private readonly Uri _uri;
private readonly Task _workerThread;

private MemoryStream _receiveBuffer = new();

internal Socket(ClientWebSocket webSocket, ChannelWriter<IWork> workQueue, Uri uri)

Check warning on line 43 in Realm/Realm/Native/SyncSocketProvider.WebSocket.cs

View workflow job for this annotation

GitHub Actions / Test / Weaver (macos-latest, osx-x64)

Code should not contain trailing whitespace [/Users/runner/work/realm-dotnet/realm-dotnet/Realm/Realm/Realm.csproj::TargetFramework=netstandard2.0]

Check warning on line 43 in Realm/Realm/Native/SyncSocketProvider.WebSocket.cs

View workflow job for this annotation

GitHub Actions / Test / Weaver (windows-latest, win-x64)

Code should not contain trailing whitespace [D:\a\realm-dotnet\realm-dotnet\Realm\Realm\Realm.csproj::TargetFramework=netstandard2.0]

Check warning on line 43 in Realm/Realm/Native/SyncSocketProvider.WebSocket.cs

View workflow job for this annotation

GitHub Actions / Package / NuGet

Code should not contain trailing whitespace [D:\a\realm-dotnet\realm-dotnet\Realm\Realm\Realm.csproj::TargetFramework=netstandard2.0]
{
_webSocket = webSocket;
_workQueue = workQueue;
_uri = uri;
_workerThread = Task.Factory.StartNew(ReadThread, creationOptions: TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach).Unwrap();
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
}

private async Task ReadThread()
{
try
{
await _webSocket.ConnectAsync(_uri, default);
await _workQueue.WriteAsync(new WebSocketConnectedWork(_observer, _webSocket.SubProtocol));
}
catch (WebSocketException e)

Check warning on line 58 in Realm/Realm/Native/SyncSocketProvider.WebSocket.cs

View workflow job for this annotation

GitHub Actions / Test / Weaver (macos-latest, osx-x64)

The variable 'e' is declared but never used [/Users/runner/work/realm-dotnet/realm-dotnet/Realm/Realm/Realm.csproj::TargetFramework=netstandard2.0]

Check warning on line 58 in Realm/Realm/Native/SyncSocketProvider.WebSocket.cs

View workflow job for this annotation

GitHub Actions / Test / Weaver (windows-latest, win-x64)

The variable 'e' is declared but never used [D:\a\realm-dotnet\realm-dotnet\Realm\Realm\Realm.csproj::TargetFramework=netstandard2.0]

Check warning on line 58 in Realm/Realm/Native/SyncSocketProvider.WebSocket.cs

View workflow job for this annotation

GitHub Actions / Package / NuGet

The variable 'e' is declared but never used [D:\a\realm-dotnet\realm-dotnet\Realm\Realm\Realm.csproj::TargetFramework=netstandard2.0]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we send a websocket closed work in case of a non-websocket error - e.g. I can imagine we can hit an out of memory or similar system exception that would prevent us from connecting but Sync will be waiting forever for the connection.

{
// TODO: handle connect error

Check warning on line 60 in Realm/Realm/Native/SyncSocketProvider.WebSocket.cs

View workflow job for this annotation

GitHub Actions / Verify TODOs

Realm/Realm/Native/SyncSocketProvider.WebSocket.cs#L60

TODO entry doesn't have a link to Github issue or Jira ticket handle connect error
return;
}

var buffer = new byte[32 * 1024];
do
{
try
{
var result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), default);
if (result.MessageType == WebSocketMessageType.Binary)
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
{
await _receiveBuffer.WriteAsync(buffer, 0, result.Count);
if (result.EndOfMessage)
{
var current_buffer = _receiveBuffer;
_receiveBuffer = new MemoryStream();
await _workQueue.WriteAsync(new BinaryMessageReceivedWork(current_buffer, _observer));
}
}
else if (result.MessageType == WebSocketMessageType.Close)
{
await _workQueue.WriteAsync(new ClosedMessageReceivedWork(result.CloseStatus!.Value, result.CloseStatusDescription, _observer));
}
}
catch(WebSocketException e)

Check warning on line 85 in Realm/Realm/Native/SyncSocketProvider.WebSocket.cs

View workflow job for this annotation

GitHub Actions / Test / Weaver (macos-latest, osx-x64)

The variable 'e' is declared but never used [/Users/runner/work/realm-dotnet/realm-dotnet/Realm/Realm/Realm.csproj::TargetFramework=netstandard2.0]

Check warning on line 85 in Realm/Realm/Native/SyncSocketProvider.WebSocket.cs

View workflow job for this annotation

GitHub Actions / Test / Weaver (windows-latest, win-x64)

The variable 'e' is declared but never used [D:\a\realm-dotnet\realm-dotnet\Realm\Realm\Realm.csproj::TargetFramework=netstandard2.0]

Check warning on line 85 in Realm/Realm/Native/SyncSocketProvider.WebSocket.cs

View workflow job for this annotation

GitHub Actions / Package / NuGet

The variable 'e' is declared but never used [D:\a\realm-dotnet\realm-dotnet\Realm\Realm\Realm.csproj::TargetFramework=netstandard2.0]
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
{
// TODO: handle read error

Check warning on line 87 in Realm/Realm/Native/SyncSocketProvider.WebSocket.cs

View workflow job for this annotation

GitHub Actions / Verify TODOs

Realm/Realm/Native/SyncSocketProvider.WebSocket.cs#L87

TODO entry doesn't have a link to Github issue or Jira ticket handle read error
}
} while (_webSocket.State == WebSocketState.Open);

Check warning on line 89 in Realm/Realm/Native/SyncSocketProvider.WebSocket.cs

View workflow job for this annotation

GitHub Actions / Test / Weaver (macos-latest, osx-x64)

Braces for multi-line statements should not share line [/Users/runner/work/realm-dotnet/realm-dotnet/Realm/Realm/Realm.csproj::TargetFramework=netstandard2.0]
}

public void Write(BinaryValue data, IntPtr native_callback)
{
var buffer = ArrayPool<byte>.Shared.Rent((int)data.size);
unsafe
{
Marshal.Copy((IntPtr)data.data, buffer, 0, (int)data.size);
}

_webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Binary, true, default).ContinueWith(async t =>
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
{
ArrayPool<byte>.Shared.Return(buffer);

var status = Status.OK;
if (t.IsFaulted)
{
// TODO: handle write error

Check warning on line 107 in Realm/Realm/Native/SyncSocketProvider.WebSocket.cs

View workflow job for this annotation

GitHub Actions / Verify TODOs

Realm/Realm/Native/SyncSocketProvider.WebSocket.cs#L107

TODO entry doesn't have a link to Github issue or Jira ticket handle write error
}

await _workQueue.WriteAsync(new EventLoopWork(native_callback, status));
});
}

public void Dispose()
{
_webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, default).ContinueWith(t =>
{
_webSocket.Dispose();
_receiveBuffer.Dispose();
});
}
}

private abstract class WebSocketWork : IWork
{
private readonly IntPtr _observer;

protected WebSocketWork(IntPtr observer)
{
_observer = observer;
}

protected abstract void Execute(IntPtr observer);

void IWork.Execute()
{
Execute(_observer);
}
}

private sealed class WebSocketConnectedWork : WebSocketWork
{
private readonly string _protocol;

public WebSocketConnectedWork(IntPtr observer, string protocol)
: base(observer)
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
{
_protocol = protocol;
}

protected unsafe override void Execute(IntPtr observer)
{
var bytes = Encoding.UTF8.GetBytes(_protocol);
fixed (byte* data = bytes)
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
{
NativeMethods.observer_connected_handler(observer, new() { data = data, size = bytes.Length });
}
}
}

private sealed class BinaryMessageReceivedWork : WebSocketWork
{
private MemoryStream _receiveBuffer;

public BinaryMessageReceivedWork(MemoryStream receiveBuffer, IntPtr observer)
: base(observer)
{
_receiveBuffer = receiveBuffer;
}

protected unsafe override void Execute(IntPtr observer)
{
using var buffer = _receiveBuffer;
fixed (byte* data = buffer.GetBuffer())
{
NativeMethods.observer_binary_message_received(observer, new() { data = data, size = (IntPtr)buffer.Length });
}
}
}

private sealed class ClosedMessageReceivedWork : WebSocketWork
{
private WebSocketCloseStatus _status;
private string _description;

public ClosedMessageReceivedWork(WebSocketCloseStatus status, string description, IntPtr observer)
: base(observer)
{
_status = status;
_description = description;
}

protected unsafe override void Execute(IntPtr observer)
{
var bytes = Encoding.UTF8.GetBytes(_description);
fixed(byte* data = bytes)
{
NativeMethods.observer_closed_handler(observer, _status, new() { data = data, size = bytes.Length });
}
}
}
}
}
Loading
Loading