Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some fixes && optimizations #4

Open
wants to merge 16 commits into
base: dev
Choose a base branch
from
33 changes: 32 additions & 1 deletion docs/events.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,38 @@ An event listener in a cog.

Called when an audio WebSocket (to Discord) is closed. This can happen for various reasons (normal and abnormal), e.g. when using an expired voice server update.
4xxx codes are usually bad.
See the `Discord Docs <https://discord.com/developers/docs/topics/opcodes-and-status-codes#voice-voice-close-event-codes>`_.

.. note::
See the `Discord Docs <https://discord.com/developers/docs/topics/opcodes-and-status-codes#voice-voice-close-event-codes>`_.

.. function:: on_harmonize_session_no_longer(player: harmonize.Player)

Called when an audio WebSocket (to Discord) is closed with code 4006.

.. note::
See the `Discord Docs <https://discord.com/developers/docs/topics/opcodes-and-status-codes#voice-voice-close-event-codes>`_.

.. function:: on_harmonize_session_timeout(player: harmonize.Player)

Called when an audio WebSocket (to Discord) is closed with code 4009.

.. note::
See the `Discord Docs <https://discord.com/developers/docs/topics/opcodes-and-status-codes#voice-voice-close-event-codes>`_.

.. function:: on_harmonize_voice_modification(player: harmonize.Player)

Called when an audio WebSocket (to Discord) is closed with code 4014.
E.g., changed the voice channel or kicked out of the channel

.. note::
See the `Discord Docs <https://discord.com/developers/docs/topics/opcodes-and-status-codes#voice-voice-close-event-codes>`_.

.. function:: on_harmonize_voice_crashed(player: harmonize.Player)

Called when an audio WebSocket (to Discord) is closed with code 4015.

.. note::
See the `Discord Docs <https://discord.com/developers/docs/topics/opcodes-and-status-codes#voice-voice-close-event-codes>`_.

.. function:: on_harmonize_extra_event(event_type: str, player: harmonize.Player, data: str)

Expand Down
23 changes: 7 additions & 16 deletions harmonize/connection/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

class Node:
"""Represents a lavalink node

Operations
----------
.. describe:: x == y
Expand All @@ -40,7 +40,7 @@ class Node:
.. describe:: hash(x)

Return the node's hash.

Attributes
----------
identifier : str
Expand Down Expand Up @@ -77,15 +77,6 @@ class Node:

@classmethod
def _load_cache(cls, capacity: int) -> None:
"""
Initializes the cache with the specified capacity.

Args:
capacity (int): The capacity of the cache.

Returns:
None
"""
if cls.__cache is None:
cls.__cache = LFUCache(capacity=capacity)

Expand Down Expand Up @@ -497,7 +488,7 @@ async def get_player(self, guild_id: Union[str, int]) -> dict[str, any]:

async def get_players(self) -> list[dict[str, any]]:
"""|coro|

Retrieves a list of players associated with the session ID.

