Skip to content

Commit

Permalink
Merge pull request #833 from openzim/fix_periodic_scheduling
Browse files Browse the repository at this point in the history
Fix periodic scheduling + most_recent_task update
  • Loading branch information
rgaudin authored Sep 28, 2023
2 parents 04450d0 + a2fe1e0 commit 580128f
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 116 deletions.
2 changes: 1 addition & 1 deletion dispatcher/backend/src/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def add_to_debug_if_present(
flag_modified(task, "files") # mark 'files' as modified

session.flush() # we have to flush first to avoid circular dependency
if schedule:
if schedule and code == TaskStatus.reserved:
schedule.most_recent_task = task

if code == TaskStatus.scraper_completed and schedule:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import datetime
from typing import Any, Dict

import sqlalchemy as sa
import sqlalchemy.orm as so

import db.models as dbm
from common import getnow
from common.enum import TaskStatus
from db import Session
from utils.scheduling import request_tasks_using_schedule


class TestPeriodicScheduling:
def get_requested_tasks_for_schedule(
self, session: so.Session, schedule: Dict[str, Any]
):
"""return the list of requested tasks for a given schedule"""
return session.execute(
sa.select(dbm.RequestedTask).where(
dbm.RequestedTask.schedule_id == schedule["_id"]
)
).all()

def assert_number_of_requested_tasks_for_schedule(
self, schedule: Dict[str, Any], expected: int
):
"""assert that the number of requested tasks for a given schedule is expected"""
with Session.begin() as session:
req_tasks = self.get_requested_tasks_for_schedule(session, schedule)
assert len(req_tasks) == expected

def test_periodic_scheduling_simple(self, temp_schedule):
"""simple tests of periodic scheduling"""
self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 0)
request_tasks_using_schedule()
self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 1)
request_tasks_using_schedule() # request again should not duplicate requests
self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 1)

def test_periodic_scheduling_most_recent_recent(self, temp_schedule, make_task):
"""most recent task is too recent => do not request it again"""
most_recent = make_task(temp_schedule["name"])
now = getnow()
with Session.begin() as session:
task = dbm.Task.get(session, most_recent["_id"])
task.timestamp = {
TaskStatus.requested: now - datetime.timedelta(hours=4),
TaskStatus.reserved: now - datetime.timedelta(hours=3, minutes=59),
TaskStatus.started: now - datetime.timedelta(hours=3, minutes=58),
TaskStatus.succeeded: now - datetime.timedelta(hours=1),
}
schedule_obj = dbm.Schedule.get(session, temp_schedule["name"])
schedule_obj.most_recent_task = task
self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 0)
request_tasks_using_schedule()
self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 0)

def test_periodic_scheduling_most_recent_overdue_complete(
self, temp_schedule, make_task
):
"""most recent task is overdue and not running => request a new execution"""
most_recent = make_task(temp_schedule["name"])
now = getnow()
with Session.begin() as session:
task = dbm.Task.get(session, most_recent["_id"])
task.timestamp = {
TaskStatus.requested: now - datetime.timedelta(days=40, hours=4),
TaskStatus.reserved: now
- datetime.timedelta(days=40, hours=3, minutes=59),
TaskStatus.started: now
- datetime.timedelta(days=40, hours=3, minutes=58),
TaskStatus.succeeded: now - datetime.timedelta(days=40, hours=1),
}
schedule_obj = dbm.Schedule.get(session, temp_schedule["name"])
schedule_obj.most_recent_task = task
self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 0)
request_tasks_using_schedule()
self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 1)

def test_periodic_scheduling_most_recent_overdue_running(
self, temp_schedule, make_task
):
"""most recent task is overdue but still running => do not request it again"""
most_recent = make_task(temp_schedule["name"])
now = getnow()
with Session.begin() as session:
task = dbm.Task.get(session, most_recent["_id"])
task.timestamp = {
TaskStatus.requested: now - datetime.timedelta(days=40, hours=4),
TaskStatus.reserved: now
- datetime.timedelta(days=40, hours=3, minutes=59),
TaskStatus.started: now
- datetime.timedelta(days=40, hours=3, minutes=58),
}
task.status = TaskStatus.started
schedule_obj = dbm.Schedule.get(session, temp_schedule["name"])
schedule_obj.most_recent_task = task
self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 0)
request_tasks_using_schedule()
self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 0)
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import db.models as dbm
from common.enum import TaskStatus
from common.utils import task_event_handler
from db import Session


class TestTaskEvents:
def test_task_event_reserved_updates_most_recent_task(
self, temp_schedule, make_task, worker
):
task = make_task(schedule_name=temp_schedule["name"])
with Session.begin() as session:
schedule = dbm.Schedule.get(session, temp_schedule["name"])
assert schedule.most_recent_task_id is None
task_event_handler(
session, task["_id"], TaskStatus.reserved, {"worker": worker["name"]}
)
assert schedule.most_recent_task_id == task["_id"]

