Skip to content

Commit

Permalink
Fix unobserved SocketException
Browse files Browse the repository at this point in the history
resolves #770 (original issue)
resolves #771 (original PR, this is a rebase)

Perform socket read operations according to Task-based asynchronous pattern (TAP) instead of Asynchronous Programming Model (APM)
  • Loading branch information
nmandzyk authored and gbirchmeier committed Feb 14, 2024
1 parent b9e93b4 commit 3eb4ae2
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 38 deletions.
39 changes: 26 additions & 13 deletions QuickFIXn/SocketInitiatorThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.IO;
using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace QuickFix
{
Expand All @@ -21,6 +22,7 @@ public class SocketInitiatorThread : IResponder
private byte[] readBuffer_ = new byte[BUF_SIZE];
private Parser parser_;
protected Stream stream_;
protected CancellationTokenSource _readCancellationTokenSource;
private Transport.SocketInitiator initiator_;
private Session session_;
private IPEndPoint socketEndPoint_;
Expand All @@ -36,6 +38,7 @@ public SocketInitiatorThread(Transport.SocketInitiator initiator, Session sessio
parser_ = new Parser();
session_ = session;
socketSettings_ = socketSettings;
_readCancellationTokenSource = new CancellationTokenSource();
}

public void Start()
Expand Down Expand Up @@ -117,9 +120,9 @@ public bool Read()
}

