diff --git a/DMBotNetwork/server.py b/DMBotNetwork/server.py index 088a200..85561fd 100644 --- a/DMBotNetwork/server.py +++ b/DMBotNetwork/server.py @@ -84,7 +84,7 @@ async def _call_net_method(cls, method_name: str, **kwargs) -> Any: return method(cls, **valid_kwargs) # DB work - async def _db_login_user(self, login: str, password: str) -> Optional[str]: + async def db_login_user(self, login: str, password: str) -> Optional[str]: try: async with self._connection.execute("SELECT password FROM users WHERE username = ?", (login,)) as cursor: row = await cursor.fetchone() @@ -101,7 +101,7 @@ async def _db_login_user(self, login: str, password: str) -> Optional[str]: logging.error(f"Error logging in user {login}: {e}") return None - async def _db_add_user(self, username: str, password: str, access: Dict[str, bool]) -> None: + async def db_add_user(self, username: str, password: str, access: Dict[str, bool]) -> None: hashed_password = await self._hash_password(password) packed_access = msgpack.packb(access) try: @@ -113,7 +113,7 @@ async def _db_add_user(self, username: str, password: str, access: Dict[str, boo except Exception as e: logging.error(f"Error adding user {username}: {e}") - async def _db_get_access(self, username: str) -> Optional[Dict[str, bool]]: + async def db_get_access(self, username: str) -> Optional[Dict[str, bool]]: try: async with self._connection.execute("SELECT access FROM users WHERE username = ?", (username,)) as cursor: row = await cursor.fetchone() @@ -124,14 +124,14 @@ async def _db_get_access(self, username: str) -> Optional[Dict[str, bool]]: logging.error(f"Error getting access for user {username}: {e}") return None - async def _db_delete_user(self, username: str) -> None: + async def db_delete_user(self, username: str) -> None: try: await self._connection.execute("DELETE FROM users WHERE username = ?", (username,)) await self._connection.commit() except Exception as e: logging.error(f"Error deleting user {username}: {e}") - async def _db_change_password(self, username: str, new_password: str) -> None: + async def db_change_password(self, username: str, new_password: str) -> None: hashed_password = await self._hash_password(new_password) try: await self._connection.execute("UPDATE users SET password = ? WHERE username = ?", (hashed_password, username)) @@ -139,7 +139,7 @@ async def _db_change_password(self, username: str, new_password: str) -> None: except Exception as e: logging.error(f"Error changing password for user {username}: {e}") - async def _db_change_access(self, username: str, new_access: Optional[Dict[str, bool]] = None) -> None: + async def db_change_access(self, username: str, new_access: Optional[Dict[str, bool]] = None) -> None: if username == "owner": new_access = {"full_access": True} @@ -169,7 +169,7 @@ async def _req_auth(self, reader: StreamReader, writer: StreamWriter) -> str: await self.send_data(writer, {"action": "log", "log_type": "error", "msg": "Data must contain 'login' and 'password'."}) return None - return await self._db_login_user(user_data['login'], user_data['password']) + return await self.db_login_user(user_data['login'], user_data['password']) except asyncio.TimeoutError: await self.send_data(writer, {"action": "log", "log_type": "error", "msg": "Timeout error."}) diff --git a/Tests/Server.py b/Tests/Server.py index a002f19..011bab7 100644 --- a/Tests/Server.py +++ b/Tests/Server.py @@ -21,45 +21,45 @@ async def asyncTearDown(self): shutil.rmtree(self.db_path) async def test_init_db(self): - access = await self.server._db_get_access('owner') + access = await self.server.db_get_access('owner') self.assertIsNotNone(access) self.assertIn('full_access', access) self.assertTrue(access['full_access']) async def test_db_add_user(self): - await self.server._db_add_user('test_user', 'password123', {'read': True}) - access = await self.server._db_get_access('test_user') + await self.server.db_add_user('test_user', 'password123', {'read': True}) + access = await self.server.db_get_access('test_user') self.assertIsNotNone(access) self.assertIn('read', access) self.assertTrue(access['read']) async def test_db_login_user(self): - await self.server._db_add_user('test_user', 'password123', {'read': True}) - login_result = await self.server._db_login_user('test_user', 'password123') + await self.server.db_add_user('test_user', 'password123', {'read': True}) + login_result = await self.server.db_login_user('test_user', 'password123') self.assertEqual(login_result, 'test_user') - wrong_login_result = await self.server._db_login_user('test_user', 'wrongpassword') + wrong_login_result = await self.server.db_login_user('test_user', 'wrongpassword') self.assertIsNone(wrong_login_result) async def test_db_change_password(self): - await self.server._db_add_user('test_user', 'password123', {'read': True}) - await self.server._db_change_password('test_user', 'newpassword123') - login_result = await self.server._db_login_user('test_user', 'newpassword123') + await self.server.db_add_user('test_user', 'password123', {'read': True}) + await self.server.db_change_password('test_user', 'newpassword123') + login_result = await self.server.db_login_user('test_user', 'newpassword123') self.assertEqual(login_result, 'test_user') async def test_db_change_access(self): - await self.server._db_add_user('test_user', 'password123', {'read': True}) - await self.server._db_change_access('test_user', {'write': True}) - access = await self.server._db_get_access('test_user') + await self.server.db_add_user('test_user', 'password123', {'read': True}) + await self.server.db_change_access('test_user', {'write': True}) + access = await self.server.db_get_access('test_user') self.assertIsNotNone(access) self.assertIn('write', access) self.assertTrue(access['write']) self.assertNotIn('read', access) async def test_db_delete_user(self): - await self.server._db_add_user('test_user', 'password123', {'read': True}) - await self.server._db_delete_user('test_user') - access = await self.server._db_get_access('test_user') + await self.server.db_add_user('test_user', 'password123', {'read': True}) + await self.server.db_delete_user('test_user') + access = await self.server.db_get_access('test_user') self.assertIsNone(access) async def test_send_receive_data(self):