Returns
Expand Down Expand Up @@ -540,9 +531,9 @@ async def update_player(
**kwargs
) -> Optional[dict[str, any]]:
"""|coro|

Updates the state of a player with the given guild ID.

Parameters
----------
guild_id : Union[str, int]
Expand Down Expand Up @@ -572,12 +563,12 @@ async def update_player(
If not specified, no additional user data will be associated.
**kwargs
Additional keyword arguments to pass to the request.

Returns
-------
Optional[dict[str, any]]
The updated player information, or None if no update was made.

Raises
------
InvalidSession
Expand Down
72 changes: 56 additions & 16 deletions harmonize/connection/transport.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
from asyncio import sleep
from typing import TYPE_CHECKING, Optional

Expand All @@ -17,9 +18,10 @@
from harmonize.abstract.serializable import Serializable
from harmonize.enums import NodeStatus, EndReason, Severity
from harmonize.exceptions import AuthorizationError, NodeUnknownError, Forbidden, RequestError
from harmonize.objects import Stats
from harmonize.objects import Stats, Track

if TYPE_CHECKING:
from harmonize import Player
from harmonize.connection.node import Node

__all__ = (
Expand Down Expand Up @@ -97,9 +99,8 @@ async def _connect_back(self) -> None:
f"was success to successfully connect/reconnect to Lavalink V4 after "
f'{retries} connection attempts.'
)
self.dispatch("node_ready", self._node)

await self._listen()
asyncio.run_coroutine_threadsafe(self._listen(), loop=asyncio.get_event_loop())
break

if self._retries <= retries:
Expand All @@ -125,7 +126,18 @@ async def connect(self) -> None:

await self._connect_back()
except Exception as e:
logger.warning(f"Connection timeout to Lavalink V4: {e}")
logger.warning(f"An error ({type(e).__name__}) was thrown when connecting: {e}")

def _handle_ws_closed_event(self, player: Player, data: dict[any, any]) -> None:
match int(data["code"]):
case 4006:
self.dispatch("session_no_longer", player)
case 4009:
self.dispatch("session_timeout", player)
case 4014:
self.dispatch("voice_modification", player)
case 4015:
self.dispatch("voice_crashed", player)

async def _handle_event(self, data: dict[any, any]) -> None:
player = self._node.players.get(int(data['guildId'])) # type: ignore
Expand All @@ -137,19 +149,23 @@ async def _handle_event(self, data: dict[any, any]) -> None:

return

try:
await player.handle_event(data)
except Exception as e:
logger.error(f'Player {player.guild.id} threw an error whilst handling event : {e}')

if event_type == 'TrackStartEvent':
self.dispatch(
"track_start",
player,
player.queue.current
)
elif event_type == 'TrackEndEvent':
end_reason = EndReason(data['reason'])
self.dispatch(
"track_end",
player,
player.queue.history[0],
end_reason
Track.from_dict(data["track"]),
EndReason(data["reason"])
)
elif event_type == 'TrackExceptionEvent':
exception = data['exception']
Expand All @@ -167,19 +183,43 @@ async def _handle_event(self, data: dict[any, any]) -> None:
assert player.queue.current is not None
self.dispatch("track_stuck", player, player.queue.current, int(data['thresholdMs']))
elif event_type == 'WebSocketClosedEvent':
"""
+------+---------------------------+----------------------------------------------------------+
| CODE | DESCRIPTION | EXPLANATION |
+======+===========================+==========================================================+
| 4001 | Unknown opcode | You sent an invalid opcode. |
+------+---------------------------+----------------------------------------------------------+
| 4002 | Failed to decode payload | You sent an invalid payload in your identifying to the |
| | | Gateway. |
+------+---------------------------+----------------------------------------------------------+
| 4003 | Not authenticated | You sent a payload before identifying with the Gateway. |
+------+---------------------------+----------------------------------------------------------+
| 4004 | Authentication failed | The token you sent in your identify payload is incorrect.|
+------+---------------------------+----------------------------------------------------------+
| 4005 | Already authenticated | You sent more than one identify payload. Stahp. |
+------+---------------------------+----------------------------------------------------------+
| 4006 | Session no longer valid | Your session is no longer valid. |
+------+---------------------------+----------------------------------------------------------+
| 4009 | Session timeout | Your session has timed out. |
+------+---------------------------+----------------------------------------------------------+
| 4011 | Server not found | We can't find the server you're trying to connect to. |
+------+---------------------------+----------------------------------------------------------+
| 4012 | Unknown protocol | We didn't recognize the protocol you sent. |
+------+---------------------------+----------------------------------------------------------+
| 4014 | Disconnected | Channel was deleted, you were kicked, voice server |
| | | changed, or the main gateway session was dropped. |
| | | Should not reconnect. |
+------+---------------------------+----------------------------------------------------------+
| 4015 | Voice server crashed | The server crashed. Our bad! Try resuming. |
+------+---------------------------+----------------------------------------------------------+
| 4016 | Unknown encryption mode | We didn't recognize your encryption. |
+------+---------------------------+----------------------------------------------------------+
"""
self.dispatch("discord_ws_closed", player, int(data['code']), data['reason'], bool(data['byRemote']))
Krispeckt marked this conversation as resolved.
Show resolved Hide resolved
self._handle_ws_closed_event(player, data)
else:
return self.dispatch("extra_event", event_type, player, data)

if player and event_type in (
'TrackStuckEvent',
'TrackEndEvent'
):
try:
await player.handle_event(EndReason(data['reason']))
except Exception as e:
logger.error(f'Player {player.guild.id} threw an error whilst handling event : {e}')

async def _handle_message(self, data: dict[any, any] | list[any]) -> None:
if not isinstance(data, dict) or 'op' not in data:
return
Expand Down
72 changes: 38 additions & 34 deletions harmonize/player.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

from harmonize.abstract import Filter
from harmonize.connection import Pool
from harmonize.enums import EndReason, LoopStatus
from harmonize.exceptions import RequestError, InvalidChannelStateException
from harmonize.enums import LoopStatus
from harmonize.exceptions import InvalidChannelStateException, RequestError
from harmonize.objects import Track, MISSING
from harmonize.queue import Queue

Expand All @@ -36,9 +36,6 @@ class Player(VoiceProtocol):
node : :class:`harmonize.connection.Node`
The node the player is connected to.

connection_event : :class:`asyncio.Event`
An event triggered when the player's connection state changes.

voice_state : dict[str, any]
The current voice state of the player.

Expand Down Expand Up @@ -82,6 +79,7 @@ class Player(VoiceProtocol):

def __call__(self, client: Client, channel: VocalGuildChannel) -> Player:
super().__init__(client, channel)
self._guild = channel.guild
return self

def __init__(self, *args, **kwargs) -> None:
Expand Down Expand Up @@ -132,6 +130,12 @@ def volume(self) -> int:
def queue(self) -> Queue:
return self._queue

@queue.setter
Krispeckt marked this conversation as resolved.
Show resolved Hide resolved
def queue(self, value: Queue) -> None:
if not isinstance(value, Queue):
raise TypeError('Queue must be an instance of Queue')
self._queue = value

@property
def filters(self) -> list[Filter]:
return list(self._filters.values())
Expand All @@ -145,48 +149,48 @@ async def on_voice_server_update(self, data: dict) -> None:
await self._dispatch_voice_update()

async def on_voice_state_update(self, data: dict) -> None:
if not data['channel_id']:
if not (channel := int(data["channel_id"])):
return await self.disconnect(force=True)

self._connected = True
self.channel = self.client.get_channel(channel) # type: ignore

if data['session_id'] != self._voice_state.get('sessionId'):
self._voice_state.update(sessionId=data['session_id'])

await self._dispatch_voice_update()

async def _dispatch_voice_update(self) -> None:
if {'sessionId', 'endpoint', 'token'} == self._voice_state.keys():
await self._node.update_player(guild_id=self.guild.id, voice_state=self._voice_state)
self._connection_event.set()
try:
await self._node.update_player(guild_id=self.guild.id, voice_state=self._voice_state)
except RequestError:
await self.disconnect(force=True)
else:
self._connection_event.set()

async def handle_event(self, reason: EndReason) -> None:
async def handle_event(self, data: dict[any, any]) -> None:
"""|coro|