/// <summary>
/// Keep a handle to the current outstanding read request (if any)
/// Keep a task for handling async read
/// </summary>
private IAsyncResult currentReadRequest_;
private Task<int> currentReadTask;
/// <summary>
/// Reads data from the network into the specified buffer.
/// It will wait up to the specified number of milliseconds for data to arrive,
Expand All @@ -136,20 +139,22 @@ protected int ReadSome(byte[] buffer, int timeoutMilliseconds)
try
{
// Begin read if it is not already started
if (currentReadRequest_ == null)
currentReadRequest_ = stream_.BeginRead(buffer, 0, buffer.Length, null, null);
if (currentReadTask == null)
currentReadTask = stream_.ReadAsync(buffer, 0, buffer.Length, _readCancellationTokenSource.Token);

// Wait for it to complete (given timeout)
currentReadRequest_.AsyncWaitHandle.WaitOne(timeoutMilliseconds);
currentReadTask.Wait(timeoutMilliseconds);


if (currentReadRequest_.IsCompleted)
if (currentReadTask.IsCompleted)
{
// Make sure to set currentReadRequest_ to before retreiving result
// so a new read can be started next time even if an exception is thrown
var request = currentReadRequest_;
currentReadRequest_ = null;
var request = currentReadTask;
currentReadTask.Dispose();
currentReadTask = null;

int bytesRead = stream_.EndRead(request);
int bytesRead = request.Result;
if (0 == bytesRead)
throw new SocketException(System.Convert.ToInt32(SocketError.Shutdown));

Expand All @@ -158,9 +163,10 @@ protected int ReadSome(byte[] buffer, int timeoutMilliseconds)
else
return 0;
}
catch (System.IO.IOException ex) // Timeout
catch (AggregateException ex) // Timeout
{
var inner = ex.InnerException as SocketException;
var ioException = ex.InnerException as IOException;
var inner = ioException?.InnerException as SocketException;
if (inner != null && inner.SocketErrorCode == SocketError.TimedOut)
{
// Nothing read
Expand Down Expand Up @@ -196,8 +202,15 @@ public bool Send(string data)
public void Disconnect()
{
isDisconnectRequested_ = true;
if (stream_ != null)
stream_.Close();
_readCancellationTokenSource?.Cancel();
_readCancellationTokenSource?.Dispose();
_readCancellationTokenSource = null;

// just wait when read task will be cancelled
currentReadTask?.ContinueWith(t => { }).Wait(1000);
currentReadTask?.Dispose();
currentReadTask = null;
stream_?.Close();
}

#endregion
Expand Down
61 changes: 36 additions & 25 deletions QuickFIXn/SocketReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using System.IO;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace QuickFix
{
Expand All @@ -14,16 +16,17 @@ public class SocketReader : IDisposable
public const int BUF_SIZE = 4096;
private readonly byte[] _readBuffer = new byte[BUF_SIZE];
private readonly Parser _parser = new();
private Session? _qfSession; //will be null when initialized
private Session? _qfSession;
private readonly Stream _stream;
private CancellationTokenSource? _readCancellationTokenSource;
private readonly TcpClient _tcpClient;
private readonly ClientHandlerThread _responder;
private readonly AcceptorSocketDescriptor? _acceptorDescriptor;

/// <summary>
/// Keep a handle to the current outstanding read request (if any)
/// Keep a task for handling async read
/// </summary>
private IAsyncResult? _currentReadRequest;
private Task<int>? _currentReadTask;

internal SocketReader(
TcpClient tcpClient,
Expand All @@ -35,6 +38,7 @@ internal SocketReader(
_responder = responder;
_acceptorDescriptor = acceptorDescriptor;
_stream = Transport.StreamFactory.CreateServerStream(tcpClient, settings, responder.GetLog());
_readCancellationTokenSource = new CancellationTokenSource();
}

public void Read()
Expand Down Expand Up @@ -71,46 +75,44 @@ public void Read()
/// <exception cref="System.Net.Sockets.SocketException">On connection reset</exception>
protected virtual int ReadSome(byte[] buffer, int timeoutMilliseconds)
{
// NOTE: THIS FUNCTION IS EXACTLY THE SAME AS THE ONE IN SocketInitiatorThread.
// Any changes made here should also be made there.
try
{
try {
// Begin read if it is not already started
_currentReadRequest ??= _stream.BeginRead(buffer, 0, buffer.Length, null, null);
_currentReadTask ??= _stream.ReadAsync(buffer, 0, buffer.Length, _readCancellationTokenSource.Token);

Check warning on line 80 in QuickFIXn/SocketReader.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.

Check warning on line 80 in QuickFIXn/SocketReader.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.

// Wait for it to complete (given timeout)
_currentReadRequest.AsyncWaitHandle.WaitOne(timeoutMilliseconds);
_currentReadTask.Wait(timeoutMilliseconds);

if (_currentReadRequest.IsCompleted)
{
// Make sure to set _currentReadRequest to before retreiving result
// so a new read can be started next time even if an exception is thrown
var request = _currentReadRequest;
_currentReadRequest = null;
if (_currentReadTask.IsCompleted) {
// Dispose/nullify currentReadTask *before* retreiving .Result.
// Accessting .Result can throw an exception, so we need to reset currentReadTask
// first, to set us up for the next read even if an exception is thrown.
Task<int>? request = _currentReadTask;
_currentReadTask.Dispose();
_currentReadTask = null;

int bytesRead = _stream.EndRead(request);
int bytesRead = request.Result; // (As mentioned above, this can throw an exception!)
if (0 == bytesRead)
throw new SocketException(System.Convert.ToInt32(SocketError.Shutdown));
throw new SocketException(Convert.ToInt32(SocketError.Shutdown));

return bytesRead;
}

return 0;
}
catch (IOException ex) // Timeout
catch (AggregateException ex) // Timeout
{
var inner = ex.InnerException as SocketException;
if (inner?.SocketErrorCode == SocketError.TimedOut)
{
IOException? ioException = ex.InnerException as IOException;
SocketException? inner = ioException?.InnerException as SocketException;
if (inner is not null && inner.SocketErrorCode == SocketError.TimedOut) {
// Nothing read
return 0;
}
else if (inner is not null)
{

if (inner is not null) {
throw inner; //rethrow SocketException part (which we have exception logic for)
}
else
throw; //rethrow original exception

throw; //rethrow original exception
}
}

Expand Down Expand Up @@ -200,6 +202,15 @@ protected void ProcessStream()

protected void DisconnectClient()
{
_readCancellationTokenSource?.Cancel();
_readCancellationTokenSource?.Dispose();
_readCancellationTokenSource = null;

// just wait when read task will be cancelled
_currentReadTask?.ContinueWith(_ => { }).Wait(1000);
_currentReadTask?.Dispose();
_currentReadTask = null;

_stream.Close();
_tcpClient.Close();
}
Expand Down

0 comments on commit 3eb4ae2

Please sign in to comment.