-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmysql.py
82 lines (67 loc) · 2.3 KB
/
mysql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from __future__ import annotations
import dataclasses
import asyncmy
from lib import config
from lib.debezium import ChiiNotifyField
@dataclasses.dataclass
class User:
uid: int
username: str
nickname: str
async def create_mysql_client() -> MySql:
db = MySql(
pool=await asyncmy.create_pool(
host=config.MYSQL_DSN.host,
port=config.MYSQL_DSN.port,
user=config.MYSQL_DSN.user,
password=config.MYSQL_DSN.password,
db=config.MYSQL_DSN.path.lstrip("/"),
autocommit=False,
pool_recycle=3600 * 7,
),
)
return db
class MySql:
__pool: asyncmy.Pool
def __init__(self, pool: asyncmy.Pool):
self.__pool = pool
self.pool = pool
async def get_notify_field(self, ntf_id: int) -> ChiiNotifyField:
conn: asyncmy.Connection
async with self.__pool.acquire() as conn, conn.cursor() as cur:
await cur.execute(
"SELECT ntf_id,ntf_hash,ntf_rid,ntf_title from chii_notify_field where ntf_id = %s",
ntf_id,
)
r = await cur.fetchone()
return ChiiNotifyField(
ntf_id=r[0],
ntf_hash=r[1],
ntf_rid=r[2],
ntf_title=r[3],
)
async def get_user(self, uid: int) -> User:
conn: asyncmy.Connection
async with self.__pool.acquire() as conn, conn.cursor() as cur:
await cur.execute(
"SELECT uid, username, nickname from chii_members where uid = %s",
uid,
)
uid, username, nickname = await cur.fetchone()
return User(uid=uid, username=username, nickname=nickname)
async def get_blocklist(self, uid: int) -> set[int]:
conn: asyncmy.Connection
async with self.__pool.acquire() as conn, conn.cursor() as cur:
await cur.execute(
"SELECT blocklist from chii_memberfields where uid = %s",
uid,
)
row = await cur.fetchone()
if not row:
return set()
blocklist: str = row[0]
s = set()
for x in [v.strip() for v in blocklist.split(",")]:
if x:
s.add(x)
return s