Fix #462: Fix the infinite-push problem caused by deleted templates
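The root cause, as the diff below shows: push_batch built each user's digest by indexing the result of self.db.tpl.get(tmpkey, ...) directly. Once a referenced template had been deleted, get() returned None, the lookup raised TypeError, the surrounding except logged and swallowed the error, and push_batch['time'] was never advanced, so the same push window was attempted again on every scheduler cycle (the infinite-push symptom of #462). The fix fetches the row first and skips templates that no longer exist, so the window always moves forward.

A minimal runnable sketch of the failure mode and the guard; get_tpl, push_batch_window and tpl_store are illustrative stand-ins, not the project's actual API:

    import asyncio

    async def get_tpl(tpl_store, tplid):
        # Stand-in for db.tpl.get(): returns None once the template row is gone.
        return tpl_store.get(tplid)

    async def push_batch_window(tpl_store, tmpdict, push_batch, delta=86400):
        logtemp = ""
        for tplid, sections in tmpdict.items():
            tpl = await get_tpl(tpl_store, tplid)
            if tpl is None:
                # Pre-fix behaviour: tpl['sitename'] raised TypeError here, the
                # outer except swallowed it, and the push window below was never
                # advanced, so the same batch was retried on every cycle.
                continue
            logtemp += "\r\n\r\n=====QD: {}=====".format(tpl['sitename']) + ''.join(sections)
        push_batch['time'] += delta  # advancing the window is what ends the re-push loop
        return logtemp

    if __name__ == '__main__':
        store = {2: {'sitename': 'example'}}  # template 1 has been deleted
        batch = {'time': 0.0}
        log = asyncio.run(push_batch_window(store, {1: ['a'], 2: ['b']}, batch))
        print(batch['time'], log)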
a76yyyy committed Aug 19, 2023
1 parent f1d76cb commit db97b67
Showing 1 changed file with 93 additions and 59 deletions.
152 changes: 93 additions & 59 deletions worker.py
@@ -23,15 +23,16 @@

logger_Worker = Log('QD.Worker').getlogger()


class BaseWorker(object):
- def __init__(self, db:DB):
+ def __init__(self, db: DB):
self.running = False
self.db = db
self.fetcher = Fetcher()

async def ClearLog(self, taskid, sql_session=None):
logDay = int((await self.db.site.get(1, fields=('logDay',), sql_session=sql_session))['logDay'])
- for log in await self.db.tasklog.list(taskid = taskid, fields=('id', 'ctime'), sql_session=sql_session):
+ for log in await self.db.tasklog.list(taskid=taskid, fields=('id', 'ctime'), sql_session=sql_session):
if (time.time() - log['ctime']) > (logDay * 24 * 60 * 60):
await self.db.tasklog.delete(log['id'], sql_session=sql_session)

@@ -44,11 +45,13 @@ async def push_batch(self):
for user in userlist:
userid = user['id']
push_batch = json.loads(user['push_batch'])
- if user['status'] == "Enable" and push_batch["sw"] and isinstance(push_batch['time'],(float,int)) and time.time() >= push_batch['time']:
- logger_Worker.debug('User %d check push_batch task, waiting...' % userid)
+ if user['status'] == "Enable" and push_batch["sw"] and isinstance(push_batch['time'], (float, int)) and time.time() >= push_batch['time']:
+ logger_Worker.debug(
+ 'User %d check push_batch task, waiting...' % userid)
title = u"QD任务日志定期推送"
delta = push_batch.get("delta", 86400)
- logtemp = "{}".format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(push_batch['time'])))
+ logtemp = "{}".format(time.strftime(
+ "%Y-%m-%d %H:%M:%S", time.localtime(push_batch['time'])))
tmpdict = {}
tmp = ""
numlog = 0
@@ -60,28 +63,37 @@ async def push_batch(self):
tasklog_list = await self.db.tasklog.list(taskid=task["id"], fields=('success', 'ctime', 'msg'), sql_session=sql_session)
for log in tasklog_list:
if (push_batch['time'] - delta) < log['ctime'] <= push_batch['time']:
- tmp0 += "\\r\\n时间: {}\\r\\n日志: {}".format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(log['ctime'])), log['msg'])
+ tmp0 += "\\r\\n时间: {}\\r\\n日志: {}".format(time.strftime(
+ "%Y-%m-%d %H:%M:%S", time.localtime(log['ctime'])), log['msg'])
numlog += 1
- tmplist = tmpdict.get(task['tplid'],[])
+ tmplist = tmpdict.get(task['tplid'], [])
if tmp0:
- tmplist.append("\\r\\n-----任务{0}-{1}-----{2}\\r\\n".format(len(tmplist)+1, task['note'], tmp0))
+ tmplist.append(
+ "\\r\\n-----任务{0}-{1}-----{2}\\r\\n".format(len(tmplist)+1, task['note'], tmp0))
else:
- tmplist.append("\\r\\n-----任务{0}-{1}-----\\r\\n记录期间未执行定时任务,请检查任务! \\r\\n".format(len(tmplist)+1, task['note']))
+ tmplist.append(
+ "\\r\\n-----任务{0}-{1}-----\\r\\n记录期间未执行定时任务,请检查任务! \\r\\n".format(len(tmplist)+1, task['note']))
tmpdict[task['tplid']] = tmplist

for tmpkey in tmpdict:
- tmp = "\\r\\n\\r\\n=====QD: {0}=====".format((await self.db.tpl.get(tmpkey, fields=('sitename',), sql_session=sql_session))['sitename'])
- tmp += ''.join(tmpdict[tmpkey])
- logtemp += tmp
+ tmp_sitename = await self.db.tpl.get(tmpkey, fields=('sitename',), sql_session=sql_session)
+ if tmp_sitename:
+ tmp = "\\r\\n\\r\\n=====QD: {0}=====".format(
+ tmp_sitename['sitename'])
+ tmp += ''.join(tmpdict[tmpkey])
+ logtemp += tmp
push_batch["time"] = push_batch['time'] + delta
await self.db.user.mod(userid, push_batch=json.dumps(push_batch), sql_session=sql_session)
if tmp and numlog:
- user_email = user.get('email','Unkown')
- logger_Worker.debug("Start push batch log for user {}, email:{}".format(userid,user_email))
- await pushtool.pusher(userid, {"pushen": bool(push_batch.get("sw",False))}, 4080, title, logtemp)
- logger_Worker.info("Success push batch log for user {}, email:{}".format(userid,user_email))
+ user_email = user.get('email', 'Unkown')
+ logger_Worker.debug(
+ "Start push batch log for user {}, email:{}".format(userid, user_email))
+ await pushtool.pusher(userid, {"pushen": bool(push_batch.get("sw", False))}, 4080, title, logtemp)
+ logger_Worker.info(
+ "Success push batch log for user {}, email:{}".format(userid, user_email))
else:
- logger_Worker.debug('User %d does not need to perform push_batch task, stop.' % userid)
+ logger_Worker.debug(
+ 'User %d does not need to perform push_batch task, stop.' % userid)
except Exception as e:
if config.traceback_print:
traceback.print_exc()
@@ -172,9 +184,9 @@ async def do(self, task):
try:
fetch_tpl = await self.db.user.decrypt(0 if not tpl['userid'] else task['userid'], tpl['tpl'], sql_session=sql_session)
env = dict(
- variables = await self.db.user.decrypt(task['userid'], task['init_env'], sql_session=sql_session),
- session = [],
- )
+ variables=await self.db.user.decrypt(task['userid'], task['init_env'], sql_session=sql_session),
+ session=[],
+ )

url = parse_url(env['variables'].get('_proxy'))
if not url:
@@ -191,55 +203,65 @@

variables = await self.db.user.encrypt(task['userid'], new_env['variables'], sql_session=sql_session)
session = await self.db.user.encrypt(task['userid'],
- new_env['session'].to_json() if hasattr(new_env['session'], 'to_json') else new_env['session'], sql_session=sql_session)
+ new_env['session'].to_json() if hasattr(new_env['session'], 'to_json') else new_env['session'], sql_session=sql_session)

if (newontime['sw']):
if ('mode' not in newontime):
newontime['mode'] = 'ontime'
if (newontime['mode'] == 'ontime'):
- newontime['date'] = (datetime.datetime.now()+datetime.timedelta(days=1)).strftime("%Y-%m-%d")
+ newontime['date'] = (datetime.datetime.now(
+ )+datetime.timedelta(days=1)).strftime("%Y-%m-%d")
next = caltool.calNextTs(newontime)['ts']
else:
- next = time.time() + max((tpl['interval'] if tpl['interval'] else 24 * 60 * 60), 1*60)
+ next = time.time() + \
+ max((tpl['interval'] if tpl['interval']
+ else 24 * 60 * 60), 1*60)
if tpl['interval'] is None:
next = self.fix_next_time(next)

# success feedback
await self.db.tasklog.add(task['id'], success=True, msg=new_env['variables'].get('__log__'), sql_session=sql_session)
await self.db.task.mod(task['id'],
- last_success=time.time(),
- last_failed_count=0,
- success_count=task['success_count']+1,
- env=variables,
- session=session,
- mtime=time.time(),
- next=next,
- sql_session=sql_session)
+ last_success=time.time(),
+ last_failed_count=0,
+ success_count=task['success_count']+1,
+ env=variables,
+ session=session,
+ mtime=time.time(),
+ next=next,
+ sql_session=sql_session)
await self.db.tpl.incr_success(tpl['id'], sql_session=sql_session)

t = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- title = u"QD定时任务 {0}-{1} 成功".format(tpl['sitename'], task['note'])
+ title = u"QD定时任务 {0}-{1} 成功".format(
+ tpl['sitename'], task['note'])
logtemp = new_env['variables'].get('__log__')
logtemp = u"{0} \\r\\n日志:{1}".format(t, logtemp)
await pushtool.pusher(user['id'], pushsw, 0x2, title, logtemp)

- logger_Worker.info('taskid:%d tplid:%d successed! %.5fs', task['id'], task['tplid'], time.perf_counter()-start)
+ logger_Worker.info('taskid:%d tplid:%d successed! %.5fs',
+ task['id'], task['tplid'], time.perf_counter()-start)
# delete log
- await self.ClearLog(task['id'],sql_session=sql_session)
- logger_Worker.info('taskid:%d tplid:%d clear log.', task['id'], task['tplid'])
+ await self.ClearLog(task['id'], sql_session=sql_session)
+ logger_Worker.info(
+ 'taskid:%d tplid:%d clear log.', task['id'], task['tplid'])
except Exception as e:
# failed feedback
if config.traceback_print:
traceback.print_exc()
- next_time_delta = self.failed_count_to_time(task['last_failed_count'], task['retry_count'], task['retry_interval'], tpl['interval'])
+ next_time_delta = self.failed_count_to_time(
+ task['last_failed_count'], task['retry_count'], task['retry_interval'], tpl['interval'])

t = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- title = u"QD定时任务 {0}-{1} 失败".format(tpl['sitename'], task['note'])
+ title = u"QD定时任务 {0}-{1} 失败".format(
+ tpl['sitename'], task['note'])
content = u"{0} \\r\\n日志:{1}".format(t, str(e))
disabled = False
if next_time_delta:
next = time.time() + next_time_delta
- content = content + u" \\r\\n下次运行时间:{0}".format(time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(next)))
+ content = content + \
+ u" \\r\\n下次运行时间:{0}".format(time.strftime(
+ "%Y-%m-%d %H:%M:%S", time.localtime(next)))
if (logtime['ErrTolerateCnt'] <= task['last_failed_count']):
await pushtool.pusher(user['id'], pushsw, 0x1, title, content)
else:
@@ -250,21 +272,23 @@ async def do(self, task):

await self.db.tasklog.add(task['id'], success=False, msg=str(e), sql_session=sql_session)
await self.db.task.mod(task['id'],
- last_failed=time.time(),
- failed_count=task['failed_count']+1,
- last_failed_count=task['last_failed_count']+1,
- disabled = disabled,
- mtime = time.time(),
- next=next,
- sql_session=sql_session)
+ last_failed=time.time(),
+ failed_count=task['failed_count']+1,
+ last_failed_count=task['last_failed_count']+1,
+ disabled=disabled,
+ mtime=time.time(),
+ next=next,
+ sql_session=sql_session)
await self.db.tpl.incr_failed(tpl['id'], sql_session=sql_session)

- logger_Worker.error('taskid:%d tplid:%d failed! %.4fs \r\n%s', task['id'], task['tplid'], time.perf_counter()-start, str(e).replace('\\r\\n','\r\n'))
+ logger_Worker.error('taskid:%d tplid:%d failed! %.4fs \r\n%s', task['id'], task['tplid'], time.perf_counter(
+ )-start, str(e).replace('\\r\\n', '\r\n'))
return False
return True


class QueueWorker(BaseWorker):
- def __init__(self, db:DB):
+ def __init__(self, db: DB):
logger_Worker.info('Queue Worker start...')
self.queue = asyncio.Queue(maxsize=config.queue_num)
self.task_lock = {}
@@ -279,26 +303,29 @@ async def __call__(self):
asyncio.create_task(self.runner(i))

while True:
- sleep = asyncio.sleep( config.push_batch_delta )
+ sleep = asyncio.sleep(config.push_batch_delta)
if self.success or self.failed:
- logger_Worker.info('Last %d seconds, %d task done. %d success, %d failed' % (config.push_batch_delta, self.success+self.failed, self.success, self.failed))
+ logger_Worker.info('Last %d seconds, %d task done. %d success, %d failed' % (
+ config.push_batch_delta, self.success+self.failed, self.success, self.failed))
self.success = 0
self.failed = 0
if config.push_batch_sw:
await self.push_batch()
await sleep

- async def runner(self,id):
+ async def runner(self, id):
logger_Worker.debug('Runner %d started' % id)
while True:
sleep = asyncio.sleep(config.check_task_loop/1000.0)
task = await self.queue.get()
- logger_Worker.debug('Runner %d get task: %s, running...' % (id, task['id']))
+ logger_Worker.debug(
+ 'Runner %d get task: %s, running...' % (id, task['id']))
done = False
try:
- done = await self.do(task)
+ done = await self.do(task)
except Exception as e:
- logger_Worker.error('Runner %d get task: %s, failed! %s' % (id, task['id'], str(e)))
+ logger_Worker.error(
+ 'Runner %d get task: %s, failed! %s' % (id, task['id'], str(e)))
if config.traceback_print:
traceback.print_exc()
if done:
@@ -310,7 +337,6 @@ async def runner(self, id):
self.queue.task_done()
await sleep


async def producer(self):
logger_Worker.debug('Schedule Producer started')
while True:
@@ -325,17 +351,21 @@ async def producer(self):
unlock_tasks += 1
await self.queue.put(task)
if unlock_tasks > 0:
- logger_Worker.debug('Scaned %d task, put in Queue...' % unlock_tasks)
+ logger_Worker.debug(
+ 'Scaned %d task, put in Queue...' % unlock_tasks)
except Exception as e:
- logger_Worker.error('Schedule Producer get tasks failed! %s' % str(e))
+ logger_Worker.error(
+ 'Schedule Producer get tasks failed! %s' % str(e))
if config.traceback_print:
traceback.print_exc()
await sleep

# 旧版本批量任务定时执行
# 建议仅当新版 Queue 生产者消费者定时执行功能失效时使用


class BatchWorker(BaseWorker):
- def __init__(self, db:DB):
+ def __init__(self, db: DB):
logger_Worker.info('Batch Worker start...')
super().__init__(db)

@@ -348,11 +378,13 @@ def __call__(self):
if self.running:
return
self.running = gen.convert_yielded(self.run())

def done(future: asyncio.Future):
self.running = None
success, failed = future.result()
if success or failed:
- logger_Worker.info('%d task done. %d success, %d failed' % (success+failed, success, failed))
+ logger_Worker.info('%d task done. %d success, %d failed' % (
+ success+failed, success, failed))
return
self.running.add_done_callback(done)

@@ -366,7 +398,8 @@ async def run(self):
for task in tasks:
running.append(asyncio.ensure_future(self.do(task)))
if len(running) >= 50:
- logger_Worker.debug('scaned %d task, waiting...', len(running))
+ logger_Worker.debug(
+ 'scaned %d task, waiting...', len(running))
result = await asyncio.gather(*running[:10])
for each in result:
if each:
@@ -387,6 +420,7 @@ async def run(self):
logger_Worker.exception(e)
return (success, failed)


if __name__ == '__main__':
from db import DB
tornado.log.enable_pretty_logging()
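For orientation on the two scheduler implementations this file keeps side by side (the comment above BatchWorker says, roughly: legacy batch scheduling, recommended only when the newer queue-based producer/consumer scheduling fails): QueueWorker runs one producer coroutine that scans due tasks into an asyncio.Queue plus a fixed pool of runner coroutines that consume it, while BatchWorker periodically fires batches of do() calls via asyncio.gather. A rough sketch of the producer/runner split; scan_due_tasks and run_task are hypothetical stand-ins for the database scan and BaseWorker.do, not the project's actual API:

    import asyncio

    async def scan_due_tasks():
        # Stand-in for the db.task scan in QueueWorker.producer().
        return ["task-1", "task-2"]

    async def run_task(task):
        # Stand-in for BaseWorker.do(); returns True on success.
        await asyncio.sleep(0)
        return True

    async def producer(queue: asyncio.Queue):
        while True:
            for task in await scan_due_tasks():
                await queue.put(task)  # runners pick these up concurrently
            await asyncio.sleep(1)

    async def runner(rid: int, queue: asyncio.Queue):
        while True:
            task = await queue.get()
            try:
                await run_task(task)
            finally:
                queue.task_done()  # keeps the queue's bookkeeping accurate

    async def main():
        queue: asyncio.Queue = asyncio.Queue(maxsize=10)
        tasks = [asyncio.create_task(producer(queue))]
        tasks += [asyncio.create_task(runner(i, queue)) for i in range(2)]
        await asyncio.sleep(0.1)  # let one scan cycle run, then shut down
        for t in tasks:
            t.cancel()

    if __name__ == '__main__':
        asyncio.run(main())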
