porting to python3
cdhigh committed Feb 22, 2024
1 parent c6fe850 commit 3a68439

Showing 18 changed files with 387 additions and 186 deletions.
2 changes: 1 addition & 1 deletion application/back_end/db_models.py
@@ -8,7 +8,7 @@
from operator import attrgetter
from ..utils import ke_encrypt, ke_decrypt, tz_now

-if os.getenv('DATABASE_ENGINE') in ("datastore", "mongodb"):
+if os.getenv('DATABASE_ENGINE') in ("datastore", "mongodb", "redis"):
    from .db_models_nosql import *
else:
    from .db_models_sql import *
9 changes: 9 additions & 0 deletions application/back_end/db_models_nosql.py
@@ -12,11 +12,20 @@

if __DB_NAME.startswith('mongodb://'):
    dbInstance = MongoDbClient(__APP_ID, __DB_NAME)
+elif __DB_NAME.startswith('redis://'):
+    dbInstance = RedisDbClient(__APP_ID, __DB_NAME)
elif __DB_ENGINE == "datastore":
    dbInstance = DatastoreClient(project=__APP_ID)
elif __DB_ENGINE == "mongodb":
    dbInstance = MongoDbClient(__APP_ID, host=os.getenv('DATABASE_HOST'), port=int(os.getenv('DATABASE_PORT')),
        username=(os.getenv('DATABASE_USERNAME') or None), password=(os.getenv('DATABASE_PASSWORD') or None))
+elif __DB_ENGINE == "redis":
+    try:
+        db_no = int(os.getenv('DATABASE_NAME') or '0')
+    except:
+        db_no = 0
+    dbInstance = RedisDbClient(__APP_ID, host=os.getenv('DATABASE_HOST'), port=int(os.getenv('DATABASE_PORT')),
+        db=db_no, password=(os.getenv('DATABASE_PASSWORD') or None))
else:
    raise Exception("database engine '{}' not supported yet".format(__DB_ENGINE))

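For reference, a minimal sketch of the environment the new redis branch expects. The variable names come from the diff above; the values are only illustrative:

#Illustrative setup, not part of the commit: these must be set before
#application.back_end.db_models is imported.
import os
os.environ['DATABASE_ENGINE'] = 'redis'    #routes the model import to db_models_nosql
os.environ['DATABASE_HOST'] = '127.0.0.1'  #assumed local redis server
os.environ['DATABASE_PORT'] = '6379'       #parsed with int() above
os.environ['DATABASE_NAME'] = '0'          #redis db number; falls back to 0 if unparseable
#DATABASE_PASSWORD left unset: (os.getenv(...) or None) then passes password=None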
142 changes: 7 additions & 135 deletions application/back_end/task_queue_adpt.py
@@ -6,146 +6,18 @@

__TASK_QUEUE_SERVICE = os.getenv('TASK_QUEUE_SERVICE')
if __TASK_QUEUE_SERVICE == "gae":
-    from google.cloud import tasks_v2
-    DEFAULT_QUEUE_NAME = "default"
-
-    def init_task_queue_service(app):
-        pass
-
-    #this is the interface for external callers
-    def create_delivery_task(payload: dict):
-        create_http_task('/worker', payload)
-
-    def create_url2book_task(payload: dict):
-        create_http_task('/url2book', payload)
-
-    #create a task
-    #url: the link the task will request
-    #payload: parameters to pass to the url, as a Python dict
-    #returns the created task instance
-    def create_http_task(url, payload):
-        client = tasks_v2.CloudTasksClient()
-
-        task = {"app_engine_http_request": {
-                "http_method": tasks_v2.HttpMethod.GET,
-                "relative_uri": url,}
-        }
-        if payload:
-            task["app_engine_http_request"]["headers"] = {"Content-type": "application/json"}
-            task["app_engine_http_request"]["body"] = json.dumps(payload).encode()
-        return client.create_task(task=task)
-
-        #httpRequest = tasks_v2.HttpRequest(http_method=tasks_v2.HttpMethod.GET, url=url,
-        #    headers={"Content-type": "application/json"}, body=json.dumps(payload).encode(),)
-        #task = tasks_v2.Task(httpRequest=httpRequest)
-        #taskParent = client.queue_path(APP_ID, SERVER_LOCATION, DEFAULT_QUEUE_NAME)
-        #return client.create_task(tasks_v2.CreateTaskRequest(parent=taskParent, task=task))
+    from .task_queue_gae import *

-elif __TASK_QUEUE_SERVICE == 'celery':
-    #start celery with:
-    #celery -A main.celery_app worker --loglevel=info --logfile=d:\celery.log --concurrency=2 -P eventlet
-    #celery -A main.celery_app beat -s /home/celery/var/run/celerybeat-schedule --loglevel=info --logfile=d:\celery.log --concurrency=2 -P eventlet
-    from celery import Celery, Task, shared_task
-    from celery.schedules import crontab
-    from ..work.worker import WorkerImpl
-    from ..work.url2book import Url2BookImpl
-
-    def init_task_queue_service(app):
-        class FlaskTask(Task):
-            def __call__(self, *args, **kwargs):
-                with app.app_context():
-                    return self.run(*args, **kwargs)
-
-        app.config.from_mapping(
-            CELERY={'broker_url': app.config['TASK_QUEUE_BROKER_URL'],
-                'result_backend': app.config['TASK_QUEUE_RESULT_BACKEND'],
-                'task_ignore_result': True,
-            },)
-
-        celery_app = Celery(app.name, task_cls=FlaskTask)
-        celery_app.config_from_object(app.config["CELERY"])
-        celery_app.set_default()
-
-        celery_app.conf.beat_schedule = {
-            'check_deliver': {
-                'task': 'check_deliver',
-                'schedule': crontab(minute=0, hour='*/1'), #every hour
-                'args': []
-            },
-            'remove_logs': {
-                'task': 'remove_logs', #daily at midnight
-                'schedule': crontab(minute=0, hour=0, day_of_month='*/1'),
-                'args': []
-            },
-        }
-        app.extensions["celery"] = celery_app
-        return celery_app
-
-    @shared_task(name="check_deliver", ignore_result=True)
-    def check_deliver():
-        MultiUserDelivery()
-
-    @shared_task(name="remove_logs", ignore_result=True)
-    def remove_logs():
-        RemoveLogs()
-
-    @shared_task(ignore_result=True)
-    def start_celery_worker_impl(**payload):
-        return WorkerImpl(**payload)
+elif __TASK_QUEUE_SERVICE == 'apscheduler':
+    from .task_queue_apscheduler import *
-
-    @shared_task(ignore_result=True)
-    def start_celery_url2book(**payload):
-        return Url2BookImpl(**payload)
-
-    def create_delivery_task(payload: dict):
-        start_celery_worker_impl.delay(**payload)
-
-    def create_url2book_task(payload: dict):
-        start_celery_url2book.delay(**payload)
+elif __TASK_QUEUE_SERVICE == 'celery':
+    from .task_queue_celery import *

elif __TASK_QUEUE_SERVICE == 'rq':
-    #start rq with:
-    #set FLASK_APP=main.py
-    #flask rq worker
-
-    from flask_rq2 import RQ
-
-    rq = RQ()
-
-    def init_task_queue_service(app):
-        app.config['RQ_REDIS_URL'] = app.config['TASK_QUEUE_BROKER_URL']
-        rq.init_app(app)
-        #check_deliver.cron('0 */1 * * *', 'check_deliver') #run once every hour
-        #remove_logs.cron('0 0 */1 * *', 'check_deliver') #run once every 24 hours
-        return rq
-
-    @rq.job
-    def check_deliver():
-        from ..view.deliver import MultiUserDelivery
-        MultiUserDelivery()
-
-    @rq.job
-    def remove_logs():
-        from ..view.logs import RemoveLogs
-        RemoveLogs()
-
-    @rq.job
-    def start_rq_worker_impl(**payload):
-        from ..work.worker import WorkerImpl
-        return WorkerImpl(**payload)
-
-    @rq.job
-    def start_rq_url2book(**payload):
-        from ..work.url2book import Url2BookImpl
-        return Url2BookImpl(**payload)
-
-    def create_delivery_task(payload: dict):
-        start_rq_worker_impl.queue(**payload)
-
-    def create_url2book_task(payload: dict):
-        start_rq_url2book.queue(**payload)
+    from .task_queue_rq import *

-elif not __TASK_QUEUE_SERVICE:
+elif not __TASK_QUEUE_SERVICE: #called directly; scheduled tasks are not supported
    from ..work.worker import WorkerImpl
    from ..work.url2book import Url2BookImpl
    def create_delivery_task(payload: dict):
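Every task_queue_* module star-imported above exposes the same entry points, which is what keeps the rest of the application unaware of the active queue backend. A sketch of that shared contract, inferred from the new files in this commit (bodies elided):

#Contract implemented by each task_queue_* backend added below
def init_task_queue_service(app):        #wire the backend into the Flask app at startup
    ...

def create_delivery_task(payload: dict): #enqueue a delivery job
    ...

def create_url2book_task(payload: dict): #enqueue a url-to-book job
    ...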
52 changes: 52 additions & 0 deletions application/back_end/task_queue_apscheduler.py
@@ -0,0 +1,52 @@
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#task queue: APScheduler
#Author: cdhigh <https://github.com/cdhigh>
import random

from flask_apscheduler import APScheduler
#from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_MISSED

scheduler = APScheduler()

#https://viniciuschiele.github.io/flask-apscheduler/rst/api.html
scheduler.api_enabled = True #exposes a few useful urls such as /scheduler/jobs

def init_task_queue_service(app):
    scheduler.init_app(app)
    #scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
    scheduler.start()
    app.extensions["scheduler"] = scheduler
    return scheduler

#APScheduler automatically removes jobs whose trigger is 'date', so this function is no longer needed
#def job_listener(event):
#    scheduler.remove_job(event.job_id)

#@scheduler.task('interval', id='check_deliver', hours=1, misfire_grace_time=20*60, coalesce=True)
@scheduler.task('cron', minute=50, id='check_deliver', misfire_grace_time=20*60, coalesce=True)
def check_deliver():
    from ..view.deliver import MultiUserDelivery
    MultiUserDelivery()

@scheduler.task('interval', id='remove_logs', days=1, misfire_grace_time=20*60, coalesce=True)
def remove_logs():
    from ..view.logs import RemoveLogs
    RemoveLogs()

def create_delivery_task(payload: dict):
    from ..work.worker import WorkerImpl
    userName = payload.get('userName', '')
    recipeId = payload.get('recipeId', '')
    scheduler.add_job(f'Worker{random.randint(0, 1000)}', WorkerImpl, args=[userName, recipeId],
        misfire_grace_time=20*60, replace_existing=True)

def create_url2book_task(payload: dict):
    from ..work.url2book import Url2BookImpl
    userName = payload.get('userName', '')
    urls = payload.get('urls', '')
    subject = payload.get('subject', '')
    action = payload.get('action', '')
    args = [userName, urls, subject, action]
    scheduler.add_job(f'Url2Book{random.randint(0, 1000)}', Url2BookImpl, args=args, misfire_grace_time=20*60,
        replace_existing=True)
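
A minimal bootstrap sketch, assuming a plain Flask app; the import path matches the tree above, everything else is illustrative. TASK_QUEUE_SERVICE must be set before the adapter is imported because task_queue_adpt reads it at import time:

#Hypothetical bootstrap, not part of the commit
import os
os.environ['TASK_QUEUE_SERVICE'] = 'apscheduler'  #select this backend before the import below

from flask import Flask
from application.back_end.task_queue_adpt import init_task_queue_service

app = Flask(__name__)
scheduler = init_task_queue_service(app)  #registers and starts the cron/interval jobs above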
67 changes: 67 additions & 0 deletions application/back_end/task_queue_celery.py
@@ -0,0 +1,67 @@
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#task queue: celery
#Author: cdhigh <https://github.com/cdhigh>

#start celery with:
#celery -A main.celery_app worker --loglevel=info --logfile=d:\celery.log --concurrency=2 -P eventlet
#celery -A main.celery_app beat -s /home/celery/var/run/celerybeat-schedule --loglevel=info --logfile=d:\celery.log --concurrency=2 -P eventlet
from celery import Celery, Task, shared_task
from celery.schedules import crontab

def init_task_queue_service(app):
    class FlaskTask(Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    app.config.from_mapping(
        CELERY={'broker_url': app.config['TASK_QUEUE_BROKER_URL'],
            'result_backend': app.config['TASK_QUEUE_RESULT_BACKEND'],
            'task_ignore_result': True,
        },)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config["CELERY"])
    celery_app.set_default()

    celery_app.conf.beat_schedule = {
        'check_deliver': {
            'task': 'check_deliver',
            'schedule': crontab(minute=0, hour='*/1'), #every hour
            'args': []
        },
        'remove_logs': {
            'task': 'remove_logs', #daily at midnight
            'schedule': crontab(minute=0, hour=0, day_of_month='*/1'),
            'args': []
        },
    }
    app.extensions["celery"] = celery_app
    return celery_app

@shared_task(name="check_deliver", ignore_result=True)
def check_deliver():
    from ..view.deliver import MultiUserDelivery
    MultiUserDelivery()

@shared_task(name="remove_logs", ignore_result=True)
def remove_logs():
    from ..view.logs import RemoveLogs
    RemoveLogs()

@shared_task(ignore_result=True)
def start_celery_worker_impl(**payload):
    from ..work.worker import WorkerImpl
    return WorkerImpl(**payload)

@shared_task(ignore_result=True)
def start_celery_url2book(**payload):
    from ..work.url2book import Url2BookImpl
    return Url2BookImpl(**payload)

def create_delivery_task(payload: dict):
    start_celery_worker_impl.delay(**payload)

def create_url2book_task(payload: dict):
    start_celery_url2book.delay(**payload)
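
Usage sketch: both create_* functions simply forward the payload dict to a shared task via .delay(), so callers enqueue with a plain dict. The keys below mirror those read by the apscheduler adapter above; the values are made up:

#Illustrative call, not from the commit
create_url2book_task({'userName': 'admin',
                      'urls': 'https://www.example.com/article',
                      'subject': 'Sample Book',
                      'action': ''})  #'action' semantics are defined by Url2BookImpl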
40 changes: 40 additions & 0 deletions application/back_end/task_queue_gae.py
@@ -0,0 +1,40 @@
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#task queue: GAE (Google Cloud Tasks)
#Author: cdhigh <https://github.com/cdhigh>
import json
from google.cloud import tasks_v2
DEFAULT_QUEUE_NAME = "default"

def init_task_queue_service(app):
    pass

#this is the interface for external callers
def create_delivery_task(payload: dict):
    create_http_task('/worker', payload)

def create_url2book_task(payload: dict):
    create_http_task('/url2book', payload)

#create a task
#url: the link the task will request
#payload: parameters to pass to the url, as a Python dict
#returns the created task instance
def create_http_task(url, payload):
    client = tasks_v2.CloudTasksClient()

    task = {"app_engine_http_request": {
            "http_method": tasks_v2.HttpMethod.GET,
            "relative_uri": url,}
    }
    if payload:
        task["app_engine_http_request"]["headers"] = {"Content-type": "application/json"}
        task["app_engine_http_request"]["body"] = json.dumps(payload).encode()
    return client.create_task(task=task)

    #httpRequest = tasks_v2.HttpRequest(http_method=tasks_v2.HttpMethod.GET, url=url,
    #    headers={"Content-type": "application/json"}, body=json.dumps(payload).encode(),)
    #task = tasks_v2.Task(httpRequest=httpRequest)
    #taskParent = client.queue_path(APP_ID, SERVER_LOCATION, DEFAULT_QUEUE_NAME)
    #return client.create_task(tasks_v2.CreateTaskRequest(parent=taskParent, task=task))
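
On the receiving side (a hypothetical handler, not part of this commit), the App Engine push queue issues GET /worker against the same service, carrying the JSON payload in the request body:

#Hypothetical /worker endpoint, for illustration only
import json
from flask import Flask, request

app = Flask(__name__)

@app.route('/worker')
def worker():
    #Cloud Tasks delivers the body set by create_http_task() above
    payload = json.loads(request.get_data(as_text=True) or '{}')
    return 'ok'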

46 changes: 46 additions & 0 deletions application/back_end/task_queue_rq.py
@@ -0,0 +1,46 @@
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#task queue: rq
#Author: cdhigh <https://github.com/cdhigh>
import os, sys, json

#start rq with:
#set FLASK_APP=main.py
#flask rq worker

from flask_rq2 import RQ

rq = RQ()

def init_task_queue_service(app):
    app.config['RQ_REDIS_URL'] = app.config['TASK_QUEUE_BROKER_URL']
    rq.init_app(app)
    #check_deliver.cron('0 */1 * * *', 'check_deliver') #run once every hour
    #remove_logs.cron('0 0 */1 * *', 'check_deliver') #run once every 24 hours
    return rq

@rq.job
def check_deliver():
    from ..view.deliver import MultiUserDelivery
    MultiUserDelivery()

@rq.job
def remove_logs():
    from ..view.logs import RemoveLogs
    RemoveLogs()

@rq.job
def start_rq_worker_impl(**payload):
    from ..work.worker import WorkerImpl
    return WorkerImpl(**payload)

@rq.job
def start_rq_url2book(**payload):
    from ..work.url2book import Url2BookImpl
    return Url2BookImpl(**payload)

def create_delivery_task(payload: dict):
    start_rq_worker_impl.queue(**payload)

def create_url2book_task(payload: dict):
    start_rq_url2book.queue(**payload)
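
Operationally, the web process only enqueues; a separate 'flask rq worker' process (see the comments at the top of this file) executes the jobs against the redis broker in TASK_QUEUE_BROKER_URL. A direct enqueue, equivalent to what create_delivery_task() does, would look like this (keys mirror the apscheduler adapter; values are made up):

#Illustrative direct enqueue, not from the commit
job = start_rq_worker_impl.queue(userName='admin', recipeId='some_recipe')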