Skip to content

Commit

Permalink
배치 잡 API 개발 및 DependencySolver 수정 (#25)
Browse files Browse the repository at this point in the history
1. DependencySolver 에서 불필요한 argparse 사용으로 인해 발생하던 "unrecognized
arguments" 에러 해결
2. 배치 잡 스케줄러에서 DependencySolver 사용하여 잡 실행하도록 개선
3. 배치 잡 스케줄러 로깅문 강화
4. 배치 잡 리스트/강제실행 API 구현
  • Loading branch information
minkyu97 authored Jul 5, 2024
1 parent d9d78e8 commit 95b996c
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 31 deletions.
4 changes: 2 additions & 2 deletions waffledotcom/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from fastapi.middleware.cors import CORSMiddleware

from waffledotcom.src.apps.router import api_router
from waffledotcom.src.batch.scheduler import schedule_tasks
from waffledotcom.src.batch.scheduler import run_scheduling_service
from waffledotcom.src.database.connection import DBSessionFactory
from waffledotcom.src.settings import settings

Expand Down Expand Up @@ -35,7 +35,7 @@ def on_shutdown():
def _register_startup_event(app: FastAPI):
@app.on_event("startup")
def on_startup():
asyncio.create_task(schedule_tasks())
asyncio.create_task(run_scheduling_service())

return on_startup

Expand Down
3 changes: 3 additions & 0 deletions waffledotcom/src/apps/batch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .views import v1_router as router

__all__ = ["router"]
28 changes: 28 additions & 0 deletions waffledotcom/src/apps/batch/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from datetime import datetime, time

from pydantic import BaseModel


class JobDto(BaseModel):
name: str
interval: int
unit: str
start_day: str | None
at_time: time | None
next_run: datetime | None
last_run: datetime | None


class ScheduleResponse(BaseModel):
jobs: list[JobDto]


class ForceRunResult(BaseModel):
name: str
tags: list[str]
success: bool
reason: str | None = None


class ForceRunResponse(BaseModel):
results: list[ForceRunResult]
60 changes: 60 additions & 0 deletions waffledotcom/src/apps/batch/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from fastapi import APIRouter

from waffledotcom.src.batch.scheduler import get_job_name, scheduler

from .schemas import ForceRunResponse, ForceRunResult, JobDto, ScheduleResponse

v1_router = APIRouter(prefix="/v1/batch", tags=["batch"])


@v1_router.get("/schedule", response_model_exclude_none=True)
def get_schedule(
tag: str | None = None,
) -> ScheduleResponse:
job_dtos = [
JobDto(
name=getattr(job.job_func, "__qualname__", "Unknown"),
interval=job.interval,
unit=job.unit, # type: ignore
start_day=job.start_day,
at_time=job.at_time,
next_run=job.next_run,
last_run=job.last_run,
)
for job in scheduler.get_jobs(tag)
]
return ScheduleResponse(jobs=job_dtos)


@v1_router.post("/force-run", response_model_exclude_none=True)
def force_run_job(
name: str | None = None,
tag: str | None = None,
) -> ForceRunResponse:
if not ((name is None) ^ (tag is None)):
raise ValueError("Either name or tag should be provided")
jobs = scheduler.get_jobs(tag)
if name is not None:
jobs = [job for job in jobs if get_job_name(job) == name]

results: list[ForceRunResult] = []
for job in jobs:
try:
job.run()
results.append(
ForceRunResult(
name=get_job_name(job),
tags=list(map(str, job.tags)),
success=True,
)
)
except Exception as e:
results.append(
ForceRunResult(
name=get_job_name(job),
tags=list(map(str, job.tags)),
success=False,
reason=str(e),
)
)
return ForceRunResponse(results=results)
3 changes: 2 additions & 1 deletion waffledotcom/src/apps/router.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from fastapi.routing import APIRouter

from waffledotcom.src.apps import user
from waffledotcom.src.apps import batch, user

api_router = APIRouter(prefix="/api")
api_router.include_router(user.router)
api_router.include_router(batch.router)
76 changes: 68 additions & 8 deletions waffledotcom/src/batch/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,80 @@
"""

import asyncio
from collections.abc import Callable
from datetime import datetime
from functools import wraps

from schedule import Scheduler
from loguru import logger
from schedule import Job, Scheduler

from waffledotcom.src.batch.slack.main import main as slack_main
from waffledotcom.src.batch.slack.main import create_users_from_slack
from waffledotcom.src.settings import settings
from waffledotcom.src.utils.dependency_solver import solver

scheduler = Scheduler()

async def schedule_tasks():
scheduler = Scheduler()

def get_job_name(job: Job) -> str:
return getattr(job.job_func, "__qualname__", "Unknown")


def job_wrapper(job_func: Callable):
job_name = job_func.__qualname__

async def run_with_dependencies():
try:
logger.info(f"Start scheduled job [{job_name}]")
await solver.run(job_func)
logger.info(f"End scheduled job [{job_name}]")
except Exception as e:
logger.opt(exception=e).error(f"Error occurred in job [{job_name}]")

@wraps(job_func)
def run_in_loop():
try:
asyncio.create_task(run_with_dependencies())
except RuntimeError:
asyncio.run(run_with_dependencies())

return run_in_loop


def setup_job_schedule():
if settings.is_dev:
scheduler.every().saturday.at("00:00", "Asia/Seoul").do(slack_main)
scheduler.every().saturday.at("12:00", "Asia/Seoul").do(
job_wrapper(create_users_from_slack)
).tag("slack")
if settings.is_prod:
scheduler.every().sunday.at("00:00", "Asia/Seoul").do(slack_main)
scheduler.every().sunday.at("00:00", "Asia/Seoul").do(
job_wrapper(create_users_from_slack)
).tag("slack")

for job in scheduler.get_jobs():
job_name = get_job_name(job)

logger.info(
"Job [{job_name}] is scheduled every {interval} {unit}. Next run at"
" {next_run}",
job_name=job_name,
interval=job.interval,
unit=job.unit,
next_run=job.next_run,
)


async def run_scheduling_service():
setup_job_schedule()
while True:
# 최소 주기를 60초로 설정
await asyncio.sleep(60)
next_run = scheduler.get_next_run()

# 앞으로 더 이상 스케줄된 작업이 없다면 스케줄링 서비스를 종료한다.
if next_run is None:
logger.info("No jobs scheduled. Exiting scheduling service")
break

# 다음 작업이 실행되기까지 대기해야할 시간을 계산한다.
now = datetime.now()
delay = max((next_run - now).total_seconds(), 1)
await asyncio.sleep(delay)
scheduler.run_pending()
3 changes: 1 addition & 2 deletions waffledotcom/src/batch/slack/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from waffledotcom.src.apps.user.services import UserService
from waffledotcom.src.batch.slack.services import AsyncSlackApiService
from waffledotcom.src.utils.dependency_solver import DependencySolver
from waffledotcom.src.utils.dependency_solver import solver


async def create_users_from_slack(
Expand All @@ -21,7 +21,6 @@ async def create_users_from_slack(


def main():
solver = DependencySolver()
try:
asyncio.create_task(solver.run(create_users_from_slack))
except RuntimeError:
Expand Down
24 changes: 6 additions & 18 deletions waffledotcom/src/utils/dependency_solver.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,14 @@
import argparse
from contextlib import AsyncExitStack
import inspect
from contextlib import AsyncExitStack
from typing import Callable

from fastapi import Request
from fastapi.dependencies.models import Dependant
from fastapi.dependencies.utils import get_dependant
from fastapi.dependencies.utils import solve_dependencies
from fastapi.dependencies.utils import get_dependant, solve_dependencies
from pydantic import ValidationError


class DependencySolver:
"""
A utility class to solve FastAPI dependencies in an ad-hoc manner.
"""

def __init__(self, name: str | None = None, namespace=None):
self.name = name or self.__class__.__name__
self.parser = self.make_parser()
self.args = self.parser.parse_args(namespace=namespace)

# override to add command-line arguments
def make_parser(self) -> argparse.ArgumentParser:
return argparse.ArgumentParser(prog=self.name)

async def solve_command(self, request: Request, dependant: Dependant):
values, errors, _1, _2, _3 = await solve_dependencies(
request=request, dependant=dependant
Expand All @@ -48,6 +33,9 @@ async def run(self, command: Callable):
}
)

dependant = get_dependant(path=f"command:{self.name}", call=command)
dependant = get_dependant(path=f"command:{command.__name__}", call=command)

return await self.solve_command(request, dependant)


solver = DependencySolver()

0 comments on commit 95b996c

Please sign in to comment.