Skip to content

Commit

Permalink
reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
terencefan committed Oct 12, 2023
1 parent 210f581 commit 3de520d
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ internal class ClientConnectionContext : ConnectionContext,
IConnectionStatFeature
{
private const int WritingState = 1;

private const int CompletedState = 2;

private const int IdleState = 0;

private static readonly PipeOptions DefaultPipeOptions = new PipeOptions(pauseWriterThreshold: 0,
Expand All @@ -63,12 +65,13 @@ internal class ClientConnectionContext : ConnectionContext,
useSynchronizationContext: false);

private readonly TaskCompletionSource<object> _connectionEndTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly CancellationTokenSource _abortOutgoingCts = new CancellationTokenSource();

private int _connectionState = IdleState;
private readonly CancellationTokenSource _abortOutgoingCts = new CancellationTokenSource();

private readonly object _heartbeatLock = new object();

private int _connectionState = IdleState;

private List<(Action<object> handler, object state)> _heartbeatHandlers;

private volatile bool _abortOnClose = true;
Expand Down Expand Up @@ -175,6 +178,7 @@ public async Task WriteMessageAsync(ReadOnlySequence<byte> payload)
{
_lastMessageReceivedAt = DateTime.UtcNow.Ticks;
_receivedBytes += payload.Length;

// Start write
await WriteMessageAsyncCore(payload);
}
Expand Down Expand Up @@ -237,6 +241,53 @@ public void CancelOutgoing(int millisecondsDelay = 0)
}
}

internal static bool TryGetRemoteIpAddress(IHeaderDictionary headers, out IPAddress address)
{
var forwardedFor = headers.GetCommaSeparatedValues("X-Forwarded-For");
if (forwardedFor.Length > 0 && IPAddress.TryParse(forwardedFor[0], out address))
{
return true;
}
address = null;
return false;
}

private static void ProcessQuery(string queryString, out string originalPath)
{
originalPath = string.Empty;
var query = QueryHelpers.ParseNullableQuery(queryString);
if (query == null)
{
return;
}

if (query.TryGetValue(Constants.QueryParameter.RequestCulture, out var culture))
{
SetCurrentThreadCulture(culture.FirstOrDefault());
}
if (query.TryGetValue(Constants.QueryParameter.OriginalPath, out var path))
{
originalPath = path.FirstOrDefault();
}
}

private static void SetCurrentThreadCulture(string cultureName)
{
if (!string.IsNullOrEmpty(cultureName))
{
try
{
var requestCulture = new RequestCulture(cultureName);
CultureInfo.CurrentCulture = requestCulture.Culture;
CultureInfo.CurrentUICulture = requestCulture.UICulture;
}
catch (Exception)
{
// skip invalid culture, normal won't hit.
}
}
}

private FeatureCollection BuildFeatures(OpenConnectionMessage serviceMessage)
{
var features = new FeatureCollection();
Expand Down Expand Up @@ -311,52 +362,5 @@ private string GetInstanceId(IDictionary<string, StringValues> header)
}
return string.Empty;
}

internal static bool TryGetRemoteIpAddress(IHeaderDictionary headers, out IPAddress address)
{
var forwardedFor = headers.GetCommaSeparatedValues("X-Forwarded-For");
if (forwardedFor.Length > 0 && IPAddress.TryParse(forwardedFor[0], out address))
{
return true;
}
address = null;
return false;
}

private static void ProcessQuery(string queryString, out string originalPath)
{
originalPath = string.Empty;
var query = QueryHelpers.ParseNullableQuery(queryString);
if (query == null)
{
return;
}

if (query.TryGetValue(Constants.QueryParameter.RequestCulture, out var culture))
{
SetCurrentThreadCulture(culture.FirstOrDefault());
}
if (query.TryGetValue(Constants.QueryParameter.OriginalPath, out var path))
{
originalPath = path.FirstOrDefault();
}
}

