Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try making SuperSocket.Kestrel support Detach #763

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/myget.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ jobs:
run: |
dotnet pack -c Release -p:NuspecProperties="version=${{ steps.nbgv.outputs.NuGetPackageVersion }}.${{ github.run_number }}" pkg/SuperSocket.csproj /p:NoPackageAnalysis=true
- name: Push
run: dotnet nuget push **/*.nupkg --api-key ${{ secrets.MYGET_API_KEY }} --source https://www.myget.org/F/supersocket/api/v3/index.json
run: dotnet nuget push **/*.nupkg --api-key ${{ secrets.MYGET_API_KEY }} --source https://www.myget.org/F/swatchsocket/api/v3/index.json
2 changes: 1 addition & 1 deletion .github/workflows/myget_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ jobs:
run: |
dotnet pack -c Release -p:NuspecProperties="version=${{ steps.nbgv.outputs.NuGetPackageVersion }}" pkg/SuperSocket.csproj
- name: Push
run: dotnet nuget push **/*.nupkg --api-key ${{ secrets.MYGET_API_KEY }} --source https://www.myget.org/F/supersocket/api/v3/index.json
run: dotnet nuget push **/*.nupkg --api-key ${{ secrets.MYGET_API_KEY }} --source https://www.myget.org/F/swatchsocket/api/v3/index.json
72 changes: 55 additions & 17 deletions src/SuperSocket.Connection/PipeConnectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ IPipelineFilter IPipeConnection.PipelineFilter

private bool _isDetaching = false;

private ISupplyController _supplyController;

protected PipeConnectionBase(PipeReader inputReader, PipeWriter outputWriter, ConnectionOptions options)
{
Options = options;
Expand All @@ -66,11 +68,20 @@ protected void UpdateLastActiveTime()
LastActiveTime = DateTimeOffset.Now;
}

public async override IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(IPipelineFilter<TPackageInfo> pipelineFilter)
public async override IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(
IPipelineFilter<TPackageInfo> pipelineFilter)
{
var packagePipe = !Options.ReadAsDemand
? new DefaultObjectPipe<TPackageInfo>()
: new DefaultObjectPipeWithSupplyControl<TPackageInfo>();
IObjectPipe<TPackageInfo> packagePipe;
if (Options.ReadAsDemand)
{
var defaultObjectPipe = new DefaultObjectPipeWithSupplyControl<TPackageInfo>();
_supplyController = defaultObjectPipe;
packagePipe = defaultObjectPipe;
}
else
{
packagePipe = new DefaultObjectPipe<TPackageInfo>();
}

_packagePipe = packagePipe;
_pipelineFilter = pipelineFilter;
Expand Down Expand Up @@ -151,7 +162,8 @@ protected virtual bool IsIgnorableException(Exception e)
return false;
}

protected virtual Task StartInputPipeTask<TPackageInfo>(IObjectPipe<TPackageInfo> packagePipe, CancellationToken cancellationToken)
protected virtual Task StartInputPipeTask<TPackageInfo>(IObjectPipe<TPackageInfo> packagePipe,
CancellationToken cancellationToken)
{
return ReadPipeAsync(InputReader, packagePipe, cancellationToken);
}
Expand All @@ -164,7 +176,8 @@ private void CheckConnectionOpen()
}
}

public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer,
CancellationToken cancellationToken = default)
{
var sendLockAcquired = false;

Expand All @@ -188,7 +201,8 @@ private void WriteBuffer(PipeWriter writer, ReadOnlyMemory<byte> buffer)
writer.Write(buffer.Span);
}

public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken = default)
public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package,
CancellationToken cancellationToken = default)
{
var sendLockAcquired = false;

Expand All @@ -206,7 +220,7 @@ public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> pa
}
}

public override async ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken)
public override async ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken = default)
{
var sendLockAcquired = false;

Expand All @@ -224,17 +238,20 @@ public override async ValueTask SendAsync(Action<PipeWriter> write, Cancellation
}
}

protected void WritePackageWithEncoder<TPackage>(IBufferWriter<byte> writer, IPackageEncoder<TPackage> packageEncoder, TPackage package)
protected void WritePackageWithEncoder<TPackage>(IBufferWriter<byte> writer,
IPackageEncoder<TPackage> packageEncoder, TPackage package)
{
CheckConnectionOpen();
packageEncoder.Encode(writer, package);
}

protected virtual void OnInputPipeRead(ReadResult result)
protected virtual Task OnInputPipeReadAsync(ReadResult result)
{
return Task.CompletedTask;
}

protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<TPackageInfo> packagePipe, CancellationToken cancellationToken)
protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<TPackageInfo> packagePipe,
CancellationToken cancellationToken)
{
var pipelineFilter = _pipelineFilter as IPipelineFilter<TPackageInfo>;

Expand All @@ -245,7 +262,7 @@ protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<
try
{
result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
OnInputPipeRead(result);
await OnInputPipeReadAsync(result);
}
catch (Exception e)
{
Expand All @@ -271,7 +288,8 @@ protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<
{
if (buffer.Length > 0)
{
var needReadMore = ReaderBuffer(ref buffer, pipelineFilter, packagePipe, out consumed, out examined, out var currentPipelineFilter);
var needReadMore = ReaderBuffer(ref buffer, pipelineFilter, packagePipe, out consumed,
out examined, out var currentPipelineFilter);

if (currentPipelineFilter != null)
{
Expand Down Expand Up @@ -304,16 +322,24 @@ protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<
}
}

reader.Complete();
OnReaderComplete(reader, _isDetaching);
WriteEOFPackage();
}

protected virtual void OnReaderComplete(PipeReader reader, bool isDetaching)
{
reader.Complete();
}

protected void WriteEOFPackage()
{
_packagePipe.WirteEOF();
}

private bool ReaderBuffer<TPackageInfo>(ref ReadOnlySequence<byte> buffer, IPipelineFilter<TPackageInfo> pipelineFilter, IObjectPipe<TPackageInfo> packagePipe, out SequencePosition consumed, out SequencePosition examined, out IPipelineFilter<TPackageInfo> currentPipelineFilter)
private bool ReaderBuffer<TPackageInfo>(ref ReadOnlySequence<byte> buffer,
IPipelineFilter<TPackageInfo> pipelineFilter, IObjectPipe<TPackageInfo> packagePipe,
out SequencePosition consumed, out SequencePosition examined,
out IPipelineFilter<TPackageInfo> currentPipelineFilter)
{
consumed = buffer.Start;
examined = buffer.End;
Expand All @@ -336,7 +362,8 @@ private bool ReaderBuffer<TPackageInfo>(ref ReadOnlySequence<byte> buffer, IPipe
if (nextFilter != null)
{
// ProxyProtocolPipelineFilter always is the first filter and its next filter is the actual first filter.
if (bytesConsumedTotal == 0 && pipelineFilter is IProxyProtocolPipelineFilter proxyProtocolPipelineFilter)
if (bytesConsumedTotal == 0 &&
pipelineFilter is IProxyProtocolPipelineFilter proxyProtocolPipelineFilter)
{
ProxyInfo = proxyProtocolPipelineFilter.ProxyInfo;
}
Expand Down Expand Up @@ -412,5 +439,16 @@ protected void OnError(string message, Exception e = null)
else
Logger?.LogError(message);
}

protected void SupplyEnd()
{
_supplyController?.SupplyEnd();
}

protected async Task SupplyRequiredAsync()
{
if (_supplyController != null)
await _supplyController.SupplyRequired().ConfigureAwait(false);
}
}
}
}
2 changes: 0 additions & 2 deletions src/SuperSocket.Connection/TcpPipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ protected override async ValueTask<int> SendOverIOAsync(ReadOnlySequence<byte> b
_segmentsForSend.Clear();
}

var segments = _segmentsForSend;

foreach (var piece in buffer)
{
cancellationToken.ThrowIfCancellationRequested();
Expand Down
6 changes: 3 additions & 3 deletions src/SuperSocket.Connection/UdpPipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private void MergeBuffer(ref ReadOnlySequence<byte> buffer, byte[] destBuffer)
}
}

public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
if (_enableSendingPipe)
{
Expand All @@ -111,7 +111,7 @@ await SendOverIOAsync(new ReadOnlySequence<byte>(buffer), cancellationToken)
.ConfigureAwait(false);
}

public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken)
public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken = default)
{
if (_enableSendingPipe)
{
Expand Down Expand Up @@ -145,7 +145,7 @@ await ProcessOutputRead(Output.Reader)
}
}

public override async ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken)
public override async ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken = default)
{
if (_enableSendingPipe)
{
Expand Down
35 changes: 22 additions & 13 deletions src/SuperSocket.Kestrel/KestrelPipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public KestrelPipeConnection(ConnectionContext context, ConnectionOptions option
: base(context.Transport.Input, context.Transport.Output, options)
{
_context = context;
context.ConnectionClosed.Register(() => OnConnectionClosed());
context.ConnectionClosed.Register(OnConnectionClosed);
LocalEndPoint = context.LocalEndPoint;
RemoteEndPoint = context.RemoteEndPoint;
}
Expand All @@ -31,11 +31,6 @@ protected override void OnClosed()
base.OnClosed();
}

public override ValueTask DetachAsync()
{
throw new NotSupportedException($"Detach is not supported by {nameof(KestrelPipeConnection)}.");
}

protected override async void Close()
{
var context = _context;
Expand All @@ -49,32 +44,46 @@ protected override async void Close()
}
}

protected override void OnInputPipeRead(ReadResult result)
protected override Task StartInputPipeTask<TPackageInfo>(IObjectPipe<TPackageInfo> packagePipe, CancellationToken cancellationToken)
{
if (!result.IsCanceled && !result.IsCompleted)
{
cancellationToken.Register(SupplyEnd);
return base.StartInputPipeTask(packagePipe, cancellationToken);
}

protected override async Task OnInputPipeReadAsync(ReadResult result)
{
if (result is { IsCanceled: false, IsCompleted: false })
UpdateLastActiveTime();
}

await SupplyRequiredAsync();
}

public override async ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken)
public override async ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken = default)
{
await base.SendAsync(write, cancellationToken);
UpdateLastActiveTime();
}

public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
await base.SendAsync(buffer, cancellationToken);
UpdateLastActiveTime();
}

public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken)
public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken = default)
{
await base.SendAsync(packageEncoder, package, cancellationToken);
UpdateLastActiveTime();
}

protected override void OnReaderComplete(PipeReader reader, bool isDetaching)
{
if (isDetaching)
return;

reader.Complete();
}

protected override bool IsIgnorableException(Exception e)
{
if (e is IOException ioe && ioe.InnerException != null)
Expand Down
Loading
Loading