Skip to content

Commit

Permalink
Avoid duplicate message
Browse files Browse the repository at this point in the history
  • Loading branch information
Y-Sindo committed Oct 17, 2023
1 parent 25c6143 commit c44d3b1
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 41 deletions.
1 change: 1 addition & 0 deletions src/Microsoft.Azure.SignalR.Common/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public static class ErrorCodes
public static class HttpClientNames
{
public const string Resilient = "Resilient";
public const string MessageResilient = "MessageResilient";
}
}
}
12 changes: 12 additions & 0 deletions src/Microsoft.Azure.SignalR.Common/Utilities/RestClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ public Task SendWithRetryAsync(
return SendAsyncCore(Constants.HttpClientNames.Resilient, api, httpMethod, productInfo, methodName, args, handleExpectedResponse == null ? null : response => Task.FromResult(handleExpectedResponse(response)), cancellationToken);
}

public Task SendMessageWithRetryAsync(
RestApiEndpoint api,
HttpMethod httpMethod,
string productInfo,
string? methodName = null,
object[]? args = null,
Func<HttpResponseMessage, bool>? handleExpectedResponse = null,
CancellationToken cancellationToken = default)
{
return SendAsyncCore(Constants.HttpClientNames.MessageResilient, api, httpMethod, productInfo, methodName, args, handleExpectedResponse == null ? null : response => Task.FromResult(handleExpectedResponse(response)), cancellationToken);
}