def test_task_event_started_does_not_updates_most_recent_task(
self, temp_schedule, make_task, worker
):
task = make_task(schedule_name=temp_schedule["name"])
with Session.begin() as session:
schedule = dbm.Schedule.get(session, temp_schedule["name"])
assert schedule.most_recent_task_id is None
task_event_handler(session, task["_id"], TaskStatus.started, {})
assert schedule.most_recent_task_id is None
136 changes: 136 additions & 0 deletions dispatcher/backend/src/tests/integration/routes/conftest.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import datetime

import pytest
import sqlalchemy as sa
from werkzeug.security import generate_password_hash

import db.models as dbm
from common import getnow
from common.enum import TaskStatus
from common.roles import ROLES
from db import Session
from utils.offliners import expanded_config


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -70,6 +72,7 @@ def _make_schedule(
session.flush()
garbage_collector.add_schedule_id(schedule.id)
document = {
"_id": schedule.id,
"name": schedule.name,
"category": schedule.category,
"enabled": schedule.enabled,
Expand All @@ -93,6 +96,31 @@ def schedule(make_schedule):
return make_schedule()


@pytest.fixture
def temp_schedule(make_schedule):
"""build a temporary schedule which will be deleted at the end of the test
NB: associated tasks and requested tasks are also deleted
"""
schedule = make_schedule(name="periodic_sched_test")
yield schedule
with Session.begin() as session:
schedule_obj = dbm.Schedule.get(session, schedule["name"])
schedule_obj.most_recent_task = None
session.flush()
session.execute(
sa.delete(dbm.RequestedTask).where(
dbm.RequestedTask.schedule_id == schedule["_id"]
)
)
session.execute(
sa.delete(dbm.Task).where(dbm.Task.schedule_id == schedule["_id"])
)
session.execute(
sa.delete(dbm.Schedule).where(dbm.Schedule.id == schedule["_id"])
)


@pytest.fixture(scope="module")
def schedules(make_schedule, make_config, make_language):
schedules = []
Expand Down Expand Up @@ -403,3 +431,111 @@ def workers(make_worker, make_user, make_config, make_language, deleted_user):
workers.append(worker)

return workers


@pytest.fixture(scope="module")
def make_task(make_event, make_schedule, make_config, worker, garbage_collector):
def _make_task(
schedule_name="schedule_name",
status=TaskStatus.succeeded,
):
now = getnow()
if status == TaskStatus.requested:
events = [TaskStatus.requested]
elif status == TaskStatus.reserved:
events = [TaskStatus.requested, TaskStatus.reserved]
elif status == TaskStatus.started:
events = [TaskStatus.requested, TaskStatus.reserved, TaskStatus.started]
elif status == TaskStatus.succeeded:
events = [
TaskStatus.requested,
TaskStatus.reserved,
TaskStatus.started,
TaskStatus.succeeded,
]
else:
events = [
TaskStatus.requested,
TaskStatus.reserved,
TaskStatus.started,
TaskStatus.failed,
]

timestamp = {event: now for event in events}
events = [make_event(event, timestamp[event]) for event in events]
container = {
"command": "mwoffliner --mwUrl=https://example.com",
"image": {"name": "mwoffliner", "tag": "1.8.0"},
"exit_code": 0,
"stderr": "example_stderr",
"stdout": "example_stdout",
}
debug = {"args": [], "kwargs": {}}

if status == TaskStatus.failed:
debug["exception"] = "example_exception"
debug["traceback"] = "example_traceback"
files = {}
else:
files = {"mwoffliner_1.zim": {"name": "mwoffliner_1.zim", "size": 1000}}
config = expanded_config(make_config())
with Session.begin() as session:
worker_obj = dbm.Worker.get(session, worker["name"])
schedule = dbm.Schedule.get(session, schedule_name, run_checks=False)
if schedule is None:
make_schedule(schedule_name)
schedule = dbm.Schedule.get(session, schedule_name)
task = dbm.Task(
updated_at=now,
events=events,
debug=debug,
status=status,
timestamp=timestamp,
requested_by="bob",
canceled_by=None,
container=container,
priority=1,
config=config,
notification={},
files=files,
upload={},
original_schedule_name=schedule_name,
)
task.schedule_id = schedule.id
task.worker_id = worker_obj.id
session.add(task)
session.flush()
garbage_collector.add_task_id(task.id)

return {
"_id": task.id,
"status": task.status,
"worker": worker_obj.name,
"schedule_name": schedule.name,
"timestamp": task.timestamp,
"events": task.events,
"container": task.container,
"debug": task.debug,
"files": task.files,
}

yield _make_task


@pytest.fixture(scope="module")
def tasks(make_task):
tasks = []
for i in range(5):
tasks += [
make_task(status=TaskStatus.requested),
make_task(status=TaskStatus.reserved),
make_task(status=TaskStatus.started),
make_task(status=TaskStatus.succeeded),
make_task(status=TaskStatus.failed),
]
return tasks


@pytest.fixture(scope="module")
def task(make_task):
return make_task(status=TaskStatus.succeeded)
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ def test_get_schedule_with_name(self, client, schedule):
assert "most_recent_task" in response_json
response_json.pop("duration", None)
response_json.pop("most_recent_task", None)
schedule.pop("_id")

assert response_json == schedule

Expand Down
Loading

0 comments on commit 580128f

Please sign in to comment.