diff --git a/bot/cogs/clashofcode.py b/bot/cogs/clashofcode.py deleted file mode 100644 index e7516c46..00000000 --- a/bot/cogs/clashofcode.py +++ /dev/null @@ -1,311 +0,0 @@ -import asyncio -import re -import time - -import aiohttp -import discord -from discord.ext import commands - -from bot.config import settings - -REGEX = re.compile(r"https://www.codingame.com/clashofcode/clash/([0-9a-f]{39})") -API_URL = "https://www.codingame.com/services/ClashOfCode/findClashByHandle" - - -class ClashOfCode(commands.Cog): - def __init__(self, bot): - self.bot = bot - self.session = False - self.session_message: int = 0 - self.session_users = [] - self.last_clash: int = 0 - - @property - def role(self): - return self.bot.guild.get_role(settings.coc.role_id) - - def em(self, mode, players): - embed = discord.Embed(title="**Clash started**") - embed.add_field(name="Mode", value=mode, inline=False) - embed.add_field(name="Players", value=players) - return embed - - @commands.Cog.listener() - async def on_raw_reaction_add(self, payload: discord.RawReactionActionEvent): - if payload.user_id == self.bot.user.id: - return - - if self.session_message != 0: - if payload.message_id == self.session_message: - if str(payload.emoji) == "🖐️": - if payload.user_id not in self.session_users: - self.session_users.append(payload.user_id) - - if payload.message_id != settings.coc.message_id: - return - - if self.role in payload.member.roles: - return - - await payload.member.add_roles(self.role) - try: - await payload.member.send(f"Gave you the **{self.role.name}** role!") - except discord.HTTPException: - pass - - @commands.Cog.listener() - async def on_raw_reaction_remove(self, payload: discord.RawReactionActionEvent): - if payload.user_id == self.bot.user.id: - return - - if self.session_message != 0: - if payload.message_id == self.session_message: - if str(payload.emoji) == "🖐️": - if payload.user_id in self.session_users: - self.session_users.remove(payload.user_id) - - if payload.message_id != settings.coc.message_id: - return - - member = self.bot.guild.get_member(payload.user_id) - if self.role not in member.roles: - return - - await member.remove_roles(self.role) - try: - await member.send(f"Removed your **{self.role.name}** role!") - except discord.HTTPException: - pass - - @commands.group(aliases=["coc"]) - @commands.check(lambda ctx: ctx.channel.id == settings.coc.channel_id) - async def clash_of_code(self, ctx: commands.Context): - """Clash of Code""" - if ctx.invoked_subcommand is None: - if self.session_message == 0: - return await ctx.send_help(self.bot.get_command("coc session")) - return await ctx.send_help(self.bot.get_command("coc invite")) - - @clash_of_code.group(aliases=["s"]) - @commands.check(lambda ctx: ctx.channel.id == settings.coc.channel_id) - async def session(self, ctx: commands.Context): - """Start or End a clash of code session""" - if ctx.invoked_subcommand is None: - if self.session_message == 0: - return await ctx.send_help(self.bot.get_command("coc session start")) - return await ctx.send_help(self.bot.get_command("coc session end")) - - @session.command(name="start", aliases=["s"]) - @commands.check(lambda ctx: ctx.channel.id == settings.coc.channel_id) - async def session_start(self, ctx: commands.context): - """Start a new coc session""" - if self.session_message != 0: - return await ctx.send( - "There is an active session right now.\n" - "Join by reacting to the pinned message or using `t.coc session join`. Have fun!" - ) - - pager = commands.Paginator( - prefix=f"**Hey, {ctx.author.mention} is starting a coc session.\n" - f"Use `t.coc session join` or react to this message to join**", - suffix="", - ) - - for member in self.role.members: - if member != ctx.author: - if member.status != discord.Status.offline: - pager.add_line(member.mention + ", ") - - if not len(pager.pages): - return await ctx.send(f"{ctx.author.mention}, Nobody is online to play with <:pepesad:733816214010331197>") - - self.session = True - self.last_clash = int(time.time()) - self.session_users.append(ctx.author.id) - - msg = await ctx.send(pager.pages[0]) - self.session_message = msg.id - await msg.add_reaction("🖐️") - - try: - await msg.pin() - except Exception: - await ctx.send("Failed to pin message") - - while self.session_message != 0: - await asyncio.sleep(10) - - if self.last_clash + 1800 < int(time.time()) and self.session_message != 0: - await ctx.send("Clash session has been closed due to inactivity") - try: - await msg.unpin() - except Exception: - await ctx.send("Failed to unpin message") - - self.last_clash = 0 - self.session_users = [] - self.session_message = 0 - self.session = False - break - - @session.command(name="join", aliases=["j"]) - @commands.check(lambda ctx: ctx.channel.id == settings.coc.channel_id) - async def session_join(self, ctx: commands.Context): - """Join the current active coc session""" - if self.session_message == 0: - return await ctx.send( - "There is no active coc session right now" "use `t.coc session start` to start a coc session" - ) - if ctx.author.id in self.session_users: - return await ctx.send( - "You are already in the session. Have fun playing.\n" - "If you want to leave remove your reaction or use `t.coc session leave`" - ) - self.session_users.append(ctx.author.id) - return await ctx.send("You have joined the session. Have fun playing") - - @session.command(name="leave", aliases=["l"]) - @commands.check(lambda ctx: ctx.channel.id == settings.coc.channel_id) - async def session_leave(self, ctx: commands.Context): - """Leave the current active coc session""" - if self.session_message == 0: - return await ctx.send( - "There is no active coc session right now" "use `t.coc session start` to start a coc session" - ) - if ctx.author.id not in self.session_users: - return await ctx.send( - "You aren't in a clash of code session right now.\n" - "If you want to join react to session message or use `t.coc session join`" - ) - self.session_users.remove(ctx.author.id) - return await ctx.send("You have left the session. No more pings for now") - - @session.command(name="end", aliases=["e"]) - @commands.check(lambda ctx: ctx.channel.id == settings.coc.channel_id) - async def session_end(self, ctx: commands.context): - """Ends the current coc session""" - if self.session_message == 0: - return await ctx.send("There is no active clash of code session") - - try: - msg = await ctx.channel.fetch_message(self.session_message) - try: - await msg.unpin() - except Exception: - await ctx.send("Failed to unpin message") - except Exception: - await ctx.send("Error while fetching message to unpin") - - self.last_clash = 0 - self.session_users = [] - self.session_message = 0 - self.session = False - - return await ctx.send(f"Clash session has been closed by {ctx.author.mention}. See you later") - - @clash_of_code.command(name="invite", aliases=["i"]) - @commands.has_any_role( - settings.moderation.staff_role_id, - settings.coc.role_id, - ) - @commands.check(lambda ctx: ctx.channel.id == settings.coc.channel_id) - @commands.cooldown(1, 60, commands.BucketType.channel) - async def coc_invite(self, ctx: commands.Context, *, url: str = None): - """Mentions all the users with the `Clash Of Code` role that are in the current session.""" - await ctx.message.delete() - if self.session_message == 0: - ctx.command.reset_cooldown(ctx) - return await ctx.send( - "No active Clash of Code session please create one to start playing\n" - "Use `t.coc session start` to start a coc session <:smilecat:727592135171244103>" - ) - - if ctx.author.id not in self.session_users: - ctx.command.reset_cooldown(ctx) - return await ctx.send( - "You can't create a clash unless you participate in the session\n" - "Use `t.coc session join` or react to the pinned message to join the coc session " - "<:smilecat:727592135171244103>" - ) - - if url is None: - ctx.command.reset_cooldown(ctx) - return await ctx.send("You should provide a valid clash of code url") - - link = REGEX.fullmatch(url) - if not link: - ctx.command.reset_cooldown(ctx) - return await ctx.send('Could not find any valid "clashofcode" url') - - self.last_clash = time.time() - - id = link[1] - - async with aiohttp.ClientSession() as session: - async with session.post(API_URL, json=[id]) as resp: - json = await resp.json() - plang = json["programmingLanguages"] - pager = commands.Paginator( - prefix="\n".join( - [ - f"**Hey, {ctx.author.mention} is hosting a Clash Of Code game!**", - f"Mode{'s' if len(json['modes']) > 1 else ''}: {', '.join(json['modes'])}", - f"Programming languages: {', '.join(plang) if plang else 'All'}", - f"Join here: {link[0]}", - ] - ), - suffix="", - ) - - for member_id in self.session_users: - if member_id != ctx.author.id: - member = self.bot.get_user(member_id) - pager.add_line(member.mention + ", ") - - if not len(pager.pages): - return await ctx.send(f"{ctx.author.mention}, Nobody is online to play with <:pepesad:733816214010331197>") - - for page in pager.pages: - await ctx.send(page) - - async with aiohttp.ClientSession() as session: - while not json["started"]: - await asyncio.sleep(10) # wait 10s to avoid flooding the API - async with session.post(API_URL, json=[id]) as resp: - json = await resp.json() - - players = len(json["players"]) - players_text = ", ".join( - [p["codingamerNickname"] for p in sorted(json["players"], key=lambda p: p["position"])] - ) - start_message = await ctx.send(embed=self.em(json["mode"], players_text)) - - async with aiohttp.ClientSession() as session: - while not json["finished"]: - await asyncio.sleep(10) # wait 10s to avoid flooding the API - async with session.post(API_URL, json=[id]) as resp: - json = await resp.json() - - if len(json["players"]) != players: - players_text = ", ".join( - [p["codingamerNickname"] for p in sorted(json["players"], key=lambda p: p["position"])] - ) - await start_message.edit(embed=self.em(json["mode"], players_text)) - - await ctx.em( - title="**Clash finished**", - description="\n".join( - ["Results:"] - + [ - # Example "1. Takos (Code length: 123, Score 100%, Time 1:09)" - f"{p['rank']}. {p['codingamerNickname']} (" - + (f"Code length: {p['criterion']}, " if json["mode"] == "SHORTEST" else "") - + f"Score: {p['score']}%, Time: {p['duration'] // 60_000}:{p['duration'] // 1000 % 60:02})" - for p in sorted(json["players"], key=lambda p: p["rank"]) - ] - ), - ) - - -async def setup(bot): - await bot.add_cog(ClashOfCode(bot)) diff --git a/bot/core.py b/bot/core.py index f3a14134..2db15e95 100644 --- a/bot/core.py +++ b/bot/core.py @@ -105,6 +105,16 @@ async def on_app_command_error(self, interaction: "InteractionType", error: app_ return if isinstance(error, app_commands.CheckFailure): + if interaction.command.qualified_name.startswith("coc "): + if isinstance(error, app_commands.MissingAnyRole): + await interaction.response.send_message( + "You need to have the Clash Of Code role to use this command", ephemeral=True + ) + else: + await interaction.response.send_message( + "You need to be in the Clash Of Code channel to use this command", ephemeral=True + ) + log.info(f"{interaction.user} failed to use the command {interaction.command.qualified_name}") return diff --git a/bot/extensions/clashofcode/__init__.py b/bot/extensions/clashofcode/__init__.py new file mode 100644 index 00000000..32e41e23 --- /dev/null +++ b/bot/extensions/clashofcode/__init__.py @@ -0,0 +1,9 @@ +from bot.core import DiscordBot + +from .commands import ClashOfCode +from .events import ClashOfCodeEvents + + +async def setup(bot: DiscordBot) -> None: + await bot.add_cog(ClashOfCode(bot=bot)) + await bot.add_cog(ClashOfCodeEvents(bot=bot)) diff --git a/bot/extensions/clashofcode/commands.py b/bot/extensions/clashofcode/commands.py new file mode 100644 index 00000000..eb74b581 --- /dev/null +++ b/bot/extensions/clashofcode/commands.py @@ -0,0 +1,277 @@ +import asyncio +import re +import time + +import aiohttp +import discord +from discord import Forbidden, HTTPException, NotFound, app_commands +from discord.ext import commands + +from bot import core +from bot.config import settings +from bot.extensions.clashofcode.utils import coc_helper + +REGEX = re.compile(r"https://www.codingame.com/clashofcode/clash/([0-9a-f]{39})") +API_URL = "https://www.codingame.com/services/ClashOfCode/findClashByHandle" + + +def em(mode, players): + embed = discord.Embed(title="**Clash started**") + embed.add_field(name="Mode", value=mode, inline=False) + embed.add_field(name="Players", value=players) + return embed + + +class ClashOfCode(commands.GroupCog, group_name="coc"): + session_commands = app_commands.Group( + name="session", + description="Commands for clash of code sessions", + ) + + def __init__(self, bot: core.DiscordBot): + self.bot = bot + self.session = False + self.last_clash: int = 0 + + @property + def role(self): + return self.bot.guild.get_role(settings.coc.role_id) + + @session_commands.command(name="start") + @app_commands.check(lambda interaction: interaction.channel_id == settings.coc.channel_id) + async def session_start(self, interaction: core.InteractionType): + """Start a new coc session""" + + if coc_helper.session_message != 0: + return await interaction.response.send_message( + "There is an active session right now.\n" + "Join by reacting to the pinned message or using `/coc session join`. Have fun!", + ephemeral=True, + ) + + pager = commands.Paginator( + prefix=f"**Hey, {interaction.user.mention} is starting a coc session.\n" + f"Use `/coc session join` or react to this message to join**", + suffix="", + ) + + for member in self.role.members: + if member != interaction.user: + if member.status != discord.Status.offline: + pager.add_line(member.mention + ", ") + + if not len(pager.pages): + return await interaction.response.send_message( + "Nobody is online to play with <:pepesad:733816214010331197>", ephemeral=True + ) + + self.session = True + self.last_clash = int(time.time()) + coc_helper.session_users.append(interaction.user.id) + + await interaction.response.send_message(pager.pages[0], allowed_mentions=discord.AllowedMentions(users=True)) + message = await interaction.original_response() + coc_helper.session_message = message.id + await message.add_reaction("🖐️") + + try: + await message.pin() + except (Forbidden, HTTPException): + await interaction.channel.send("Failed to pin message") + + while coc_helper.session_message != 0: + await asyncio.sleep(10) + + if self.last_clash + 1800 < int(time.time()) and coc_helper.session_message != 0: + await interaction.channel.send("Clash session has been closed due to inactivity") + try: + await message.unpin() + except (Forbidden, HTTPException, NotFound): + await interaction.channel.send("Failed to unpin message") + + self.last_clash = 0 + coc_helper.session_users = [] + coc_helper.session_message = 0 + self.session = False + break + + @session_commands.command(name="join") + @app_commands.check(lambda interaction: interaction.channel_id == settings.coc.channel_id) + async def session_join(self, interaction: core.InteractionType): + """Join the current active coc session""" + + if coc_helper.session_message == 0: + return await interaction.response.send_message( + "There is no active coc session right now" "use `/coc session start` to start a coc session", + ephemeral=True, + ) + + if interaction.user.id in coc_helper.session_users: + return await interaction.response.send_message( + "You are already in the session. Have fun playing.\n" + "If you want to leave remove your reaction or use `/coc session leave`", + ephemeral=True, + ) + + coc_helper.session_users.append(interaction.user.id) + return await interaction.response.send_message("You have joined the session. Have fun playing", ephemeral=True) + + @session_commands.command(name="leave") + @app_commands.check(lambda interaction: interaction.channel_id == settings.coc.channel_id) + async def session_leave(self, interaction: core.InteractionType): + """Leave the current active coc session""" + + if coc_helper.session_message == 0: + return await interaction.response.send_message( + "There is no active coc session right now" "use `/coc session start` to start a coc session", + ephemeral=True, + ) + + if interaction.user.id not in coc_helper.session_users: + return await interaction.response.send_message( + "You aren't in a clash of code session right now.\n" + "If you want to join react to session message or use `/coc session join`", + ephemeral=True, + ) + + coc_helper.session_users.remove(interaction.user.id) + return await interaction.response.send_message( + "You have left the session. No more pings for now", ephemeral=True + ) + + @session_commands.command(name="end") + @app_commands.check(lambda interaction: interaction.channel_id == settings.coc.channel_id) + async def session_end(self, interaction: core.InteractionType): + """Ends the current coc session""" + + if coc_helper.session_message == 0: + return await interaction.response.send_message("There is no active clash of code session", ephemeral=True) + + try: + msg = await interaction.channel.fetch_message(coc_helper.session_message) + try: + await msg.unpin() + except (Forbidden, HTTPException, NotFound): + await interaction.channel.send("Failed to unpin message") + except (Forbidden, HTTPException, NotFound): + await interaction.channel.send("Error while fetching message to unpin") + + self.last_clash = 0 + coc_helper.session_users = [] + coc_helper.session_message = 0 + self.session = False + + return await interaction.response.send_message( + f"Clash session has been closed by {interaction.user.mention}. See you later", + allowed_mentions=discord.AllowedMentions(users=True), + ) + + @app_commands.command(name="invite") + @app_commands.checks.has_any_role( + settings.moderation.staff_role_id, + settings.coc.role_id, + ) + @app_commands.check(lambda interaction: interaction.channel_id == settings.coc.channel_id) + async def coc_invite(self, interaction: core.InteractionType, url: str): + """Mentions all the users with the `Clash Of Code` role that are in the current session.""" + + if coc_helper.session_message == 0: + return await interaction.response.send_message( + "No active Clash of Code session please create one to start playing\n" + "Use `/coc session start` to start a coc session <:smilecat:727592135171244103>", + ephemeral=True, + ) + + if interaction.user.id not in coc_helper.session_users: + return await interaction.response.send_message( + "You can't create a clash unless you participate in the session\n" + "Use `/coc session join` or react to the pinned message to join the coc session " + "<:smilecat:727592135171244103>", + ephemeral=True, + ) + + link = REGEX.fullmatch(url) + if not link: + return await interaction.response.send_message('Could not find any valid "clashofcode" url', ephemeral=True) + + # Defer the response to acknowledge the interaction while doing slow tasks + await interaction.response.defer(ephemeral=True, thinking=True) + + self.last_clash = time.time() + session_id = link[1] + + async with aiohttp.ClientSession() as session: + async with session.post(API_URL, json=[session_id]) as resp: + json = await resp.json() + + plang = json["programmingLanguages"] + pager = commands.Paginator( + prefix="\n".join( + [ + f"**Hey, {interaction.user.mention} is hosting a Clash Of Code game!**", + f"Mode{'s' if len(json['modes']) > 1 else ''}: {', '.join(json['modes'])}", + f"Programming languages: {', '.join(plang) if plang else 'All'}", + f"Join here: {link[0]}", + ] + ), + suffix="", + ) + + for member_id in coc_helper.session_users: + if member_id != interaction.user.id: + member = self.bot.get_user(member_id) + pager.add_line(member.mention + ", ") + + if not len(pager.pages): + return await interaction.followup.send( + "Nobody is online to play with <:pepesad:733816214010331197>", ephemeral=True + ) + + for page in pager.pages: + await interaction.channel.send(page, allowed_mentions=discord.AllowedMentions(users=True)) + + await interaction.delete_original_response() + + async with aiohttp.ClientSession() as session: + while not json["started"]: + await asyncio.sleep(10) # wait 10s to avoid flooding the API + async with session.post(API_URL, json=[session_id]) as resp: + json = await resp.json() + + players = len(json["players"]) + players_text = ", ".join( + [p["codingamerNickname"] for p in sorted(json["players"], key=lambda p: p["position"])] + ) + start_message = await interaction.channel.send(embed=em(json["mode"], players_text)) + + async with aiohttp.ClientSession() as session: + while not json["finished"]: + await asyncio.sleep(10) # wait 10s to avoid flooding the API + async with session.post(API_URL, json=[session_id]) as resp: + json = await resp.json() + + if len(json["players"]) != players: + players_text = ", ".join( + [p["codingamerNickname"] for p in sorted(json["players"], key=lambda p: p["position"])] + ) + await start_message.edit(embed=em(json["mode"], players_text)) + + embed = discord.Embed( + title="**Clash finished**", + description="\n".join( + ["Results:"] + + [ + # Example "1. Takos (Code length: 123, Score 100%, Time 1:09)" + f"{p['rank']}. {p['codingamerNickname']} (" + + (f"Code length: {p['criterion']}, " if json["mode"] == "SHORTEST" else "") + + f"Score: {p['score']}%, Time: {p['duration'] // 60_000}:{p['duration'] // 1000 % 60:02})" + for p in sorted(json["players"], key=lambda p: p["rank"]) + ] + ), + ) + + await interaction.channel.send(embed=embed) + + +async def setup(bot: core.DiscordBot): + await bot.add_cog(ClashOfCode(bot=bot)) diff --git a/bot/extensions/clashofcode/events.py b/bot/extensions/clashofcode/events.py new file mode 100644 index 00000000..f45f2d4d --- /dev/null +++ b/bot/extensions/clashofcode/events.py @@ -0,0 +1,64 @@ +import discord +from discord.ext import commands + +from bot import core +from bot.config import settings +from bot.extensions.clashofcode.utils import coc_helper + + +class ClashOfCodeEvents(commands.Cog): + """Events for clash of code in discord.""" + + def __init__(self, bot: core.DiscordBot): + self.bot = bot + + @property + def role(self): + return self.bot.guild.get_role(settings.coc.role_id) + + @commands.Cog.listener() + async def on_raw_reaction_add(self, payload: discord.RawReactionActionEvent): + if payload.user_id == self.bot.user.id: + return + + if coc_helper.session_message != 0: + if payload.message_id == coc_helper.session_message: + if str(payload.emoji) == "🖐️": + if payload.user_id not in coc_helper.session_users: + coc_helper.session_users.append(payload.user_id) + + if payload.message_id != settings.coc.message_id: + return + + if self.role in payload.member.roles: + return + + await payload.member.add_roles(self.role) + try: + await payload.member.send(f"Gave you the **{self.role.name}** role!") + except discord.HTTPException: + pass + + @commands.Cog.listener() + async def on_raw_reaction_remove(self, payload: discord.RawReactionActionEvent): + if payload.user_id == self.bot.user.id: + return + + if coc_helper.session_message != 0: + if payload.message_id == coc_helper.session_message: + if str(payload.emoji) == "🖐️": + if payload.user_id in coc_helper.session_users: + coc_helper.session_users.remove(payload.user_id) + + if payload.message_id != settings.coc.message_id: + return + + member = self.bot.guild.get_member(payload.user_id) + if self.role not in member.roles: + return + + await member.remove_roles(self.role) + try: + await member.send(f"Removed your **{self.role.name}** role!") + except discord.HTTPException: + pass diff --git a/bot/extensions/clashofcode/utils.py b/bot/extensions/clashofcode/utils.py new file mode 100644 index 00000000..82f3afa8 --- /dev/null +++ b/bot/extensions/clashofcode/utils.py @@ -0,0 +1,7 @@ +class ClashOfCodeHelper: + def __init__(self): + self.session_message: int = 0 + self.session_users = [] + + +coc_helper = ClashOfCodeHelper() diff --git a/cli.py b/cli.py index 7908687f..6129e31f 100644 --- a/cli.py +++ b/cli.py @@ -138,8 +138,8 @@ async def main(ctx): "bot.extensions.levelling", "bot.extensions.persistent_roles", "bot.extensions.polls", + "bot.extensions.clashofcode", "bot.cogs._help", - "bot.cogs.clashofcode", "bot.cogs.roles", )