Skip to content

Commit

Permalink
add timeout to websocket connection and socket heartbeat (#86)
Browse files Browse the repository at this point in the history
* add timeout to websocket connection and socket heartbeat

* added timeout to WebSocket ready future

* closing sink before assigning null to ws

* cancel heartbeat when socket is disposed
  • Loading branch information
Neelansh-ns authored Aug 13, 2024
1 parent 6fcde10 commit 25fa1da
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 4 deletions.
31 changes: 27 additions & 4 deletions lib/src/socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class PhoenixSocket {
String get endpoint => _endpoint;

/// The [Uri] containing all the parameters and options for the
/// remote connection to occue.
/// remote connection to occur.
Uri get mountPoint => _mountPoint;

/// Whether the underlying socket is connected of not.
Expand Down Expand Up @@ -210,7 +210,7 @@ class PhoenixSocket {
// Wait for the WebSocket to be ready before continuing. In case of a
// failure to connect, the future will complete with an error and will be
// caught.
await _ws!.ready;
await _ws!.ready.timeout(_options.timeout);

_socketState = SocketState.connected;

Expand All @@ -222,9 +222,20 @@ class PhoenixSocket {
} else {
throw PhoenixException();
}
} on TimeoutException catch (err, stackTrace) {
_logger.severe(
'Timed out waiting for WebSocket to be ready', err, stackTrace);
_closeSink();
_ws = null;
_socketState = SocketState.closed;
// Calling this method will trigger a reconnect
// making _shouldReconnect = false as we are manually calling _delayedReconnect()
_shouldReconnect = false;
_onSocketError(err, stackTrace);
completer.complete(_delayedReconnect());
} catch (err, stackTrace) {
_logger.severe('Raised Exception', err, stackTrace);

_closeSink();
_ws = null;
_socketState = SocketState.closed;

Expand Down Expand Up @@ -271,6 +282,7 @@ class PhoenixSocket {

_disposed = true;
_ws?.sink.close();
_cancelHeartbeat();

for (final sub in _subscriptions) {
sub.cancel();
Expand Down Expand Up @@ -455,9 +467,19 @@ class PhoenixSocket {
}

try {
await sendMessage(_heartbeatMessage());
await sendMessage(_heartbeatMessage()).timeout(_options.heartbeatTimeout);
_logger.fine('[phoenix_socket] Heartbeat completed');
return true;
} on TimeoutException catch (err, stacktrace) {
_logger.severe(
'[phoenix_socket] Heartbeat message timed out',
err,
stacktrace,
);
if (_ws != null) {
_closeSink(normalClosure, 'heartbeat timeout');
}
return false;
} on WebSocketChannelException catch (err, stacktrace) {
_logger.severe(
'[phoenix_socket] Heartbeat message failed: WebSocketChannelException',
Expand Down Expand Up @@ -552,6 +574,7 @@ class PhoenixSocket {
code: _ws?.closeCode,
);
final exc = PhoenixException(socketClosed: ev);
_closeSink();
_ws = null;

if (!_stateStreamController.isClosed) {
Expand Down
11 changes: 11 additions & 0 deletions lib/src/socket_options.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ class PhoenixSocketOptions {
/// The interval between heartbeat roundtrips
Duration? heartbeat,

/// The duration after which a heartbeat request
/// is considered timed out
Duration? heartbeatTimeout,

/// The list of delays between reconnection attempts.
///
/// The last duration will be repeated until it works.
Expand Down Expand Up @@ -40,6 +44,7 @@ class PhoenixSocketOptions {
}) : _timeout = timeout ?? const Duration(seconds: 10),
serializer = serializer ?? const MessageSerializer(),
_heartbeat = heartbeat ?? const Duration(seconds: 30),
_heartbeatTimeout = heartbeatTimeout ?? const Duration(seconds: 10),
assert(!(params != null && dynamicParams != null),
"Can't set both params and dynamicParams");

Expand All @@ -49,13 +54,19 @@ class PhoenixSocketOptions {

final Duration _timeout;
final Duration _heartbeat;
final Duration _heartbeatTimeout;

/// Duration after which a request is assumed to have timed out.
Duration get timeout => _timeout;

/// Duration between heartbeats
Duration get heartbeat => _heartbeat;

/// Duration after which a heartbeat request is considered timed out.
/// If the server does not respond to a heartbeat request within this
/// duration, the connection is considered lost.
Duration get heartbeatTimeout => _heartbeatTimeout;

/// Optional list of Duration between reconnect attempts
final List<Duration> reconnectDelays;

Expand Down

0 comments on commit 25fa1da

Please sign in to comment.