Skip to content

Commit

Permalink
first step to fix dvd-dev#486 by making room to a second websocket ha…
Browse files Browse the repository at this point in the history
…ndling challenge events.
  • Loading branch information
Leicas committed Nov 26, 2024
1 parent 33169ee commit f4b24ef
Showing 1 changed file with 42 additions and 21 deletions.
63 changes: 42 additions & 21 deletions custom_components/hilo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,9 @@ def __init__(self, hass: HomeAssistant, entry: ConfigEntry, api: API) -> None:
self.find_meter(self._hass)
self.entry = entry
self.devices: Devices = Devices(api)
self._websocket_reconnect_task: asyncio.Task | None = None
self._update_task: asyncio.Task | None = None
self.invocations = {0: self.subscribe_to_location}
self._websocket_reconnect_tasks: list[asyncio.Task | None] = [None, None]
self._update_task: list[asyncio.Task | None] = [None, None]
self.invocations = {0: self.subscribe_to_location, 1: self.subscribe_to_challenge}
self.hq_plan_name = entry.options.get(CONF_HQ_PLAN_NAME, DEFAULT_HQ_PLAN_NAME)
self.appreciation = entry.options.get(
CONF_APPRECIATION_PHASE, DEFAULT_APPRECIATION_PHASE
Expand Down Expand Up @@ -348,12 +348,26 @@ async def subscribe_to_location(self, inv_id: int) -> None:
[self.devices.location_id], "SubscribeToLocation", inv_id
)

@callback
async def subscribe_to_challenge(self, inv_id: int, event_id: int =0) -> None:
"""Sends the json payload to receive updates from the challenge."""
LOG.debug(f"Subscribing to challenge {event_id} at location {self.devices.location_id}")
await self._api.websocket2.async_invoke(
[event_id, self.devices.location_id], "SubscribeToChallenge", inv_id
)

@callback
async def request_status_update(self) -> None:
await self._api.websocket.send_status()
for inv_id, inv_cb in self.invocations.items():
await inv_cb(inv_id)

@callback
async def request_status_update_challenge(self) -> None:
await self._api.websocket2.send_status()
for inv_id, inv_cb in self.invocations.items():
await inv_cb(inv_id)

@callback
def _get_unknown_source_tracker(self) -> HiloDevice:
return {
Expand Down Expand Up @@ -426,8 +440,13 @@ async def async_init(self, scan_interval: int) -> None:

self._api.websocket.add_connect_callback(self.request_status_update)
self._api.websocket.add_event_callback(self.on_websocket_event)
self._websocket_reconnect_task = asyncio.create_task(
self.start_websocket_loop()
self._api.websocket2.add_connect_callback(self.request_status_update_challenge)
self._api.websocket2.add_event_callback(self.on_websocket_event)
self._websocket_reconnect_tasks[0] = asyncio.create_task(
self.start_websocket_loop(self._api.websocket, 0)
)
self._websocket_reconnect_tasks[1] = asyncio.create_task(
self.start_websocket_loop(self._api.websocket2, 1)
)
# asyncio.create_task(self._api.websocket.async_connect())

Expand All @@ -450,39 +469,41 @@ async def websocket_disconnect_listener(_: Event) -> None:
name="hilo",
update_interval=timedelta(seconds=scan_interval),
update_method=self.async_update,
)
)

async def start_websocket_loop(self) -> None:
async def start_websocket_loop(self, websocket, id) -> None:
"""Start a websocket reconnection loop."""
if TYPE_CHECKING:
assert self._api.websocket
assert websocket

should_reconnect = True

try:
await self._api.websocket.async_connect()
await self._api.websocket.async_listen()
await websocket.async_connect()
await websocket.async_listen()
except asyncio.CancelledError:
LOG.debug("Request to cancel websocket loop received")
raise
except WebsocketError as err:
LOG.error(f"Failed to connect to websocket: {err}", exc_info=err)
await self.cancel_websocket_loop()
await self.cancel_websocket_loop(websocket, id)
except InvalidCredentialsError:
LOG.warning("Invalid credentials? Refreshing websocket infos")
await self.cancel_websocket_loop()
await self.cancel_websocket_loop(websocket, id)
await self._api.refresh_ws_token()
except Exception as err: # pylint: disable=broad-except
LOG.error(
f"Unknown exception while connecting to websocket: {err}", exc_info=err
)
await self.cancel_websocket_loop()
await self.cancel_websocket_loop(websocket, id)



if should_reconnect:
LOG.info("Disconnected from websocket; reconnecting in 5 seconds.")
await asyncio.sleep(5)
self._websocket_reconnect_task = self._hass.async_create_task(
self.start_websocket_loop()
self._websocket_reconnect_tasks[id] = self._hass.async_create_task(
self.start_websocket_loop(websocket, id)
)

async def cancel_task(self, task) -> None:
Expand All @@ -496,15 +517,15 @@ async def cancel_task(self, task) -> None:
task = None
return task

async def cancel_websocket_loop(self) -> None:
async def cancel_websocket_loop(self, websocket, id) -> None:
"""Stop any existing websocket reconnection loop."""
self._websocket_reconnect_task = await self.cancel_task(
self._websocket_reconnect_task
self._websocket_reconnect_tasks[id] = await self.cancel_task(
self._websocket_reconnect_tasks[id]
)
self._update_task = await self.cancel_task(self._update_task)
self._update_task[id] = await self.cancel_task(self._update_task[id])
if TYPE_CHECKING:
assert self._api.websocket
await self._api.websocket.async_disconnect()
assert websocket
await websocket.async_disconnect()

async def async_update(self) -> None:
"""Updates tarif periodically."""
Expand Down

0 comments on commit f4b24ef

Please sign in to comment.