Skip to content

Commit

Permalink
Update GameSpy Protocol version 1
Browse files Browse the repository at this point in the history
  • Loading branch information
BattlefieldDuck committed Jan 23, 2024
1 parent 0af196e commit da5ae9a
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 49 deletions.
135 changes: 86 additions & 49 deletions opengsq/protocols/gamespy1.py
Original file line number Diff line number Diff line change
@@ -1,77 +1,113 @@
import re

from opengsq.responses.gamespy1 import Status
from opengsq.binary_reader import BinaryReader
from opengsq.protocol_base import ProtocolBase
from opengsq.protocol_socket import UdpClient


class GameSpy1(ProtocolBase):
"""GameSpy Protocol version 1"""
full_name = 'GameSpy Protocol version 1'

full_name = "GameSpy Protocol version 1"

# Legacy:UT_Server_Query - (https://wiki.beyondunreal.com/Legacy:UT_Server_Query)
# Query_commands - (https://wiki.beyondunreal.com/XServerQuery#Query_commands)
class __Request():
BASIC = b'\\basic\\'
INFO = b'\\info\\xserverquery'
RULES = b'\\rules\\xserverquery'
PLAYERS = b'\\players\\xserverquery'
STATUS = b'\\status\\xserverquery'
TEAMS = b'\\teams\\'
class __Request:
BASIC = b"\\basic\\"
INFO = b"\\info\\xserverquery"
RULES = b"\\rules\\xserverquery"
PLAYERS = b"\\players\\xserverquery"
STATUS = b"\\status\\xserverquery"
TEAMS = b"\\teams\\"

async def get_basic(self) -> dict:
"""This returns basic server information, mainly for recognition."""
return self.__parse_as_key_values(await self.__connect_and_send(self.__Request.BASIC))
"""
Asynchronously retrieves the basic information of the game server.
:return: A dictionary containing the basic information of the game server.
"""
return self.__parse_as_key_values(
await self.__connect_and_send(self.__Request.BASIC)
)

# Server may still response with Legacy version
async def get_info(self, xserverquery: bool = True) -> dict:
"""
Information about the current game running on the server.
Asynchronously retrieves the information of the current game running on the server.
If the server uses XServerQuery, he sends you the new information, otherwise he'll give you back the old information.
:param xserverquery: A boolean indicating whether to use XServerQuery.
:return: A dictionary containing the information of the current game.
"""
data = xserverquery and self.__Request.INFO or self.__Request.INFO.replace(b'xserverquery', b'')
data = (
xserverquery
and self.__Request.INFO
or self.__Request.INFO.replace(b"xserverquery", b"")
)
return self.__parse_as_key_values(await self.__connect_and_send(data))

async def get_rules(self, xserverquery: bool = True) -> list:
"""
Setting for the current game, return sets of rules depends on the running game type.
Asynchronously retrieves the rules of the current game running on the server.
If the server uses XServerQuery, he sends you the new information, otherwise he'll give you back the old information.
:param xserverquery: A boolean indicating whether to use XServerQuery.
:return: A list containing the rules of the current game.
"""
data = xserverquery and self.__Request.RULES or self.__Request.RULES.replace(b'xserverquery', b'')
data = (
xserverquery
and self.__Request.RULES
or self.__Request.RULES.replace(b"xserverquery", b"")
)
return self.__parse_as_key_values(await self.__connect_and_send(data))

async def get_players(self, xserverquery: bool = True) -> list:
"""
Returns information about each player on the server.
Asynchronously retrieves the information of each player on the server.
If the server uses XServerQuery, he sends you the new information, otherwise he'll give you back the old information.
:param xserverquery: A boolean indicating whether to use XServerQuery.
:return: A list containing the information of each player.
"""
data = xserverquery and self.__Request.PLAYERS or self.__Request.PLAYERS.replace(b'xserverquery', b'')
data = (
xserverquery
and self.__Request.PLAYERS
or self.__Request.PLAYERS.replace(b"xserverquery", b"")
)
return self.__parse_as_object(await self.__connect_and_send(data))

async def get_status(self, xserverquery: bool = True) -> dict:
async def get_status(self, xserverquery: bool = True) -> Status:
"""
XServerQuery: \\info\\xserverquery\\rules\\xserverquery\\players\\xserverquery
Asynchronously retrieves the status of the game server.
XServerQuery: \\info\\xserverquery\\rules\\xserverquery\\players\\xserverquery
Old response: \\basic\\info\\rules\\players\\
If the server uses XServerQuery, he sends you the new information, otherwise he'll give you back the old information.
:param xserverquery: A boolean indicating whether to use XServerQuery.
:return: A Status object containing the status of the game server.
"""
data = xserverquery and self.__Request.STATUS or self.__Request.STATUS.replace(b'xserverquery', b'')
data = (
xserverquery
and self.__Request.STATUS
or self.__Request.STATUS.replace(b"xserverquery", b"")
)
br = await self.__connect_and_send(data)

status = {}
status['info'] = self.__parse_as_key_values(br, is_status=True)
status['players'] = self.__parse_as_object(br, is_player=True)
status['teams'] = [] if br.is_end() else self.__parse_as_object(br)
info = self.__parse_as_key_values(br, is_status=True)
players = self.__parse_as_object(br, is_player=True)
teams = [] if br.is_end() else self.__parse_as_object(br)

