Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Platform Networking #3412

Merged
merged 35 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
0513a4f
Implement a SyncSocketProvider based on managed websockets
fealebenpae Aug 4, 2023
ddc3c1c
Merge remote-tracking branch 'origin/main' into yg/websocket
fealebenpae Aug 4, 2023
f29e7d8
forgotten stuff
fealebenpae Aug 4, 2023
45bba6a
small fixes
fealebenpae Aug 8, 2023
0d85c3a
don’t close if already closed
fealebenpae Aug 8, 2023
fc61000
more error logging
fealebenpae Aug 8, 2023
b59669d
remove test
fealebenpae Aug 9, 2023
3735878
Ignore websockets nuget on Unity
fealebenpae Aug 9, 2023
8c201eb
error handling and reconnects
fealebenpae Aug 10, 2023
0b3ed53
unity dependencies
fealebenpae Aug 10, 2023
410708d
disable Maui tests
fealebenpae Aug 11, 2023
415eff0
address some code review comments
fealebenpae Aug 11, 2023
877efce
extract callback deletion
fealebenpae Aug 13, 2023
5c6cb46
fixes for iOS
fealebenpae Aug 13, 2023
beb2b4a
Revert "disable Maui tests"
fealebenpae Aug 13, 2023
1ed614d
adjust sync worker thread creation for Mono
fealebenpae Aug 13, 2023
c4d7dd1
bring back websocketexception
fealebenpae Aug 13, 2023
6f0282d
attempt fixes
fealebenpae Aug 14, 2023
4eb5a63
enable release assertions
fealebenpae Aug 14, 2023
9bddaf6
rework
fealebenpae Aug 14, 2023
3d23531
stuff
fealebenpae Aug 14, 2023
daaeac4
disable coverlet
fealebenpae Aug 14, 2023
8c532d4
enable crash reporting
fealebenpae Aug 14, 2023
01442cc
fix bad P/Invoke
fealebenpae Aug 14, 2023
6440e78
enable crash reporting for all .NET Core tests
fealebenpae Aug 14, 2023
a194522
add websocket header to CMakeLists.txt
fealebenpae Aug 14, 2023
104da2c
rethrow event loop exceptions
fealebenpae Aug 15, 2023
bf2af73
use file-scoped namespaces
fealebenpae Aug 15, 2023
b5a21cd
increase AWS Device Farm timeout
fealebenpae Aug 15, 2023
ac40766
Revert "increase AWS Device Farm timeout"
fealebenpae Aug 15, 2023
092b8ef
add AppConfiguration.UseManagedWebSockets opt-in
fealebenpae Aug 15, 2023
e3e50ba
changelog
fealebenpae Aug 15, 2023
55376df
update todo
fealebenpae Aug 15, 2023
9b14717
Merge remote-tracking branch 'origin/main' into yg/websocket
fealebenpae Aug 15, 2023
771d029
remove trailing whitespace
fealebenpae Aug 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/actions/run-android-device-farm-test/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ runs:
- $DEVICEFARM_LOG_DIR
file_artifacts: |
Customer Artifacts.zip
timeout: 5400 # 1.5 hours
- run: |
Expand-Archive 'Customer Artifacts.zip' -DestinationPath artifacts
echo "results-path=${{ github.workspace }}/artifacts/Host_Machine_Files/`$DEVICEFARM_LOG_DIR/TestResults.Android.xml" | Out-File -FilePath $Env:GITHUB_OUTPUT -Encoding utf8 -Append
Expand Down
9 changes: 9 additions & 0 deletions .github/templates/test-code-coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,16 @@ jobs:
echo "${{ github.workspace }}/tools" >> $GITHUB_PATH
- #@ template.replace(dotnetPublish("Tests/Realm.Tests", "net7.0", "linux-x64", { "RealmTestsStandaloneExe": "true" }))
- name: Run the tests
env:
DOTNET_DbgEnableMiniDump: 1
DOTNET_EnableCrashReport: 1
run: #@ "./tools/coverlet ${{ steps.dotnet-publish.outputs.executable-path }} -t ${{ steps.dotnet-publish.outputs.executable-path }}/Realm.Tests -a '--result=TestResults.Linux.xml --labels=After" + baasTestArgs("code-coverage") + "' -f lcov -o ./report.lcov --exclude '[Realm.Tests]*' --exclude '[Realm.Fody]*' --exclude '[Realm.PlatformHelpers]*'"
- name: Archive core dump
uses: actions/upload-artifact@v3
if: failure()
with:
name: crash-report-net-core-code-coverage
path: /tmp/coredump*
- name: Publish Coverage
id: publish-coveralls
uses: #@ actionCoveralls
Expand Down
9 changes: 9 additions & 0 deletions .github/templates/test-net-core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,14 @@ jobs:
- #@ template.replace(prepareTest(cleanupWorkspace = True))
- #@ template.replace(dotnetBuildTests("Tests/Realm.Tests", "${{ matrix.framework }}", "${{ matrix.os.runtime }}"))
- name: Run the tests
env:
DOTNET_DbgEnableMiniDump: 1
DOTNET_EnableCrashReport: 1
run: #@ "${{ steps.dotnet-publish.outputs.executable-path }}/Realm.Tests --result=TestResults.xml --labels=After" + baasTestArgs("net-core-${{ matrix.runner }}-${{ matrix.runtime }}")
- name: Archive core dump
uses: actions/upload-artifact@v3
if: failure()
with:
name: crash-report-net-core-${{ matrix.runner }}-${{ matrix.runtime }}
path: /tmp/coredump*
- #@ publishTestsResults("TestResults.xml", ".NET (${{ matrix.os.runner }}, ${{ matrix.framework }})")
9 changes: 9 additions & 0 deletions .github/workflows/test-code-coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,16 @@ jobs:
run: echo 'executable-path=./Tests/Realm.Tests/bin/Release/net7.0/linux-x64' >> $GITHUB_OUTPUT
shell: bash
- name: Run the tests
env:
DOTNET_DbgEnableMiniDump: 1
DOTNET_EnableCrashReport: 1
run: ./tools/coverlet ${{ steps.dotnet-publish.outputs.executable-path }} -t ${{ steps.dotnet-publish.outputs.executable-path }}/Realm.Tests -a '--result=TestResults.Linux.xml --labels=After --baasurl=${{ inputs.realmUrl }} --baascluster=${{ inputs.clusterName }} --baasapikey=${{ secrets.AtlasPublicKey}} --baasprivateapikey=${{ secrets.AtlasPrivateKey}} --baasprojectid=${{ secrets.AtlasProjectId }} --baasdifferentiator=code-coverage' -f lcov -o ./report.lcov --exclude '[Realm.Tests]*' --exclude '[Realm.Fody]*' --exclude '[Realm.PlatformHelpers]*'
- name: Archive core dump
uses: actions/upload-artifact@v3
if: failure()
with:
name: crash-report-net-core-code-coverage
path: /tmp/coredump*
- name: Publish Coverage
id: publish-coveralls
uses: coverallsapp/github-action@95b1a2355bd0e526ad2fd62da9fd386ad4c98474
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/test-net-core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,16 @@ jobs:
run: echo 'executable-path=./Tests/Realm.Tests/bin/Release/${{ matrix.framework }}/${{ matrix.os.runtime }}' >> $GITHUB_OUTPUT
shell: bash
- name: Run the tests
env:
DOTNET_DbgEnableMiniDump: 1
DOTNET_EnableCrashReport: 1
run: ${{ steps.dotnet-publish.outputs.executable-path }}/Realm.Tests --result=TestResults.xml --labels=After --baasurl=${{ inputs.realmUrl }} --baascluster=${{ inputs.clusterName }} --baasapikey=${{ secrets.AtlasPublicKey}} --baasprivateapikey=${{ secrets.AtlasPrivateKey}} --baasprojectid=${{ secrets.AtlasProjectId }} --baasdifferentiator=net-core-${{ matrix.runner }}-${{ matrix.runtime }}
- name: Archive core dump
uses: actions/upload-artifact@v3
if: failure()
with:
name: crash-report-net-core-${{ matrix.runner }}-${{ matrix.runtime }}
path: /tmp/coredump*
- name: Publish Unit Test Results
uses: LaPeste/test-reporter@510caf50a955b1003bec48a6494be4d6537f3a0b
if: always()
Expand Down
2 changes: 2 additions & 0 deletions Realm/Realm/Native/AppConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ internal MetadataPersistenceMode? MetadataPersistence

