Skip to content

Commit

Permalink
Add watch dog
Browse files Browse the repository at this point in the history
  • Loading branch information
sbrunner committed Jun 6, 2024
1 parent b9d4081 commit b2ad543
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion github_app_geo_project/scripts/process_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import subprocess # nosec
import sys
import urllib.parse
from typing import Any, cast
from typing import Any, NamedTuple, cast

import c2cwsgiutils.loader
import c2cwsgiutils.setup_process
Expand All @@ -33,6 +33,16 @@
_NB_JOBS = Gauge("ghci_jobs_number", "Number of jobs", ["status"])


class _JobInfo(NamedTuple):
module: str
event_name: str
priority: int
repository: str


_RUNNING_JOBS: dict[int, _JobInfo] = {}


class _Handler(logging.Handler):
context_var: contextvars.ContextVar[int] = contextvars.ContextVar("job_id")

Expand Down Expand Up @@ -656,6 +666,7 @@ async def _process_one_job(
message.title = f"Start process job '{job.event_name}' id: {job.id}, on {job.owner}/{job.repository} on module: {job.module}, on application {job.application}"
root_logger.addHandler(handler)
_LOGGER.info(message)
_RUNNING_JOBS[job.id] = _JobInfo(job.module or "-", job.event_name, job.priority, job.repository)
root_logger.removeHandler(handler)

if make_pending:
Expand Down Expand Up @@ -722,6 +733,7 @@ async def _process_one_job(
job.status = models.JobStatus.ERROR
job.finished_at = datetime.datetime.now(tz=datetime.timezone.utc)
session.commit()
_RUNNING_JOBS.pop(job.id)

_LOGGER.debug("Process one job (max priority: %i): Done", max_priority)
return False
Expand Down Expand Up @@ -784,6 +796,16 @@ async def __call__(self, *args: Any, **kwds: Any) -> Any:
await asyncio.sleep(10)


async def _watch_dog() -> None:
while True:
_LOGGER.debug("Watch dog: alive")
with open("/watch_dog", "w", encoding="utf-8") as file_:
file_.write(datetime.datetime.now().isoformat())
for id_, job in _RUNNING_JOBS.items():
file_.write(f"{id_}: {job.module} {job.event_name} {job.repository} [{job.priority}]\n")
await asyncio.sleep(60)


async def _async_main() -> None:
"""Process the jobs present in the database queue."""
parser = argparse.ArgumentParser(description=__doc__)
Expand Down Expand Up @@ -817,6 +839,8 @@ async def _async_main() -> None:
if not args.exit_when_empty:
threads_call.append(_UpdateCounter(Session)())

threads_call.append(_watch_dog())

for priority in priority_groups:
threads_call.append(_Run(config, Session, args.exit_when_empty, priority)())
await asyncio.gather(*threads_call)
Expand Down

0 comments on commit b2ad543

Please sign in to comment.