Skip to content

Commit

Permalink
add connectionToken property from IConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
wj8400684 committed Apr 24, 2024
1 parent 01e4603 commit ff8b254
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 43 deletions.
6 changes: 4 additions & 2 deletions src/SuperSocket.Connection/ConnectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
namespace SuperSocket.Connection
{
public abstract class ConnectionBase : IConnection
{
{
public abstract IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(IPipelineFilter<TPackageInfo> pipelineFilter);

public abstract ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default);

public abstract ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken = default);

public abstract ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken = default);

public bool IsClosed { get; private set; }
Expand All @@ -28,6 +28,8 @@ public abstract class ConnectionBase : IConnection

public DateTimeOffset LastActiveTime { get; protected set; } = DateTimeOffset.Now;

public CancellationToken ConnectionToken { get; protected set; }

protected virtual void OnClosed()
{
IsClosed = true;
Expand Down
2 changes: 2 additions & 0 deletions src/SuperSocket.Connection/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,7 @@ public interface IConnection
ValueTask DetachAsync();

CloseReason? CloseReason { get; }

CancellationToken ConnectionToken { get; }
}
}
19 changes: 10 additions & 9 deletions src/SuperSocket.Connection/PipeConnectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ PipeReader IPipeConnection.InputReader
{
get { return InputReader; }
}

IPipelineFilter IPipeConnection.PipelineFilter
{
get { return _pipelineFilter; }
Expand All @@ -52,6 +52,7 @@ protected PipeConnectionBase(PipeReader inputReader, PipeWriter outputWriter, Co
Logger = options.Logger;
InputReader = inputReader;
OutputWriter = outputWriter;
ConnectionToken = _cts.Token;
}

protected virtual Task StartTask<TPackageInfo>(IObjectPipe<TPackageInfo> packagePipe)
Expand All @@ -72,7 +73,7 @@ public async override IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(IPip

_packagePipe = packagePipe;
_pipelineFilter = pipelineFilter;

_pipeTask = StartTask(packagePipe);

_ = HandleClosing();
Expand Down Expand Up @@ -118,7 +119,7 @@ private async ValueTask HandleClosing()
{
if (!IsIgnorableException(exc))
OnError("Unhandled exception in the method PipeChannel.Close.", exc);
}
}
}
}
}
Expand Down Expand Up @@ -172,7 +173,7 @@ public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, Cancellat
finally
{
SendLock.Release();
}
}
}

private void WriteBuffer(PipeWriter writer, ReadOnlyMemory<byte> buffer)
Expand Down Expand Up @@ -236,7 +237,7 @@ protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<
{
if (!IsIgnorableException(e) && !(e is OperationCanceledException))
OnError("Failed to read from the pipe", e);

break;
}

Expand Down Expand Up @@ -267,7 +268,7 @@ protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<
{
completed = true;
break;
}
}
}

if (completed)
Expand Down Expand Up @@ -344,7 +345,7 @@ private bool ReaderBuffer<TPackageInfo>(ref ReadOnlySequence<byte> buffer, IPipe
Close();
return false;
}

if (packageInfo == null)
{
// the current pipeline filter needs more data to process
Expand All @@ -370,12 +371,12 @@ private bool ReaderBuffer<TPackageInfo>(ref ReadOnlySequence<byte> buffer, IPipe
examined = consumed = buffer.End;
return true;
}

if (bytesConsumed > 0)
seqReader = new SequenceReader<byte>(seqReader.Sequence.Slice(bytesConsumed));
}
}

public override async ValueTask DetachAsync()
{
_isDetaching = true;
Expand Down
18 changes: 9 additions & 9 deletions src/SuperSocket.Kestrel/KestrelPipeConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public KestrelPipeConnection(ConnectionContext context, ConnectionOptions option
: base(context.Transport.Input, context.Transport.Output, options)
{
_context = context;
context.ConnectionClosed.Register(() => OnClosed());
context.ConnectionClosed.Register(() => OnConnectionClosed());
LocalEndPoint = context.LocalEndPoint;
RemoteEndPoint = context.RemoteEndPoint;
}
Expand All @@ -39,14 +39,6 @@ protected override async void Close()
}
}

protected override void OnClosed()
{
if (!CloseReason.HasValue)
CloseReason = Connection.CloseReason.RemoteClosing;

base.OnClosed();
}

protected override void OnInputPipeRead(ReadResult result)
{
if (!result.IsCanceled && !result.IsCompleted)
Expand All @@ -72,4 +64,12 @@ public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> pa
await base.SendAsync(packageEncoder, package, cancellationToken);
UpdateLastActiveTime();
}

private void OnConnectionClosed()
{
if (!CloseReason.HasValue)
CloseReason = Connection.CloseReason.RemoteClosing;

Cancel();
}
}
42 changes: 19 additions & 23 deletions src/SuperSocket.Server/SuperSocketService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,11 @@ protected virtual ValueTask OnSessionClosedAsync(IAppSession session, CloseEvent
if (closedHandler != null)
return closedHandler.Invoke(session, e);

#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
}

protected virtual async ValueTask FireSessionConnectedEvent(AppSession session)
Expand Down Expand Up @@ -350,7 +350,7 @@ protected virtual async ValueTask FireSessionClosedEvent(AppSession session, Clo
if (!handshakeSession.Handshaked)
return;
}

await UnRegisterSessionFromMiddlewares(session);

_logger.LogInformation($"The session disconnected: {session.SessionID} ({reason})");
Expand Down Expand Up @@ -396,18 +396,18 @@ private async ValueTask HandleSession(AppSession session, IConnection connection
var packageHandlingScheduler = _packageHandlingScheduler;

#if NET6_0_OR_GREATER
using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(CancellationToken.None);
using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(connection.ConnectionToken);
#endif

await foreach (var p in packageStream)
{
if(_packageHandlingContextAccessor != null)
if (_packageHandlingContextAccessor != null)
{
_packageHandlingContextAccessor.PackageHandlingContext = new PackageHandlingContext<IAppSession, TReceivePackageInfo>(session, p);
}

#if !NET6_0_OR_GREATER
using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(CancellationToken.None);
using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(connection.ConnectionToken);
#endif
await packageHandlingScheduler.HandlePackage(session, p, cancellationTokenSource.Token);

Expand All @@ -424,13 +424,9 @@ private async ValueTask HandleSession(AppSession session, IConnection connection

protected virtual CancellationTokenSource GetPackageHandlingCancellationTokenSource(CancellationToken cancellationToken)
{
#if NET6_0_OR_GREATER
return CancellationTokenSourcePool.Shared.Rent(TimeSpan.FromSeconds(Options.PackageHandlingTimeOut));
#else
var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(Options.PackageHandlingTimeOut));
return cancellationTokenSource;
#endif
}

protected virtual ValueTask<bool> OnSessionErrorAsync(IAppSession session, PackageHandlingException<TReceivePackageInfo> exception)
Expand Down Expand Up @@ -471,20 +467,20 @@ public async Task StartAsync(CancellationToken cancellationToken)

protected virtual ValueTask OnStartedAsync()
{
#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
}

protected virtual ValueTask OnStopAsync()
{
#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
#if NETSTANDARD2_1
return GetCompletedTask();
#else
return ValueTask.CompletedTask;
#endif
}

private async Task StopListener(IConnectionListener listener)
Expand Down

0 comments on commit ff8b254

Please sign in to comment.