Skip to content

Commit

Permalink
Pass cancellation token to TCP listeners (#2082)
Browse files Browse the repository at this point in the history
  • Loading branch information
chkr1011 authored Sep 15, 2024
1 parent 2fdbb02 commit 81a4e7c
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 43 deletions.
1 change: 1 addition & 0 deletions MQTTnet.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ See the LICENSE file in the project root for more information.</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpRenamePlacementToArrangementMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpUseContinuousIndentInsideBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002EMemberReordering_002EMigrations_002ECSharpFileLayoutPatternRemoveIsAttributeUpgrade/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAddAccessorOwnerDeclarationBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EAlwaysTreatStructAsNotReorderableMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002ECSharpPlaceAttributeOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ async Task AcceptClientConnectionsAsync(CancellationToken cancellationToken)
{
try
{
var clientSocket = await _socket.AcceptAsync().ConfigureAwait(false);
var clientSocket = await _socket.AcceptAsync(cancellationToken).ConfigureAwait(false);
if (clientSocket == null)
{
continue;
Expand Down
82 changes: 42 additions & 40 deletions Source/MQTTnet.Tests/MqttTcpChannel_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,78 +2,80 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Implementations;
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Implementations;

namespace MQTTnet.Tests
namespace MQTTnet.Tests;

[TestClass]
public class MqttTcpChannel_Tests
{
[TestClass]
public class MqttTcpChannel_Tests
[TestMethod]
public async Task Dispose_Channel_While_Used()
{
[TestMethod]
public async Task Dispose_Channel_While_Used()
{
var ct = new CancellationTokenSource();
var serverSocket = new CrossPlatformSocket(AddressFamily.InterNetwork, ProtocolType.Tcp);
using var ct = new CancellationTokenSource();
using var serverSocket = new CrossPlatformSocket(AddressFamily.InterNetwork, ProtocolType.Tcp);

try
{
serverSocket.Bind(new IPEndPoint(IPAddress.Any, 50001));
serverSocket.Listen(0);
try
{
serverSocket.Bind(new IPEndPoint(IPAddress.Any, 50001));
serverSocket.Listen(0);

#pragma warning disable 4014
Task.Run(async () =>
Task.Run(
async () =>
#pragma warning restore 4014
{
while (!ct.IsCancellationRequested)
{
var client = await serverSocket.AcceptAsync();
var client = await serverSocket.AcceptAsync(CancellationToken.None);
var data = new byte[] { 128 };
await client.SendAsync(new ArraySegment<byte>(data), SocketFlags.None);
}
}, ct.Token);
},
ct.Token);

var clientSocket = new CrossPlatformSocket(AddressFamily.InterNetwork, ProtocolType.Tcp);
await clientSocket.ConnectAsync(new DnsEndPoint("localhost", 50001), CancellationToken.None);
using var clientSocket = new CrossPlatformSocket(AddressFamily.InterNetwork, ProtocolType.Tcp);
await clientSocket.ConnectAsync(new DnsEndPoint("localhost", 50001), CancellationToken.None);

var tcpChannel = new MqttTcpChannel(clientSocket.GetStream(), "test", null);
var tcpChannel = new MqttTcpChannel(clientSocket.GetStream(), "test", null);

await Task.Delay(100, ct.Token);
await Task.Delay(100, ct.Token);

var buffer = new byte[1];
await tcpChannel.ReadAsync(buffer, 0, 1, ct.Token);
var buffer = new byte[1];
await tcpChannel.ReadAsync(buffer, 0, 1, ct.Token);

Assert.AreEqual(128, buffer[0]);
Assert.AreEqual(128, buffer[0]);

// This block should fail after dispose.
// This block should fail after dispose.
#pragma warning disable 4014
Task.Run(() =>
Task.Run(
() =>
#pragma warning restore 4014
{
Task.Delay(200, ct.Token);
tcpChannel.Dispose();
}, ct.Token);
},
ct.Token);

try
{
await tcpChannel.ReadAsync(buffer, 0, 1, CancellationToken.None);
}
catch (Exception exception)
{
Assert.IsInstanceOfType(exception, typeof(SocketException));
Assert.AreEqual(SocketError.OperationAborted, ((SocketException)exception).SocketErrorCode);
}
try
{
await tcpChannel.ReadAsync(buffer, 0, 1, CancellationToken.None);
}
finally
catch (Exception exception)
{
ct.Cancel(false);
serverSocket.Dispose();
Assert.IsInstanceOfType(exception, typeof(SocketException));
Assert.AreEqual(SocketError.OperationAborted, ((SocketException)exception).SocketErrorCode);
}
}
finally
{
ct.Cancel(false);
}
}
}
}
4 changes: 2 additions & 2 deletions Source/MQTTnet/Implementations/CrossPlatformSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ public int TcpKeepAliveTime
set => _socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, value);
}

public async Task<CrossPlatformSocket> AcceptAsync()
public async Task<CrossPlatformSocket> AcceptAsync(CancellationToken cancellationToken)
{
try
{
var clientSocket = await _socket.AcceptAsync().ConfigureAwait(false);
var clientSocket = await _socket.AcceptAsync(cancellationToken).ConfigureAwait(false);
return new CrossPlatformSocket(clientSocket);
}
catch (ObjectDisposedException)
Expand Down

0 comments on commit 81a4e7c

Please sign in to comment.