Skip to content

Commit

Permalink
Refactor to utilize tracked state (#1)
Browse files Browse the repository at this point in the history
* Refactor to utilize tracked state

* Bump version to 0.0.9
  • Loading branch information
jjlawren authored May 28, 2020
1 parent 1ff2ba5 commit 7c1789e
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 30 deletions.
99 changes: 70 additions & 29 deletions plexwebsocket.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
"""Support for issuing callbacks for Plex client updates via websockets."""
import asyncio
from concurrent.futures._base import CancelledError
from datetime import datetime
import logging

import aiohttp

_LOGGER = logging.getLogger(__name__)

STATE_CONNECTED = "connected"
STATE_DISCONNECTED = "disconnected"
STATE_STARTING = "starting"
STATE_STOPPED = "stopped"


class WebsocketPlayer: # pylint: disable=too-few-public-methods
"""Represent an individual player state in the Plex websocket stream."""
Expand All @@ -33,6 +39,8 @@ def significant_position_change(self, timestamp, new_position):
class PlexWebsocket:
"""Represent a websocket connection to a Plex server."""

# pylint: disable=too-many-instance-attributes

def __init__(self, plex_server, callback, session=None, verify_ssl=True):
"""Initialize a PlexWebsocket instance.
Expand All @@ -51,9 +59,20 @@ def __init__(self, plex_server, callback, session=None, verify_ssl=True):
self.uri = self._get_uri(plex_server)
self.players = {}
self.callback = callback
self._active = True
self._current_task = None
self._ssl = False if verify_ssl is False else None
self._state = None
self.failed_attempts = 0

@property
def state(self):
"""Return the current state."""
return self._state

@state.setter
def state(self, value):
"""Set the state."""
self._state = value
_LOGGER.debug("Websocket %s", value)

@staticmethod
def _get_uri(plex_server):
Expand All @@ -62,43 +81,61 @@ def _get_uri(plex_server):
"/:/websockets/notifications", includeToken=True
).replace("http", "ws")

async def listen(self):
async def running(self):
"""Open a persistent websocket connection and act on events."""
self._active = True
failed_attempts = 0
while self._active:
try:
async with self.session.ws_connect(
self.uri, heartbeat=15, ssl=self._ssl
) as ws_client:
failed_attempts = 0
self._current_task = asyncio.Task.current_task()
_LOGGER.debug("Websocket connected")
self.callback()

async for message in ws_client:
self.state = STATE_STARTING

try:
async with self.session.ws_connect(
self.uri, heartbeat=15, ssl=self._ssl
) as ws_client:
self.state = STATE_CONNECTED
self.failed_attempts = 0
self.callback()

async for message in ws_client:
if self.state == STATE_STOPPED:
break

if message.type == aiohttp.WSMsgType.TEXT:
msg = message.json()["NotificationContainer"]
if self.player_event(msg):
self.callback()

except aiohttp.client_exceptions.ClientConnectionError as error:
retry_delay = min(2 ** (failed_attempts - 1) * 30, 300)
failed_attempts += 1
elif message.type == aiohttp.WSMsgType.CLOSED:
_LOGGER.warning("AIOHTTP websocket connection closed")
break

elif message.type == aiohttp.WSMsgType.ERROR:
_LOGGER.error("AIOHTTP websocket error")
break

except aiohttp.ClientConnectionError as error:
if self.state != STATE_STOPPED:
retry_delay = min(2 ** (self.failed_attempts - 1) * 30, 300)
self.failed_attempts += 1
_LOGGER.error(
"Websocket connection failed, retrying in %ds: %s",
retry_delay,
error,
)
self.state = STATE_DISCONNECTED
await asyncio.sleep(retry_delay)
except Exception as error: # pylint: disable=broad-except
except CancelledError:
_LOGGER.debug("Websocket future cancelled")
self.state = STATE_STOPPED
except Exception as error: # pylint: disable=broad-except
if self.state != STATE_STOPPED:
_LOGGER.exception("Unexpected exception occurred: %s", error)
self.state = STATE_DISCONNECTED
await asyncio.sleep(10)
else:
_LOGGER.error("Websocket disconnected")
if self._active:
# Session IDs reset if Plex server has restarted, be safe
self.players.clear()
await asyncio.sleep(5)
else:
if self.state != STATE_STOPPED:
self.state = STATE_DISCONNECTED

# Session IDs reset if Plex server has restarted, be safe
self.players.clear()
await asyncio.sleep(5)

def player_event(self, msg):
"""Determine if messages relate to an interesting player event."""
Expand Down Expand Up @@ -155,8 +192,12 @@ def player_event(self, msg):

return should_fire

async def listen(self):
"""Close the listening websocket."""

This comment has been minimized.

Copy link
@glensc

glensc Jan 11, 2023

copy paste method description

self.failed_attempts = 0
while self.state != STATE_STOPPED:
await self.running()

def close(self):
"""Close the listening websocket."""
self._active = False
if self._current_task is not None:
self._current_task.cancel()
self.state = STATE_STOPPED
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
with open('README.md') as f:
long_description = f.read()

VERSION="0.0.8"
VERSION="0.0.9"

setup(
name='plexwebsocket',
Expand Down

0 comments on commit 7c1789e

Please sign in to comment.