diff --git a/DMBotNetwork/client.py b/DMBotNetwork/client.py index 15cf2ff..2b01ed6 100644 --- a/DMBotNetwork/client.py +++ b/DMBotNetwork/client.py @@ -24,15 +24,13 @@ class Client: _listen_task: Optional[asyncio.Task] = None def __init_subclass__(cls, **kwargs): - """Автоматически регистрирует методы, начинающиеся с 'net_', как сетевые методы. - - Args: - **kwargs: Дополнительные аргументы. - """ + """Автоматически регистрирует методы, начинающиеся с 'net_', как сетевые методы.""" super().__init_subclass__(**kwargs) - for method in dir(cls): - if callable(getattr(cls, method)) and method.startswith("net_"): - cls._net_methods[method[4:]] = getattr(cls, method) + cls._net_methods = { + method[4:]: getattr(cls, method) + for method in dir(cls) + if callable(getattr(cls, method)) and method.startswith("net_") + } # Сеттеры и геттеры @classmethod @@ -77,13 +75,10 @@ def get_server_file_path(cls) -> Optional[Path]: @classmethod def get_server_name(cls) -> Optional[str]: - if cls._is_auth: - return cls._server_name - - return None + return cls._server_name if cls._is_auth else None @classmethod - def is_connect(cls) -> bool: + def is_connected(cls) -> bool: return cls._is_connected and cls._is_auth # Основные методы взаимодействия с сервером @@ -96,34 +91,20 @@ async def connect(cls) -> None: try: cls._listen_task = asyncio.create_task(cls._connect_and_listen()) - except Exception as e: logger.error(f"Error creating connect task: {e}") @classmethod async def request_method(cls, spec_type: str, **kwargs) -> None: - """Запрашивает выполнение метода на сервере. - - Args: - spec_type (str): Указание метода, который нужно вызвать. - - Raises: - ConnectionError: Если соединение с сервером не установлено. - """ + """Запрашивает выполнение метода на сервере.""" if not cls._writer: raise ConnectionError("Not connected to server") - request_data = { - "action": "net", - "type": spec_type, - **kwargs - } - + request_data = {"action": "net", "type": spec_type, **kwargs} try: await cls._send_data(request_data) - except Exception as e: - logger.error(f"Error requesting method 'net.{spec_type}'. kwargs: '{kwargs}'. Error: {e}") + logger.error(f"Error requesting method 'net.{spec_type}': {e}") @classmethod async def close_connection(cls) -> None: @@ -137,23 +118,14 @@ async def _connect_and_listen(cls) -> None: try: cls._reader, cls._writer = await asyncio.open_connection(cls._host, cls._port) cls._is_connected = True - await cls.listen_for_messages() - except Exception as e: logger.error(f"Error in connection and listening: {e}") await cls._close() @classmethod async def _send_data(cls, data: Any) -> None: - """Отправляет данные на сервер. - - Args: - data (Any): Данные для отправки. - - Raises: - ConnectionError: Если соединение с сервером не установлено. - """ + """Отправляет данные на сервер.""" if not cls._writer: raise ConnectionError("Not connected to server") @@ -164,41 +136,28 @@ async def _send_data(cls, data: Any) -> None: cls._writer.write(packed_data) await cls._writer.drain() - except Exception as e: logger.error(f"Error sending data: {e}") @classmethod async def _receive_data(cls) -> Any: - """Получает данные с сервера. - - Raises: - ConnectionError: Если соединение с сервером не установлено. - - Returns: - Any: Распакованные данные или None в случае ошибки. - """ + """Получает данные с сервера.""" if not cls._reader: raise ConnectionError("Not connected to server") try: data_size_bytes = await cls._reader.readexactly(4) data_size = int.from_bytes(data_size_bytes, 'big') - packed_data = await cls._reader.readexactly(data_size) - return msgpack.unpackb(packed_data) - except asyncio.IncompleteReadError as e: logger.error(f"Connection closed while receiving data: {e}") await cls._close() return None - except ConnectionResetError as e: logger.error(f"Connection reset by server: {e}") await cls._close() return None - except Exception as e: logger.error(f"Unexpected error receiving data: {e}") await cls._close() @@ -218,7 +177,6 @@ async def _close(cls) -> None: try: cls._writer.close() await cls._writer.wait_closed() - except Exception as e: logger.error(f"Error closing connection: {e}") @@ -241,13 +199,11 @@ async def listen_for_messages(cls) -> None: 'action': cls._action_processor, 'req': cls._req_processor } - for key, processor in processors.items(): - data_type = server_data.get(key, None) + data_type = server_data.get(key) if data_type: await processor(data_type, server_data) break - except Exception as e: logger.error(f"Error in listen_for_messages: {e}") await cls._close() @@ -256,36 +212,24 @@ async def listen_for_messages(cls) -> None: async def _req_processor(cls, req_type, server_data: dict) -> None: if req_type == "auth": await cls._req_authenticate() - elif req_type == 'connect': await cls._req_connect(server_data) - else: - logger.warning(f"Unexpected action type from server: {req_type}") + logger.warning(f"Unexpected request type from server: {req_type}") @classmethod async def _action_processor(cls, action_type, server_data: dict) -> None: if action_type == 'log': cls.log_processor(server_data) - elif action_type == 'net': await cls._call_method(cls._net_methods, server_data.get('type'), **server_data) - else: logger.warning(f"Unexpected action type from server: {action_type}") @classmethod - async def _call_method(cls, metods_dict: Dict[str, Any], method_name: str, **kwargs) -> Any: - """Вызывает зарегистрированный метод по его имени. - - Args: - metods_dict (Dict[str, Any]): Словарь, из которого будут вызываться методы. - method_name (str): Имя метода для вызова. - - Returns: - Any: Результат выполнения метода, если найден, иначе None. - """ - method = metods_dict.get(method_name) + async def _call_method(cls, methods_dict: Dict[str, Any], method_name: str, **kwargs) -> Any: + """Вызывает зарегистрированный метод по его имени.""" + method = methods_dict.get(method_name) if method is None: logger.error(f"Net method {method_name} not found.") return None @@ -298,21 +242,15 @@ async def _call_method(cls, metods_dict: Dict[str, Any], method_name: str, **kwa return await method(cls, **valid_kwargs) else: return method(cls, **valid_kwargs) - except Exception as e: logger.error(f"Error calling method {method_name}: {e}") return None @classmethod async def _req_authenticate(cls) -> None: - """Аутентифицирует клиента на сервере. - """ + """Аутентифицирует клиента на сервере.""" try: - await cls._send_data({ - "login": cls._login, - "password": cls._password - }) - + await cls._send_data({"login": cls._login, "password": cls._password}) except Exception as e: logger.error(f"Error during authentication: {e}") @@ -321,8 +259,7 @@ async def _req_connect(cls, server_data: dict) -> None: status = server_data.get('status', 1) if status == 1: logger.error("Auth fail") - cls._close() - + await cls._close() elif status == 0: cls._is_auth = True cls._server_name = server_data.get('server_name', "not_set_server_name") @@ -340,9 +277,8 @@ def log_processor(cls, server_data: dict) -> None: log_type = server_data.get('log_type', 'not_set') log_method = log_methods.get(log_type) msg = server_data.get('msg', "empty") - + if log_method: log_method(msg) - else: logger.warning(f"Unknown log_type: {log_type}. Message: {msg}") diff --git a/DMBotNetwork/server.py b/DMBotNetwork/server.py index ccbf1ad..51d7e5e 100644 --- a/DMBotNetwork/server.py +++ b/DMBotNetwork/server.py @@ -30,50 +30,41 @@ class Server: def __init_subclass__(cls, **kwargs): """Автоматически регистрирует методы, начинающиеся с 'net_', как сетевые методы.""" super().__init_subclass__(**kwargs) - for method in dir(cls): - if callable(getattr(cls, method)) and method.startswith("net_"): - cls._net_methods[method[4:]] = getattr(cls, method) + cls._net_methods = { + method[4:]: getattr(cls, method) + for method in dir(cls) + if callable(getattr(cls, method)) and method.startswith("net_") + } + # Сеттеры @classmethod - def set_host(cls, host: str) -> None: - cls._host = host - + def set_host(cls, host: str) -> None: cls._host = host @classmethod - def set_port(cls, port: int) -> None: - cls._port = port - + def set_port(cls, port: int) -> None: cls._port = port @classmethod - def set_server_name(cls, server_name: str) -> None: - cls._server_name = server_name - + def set_server_name(cls, server_name: str) -> None: cls._server_name = server_name @classmethod - def set_db_path(cls, db_path: Path) -> None: - cls._db_path = db_path - + def set_db_path(cls, db_path: Path) -> None: cls._db_path = db_path @classmethod - def set_owner_password(cls, owner_password: str) -> None: - cls._owner_password = owner_password + def set_owner_password(cls, owner_password: str) -> None: cls._owner_password = owner_password + # Запуск и остановка сервера @classmethod async def start(cls) -> None: """Запускает сервер и начинает прослушивание входящих подключений.""" - if not cls._host or not cls._port or not cls._db_path: + if not all([cls._host, cls._port, cls._db_path]): logger.error("Host, port, and database path must be set before starting the server.") return try: await cls._init_db() cls._is_online = True - cls._server = await asyncio.start_server(cls._client_handle, cls._host, cls._port) - async with cls._server: logger.info(f'Server started on {cls._host}:{cls._port}') await cls._server.serve_forever() - except asyncio.CancelledError: await cls.stop() - except Exception as e: logger.error(f"Error starting server: {e}") await cls.stop() @@ -82,25 +73,19 @@ async def start(cls) -> None: async def stop(cls) -> None: """Останавливает сервер и закрывает все активные подключения.""" cls._is_online = False - - for login, writer in cls._connects.items(): - await cls._close_connect(login, writer) - + await asyncio.gather(*(cls._close_connect(login, writer) for login, writer in cls._connects.items()), return_exceptions=True) cls._connects.clear() - if cls._server: cls._server.close() await cls._server.wait_closed() - if cls._connection: await cls._connection.close() - logger.info('Server stopped.') + # Инициализация базы данных @classmethod async def _init_db(cls) -> None: - """Инициализирует базу данных, создавая необходимые таблицы, если они не существуют. - Также добавляет пользователя 'owner' с полным доступом, если он отсутствует.""" + """Инициализирует базу данных, создавая необходимые таблицы.""" try: cls._connection = await aiosqlite.connect(cls._db_path / "server.db") await cls._connection.execute(""" @@ -119,91 +104,50 @@ async def _init_db(cls) -> None: ("owner", owner_password_hashed, msgpack.packb({"full_access": True})) ) await cls._connection.commit() - except Exception as e: logger.error(f"Error initializing database: {e}") raise + # Вспомогательные методы для работы с базой данных @classmethod async def _user_exists(cls, username: str) -> bool: - """Проверяет, существует ли пользователь в базе данных. - - Args: - username (str): Имя пользователя для проверки. - - Returns: - bool: True, если пользователь существует, иначе False. - """ + """Проверяет, существует ли пользователь в базе данных.""" try: async with cls._connection.execute("SELECT 1 FROM users WHERE username = ?", (username,)) as cursor: return await cursor.fetchone() is not None - except Exception as e: logger.error(f"Error checking if user exists: {e}") return False @classmethod async def _check_password(cls, password: str, db_password: bytes) -> bool: - """Проверяет соответствие пароля пользователя с хешем из базы данных. - - Args: - password (str): Введенный пользователем пароль. - db_password (bytes): Хеш пароля из базы данных. - - Returns: - bool: True, если пароли совпадают, иначе False. - """ + """Проверяет соответствие пароля пользователя с хешем из базы данных.""" loop = asyncio.get_running_loop() return await loop.run_in_executor(None, bcrypt.checkpw, password.encode(), db_password) - + @classmethod async def _hash_password(cls, password: str) -> bytes: - """Генерирует хеш пароля для безопасного хранения в базе данных. - - Args: - password (str): Пароль для хеширования. - - Returns: - bytes: Хеш пароля. - """ + """Генерирует хеш пароля для безопасного хранения в базе данных.""" loop = asyncio.get_running_loop() return await loop.run_in_executor(None, bcrypt.hashpw, password.encode(), bcrypt.gensalt()) + # Работа с пользователями в базе данных @classmethod async def db_login_user(cls, login: str, password: str) -> Optional[str]: - """Проверяет учетные данные пользователя и возвращает логин, если они верны. - - Args: - login (str): Логин пользователя. - password (str): Пароль пользователя. - - Returns: - Optional[str]: Логин пользователя, если учетные данные верны, иначе None. - """ + """Проверяет учетные данные пользователя и возвращает логин, если они верны.""" try: async with cls._connection.execute("SELECT password FROM users WHERE username = ?", (login,)) as cursor: row = await cursor.fetchone() - if row and await cls._check_password(password, row[0]): return login return None - except Exception as e: logger.error(f"Error logging in user {login}: {e}") return None @classmethod async def db_add_user(cls, username: str, password: str, access: Dict[str, bool]) -> bool: - """Добавляет нового пользователя в базу данных. - - Args: - username (str): Логин нового пользователя. - password (str): Пароль нового пользователя. - access (Dict[str, bool]): Словарь прав доступа пользователя. - - Returns: - bool: True, если пользователь успешно добавлен, иначе False. - """ + """Добавляет нового пользователя в базу данных.""" hashed_password = await cls._hash_password(password) packed_access = msgpack.packb(access) try: @@ -213,144 +157,84 @@ async def db_add_user(cls, username: str, password: str, access: Dict[str, bool] ) await cls._connection.commit() return True - except Exception as e: logger.error(f"Error adding user {username}: {e}") return False @classmethod async def db_get_access(cls, username: str) -> Optional[Dict[str, bool]]: - """Возвращает права доступа пользователя. - - Args: - username (str): Логин пользователя. - - Returns: - Optional[Dict[str, bool]]: Словарь прав доступа пользователя, если он существует, иначе None. - """ + """Возвращает права доступа пользователя.""" if username in cls._access_cache: return cls._access_cache[username] - + async with cls._connection.execute("SELECT access FROM users WHERE username = ?", (username,)) as cursor: row = await cursor.fetchone() if row: access_data = msgpack.unpackb(row[0]) cls._access_cache[username] = access_data return access_data - return None @classmethod async def db_delete_user(cls, username: str) -> bool: - """Удаляет пользователя из базы данных. - - Args: - username (str): Логин пользователя для удаления. - - Returns: - bool: True, если пользователь успешно удален, иначе False. - """ + """Удаляет пользователя из базы данных.""" try: await cls._connection.execute("DELETE FROM users WHERE username = ?", (username,)) await cls._connection.commit() return True - except Exception as e: logger.error(f"Error deleting user {username}: {e}") return False @classmethod async def db_change_password(cls, username: str, new_password: str) -> bool: - """Изменяет пароль пользователя. - - Args: - username (str): Логин пользователя. - new_password (str): Новый пароль пользователя. - - Returns: - bool: True, если пароль успешно изменен, иначе False. - """ + """Изменяет пароль пользователя.""" hashed_password = await cls._hash_password(new_password) try: await cls._connection.execute("UPDATE users SET password = ? WHERE username = ?", (hashed_password, username)) await cls._connection.commit() cls._access_cache.pop(username, None) return True - except Exception as e: logger.error(f"Error changing password for user {username}: {e}") return False @classmethod async def db_change_access(cls, username: str, new_access: Optional[Dict[str, bool]] = None) -> bool: - """Изменяет права доступа пользователя. - - Args: - username (str): Логин пользователя. - new_access (Optional[Dict[str, bool]], optional): Новый словарь прав доступа. По умолчанию базовый словарь. - - Returns: - bool: True, если права успешно изменены, иначе False. - """ + """Изменяет права доступа пользователя.""" if username == "owner": new_access = {"full_access": True} - if not new_access: new_access = cls.BASE_ACCESS.copy() packed_access = msgpack.packb(new_access) - try: await cls._connection.execute("UPDATE users SET access = ? WHERE username = ?", (packed_access, username)) await cls._connection.commit() cls._access_cache.pop(username, None) return True - except Exception as e: logger.error(f"Error changing access for user {username}: {e}") return False + # Проверка доступа @classmethod async def check_access_login(cls, username: str, need_access: list[str]) -> bool: - """Проверяет, есть ли у пользователя необходимые права доступа. - - Args: - username (str): Логин пользователя. - need_access (List[str]): Список прав, которые необходимо проверить. - - Returns: - bool: True, если пользователь обладает всеми необходимыми правами, иначе False. - """ + """Проверяет, есть ли у пользователя необходимые права доступа.""" access_dict = await cls.db_get_access(username) return cls.check_access(access_dict, need_access) if access_dict else False @staticmethod def check_access(access_dict: Dict[str, bool], need_access: list[str]) -> bool: - """Проверяет, имеет ли пользователь необходимые права. - - Args: - access_dict (Dict[str, bool]): Словарь прав доступа пользователя. - need_access (List[str]): Список прав, которые необходимо проверить. - - Returns: - bool: True, если все права присутствуют, иначе False. - """ + """Проверяет, имеет ли пользователь необходимые права.""" if access_dict.get("full_access", False): return True - return all(access_dict.get(access, False) for access in need_access) + # Аутентификация и обработка запросов @classmethod async def _req_auth(cls, reader: StreamReader, writer: StreamWriter) -> Optional[str]: - """Запрашивает аутентификацию пользователя. - - Args: - reader (StreamReader): Объект для чтения данных от клиента. - writer (StreamWriter): Объект для отправки данных клиенту. - - Returns: - Optional[str]: Логин пользователя, если аутентификация успешна, иначе None. - """ + """Запрашивает аутентификацию пользователя.""" try: await cls.send_data(writer, {"req": "auth"}) user_data = await asyncio.wait_for(cls._receive_data(reader), timeout=cls.TIME_OUT) @@ -360,11 +244,9 @@ async def _req_auth(cls, reader: StreamReader, writer: StreamWriter) -> Optional return None return await cls.db_login_user(user_data['login'], user_data['password']) - except asyncio.TimeoutError: await cls.send_log(writer, "Timeout error.", 'error') return None - except Exception as err: logger.error(f"Authentication error: {err}") await cls.send_log(writer, "Internal server error.", 'error') @@ -375,7 +257,7 @@ async def _client_handle(cls, reader: StreamReader, writer: StreamWriter) -> Non """Основной цикл обработки запросов от клиента после успешной аутентификации.""" login = await cls._req_auth(reader, writer) if not login: - cls.send_data(writer, {'req': 'connect', 'status': 1}) + await cls.send_data(writer, {'req': 'connect', 'status': 1}) await cls._close_connect(writer=writer) return @@ -386,113 +268,67 @@ async def _client_handle(cls, reader: StreamReader, writer: StreamWriter) -> Non try: while cls._is_online: user_data = await cls._receive_data(reader) - if isinstance(user_data, dict): - action_type = user_data.get('action', None) - if action_type == "net": - answer = await cls._call_method(cls._net_methods, user_data.get('type'), user_login=login, **user_data) - await cls.send_data(writer, answer) - - except asyncio.IncompleteReadError as err: - logger.info(f"Client connection closed: {err}") - - except ConnectionResetError as err: - logger.info(f"Client reset the connection: {err}") - + if isinstance(user_data, dict) and user_data.get('action') == "net": + answer = await cls._call_method(cls._net_methods, user_data.get('type'), user_login=login, **user_data) + await cls.send_data(writer, answer) + except (asyncio.IncompleteReadError, ConnectionResetError) as err: + logger.info(f"Client connection issue: {err}") except Exception as err: logger.error(f"Unexpected error from client: {err}") - - await cls._close_connect(login, writer) + finally: + await cls._close_connect(login, writer) @classmethod async def _close_connect(cls, login: Optional[str] = None, writer: Optional[StreamWriter] = None) -> None: - """Закрывает соединение с клиентом и удаляет его из списка активных подключений. - - Args: - login (Optional[str], optional): Логин пользователя. Defaults to None. - writer (Optional[StreamWriter], optional): Объект для отправки данных клиенту. Defaults to None. - """ + """Закрывает соединение с клиентом и удаляет его из списка активных подключений.""" if not login: - for client_login, stored_writer in cls._connects.items(): - if writer and stored_writer == writer: - login = client_login - break - - if login in cls._connects: - del cls._connects[login] - + login = next((user_login for user_login, w in cls._connects.items() if w == writer), None) + + if login: + cls._connects.pop(login, None) + logger.info(f"User '{login}' is disconnected") + if writer: try: writer.close() await writer.wait_closed() - except Exception as err: logger.error(f"Error closing connection for {login}: {err}") - - if login: - logger.info(f"User '{login}' is disconnected") + # Отправка данных клиентам @classmethod async def send_log_login(cls, login: str, msg: str, log_type: str = 'info') -> None: - await Server.send_data_login(login, {"action": "log", "log_type": log_type, "msg": msg}) + await cls.send_data_login(login, {"action": "log", "log_type": log_type, "msg": msg}) @classmethod async def send_log(cls, writer: StreamWriter, msg: str, log_type: str = 'info') -> None: - await Server.send_data(writer, {"action": "log", "log_type": log_type, "msg": msg}) + await cls.send_data(writer, {"action": "log", "log_type": log_type, "msg": msg}) @classmethod async def send_data_login(cls, login: str, data: Any) -> None: - """Отправляет данные пользователю по его логину. - - Args: - login (str): Логин пользователя. - data (Any): Данные для отправки. - - Raises: - ValueError: Если пользователь с указанным логином не подключен. - """ + """Отправляет данные пользователю по его логину.""" if login not in cls._connects: raise ValueError("Unknown login") - await cls.send_data(cls._connects[login], data) @classmethod async def send_data(cls, writer: StreamWriter, data: Any) -> None: - """Отправляет данные клиенту. - - Args: - writer (StreamWriter): Объект для отправки данных клиенту. - data (Any): Данные для отправки. - """ + """Отправляет данные клиенту.""" try: packed_data = msgpack.packb(data) await cls.send_raw(writer, packed_data) - except Exception as e: logger.error(f"Error sending data: {e}") @classmethod async def broadcast_data(cls, data: Any) -> None: - """Отправляет данные всем клиентам. - - Args: - data (Any): Данные для отправки. - """ + """Отправляет данные всем клиентам.""" packed_data = msgpack.packb(data) - tasks = [] - - for _, writer in cls._connects.items(): - tasks.append(cls.send_raw(writer, packed_data)) - - await asyncio.gather(*tasks, return_exceptions=True) + await asyncio.gather(*(cls.send_raw(writer, packed_data) for writer in cls._connects.values()), return_exceptions=True) @classmethod async def send_raw(cls, writer: StreamWriter, data: bytes) -> None: - """Отправляет сырые данные клиенту. - - Args: - writer (StreamWriter): Объект для отправки данных клиенту. - data (bytes): Данные для отправки. - """ + """Отправляет сырые данные клиенту.""" try: writer.write(len(data).to_bytes(4, byteorder='big')) await writer.drain() @@ -504,31 +340,16 @@ async def send_raw(cls, writer: StreamWriter, data: bytes) -> None: @classmethod async def _receive_data(cls, reader: StreamReader) -> Any: - """Получает данные от клиента. - - Args: - reader (StreamReader): Объект для чтения данных от клиента. - - Returns: - Any: Распакованные данные или None в случае ошибки. - """ + """Получает данные от клиента.""" data_size_bytes = await reader.readexactly(4) data_size = int.from_bytes(data_size_bytes, 'big') packed_data = await reader.readexactly(data_size) return msgpack.unpackb(packed_data) @classmethod - async def _call_method(cls, metods_dict: Dict[str, Any], method_name: str, **kwargs) -> Any: - """Вызывает зарегистрированный метод по его имени. - - Args: - metods_dict (Dict[str, Any]): Словарь из которого будут вызываться методы. - method_name (str): Имя метода для вызова. - - Returns: - Any: Результат выполнения метода, если найден, иначе None. - """ - method = metods_dict.get(method_name) + async def _call_method(cls, methods_dict: Dict[str, Any], method_name: str, **kwargs) -> Any: + """Вызывает зарегистрированный метод по его имени.""" + method = methods_dict.get(method_name) if method is None: logger.error(f"Net method {method_name} not found.") return None @@ -541,7 +362,6 @@ async def _call_method(cls, metods_dict: Dict[str, Any], method_name: str, **kwa return await method(cls, **valid_kwargs) else: return method(cls, **valid_kwargs) - except Exception as e: logger.error(f"Error calling method {method_name}: {e}") return None