diff --git a/lib/src/socket.dart b/lib/src/socket.dart index f2adf64..6633893 100644 --- a/lib/src/socket.dart +++ b/lib/src/socket.dart @@ -164,7 +164,7 @@ class PhoenixSocket { String get endpoint => _endpoint; /// The [Uri] containing all the parameters and options for the - /// remote connection to occue. + /// remote connection to occur. Uri get mountPoint => _mountPoint; /// Whether the underlying socket is connected of not. @@ -210,7 +210,7 @@ class PhoenixSocket { // Wait for the WebSocket to be ready before continuing. In case of a // failure to connect, the future will complete with an error and will be // caught. - await _ws!.ready; + await _ws!.ready.timeout(_options.timeout); _socketState = SocketState.connected; @@ -222,9 +222,20 @@ class PhoenixSocket { } else { throw PhoenixException(); } + } on TimeoutException catch (err, stackTrace) { + _logger.severe( + 'Timed out waiting for WebSocket to be ready', err, stackTrace); + _closeSink(); + _ws = null; + _socketState = SocketState.closed; + // Calling this method will trigger a reconnect + // making _shouldReconnect = false as we are manually calling _delayedReconnect() + _shouldReconnect = false; + _onSocketError(err, stackTrace); + completer.complete(_delayedReconnect()); } catch (err, stackTrace) { _logger.severe('Raised Exception', err, stackTrace); - + _closeSink(); _ws = null; _socketState = SocketState.closed; @@ -271,6 +282,7 @@ class PhoenixSocket { _disposed = true; _ws?.sink.close(); + _cancelHeartbeat(); for (final sub in _subscriptions) { sub.cancel(); @@ -455,9 +467,19 @@ class PhoenixSocket { } try { - await sendMessage(_heartbeatMessage()); + await sendMessage(_heartbeatMessage()).timeout(_options.heartbeatTimeout); _logger.fine('[phoenix_socket] Heartbeat completed'); return true; + } on TimeoutException catch (err, stacktrace) { + _logger.severe( + '[phoenix_socket] Heartbeat message timed out', + err, + stacktrace, + ); + if (_ws != null) { + _closeSink(normalClosure, 'heartbeat timeout'); + } + return false; } on WebSocketChannelException catch (err, stacktrace) { _logger.severe( '[phoenix_socket] Heartbeat message failed: WebSocketChannelException', @@ -552,6 +574,7 @@ class PhoenixSocket { code: _ws?.closeCode, ); final exc = PhoenixException(socketClosed: ev); + _closeSink(); _ws = null; if (!_stateStreamController.isClosed) { diff --git a/lib/src/socket_options.dart b/lib/src/socket_options.dart index 778344e..c549d9a 100644 --- a/lib/src/socket_options.dart +++ b/lib/src/socket_options.dart @@ -13,6 +13,10 @@ class PhoenixSocketOptions { /// The interval between heartbeat roundtrips Duration? heartbeat, + /// The duration after which a heartbeat request + /// is considered timed out + Duration? heartbeatTimeout, + /// The list of delays between reconnection attempts. /// /// The last duration will be repeated until it works. @@ -40,6 +44,7 @@ class PhoenixSocketOptions { }) : _timeout = timeout ?? const Duration(seconds: 10), serializer = serializer ?? const MessageSerializer(), _heartbeat = heartbeat ?? const Duration(seconds: 30), + _heartbeatTimeout = heartbeatTimeout ?? const Duration(seconds: 10), assert(!(params != null && dynamicParams != null), "Can't set both params and dynamicParams"); @@ -49,6 +54,7 @@ class PhoenixSocketOptions { final Duration _timeout; final Duration _heartbeat; + final Duration _heartbeatTimeout; /// Duration after which a request is assumed to have timed out. Duration get timeout => _timeout; @@ -56,6 +62,11 @@ class PhoenixSocketOptions { /// Duration between heartbeats Duration get heartbeat => _heartbeat; + /// Duration after which a heartbeat request is considered timed out. + /// If the server does not respond to a heartbeat request within this + /// duration, the connection is considered lost. + Duration get heartbeatTimeout => _heartbeatTimeout; + /// Optional list of Duration between reconnect attempts final List reconnectDelays;