diff --git a/libs/funcs.py b/libs/funcs.py index 56b3acbaccb..f3e81a0a13a 100644 --- a/libs/funcs.py +++ b/libs/funcs.py @@ -20,12 +20,14 @@ from .log import Log logger_Funcs = Log('QD.Http.Funcs').getlogger() + + class pusher(object): - def __init__(self,db:DB,sql_session=None): + def __init__(self, db: DB, sql_session=None): self.db = db self.sql_session = sql_session - async def judge_res(self,res:aiohttp.ClientResponse): + async def judge_res(self, res: aiohttp.ClientResponse): if (res.status == 200): return "True" else: @@ -48,54 +50,79 @@ async def pusher(self, userid, pushsw, flg, title, content): if (notice['noticeflg'] & flg != 0): user = await self.db.user.get(userid, fields=('id', 'email', 'email_verified', 'nickname'), sql_session=sql_session) diypusher = notice['diypusher'] - if (diypusher != ''):diypusher = json.loads(diypusher) - self.barklink = notice['barkurl'] - pusher = {} - pusher["mailpushersw"] = False if (notice['noticeflg'] & 0x80) == 0 else True - pusher["barksw"] = False if (notice['noticeflg'] & 0x40) == 0 else True - pusher["schansw"] = False if (notice['noticeflg'] & 0x20) == 0 else True - pusher["wxpushersw"] = False if (notice['noticeflg'] & 0x10) == 0 else True - pusher["cuspushersw"] = False if (notice['noticeflg'] & 0x100) == 0 else True - pusher["qywxpushersw"] = False if (notice['noticeflg'] & 0x200) == 0 else True - pusher["tgpushersw"] = False if (notice['noticeflg'] & 0x400) == 0 else True - pusher["dingdingpushersw"] = False if (notice['noticeflg'] & 0x800) == 0 else True - pusher["qywxwebhooksw"] = False if (notice['noticeflg'] & 0x1000) == 0 else True - - def nonepush(*args,**kwargs): + if (diypusher != ''): + diypusher = json.loads(diypusher) + self.barklink = notice['barkurl'] + pusher = {} + pusher["mailpushersw"] = False if ( + notice['noticeflg'] & 0x80) == 0 else True + pusher["barksw"] = False if ( + notice['noticeflg'] & 0x40) == 0 else True + pusher["schansw"] = False if ( + notice['noticeflg'] & 0x20) == 0 else True + pusher["wxpushersw"] = False if ( + notice['noticeflg'] & 0x10) == 0 else True + pusher["cuspushersw"] = False if ( + notice['noticeflg'] & 0x100) == 0 else True + pusher["qywxpushersw"] = False if ( + notice['noticeflg'] & 0x200) == 0 else True + pusher["tgpushersw"] = False if ( + notice['noticeflg'] & 0x400) == 0 else True + pusher["dingdingpushersw"] = False if ( + notice['noticeflg'] & 0x800) == 0 else True + pusher["qywxwebhooksw"] = False if ( + notice['noticeflg'] & 0x1000) == 0 else True + + def nonepush(*args, **kwargs): return if (pushsw['pushen']): send2bark = self.send2bark if (pusher["barksw"]) else nonepush send2s = self.send2s if (pusher["schansw"]) else nonepush - send2wxpusher = self.send2wxpusher if (pusher["wxpushersw"]) else nonepush - sendmail = self.sendmail if (pusher["mailpushersw"]) else nonepush - cus_pusher_send = self.cus_pusher_send if (pusher["cuspushersw"]) else nonepush - qywx_pusher_send = self.qywx_pusher_send if (pusher["qywxpushersw"]) else nonepush + send2wxpusher = self.send2wxpusher if ( + pusher["wxpushersw"]) else nonepush + sendmail = self.sendmail if ( + pusher["mailpushersw"]) else nonepush + cus_pusher_send = self.cus_pusher_send if ( + pusher["cuspushersw"]) else nonepush + qywx_pusher_send = self.qywx_pusher_send if ( + pusher["qywxpushersw"]) else nonepush send2tg = self.send2tg if (pusher["tgpushersw"]) else nonepush - send2dingding = self.send2dingding if (pusher["dingdingpushersw"]) else nonepush - qywx_webhook_send = self.qywx_webhook_send if (pusher["qywxwebhooksw"]) else nonepush + send2dingding = self.send2dingding if ( + pusher["dingdingpushersw"]) else nonepush + qywx_webhook_send = self.qywx_webhook_send if ( + pusher["qywxwebhooksw"]) else nonepush await gen.convert_yielded([send2bark(notice['barkurl'], title, content), - send2s(notice['skey'], title, content), - send2wxpusher( notice['wxpusher'], title+u" "+content), - sendmail( user['email'], title, content, sql_session=sql_session), - cus_pusher_send( diypusher, title, content), - qywx_pusher_send( notice['qywx_token'], title, content), - send2tg( notice['tg_token'], title, content), - send2dingding(notice['dingding_token'], title, content), - qywx_webhook_send(notice['qywx_webhook'], title, content) - ]) + send2s(notice['skey'], + title, content), + send2wxpusher( + notice['wxpusher'], title+u" "+content), + sendmail( + user['email'], title, content, sql_session=sql_session), + cus_pusher_send( + diypusher, title, content), + qywx_pusher_send( + notice['qywx_token'], title, content), + send2tg( + notice['tg_token'], title, content), + send2dingding( + notice['dingding_token'], title, content), + qywx_webhook_send( + notice['qywx_webhook'], title, content) + ]) async def send2bark(self, barklink, title, content): r = 'False' try: link = barklink - if (link[-1] != '/'): link=link+'/' - content = content.replace('\\r\\n','\n') - d = {"title":title,"body":content} + if (link[-1] != '/'): + link = link+'/' + content = content.replace('\\r\\n', '\n') + d = {"title": title, "body": content} async with aiohttp.ClientSession(conn_timeout=config.connect_timeout) as session: - async with session.post(link, json=d, verify_ssl=False, timeout=config.request_timeout) as res: - r = await self.judge_res(res) + async with session.post(link, json=d, verify_ssl=False, timeout=config.request_timeout) as res: + r = await self.judge_res(res) except Exception as e: r = traceback.format_exc() @@ -107,8 +134,9 @@ async def send2s(self, skey, title, content): r = 'False' if (skey != ""): try: - link = u"https://sctapi.ftqq.com/{0}.send".format(skey.replace(".send", "")) - content = content.replace('\\r\\n','\n\n') + link = u"https://sctapi.ftqq.com/{0}.send".format( + skey.replace(".send", "")) + content = content.replace('\\r\\n', '\n\n') d = {'text': title, 'desp': content} async with aiohttp.ClientSession(conn_timeout=config.connect_timeout) as session: async with session.post(link, json=d, verify_ssl=False, timeout=config.request_timeout) as res: @@ -135,19 +163,21 @@ async def send2tg(self, tg_token, title, content): try: token = tgToken chat_id = tgUserId - #TG_BOT的token - #token = os.environ.get('TG_TOKEN') - #用户的ID - #chat_id = os.environ.get('TG_USERID') + # TG_BOT的token + # token = os.environ.get('TG_TOKEN') + # 用户的ID + # chat_id = os.environ.get('TG_USERID') if not tgHost: - link = u'https://api.telegram.org/bot{0}/sendMessage'.format(token) + link = u'https://api.telegram.org/bot{0}/sendMessage'.format( + token) else: - if tgHost[-1]!='/': + if tgHost[-1] != '/': tgHost = tgHost + '/' if 'http://' in tgHost or 'https://' in tgHost: - link = u'{0}bot{1}/sendMessage'.format(tgHost,token) + link = u'{0}bot{1}/sendMessage'.format(tgHost, token) else: - link = u'https://{0}bot{1}/sendMessage'.format(tgHost,token) + link = u'https://{0}bot{1}/sendMessage'.format( + tgHost, token) picurl = config.push_pic if pic == '' else pic # 匹配标题"QD[定时]任务 {0}-{1} 成功|失败" 的 {0} 部分, 获取 hashtag @@ -159,8 +189,9 @@ async def send2tg(self, tg_token, title, content): title_sp[1] = '-'.join(title1) title = ' '.join(title_sp) - content = content.replace('\\r\\n','\n
') - d = {'chat_id': str(chat_id), 'text': '' + title + '' + '\n\n' + content + '\n' + '------QD提醒------', 'disable_web_page_preview':'false', 'parse_mode': 'HTML'} + content = content.replace('\\r\\n', '
') + d = {'chat_id': str(chat_id), 'text': '' + title + '' + '\n' + content + '\n' + + '------QD提醒------', 'disable_web_page_preview': 'false', 'parse_mode': 'HTML'} if proxy: async with aiohttp.ClientSession(conn_timeout=config.connect_timeout) as session: async with session.post(link, json=d, verify_ssl=False, proxy=proxy, timeout=config.request_timeout) as res: @@ -183,10 +214,12 @@ async def send2dingding(self, dingding_token, title, content): pic = tmp[1] if len(tmp) >= 2 else '' if (dingding_token != ""): try: - link = u"https://oapi.dingtalk.com/robot/send?access_token={0}".format(dingding_token) + link = u"https://oapi.dingtalk.com/robot/send?access_token={0}".format( + dingding_token) picurl = config.push_pic if pic == '' else pic - content = content.replace('\\r\\n','\n\n > ') - d = {"msgtype": "markdown", "markdown": {"title": title, "text": "![QD](" + picurl + ")\n " + "#### " + title + "\n > " + content}} + content = content.replace('\\r\\n', '\n\n > ') + d = {"msgtype": "markdown", "markdown": { + "title": title, "text": "![QD](" + picurl + ")\n " + "#### " + title + "\n > " + content}} async with aiohttp.ClientSession(conn_timeout=config.connect_timeout) as session: async with session.post(link, json=d, verify_ssl=False, timeout=config.request_timeout) as res: r = await self.judge_res(res) @@ -207,15 +240,15 @@ async def send2wxpusher(self, wxpusher, content): if (wxpusher_token != "") and (wxpusher_uid != ""): try: link = "http://wxpusher.zjiecode.com/api/send/message" - content = content.replace('\\r\\n','\n') + content = content.replace('\\r\\n', '\n') d = { - "appToken":wxpusher_token, - "content":content, - "contentType":3, - "uids":[ - wxpusher_uid - ] - } + "appToken": wxpusher_token, + "content": content, + "contentType": 3, + "uids": [ + wxpusher_uid + ] + } async with aiohttp.ClientSession(conn_timeout=config.connect_timeout) as session: async with session.post(link, json=d, verify_ssl=False, timeout=config.request_timeout) as res: r = await self.judge_res(res) @@ -229,25 +262,26 @@ async def send2wxpusher(self, wxpusher, content): else: return Exception("参数不完整! ") - return r - + return r async def cus_pusher_send(self, diypusher, t, log): r = 'False' try: - log = log.replace('"','\\"').replace('\\\\"','\\"') + log = log.replace('"', '\\"').replace('\\\\"', '\\"') curltmp = diypusher['curl'].format(log=log, t=t) if (diypusher['headers']): - headerstmp = json.loads(diypusher['headers'].replace('{log}', log).replace("{t}", t)) + headerstmp = json.loads(diypusher['headers'].replace( + '{log}', log).replace("{t}", t)) else: headerstmp = {} if (diypusher['mode'] == 'POST'): - postDatatmp = diypusher['postData'].replace('{log}', log).replace("{t}", t) + postDatatmp = diypusher['postData'].replace( + '{log}', log).replace("{t}", t) if headerstmp: - headerstmp.pop('content-type','') - headerstmp.pop('Content-Type','') + headerstmp.pop('content-type', '') + headerstmp.pop('Content-Type', '') if (diypusher['postMethod'] == 'x-www-form-urlencoded'): headerstmp['Content-Type'] = "application/x-www-form-urlencoded; charset=UTF-8" if (postDatatmp != ''): @@ -280,29 +314,32 @@ async def cus_pusher_send(self, diypusher, t, log): return r # 获取Access_Token - async def get_access_token(self, qywx:dict): - access_url = '{qywxProxy}cgi-bin/gettoken?corpid={id}&corpsecret={secret}'.format(qywxProxy=qywx[u'代理'], id=qywx[u'企业ID'], secret=qywx[u'应用密钥']) + async def get_access_token(self, qywx: dict): + access_url = '{qywxProxy}cgi-bin/gettoken?corpid={id}&corpsecret={secret}'.format( + qywxProxy=qywx[u'代理'], id=qywx[u'企业ID'], secret=qywx[u'应用密钥']) async with aiohttp.ClientSession(conn_timeout=config.connect_timeout) as session: async with session.get(access_url, verify_ssl=False, timeout=config.request_timeout) as res: get_access_token_res = await res.json() return get_access_token_res - #上传临时素材,返回素材id - async def get_ShortTimeMedia(self,pic_url,access_token,qywxProxy): + # 上传临时素材,返回素材id + async def get_ShortTimeMedia(self, pic_url, access_token, qywxProxy): async with aiohttp.ClientSession(conn_timeout=config.connect_timeout) as session: if pic_url == config.push_pic: - with open(os.path.join(os.path.abspath(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),'web','static','img','push_pic.png'),'rb') as f: + with open(os.path.join(os.path.abspath(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), 'web', 'static', 'img', 'push_pic.png'), 'rb') as f: img = f.read() else: async with session.get(pic_url, verify_ssl=False, timeout=config.request_timeout) as res: img = await res.read() - url=f'{qywxProxy}cgi-bin/media/upload?access_token={access_token}&type=image' - async with session.post(url, data={'image':img}, verify_ssl=False, timeout=config.request_timeout) as res: + url = f'{qywxProxy}cgi-bin/media/upload?access_token={access_token}&type=image' + async with session.post(url, data={'image': img}, verify_ssl=False, timeout=config.request_timeout) as res: await self.judge_res(res) _json = await res.json() + if _json.get('errcode', 0) > 0: + raise Exception(_json.get('errmsg', '')) return _json['media_id'] - async def qywx_pusher_send(self, qywx_token, title:str, log:str): + async def qywx_pusher_send(self, qywx_token, title: str, log: str): r = 'False' try: qywx = {} @@ -312,11 +349,12 @@ async def qywx_pusher_send(self, qywx_token, title:str, log:str): qywx[u'应用ID'] = tmp[1] qywx[u'应用密钥'] = tmp[2] qywx[u'图片'] = tmp[3] if len(tmp) >= 4 else '' - qywx[u'代理'] = tmp[4] if len(tmp) >= 5 else 'https://qyapi.weixin.qq.com/' + qywx[u'代理'] = tmp[4] if len( + tmp) >= 5 else 'https://qyapi.weixin.qq.com/' else: raise Exception(u'企业微信Pusher获取AccessToken失败或参数不完整!') - if qywx[u'代理'][-1]!='/': + if qywx[u'代理'][-1] != '/': qywx[u'代理'] = qywx[u'代理'] + '/' if qywx[u'代理'][:4] != 'http': if qywx[u'代理'] == 'qyapi.weixin.qq.com/': @@ -325,13 +363,14 @@ async def qywx_pusher_send(self, qywx_token, title:str, log:str): qywx[u'代理'] = u'http://{0}'.format(qywx[u'代理']) get_access_token_res = await self.get_access_token(qywx) pic_url = config.push_pic if qywx[u'图片'] == '' else qywx[u'图片'] - if (get_access_token_res.get('access_token','') != '' and get_access_token_res['errmsg'] == 'ok'): + if (get_access_token_res.get('access_token', '') != '' and get_access_token_res['errmsg'] == 'ok'): access_token = get_access_token_res["access_token"] if utils.urlMatchWithLimit(pic_url) or utils.domainMatch(pic_url.split('/')[0]): - media_id = await self.get_ShortTimeMedia(pic_url,access_token,qywx[u'代理']) + media_id = await self.get_ShortTimeMedia(pic_url, access_token, qywx[u'代理']) else: media_id = pic_url - msgUrl = '{0}cgi-bin/message/send?access_token={1}'.format(qywx[u'代理'], access_token) + msgUrl = '{0}cgi-bin/message/send?access_token={1}'.format( + qywx[u'代理'], access_token) postData = {"touser": "@all", "toparty": "@all", "totag": "@all", @@ -358,9 +397,9 @@ async def qywx_pusher_send(self, qywx_token, title:str, log:str): async with session.post(msgUrl, json=postData, verify_ssl=False, timeout=config.request_timeout) as res: r = await self.judge_res(res) _json = await res.json() - if _json.get('errmsg','') == 'ok' and _json.get('errcode',0) == 0: + if _json.get('errmsg', '') == 'ok' and _json.get('errcode', 0) == 0: r = 'True' - elif _json.get('errmsg','') != '': + elif _json.get('errmsg', '') != '': raise Exception(_json['errmsg']) else: raise Exception("企业微信Pusher获取AccessToken失败或参数不完整! ") @@ -371,7 +410,7 @@ async def qywx_pusher_send(self, qywx_token, title:str, log:str): return e return r - async def qywx_webhook_send(self, qywx_webhook, title:str, log:str): + async def qywx_webhook_send(self, qywx_webhook, title: str, log: str): r = 'False' try: qywx = {} @@ -383,7 +422,8 @@ async def qywx_webhook_send(self, qywx_webhook, title:str, log:str): log = log.replace("\\r\\n", "\n") - msgUrl = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={0}".format(qywx[u'Webhook']) + msgUrl = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={0}".format( + qywx[u'Webhook']) postData = {"msgtype": "text", "text": { "content": f"{title}\n{log}" @@ -393,9 +433,9 @@ async def qywx_webhook_send(self, qywx_webhook, title:str, log:str): async with session.post(msgUrl, json=postData, verify_ssl=False, timeout=config.request_timeout) as res: r = await self.judge_res(res) _json = await res.json() - if _json.get('errmsg','') == 'ok' and _json.get('errcode',0) == 0: + if _json.get('errmsg', '') == 'ok' and _json.get('errcode', 0) == 0: r = 'True' - elif _json.get('errmsg','') != '': + elif _json.get('errmsg', '') != '': raise Exception(_json['errmsg']) except Exception as e: r = traceback.format_exc() @@ -403,7 +443,7 @@ async def qywx_webhook_send(self, qywx_webhook, title:str, log:str): return e return r - async def sendmail(self, email, title, content:str, sql_session=None): + async def sendmail(self, email, title, content: str, sql_session=None): if not config.domain: r = '请配置框架域名 domain, 以启用邮箱推送功能!' logger_Funcs.error('Send mail error: %s', r) @@ -411,26 +451,30 @@ async def sendmail(self, email, title, content:str, sql_session=None): user = await self.db.user.get(email=email, fields=('id', 'email', 'email_verified', 'nickname'), sql_session=sql_session) if user['email'] and user['email_verified']: try: - content = content.replace('\\r\\n','\n') - await utils.send_mail(to = email, - subject = u"在网站{0} {1}".format(config.domain, title), - text = content, - shark=True) + content = content.replace('\\r\\n', '\n') + await utils.send_mail(to=email, + subject=u"在网站{0} {1}".format( + config.domain, title), + text=content, + shark=True) except Exception as e: logger_Funcs.error('Send mail error: %r', e) + class cal(object): def __init__(self): pass def calNextTs(self, envs): - r = {"r":"True"} + r = {"r": "True"} try: if (envs['mode'] == 'ontime'): t = '{0} {1}'.format(envs['date'], envs['time']) elif (envs['mode'] == 'cron'): - cron = croniter.croniter(envs['cron_val'], datetime.datetime.now()) - t = cron.get_next(datetime.datetime).strftime("%Y-%m-%d %H:%M:%S") + cron = croniter.croniter( + envs['cron_val'], datetime.datetime.now()) + t = cron.get_next(datetime.datetime).strftime( + "%Y-%m-%d %H:%M:%S") else: raise Exception(u'参数错误') @@ -449,5 +493,5 @@ def calNextTs(self, envs): r['ts'] = ts except Exception as e: r['r'] = e - logger_Funcs.error('Calculate Next Timestamp error: %s',r['r']) + logger_Funcs.error('Calculate Next Timestamp error: %s', r['r']) return r