internal IntPtr managed_http_client;

internal IntPtr managed_websocket_provider;

internal UInt64 sync_connect_timeout_ms;

internal UInt64 sync_connection_linger_time_ms;
Expand Down
41 changes: 23 additions & 18 deletions Realm/Realm/Native/PrimitiveValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
////////////////////////////////////////////////////////////////////////////

using System;
using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -244,16 +245,7 @@ public readonly Guid AsGuid()

public readonly string AsString() => string_value!;

public readonly byte[] AsBinary()
{
var bytes = new byte[(int)data_value.size];
for (var i = 0; i < bytes.Length; i++)
{
bytes[i] = data_value.data[i];
}

return bytes;
}
public readonly byte[] AsBinary() => data_value.AsBytes();

public readonly IRealmObjectBase AsObject(Realm realm)
{
Expand Down Expand Up @@ -283,13 +275,6 @@ public readonly bool TryGetObjectHandle(Realm realm, [NotNullWhen(true)] out Obj
return false;
}

[StructLayout(LayoutKind.Sequential)]
private struct BinaryValue
{
public byte* data;
public IntPtr size;
}

[StructLayout(LayoutKind.Sequential)]
private struct LinkValue
{
Expand Down Expand Up @@ -326,6 +311,8 @@ internal unsafe struct StringValue
public byte* data;
public nint size;

public static readonly StringValue Null = new() { data = null };

public static StringValue AllocateFrom(string? value, Arena arena)
{
if (value is null)
Expand All @@ -335,17 +322,35 @@ public static StringValue AllocateFrom(string? value, Arena arena)

var byteCount = Encoding.UTF8.GetMaxByteCount(value.Length);
var buffer = arena.Allocate<byte>(byteCount + 1);
#if NETCOREAPP2_1_OR_GREATER
byteCount = Encoding.UTF8.GetBytes(value, new Span<byte>(buffer.Data, buffer.Length));
#else
fixed (char* stringBytes = value)
{
byteCount = Encoding.UTF8.GetBytes(stringBytes, value.Length, buffer.Data, buffer.Length);
buffer.Data[byteCount] = 0;
}
#endif

buffer.Data[byteCount] = 0;
return new StringValue { data = buffer.Data, size = byteCount };
}

public static implicit operator bool(in StringValue value) => value.data != null;

public static implicit operator string?(in StringValue value) => !value ? null : Encoding.UTF8.GetString(value.data, (int)value.size);
}

[StructLayout(LayoutKind.Sequential)]
internal unsafe struct BinaryValue
{
public byte* data;
public IntPtr size;

public readonly byte[] AsBytes(bool usePooledArray = false)
{
var bytes = usePooledArray ? ArrayPool<byte>.Shared.Rent((int)size) : new byte[(int)size];
Marshal.Copy((IntPtr)data, bytes, 0, (int)size);
return bytes;
}
}
}
144 changes: 144 additions & 0 deletions Realm/Realm/Native/SyncSocketProvider.EventLoop.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2023 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////

