Skip to content

Commit

Permalink
Try making SuperSocket.Kestrel support Detach
Browse files Browse the repository at this point in the history
  • Loading branch information
wj8400684 committed Dec 10, 2024
1 parent 7ec3ad6 commit c8eb346
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 35 deletions.
2 changes: 1 addition & 1 deletion src/SuperSocket.Connection/IObjectPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface IObjectPipe<T> : IObjectPipe
ValueTask<T> ReadAsync();
}

public interface ISupplyController
interface ISupplyController
{
ValueTask SupplyRequired();

Expand Down
18 changes: 11 additions & 7 deletions src/SuperSocket.Connection/PipeConnectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,16 @@ protected void UpdateLastActiveTime()
LastActiveTime = DateTimeOffset.Now;
}

public async override IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(IPipelineFilter<TPackageInfo> pipelineFilter)
protected virtual IObjectPipe<TPackageInfo> CreatePackagePipe<TPackageInfo>(bool readAsDemand)
{
var packagePipe = !Options.ReadAsDemand
return !readAsDemand
? new DefaultObjectPipe<TPackageInfo>()
: new DefaultObjectPipeWithSupplyControl<TPackageInfo>();
}

public async override IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(IPipelineFilter<TPackageInfo> pipelineFilter)
{
var packagePipe = CreatePackagePipe<TPackageInfo>(Options.ReadAsDemand);

_packagePipe = packagePipe;
_pipelineFilter = pipelineFilter;
Expand Down Expand Up @@ -230,9 +235,8 @@ protected void WritePackageWithEncoder<TPackage>(IBufferWriter<byte> writer, IPa
packageEncoder.Encode(writer, package);
}

protected virtual Task OnInputPipeReadAsync(ReadResult result)
protected virtual void OnInputPipeRead(ReadResult result)
{
return Task.CompletedTask;
}

protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<TPackageInfo> packagePipe, CancellationToken cancellationToken)
Expand All @@ -246,7 +250,7 @@ protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<
try
{
result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
await OnInputPipeReadAsync(result);
OnInputPipeRead(result);
}
catch (Exception e)
{
Expand Down Expand Up @@ -305,7 +309,7 @@ protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<
}
}

OnReaderComplete(reader, _isDetaching);
CompleteReader(reader, _isDetaching);
WriteEOFPackage();
}

Expand Down Expand Up @@ -414,7 +418,7 @@ protected void OnError(string message, Exception e = null)
Logger?.LogError(message);
}

protected virtual void OnReaderComplete(PipeReader reader, bool isDetaching)
protected virtual void CompleteReader(PipeReader reader, bool isDetaching)
{
reader.Complete();
}
Expand Down
42 changes: 17 additions & 25 deletions src/SuperSocket.Kestrel/KestrelPipeConnection.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace SuperSocket.Kestrel;
using Microsoft.Extensions.Logging;

namespace SuperSocket.Kestrel;

using System;
using System.IO;
Expand All @@ -13,7 +15,6 @@
public class KestrelPipeConnection : PipeConnectionBase
{
private ConnectionContext _context;
private ISupplyController _supplyController;

public KestrelPipeConnection(ConnectionContext context, ConnectionOptions options)
: base(context.Transport.Input, context.Transport.Output, options)
Expand All @@ -22,6 +23,19 @@ public KestrelPipeConnection(ConnectionContext context, ConnectionOptions option
context.ConnectionClosed.Register(() => OnConnectionClosed());
LocalEndPoint = context.LocalEndPoint;
RemoteEndPoint = context.RemoteEndPoint;

if (options.ReadAsDemand)
{
Logger.LogWarning($"{nameof(KestrelPipeConnection)} doesn't support ReadAsDemand.");
}
}

protected override void CompleteReader(PipeReader reader, bool isDetaching)
{
if (!isDetaching)
{
reader.Complete();
}
}

protected override void OnClosed()
Expand All @@ -45,13 +59,10 @@ protected override async void Close()
}
}

protected override async Task OnInputPipeReadAsync(ReadResult result)
protected override void OnInputPipeRead(ReadResult result)
{
if (result is { IsCanceled: false, IsCompleted: false })
UpdateLastActiveTime();

if (_supplyController != null)
await _supplyController.SupplyRequired().ConfigureAwait(false);
}

public override async ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken)
Expand Down Expand Up @@ -92,23 +103,4 @@ private void OnConnectionClosed()
{
Cancel();
}

protected override Task StartInputPipeTask<TPackageInfo>(IObjectPipe<TPackageInfo> packagePipe,
CancellationToken cancellationToken)
{
_supplyController = packagePipe as ISupplyController;

if (_supplyController != null)
cancellationToken.Register(() => _supplyController.SupplyEnd());

return base.StartInputPipeTask(packagePipe, cancellationToken);
}

protected override void OnReaderComplete(PipeReader reader, bool isDetaching)
{
if (isDetaching)
return;

reader.Complete();
}
}
4 changes: 2 additions & 2 deletions test/SuperSocket.Tests/ClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public async Task TestDetachableConnection()
.UsePackageHandler(async (s, p) =>
{
await s.SendAsync(Utf8Encoding.GetBytes("PRE-" + p.Text + "\r\n"));
}).BuildAsServer())
}).UseKestrelPipeConnection().BuildAsServer())
{
Assert.Equal("TestServer", server.Name);

Expand All @@ -312,7 +312,7 @@ public async Task TestDetachableConnection()
new ConnectionOptions
{
Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableConnection)),
ReadAsDemand = true
ReadAsDemand = false
}
), () => !context.ConnectionClosed.IsCancellationRequested);
}
Expand Down

0 comments on commit c8eb346

Please sign in to comment.