Skip to content

Commit

Permalink
Added access to ByteStreamHandler and MessageRecognizer of MessageCha…
Browse files Browse the repository at this point in the history
…nnel
  • Loading branch information
RolandKoenig committed Sep 11, 2020
1 parent 95d0820 commit 84abd33
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 7 deletions.
25 changes: 25 additions & 0 deletions MessageCommunicator/MessageChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ public IMessageReceiveHandler? ReceiveHandler
set => _messageRecognizer.ReceiveHandler = value;
}

/// <summary>
/// Access to internal objects.
/// Be careful when using them, wrong method calls can cause unexpected state!
/// </summary>
public MessageChannelInternals Internals { get; }

public MessageChannel(
ByteStreamHandlerSettings byteStreamHandlerSettings,
MessageRecognizerSettings messageRecognizerSettings,
Expand All @@ -43,6 +49,8 @@ public MessageChannel(
_messageRecognizer.ByteStreamHandler = _byteStreamHandler;

this.ReceiveHandler = receiveHandler;

this.Internals = new MessageChannelInternals(this);
}

public MessageChannel(
Expand Down Expand Up @@ -90,5 +98,22 @@ public Task StopAsync()
{
return _byteStreamHandler.StopAsync();
}

//*********************************************************************
//*********************************************************************
//*********************************************************************
public class MessageChannelInternals
{
private MessageChannel _owner;

public MessageChannelInternals(MessageChannel owner)
{
_owner = owner;
}

public IByteStreamHandler ByteStreamHandler => _owner._byteStreamHandler;

public IMessageRecognizer MessageRecognizer => _owner._messageRecognizer;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ public class TcpPassiveByteStreamHandler : TcpByteStreamHandler

public ushort ListeningPort { get; }

/// <summary>
/// Gets the true listening port in case ListeningPort is set to 0.
/// </summary>
public ushort ActualListeningPort { get; private set; }

public override bool IsRunning => _isRunning;

/// <inheritdoc />
Expand Down Expand Up @@ -51,6 +56,7 @@ internal TcpPassiveByteStreamHandler(

this.ListeningIPAddress = listeningIPAddress;
this.ListeningPort = listeningPort;
this.ActualListeningPort = listeningPort;
}

/// <inheritdoc />
Expand Down Expand Up @@ -122,11 +128,11 @@ private async void RunConnectionMainLoop(int loopId)
CancellationTokenSource? lastCancelTokenSource = null;
TcpListener? tcpListener = null;
var reconnectErrorCount = 0;
var currentListenerPort = this.ListeningPort;
while(loopId == _runningLoopCounter)
{
if (tcpListener == null)
{
this.ActualListeningPort = this.ListeningPort;
try
{
if (this.IsLoggerSet)
Expand All @@ -137,7 +143,7 @@ private async void RunConnectionMainLoop(int loopId)
}
tcpListener = new TcpListener(this.ListeningIPAddress, this.ListeningPort);
tcpListener.Start();
currentListenerPort = (ushort)((IPEndPoint)tcpListener.LocalEndpoint).Port;
this.ActualListeningPort = (ushort)((IPEndPoint)tcpListener.LocalEndpoint).Port;

reconnectErrorCount = 0;
_currentListener = tcpListener;
Expand All @@ -146,7 +152,7 @@ private async void RunConnectionMainLoop(int loopId)
{
this.Log(
LoggingMessageType.Info,
StringBuffer.Format("TcpListener created for port {0}", currentListenerPort));
StringBuffer.Format("TcpListener created for port {0}", this.ActualListeningPort));
}
}
catch (Exception ex)
Expand All @@ -155,7 +161,7 @@ private async void RunConnectionMainLoop(int loopId)
{
this.Log(
LoggingMessageType.Error,
StringBuffer.Format("Error while creating TcpListener for port {0}: {1}", currentListenerPort, ex.Message),
StringBuffer.Format("Error while creating TcpListener for port {0}: {1}", this.ActualListeningPort, ex.Message),
exception: ex);
}

Expand All @@ -182,7 +188,7 @@ await this.WaitByReconnectWaitTimeAsync(reconnectErrorCount)
this.Log(
LoggingMessageType.Info,
StringBuffer.Format("Listening for incoming connections on port {0}...",
currentListenerPort));
this.ActualListeningPort));
}

actTcpClient = await tcpListener.AcceptTcpClientAsync()
Expand All @@ -196,7 +202,7 @@ await this.WaitByReconnectWaitTimeAsync(reconnectErrorCount)
LoggingMessageType.Info,
StringBuffer.Format(
"Got new connection on listening port {0}. Connection established between {1} and {2}",
currentListenerPort, actLocalEndPoint.ToString(), actPartnerEndPoint.ToString()));
this.ActualListeningPort, actLocalEndPoint.ToString(), actPartnerEndPoint.ToString()));
}
}
catch (ObjectDisposedException)
Expand All @@ -212,7 +218,7 @@ await this.WaitByReconnectWaitTimeAsync(reconnectErrorCount)
{
this.Log(
LoggingMessageType.Error,
StringBuffer.Format("Error while listening for incoming connections on port {0}: {1}", currentListenerPort, ex.Message),
StringBuffer.Format("Error while listening for incoming connections on port {0}: {1}", this.ActualListeningPort, ex.Message),
exception: ex);
}

Expand Down

0 comments on commit 84abd33

Please sign in to comment.