diff --git a/github_app_geo_project/scripts/process_queue.py b/github_app_geo_project/scripts/process_queue.py index e1c451e8a36..b1e465b6d08 100644 --- a/github_app_geo_project/scripts/process_queue.py +++ b/github_app_geo_project/scripts/process_queue.py @@ -12,7 +12,7 @@ import subprocess # nosec import sys import urllib.parse -from typing import Any, cast +from typing import Any, NamedTuple, cast import c2cwsgiutils.loader import c2cwsgiutils.setup_process @@ -33,6 +33,16 @@ _NB_JOBS = Gauge("ghci_jobs_number", "Number of jobs", ["status"]) +class _JobInfo(NamedTuple): + module: str + event_name: str + priority: int + repository: str + + +_RUNNING_JOBS: dict[int, _JobInfo] = {} + + class _Handler(logging.Handler): context_var: contextvars.ContextVar[int] = contextvars.ContextVar("job_id") @@ -656,6 +666,7 @@ async def _process_one_job( message.title = f"Start process job '{job.event_name}' id: {job.id}, on {job.owner}/{job.repository} on module: {job.module}, on application {job.application}" root_logger.addHandler(handler) _LOGGER.info(message) + _RUNNING_JOBS[job.id] = _JobInfo(job.module or "-", job.event_name, job.priority, job.repository) root_logger.removeHandler(handler) if make_pending: @@ -722,6 +733,7 @@ async def _process_one_job( job.status = models.JobStatus.ERROR job.finished_at = datetime.datetime.now(tz=datetime.timezone.utc) session.commit() + _RUNNING_JOBS.pop(job.id) _LOGGER.debug("Process one job (max priority: %i): Done", max_priority) return False @@ -784,6 +796,16 @@ async def __call__(self, *args: Any, **kwds: Any) -> Any: await asyncio.sleep(10) +async def _watch_dog() -> None: + while True: + _LOGGER.debug("Watch dog: alive") + with open("/watch_dog", "w", encoding="utf-8") as file_: + file_.write(datetime.datetime.now().isoformat()) + for id_, job in _RUNNING_JOBS.items(): + file_.write(f"{id_}: {job.module} {job.event_name} {job.repository} [{job.priority}]\n") + await asyncio.sleep(60) + + async def _async_main() -> None: """Process the jobs present in the database queue.""" parser = argparse.ArgumentParser(description=__doc__) @@ -817,6 +839,8 @@ async def _async_main() -> None: if not args.exit_when_empty: threads_call.append(_UpdateCounter(Session)()) + threads_call.append(_watch_dog()) + for priority in priority_groups: threads_call.append(_Run(config, Session, args.exit_when_empty, priority)()) await asyncio.gather(*threads_call)