Skip to content

Commit

Permalink
0.2.6 (#17)
Browse files Browse the repository at this point in the history
- добвлен декоратор require_access для проверки доступа вызвавшего
сетевой метод пользователя
  • Loading branch information
themanyfaceddemon authored Sep 19, 2024
1 parent 336c5af commit d1a7fa5
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 37 deletions.
5 changes: 3 additions & 2 deletions DMBotNetwork/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from .main.client import Client
from .main.server import Server
from .main.utils.cl_unit import ClUnit
from .main.utils.decorator import require_access
from .main.utils.server_db import ServerDB

__all__ = ["Client", "Server", "ClUnit", "ServerDB"]
__version__ = "0.2.5"
__all__ = ["Client", "Server", "ClUnit", "require_access", "ServerDB"]
__version__ = "0.2.6"
70 changes: 39 additions & 31 deletions DMBotNetwork/main/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,42 +203,50 @@ async def _cl_handler(

try:
while cls._is_online:
receive_package = await cl_unit.receive_package()
if not isinstance(receive_package, dict):
await cl_unit.send_log_error("Receive data type expected dict.")
continue

code = receive_package.pop("code", None)
if not code:
await cl_unit.send_log_error("Receive data must has 'code' key.")
continue

if code == ResponseCode.NET_REQ:
func_name = receive_package.pop("net_func_name", None)
await cls._call_func(
func_name,
cl_unit,
**receive_package,
)
try:
receive_package = await cl_unit.receive_package()
if not isinstance(receive_package, dict):
await cl_unit.send_log_error("Receive data type expected dict.")
continue

elif code == ResponseCode.GET_REQ:
func_name = receive_package.pop("net_func_name", None)
get_key = receive_package.pop("net_get_key", None)
if get_key is None:
code = receive_package.pop("code", None)
if not code:
await cl_unit.send_log_error(
"Receive data must has 'code' key."
)
continue

data = await cls._call_func(
func_name,
cl_unit,
**receive_package,
)
await cl_unit.send_package(
ResponseCode.GET_REQ, get_key=get_key, data=data
if code == ResponseCode.NET_REQ:
func_name = receive_package.pop("net_func_name", None)
await cls._call_func(
func_name,
cl_unit,
**receive_package,
)

elif code == ResponseCode.GET_REQ:
func_name = receive_package.pop("net_func_name", None)
get_key = receive_package.pop("net_get_key", None)
if get_key is None:
continue

data = await cls._call_func(
func_name,
cl_unit,
**receive_package,
)
await cl_unit.send_package(
ResponseCode.GET_REQ, get_key=get_key, data=data
)

else:
await cl_unit.send_log_error("Unknown 'code' for net type.")

except PermissionError as err:
await cl_unit.send_log_error(
f"Access error. Insufficient permissions for the following: {err}"
)

else:
await cl_unit.send_log_error("Unknown 'code' for net type.")

except (
asyncio.CancelledError,
ConnectionAbortedError,
Expand Down
34 changes: 34 additions & 0 deletions DMBotNetwork/main/utils/decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from typing import List

from .cl_unit import ClUnit
from .server_db import ServerDB

def require_access(req_access: List[str] | str):
"""
A decorator that ensures the user has the required access level(s) before executing the function.
Args:
req_access (List[str] | str): The required access level(s). Can be a single string or a list of strings
representing the access levels needed to execute the function.
Returns:
function: The decorated function that checks user access before execution.
Raises:
PermissionError: If the user does not have the necessary access permissions,
this exception is raised with a message indicating the missing permissions.
"""

if isinstance(req_access, str):
req_access = [req_access]

def decorator(func):
async def wrapper(cl_unit: ClUnit, *args, **kwargs):
if await ServerDB.check_access_login(cl_unit.login, req_access):
return await func(cl_unit, *args, **kwargs)
else:
raise PermissionError(f"Access error. Insufficient permissions for the following: {'; '.join(req_access)}")

return wrapper

return decorator
6 changes: 3 additions & 3 deletions DMBotNetwork/main/utils/server_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,15 +295,15 @@ async def get_all_users(cls) -> List[str]:
except Exception as err:
logger.error(f"Error fetching all users: {err}")
return []

# Работа с доступами
@classmethod
async def check_access_login(cls, username: str, need_access: list[str]) -> bool:
async def check_access_login(cls, username: str, need_access: List[str]) -> bool:
access_dict = await cls.get_access(username)
return cls.check_access(access_dict, need_access) if access_dict else False

@staticmethod
def check_access(access_dict: Dict[str, bool], need_access: list[str]) -> bool:
def check_access(access_dict: Dict[str, bool], need_access: List[str]) -> bool:
if access_dict.get("full_access", False):
return True

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name="DMBotNetwork",
version="0.2.5",
version="0.2.6",
packages=find_packages(),
install_requires=["aiosqlite", "aiofiles", "bcrypt", "msgpack"],
author="Angels And Demons dev team",
Expand Down

0 comments on commit d1a7fa5

Please sign in to comment.