Skip to content

Commit

Permalink
improved WS connection
Browse files Browse the repository at this point in the history
  • Loading branch information
elad-bar committed Oct 19, 2022
1 parent 1f59bda commit dfdbbbb
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 36 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
## 2.0.13

- Add support for all interfaces but `loopback` [#76](https://github.com/elad-bar/ha-edgeos/issues/76)
- Improved WS connection management
- Improve WS connection management
- Fix WS ping message
- Change interval of ping message
- Add WS connection compression to support deflate
- Add 3 sensors for WS messages - Received, Ignored and Error

## 2.0.12
Expand Down
77 changes: 46 additions & 31 deletions custom_components/edgeos/component/api/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
from __future__ import annotations

from asyncio import sleep
import json
import logging
import re
Expand Down Expand Up @@ -102,6 +103,7 @@ async def initialize(self, config_data: ConfigData | None = None):
autoclose=True,
max_msg_size=WS_MAX_MSG_SIZE,
timeout=WS_TIMEOUT,
compress=WS_COMPRESSION_DEFLATE
) as ws:

await self.set_status(ConnectivityStatus.Connected)
Expand Down Expand Up @@ -142,11 +144,18 @@ async def async_send_heartbeat(self):
_LOGGER.debug(f"Keep alive message sent")

if self.status == ConnectivityStatus.Connected:
content = "{CLIENT_PING}"
content = {
"CLIENT_PING": "",
"SESSION_ID": self._api_session_id
}

content_str = json.dumps(content)
data = f"{len(content_str)}\n{content_str}"
data_for_log = data.replace("\n", "")

_LOGGER.debug(f"Keep alive data to be sent: {content}")
_LOGGER.debug(f"Keep alive data to be sent: {data_for_log}")

await self._ws.send_str(content)
await self._ws.send_str(data)

async def _listen(self):
_LOGGER.info(f"Starting to listen connected")
Expand All @@ -156,32 +165,31 @@ async def _listen(self):

_LOGGER.info("Subscribed to WS payloads")

async for msg in self._ws:
should_exit = True
listening = True

if msg.type in WS_CLOSING_MESSAGE:
_LOGGER.warning(f"WS Connection message: {msg.type}")
while listening and self.status == ConnectivityStatus.Connected:
async for msg in self._ws:
is_closing_type = msg.type in WS_CLOSING_MESSAGE
is_error = msg.type == aiohttp.WSMsgType.ERROR
is_closing_data = False if is_closing_type or is_error else msg.data == "close"

elif msg.type == aiohttp.WSMsgType.ERROR:
_LOGGER.warning(f"WS Error message, Description: {self._ws.exception()}")
if is_closing_type or is_error or is_closing_data:
_LOGGER.warning(
f"WS stopped listening, "
f"Message: {str(msg)}, "
f"Exception: {self._ws.exception()}"
)

error_messages = self.data.get(WS_ERROR_MESSAGES, 0)
listening = False
break

self.data[WS_ERROR_MESSAGES] = error_messages + 1

else:
if self._can_log_messages:
_LOGGER.debug(f"New message received: {str(msg)}")

should_exit = msg.data == "close"
else:
if self._can_log_messages:
_LOGGER.debug(f"Message received: {str(msg)}")

if not should_exit:
await self.parse_message(msg.data)

should_exit = self.status != ConnectivityStatus.Connected

if should_exit:
break
await sleep(1)

_LOGGER.info(f"Stop listening")

Expand Down Expand Up @@ -237,12 +245,20 @@ def _get_corrected_message(self, message):
new_message_length = len(message) - len(str(previous_message_length)) - 1

if new_message_length > previous_message_length:
_LOGGER.debug(
f"Ignored partial message, "
f"Expected {previous_message_length} chars, "
f"Provided {new_message_length}, "
f"Content: {message}"
)
if self._can_log_messages:
_LOGGER.debug(
f"Ignored partial message, "
f"Expected {previous_message_length} chars, "
f"Provided {new_message_length}, "
f"Content: {message}"
)

else:
_LOGGER.debug(
f"Ignored partial message, "
f"Expected {previous_message_length} chars, "
f"Provided {new_message_length}"
)

message = original_message

Expand All @@ -267,8 +283,9 @@ def _get_subscription_data(self):
content = json.dumps(data, separators=(STRING_COMMA, STRING_COLON))
content_length = len(content)
data = f"{content_length}\n{content}"
data_for_log = data.replace("\n", "")

_LOGGER.debug(f"Subscription data to be sent: {data}")
_LOGGER.debug(f"Subscription data to be sent: {data_for_log}")

return data

Expand All @@ -286,8 +303,6 @@ async def _message_handler(self, payload=None):
try:
if payload is not None:
for key in payload:
_LOGGER.debug(f"Running parser of {key}")

data = payload.get(key)
handler = self._ws_handlers.get(key)

Expand Down
10 changes: 6 additions & 4 deletions custom_components/edgeos/component/helpers/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

WS_MAX_MSG_SIZE = 0
WS_RECONNECT_INTERVAL = timedelta(seconds=30)
WS_TIMEOUT = timedelta(seconds=60)
WS_TIMEOUT = timedelta(minutes=1)
WS_WARNING_INTERVAL = timedelta(seconds=95)

DEFAULT_UPDATE_API_INTERVAL = timedelta(seconds=60)
WS_COMPRESSION_DEFLATE = 15

DEFAULT_UPDATE_API_INTERVAL = timedelta(minutes=1)
DEFAULT_UPDATE_ENTITIES_INTERVAL = timedelta(seconds=1)
DEFAULT_HEARTBEAT_INTERVAL = timedelta(seconds=25)
DEFAULT_CONSIDER_AWAY_INTERVAL = timedelta(seconds=180)
DEFAULT_HEARTBEAT_INTERVAL = timedelta(seconds=50)
DEFAULT_CONSIDER_AWAY_INTERVAL = timedelta(minutes=3)

STORAGE_DATA_FILE_CONFIG = "config"
STORAGE_DATA_FILE_API_DEBUG = "debug.api"
Expand Down

0 comments on commit dfdbbbb

Please sign in to comment.