diff --git a/opengsq/protocols/gamespy3.py b/opengsq/protocols/gamespy3.py index 5c9aa50..23825dd 100644 --- a/opengsq/protocols/gamespy3.py +++ b/opengsq/protocols/gamespy3.py @@ -1,11 +1,10 @@ from __future__ import annotations -import re - from opengsq.binary_reader import BinaryReader from opengsq.exceptions import InvalidPacketException from opengsq.protocol_base import ProtocolBase from opengsq.protocol_socket import UdpClient +from opengsq.responses.gamespy2 import Status class GameSpy3(ProtocolBase): @@ -16,12 +15,12 @@ class GameSpy3(ProtocolBase): full_name = "GameSpy Protocol version 3" challenge_required = False - async def get_status(self) -> dict: + async def get_status(self) -> Status: """ Asynchronously retrieves the status of the game server. The status includes information about the server, players, and teams. - :return: A dictionary containing the status of the game server. + :return: A Status object containing the status of the game server. """ # Connect to remote host with UdpClient() as udpClient: @@ -61,8 +60,7 @@ async def get_status(self) -> dict: br = BinaryReader(response) - result = {} - result["info"] = {} + info = {} while True: key = br.read_string() @@ -70,24 +68,11 @@ async def get_status(self) -> dict: if key == "": break - result["info"][key] = br.read_string() - - pattern = re.compile(rb"\x00([^a-zA-Z])([a-zA-Z_]+)\x00\x00(.+?(?=\x00\x00))") - current_id = -1 - current_name = None - - for id, name, data in re.findall(pattern, b"\x00" + br.read()): - values = data.split(b"\x00") - name = name.decode("utf-8").split("_")[0] + info[key] = br.read_string() - if current_id != id and id != b"\x00": - current_id, current_name = id, name - result[current_name] = [{} for _ in range(len(values))] + status = Status(info, self.__get_dictionaries(br), self.__get_dictionaries(br)) - for i in range(len(values)): - result[current_name][i][name] = values[i].decode("utf-8") - - return result + return status async def __read(self, udpClient: UdpClient) -> bytes: packet_count = -1 @@ -151,6 +136,52 @@ async def __read(self, udpClient: UdpClient) -> bytes: return response + def __get_dictionaries(self, br: BinaryReader) -> list[dict[str, str]]: + kvs = [] + + # Return if BaseStream is end + if br.is_end(): + return kvs + + # Skip a byte + br.read_byte() + + # Player/Team index + i = 0 + + while not br.is_end(): + key = br.read_string() + + if key: + # Skip \x00 + br.read_byte() + + # Remove the trailing "_t" + key = key.rstrip("t").rstrip("_") + + # Change the key to name + if key in ["player", "team"]: + key = "name" + + while not br.is_end(): + value = br.read_string() + + if value: + # Add a Dictionary object if not exists + if len(kvs) < i + 1: + kvs.append({}) + + kvs[i][key] = value + i += 1 + else: + break + + i = 0 + else: + break + + return kvs + if __name__ == "__main__": import asyncio