private async Task ThrowExceptionOnResponseFailureAsync(HttpResponseMessage response)
{
if (response.IsSuccessStatusCode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Reflection;
using System.Threading.Tasks;
Expand Down Expand Up @@ -188,9 +189,14 @@ private static IServiceCollection AddRestClientFactory(this IServiceCollection s
// Else the timeout is enforced by TimeoutHttpMessageHandler.
})
.ConfigurePrimaryHttpMessageHandler(ConfigureProxy)
.AddHttpMessageHandler(sp => ActivatorUtilities.CreateInstance<RetryHttpMessageHandler>(sp))
.AddHttpMessageHandler(sp => ActivatorUtilities.CreateInstance<RetryHttpMessageHandler>(sp, (HttpStatusCode code) => IsTransientErrorForNonMessageApi(code)))
.AddHttpMessageHandler(sp => ActivatorUtilities.CreateInstance<TimeoutHttpMessageHandler>(sp));

services
.AddHttpClient(Constants.HttpClientNames.MessageResilient, (sp, client) => client.Timeout = sp.GetRequiredService<IOptions<ServiceManagerOptions>>().Value.HttpClientTimeout)
.ConfigurePrimaryHttpMessageHandler(ConfigureProxy)
.AddHttpMessageHandler(sp => ActivatorUtilities.CreateInstance<RetryHttpMessageHandler>(sp, (HttpStatusCode code) => IsTransientErrorAndIdempotentForMessageApi(code)));

services.AddSingleton(sp =>
{
var options = sp.GetRequiredService<IOptions<ServiceManagerOptions>>().Value;
Expand All @@ -202,6 +208,14 @@ private static IServiceCollection AddRestClientFactory(this IServiceCollection s
return services;

static HttpMessageHandler ConfigureProxy(IServiceProvider sp) => new HttpClientHandler() { Proxy = sp.GetRequiredService<IOptions<ServiceManagerOptions>>().Value.Proxy };

static bool IsTransientErrorAndIdempotentForMessageApi(HttpStatusCode code) =>
// Runtime returns 500 for timeout errors too, to avoid duplicate message, we exclude 500 here.
code > HttpStatusCode.InternalServerError;

static bool IsTransientErrorForNonMessageApi(HttpStatusCode code) =>
code >= HttpStatusCode.InternalServerError ||
code == HttpStatusCode.RequestTimeout;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ namespace Microsoft.Azure.SignalR.Management;
internal class RetryHttpMessageHandler : DelegatingHandler
{
private readonly IBackOffPolicy _retryDelayProvider;
private readonly Func<HttpStatusCode, bool> _isTransientError;

public RetryHttpMessageHandler(IBackOffPolicy retryDelayProvider)
public RetryHttpMessageHandler(IBackOffPolicy retryDelayProvider, Func<HttpStatusCode, bool> transientErrorPredicate)
{
_retryDelayProvider = retryDelayProvider;
_isTransientError = transientErrorPredicate;
}

protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
Expand All @@ -32,8 +34,9 @@ protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage
try
{
var response = await base.SendAsync(request, cancellationToken);
if (IsTransientError(response.StatusCode))
if (_isTransientError(response.StatusCode))
{

var innerException = new HttpRequestException(
$"Response status code does not indicate success: {(int)response.StatusCode} ({response.ReasonPhrase})");
ex = new AzureSignalRRuntimeException(response.RequestMessage?.RequestUri?.ToString(), innerException);
Expand Down Expand Up @@ -70,10 +73,4 @@ protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage
await Task.Delay(delays.Current, cancellationToken);
} while (true);
}

private static bool IsTransientError(HttpStatusCode code)
{
return code >= HttpStatusCode.InternalServerError ||
code == HttpStatusCode.RequestTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public override async Task SendAllExceptAsync(string methodName, object[] args,
}

var api = await _restApiProvider.GetBroadcastEndpointAsync(_appName, _hubName, excluded: excludedConnectionIds);
await _restClient.SendWithRetryAsync(api, HttpMethod.Post, _productInfo, methodName, args, handleExpectedResponse: null, cancellationToken: cancellationToken);
await _restClient.SendMessageWithRetryAsync(api, HttpMethod.Post, _productInfo, methodName, args, handleExpectedResponse: null, cancellationToken: cancellationToken);
}

public override async Task SendConnectionAsync(string connectionId, string methodName, object[] args, CancellationToken cancellationToken = default)
Expand All @@ -116,7 +116,7 @@ public override async Task SendConnectionAsync(string connectionId, string metho
}

var api = await _restApiProvider.GetSendToConnectionEndpointAsync(_appName, _hubName, connectionId);
await _restClient.SendWithRetryAsync(api, HttpMethod.Post, _productInfo, methodName, args, handleExpectedResponse: null, cancellationToken: cancellationToken);
await _restClient.SendMessageWithRetryAsync(api, HttpMethod.Post, _productInfo, methodName, args, handleExpectedResponse: null, cancellationToken: cancellationToken);
}

public override async Task SendConnectionsAsync(IReadOnlyList<string> connectionIds, string methodName, object[] args, CancellationToken cancellationToken = default)
Expand All @@ -142,7 +142,7 @@ public override async Task SendGroupExceptAsync(string groupName, string methodN
}

var api = await _restApiProvider.GetSendToGroupEndpointAsync(_appName, _hubName, groupName, excluded: excludedConnectionIds);
await _restClient.SendWithRetryAsync(api, HttpMethod.Post, _productInfo, methodName, args, handleExpectedResponse: null, cancellationToken: cancellationToken);
await _restClient.SendMessageWithRetryAsync(api, HttpMethod.Post, _productInfo, methodName, args, handleExpectedResponse: null, cancellationToken: cancellationToken);
}

public override async Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object[] args, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -173,7 +173,7 @@ public override async Task SendUserAsync(string userId, string methodName, objec
}

var api = await _restApiProvider.GetSendToUserEndpointAsync(_appName, _hubName, userId);
await _restClient.SendWithRetryAsync(api, HttpMethod.Post, _productInfo, methodName, args, handleExpectedResponse: null, cancellationToken: cancellationToken);
await _restClient.SendMessageWithRetryAsync(api, HttpMethod.Post, _productInfo, methodName, args, handleExpectedResponse: null, cancellationToken: cancellationToken);
}

public override async Task SendUsersAsync(IReadOnlyList<string> userIds, string methodName, object[] args, CancellationToken cancellationToken = default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,23 @@ public async Task CustomizeHttpClientTimeoutTestAsync()
o.ConnectionString = FakeEndpointUtils.GetFakeConnectionString(1).Single();
o.HttpClientTimeout = TimeSpan.FromSeconds(1);
})
.ConfigureServices(services => services.AddHttpClient(Constants.HttpClientNames.Resilient).AddHttpMessageHandler(sp => new WaitInfinitelyHandler()))
.ConfigureServices(services =>
{
services.AddHttpClient(Constants.HttpClientNames.MessageResilient).AddHttpMessageHandler(sp => new WaitInfinitelyHandler());
services.AddHttpClient(Constants.HttpClientNames.Resilient).AddHttpMessageHandler(sp => new WaitInfinitelyHandler());
})
.BuildServiceManager();
var requestStartTime = DateTime.UtcNow;
var serviceHubContext = await serviceManager.CreateHubContextAsync("hub", default);
await Assert.ThrowsAsync<TaskCanceledException>(() => serviceHubContext.Clients.All.SendCoreAsync("method", null));
await TestCoreAsync(() => serviceHubContext.Clients.All.SendCoreAsync("method", null));
await TestCoreAsync(() => serviceHubContext.ClientManager.CloseConnectionAsync("connectionId"));
}

static async Task TestCoreAsync(Func<Task> testAction)
{
var requestStartTime = DateTime.UtcNow;
await Assert.ThrowsAsync<TaskCanceledException>(testAction);
var elapsed = DateTime.UtcNow - requestStartTime;
_outputHelper.WriteLine($"Request elapsed time: {elapsed.Ticks}");
// Don't know why, the elapsed time sometimes is shorter than 1 second, but it should be close to 1 second.
Assert.True(elapsed >= TimeSpan.FromSeconds(0.8));
Assert.True(elapsed < TimeSpan.FromSeconds(1.2));
Expand Down
Loading

0 comments on commit c44d3b1

Please sign in to comment.