diff --git a/src/SuperSocket.Connection/IObjectPipe.cs b/src/SuperSocket.Connection/IObjectPipe.cs index 35d2a8f27..66b05aafb 100644 --- a/src/SuperSocket.Connection/IObjectPipe.cs +++ b/src/SuperSocket.Connection/IObjectPipe.cs @@ -20,7 +20,7 @@ public interface IObjectPipe : IObjectPipe ValueTask ReadAsync(); } - public interface ISupplyController + interface ISupplyController { ValueTask SupplyRequired(); diff --git a/src/SuperSocket.Connection/PipeConnectionBase.cs b/src/SuperSocket.Connection/PipeConnectionBase.cs index 16936a86b..51a26be93 100644 --- a/src/SuperSocket.Connection/PipeConnectionBase.cs +++ b/src/SuperSocket.Connection/PipeConnectionBase.cs @@ -66,11 +66,16 @@ protected void UpdateLastActiveTime() LastActiveTime = DateTimeOffset.Now; } - public async override IAsyncEnumerable RunAsync(IPipelineFilter pipelineFilter) + protected virtual IObjectPipe CreatePackagePipe(bool readAsDemand) { - var packagePipe = !Options.ReadAsDemand + return !readAsDemand ? new DefaultObjectPipe() : new DefaultObjectPipeWithSupplyControl(); + } + + public async override IAsyncEnumerable RunAsync(IPipelineFilter pipelineFilter) + { + var packagePipe = CreatePackagePipe(Options.ReadAsDemand); _packagePipe = packagePipe; _pipelineFilter = pipelineFilter; @@ -230,9 +235,8 @@ protected void WritePackageWithEncoder(IBufferWriter writer, IPa packageEncoder.Encode(writer, package); } - protected virtual Task OnInputPipeReadAsync(ReadResult result) + protected virtual void OnInputPipeRead(ReadResult result) { - return Task.CompletedTask; } protected async Task ReadPipeAsync(PipeReader reader, IObjectPipe packagePipe, CancellationToken cancellationToken) @@ -246,7 +250,7 @@ protected async Task ReadPipeAsync(PipeReader reader, IObjectPipe< try { result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false); - await OnInputPipeReadAsync(result); + OnInputPipeRead(result); } catch (Exception e) { @@ -305,7 +309,7 @@ protected async Task ReadPipeAsync(PipeReader reader, IObjectPipe< } } - OnReaderComplete(reader, _isDetaching); + CompleteReader(reader, _isDetaching); WriteEOFPackage(); } @@ -414,7 +418,7 @@ protected void OnError(string message, Exception e = null) Logger?.LogError(message); } - protected virtual void OnReaderComplete(PipeReader reader, bool isDetaching) + protected virtual void CompleteReader(PipeReader reader, bool isDetaching) { reader.Complete(); } diff --git a/src/SuperSocket.Kestrel/KestrelPipeConnection.cs b/src/SuperSocket.Kestrel/KestrelPipeConnection.cs index 06286dff3..af5d9772e 100644 --- a/src/SuperSocket.Kestrel/KestrelPipeConnection.cs +++ b/src/SuperSocket.Kestrel/KestrelPipeConnection.cs @@ -1,4 +1,6 @@ -namespace SuperSocket.Kestrel; +using Microsoft.Extensions.Logging; + +namespace SuperSocket.Kestrel; using System; using System.IO; @@ -13,7 +15,6 @@ public class KestrelPipeConnection : PipeConnectionBase { private ConnectionContext _context; - private ISupplyController _supplyController; public KestrelPipeConnection(ConnectionContext context, ConnectionOptions options) : base(context.Transport.Input, context.Transport.Output, options) @@ -22,6 +23,19 @@ public KestrelPipeConnection(ConnectionContext context, ConnectionOptions option context.ConnectionClosed.Register(() => OnConnectionClosed()); LocalEndPoint = context.LocalEndPoint; RemoteEndPoint = context.RemoteEndPoint; + + if (options.ReadAsDemand) + { + Logger.LogWarning($"{nameof(KestrelPipeConnection)} doesn't support ReadAsDemand."); + } + } + + protected override void CompleteReader(PipeReader reader, bool isDetaching) + { + if (!isDetaching) + { + reader.Complete(); + } } protected override void OnClosed() @@ -45,13 +59,10 @@ protected override async void Close() } } - protected override async Task OnInputPipeReadAsync(ReadResult result) + protected override void OnInputPipeRead(ReadResult result) { if (result is { IsCanceled: false, IsCompleted: false }) UpdateLastActiveTime(); - - if (_supplyController != null) - await _supplyController.SupplyRequired().ConfigureAwait(false); } public override async ValueTask SendAsync(Action write, CancellationToken cancellationToken) @@ -92,23 +103,4 @@ private void OnConnectionClosed() { Cancel(); } - - protected override Task StartInputPipeTask(IObjectPipe packagePipe, - CancellationToken cancellationToken) - { - _supplyController = packagePipe as ISupplyController; - - if (_supplyController != null) - cancellationToken.Register(() => _supplyController.SupplyEnd()); - - return base.StartInputPipeTask(packagePipe, cancellationToken); - } - - protected override void OnReaderComplete(PipeReader reader, bool isDetaching) - { - if (isDetaching) - return; - - reader.Complete(); - } } \ No newline at end of file diff --git a/test/SuperSocket.Tests/ClientTest.cs b/test/SuperSocket.Tests/ClientTest.cs index 7b2d05b74..38116096a 100644 --- a/test/SuperSocket.Tests/ClientTest.cs +++ b/test/SuperSocket.Tests/ClientTest.cs @@ -297,7 +297,7 @@ public async Task TestDetachableConnection() .UsePackageHandler(async (s, p) => { await s.SendAsync(Utf8Encoding.GetBytes("PRE-" + p.Text + "\r\n")); - }).BuildAsServer()) + }).UseKestrelPipeConnection().BuildAsServer()) { Assert.Equal("TestServer", server.Name); @@ -312,7 +312,7 @@ public async Task TestDetachableConnection() new ConnectionOptions { Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableConnection)), - ReadAsDemand = true + ReadAsDemand = false } ), () => !context.ConnectionClosed.IsCancellationRequested); }