Handles an event triggered by the player, such as a track finishing or a load failure.
Handles events received from the data source.

Note
----
This function is required for autoplay please do not touch it for personal use
This function is used in the processing of events within the player

Parameters
----------
reason : :class:`harmonize.enums.EndReason`
The reason for the event.
data : dict[any, any]
The event data from `Lavalink <https://lavalink.dev/api/websocket.html#event-types>`_

Returns
-------
None
"""
if (
reason.value == EndReason.FINISHED.value
or reason.value == EndReason.LOAD_FAILED.value
):
try:
match data["type"]:
case "TrackStuckEvent":
await self.play()
except RequestError as error:
logger.error(
'Encountered a request error whilst '
f'starting a new track on guild ({self.guild.id}) {error}'
)
case "TrackEndEvent":
if data["reason"] in ('finished', 'loadFailed'):
await self.play()

async def update_state(self, state: dict) -> None:
"""|coro|
Expand Down Expand Up @@ -299,7 +303,7 @@ async def _play_back(

options['paused'] = pause

if track := await self._queue._go_to_next(track):
if track := await self._queue.load_next(track):
options["encoded_track"] = track.encoded

return await self._node.update_player(
Expand Down Expand Up @@ -806,7 +810,14 @@ async def move_to(

Moves the player to a specified voice channel.

Args:
Note
----
This method will clear the `_connection_event` event
and wait for the player to connect to the specified channel.
If the connection attempt times out or is cancelled, the player will be destroyed.

Parameters
----------
channel : VocalGuildChannel | None
The voice channel to move the player to. If `None`, the player will remain in its current channel.
timeout : Optional[float]
Expand All @@ -827,13 +838,6 @@ async def move_to(
Raises
------
InvalidChannelStateException: If the player tries to move without a valid guild or channel.

Note
-----
This method will clear the `_connection_event` event
and wait for the player to connect to the specified channel.
If the connection attempt times out or is cancelled, the player will be destroyed.

"""
if not self.guild:
raise InvalidChannelStateException("Player tried to move without a valid guild.")
Expand Down
Loading