private static void SetCurrentThreadCulture(string cultureName)
{
if (!string.IsNullOrEmpty(cultureName))
{
try
{
var requestCulture = new RequestCulture(cultureName);
CultureInfo.CurrentCulture = requestCulture.Culture;
CultureInfo.CurrentUICulture = requestCulture.UICulture;
}
catch (Exception)
{
// skip invalid culture, normal won't hit.
}
}
}
}
}
20 changes: 14 additions & 6 deletions src/Microsoft.Azure.SignalR/ServerConnections/ServiceConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@ internal partial class ServiceConnection : ServiceConnectionBase
{
private const int DefaultCloseTimeoutMilliseconds = 30000;

private const string ClientConnectionCountInHub = "#clientInHub";

private const string ClientConnectionCountInServiceConnection = "#client";

// Fix issue: https://github.com/Azure/azure-signalr/issues/198
// .NET Framework has restriction about reserved string as the header name like "User-Agent"
private static readonly Dictionary<string, string> CustomHeader = new Dictionary<string, string> { { Constants.AsrsUserAgent, ProductInfo.GetProductInfo() } };

private const string ClientConnectionCountInHub = "#clientInHub";
private const string ClientConnectionCountInServiceConnection = "#client";

private readonly IConnectionFactory _connectionFactory;

private readonly IClientConnectionFactory _clientConnectionFactory;

private readonly int _closeTimeOutMilliseconds;

private readonly IClientConnectionManager _clientConnectionManager;

private readonly ConcurrentDictionary<string, string> _connectionIds =
Expand Down Expand Up @@ -155,10 +159,12 @@ protected override Task OnClientDisconnectedAsync(CloseConnectionMessage closeCo
{
context.AbortOnClose = false;
context.Features.Set<IConnectionMigrationFeature>(new ConnectionMigrationFeature(ServerId, to));

// We have to prevent SignalR `{type: 7}` (close message) from reaching our client while doing migration.
// Since all data messages will be sent to `ServiceConnection` directly.
// We can simply ignore all messages came from the application.
context.CancelOutgoing();

// The close connection message must be the last message, so we could complete the pipe.
context.CompleteIncoming();
}
Expand Down Expand Up @@ -211,14 +217,15 @@ protected override Task OnPingMessageAsync(PingMessage pingMessage)
if (RuntimeServicePingMessage.TryGetOffline(pingMessage, out var instanceId))
{
_clientInvocationManager.Caller.CleanupInvocationsByInstance(instanceId);

// Router invocations will be cleanup by its `CleanupInvocationsByConnection`, which is called by `RemoveClientConnection`.
// In `base.OnPingMessageAsync`, `CleanupClientConnections(instanceId)` will finally execute `RemoveClientConnection` for each ConnectionId.
// In `base.OnPingMessageAsync`, `CleanupClientConnections(instanceId)` will finally execute `RemoveClientConnection` for each ConnectionId.
}
#endif
return base.OnPingMessageAsync(pingMessage);
}

private async Task ProcessClientConnectionAsync(ClientConnectionContext connection)
private async Task ProcessClientConnectionAsync(ClientConnectionContext connection)
{
try
{
Expand Down Expand Up @@ -276,6 +283,7 @@ private async Task ProcessClientConnectionAsync(ClientConnectionContext connecti
// Inform the Service that we will remove the client because SignalR told us it is disconnected.
var serviceMessage =
new CloseConnectionMessage(connection.ConnectionId, errorMessage: exception?.Message);

// when it fails, it means the underlying connection is dropped
// service is responsible for closing the client connections in this case and there is no need to throw
await SafeWriteAsync(serviceMessage);
Expand Down Expand Up @@ -494,4 +502,4 @@ private Task OnErrorCompletionAsync(ErrorCompletionMessage errorCompletionMessag
return Task.CompletedTask;
}
}
}
}

0 comments on commit 3de520d

Please sign in to comment.