From db97b676f1eac9dd36fb4f0239fa02f1ad617adb Mon Sep 17 00:00:00 2001 From: a76yyyy Date: Sat, 19 Aug 2023 22:48:17 +0800 Subject: [PATCH] =?UTF-8?q?Fix=20qd-today/qd#462:=20=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=A8=A1=E6=9D=BF=E5=AF=BC=E8=87=B4=E6=97=A0?= =?UTF-8?q?=E9=99=90=E6=8E=A8=E9=80=81=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- worker.py | 152 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 93 insertions(+), 59 deletions(-) diff --git a/worker.py b/worker.py index 23b848ab704..9d3098b9987 100644 --- a/worker.py +++ b/worker.py @@ -23,15 +23,16 @@ logger_Worker = Log('QD.Worker').getlogger() + class BaseWorker(object): - def __init__(self, db:DB): + def __init__(self, db: DB): self.running = False self.db = db self.fetcher = Fetcher() async def ClearLog(self, taskid, sql_session=None): logDay = int((await self.db.site.get(1, fields=('logDay',), sql_session=sql_session))['logDay']) - for log in await self.db.tasklog.list(taskid = taskid, fields=('id', 'ctime'), sql_session=sql_session): + for log in await self.db.tasklog.list(taskid=taskid, fields=('id', 'ctime'), sql_session=sql_session): if (time.time() - log['ctime']) > (logDay * 24 * 60 * 60): await self.db.tasklog.delete(log['id'], sql_session=sql_session) @@ -44,11 +45,13 @@ async def push_batch(self): for user in userlist: userid = user['id'] push_batch = json.loads(user['push_batch']) - if user['status'] == "Enable" and push_batch["sw"] and isinstance(push_batch['time'],(float,int)) and time.time() >= push_batch['time']: - logger_Worker.debug('User %d check push_batch task, waiting...' % userid) + if user['status'] == "Enable" and push_batch["sw"] and isinstance(push_batch['time'], (float, int)) and time.time() >= push_batch['time']: + logger_Worker.debug( + 'User %d check push_batch task, waiting...' % userid) title = u"QD任务日志定期推送" delta = push_batch.get("delta", 86400) - logtemp = "{}".format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(push_batch['time']))) + logtemp = "{}".format(time.strftime( + "%Y-%m-%d %H:%M:%S", time.localtime(push_batch['time']))) tmpdict = {} tmp = "" numlog = 0 @@ -60,28 +63,37 @@ async def push_batch(self): tasklog_list = await self.db.tasklog.list(taskid=task["id"], fields=('success', 'ctime', 'msg'), sql_session=sql_session) for log in tasklog_list: if (push_batch['time'] - delta) < log['ctime'] <= push_batch['time']: - tmp0 += "\\r\\n时间: {}\\r\\n日志: {}".format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(log['ctime'])), log['msg']) + tmp0 += "\\r\\n时间: {}\\r\\n日志: {}".format(time.strftime( + "%Y-%m-%d %H:%M:%S", time.localtime(log['ctime'])), log['msg']) numlog += 1 - tmplist = tmpdict.get(task['tplid'],[]) + tmplist = tmpdict.get(task['tplid'], []) if tmp0: - tmplist.append("\\r\\n-----任务{0}-{1}-----{2}\\r\\n".format(len(tmplist)+1, task['note'], tmp0)) + tmplist.append( + "\\r\\n-----任务{0}-{1}-----{2}\\r\\n".format(len(tmplist)+1, task['note'], tmp0)) else: - tmplist.append("\\r\\n-----任务{0}-{1}-----\\r\\n记录期间未执行定时任务,请检查任务! \\r\\n".format(len(tmplist)+1, task['note'])) + tmplist.append( + "\\r\\n-----任务{0}-{1}-----\\r\\n记录期间未执行定时任务,请检查任务! \\r\\n".format(len(tmplist)+1, task['note'])) tmpdict[task['tplid']] = tmplist for tmpkey in tmpdict: - tmp = "\\r\\n\\r\\n=====QD: {0}=====".format((await self.db.tpl.get(tmpkey, fields=('sitename',), sql_session=sql_session))['sitename']) - tmp += ''.join(tmpdict[tmpkey]) - logtemp += tmp + tmp_sitename = await self.db.tpl.get(tmpkey, fields=('sitename',), sql_session=sql_session) + if tmp_sitename: + tmp = "\\r\\n\\r\\n=====QD: {0}=====".format( + tmp_sitename['sitename']) + tmp += ''.join(tmpdict[tmpkey]) + logtemp += tmp push_batch["time"] = push_batch['time'] + delta await self.db.user.mod(userid, push_batch=json.dumps(push_batch), sql_session=sql_session) if tmp and numlog: - user_email = user.get('email','Unkown') - logger_Worker.debug("Start push batch log for user {}, email:{}".format(userid,user_email)) - await pushtool.pusher(userid, {"pushen": bool(push_batch.get("sw",False))}, 4080, title, logtemp) - logger_Worker.info("Success push batch log for user {}, email:{}".format(userid,user_email)) + user_email = user.get('email', 'Unkown') + logger_Worker.debug( + "Start push batch log for user {}, email:{}".format(userid, user_email)) + await pushtool.pusher(userid, {"pushen": bool(push_batch.get("sw", False))}, 4080, title, logtemp) + logger_Worker.info( + "Success push batch log for user {}, email:{}".format(userid, user_email)) else: - logger_Worker.debug('User %d does not need to perform push_batch task, stop.' % userid) + logger_Worker.debug( + 'User %d does not need to perform push_batch task, stop.' % userid) except Exception as e: if config.traceback_print: traceback.print_exc() @@ -172,9 +184,9 @@ async def do(self, task): try: fetch_tpl = await self.db.user.decrypt(0 if not tpl['userid'] else task['userid'], tpl['tpl'], sql_session=sql_session) env = dict( - variables = await self.db.user.decrypt(task['userid'], task['init_env'], sql_session=sql_session), - session = [], - ) + variables=await self.db.user.decrypt(task['userid'], task['init_env'], sql_session=sql_session), + session=[], + ) url = parse_url(env['variables'].get('_proxy')) if not url: @@ -191,55 +203,65 @@ async def do(self, task): variables = await self.db.user.encrypt(task['userid'], new_env['variables'], sql_session=sql_session) session = await self.db.user.encrypt(task['userid'], - new_env['session'].to_json() if hasattr(new_env['session'], 'to_json') else new_env['session'], sql_session=sql_session) + new_env['session'].to_json() if hasattr(new_env['session'], 'to_json') else new_env['session'], sql_session=sql_session) if (newontime['sw']): if ('mode' not in newontime): newontime['mode'] = 'ontime' if (newontime['mode'] == 'ontime'): - newontime['date'] = (datetime.datetime.now()+datetime.timedelta(days=1)).strftime("%Y-%m-%d") + newontime['date'] = (datetime.datetime.now( + )+datetime.timedelta(days=1)).strftime("%Y-%m-%d") next = caltool.calNextTs(newontime)['ts'] else: - next = time.time() + max((tpl['interval'] if tpl['interval'] else 24 * 60 * 60), 1*60) + next = time.time() + \ + max((tpl['interval'] if tpl['interval'] + else 24 * 60 * 60), 1*60) if tpl['interval'] is None: next = self.fix_next_time(next) # success feedback await self.db.tasklog.add(task['id'], success=True, msg=new_env['variables'].get('__log__'), sql_session=sql_session) await self.db.task.mod(task['id'], - last_success=time.time(), - last_failed_count=0, - success_count=task['success_count']+1, - env=variables, - session=session, - mtime=time.time(), - next=next, - sql_session=sql_session) + last_success=time.time(), + last_failed_count=0, + success_count=task['success_count']+1, + env=variables, + session=session, + mtime=time.time(), + next=next, + sql_session=sql_session) await self.db.tpl.incr_success(tpl['id'], sql_session=sql_session) t = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') - title = u"QD定时任务 {0}-{1} 成功".format(tpl['sitename'], task['note']) + title = u"QD定时任务 {0}-{1} 成功".format( + tpl['sitename'], task['note']) logtemp = new_env['variables'].get('__log__') logtemp = u"{0} \\r\\n日志:{1}".format(t, logtemp) await pushtool.pusher(user['id'], pushsw, 0x2, title, logtemp) - logger_Worker.info('taskid:%d tplid:%d successed! %.5fs', task['id'], task['tplid'], time.perf_counter()-start) + logger_Worker.info('taskid:%d tplid:%d successed! %.5fs', + task['id'], task['tplid'], time.perf_counter()-start) # delete log - await self.ClearLog(task['id'],sql_session=sql_session) - logger_Worker.info('taskid:%d tplid:%d clear log.', task['id'], task['tplid']) + await self.ClearLog(task['id'], sql_session=sql_session) + logger_Worker.info( + 'taskid:%d tplid:%d clear log.', task['id'], task['tplid']) except Exception as e: # failed feedback if config.traceback_print: traceback.print_exc() - next_time_delta = self.failed_count_to_time(task['last_failed_count'], task['retry_count'], task['retry_interval'], tpl['interval']) + next_time_delta = self.failed_count_to_time( + task['last_failed_count'], task['retry_count'], task['retry_interval'], tpl['interval']) t = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') - title = u"QD定时任务 {0}-{1} 失败".format(tpl['sitename'], task['note']) + title = u"QD定时任务 {0}-{1} 失败".format( + tpl['sitename'], task['note']) content = u"{0} \\r\\n日志:{1}".format(t, str(e)) disabled = False if next_time_delta: next = time.time() + next_time_delta - content = content + u" \\r\\n下次运行时间:{0}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(next))) + content = content + \ + u" \\r\\n下次运行时间:{0}".format(time.strftime( + "%Y-%m-%d %H:%M:%S", time.localtime(next))) if (logtime['ErrTolerateCnt'] <= task['last_failed_count']): await pushtool.pusher(user['id'], pushsw, 0x1, title, content) else: @@ -250,21 +272,23 @@ async def do(self, task): await self.db.tasklog.add(task['id'], success=False, msg=str(e), sql_session=sql_session) await self.db.task.mod(task['id'], - last_failed=time.time(), - failed_count=task['failed_count']+1, - last_failed_count=task['last_failed_count']+1, - disabled = disabled, - mtime = time.time(), - next=next, - sql_session=sql_session) + last_failed=time.time(), + failed_count=task['failed_count']+1, + last_failed_count=task['last_failed_count']+1, + disabled=disabled, + mtime=time.time(), + next=next, + sql_session=sql_session) await self.db.tpl.incr_failed(tpl['id'], sql_session=sql_session) - logger_Worker.error('taskid:%d tplid:%d failed! %.4fs \r\n%s', task['id'], task['tplid'], time.perf_counter()-start, str(e).replace('\\r\\n','\r\n')) + logger_Worker.error('taskid:%d tplid:%d failed! %.4fs \r\n%s', task['id'], task['tplid'], time.perf_counter( + )-start, str(e).replace('\\r\\n', '\r\n')) return False return True + class QueueWorker(BaseWorker): - def __init__(self, db:DB): + def __init__(self, db: DB): logger_Worker.info('Queue Worker start...') self.queue = asyncio.Queue(maxsize=config.queue_num) self.task_lock = {} @@ -279,26 +303,29 @@ async def __call__(self): asyncio.create_task(self.runner(i)) while True: - sleep = asyncio.sleep( config.push_batch_delta ) + sleep = asyncio.sleep(config.push_batch_delta) if self.success or self.failed: - logger_Worker.info('Last %d seconds, %d task done. %d success, %d failed' % (config.push_batch_delta, self.success+self.failed, self.success, self.failed)) + logger_Worker.info('Last %d seconds, %d task done. %d success, %d failed' % ( + config.push_batch_delta, self.success+self.failed, self.success, self.failed)) self.success = 0 self.failed = 0 if config.push_batch_sw: await self.push_batch() await sleep - async def runner(self,id): + async def runner(self, id): logger_Worker.debug('Runner %d started' % id) while True: sleep = asyncio.sleep(config.check_task_loop/1000.0) task = await self.queue.get() - logger_Worker.debug('Runner %d get task: %s, running...' % (id, task['id'])) + logger_Worker.debug( + 'Runner %d get task: %s, running...' % (id, task['id'])) done = False try: - done = await self.do(task) + done = await self.do(task) except Exception as e: - logger_Worker.error('Runner %d get task: %s, failed! %s' % (id, task['id'], str(e))) + logger_Worker.error( + 'Runner %d get task: %s, failed! %s' % (id, task['id'], str(e))) if config.traceback_print: traceback.print_exc() if done: @@ -310,7 +337,6 @@ async def runner(self,id): self.queue.task_done() await sleep - async def producer(self): logger_Worker.debug('Schedule Producer started') while True: @@ -325,17 +351,21 @@ async def producer(self): unlock_tasks += 1 await self.queue.put(task) if unlock_tasks > 0: - logger_Worker.debug('Scaned %d task, put in Queue...' % unlock_tasks) + logger_Worker.debug( + 'Scaned %d task, put in Queue...' % unlock_tasks) except Exception as e: - logger_Worker.error('Schedule Producer get tasks failed! %s' % str(e)) + logger_Worker.error( + 'Schedule Producer get tasks failed! %s' % str(e)) if config.traceback_print: traceback.print_exc() await sleep # 旧版本批量任务定时执行 # 建议仅当新版 Queue 生产者消费者定时执行功能失效时使用 + + class BatchWorker(BaseWorker): - def __init__(self, db:DB): + def __init__(self, db: DB): logger_Worker.info('Batch Worker start...') super().__init__(db) @@ -348,11 +378,13 @@ def __call__(self): if self.running: return self.running = gen.convert_yielded(self.run()) + def done(future: asyncio.Future): self.running = None success, failed = future.result() if success or failed: - logger_Worker.info('%d task done. %d success, %d failed' % (success+failed, success, failed)) + logger_Worker.info('%d task done. %d success, %d failed' % ( + success+failed, success, failed)) return self.running.add_done_callback(done) @@ -366,7 +398,8 @@ async def run(self): for task in tasks: running.append(asyncio.ensure_future(self.do(task))) if len(running) >= 50: - logger_Worker.debug('scaned %d task, waiting...', len(running)) + logger_Worker.debug( + 'scaned %d task, waiting...', len(running)) result = await asyncio.gather(*running[:10]) for each in result: if each: @@ -387,6 +420,7 @@ async def run(self): logger_Worker.exception(e) return (success, failed) + if __name__ == '__main__': from db import DB tornado.log.enable_pretty_logging()