return status
return Status(info, players, teams)

async def get_teams(self) -> list:
"""Returns information about each team on the server."""
return self.__parse_as_object(await self.__connect_and_send(self.__Request.TEAMS))
"""
Asynchronously retrieves the information of each team on the server.
:return: A list containing the information of each team.
"""
return self.__parse_as_object(
await self.__connect_and_send(self.__Request.TEAMS)
)

# Receive packets and sort it
async def __get_packets_response(self, udpClient: UdpClient):
Expand All @@ -83,11 +119,11 @@ async def __get_packets_response(self, udpClient: UdpClient):
packet = await udpClient.recv()

# Get the packet number from query_id
r = re.compile(rb'\\queryid\\\d+\.(\d+)')
number, payload = int(r.search(packet).group(1)), r.sub(b'', packet)
r = re.compile(rb"\\queryid\\\d+\.(\d+)")
number, payload = int(r.search(packet).group(1)), r.sub(b"", packet)

# If it is the last packet, it will contain b'\\final\\' at the end of the response
if payload.endswith(b'\\final\\'):
if payload.endswith(b"\\final\\"):
# Save the packet count
packet_count = number

Expand All @@ -98,7 +134,7 @@ async def __get_packets_response(self, udpClient: UdpClient):
payloads[number] = payload[1:] if number == 1 else payload

# Sort the payload and return as bytes
response = b''.join(payloads[number] for number in sorted(payloads))
response = b"".join(payloads[number] for number in sorted(payloads))

return response

Expand All @@ -118,18 +154,18 @@ def __parse_as_key_values(self, br: BinaryReader, is_status=False):

# Bind key value
while br.remaining_bytes() > 0:
key = br.read_string(b'\\').lower()
key = br.read_string(b"\\").lower()

# Check is the end of key_values
if is_status:
items = key.split('_')
items = key.split("_")

if len(items) > 1 and items[1].isdigit():
# Read already, so add it back
br.prepend_bytes(key.encode() + b'\\')
br.prepend_bytes(key.encode() + b"\\")
break

value = br.read_string(b'\\')
value = br.read_string(b"\\")
kv[key] = value.strip()

return kv
Expand All @@ -139,25 +175,25 @@ def __parse_as_object(self, br: BinaryReader, is_player=False):

while br.remaining_bytes() > 0:
# Get the key, for example player_1, frags_1, ping_1, etc...
key = br.read_string(b'\\').lower()
key = br.read_string(b"\\").lower()

if is_player and key.startswith('teamname_'):
if is_player and key.startswith("teamname_"):
# Read already, so add it back
br.prepend_bytes(key.encode() + b'\\')
br.prepend_bytes(key.encode() + b"\\")
break

# Extract to name and index, for example name=player, index=1
matches = re.search(r'(.+?)_(\d+)', key)
matches = re.search(r"(.+?)_(\d+)", key)
name = matches.group(1)
name = 'player' if name == 'playername' else name
name = 'team' if name == 'teamname' else name
name = "player" if name == "playername" else name
name = "team" if name == "teamname" else name
index = int(matches.group(2))

# Get the value, and strip it since some values contain whitespaces
value = br.read_string(b'\\').strip()
value = br.read_string(b"\\").strip()

# Some servers (bf1942) report the same player multiple times, so filter it by keyhash
if name == 'keyhash':
if name == "keyhash":
if value in keyhashes:
filters.append(index)
else:
Expand All @@ -174,16 +210,17 @@ def __parse_as_object(self, br: BinaryReader, is_player=False):
return [v for k, v in items.items() if k not in filters]


if __name__ == '__main__':
if __name__ == "__main__":
import asyncio
import json
from dataclasses import asdict

async def main_async():
gs1 = GameSpy1(host='51.81.48.224', port=23000, timeout=5.0) # bfield1942
gs1 = GameSpy1(host="51.81.48.224", port=23000, timeout=5.0) # bfield1942
# gs1 = GameSpy1(address='139.162.235.20', query_port=7778, timeout=5.0) # ut
# gs1 = GameSpy1(address='192.223.24.6', query_port=7778, timeout=5.0) # ut
gs1 = GameSpy1(host='141.94.205.35', port=12300, timeout=5.0) # mohaa
gs1 = GameSpy1(host="141.94.205.35", port=12300, timeout=5.0) # mohaa
status = await gs1.get_status()
print(json.dumps(status, indent=None) + '\n')
print(json.dumps(asdict(status), indent=None) + "\n")

asyncio.run(main_async())
1 change: 1 addition & 0 deletions opengsq/responses/gamespy1/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .status import Status
24 changes: 24 additions & 0 deletions opengsq/responses/gamespy1/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Status:
"""
Represents the server status.
"""

info: dict[str, str]
"""Server's info. If is_XServerQuery is True, then it includes \\info\\xserverquery\\rules\\xserverquery, else \\basic\\info\\rules\\"""

players: list[dict[str, str]]
"""Server's players."""

teams: list[dict[str, str]]
"""Server's teams. Only when is_x_server_query is True."""

@property
def is_XServerQuery(self) -> bool:
"""Indicates whether the response is XServerQuery or old response."""
return "XServerQuery" in self.info

0 comments on commit da5ae9a

Please sign in to comment.