Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated socket reconnect #72

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
7 changes: 5 additions & 2 deletions src/Nakama/ISocket.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The Nakama Authors
// Copyright 2021 The Nakama Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -14,6 +14,7 @@

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Nakama
Expand Down Expand Up @@ -182,9 +183,11 @@ Task<IPartyMatchmakerTicket> AddMatchmakerPartyAsync(string partyId, string quer
/// <param name="appearOnline">If the user who appear online to other users.</param>
/// <param name="connectTimeout">The time allowed for the socket connection to be established.</param>
/// <param name="langTag">The language tag of the user on the connected socket.</param>
/// <param name="retryConfiguration">The <see cref="RetryConfiguration"/> to use if the socket connection fails due to a transient network error.</param>
/// <param name="canceller">The <see cref="CancellationTokenSource"/> that can be used to cancel the connection request while mid-flight.</param>
/// <returns>A task to represent the asynchronous operation.</returns>
Task ConnectAsync(ISession session, bool appearOnline = false,
int connectTimeout = Socket.DefaultConnectTimeout, string langTag = "en");
int connectTimeout = Socket.DefaultConnectTimeout, string langTag = "en", RetryConfiguration retryConfiguration = null, CancellationTokenSource canceller = null);

/// <summary>
/// Create a multiplayer match on the server.
Expand Down
3 changes: 2 additions & 1 deletion src/Nakama/ISocketAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public interface ISocketAdapter : IDisposable
/// </summary>
/// <param name="uri">The URI of the server.</param>
/// <param name="timeout">The timeout for the connect attempt on the socket.</param>
void Connect(Uri uri, int timeout);
/// <param name="canceller">The <see cref="CancellationTokenSource"/> that can be used to cancel the connection request while mid-flight.</param>
void Connect(Uri uri, int timeout, CancellationTokenSource canceller);

/// <summary>
/// Send data to the server with an asynchronous operation.
Expand Down
5 changes: 4 additions & 1 deletion src/Nakama/RetryInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System;
using System.Net.Http;
using System.Net.Sockets;
using System.Threading.Tasks;

namespace Nakama
Expand Down Expand Up @@ -78,7 +79,9 @@ public async Task InvokeWithRetry(Func<Task> request, RetryHistory history)
/// </summary>
private bool IsTransientException(Exception e)
{
return (e is ApiResponseException apiException && apiException.StatusCode >= 500) || e is HttpRequestException;
return (e is ApiResponseException apiException && apiException.StatusCode >= 500)
|| e is HttpRequestException
|| e is System.IO.IOException; // thrown if error during socket handshake (AsyncProtocolRequest)
}

private Retry CreateNewRetry(RetryHistory history)
Expand Down
110 changes: 80 additions & 30 deletions src/Nakama/Socket.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The Nakama Authors
// Copyright 2021 The Nakama Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -103,15 +103,24 @@ public class Socket : ISocket
public ILogger Logger { get; set; }

private readonly ISocketAdapter _adapter;
private readonly bool _autoReconnect;
private readonly Uri _baseUri;
private readonly Dictionary<string, TaskCompletionSource<WebSocketMessageEnvelope>> _responses;

private Object _lockObj = new Object();

private readonly RetryConfiguration _defaultConnectRetryConfig = new RetryConfiguration(
baseDelay: 500,
jitter: RetryJitter.FullJitter,
listener: null,
maxRetries: 4);

private readonly RetryInvoker _retryInvoker = new RetryInvoker();

/// <summary>
/// A new socket with default options.
/// </summary>
public Socket() : this(Client.DefaultScheme, Client.DefaultHost, Client.DefaultPort, new WebSocketAdapter())
public Socket() : this(Client.DefaultScheme, Client.DefaultHost, Client.DefaultPort, new WebSocketAdapter(), autoReconnect: false)
{
}

Expand All @@ -120,7 +129,7 @@ public class Socket : ISocket
/// </summary>
/// <param name="adapter">The adapter for use with the socket.</param>
public Socket(ISocketAdapter adapter) : this(Client.DefaultScheme, Client.DefaultHost, Client.DefaultPort,
adapter)
adapter, autoReconnect: false)
{
}

