diff --git a/discordgsm/database.py b/discordgsm/database.py index 753a9ea..954f1f2 100644 --- a/discordgsm/database.py +++ b/discordgsm/database.py @@ -1,4 +1,5 @@ from __future__ import annotations +from datetime import datetime from enum import Enum @@ -68,10 +69,12 @@ def connect(self): elif DB_CONNECTION == Driver.MongoDB.value: self.driver = Driver.MongoDB self.conn = MongoClient(DATABASE_URL) - self.collection = self.conn.get_default_database()['servers'] + self.servers = self.conn.get_default_database()['servers'] + self.metrics = self.conn.get_default_database()['metrics'] else: self.driver = Driver.SQLite - self.database = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'data', 'servers.db') + self.database = os.path.join(os.path.dirname( + os.path.realpath(__file__)), '..', 'data', 'servers.db') def __connect_psycopg2(self, database_url: str, max_retries=3): retries = 0 @@ -81,14 +84,16 @@ def __connect_psycopg2(self, database_url: str, max_retries=3): time.sleep(1) try: - pool = psycopg2.pool.ThreadedConnectionPool(1, 10, database_url, sslmode=sslmode) + pool = psycopg2.pool.ThreadedConnectionPool( + 1, 10, database_url, sslmode=sslmode) return pool except psycopg2.OperationalError as e: if retries >= max_retries: raise e retries += 1 - print(f"Connection failed. Retry attempt {retries}/{max_retries}. Retrying in 1 second...") + print( + f"Connection failed. Retry attempt {retries}/{max_retries}. Retrying in 1 second...") def create_table_if_not_exists(self): if self.driver == Driver.MongoDB: @@ -176,9 +181,9 @@ def transform(self, sql: str): @run_in_executor def statistics(self): if self.driver == Driver.MongoDB: - messages = len(self.collection.distinct("message_id")) - channels = len(self.collection.distinct("channel_id")) - guilds = len(self.collection.distinct("guild_id")) + messages = len(self.servers.distinct("message_id")) + channels = len(self.servers.distinct("channel_id")) + guilds = len(self.servers.distinct("guild_id")) pipeline = [ {"$group": { @@ -190,7 +195,7 @@ def statistics(self): } }} ] - unique_servers = len(list(self.collection.aggregate(pipeline))) + unique_servers = len(list(self.servers.aggregate(pipeline))) return { 'messages': messages, @@ -226,7 +231,7 @@ def count_servers_per_game(self): pipeline = [ {"$group": {"_id": "$game_id", "count": {"$sum": 1}}} ] - results = self.collection.aggregate(pipeline) + results = self.servers.aggregate(pipeline) servers_count = {str(row['_id']): int(row['count']) for row in results} results.close() @@ -246,7 +251,7 @@ def count_servers_per_channel(self): pipeline = [ {"$group": {"_id": "$channel_id", "count": {"$sum": 1}}} ] - results = self.collection.aggregate(pipeline) + results = self.servers.aggregate(pipeline) servers_count = {str(row['_id']): int(row['count']) for row in results} results.close() @@ -268,19 +273,19 @@ def __all_servers(self, *, channel_id: int = None, guild_id: int = None, message """Get all servers""" if self.driver == Driver.MongoDB: if channel_id: - results = self.collection.find( + results = self.servers.find( {"channel_id": channel_id}).sort("position") elif guild_id: - results = self.collection.find( + results = self.servers.find( {"guild_id": guild_id}).sort("position") elif message_id: - results = self.collection.find( + results = self.servers.find( {"message_id": message_id}).sort("position") elif game_id: - results = self.collection.find( + results = self.servers.find( {"game_id": game_id}).sort("position") else: - results = self.collection.find({}).sort("position") + results = self.servers.find({}).sort("position") servers = [Server.from_docs(doc, filter_secret) for doc in results] results.close() @@ -326,7 +331,7 @@ def distinct_servers(self): } }} ] - results = self.collection.aggregate(pipeline) + results = self.servers.aggregate(pipeline) servers = [QueryServer(**row['_id']) for row in results] results.close() return servers @@ -339,16 +344,33 @@ def distinct_servers(self): return servers + def server_limit(self, s: Server): + if self.driver == Driver.MongoDB: + pipeline = [ + {"$group": { + "_id": { + "game_id": "$game_id", + "address": "$address", + "query_port": "$query_port", + "query_extra": "$query_extra", + "status": "$status", + "result": "$result" + } + }} + ] + + return int(os.getenv('APP_PUBLIC_SERVER_LIMIT', '10')) + @run_in_executor def add_server(self, s: Server): if self.driver == Driver.MongoDB: try: - max_position = self.collection.find_one({'channel_id': s.channel_id}, sort=[ - ('position', -1)])["position"] + max_position = self.servers.find_one({'channel_id': s.channel_id}, sort=[ + ('position', -1)])["position"] except TypeError: max_position = 0 - self.collection.insert_one({ + self.servers.insert_one({ "position": max_position + 1, "guild_id": s.guild_id, "channel_id": s.channel_id, @@ -386,7 +408,7 @@ def update_servers_message_id(self, servers: list[Server]): ] if operations: - self.collection.bulk_write(operations) + self.servers.bulk_write(operations) return @@ -412,7 +434,7 @@ def update_servers(self, servers: list[Server], *, channel_id: int = None): ] if operations: - self.collection.bulk_write(operations) + self.servers.bulk_write(operations) return @@ -423,6 +445,37 @@ def update_servers(self, servers: list[Server], *, channel_id: int = None): cursor.executemany(self.transform(sql), parameters) self.close(conn, cursor, commit=True) + @run_in_executor + def update_metrics(self, servers: list[Server]): + if self.driver == Driver.MongoDB: + if os.getenv('METRICS_ENABLE', '').lower() == 'true': + limit = int(os.getenv('METRICS_RECORD_LIMIT', '1000')) + + operations = [ + UpdateOne({ + "game_id": server.game_id, + "address": server.address, + "query_port": server.query_port, + "query_extra": server.query_extra + }, { + "$push": { + "records": { + "$each": [{ + "s": server.status, + "p": server.result["numplayers"], + "b": server.result["numbots"], + "m": server.result["maxplayers"], + "c": datetime.utcnow(), + }], + "$slice": limit * -1 + } + } + }, upsert=True) for server in servers + ] + + if operations: + self.metrics.bulk_write(operations) + @run_in_executor def delete_servers(self, *, guild_id: int = None, channel_id: int = None, servers: list[Server] = None): if guild_id is None and channel_id is None and servers is None: @@ -430,15 +483,15 @@ def delete_servers(self, *, guild_id: int = None, channel_id: int = None, server if self.driver == Driver.MongoDB: if guild_id is not None: - self.collection.delete_many({"guild_id": guild_id}) + self.servers.delete_many({"guild_id": guild_id}) elif channel_id is not None: - self.collection.delete_many({"channel_id": channel_id}) + self.servers.delete_many({"channel_id": channel_id}) elif servers is not None: operations = [DeleteOne({"_id": server.id}) for server in servers] if operations: - self.collection.bulk_write(operations) + self.servers.bulk_write(operations) else: conn, cursor = self.cursor() @@ -461,7 +514,7 @@ def find_server(self, channel_id: int, address: str = None, query_port: int = No def __find_server(self, channel_id: int, address: str = None, query_port: int = None): if self.driver == Driver.MongoDB: - result = self.collection.find_one( + result = self.servers.find_one( {"channel_id": channel_id, "address": address, "query_port": query_port}) if not result: @@ -503,12 +556,12 @@ def modify_server_position(self, server1: Server, direction: bool): def __swap_servers_positon(self, server1: Server, server2: Server): if self.driver == Driver.MongoDB: # Update server1's position and message_id to server2's values - self.collection.update_one({"_id": server1.id}, { - "$set": {"position": server2.position, "message_id": server2.message_id}}) + self.servers.update_one({"_id": server1.id}, { + "$set": {"position": server2.position, "message_id": server2.message_id}}) # Update server2's position and message_id to the original server1's values - self.collection.update_one({"_id": server2.id}, { - "$set": {"position": server1.position, "message_id": server1.message_id}}) + self.servers.update_one({"_id": server2.id}, { + "$set": {"position": server1.position, "message_id": server1.message_id}}) else: sql = 'UPDATE servers SET position = case when position = ? then ? else ? end, message_id = case when message_id = ? then ? else ? end WHERE id IN (?, ?)' conn, cursor = self.cursor() @@ -525,7 +578,7 @@ def __swap_servers_positon(self, server1: Server, server2: Server): @run_in_executor def update_server_style_id(self, server: Server): if self.driver == Driver.MongoDB: - self.collection.update_one( + self.servers.update_one( {"_id": server.id}, {"$set": {"style_id": server.style_id}}) return @@ -543,7 +596,7 @@ def update_servers_style_data(self, servers: list[Server]): {"$set": {"style_data": server.style_data}} ) for server in servers ]: - self.collection.bulk_write(operations) + self.servers.bulk_write(operations) return @@ -557,8 +610,8 @@ def update_servers_style_data(self, servers: list[Server]): def __update_servers_channel_id(self, servers: list[Server], channel_id: int): if self.driver == Driver.MongoDB: try: - max_position = self.collection.find_one({'channel_id': channel_id}, sort=[ - ('position', -1)])["position"] + max_position = self.servers.find_one({'channel_id': channel_id}, sort=[ + ('position', -1)])["position"] except TypeError: max_position = 0 @@ -575,7 +628,7 @@ def __update_servers_channel_id(self, servers: list[Server], channel_id: int): ) if operations: - self.collection.bulk_write(operations) + self.servers.bulk_write(operations) return @@ -664,7 +717,7 @@ def import_(self, *, filename: str): servers = json.load(file) # Insert the data into the MongoDB collection - result = self.collection.insert_many(servers) + result = self.servers.insert_many(servers) print(f"Imported {len(result.inserted_ids)} servers.") # If the driver is PostgreSQL or SQLite elif self.driver in [Driver.PostgreSQL, Driver.SQLite]: diff --git a/discordgsm/games.csv b/discordgsm/games.csv index 8c2ac79..94acd50 100644 --- a/discordgsm/games.csv +++ b/discordgsm/games.csv @@ -154,6 +154,7 @@ kingpin,Kingpin: Life of Crime (1999),gamespy1,port=31510;port_query_offset=-10 kisspc,Kiss: Psycho Circus: The Nightmare Child (2000),gamespy1,port=7777;port_query_offset=1 kzmod,Kreedz Climbing (2017),source,port=27015 +leadandgold,Lead and Gold: Gangs of the Wild West (2010),source,port=27015;port_query_offset=1 left4dead,Left 4 Dead (2008),source,port=27015 left4dead2,Left 4 Dead 2 (2009),source,port=27015 diff --git a/discordgsm/main.py b/discordgsm/main.py index e6bfdcf..434f05a 100644 --- a/discordgsm/main.py +++ b/discordgsm/main.py @@ -924,6 +924,7 @@ async def tasks_query(): servers += await asyncio.gather(*chunks) await database.update_servers(servers) + await database.update_metrics(servers) failed = sum(server.status is False for server in servers) success = len(servers) - failed diff --git a/requirements.txt b/requirements.txt index 1f94e04..b4b311a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +aiohttp==3.8.6 backports.zoneinfo==0.2.1;python_version<"3.9" discord.py==2.3.2 flask[async]==3.0.0