-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
229 lines (197 loc) · 8.95 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# *-* coding:utf-8 *-*
'''
Author: WrunDorry
Date: 2024/12/21
Description: qbots-webhook-to-websocket
Licence: AGPL-v3
'''
# 导入所需模块
from fastapi import FastAPI, Request, Header, WebSocket, WebSocketDisconnect, Form, HTTPException
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import sys
from binascii import unhexlify
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.backends import default_backend
import logging
import asyncio
import sqlite3
from starlette.responses import Response
ADMIN_PWD="xxxxx" # 后台管理员密码
ADMIN_ENTER="/admin" # 后台入口路径,格式:/xxx
app = FastAPI()
# 配置跨域设置
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 配置模板引擎
templates = Jinja2Templates(directory="templates")
# 用于储存 WebSocket 连接对象的字典
active_connections = {}
# 连接到 SQLite 数据库(如果数据库不存在,则会自动创建)
conn = sqlite3.connect('database.db', check_same_thread=False)
cursor = conn.cursor()
# 创建表(如果表不存在)
cursor.execute("""
CREATE TABLE IF NOT EXISTS secrets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
secret TEXT NOT NULL UNIQUE
)
""")
conn.commit()
# 配置签名计算函数
def generate_signature(bot_secret, event_ts, plain_token):
while len(bot_secret) < 32: # 生成 32 字节的 seed
bot_secret = (bot_secret + bot_secret)[:32]
# 生成私钥
private_key = ed25519.Ed25519PrivateKey.from_private_bytes(bot_secret.encode())
public_key = private_key.public_key()
# 编码
message = f"{event_ts}{plain_token}".encode()
# 计算签名并 16 进制 hex 编码
signature = private_key.sign(message).hex()
return {
"plain_token": plain_token,
"signature": signature
} # 返回签名给 QQ 开放平台
# 定义 Payload 模型
class Payload(BaseModel):
d: dict # Payload 内容
# 配置日志
logging.basicConfig(level=logging.INFO)
def is_secret_valid(secret):
cursor.execute("SELECT * FROM secrets WHERE secret = ?", (secret,))
return cursor.fetchone() is not None
def is_admin(password, secret):
admin_password = ADMIN_PWD
if password == admin_password:
print("密码正确")
else:
return False
cursor.execute("SELECT * FROM secrets WHERE secret = ?", (secret,))
row = cursor.fetchone()
if row :
return True
return False
@app.post("/webhook") # 接收 QQ 开放平台的 webhook 请求
async def handle_webhook(
request: Request,
payload: Payload,
user_agent: str = Header(None),
x_bot_appid: str = Header(None),
secret: str = None # 通过URL查询参数获取secret
):
secret = request.query_params.get('secret') # 从请求的URL中获取secret
if not secret:
logging.error("没有提供 secret 参数")
return {"msg": "error"}
if not is_secret_valid(secret):
logging.error("无效的 secret: %s", secret)
return {"msg": "error"}
body_bytes = await request.body() # 获取请求体
logging.info("收到消息 %s", body_bytes)
body_str = body_bytes.decode('utf-8') # 解码为字符串
if "event_ts" not in payload.d or "plain_token" not in payload.d: # 判断是不是第一次配置回调地址
logging.info("消息事件") # 如果不是第一次配置回调地址,则忽略
else: # 如果是,那么开始计算签名
logging.info("回调地址配置事件")
try: # 套一层 Try-Except 防止签名计算失败
event_ts = payload.d["event_ts"] # 获取事件时间戳
plain_token = payload.d["plain_token"] # 获取 需要计算的 token
# 使用传入的secret计算签名
result = generate_signature(secret, event_ts, plain_token) # 调用函数,计算签名
logging.info("签名计算成功: %s", result)
return result # 返回签名给 QQ 开放平台
except KeyError as e:
logging.error("签名计算时发生了错误: %s", e)
pass
# 这里是处理普通消息事件的部分
if secret in active_connections: # 判断推送的消息所对应的 secret 的 websocket 是否连接
logging.info("即将把消息推送给 ws: %s", secret)
for connection in active_connections[secret]:
await connection.send_text(body_str) # 推送消息给所有与此 secret 连接的 websocket
return {"message": "Data pushed to WebSocket"}
else: # 如果没有连接,则返回错误信息
logging.warning("对应 secret 的 ws 没有被连接: %s", secret)
return {"message": "No active WebSocket connection found for secret"}
@app.websocket("/ws/{secret}") # 建立 WebSocket 服务端
async def websocket_endpoint(websocket: WebSocket, secret: str):
if not is_secret_valid(secret):
logging.error("无效的 secret: %s", secret)
await websocket.close(code=1008, reason="Invalid secret")
return
await websocket.accept() # 接受 WebSocket 连接请求
if secret not in active_connections:
active_connections[secret] = []
active_connections[secret].append(websocket) # 将当前连接存储到 active_connections 字典里的列表中
try:
while True:
data = await websocket.receive_text() # 获取客户端 push 过来的消息
logging.info("收到来自 ws 的消息: %s", data)
except WebSocketDisconnect:
logging.info(f"{secret} 的 WebSocket 连接断开.")
active_connections[secret].remove(websocket) # 当连接断开时,从列表中移除
if not active_connections[secret]: # 如果列表为空,移除该 key
del active_connections[secret]
@app.get(ADMIN_ENTER, response_class=HTMLResponse)
async def read_root(request: Request):
return templates.TemplateResponse("login.html", {"request": request})
@app.post(ADMIN_ENTER+"/login")
async def login(response: Response, password: str = Form(...), secret: str = Form(...)):
if not is_admin(password, secret):
logging.error("登录失败: 无效的密码或 secret")
return {"msg": "error"+secret+""}
response = RedirectResponse(url="/manage", status_code=303)
response.set_cookie(key="admin_secret", value=secret)
response.set_cookie(key="admin_password", value=password)
return response
@app.get(ADMIN_ENTER+"/manage", response_class=HTMLResponse)
async def manage_secrets(request: Request):
admin_secret = request.cookies.get("admin_secret")
admin_password=request.cookies.get("admin_password")
if not admin_secret or not is_admin(admin_password, admin_secret):
logging.error("未授权访问管理页面")
return RedirectResponse(url="/", status_code=303)
cursor.execute("SELECT * FROM secrets")
secrets = cursor.fetchall()
return templates.TemplateResponse("manage.html", {"request": request, "secrets": secrets})
@app.post(ADMIN_ENTER+"/create_secret")
async def create_secret(request: Request,secret: str = Form(...)):
admin_secret = request.cookies.get("admin_secret") # 使用传入的 secret 作为管理员 secret
admin_password=request.cookies.get("admin_password") # 使用传入的 password 作为管理员密码
if not is_admin(admin_password, admin_secret):
logging.error("未授权创建 secret")
return {"msg": "error"}
try:
cursor.execute("INSERT INTO secrets (secret) VALUES (?)", (secret,))
conn.commit()
logging.info("成功创建 secret: %s", secret)
return {"msg": "success"}
except sqlite3.IntegrityError:
logging.error("secret 已经存在: %s", secret)
return {"msg": "error"}
@app.post(ADMIN_ENTER+"/delete_secret")
async def delete_secret(request: Request,secret: str = Form(...)):
admin_secret = secret # 使用传入的 secret 作为管理员 secret
admin_password=request.cookies.get("admin_password") # 使用传入的 password 作为管理员密码
if not is_admin(admin_password, admin_secret):
logging.error("未授权删除 secret")
return {"msg": "error"}
try:
cursor.execute("DELETE FROM secrets WHERE secret = ?", (secret,))
conn.commit()
logging.info("成功删除 secret: %s", secret)
return {"msg": "success"}
except Exception as e:
logging.error("删除 secret 时发生错误: %s", e)
return {"msg": "error"}
# 启动服务
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000) # 端口 8000 监听所有 IP