Skip to content

Commit

Permalink
Updated code
Browse files Browse the repository at this point in the history
  • Loading branch information
SylteA committed Nov 14, 2023
1 parent f16107d commit 8d0302e
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 63 deletions.
14 changes: 8 additions & 6 deletions bot/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ def val_func(cls, v):
return json.loads(v)


class Notification(BaseModel):
channel_id: int
role_id: int


class Postgres(BaseModel):
max_pool_connections: int
min_pool_connections: int
Expand Down Expand Up @@ -100,6 +95,13 @@ class CustomRoles(BaseModel):
divider_role_id: int


class YouTube(BaseModel):
channel_id: str

text_channel_id: int
role_id: int


class Settings(BaseSettings):
aoc: AoC
bot: Bot
Expand All @@ -108,13 +110,13 @@ class Settings(BaseSettings):
postgres: Postgres
guild: Guild
moderation: Moderation
notification: Notification # For tim's YouTube channel (currently unused)
reaction_roles: ReactionRoles
tags: Tags
timathon: Timathon
hastebin: Hastebin
errors: ErrorHandling
custom_roles: CustomRoles
youtube: YouTube

class Config:
env_file = ".env"
Expand Down
125 changes: 68 additions & 57 deletions bot/extensions/youtube/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from bot.services import http

YOUTUBE_URL = re.compile(r"(?P<url>https?://www\.youtube\.com/watch\?v=[\w-]+)")
RSS_FEED_BASE_URL = "https://www.youtube.com/feeds/videos.xml"


class Video(BaseModel):
Expand All @@ -27,69 +28,58 @@ class YoutubeTasks(commands.Cog):
def __init__(self, bot: core.DiscordBot):
self.bot = bot
self.video_links: list[str] = []
self.update_thumbnail = []
self.old_thumbnails: list[tuple[discord.Message, str]] = []
self.check_for_new_videos.start()

def cog_unload(self) -> None:
self.check_for_new_videos.cancel()

@property
def channel(self) -> discord.TextChannel | None:
return self.bot.get_channel(settings.notification.channel_id)
return self.bot.get_channel(settings.youtube.text_channel_id)

async def send_notification(self, video: Video) -> None:
update_thumbnail = False
max_res = video.thumbnail.replace("/hqdefault.jpg", "/maxresdefault.jpg")
async with http.session.head(max_res) as response:
if response.status == 200:
video.thumbnail = max_res
else:
update_thumbnail = True
video.thumbnail = video.thumbnail.replace("/hqdefault.jpg", "/mqdefault.jpg")
@tasks.loop(minutes=2)
async def check_for_new_videos(self):
"""Checks old thumbnails for higher resolution images and looks for new videos"""
await self.check_old_thumbnails()
await self.find_new_videos()

embed = discord.Embed(
title=video.title,
description=video.description.split("\n\n")[0],
url=video.link,
color=discord.Color.red(),
timestamp=datetime.strptime(video.published, "%Y-%m-%dT%H:%M:%S%z"),
)
embed.set_image(url=video.thumbnail)
embed.set_author(
name="Tech With Tim",
url="https://www.youtube.com/c/TechWithTim",
icon_url=self.bot.user.display_avatar.url,
)
embed.set_footer(text="Uploaded", icon_url=self.bot.user.display_avatar.url)
@check_for_new_videos.before_loop
async def before_check(self):
"""Fetches the 10 last videos posted so we don't accidentally re-post it."""
if self.video_links:
return

message = await self.channel.send(
content=f"Hey <@&{settings.notification.role_id}>, **Tim** just posted a video! Go check it out!",
embed=embed,
allowed_mentions=discord.AllowedMentions(roles=True),
)
async for message in self.channel.history(limit=10):
if message.embeds:
embed = message.embeds[0]
self.video_links.append(embed.url)
if embed.image.url.endswith("/mqdefault.jpg"):
self.old_thumbnails.append((message, embed.image.url))

if update_thumbnail:
self.update_thumbnail.append((message.id, video.thumbnail))
else:
match = YOUTUBE_URL.search(message.content)
if match:
self.video_links.append(match.group("url"))

async def check_old_thumbnails(self):
for message_id, thumbnail_url in self.update_thumbnail:
async with http.session.head(thumbnail_url.replace("/mqdefault.jpg", "/maxresdefault.jpg")) as response:
if response.status == 404:
continue
"""Tries to fetch new thumbnails for any videos we've posted with a low resolution thumbnail."""
for message, thumbnail_url in self.old_thumbnails:
max_resolution = thumbnail_url.replace("/mqdefault.jpg", "/maxresdefault.jpg")

if not await self.image_exists(max_resolution):
continue

message = await self.channel.fetch_message(message_id)
embed = message.embeds[0]
embed.set_image(url=thumbnail_url)
await message.edit(embed=embed)
self.update_thumbnail.remove((message_id, thumbnail_url))

@tasks.loop(minutes=2)
async def check_for_new_videos(self):
"""Check for new videos"""
self.old_thumbnails.remove((message, thumbnail_url))

await self.check_old_thumbnails()
async def find_new_videos(self):
"""Fetches most recent videos from rss feed and publishes any new videos."""
url = RSS_FEED_BASE_URL + "?channel_id=" + settings.youtube.channel_id

url = "https://www.youtube.com/feeds/videos.xml?channel_id=UC4JX40jDee_tINbkjycV4Sg"
async with http.session.get(url) as response:
data = await response.text()
tree = ET.fromstring(data)
Expand All @@ -112,18 +102,39 @@ async def check_for_new_videos(self):
self.video_links.append(video.link)
await self.send_notification(video)

@check_for_new_videos.before_loop
async def before_check(self):
if self.video_links:
return
async def send_notification(self, video: Video) -> None:
"""Sends an embed to discord with the new video."""
max_resolution = video.thumbnail.replace("/mqdefault.jpg", "/maxresdefault.jpg")
use_max_resolution = await self.image_exists(max_resolution)

async for message in self.channel.history(limit=10):
if message.embeds:
embed = message.embeds[0]
self.video_links.append(embed.url)
if embed.image.url.endswith("/mqdefault.jpg"):
self.update_thumbnail.append((message.id, embed.image.url))
else:
match = YOUTUBE_URL.search(message.content)
if match:
self.video_links.append(match.group("url"))
if use_max_resolution:
video.thumbnail = max_resolution

embed = discord.Embed(
title=video.title,
description=video.description.split("\n\n")[0],
url=video.link,
color=discord.Color.red(),
timestamp=datetime.strptime(video.published, "%Y-%m-%dT%H:%M:%S%z"),
)
embed.set_image(url=video.thumbnail)
embed.set_author(
name="Tech With Tim",
url="https://www.youtube.com/c/TechWithTim",
icon_url=self.bot.user.display_avatar.url,
)
embed.set_footer(text="Uploaded", icon_url=self.bot.user.display_avatar.url)

message = await self.channel.send(
content=f"Hey <@&{settings.youtube.role_id}>, **Tim** just posted a video! Go check it out!",
embed=embed,
allowed_mentions=discord.AllowedMentions(roles=True),
)

if not use_max_resolution:
self.old_thumbnails.append((message, video.thumbnail))

@staticmethod
async def image_exists(url: str):
async with http.session.head(url) as response:
return response.status == 200

0 comments on commit 8d0302e

Please sign in to comment.