From 95b996c97226706dce60b583636ecac6d7d41554 Mon Sep 17 00:00:00 2001 From: MINKYU LEE Date: Sat, 6 Jul 2024 05:38:59 +0900 Subject: [PATCH] =?UTF-8?q?=EB=B0=B0=EC=B9=98=20=EC=9E=A1=20API=20?= =?UTF-8?q?=EA=B0=9C=EB=B0=9C=20=EB=B0=8F=20DependencySolver=20=EC=88=98?= =?UTF-8?q?=EC=A0=95=20(#25)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. DependencySolver 에서 불필요한 argparse 사용으로 인해 발생하던 "unrecognized arguments" 에러 해결 2. 배치 잡 스케줄러에서 DependencySolver 사용하여 잡 실행하도록 개선 3. 배치 잡 스케줄러 로깅문 강화 4. 배치 잡 리스트/강제실행 API 구현 --- waffledotcom/src/app.py | 4 +- waffledotcom/src/apps/batch/__init__.py | 3 + waffledotcom/src/apps/batch/schemas.py | 28 ++++++++ waffledotcom/src/apps/batch/views.py | 60 ++++++++++++++++ waffledotcom/src/apps/router.py | 3 +- waffledotcom/src/batch/scheduler.py | 76 ++++++++++++++++++--- waffledotcom/src/batch/slack/main.py | 3 +- waffledotcom/src/utils/dependency_solver.py | 24 ++----- 8 files changed, 170 insertions(+), 31 deletions(-) create mode 100644 waffledotcom/src/apps/batch/__init__.py create mode 100644 waffledotcom/src/apps/batch/schemas.py create mode 100644 waffledotcom/src/apps/batch/views.py diff --git a/waffledotcom/src/app.py b/waffledotcom/src/app.py index 022a6fa..fe6ed40 100644 --- a/waffledotcom/src/app.py +++ b/waffledotcom/src/app.py @@ -4,7 +4,7 @@ from fastapi.middleware.cors import CORSMiddleware from waffledotcom.src.apps.router import api_router -from waffledotcom.src.batch.scheduler import schedule_tasks +from waffledotcom.src.batch.scheduler import run_scheduling_service from waffledotcom.src.database.connection import DBSessionFactory from waffledotcom.src.settings import settings @@ -35,7 +35,7 @@ def on_shutdown(): def _register_startup_event(app: FastAPI): @app.on_event("startup") def on_startup(): - asyncio.create_task(schedule_tasks()) + asyncio.create_task(run_scheduling_service()) return on_startup diff --git a/waffledotcom/src/apps/batch/__init__.py b/waffledotcom/src/apps/batch/__init__.py new file mode 100644 index 0000000..7b624d4 --- /dev/null +++ b/waffledotcom/src/apps/batch/__init__.py @@ -0,0 +1,3 @@ +from .views import v1_router as router + +__all__ = ["router"] diff --git a/waffledotcom/src/apps/batch/schemas.py b/waffledotcom/src/apps/batch/schemas.py new file mode 100644 index 0000000..537ced1 --- /dev/null +++ b/waffledotcom/src/apps/batch/schemas.py @@ -0,0 +1,28 @@ +from datetime import datetime, time + +from pydantic import BaseModel + + +class JobDto(BaseModel): + name: str + interval: int + unit: str + start_day: str | None + at_time: time | None + next_run: datetime | None + last_run: datetime | None + + +class ScheduleResponse(BaseModel): + jobs: list[JobDto] + + +class ForceRunResult(BaseModel): + name: str + tags: list[str] + success: bool + reason: str | None = None + + +class ForceRunResponse(BaseModel): + results: list[ForceRunResult] diff --git a/waffledotcom/src/apps/batch/views.py b/waffledotcom/src/apps/batch/views.py new file mode 100644 index 0000000..796f549 --- /dev/null +++ b/waffledotcom/src/apps/batch/views.py @@ -0,0 +1,60 @@ +from fastapi import APIRouter + +from waffledotcom.src.batch.scheduler import get_job_name, scheduler + +from .schemas import ForceRunResponse, ForceRunResult, JobDto, ScheduleResponse + +v1_router = APIRouter(prefix="/v1/batch", tags=["batch"]) + + +@v1_router.get("/schedule", response_model_exclude_none=True) +def get_schedule( + tag: str | None = None, +) -> ScheduleResponse: + job_dtos = [ + JobDto( + name=getattr(job.job_func, "__qualname__", "Unknown"), + interval=job.interval, + unit=job.unit, # type: ignore + start_day=job.start_day, + at_time=job.at_time, + next_run=job.next_run, + last_run=job.last_run, + ) + for job in scheduler.get_jobs(tag) + ] + return ScheduleResponse(jobs=job_dtos) + + +@v1_router.post("/force-run", response_model_exclude_none=True) +def force_run_job( + name: str | None = None, + tag: str | None = None, +) -> ForceRunResponse: + if not ((name is None) ^ (tag is None)): + raise ValueError("Either name or tag should be provided") + jobs = scheduler.get_jobs(tag) + if name is not None: + jobs = [job for job in jobs if get_job_name(job) == name] + + results: list[ForceRunResult] = [] + for job in jobs: + try: + job.run() + results.append( + ForceRunResult( + name=get_job_name(job), + tags=list(map(str, job.tags)), + success=True, + ) + ) + except Exception as e: + results.append( + ForceRunResult( + name=get_job_name(job), + tags=list(map(str, job.tags)), + success=False, + reason=str(e), + ) + ) + return ForceRunResponse(results=results) diff --git a/waffledotcom/src/apps/router.py b/waffledotcom/src/apps/router.py index 6196ba7..cc51efd 100644 --- a/waffledotcom/src/apps/router.py +++ b/waffledotcom/src/apps/router.py @@ -1,6 +1,7 @@ from fastapi.routing import APIRouter -from waffledotcom.src.apps import user +from waffledotcom.src.apps import batch, user api_router = APIRouter(prefix="/api") api_router.include_router(user.router) +api_router.include_router(batch.router) diff --git a/waffledotcom/src/batch/scheduler.py b/waffledotcom/src/batch/scheduler.py index c68082c..f70a368 100644 --- a/waffledotcom/src/batch/scheduler.py +++ b/waffledotcom/src/batch/scheduler.py @@ -3,20 +3,80 @@ """ import asyncio +from collections.abc import Callable +from datetime import datetime +from functools import wraps -from schedule import Scheduler +from loguru import logger +from schedule import Job, Scheduler -from waffledotcom.src.batch.slack.main import main as slack_main +from waffledotcom.src.batch.slack.main import create_users_from_slack from waffledotcom.src.settings import settings +from waffledotcom.src.utils.dependency_solver import solver +scheduler = Scheduler() -async def schedule_tasks(): - scheduler = Scheduler() + +def get_job_name(job: Job) -> str: + return getattr(job.job_func, "__qualname__", "Unknown") + + +def job_wrapper(job_func: Callable): + job_name = job_func.__qualname__ + + async def run_with_dependencies(): + try: + logger.info(f"Start scheduled job [{job_name}]") + await solver.run(job_func) + logger.info(f"End scheduled job [{job_name}]") + except Exception as e: + logger.opt(exception=e).error(f"Error occurred in job [{job_name}]") + + @wraps(job_func) + def run_in_loop(): + try: + asyncio.create_task(run_with_dependencies()) + except RuntimeError: + asyncio.run(run_with_dependencies()) + + return run_in_loop + + +def setup_job_schedule(): if settings.is_dev: - scheduler.every().saturday.at("00:00", "Asia/Seoul").do(slack_main) + scheduler.every().saturday.at("12:00", "Asia/Seoul").do( + job_wrapper(create_users_from_slack) + ).tag("slack") if settings.is_prod: - scheduler.every().sunday.at("00:00", "Asia/Seoul").do(slack_main) + scheduler.every().sunday.at("00:00", "Asia/Seoul").do( + job_wrapper(create_users_from_slack) + ).tag("slack") + + for job in scheduler.get_jobs(): + job_name = get_job_name(job) + + logger.info( + "Job [{job_name}] is scheduled every {interval} {unit}. Next run at" + " {next_run}", + job_name=job_name, + interval=job.interval, + unit=job.unit, + next_run=job.next_run, + ) + + +async def run_scheduling_service(): + setup_job_schedule() while True: - # 최소 주기를 60초로 설정 - await asyncio.sleep(60) + next_run = scheduler.get_next_run() + + # 앞으로 더 이상 스케줄된 작업이 없다면 스케줄링 서비스를 종료한다. + if next_run is None: + logger.info("No jobs scheduled. Exiting scheduling service") + break + + # 다음 작업이 실행되기까지 대기해야할 시간을 계산한다. + now = datetime.now() + delay = max((next_run - now).total_seconds(), 1) + await asyncio.sleep(delay) scheduler.run_pending() diff --git a/waffledotcom/src/batch/slack/main.py b/waffledotcom/src/batch/slack/main.py index 8ad89c0..397c3d1 100644 --- a/waffledotcom/src/batch/slack/main.py +++ b/waffledotcom/src/batch/slack/main.py @@ -5,7 +5,7 @@ from waffledotcom.src.apps.user.services import UserService from waffledotcom.src.batch.slack.services import AsyncSlackApiService -from waffledotcom.src.utils.dependency_solver import DependencySolver +from waffledotcom.src.utils.dependency_solver import solver async def create_users_from_slack( @@ -21,7 +21,6 @@ async def create_users_from_slack( def main(): - solver = DependencySolver() try: asyncio.create_task(solver.run(create_users_from_slack)) except RuntimeError: diff --git a/waffledotcom/src/utils/dependency_solver.py b/waffledotcom/src/utils/dependency_solver.py index 7bf5715..8d663d5 100644 --- a/waffledotcom/src/utils/dependency_solver.py +++ b/waffledotcom/src/utils/dependency_solver.py @@ -1,29 +1,14 @@ -import argparse -from contextlib import AsyncExitStack import inspect +from contextlib import AsyncExitStack from typing import Callable from fastapi import Request from fastapi.dependencies.models import Dependant -from fastapi.dependencies.utils import get_dependant -from fastapi.dependencies.utils import solve_dependencies +from fastapi.dependencies.utils import get_dependant, solve_dependencies from pydantic import ValidationError class DependencySolver: - """ - A utility class to solve FastAPI dependencies in an ad-hoc manner. - """ - - def __init__(self, name: str | None = None, namespace=None): - self.name = name or self.__class__.__name__ - self.parser = self.make_parser() - self.args = self.parser.parse_args(namespace=namespace) - - # override to add command-line arguments - def make_parser(self) -> argparse.ArgumentParser: - return argparse.ArgumentParser(prog=self.name) - async def solve_command(self, request: Request, dependant: Dependant): values, errors, _1, _2, _3 = await solve_dependencies( request=request, dependant=dependant @@ -48,6 +33,9 @@ async def run(self, command: Callable): } ) - dependant = get_dependant(path=f"command:{self.name}", call=command) + dependant = get_dependant(path=f"command:{command.__name__}", call=command) return await self.solve_command(request, dependant) + + +solver = DependencySolver()