Expand All @@ -131,10 +140,12 @@ public Socket(ISocketAdapter adapter) : this(Client.DefaultScheme, Client.Defaul
/// <param name="host">The host address of the server.</param>
/// <param name="port">The port number of the server.</param>
/// <param name="adapter">The adapter for use with the socket.</param>
public Socket(string scheme, string host, int port, ISocketAdapter adapter)
/// <param name="autoReconnect">Whether or not the socket should reconnect upon a transient disconnect.</param>
public Socket(string scheme, string host, int port, ISocketAdapter adapter, bool autoReconnect)
{
Logger = NullLogger.Instance;
_adapter = adapter;
_autoReconnect = autoReconnect;
_baseUri = new UriBuilder(scheme, host, port).Uri;
_responses = new Dictionary<string, TaskCompletionSource<WebSocketMessageEnvelope>>();

Expand All @@ -159,7 +170,6 @@ public Socket(string scheme, string host, int port, ISocketAdapter adapter)
{
lock (_lockObj)
{

foreach (var response in _responses)
{
response.Value.TrySetCanceled();
Expand All @@ -175,6 +185,42 @@ public Socket(string scheme, string host, int port, ISocketAdapter adapter)
_adapter.Received += ReceivedMessage;
}

/// <inheritdoc cref="ConnectAsync"/>
public async Task ConnectAsync(ISession session, bool appearOnline = false,
int connectTimeoutSec = DefaultConnectTimeout, string langTag = "en", RetryConfiguration retryConfiguration = null, CancellationTokenSource canceller = null)
{
var uri = new UriBuilder(_baseUri)
{
Path = "/ws",
Query = $"lang={langTag}&status={appearOnline}&token={session.AuthToken}"
}.Uri;


var history = new RetryHistory(retryConfiguration ?? _defaultConnectRetryConfig, userCancelToken: canceller?.Token);

Task connect = _retryInvoker.InvokeWithRetry(() => ConnectAsync(uri, connectTimeoutSec, canceller), history);

await connect;

if (!connect.IsFaulted)
{
Action<Exception> retryCallback = e =>
{
if (!_adapter.IsConnected)
{
// thrown by TCP NetworkStream if service is not reachable for WebSockets
if (e is System.IO.IOException)
{
Task.Run(() => ConnectAsync(uri, connectTimeoutSec, canceller));
}
}
};

_adapter.ReceivedError += retryCallback;
_adapter.Closed += () => _adapter.ReceivedError -= retryCallback;
}
}

/// <inheritdoc cref="AcceptPartyMemberAsync"/>
public Task AcceptPartyMemberAsync(string partyId, IUserPresence presence)
{
Expand Down Expand Up @@ -239,30 +285,6 @@ public Task CloseAsync()
return Task.CompletedTask;
}

/// <inheritdoc cref="ConnectAsync"/>
public Task ConnectAsync(ISession session, bool appearOnline = false,
int connectTimeoutSec = DefaultConnectTimeout, string langTag = "en")
{
var tcs = new TaskCompletionSource<bool>();
Action callback = () => tcs.TrySetResult(true);
Action<Exception> errback = e => tcs.TrySetException(e);
_adapter.Connected += callback;
_adapter.ReceivedError += errback;
var uri = new UriBuilder(_baseUri)
{
Path = "/ws",
Query = $"lang={langTag}&status={appearOnline}&token={session.AuthToken}"
}.Uri;
tcs.Task.ContinueWith(_ =>
{
// NOTE Not fired in Unity WebGL builds.
_adapter.Connected -= callback;
_adapter.ReceivedError -= errback;
});
_adapter.Connect(uri, connectTimeoutSec);
return tcs.Task;
}

/// <inheritdoc cref="ClosePartyAsync"/>
public Task ClosePartyAsync(string partyId)
{
Expand Down Expand Up @@ -765,7 +787,7 @@ public static ISocket From(IClient client, ISocketAdapter adapter)
{
var scheme = client.Scheme.ToLower().Equals("http") ? "ws" : "wss";
// TODO improve how logger is passed into socket object.
return new Socket(scheme, client.Host, client.Port, adapter) {Logger = (client as Client)?.Logger};
return new Socket(scheme, client.Host, client.Port, adapter, autoReconnect: false) {Logger = (client as Client)?.Logger};
}

private void ReceivedMessage(ArraySegment<byte> buffer)
Expand Down Expand Up @@ -929,5 +951,33 @@ private static List<UserPresence> BuildPresenceList(IEnumerable<IUserPresence> p

return presenceList;
}

// wrap the socket adapter events into a task.
private Task ConnectAsync(Uri uri, int timeout, CancellationTokenSource canceller)
{
var tcs = new TaskCompletionSource<bool>();

Action onConnectSuccess = () => {
tcs.TrySetResult(true);
};

Action<Exception> onConnectFailure = e => {
tcs.TrySetException(e);
};

_adapter.Connected += onConnectSuccess;
_adapter.ReceivedError += onConnectFailure;

_adapter.Connect(uri, timeout, canceller);

tcs.Task.ContinueWith(_ =>
{
// NOTE Not fired in Unity WebGL builds.
_adapter.Connected -= onConnectSuccess;
_adapter.ReceivedError -= onConnectFailure;
});

return tcs.Task;
}
}
}
19 changes: 17 additions & 2 deletions src/Nakama/WebSocketAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Threading;
Expand Down Expand Up @@ -89,7 +90,7 @@ public void Close()
}

/// <inheritdoc cref="ISocketAdapter.Connect"/>
public async void Connect(Uri uri, int timeout)
public async void Connect(Uri uri, int timeout, CancellationTokenSource userCanceller)
{
if (_webSocket != null)
{
Expand All @@ -104,8 +105,22 @@ public async void Connect(Uri uri, int timeout)
var clientFactory = new WebSocketClientFactory();
try
{

// timeout cancellation token
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(timeout));
var lcts = CancellationTokenSource.CreateLinkedTokenSource(_cancellationSource.Token, cts.Token);

var linkedTokens = new List<CancellationToken>();
linkedTokens.Add(cts.Token);
linkedTokens.Add(_cancellationSource.Token);

if (userCanceller != null)
{
linkedTokens.Add(userCanceller.Token);
}

// socket connection cancellation token
var lcts = CancellationTokenSource.CreateLinkedTokenSource(linkedTokens.ToArray());

using (_webSocket = await clientFactory.ConnectAsync(_uri, _options, lcts.Token))
{
IsConnected = true;
Expand Down
106 changes: 106 additions & 0 deletions tests/Nakama.Tests/Socket/TransientExceptionSocketAdapter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Copyright 2021 The Nakama Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Threading;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Nakama.Tests
{
/// <summary>
/// An adapter which throws transient/retryable exceptions when a socket communicates with the server.
/// </summary>
public class TransientExceptionSocketAdapter : ISocketAdapter
{
public class NetworkSchedule
{
public TransientAdapterResponseType[] ConnectResponses { get; }
public List<Tuple<TimeSpan, TransientAdapterResponseType>> PostConnect { get; }

public NetworkSchedule(TransientAdapterResponseType[] connectResponses)
{
ConnectResponses = connectResponses;
PostConnect = new List<Tuple<TimeSpan, TransientAdapterResponseType>>();
}

public NetworkSchedule(TransientAdapterResponseType[] connect, List<Tuple<TimeSpan, TransientAdapterResponseType>> postConnect)
{
ConnectResponses = connect;
PostConnect = postConnect;
}
}

public bool IsConnected { get; private set; }
public bool IsConnecting { get; private set; }

public event Action Connected;
public event Action Closed;

public event Action<Exception> ReceivedError;
public event Action<ArraySegment<byte>> Received;

private NetworkSchedule _schedule;
private int _consecutiveConnectAttempts;

public TransientExceptionSocketAdapter(NetworkSchedule schedule)
{
_schedule = schedule;
}

public void Close()
{
_consecutiveConnectAttempts = 0;
Closed?.Invoke();
}

public async void Connect(Uri uri, int timeout, CancellationTokenSource canceller)
{
canceller?.Token.ThrowIfCancellationRequested();

if (_schedule.ConnectResponses[_consecutiveConnectAttempts] == TransientAdapterResponseType.ServerOk)
{
_consecutiveConnectAttempts++;
IsConnected = true;
Connected?.Invoke();
}
else
{
_consecutiveConnectAttempts++;
ReceivedError?.Invoke(new System.IO.IOException("Connect exception."));
}

foreach (var postConnect in _schedule.PostConnect)
{
await Task.Delay(postConnect.Item1);

if (postConnect.Item2 == TransientAdapterResponseType.TransientError)
{
IsConnected = false;
ReceivedError?.Invoke(new System.IO.IOException("Post connect exception."));
Close();
}
}
}

public void Dispose() {}

public void Send(ArraySegment<byte> buffer, CancellationToken cancellationToken, bool reliable = true)
{
throw new NotImplementedException("This adapter cannot send messages.");
}
}
}
Loading