From eca4956a3805e5967b7cbae09469db64460a0144 Mon Sep 17 00:00:00 2001 From: Neelansh Date: Thu, 25 Jul 2024 03:10:02 +0530 Subject: [PATCH 1/4] add timeout to websocket connection and socket heartbeat --- lib/src/socket.dart | 14 ++++++++++++-- lib/src/socket_options.dart | 11 +++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/lib/src/socket.dart b/lib/src/socket.dart index f2adf64..0e05e78 100644 --- a/lib/src/socket.dart +++ b/lib/src/socket.dart @@ -210,7 +210,7 @@ class PhoenixSocket { // Wait for the WebSocket to be ready before continuing. In case of a // failure to connect, the future will complete with an error and will be // caught. - await _ws!.ready; + await _ws!.ready.timeout(_options.timeout); _socketState = SocketState.connected; @@ -455,9 +455,19 @@ class PhoenixSocket { } try { - await sendMessage(_heartbeatMessage()); + await sendMessage(_heartbeatMessage()).timeout(_options.heartbeatTimeout); _logger.fine('[phoenix_socket] Heartbeat completed'); return true; + } on TimeoutException catch (err, stacktrace) { + _logger.severe( + '[phoenix_socket] Heartbeat message timed out', + err, + stacktrace, + ); + if (_ws != null) { + _closeSink(normalClosure, 'heartbeat timeout'); + } + return false; } on WebSocketChannelException catch (err, stacktrace) { _logger.severe( '[phoenix_socket] Heartbeat message failed: WebSocketChannelException', diff --git a/lib/src/socket_options.dart b/lib/src/socket_options.dart index 778344e..c549d9a 100644 --- a/lib/src/socket_options.dart +++ b/lib/src/socket_options.dart @@ -13,6 +13,10 @@ class PhoenixSocketOptions { /// The interval between heartbeat roundtrips Duration? heartbeat, + /// The duration after which a heartbeat request + /// is considered timed out + Duration? heartbeatTimeout, + /// The list of delays between reconnection attempts. /// /// The last duration will be repeated until it works. @@ -40,6 +44,7 @@ class PhoenixSocketOptions { }) : _timeout = timeout ?? const Duration(seconds: 10), serializer = serializer ?? const MessageSerializer(), _heartbeat = heartbeat ?? const Duration(seconds: 30), + _heartbeatTimeout = heartbeatTimeout ?? const Duration(seconds: 10), assert(!(params != null && dynamicParams != null), "Can't set both params and dynamicParams"); @@ -49,6 +54,7 @@ class PhoenixSocketOptions { final Duration _timeout; final Duration _heartbeat; + final Duration _heartbeatTimeout; /// Duration after which a request is assumed to have timed out. Duration get timeout => _timeout; @@ -56,6 +62,11 @@ class PhoenixSocketOptions { /// Duration between heartbeats Duration get heartbeat => _heartbeat; + /// Duration after which a heartbeat request is considered timed out. + /// If the server does not respond to a heartbeat request within this + /// duration, the connection is considered lost. + Duration get heartbeatTimeout => _heartbeatTimeout; + /// Optional list of Duration between reconnect attempts final List reconnectDelays; From a4f0d1f3b09418a53ce431c0796ccef6bfc26674 Mon Sep 17 00:00:00 2001 From: Neelansh Date: Thu, 25 Jul 2024 20:03:30 +0530 Subject: [PATCH 2/4] added timeout to WebSocket ready future --- lib/src/socket.dart | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/lib/src/socket.dart b/lib/src/socket.dart index 0e05e78..3b03bca 100644 --- a/lib/src/socket.dart +++ b/lib/src/socket.dart @@ -164,7 +164,7 @@ class PhoenixSocket { String get endpoint => _endpoint; /// The [Uri] containing all the parameters and options for the - /// remote connection to occue. + /// remote connection to occur. Uri get mountPoint => _mountPoint; /// Whether the underlying socket is connected of not. @@ -222,6 +222,17 @@ class PhoenixSocket { } else { throw PhoenixException(); } + } on TimeoutException catch (err, stackTrace) { + _logger.severe( + 'Timed out waiting for WebSocket to be ready', err, stackTrace); + _closeSink(); + _ws = null; + _socketState = SocketState.closed; + // Calling this method will trigger a reconnect + // making _shouldReconnect = false as we are manually calling _delayedReconnect() + _shouldReconnect = false; + _onSocketError(err, stackTrace); + completer.complete(_delayedReconnect()); } catch (err, stackTrace) { _logger.severe('Raised Exception', err, stackTrace); From 3b6b30085f15ba4eea26ebff2d47cd28e35ca5c3 Mon Sep 17 00:00:00 2001 From: Neelansh Date: Thu, 25 Jul 2024 20:04:05 +0530 Subject: [PATCH 3/4] closing sink before assigning null to ws --- lib/src/socket.dart | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/src/socket.dart b/lib/src/socket.dart index 3b03bca..5183f5e 100644 --- a/lib/src/socket.dart +++ b/lib/src/socket.dart @@ -235,7 +235,7 @@ class PhoenixSocket { completer.complete(_delayedReconnect()); } catch (err, stackTrace) { _logger.severe('Raised Exception', err, stackTrace); - + _closeSink(); _ws = null; _socketState = SocketState.closed; @@ -573,6 +573,7 @@ class PhoenixSocket { code: _ws?.closeCode, ); final exc = PhoenixException(socketClosed: ev); + _closeSink(); _ws = null; if (!_stateStreamController.isClosed) { From 60820dcd7de651cf2b7429825f8fad869e9b3789 Mon Sep 17 00:00:00 2001 From: Neelansh Date: Thu, 25 Jul 2024 20:04:22 +0530 Subject: [PATCH 4/4] cancel heartbeat when socket is disposed --- lib/src/socket.dart | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/src/socket.dart b/lib/src/socket.dart index 5183f5e..6633893 100644 --- a/lib/src/socket.dart +++ b/lib/src/socket.dart @@ -282,6 +282,7 @@ class PhoenixSocket { _disposed = true; _ws?.sink.close(); + _cancelHeartbeat(); for (final sub in _subscriptions) { sub.cancel();