diff --git a/.github/workflows/myget.yml b/.github/workflows/myget.yml index 1a31e4951..705cfd547 100644 --- a/.github/workflows/myget.yml +++ b/.github/workflows/myget.yml @@ -31,4 +31,4 @@ jobs: run: | dotnet pack -c Release -p:NuspecProperties="version=${{ steps.nbgv.outputs.NuGetPackageVersion }}.${{ github.run_number }}" pkg/SuperSocket.csproj /p:NoPackageAnalysis=true - name: Push - run: dotnet nuget push **/*.nupkg --api-key ${{ secrets.MYGET_API_KEY }} --source https://www.myget.org/F/supersocket/api/v3/index.json + run: dotnet nuget push **/*.nupkg --api-key ${{ secrets.MYGET_API_KEY }} --source https://www.myget.org/F/swatchsocket/api/v3/index.json diff --git a/.github/workflows/myget_release.yml b/.github/workflows/myget_release.yml index a74c24ec9..335bfcd9d 100644 --- a/.github/workflows/myget_release.yml +++ b/.github/workflows/myget_release.yml @@ -26,4 +26,4 @@ jobs: run: | dotnet pack -c Release -p:NuspecProperties="version=${{ steps.nbgv.outputs.NuGetPackageVersion }}" pkg/SuperSocket.csproj - name: Push - run: dotnet nuget push **/*.nupkg --api-key ${{ secrets.MYGET_API_KEY }} --source https://www.myget.org/F/supersocket/api/v3/index.json + run: dotnet nuget push **/*.nupkg --api-key ${{ secrets.MYGET_API_KEY }} --source https://www.myget.org/F/swatchsocket/api/v3/index.json diff --git a/src/SuperSocket.Connection/PipeConnectionBase.cs b/src/SuperSocket.Connection/PipeConnectionBase.cs index faa0d6c4d..8e3fb2acb 100644 --- a/src/SuperSocket.Connection/PipeConnectionBase.cs +++ b/src/SuperSocket.Connection/PipeConnectionBase.cs @@ -47,6 +47,8 @@ IPipelineFilter IPipeConnection.PipelineFilter private bool _isDetaching = false; + private ISupplyController _supplyController; + protected PipeConnectionBase(PipeReader inputReader, PipeWriter outputWriter, ConnectionOptions options) { Options = options; @@ -66,11 +68,20 @@ protected void UpdateLastActiveTime() LastActiveTime = DateTimeOffset.Now; } - public async override IAsyncEnumerable RunAsync(IPipelineFilter pipelineFilter) + public async override IAsyncEnumerable RunAsync( + IPipelineFilter pipelineFilter) { - var packagePipe = !Options.ReadAsDemand - ? new DefaultObjectPipe() - : new DefaultObjectPipeWithSupplyControl(); + IObjectPipe packagePipe; + if (Options.ReadAsDemand) + { + var defaultObjectPipe = new DefaultObjectPipeWithSupplyControl(); + _supplyController = defaultObjectPipe; + packagePipe = defaultObjectPipe; + } + else + { + packagePipe = new DefaultObjectPipe(); + } _packagePipe = packagePipe; _pipelineFilter = pipelineFilter; @@ -151,7 +162,8 @@ protected virtual bool IsIgnorableException(Exception e) return false; } - protected virtual Task StartInputPipeTask(IObjectPipe packagePipe, CancellationToken cancellationToken) + protected virtual Task StartInputPipeTask(IObjectPipe packagePipe, + CancellationToken cancellationToken) { return ReadPipeAsync(InputReader, packagePipe, cancellationToken); } @@ -164,7 +176,8 @@ private void CheckConnectionOpen() } } - public override async ValueTask SendAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + public override async ValueTask SendAsync(ReadOnlyMemory buffer, + CancellationToken cancellationToken = default) { var sendLockAcquired = false; @@ -188,7 +201,8 @@ private void WriteBuffer(PipeWriter writer, ReadOnlyMemory buffer) writer.Write(buffer.Span); } - public override async ValueTask SendAsync(IPackageEncoder packageEncoder, TPackage package, CancellationToken cancellationToken = default) + public override async ValueTask SendAsync(IPackageEncoder packageEncoder, TPackage package, + CancellationToken cancellationToken = default) { var sendLockAcquired = false; @@ -206,7 +220,7 @@ public override async ValueTask SendAsync(IPackageEncoder pa } } - public override async ValueTask SendAsync(Action write, CancellationToken cancellationToken) + public override async ValueTask SendAsync(Action write, CancellationToken cancellationToken = default) { var sendLockAcquired = false; @@ -224,17 +238,20 @@ public override async ValueTask SendAsync(Action write, Cancellation } } - protected void WritePackageWithEncoder(IBufferWriter writer, IPackageEncoder packageEncoder, TPackage package) + protected void WritePackageWithEncoder(IBufferWriter writer, + IPackageEncoder packageEncoder, TPackage package) { CheckConnectionOpen(); packageEncoder.Encode(writer, package); } - protected virtual void OnInputPipeRead(ReadResult result) + protected virtual Task OnInputPipeReadAsync(ReadResult result) { + return Task.CompletedTask; } - protected async Task ReadPipeAsync(PipeReader reader, IObjectPipe packagePipe, CancellationToken cancellationToken) + protected async Task ReadPipeAsync(PipeReader reader, IObjectPipe packagePipe, + CancellationToken cancellationToken) { var pipelineFilter = _pipelineFilter as IPipelineFilter; @@ -245,7 +262,7 @@ protected async Task ReadPipeAsync(PipeReader reader, IObjectPipe< try { result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false); - OnInputPipeRead(result); + await OnInputPipeReadAsync(result); } catch (Exception e) { @@ -271,7 +288,8 @@ protected async Task ReadPipeAsync(PipeReader reader, IObjectPipe< { if (buffer.Length > 0) { - var needReadMore = ReaderBuffer(ref buffer, pipelineFilter, packagePipe, out consumed, out examined, out var currentPipelineFilter); + var needReadMore = ReaderBuffer(ref buffer, pipelineFilter, packagePipe, out consumed, + out examined, out var currentPipelineFilter); if (currentPipelineFilter != null) { @@ -304,16 +322,24 @@ protected async Task ReadPipeAsync(PipeReader reader, IObjectPipe< } } - reader.Complete(); + OnReaderComplete(reader, _isDetaching); WriteEOFPackage(); } + protected virtual void OnReaderComplete(PipeReader reader, bool isDetaching) + { + reader.Complete(); + } + protected void WriteEOFPackage() { _packagePipe.WirteEOF(); } - private bool ReaderBuffer(ref ReadOnlySequence buffer, IPipelineFilter pipelineFilter, IObjectPipe packagePipe, out SequencePosition consumed, out SequencePosition examined, out IPipelineFilter currentPipelineFilter) + private bool ReaderBuffer(ref ReadOnlySequence buffer, + IPipelineFilter pipelineFilter, IObjectPipe packagePipe, + out SequencePosition consumed, out SequencePosition examined, + out IPipelineFilter currentPipelineFilter) { consumed = buffer.Start; examined = buffer.End; @@ -336,7 +362,8 @@ private bool ReaderBuffer(ref ReadOnlySequence buffer, IPipe if (nextFilter != null) { // ProxyProtocolPipelineFilter always is the first filter and its next filter is the actual first filter. - if (bytesConsumedTotal == 0 && pipelineFilter is IProxyProtocolPipelineFilter proxyProtocolPipelineFilter) + if (bytesConsumedTotal == 0 && + pipelineFilter is IProxyProtocolPipelineFilter proxyProtocolPipelineFilter) { ProxyInfo = proxyProtocolPipelineFilter.ProxyInfo; } @@ -412,5 +439,16 @@ protected void OnError(string message, Exception e = null) else Logger?.LogError(message); } + + protected void SupplyEnd() + { + _supplyController?.SupplyEnd(); + } + + protected async Task SupplyRequiredAsync() + { + if (_supplyController != null) + await _supplyController.SupplyRequired().ConfigureAwait(false); + } } -} +} \ No newline at end of file diff --git a/src/SuperSocket.Connection/TcpPipeConnection.cs b/src/SuperSocket.Connection/TcpPipeConnection.cs index 2bb406852..dcd19adf6 100644 --- a/src/SuperSocket.Connection/TcpPipeConnection.cs +++ b/src/SuperSocket.Connection/TcpPipeConnection.cs @@ -60,8 +60,6 @@ protected override async ValueTask SendOverIOAsync(ReadOnlySequence b _segmentsForSend.Clear(); } - var segments = _segmentsForSend; - foreach (var piece in buffer) { cancellationToken.ThrowIfCancellationRequested(); diff --git a/src/SuperSocket.Connection/UdpPipeConnection.cs b/src/SuperSocket.Connection/UdpPipeConnection.cs index 9bb84385c..fbb0ba032 100644 --- a/src/SuperSocket.Connection/UdpPipeConnection.cs +++ b/src/SuperSocket.Connection/UdpPipeConnection.cs @@ -97,7 +97,7 @@ private void MergeBuffer(ref ReadOnlySequence buffer, byte[] destBuffer) } } - public override async ValueTask SendAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken) + public override async ValueTask SendAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) { if (_enableSendingPipe) { @@ -111,7 +111,7 @@ await SendOverIOAsync(new ReadOnlySequence(buffer), cancellationToken) .ConfigureAwait(false); } - public override async ValueTask SendAsync(IPackageEncoder packageEncoder, TPackage package, CancellationToken cancellationToken) + public override async ValueTask SendAsync(IPackageEncoder packageEncoder, TPackage package, CancellationToken cancellationToken = default) { if (_enableSendingPipe) { @@ -145,7 +145,7 @@ await ProcessOutputRead(Output.Reader) } } - public override async ValueTask SendAsync(Action write, CancellationToken cancellationToken) + public override async ValueTask SendAsync(Action write, CancellationToken cancellationToken = default) { if (_enableSendingPipe) { diff --git a/src/SuperSocket.Kestrel/KestrelPipeConnection.cs b/src/SuperSocket.Kestrel/KestrelPipeConnection.cs index 8ed40dd93..d36bcbe36 100644 --- a/src/SuperSocket.Kestrel/KestrelPipeConnection.cs +++ b/src/SuperSocket.Kestrel/KestrelPipeConnection.cs @@ -18,7 +18,7 @@ public KestrelPipeConnection(ConnectionContext context, ConnectionOptions option : base(context.Transport.Input, context.Transport.Output, options) { _context = context; - context.ConnectionClosed.Register(() => OnConnectionClosed()); + context.ConnectionClosed.Register(OnConnectionClosed); LocalEndPoint = context.LocalEndPoint; RemoteEndPoint = context.RemoteEndPoint; } @@ -31,11 +31,6 @@ protected override void OnClosed() base.OnClosed(); } - public override ValueTask DetachAsync() - { - throw new NotSupportedException($"Detach is not supported by {nameof(KestrelPipeConnection)}."); - } - protected override async void Close() { var context = _context; @@ -49,32 +44,46 @@ protected override async void Close() } } - protected override void OnInputPipeRead(ReadResult result) + protected override Task StartInputPipeTask(IObjectPipe packagePipe, CancellationToken cancellationToken) { - if (!result.IsCanceled && !result.IsCompleted) - { + cancellationToken.Register(SupplyEnd); + return base.StartInputPipeTask(packagePipe, cancellationToken); + } + + protected override async Task OnInputPipeReadAsync(ReadResult result) + { + if (result is { IsCanceled: false, IsCompleted: false }) UpdateLastActiveTime(); - } + + await SupplyRequiredAsync(); } - public override async ValueTask SendAsync(Action write, CancellationToken cancellationToken) + public override async ValueTask SendAsync(Action write, CancellationToken cancellationToken = default) { await base.SendAsync(write, cancellationToken); UpdateLastActiveTime(); } - public override async ValueTask SendAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken) + public override async ValueTask SendAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) { await base.SendAsync(buffer, cancellationToken); UpdateLastActiveTime(); } - public override async ValueTask SendAsync(IPackageEncoder packageEncoder, TPackage package, CancellationToken cancellationToken) + public override async ValueTask SendAsync(IPackageEncoder packageEncoder, TPackage package, CancellationToken cancellationToken = default) { await base.SendAsync(packageEncoder, package, cancellationToken); UpdateLastActiveTime(); } + protected override void OnReaderComplete(PipeReader reader, bool isDetaching) + { + if (isDetaching) + return; + + reader.Complete(); + } + protected override bool IsIgnorableException(Exception e) { if (e is IOException ioe && ioe.InnerException != null) diff --git a/test/SuperSocket.Tests/ClientTest.cs b/test/SuperSocket.Tests/ClientTest.cs index c190f8174..d1ebbc73b 100644 --- a/test/SuperSocket.Tests/ClientTest.cs +++ b/test/SuperSocket.Tests/ClientTest.cs @@ -14,7 +14,11 @@ using SuperSocket.Server.Abstractions; using SuperSocket.Server.Abstractions.Session; using System.Threading; +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging.Abstractions; +using SuperSocket.Kestrel; using SuperSocket.WebSocket; namespace SuperSocket.Tests @@ -23,13 +27,12 @@ namespace SuperSocket.Tests public class ClientTest : TestClassBase { private static Random _rd = new Random(); - + public ClientTest(ITestOutputHelper outputHelper) : base(outputHelper) { - } - + [Theory] [Trait("Category", "Client.TestEcho")] [InlineData(typeof(RegularHostConfigurator), false)] @@ -43,24 +46,23 @@ public async Task TestEcho(Type hostConfiguratorType, bool clientReadAsDemand) var hostConfigurator = CreateObject(hostConfiguratorType); using (var server = CreateSocketServerBuilder(hostConfigurator) - .UsePackageHandler(async (s, p) => - { - await s.SendAsync(Utf8Encoding.GetBytes(p.Text + "\r\n")); - }) - .UseSessionHandler( - onConnected: (s) => - { - serverSessionEvent.Set(); - return ValueTask.CompletedTask; - }, - onClosed: (s, e) => - { - serverSessionEvent.Set(); - return ValueTask.CompletedTask; - }) - .BuildAsServer()) + .UsePackageHandler(async (s, p) => + { + await s.SendAsync(Utf8Encoding.GetBytes(p.Text + "\r\n")); + }) + .UseSessionHandler( + onConnected: (s) => + { + serverSessionEvent.Set(); + return ValueTask.CompletedTask; + }, + onClosed: (s, e) => + { + serverSessionEvent.Set(); + return ValueTask.CompletedTask; + }) + .BuildAsServer()) { - Assert.Equal("TestServer", server.Name); Assert.True(await server.StartAsync()); @@ -71,11 +73,12 @@ public async Task TestEcho(Type hostConfiguratorType, bool clientReadAsDemand) Logger = NullLogger.Instance, ReadAsDemand = clientReadAsDemand }; - + var client = hostConfigurator.ConfigureEasyClient(new LinePipelineFilter(), options); - var connected = await client.ConnectAsync(new IPEndPoint(IPAddress.Loopback, hostConfigurator.Listener.Port)); - + var connected = + await client.ConnectAsync(new IPEndPoint(IPAddress.Loopback, hostConfigurator.Listener.Port)); + Assert.True(connected); Assert.True(serverSessionEvent.WaitOne(1000)); @@ -87,7 +90,7 @@ public async Task TestEcho(Type hostConfiguratorType, bool clientReadAsDemand) var package = await client.ReceiveAsync(); Assert.NotNull(package); - Assert.Equal(msg, package.Text); + Assert.Equal(msg, package.Text); } await client.CloseAsync(); @@ -105,19 +108,19 @@ public async Task TestBindLocalEndPoint(Type hostConfiguratorType) IAppSession session = default; var hostConfigurator = CreateObject(hostConfiguratorType); - using (var server = CreateSocketServerBuilder(hostConfigurator) - .UseSessionHandler(async s => - { - session = s; - await Task.CompletedTask; - }) - .BuildAsServer()) + using (var server = + CreateSocketServerBuilder(hostConfigurator) + .UseSessionHandler(async s => + { + session = s; + await Task.CompletedTask; + }) + .BuildAsServer()) { - Assert.Equal("TestServer", server.Name); Assert.True(await server.StartAsync()); - OutputHelper.WriteLine("Server started."); + OutputHelper.WriteLine("Server started."); var pipelineFilter = new CommandLinePipelineFilter { @@ -128,11 +131,11 @@ public async Task TestBindLocalEndPoint(Type hostConfiguratorType) { Logger = DefaultLoggerFactory.CreateLogger(nameof(TestBindLocalEndPoint)) }; - + var client = hostConfigurator.ConfigureEasyClient(pipelineFilter, options); var connected = false; var localPort = 0; - + for (var i = 0; i < 3; i++) { localPort = _rd.Next(40000, 50000); @@ -140,19 +143,21 @@ public async Task TestBindLocalEndPoint(Type hostConfiguratorType) try { - connected = await client.ConnectAsync(new IPEndPoint(IPAddress.Loopback, hostConfigurator.Listener.Port)); + connected = await client.ConnectAsync(new IPEndPoint(IPAddress.Loopback, + hostConfigurator.Listener.Port)); } catch (SocketException e) { - if (e.SocketErrorCode == SocketError.AccessDenied || e.SocketErrorCode == SocketError.AddressAlreadyInUse) + if (e.SocketErrorCode == SocketError.AccessDenied || + e.SocketErrorCode == SocketError.AddressAlreadyInUse) continue; - + throw e; } - break; - } - + break; + } + Assert.True(connected); await Task.Delay(500); @@ -202,12 +207,10 @@ public async Task TestCommandLine(Type hostConfiguratorType) var packageEvent = new AutoResetEvent(false); var hostConfigurator = CreateObject(hostConfiguratorType); - using (var server = CreateSocketServerBuilder(hostConfigurator) - .UseCommand((options) => - { - options.AddCommand(); - }) - .BuildAsServer()) + using (var server = + CreateSocketServerBuilder(hostConfigurator) + .UseCommand((options) => { options.AddCommand(); }) + .BuildAsServer()) { Assert.Equal("TestServer", server.Name); @@ -234,8 +237,9 @@ public async Task TestCommandLine(Type hostConfiguratorType) await Task.CompletedTask; }; - var connected = await client.ConnectAsync(new IPEndPoint(IPAddress.Loopback, hostConfigurator.Listener.Port)); - + var connected = + await client.ConnectAsync(new IPEndPoint(IPAddress.Loopback, hostConfigurator.Listener.Port)); + Assert.True(connected); client.StartReceive(); @@ -262,70 +266,85 @@ public async Task TestDetachableConnection() { IHostConfigurator hostConfigurator = new RegularHostConfigurator(); - await TestDetachableConnectionInternal(hostConfigurator, (_, socket) => - new StreamPipeConnection( - hostConfigurator.GetClientStream(socket).Result, - socket.RemoteEndPoint, - socket.LocalEndPoint, - new ConnectionOptions - { - Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableConnection)), - ReadAsDemand = true - }) - ); - - /* KestrelPipeConnection doesn't support Detach right now. - await TestDetachableConnectionInternal(new KestralConnectionHostConfigurator(), (server, socket) => - new KestrelPipeConnection( - server.ServiceProvider.GetService().Create(socket), + using (var server = CreateSocketServerBuilder(hostConfigurator) + .UsePackageHandler(async (s, p) => + { + await s.SendAsync(Utf8Encoding.GetBytes("PRE-" + p.Text + "\r\n")); + }).BuildAsServer()) + { + Assert.Equal("TestServer", server.Name); + + Assert.True(await server.StartAsync()); + OutputHelper.WriteLine("Server started."); + + using (var socket = hostConfigurator.CreateClient()) + { + await TestDetachableConnectionInternal(hostConfigurator, server, ser => new StreamPipeConnection( + hostConfigurator.GetClientStream(socket).Result, + socket.RemoteEndPoint, + socket.LocalEndPoint, new ConnectionOptions { Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableConnection)), - ReadAsDemand = false - } - ) - ); - */ - } + ReadAsDemand = true + }), () => socket.Connected); + } + + await server.StopAsync(); + } - async Task TestDetachableConnectionInternal(IHostConfigurator hostConfigurator, Func connectionFactory) - { using (var server = CreateSocketServerBuilder(hostConfigurator) - .UsePackageHandler(async (s, p) => - { - await s.SendAsync(Utf8Encoding.GetBytes("PRE-" + p.Text + "\r\n")); - }).BuildAsServer()) + .ConfigureServices((ctx, services) => services.AddSocketConnectionFactory()) + .UsePackageHandler(async (s, p) => + { + await s.SendAsync(Utf8Encoding.GetBytes("PRE-" + p.Text + "\r\n")); + }).BuildAsServer()) { - Assert.Equal("TestServer", server.Name); Assert.True(await server.StartAsync()); OutputHelper.WriteLine("Server started."); - - using (var socket = hostConfigurator.CreateClient()) + var connectionFactory = server.ServiceProvider + .GetRequiredService(); + await using (var context = await connectionFactory.ConnectAsync(hostConfigurator.GetServerEndPoint())) { - var connection = connectionFactory(server, socket); + await TestDetachableConnectionInternal(hostConfigurator, server, ser => new KestrelPipeConnection( + context, + new ConnectionOptions + { + Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableConnection)), + ReadAsDemand = true + } + ), () => !context.ConnectionClosed.IsCancellationRequested); + } + + await server.StopAsync(); + } + } - await TestConnection(connection); + async Task TestDetachableConnectionInternal(IHostConfigurator hostConfigurator, + IServer server, + Func connectionFactory, + Func checkConnectionFactory) + { + var connection = connectionFactory(server); - OutputHelper.WriteLine("Before DetachAsync"); + await TestConnection(connection); - await connection.DetachAsync(); + OutputHelper.WriteLine("Before DetachAsync"); - // the connection is still alive in the server - Assert.Equal(1, server.SessionCount); + await connection.DetachAsync(); - // socket.Connected is is still connected - Assert.True(socket.Connected); + // the connection is still alive in the server + Assert.Equal(1, server.SessionCount); - // Attach the socket with another connection - connection = connectionFactory(server, socket); + // socket.Connected is is still connected + Assert.True(checkConnectionFactory()); - await TestConnection(connection); - } + // Attach the socket with another connection + connection = connectionFactory(server); - await server.StopAsync(); - } + await TestConnection(connection); } async Task TestConnection(IConnection connection) @@ -351,4 +370,4 @@ async Task TestConnection(IConnection connection) } } } -} +} \ No newline at end of file diff --git a/test/SuperSocket.Tests/ServiceCollectionExtensions.cs b/test/SuperSocket.Tests/ServiceCollectionExtensions.cs new file mode 100644 index 000000000..29fba61a5 --- /dev/null +++ b/test/SuperSocket.Tests/ServiceCollectionExtensions.cs @@ -0,0 +1,25 @@ +using System; +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets; +using Microsoft.Extensions.DependencyInjection; + +namespace SuperSocket.Tests; + +public static class ServiceCollectionExtensions +{ + private const string SocketConnectionFactoryTypeName = + "Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactory"; + + private static Type FindSocketConnectionFactory() + { + var assembly = typeof(SocketTransportOptions).Assembly; + var connectionFactoryType = assembly.GetType(SocketConnectionFactoryTypeName); + return connectionFactoryType ?? throw new NotSupportedException(SocketConnectionFactoryTypeName); + } + + public static IServiceCollection AddSocketConnectionFactory(this IServiceCollection services) + { + var factoryType = FindSocketConnectionFactory(); + return services.AddTransient(typeof(IConnectionFactory), factoryType); + } +} \ No newline at end of file