diff --git a/sora/config/path.py b/sora/config/path.py index 4f1dc97..1fdf277 100644 --- a/sora/config/path.py +++ b/sora/config/path.py @@ -21,6 +21,8 @@ # 数据库路径 DATABASE_PATH = Path() / "data" / "database" +# 黑名单数据路径 +USER_BAN_DB_PATH = DATABASE_PATH / "ban.db" # 用户数据路径 USER_INFO_DB_PATH = DATABASE_PATH / "user.db" # 用户绑定数据路径 diff --git a/sora/database/__init__.py b/sora/database/__init__.py index 2d71cd5..f71f5bb 100644 --- a/sora/database/__init__.py +++ b/sora/database/__init__.py @@ -5,19 +5,25 @@ from sora.utils import scheduler from sora.config.path import ( + USER_BAN_DB_PATH, USER_BIND_DB_PATH, USER_INFO_DB_PATH, USER_SIGN_DB_PATH, ) +from .models import BanUser as BanUser from .models import UserBind as UserBind from .models import UserInfo as UserInfo from .models import UserSign as UserSign -from .models import bind, sign, user +from .models import ban, bind, sign, user DATABASE = { "connections": { + "ban_info": { + "engine": "tortoise.backends.sqlite", + "credentials": {"file_path": USER_BAN_DB_PATH}, + }, "user_info": { "engine": "tortoise.backends.sqlite", "credentials": {"file_path": USER_INFO_DB_PATH}, @@ -33,6 +39,10 @@ # 'memory_db': 'sqlite://:memory:' }, "apps": { + "ban_info": { + "models": [ban.__name__], + "default_connection": "ban_info", + }, "user_info": { "models": [user.__name__], "default_connection": "user_info", diff --git a/sora/database/models/__init__.py b/sora/database/models/__init__.py index be6484e..92e82f2 100644 --- a/sora/database/models/__init__.py +++ b/sora/database/models/__init__.py @@ -1,7 +1,9 @@ +from . import ban as ban from . import bind as bind from . import sign as sign from . import user as user +from .ban import BanUser as BanUser from .bind import UserBind as UserBind from .sign import UserSign as UserSign from .user import UserInfo as UserInfo diff --git a/sora/database/models/ban.py b/sora/database/models/ban.py new file mode 100644 index 0000000..1581b79 --- /dev/null +++ b/sora/database/models/ban.py @@ -0,0 +1,84 @@ +import time +from tortoise import fields +from tortoise.models import Model + +from sora.log import logger + + +class BanUser(Model): + id = fields.IntField(pk=True, generated=True, auto_increment=True) + user_id = fields.CharField(max_length=10, unique=True) + """用户ID""" + ban_time = fields.BigIntField() + """ban开始的时间""" + duration = fields.BigIntField() + """ban时长""" + + class Meta: + table = "banUser" + table_description = "封禁人员数据表" + + @classmethod + async def check_ban_time(cls, user_id: int | str) -> int | str: + """ + 说明: + 检测用户被ban时长 + 参数: + * user_id: 用户id + """ + if user := await cls.filter(user_id=str(user_id)).first(): + if ( + time.time() - (user.ban_time + user.duration) > 0 + and user.duration != -1 + ): + return "" + if user.duration == -1: + return "∞" + return int(time.time() - user.ban_time - user.duration) + return "" + + @classmethod + async def is_ban(cls, user_id: int | str) -> bool: + """ + 说明: + 判断用户是否被ban + 参数: + * user_id: 用户id + """ + if await cls.check_ban_time(str(user_id)): + return True + else: + await cls.unban(user_id) + return False + + @classmethod + async def ban(cls, user_id: int | str, duration: int): + """ + 说明: + ban掉目标用户 + 参数: + * user_id: 目标用户id + * duration: ban时长(秒),-1 表示永久封禁 + """ + logger.debug("封禁", f"封禁用户,ID: {str(user_id)},时长: {duration}") + if await cls.filter(user_id=str(user_id)).first(): + await cls.unban(user_id) + await cls.create( + user_id=str(user_id), + ban_time=time.time(), + duration=duration, + ) + + @classmethod + async def unban(cls, user_id: int | str) -> bool: + """ + 说明: + unban用户 + 参数: + * user_id: 用户id + """ + if user := await cls.filter(user_id=user_id).first(): + logger.debug("封禁", f"解除封禁:{str(user_id)}") + await user.delete() + return True + return False diff --git a/sora/plugins/ban/__init__.py b/sora/plugins/ban/__init__.py new file mode 100644 index 0000000..a7a6120 --- /dev/null +++ b/sora/plugins/ban/__init__.py @@ -0,0 +1,139 @@ +from nonebot import require +from nonebot.rule import to_me +from nonebot.internal.adapter import Bot +from nonebot.internal.adapter import Event + +require("nonebot_plugin_saa") +require("nonebot_plugin_alconna") +from arclet.alconna.args import Args +from arclet.alconna.base import Option +from arclet.alconna.core import Alconna +from arclet.alconna.typing import CommandMeta +from nonebot_plugin_alconna import Match, on_alconna, AlconnaMatch +from nonebot_plugin_alconna.adapters import At +from nonebot_plugin_saa import MessageFactory + +from sora.log import logger +from sora.database import BanUser +from sora.utils.user import get_bind_info +from sora.permission import BOT_HELPER, get_helper_list + + +ban = on_alconna( + Alconna( + "ban", + Option( + "add", + Args["target", At | int]["hours?", int]["minutes?", int], + help_text="封禁用户", + ), + Option("remove", Args["target", At | int], help_text="解封用户"), + Option("-l|--list", help_text="查询所有被封禁用户"), + meta=CommandMeta( + description="你被逮捕了!丢进小黑屋!", + usage="@bot /ban [小时] [分钟]", + example=""" + @bot /ban add @user + @bot /ban add 2023081136 2 + @bot /ban add @user 2 30 + @bot /ban remove @user + @bot /ban -l + """, + compact=True, + ), + ), + priority=5, + block=True, + rule=to_me(), + permission=BOT_HELPER, +) + + +@ban.assign("add") +async def add( + bot: Bot, + event: Event, + banUser: Match[At | int] = AlconnaMatch("target"), + hours: Match[int] = AlconnaMatch("hours"), + minutes: Match[int] = AlconnaMatch("minutes"), +): + if isinstance(banUser.result, At): + target = banUser.result.target + if target == bot.self_id: + await MessageFactory("你的权限不够喔").send(at_sender=True) + await ban.finish() + target_id = (await get_bind_info(event, target)).user_id + else: + target_id = banUser.result + + if target_id in await get_helper_list(): + await MessageFactory("你的权限不够喔").send(at_sender=True) + await ban.finish() + + if hours.available: + if minutes.available: + minutes_result = minutes.result + else: + minutes_result = 0 + hours_result = hours.result + logger.info("封禁", f"封禁目标:{target_id},时长:{hours_result}小时{minutes_result}分钟") + await BanUser.ban( + target_id, duration=convert_to_seconds(hours_result, minutes_result) + ) + await MessageFactory( + f"已成功封禁用户:{target_id},时长:{hours_result}小时{minutes_result}分钟" + ).send(at_sender=True) + else: + logger.info("封禁", f"永久封禁目标:{target_id}") + await BanUser.ban(target_id, duration=-1) + await MessageFactory(f"已永久封禁用户:{target_id}").send(at_sender=True) + + await ban.finish() + + +@ban.assign("remove") +async def remove( + bot: Bot, event: Event, banUser: Match[At | int] = AlconnaMatch("target") +): + if isinstance(banUser.result, At): + target = banUser.result.target + if target == bot.self_id: + await MessageFactory("你的权限不够喔").send(at_sender=True) + await ban.finish() + target_id = (await get_bind_info(event, target)).user_id + else: + target_id = banUser.result + + if target_id in await get_helper_list(): + await MessageFactory("你的权限不够喔").send(at_sender=True) + await ban.finish() + + await BanUser.unban(target_id) + logger.info("封禁", f"解除封禁:{target_id}") + await MessageFactory(f"已解除封禁:{target_id}").send(at_sender=True) + + await ban.finish() + + +@ban.assign("list") +async def list(): + ban_users = await BanUser.all() + if ban_users: + message = "\n".join( + [ + f"ID: {ban_user.id}\n" + f"用户ID: {ban_user.user_id}\n" + f"封禁开始时间: {ban_user.ban_time}\n" + f"封禁时长: {ban_user.duration}\n" + for ban_user in ban_users + ] + ) + else: + message = "当前还没有被封禁用户呢" + await MessageFactory(message).send(at_sender=True) + + +def convert_to_seconds(hours: int, minutes: int): + total_minutes = hours * 60 + minutes + total_seconds = total_minutes * 60 + return total_seconds diff --git a/sora/plugins/hooks/__init__.py b/sora/plugins/hooks/__init__.py index 929d6e1..3920332 100644 --- a/sora/plugins/hooks/__init__.py +++ b/sora/plugins/hooks/__init__.py @@ -1,5 +1,4 @@ +from . import check_ban as check_ban from . import user_exist as user_exist from . import eventexpiry as eventexpiry from . import level_upgrade as level_upgrade - -__all__ = ["user_exist", "eventexpiry", "level_upgrade"] diff --git a/sora/plugins/hooks/check_ban.py b/sora/plugins/hooks/check_ban.py new file mode 100644 index 0000000..c7517fd --- /dev/null +++ b/sora/plugins/hooks/check_ban.py @@ -0,0 +1,17 @@ +from typing import Annotated + +from nonebot.message import run_preprocessor +from nonebot.exception import IgnoredException + +from sora.log import logger +from sora.utils.user import getUserInfo +from sora.database import BanUser, UserInfo + + +@run_preprocessor +async def _(userInfo: Annotated[UserInfo, getUserInfo()]): + user_id = userInfo.user_id + is_ban = await BanUser.is_ban(user_id) + if is_ban: + logger.debug("封禁", f"用户 {user_id} 处于黑名单中") + raise IgnoredException("用户处于黑名单中")