From ea29245c2a14002c07bc48464dfdf0975be98bbd Mon Sep 17 00:00:00 2001 From: Mikey Date: Fri, 13 Oct 2023 20:36:13 +0100 Subject: [PATCH] Adding support for syncing to an external platform :eyes: --- utils/api.py | 838 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 838 insertions(+) diff --git a/utils/api.py b/utils/api.py index cabec081..dde20d1e 100644 --- a/utils/api.py +++ b/utils/api.py @@ -1,7 +1,10 @@ import asyncio +import copy import datetime import typing +import aiohttp +import pytz import uvicorn from fastapi import FastAPI, APIRouter, Header, HTTPException, Request from discord.ext import commands @@ -12,6 +15,7 @@ from pydantic import BaseModel +from utils.timestamp import td_format # from helpers import MockContext from utils.utils import tokenGenerator @@ -481,6 +485,840 @@ async def POST_get_fivem( else {"status": "failed"} ) + async def POST_duty_on_actions( + self, authorization: Annotated[str | None, Header()], + request: Request + ): + if not authorization: + return HTTPException(status_code=401, detail="Invalid authorization") + + base_auth = validate_authorization(self.bot, authorization, disable_static_tokens=False) + if not base_auth: + return HTTPException(status_code=401, detail="Invalid authorization") + data = request.query_params.get("ObjectId") + if not data: + return HTTPException(status_code=400, detail="Didn't provide 'ObjectId' parameter.") + + + dataobject = await self.bot.shift_management.shifts.find_by_id(data) + guild = await self.bot.fetch_guild(dataobject["Guild"]) + staff_member = await guild.fetch_member(dataobject['UserID']) + guild_settings = await self.bot.settings.find_by_id(guild.id) + configItem = guild_settings + shift_types = (guild_settings.get('shift_types') or {}).get('types') + mapped = {} + + + + if not shift_types: + shift_type = None + else: + for i in shift_types: + mapped[i['name']] = i + + if len(mapped) != 0: + if dataobject['Type'] in mapped.keys(): + shift_type = mapped[dataobject['Type']] + + + embed = discord.Embed( + title=f"<:ERMAdd:1113207792854106173> Shift Started", color=0xED4348 + ) + try: + embed.set_thumbnail(url=staff_member.display_avatar.url) + embed.set_footer(text="Staff Logging Module") + embed.set_author( + name=staff_member.name, + icon_url=staff_member.display_avatar.url, + ) + except: + pass + + if shift_type: + embed.add_field( + name="<:ERMList:1111099396990435428> Type", + value=f"<:Space:1100877460289101954><:ERMArrow:1111091707841359912>Clocking in. **({shift_type['name']})**", + inline=False, + ) + else: + embed.add_field( + name="<:ERMList:1111099396990435428> Type", + value="<:Space:1100877460289101954><:ERMArrow:1111091707841359912>Clocking in.", + inline=False, + ) + embed.add_field( + name="<:ERMList:1111099396990435428> Current Time", + value=f"<:Space:1100877460289101954><:ERMArrow:1111091707841359912>", + inline=False, + ) + + try: + shift_channel = discord.utils.get( + guild.channels, id=configItem["shift_management"]["channel"] + ) + except: + return 500 + + if shift_channel is None: + return + + await shift_channel.send(embed=embed) + + nickname_prefix = None + changed_nick = False + if shift_type: + if shift_type.get("nickname"): + nickname_prefix = shift_type.get("nickname") + else: + if configItem["shift_management"].get("nickname_prefix"): + nickname_prefix = configItem["shift_management"].get( + "nickname_prefix" + ) + + if nickname_prefix: + current_name = ( + staff_member.nick if staff_member.nick else staff_member.name + ) + new_name = "{}{}".format(nickname_prefix, current_name) + + try: + await staff_member.edit(nick=new_name) + changed_nick = True + except Exception as e: + pass + role = None + + if shift_type: + if shift_type.get("role"): + if isinstance(shift_type.get("role"), list): + role = [ + discord.utils.get(guild.roles, id=rl) + for rl in shift_type.get("role") + ] + else: + role = [ + discord.utils.get( + guild.roles, id=shift_type.get("role") + ) + ] + else: + if configItem["shift_management"]["role"]: + if not isinstance(configItem["shift_management"]["role"], list): + role = [ + discord.utils.get( + guild.roles, + id=configItem["shift_management"]["role"], + ) + ] + else: + role = [ + discord.utils.get(guild.roles, id=role) + for role in configItem["shift_management"]["role"] + ] + if role: + for rl in role: + if rl not in staff_member.roles and rl is not None: + try: + await staff_member.add_roles(rl) + except: + pass + + return 200 + + + async def POST_duty_off_actions(self, + authorization: Annotated[str | None, Header()], + request: Request + ): + if not authorization: + return HTTPException(status_code=401, detail="Invalid authorization") + + base_auth = validate_authorization(self.bot, authorization, disable_static_tokens=False) + if not base_auth: + return HTTPException(status_code=401, detail="Invalid authorization") + data = request.query_params.get("ObjectId") + if not data: + return HTTPException(status_code=400, detail="Didn't provide 'ObjectId' parameter.") + + + dataobject = await self.bot.shift_management.shifts.find_by_id(data) + guild = await self.bot.fetch_guild(dataobject["Guild"]) + staff_member = await guild.fetch_member(dataobject['UserID']) + guild_settings = await self.bot.settings.find_by_id(guild.id) + configItem = guild_settings + shift_types = (guild_settings.get('shift_types') or {}).get('types') + mapped = {} + + + if not shift_types: + shift_type = None + else: + for i in shift_types: + mapped[i['name']] = i + + if len(mapped) != 0: + if dataobject['Type'] in mapped.keys(): + shift_type = mapped[dataobject['Type']] + bot = self.bot + shift = dataobject + member = staff_member + + break_seconds = 0 + if shift: + if shift["Guild"] != guild.id: + shift = None + + if shift: + for index, item in enumerate(shift["Breaks"].copy()): + if item["EndEpoch"] == 0: + item["EndEpoch"] = datetime.datetime.now(tz=pytz.UTC) + shift["Breaks"][index] = item + + startTimestamp = item["StartEpoch"] + endTimestamp = item["EndEpoch"] + break_seconds += int(endTimestamp - startTimestamp) + + if shift.get("Nickname"): + if shift.get("Nickname") == member.nick: + nickname = None + if shift.get("Type") is not None: + settings = await bot.settings.get_settings(guild.id) + shift_types = None + if settings.get("shift_types"): + shift_types = settings["shift_types"].get("types", []) + else: + shift_types = [] + for s in shift_types: + if s["name"] == shift.get("Type"): + shift_type = s + nickname = s["nickname"] if s.get("nickname") else None + if nickname is None: + nickname = settings["shift_management"].get( + "nickname_prefix", "" + ) + try: + await member.edit(nick=member.nick.replace(nickname, "")) + except Exception as e: + pass + + embed = discord.Embed( + title=f"<:ERMRemove:1113207777662345387> Shift Ended", color=0xED4348 + ) + + embed.set_thumbnail(url=member.display_avatar.url) + embed.set_footer(text="Staff Logging Module") + embed.set_author( + name=member.name, + icon_url=member.display_avatar.url, + ) + + if shift.get("Type") != "Default": + embed.add_field( + name="<:ERMList:1111099396990435428> Type", + value=f"<:Space:1100877460289101954><:ERMArrow:1111091707841359912>Clocking out. **({shift_type.get('name')})**", + inline=False, + ) + else: + embed.add_field( + name="<:ERMList:1111099396990435428> Type", + value="<:Space:1100877460289101954><:ERMArrow:1111091707841359912>Clocking out.", + inline=False, + ) + + time_delta = datetime.datetime.now(tz=pytz.UTC) - datetime.datetime.fromtimestamp( + shift["StartEpoch"], tz=pytz.UTC + ) + + added_seconds = 0 + removed_seconds = 0 + if "AddedTime" in shift.keys(): + added_seconds = shift["AddedTime"] + if "RemovedTime" in shift.keys(): + removed_seconds = shift["RemovedTime"] + + if break_seconds > 0: + embed.add_field( + name="<:ERMList:1111099396990435428> Type", + value=f"<:Space:1100877460289101954><:ERMArrow:1111091707841359912>{td_format(time_delta)} **({td_format(datetime.timedelta(seconds=break_seconds))})** on break", + inline=False, + ) + else: + embed.add_field( + name="<:ERMList:1111099396990435428> Elapsed Time", + value=f"<:Space:1100877460289101954><:ERMArrow:1111091707841359912>{td_format(time_delta)}", + inline=False, + ) + + try: + shift_channel = discord.utils.get( + guild.channels, id=configItem["shift_management"]["channel"] + ) + except: + return 500 + + if shift_channel is None: + return + + await shift_channel.send(embed=embed) + + if shift.get("Nickname"): + if shift.get("Nickname") == member.nick: + nickname = None + if shift.get("Type") is not None: + settings = await bot.settings.get_settings(guild.id) + if settings.get("shift_types"): + shift_types = settings["shift_types"].get("types", []) + else: + shift_types = [] + for s in shift_types: + if s["name"] == shift.get("Type"): + shift_type = s + nickname = s["nickname"] if s.get("nickname") else None + if nickname is None: + nickname = settings["shift_management"].get( + "nickname_prefix", "" + ) + try: + await member.edit(nick=member.nick.replace(nickname, "")) + except Exception as e: + pass + role = None + if shift_type: + if shift_type.get("role"): + role = [ + discord.utils.get(guild.roles, id=role) + for role in shift_type.get("role") + ] + else: + if configItem["shift_management"]["role"]: + if not isinstance(configItem["shift_management"]["role"], list): + role = [ + discord.utils.get( + guild.roles, + id=configItem["shift_management"]["role"], + ) + ] + else: + role = [ + discord.utils.get(guild.roles, id=role) + for role in configItem["shift_management"]["role"] + ] + + if role: + for rl in role: + if rl in member.roles and rl is not None: + try: + await member.remove_roles(rl) + except: + pass + return 200 + + + async def POST_duty_break_actions(self, authorization: Annotated[str | None, Header()], request: Request): + if not authorization: + return HTTPException(status_code=401, detail="Invalid authorization") + + base_auth = validate_authorization(self.bot, authorization, disable_static_tokens=False) + if not base_auth: + return HTTPException(status_code=401, detail="Invalid authorization") + data = request.query_params.get("ObjectId") + if not data: + return HTTPException(status_code=400, detail="Didn't provide 'ObjectId' parameter.") + + + dataobject = await self.bot.shift_management.shifts.find_by_id(data) + guild = await self.bot.fetch_guild(dataobject["Guild"]) + staff_member = await guild.fetch_member(dataobject['UserID']) + guild_settings = await self.bot.settings.find_by_id(guild.id) + configItem = guild_settings + shift_types = (guild_settings.get('shift_types') or {}).get('types') + mapped = {} + + + if not shift_types: + shift_type = None + else: + for i in shift_types: + mapped[i['name']] = i + + if len(mapped) != 0: + if dataobject['Type'] in mapped.keys(): + shift_type = mapped[dataobject['Type']] + bot = self.bot + shift = dataobject + member = staff_member + + if shift.get("Nickname"): + if shift.get("Nickname") == member.nick: + nickname = None + if shift.get("Type") is not None: + settings = await bot.settings.get_settings(guild.id) + shift_types = None + if settings.get("shift_types"): + shift_types = settings["shift_types"].get("types", []) + else: + shift_types = [] + for s in shift_types: + if s["name"] == shift.get("Type"): + shift_type = s + nickname = ( + s["nickname"] if s.get("nickname") else None + ) + if nickname is None: + nickname = settings["shift_management"].get( + "nickname_prefix", "" + ) + try: + await member.edit( + nick=member.nick.replace(nickname, "") + ) + except Exception as e: + pass + role = [] + if shift_type: + if shift_type.get("role"): + role = [ + discord.utils.get(guild.roles, id=role) + for role in shift_type.get("role") + ] + else: + if configItem["shift_management"]["role"]: + if not isinstance(configItem["shift_management"]["role"], list): + role = [ + discord.utils.get( + guild.roles, + id=configItem["shift_management"]["role"], + ) + ] + else: + role = [ + discord.utils.get(guild.roles, id=role) + for role in configItem["shift_management"]["role"] + ] + + if role is not None: + for rl in role: + if rl in member.roles and rl is not None: + try: + await member.remove_roles(rl) + except: + pass + + return 200 + + async def POST_duty_end_break_actions(self, authorization: Annotated[str | None, Header()], request: Request): + if not authorization: + return HTTPException(status_code=401, detail="Invalid authorization") + + base_auth = validate_authorization(self.bot, authorization, disable_static_tokens=False) + if not base_auth: + return HTTPException(status_code=401, detail="Invalid authorization") + data = request.query_params.get("ObjectId") + if not data: + return HTTPException(status_code=400, detail="Didn't provide 'ObjectId' parameter.") + + + dataobject = await self.bot.shift_management.shifts.find_by_id(data) + guild = await self.bot.fetch_guild(dataobject["Guild"]) + staff_member = await guild.fetch_member(dataobject['UserID']) + guild_settings = await self.bot.settings.find_by_id(guild.id) + configItem = guild_settings + shift_types = (guild_settings.get('shift_types') or {}).get('types') + mapped = {} + shift_type = None + + + if not shift_types: + shift_type = None + else: + for i in shift_types: + mapped[i['name']] = i + + if len(mapped) != 0: + if dataobject['Type'] in mapped.keys(): + shift_type = mapped[dataobject['Type']] + bot = self.bot + shift = dataobject + member = staff_member + + nickname_prefix = None + changed_nick = False + role = None + + if shift_type: + if shift_type.get("nickname"): + nickname_prefix = shift_type.get("nickname") + else: + if configItem["shift_management"].get("nickname_prefix"): + nickname_prefix = configItem["shift_management"].get("nickname_prefix") + + if nickname_prefix: + current_name = member.nick if member.nick else member.name + new_name = "{}{}".format(nickname_prefix, current_name) + + try: + await member.edit(nick=new_name) + changed_nick = True + except Exception as e: + # # print(e) + pass + + if shift_type: + if shift_type.get("role"): + role = [ + discord.utils.get(guild.roles, id=role) + for role in shift_type.get("role") + ] + else: + if configItem["shift_management"]["role"]: + if not isinstance(configItem["shift_management"]["role"], list): + role = [ + discord.utils.get( + guild.roles, + id=configItem["shift_management"]["role"], + ) + ] + else: + role = [ + discord.utils.get(guild.roles, id=role) + for role in configItem["shift_management"]["role"] + ] + + if role: + for rl in role: + if not rl in member.roles and rl is not None: + try: + await member.add_roles(rl) + except: + pass + return 200 + + async def POST_duty_voided_actions(self, authorization: Annotated[str | None, Header()], request: Request): + if not authorization: + return HTTPException(status_code=401, detail="Invalid authorization") + + base_auth = validate_authorization(self.bot, authorization, disable_static_tokens=False) + if not base_auth: + return HTTPException(status_code=401, detail="Invalid authorization") + data = request.query_params.get("ObjectId") + if not data: + return HTTPException(status_code=400, detail="Didn't provide 'ObjectId' parameter.") + + + dataobject = await self.bot.shift_management.shifts.find_by_id(data) + guild = await self.bot.fetch_guild(dataobject["Guild"]) + staff_member = await guild.fetch_member(dataobject['UserID']) + guild_settings = await self.bot.settings.find_by_id(guild.id) + configItem = guild_settings + shift_types = (guild_settings.get('shift_types') or {}).get('types') + mapped = {} + shift_type = None + + + if not shift_types: + shift_type = None + else: + for i in shift_types: + mapped[i['name']] = i + + if len(mapped) != 0: + if dataobject['Type'] in mapped.keys(): + shift_type = mapped[dataobject['Type']] + bot = self.bot + shift = dataobject + member = staff_member + + nickname_prefix = None + changed_nick = False + role = None + + embed = discord.Embed( + title=f"<:ERMTrash:1111100349244264508> Voided Time", color=0xED4348 + ) + + if shift.get("Nickname"): + if shift.get("Nickname") == member.nick: + nickname = None + if shift.get("Type") is not None: + settings = await bot.settings.get_settings(guild.id) + shift_types = None + if settings.get("shift_types"): + shift_types = settings["shift_types"].get("types", []) + else: + shift_types = [] + for s in shift_types: + if s["name"] == shift.get("Type"): + shift_type = s + nickname = s["nickname"] if s.get("nickname") else None + if nickname is None: + nickname = settings["shift_management"].get( + "nickname_prefix", "" + ) + try: + await member.edit( + nick=member.nick.replace(nickname, "") + ) + except Exception as e: + pass + try: + embed.set_thumbnail(url=member.display_avatar.url) + embed.set_author( + name=member.name, + icon_url=member.display_avatar.url, + ) + except: + pass + embed.add_field( + name="<:ERMList:1111099396990435428> Type", + value=f"<:Space:1100877460289101954><:ERMArrow:1111091707841359912>Voided time.", + inline=False, + ) + + embed.add_field( + name="<:ERMList:1111099396990435428> Elapsed Time", + value=f"<:Space:1100877460289101954><:ERMArrow:1111091707841359912>{td_format(datetime.datetime.now(tz=pytz.UTC) - datetime.datetime.fromtimestamp(shift['StartEpoch'], tz=pytz.UTC))}", + inline=False, + ) + + embed.set_footer(text="Staff Logging Module") + + sh = await bot.shift_management.get_current_shift(member, guild.id) + await bot.shift_management.shifts.delete_by_id(sh["_id"]) + + + try: + shift_channel = discord.utils.get( + guild.channels, id=configItem["shift_management"]["channel"] + ) + except: + return 500 + + if shift_channel is None: + return + + await shift_channel.send(embed=embed) + + role = None + if shift_type: + if shift_type.get("role"): + role = [ + discord.utils.get(guild.roles, id=role) + for role in shift_type.get("role") + ] + else: + if configItem["shift_management"]["role"]: + if not isinstance(configItem["shift_management"]["role"], list): + role = [ + discord.utils.get( + guild.roles, + id=configItem["shift_management"]["role"], + ) + ] + else: + role = [ + discord.utils.get(guild.roles, id=role) + for role in configItem["shift_management"]["role"] + ] + + if role: + for rl in role: + if rl in member.roles and rl is not None: + try: + await member.remove_roles(rl) + except: + pass + return 200 + + async def POST_punishment_logged(self, authorization: Annotated[str | None, Header()], request: Request): + if not authorization: + return HTTPException(status_code=401, detail="Invalid authorization") + + base_auth = validate_authorization(self.bot, authorization, disable_static_tokens=False) + if not base_auth: + return HTTPException(status_code=401, detail="Invalid authorization") + data = request.query_params.get("ObjectId") + if not data: + return HTTPException(status_code=400, detail="Didn't provide 'ObjectId' parameter.") + + dataobject = await self.bot.punishments.find_by_id(data) + guild = await self.bot.fetch_guild(dataobject["Guild"]) + staff_member = await guild.fetch_member(dataobject['ModeratorID']) + guild_settings = await self.bot.settings.find_by_id(guild.id) + configItem = guild_settings + settings = configItem + warning_types = (await self.bot.punishment_types.get_punishment_types(guild.id)) + + type = dataobject['Type'] + bot = self.bot + designated_channel = None + + if settings: + warning_type = None + designated_channel = None + for warning in warning_types: + if isinstance(warning, str): + if warning.lower() == type.lower(): + warning_type = warning + elif isinstance(warning, dict): + if warning["name"].lower() == type.lower(): + warning_type = warning + + if isinstance(warning_type, str): + if settings["customisation"].get("kick_channel"): + if settings["customisation"]["kick_channel"] != "None": + if type.lower() == "kick": + designated_channel = bot.get_channel( + settings["customisation"]["kick_channel"] + ) + if settings["customisation"].get("ban_channel"): + if settings["customisation"]["ban_channel"] != "None": + if type.lower() == "ban": + designated_channel = bot.get_channel( + settings["customisation"]["ban_channel"] + ) + if settings["customisation"].get("bolo_channel"): + if settings["customisation"]["bolo_channel"] != "None": + if type.lower() == "bolo": + designated_channel = bot.get_channel( + settings["customisation"]["bolo_channel"] + ) + else: + if isinstance(warning_type, dict): + if "channel" in warning_type.keys(): + if warning_type["channel"] != "None": + designated_channel = bot.get_channel( + warning_type["channel"] + ) + + + shift = dataobject + member = staff_member + avatar = None + + async with aiohttp.ClientSession() as session: + async with session.get( + f"https://thumbnails.roblox.com/v1/users/avatar?userIds={dataobject['UserID']}&size=420x420&format=Png" + ) as f: + if f.status == 200: + avatar = await f.json() + avatar = avatar["data"][0]["imageUrl"] + else: + avatar = "" + + embed = discord.Embed( + title="<:ERMAdd:1113207792854106173> Punishment Logged", color=0xED4348 + ) + embed.set_thumbnail(url=avatar) + embed.set_author( + name=member.name, + icon_url=member.display_avatar.url, + ) + try: + embed.set_footer(text="Staff Logging Module") + except: + pass + embed.add_field( + name="<:ERMList:1111099396990435428> Staff Member", + value=f"<:Space:1100877460289101954><:ERMArrow:1111091707841359912> {member.mention}", + inline=False, + ) + embed.add_field( + name="<:ERMList:1111099396990435428> Violator", + value=f"<:Space:1100877460289101954><:ERMArrow:1111091707841359912> {dataobject['Username']}", + inline=False, + ) + embed.add_field( + name="<:ERMList:1111099396990435428> Type", + value=f"<:Space:1100877460289101954><:ERMArrow:1111091707841359912> {type.lower().title()}", + inline=False, + ) + embed.add_field( + name="<:ERMList:1111099396990435428> Reason", + value=f"<:Space:1100877460289101954><:ERMArrow:1111091707841359912> {dataobject['Reason']}", + inline=False, + ) + + if designated_channel is None: + designated_channel = discord.utils.get( + guild.channels, id=configItem["punishments"]["channel"] + ) + + + shift = await bot.shift_management.get_current_shift( + member, guild.id + ) + if shift: + shift["Moderations"].append(data) + await bot.shift_management.shifts.update_by_id(shift) + + success = ( + discord.Embed( + title=f"{dataobject['Username']}", + description=f"<:Space:1100877460289101954><:ERMArrow:1111091707841359912>**Reason:** {dataobject['Reason']}\n<:Space:1100877460289101954><:ERMArrow:1111091707841359912>**Type:** {type.lower().title()}", + color=0xED4348, + ) + .set_author( + name=member.name, icon_url=member.display_avatar.url + ) + .set_thumbnail(url=avatar) + ) + + roblox_id = dataobject['UserID'] + + discord_user = None + async for document in bot.synced_users.db.find({"roblox": roblox_id}): + discord_user = document["_id"] + + if discord_user: + try: + member = await guild.fetch_member(discord_user) + except discord.NotFound: + member = None + + if member: + should_dm = True + + async for doc in bot.consent.db.find({"_id": member.id}): + if doc.get("punishments"): + if document.get("punishments") is False: + should_dm = False + + if should_dm: + try: + personal_embed = discord.Embed( + title="<:ERMPunish:1111095942075138158> You have been moderated!", + description=f"<:Space:1100877460289101954><:ERMArrow:1111091707841359912>***{guild.name}** has moderated you in-game*", + color=0xED4348, + ) + personal_embed.add_field( + name="<:ERMList:1111099396990435428> Moderation Details", + value=f"<:Space:1100877460289101954><:ERMArrow:1111091707841359912> **Username:** {dataobject['Username']}\n<:Space:1100877460289101954><:ERMArrow:1111091707841359912> **Reason:** {dataobject['Reason']}\n<:Space:1100877460289101954><:ERMArrow:1111091707841359912> **Type:** {type.lower().title()}", + inline=False, + ) + + try: + personal_embed.set_author( + name=guild.name, icon_url=guild.icon.url + ) + except: + personal_embed.set_author(name=guild.name) + + await member.send( + embed=personal_embed, + content=f"<:ERMAlert:1113237478892130324> **{member.name}**, you have been moderated inside of **{guild.name}**.", + ) + + except: + pass + + try: + await designated_channel.send(embed=embed) + except: + pass + + return 200 + + async def POST_duty_on( self, authorization: Annotated[str | None, Header()],