using System;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Realms.Logging;

namespace Realms.Native;

internal partial class SyncSocketProvider
{
private class Timer
{
private readonly CancellationTokenSource _cts = new();

internal Timer(TimeSpan delay, IntPtr nativeCallback, ChannelWriter<IWork> workQueue)
{
Logger.LogDefault(LogLevel.Trace, $"Creating timer with delay {delay} and target {nativeCallback}.");
var cancellationToken = _cts.Token;
Task.Delay(delay, cancellationToken).ContinueWith(async _ =>
{
await workQueue.WriteAsync(new Work(nativeCallback, cancellationToken));
});
}

internal void Cancel()
{
Logger.LogDefault(LogLevel.Trace, $"Canceling timer.");
_cts.Cancel();
_cts.Dispose();
}

private class Work : IWork
{
private readonly IntPtr _nativeCallback;
private readonly CancellationToken _cancellationToken;

public Work(IntPtr nativeCallback, CancellationToken cancellationToken)
{
_nativeCallback = nativeCallback;
_cancellationToken = cancellationToken;
}

public void Execute()
{
var status = Status.OK;
if (_cancellationToken.IsCancellationRequested)
{
status = new(ErrorCode.OperationAborted, "Timer canceled");
}

RunCallback(_nativeCallback, status);
}
}
}

private class EventLoopWork : IWork
{
private readonly IntPtr _nativeCallback;
private readonly Status _status;

// Belongs to SyncSocketProvider. When Native destroys the Provider we need to stop executing
// enqueued work, but we need to release all the callbacks we copied on the heap.
private readonly CancellationToken _cancellationToken;

public EventLoopWork(IntPtr nativeCallback, Status status, CancellationToken cancellationToken)
{
_nativeCallback = nativeCallback;
_status = status;
_cancellationToken = cancellationToken;
}

public void Execute()
{
if (_cancellationToken.IsCancellationRequested)
{
Logger.LogDefault(LogLevel.Trace, "Deleting EventLoopWork callback only because event loop was cancelled.");
NativeMethods.delete_callback(_nativeCallback);
return;
}

RunCallback(_nativeCallback, _status);
}
}

private static void RunCallback(IntPtr nativeCallback, Status status)
{
Logger.LogDefault(LogLevel.Trace, $"SyncSocketProvider running native callback {nativeCallback} with status {status.Code} \"{status.Reason}\".");

using var arena = new Arena();
NativeMethods.run_callback(nativeCallback, status.Code, StringValue.AllocateFrom(status.Reason, arena));
}

private async Task PostWorkAsync(IntPtr nativeCallback)
{
Logger.LogDefault(LogLevel.Trace, "Posting work to SyncSocketProvider event loop.");
await _workQueue.Writer.WriteAsync(new EventLoopWork(nativeCallback, Status.OK, _cts.Token));
}

private async partial Task WorkThread()
{
Logger.LogDefault(LogLevel.Trace, "Starting SyncSocketProvider event loop.");
try
{
while (await _workQueue.Reader.WaitToReadAsync())
{
while (_workQueue.Reader.TryRead(out var work))
{
work.Execute();
}
}
nirinchev marked this conversation as resolved.
Show resolved Hide resolved
}
catch (Exception e)
{
Logger.LogDefault(LogLevel.Error, $"Error occurred in SyncSocketProvider event loop {e.GetType().FullName}: {e.Message}");
if (!string.IsNullOrEmpty(e.StackTrace))
{
Logger.LogDefault(LogLevel.Trace, e.StackTrace);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we notify sync that we're terminating the work thread?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't an interface we can use to notify Sync about that. But now that you mention it the existing behavior on an exception on the sync worker thread is to bring down the process, so instead of swallowing and logging the exception that's what we should do here.

}

throw;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think throwing in a background thread will bring down the process - you may get a warning or something, but in release, I think it's just ignored on most platforms.

}

Logger.LogDefault(LogLevel.Trace, "Exiting SyncSocketProvider event loop.");
}
}
Loading
Loading