-
-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
ab93f41
commit 6acbc0e
Showing
11 changed files
with
321 additions
and
3 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
17 changes: 17 additions & 0 deletions
17
src/nonebot_plugin_alconna/uniseg/adapters/mail/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
from nonebot_plugin_alconna.uniseg.loader import BaseLoader | ||
from nonebot_plugin_alconna.uniseg.constraint import SupportAdapter | ||
|
||
|
||
class Loader(BaseLoader): | ||
def get_adapter(self) -> SupportAdapter: | ||
return SupportAdapter.mail | ||
|
||
def get_builder(self): | ||
from .builder import MailMessageBuilder | ||
|
||
return MailMessageBuilder() | ||
|
||
def get_exporter(self): | ||
from .exporter import MailMessageExporter | ||
|
||
return MailMessageExporter() |
53 changes: 53 additions & 0 deletions
53
src/nonebot_plugin_alconna/uniseg/adapters/mail/builder.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
from typing import TYPE_CHECKING | ||
|
||
from nonebot.adapters import Bot, Event | ||
from nonebot.adapters.mail.event import NewMailMessageEvent | ||
from nonebot.adapters.mail.message import Html, Attachment, MessageSegment | ||
|
||
from nonebot_plugin_alconna.uniseg.constraint import SupportAdapter | ||
from nonebot_plugin_alconna.uniseg.builder import MessageBuilder, build | ||
from nonebot_plugin_alconna.uniseg.segment import File, Text, Audio, Image, Reply, Video | ||
|
||
|
||
class MailMessageBuilder(MessageBuilder[MessageSegment]): | ||
@classmethod | ||
def get_adapter(cls) -> SupportAdapter: | ||
return SupportAdapter.mail | ||
|
||
@build("html") | ||
def html(self, seg: Html): | ||
return Text(seg.data["html"]).mark(0, len(seg.data["html"]), "html") | ||
|
||
@build("attachment") | ||
def attachment(self, seg: Attachment): | ||
mtype = seg.data["content_type"] | ||
if mtype and mtype.startswith("image"): | ||
return Image( | ||
raw=seg.data["data"], | ||
mimetype=mtype, | ||
name=seg.data["name"], | ||
) | ||
if mtype and mtype.startswith("audio"): | ||
return Audio( | ||
raw=seg.data["data"], | ||
mimetype=mtype, | ||
name=seg.data["name"], | ||
) | ||
if mtype and mtype.startswith("video"): | ||
return Video( | ||
raw=seg.data["data"], | ||
mimetype=mtype, | ||
name=seg.data["name"], | ||
) | ||
return File( | ||
raw=seg.data["data"], | ||
mimetype=seg.data["content_type"], | ||
name=seg.data["name"], | ||
) | ||
|
||
async def extract_reply(self, event: Event, bot: Bot): | ||
if TYPE_CHECKING: | ||
assert isinstance(event, NewMailMessageEvent) | ||
|
||
if event.reply: | ||
return Reply(event.reply.id, msg=event.reply.message, origin=event.reply) |
98 changes: 98 additions & 0 deletions
98
src/nonebot_plugin_alconna/uniseg/adapters/mail/exporter.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
from pathlib import Path | ||
from typing import Union | ||
|
||
from tarina import lang | ||
from nonebot.adapters import Bot, Event | ||
from nonebot.internal.driver import Request | ||
from nonebot.adapters.mail import Bot as MailBot | ||
from nonebot.adapters.mail.event import NewMailMessageEvent | ||
from nonebot.adapters.mail.message import Message, MessageSegment | ||
|
||
from nonebot_plugin_alconna.uniseg.constraint import SupportScope, SerializeFailed | ||
from nonebot_plugin_alconna.uniseg.exporter import Target, SupportAdapter, MessageExporter, export | ||
from nonebot_plugin_alconna.uniseg.segment import At, File, Text, Audio, Image, Reply, Video, Voice | ||
|
||
|
||
class MailMessageExporter(MessageExporter[Message]): | ||
def get_message_type(self): | ||
return Message | ||
|
||
@classmethod | ||
def get_adapter(cls) -> SupportAdapter: | ||
return SupportAdapter.mail | ||
|
||
def get_target(self, event: Event, bot: Union[Bot, None] = None) -> Target: | ||
assert isinstance(event, NewMailMessageEvent) | ||
return Target( | ||
event.get_user_id(), | ||
private=True, | ||
adapter=self.get_adapter(), | ||
self_id=bot.self_id if bot else None, | ||
scope=SupportScope.mail, | ||
) | ||
|
||
def get_message_id(self, event: Event) -> str: | ||
assert isinstance(event, NewMailMessageEvent) | ||
return event.id | ||
|
||
@export | ||
async def text(self, seg: Text, bot: Union[Bot, None]) -> "MessageSegment": | ||
if not seg.styles: | ||
return MessageSegment.text(seg.text) | ||
style = seg.extract_most_style() | ||
if style == "link": | ||
if not getattr(seg, "_children", []): | ||
return MessageSegment.html(f'<a href="{seg.text}">{seg.text}</a>') | ||
else: | ||
return MessageSegment.html(f'<a href="{seg.text}">{seg._children[0].text}</a>') # type: ignore | ||
return MessageSegment.html(str(seg)) | ||
|
||
@export | ||
async def at(self, seg: At, bot: Union[Bot, None]) -> "MessageSegment": | ||
if seg.flag == "user": | ||
return MessageSegment.html(f'<a href="mailto:{seg.target}">@{seg.target}</a>') | ||
elif seg.flag == "channel": | ||
return MessageSegment.html(f" #{seg.target}") | ||
else: | ||
raise SerializeFailed(lang.require("nbp-uniseg", "invalid_segment").format(type="at", seg=seg)) | ||
|
||
@export | ||
async def media(self, seg: Union[Image, Voice, Video, Audio, File], bot: Union[Bot, None]) -> "MessageSegment": | ||
name = seg.__class__.__name__.lower() | ||
|
||
if seg.raw and (seg.id or seg.name): | ||
return MessageSegment.attachment(seg.raw, seg.id or seg.name, seg.mimetype) | ||
elif seg.path: | ||
path = Path(seg.path) | ||
return MessageSegment.attachment(path, path.name) | ||
elif bot and seg.url: | ||
if name == "image": | ||
return MessageSegment.html(f'<img src="{seg.url}" />') | ||
elif name == "video": | ||
return MessageSegment.html(f'<video src="{seg.url}" controls />') | ||
elif name in ["audio", "voice"]: | ||
return MessageSegment.html(f'<audio src="{seg.url}" controls />') | ||
resp = await bot.adapter.request(Request("GET", seg.url)) | ||
return MessageSegment.attachment( | ||
resp.content, # type: ignore | ||
seg.id or seg.name or seg.url.split("/")[-1], | ||
) | ||
else: | ||
raise SerializeFailed(lang.require("nbp-uniseg", "invalid_segment").format(type=name, seg=seg)) | ||
|
||
@export | ||
async def reply(self, seg: Reply, bot: Union[Bot, None]) -> "MessageSegment": | ||
return MessageSegment("mail:reply", {"message_id": seg.id}) # type: ignore | ||
|
||
async def send_to(self, target: Union[Target, Event], bot: Bot, message: Message, **kwargs): | ||
assert isinstance(bot, MailBot) | ||
|
||
in_reply_to = None | ||
if message.has("$mail:reply"): | ||
reply = message["mail:reply", 0] | ||
message = message.exclude("mail:reply") | ||
in_reply_to = reply.data["message_id"] | ||
|
||
if isinstance(target, Event): | ||
return await bot.send(target, message, in_reply_to=in_reply_to, **kwargs) # type: ignore | ||
return await bot.send_to(recipient=target.id, message=message, in_reply_to=in_reply_to, **kwargs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
from typing import TYPE_CHECKING, Union | ||
|
||
from nonebot.adapters import Bot | ||
from nonebot.adapters.mail.bot import Bot as MailBot | ||
|
||
from nonebot_plugin_alconna.uniseg.target import Target, TargetFetcher | ||
from nonebot_plugin_alconna.uniseg.constraint import SupportScope, SupportAdapter | ||
|
||
|
||
class MailTargetFetcher(TargetFetcher): | ||
@classmethod | ||
def get_adapter(cls) -> SupportAdapter: | ||
return SupportAdapter.mail | ||
|
||
async def fetch(self, bot: Bot, target: Union[Target, None] = None): | ||
if TYPE_CHECKING: | ||
assert isinstance(bot, MailBot) | ||
if target and not target.private: | ||
return | ||
for uid in await bot.get_unseen_uids(): | ||
yield Target(uid, private=True, adapter=self.get_adapter(), self_id=bot.self_id, scope=SupportScope.mail) |
Oops, something went wrong.