Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RDISCROWD-7806: Remove bfs, dfs unused schedulers #1013

Merged
merged 2 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 0 additions & 44 deletions pybossa/model/counter.py

This file was deleted.

157 changes: 1 addition & 156 deletions pybossa/sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from pybossa.model import DomainObject
from pybossa.model.task import Task
from pybossa.model.task_run import TaskRun
from pybossa.model.counter import Counter
from pybossa.core import db, sentinel, project_repo, task_repo
from .redis_lock import (LockManager, get_active_user_key, get_user_tasks_key,
get_task_users_key, get_task_id_project_id_key,
Expand Down Expand Up @@ -64,12 +63,8 @@ def new_task(project_id, sched, user_id=None, user_ip=None,
"""Get a new task by calling the appropriate scheduler function."""
sched_map = {
'default': get_locked_task,
'breadth_first': get_breadth_first_task,
'depth_first': get_depth_first_task,
Schedulers.locked: get_locked_task,
'incremental': get_incremental_task,
Schedulers.user_pref: get_user_pref_task,
'depth_first_all': get_depth_first_all_task,
Schedulers.task_queue: get_user_pref_task
}
scheduler = sched_map.get(sched, sched_map['default'])
Expand Down Expand Up @@ -138,125 +133,6 @@ def after_save(task_run, conn):
release_reserve_task_lock_by_id(task_run.project_id, task_run.task_id, uid, TIMEOUT)


def get_breadth_first_task(project_id, user_id=None, user_ip=None,
                           external_uid=None, offset=0, limit=1, orderby='id',
                           desc=False, **kwargs):
    """Get a new task which has the least number of task runs.

    Candidate tasks are those in the project that are neither completed
    nor queued for enrichment, excluding tasks the requesting contributor
    (identified by user_id when available, otherwise by user_ip or
    external_uid) has already answered. Candidates are ranked by their
    aggregated run count from the Counter table, fewest runs first.

    :param project_id: id of the project to draw tasks from.
    :param user_id: id of an authenticated user, if any.
    :param user_ip: client IP, used for anonymous contributors.
    :param external_uid: external contributor identifier.
    :param offset: pagination offset applied to the final query.
    :param limit: maximum number of tasks to return.
    :param orderby: secondary ordering attribute on Task.
    :param desc: whether the secondary ordering is descending.
    :returns: list of Task objects (possibly empty).
    """
    project_query = session.query(Task.id).filter(Task.project_id==project_id,
                                                  Task.state!='completed',
                                                  Task.state!='enrich')
    if user_id and not user_ip and not external_uid:
        subquery = session.query(TaskRun.task_id).filter_by(project_id=project_id,
                                                            user_id=user_id)
    else:
        if not user_ip:  # pragma: no cover
            user_ip = '127.0.0.1'
        if user_ip and not external_uid:
            subquery = session.query(TaskRun.task_id).filter_by(project_id=project_id,
                                                                user_ip=user_ip)
        else:
            subquery = session.query(TaskRun.task_id).filter_by(project_id=project_id,
                                                                external_uid=external_uid)

    tmp = project_query.except_(subquery)
    # Rank unanswered tasks by aggregated run count (ascending), skipping
    # tasks whose expiration has already passed.
    # NOTE: the trailing line-continuation backslash after order_by() was
    # removed; a backslash followed by a blank line is a SyntaxError.
    query = session.query(Task, func.sum(Counter.n_task_runs).label('n_task_runs'))\
        .filter(Task.id==Counter.task_id)\
        .filter(Counter.task_id.in_(tmp))\
        .filter(or_(Task.expiration == None, Task.expiration > datetime.utcnow()))\
        .group_by(Task.id)\
        .order_by(text('n_task_runs ASC'))

    query = _set_orderby_desc(query, orderby, desc)
    data = query.limit(limit).offset(offset).all()
    return _handle_tuples(data)


def get_depth_first_task(project_id, user_id=None, user_ip=None,
                         external_uid=None, offset=0, limit=1,
                         orderby='priority_0', desc=True, **kwargs):
    """Get a new task for a given project."""
    # Thin wrapper: depth-first is the plain candidate query, completed
    # tasks excluded (the helper's default).
    return get_candidate_task_ids(project_id, user_id,
                                  user_ip, external_uid, limit, offset,
                                  orderby=orderby, desc=desc)


def get_depth_first_all_task(project_id, user_id=None, user_ip=None,
                             external_uid=None, offset=0, limit=1,
                             orderby='priority_0', desc=True, **kwargs):
    """Get a new task for a given project."""
    # Same as depth-first, except completed tasks remain eligible
    # (completed=False disables the completed-state filter).
    return get_candidate_task_ids(project_id, user_id,
                                  user_ip, external_uid, limit, offset,
                                  orderby=orderby, desc=desc, completed=False)


def get_incremental_task(project_id, user_id=None, user_ip=None,
                         external_uid=None, offset=0, limit=1, orderby='id',
                         desc=False, **kwargs):
    """Get a new task for a given project with its last given answer.

    It is an important strategy when dealing with large tasks, as
    transcriptions.
    """
    candidates = get_candidate_task_ids(project_id, user_id, user_ip,
                                        external_uid, limit, offset,
                                        orderby='priority_0', desc=True)
    if not candidates:
        return None
    # Pick one of the remaining candidate tasks at random.
    task = candidates[random.randrange(0, len(candidates))]
    # Attach the most recent answer (if any) so the contributor can
    # iterate on it rather than start from scratch.
    last_task_run = (session.query(TaskRun)
                     .filter(TaskRun.task_id == task.id)
                     .order_by(TaskRun.finish_time.desc())
                     .first())
    if last_task_run:
        task.info['last_answer'] = last_task_run.info
        # TODO: As discussed in GitHub #53
        # it is necessary to create a lock in the task!
    return [task]


def get_candidate_task_ids(project_id, user_id=None, user_ip=None,
                           external_uid=None, limit=1, offset=0,
                           orderby='priority_0', desc=True, completed=True):
    """Get all available tasks for a given project and user."""
    # Task runs already submitted by this contributor, keyed by user_id
    # when available, otherwise by IP or external uid.
    if user_id and not user_ip and not external_uid:
        answered = session.query(TaskRun.task_id).filter_by(project_id=project_id, user_id=user_id)
    else:
        if not user_ip:
            user_ip = '127.0.0.1'
        if user_ip and not external_uid:
            answered = session.query(TaskRun.task_id).filter_by(project_id=project_id, user_ip=user_ip)
        else:
            answered = session.query(TaskRun.task_id).filter_by(project_id=project_id, external_uid=external_uid)

    if completed:  # completed means filter out completed
        query = (
            session.query(Task)
            .filter(and_(~Task.id.in_(answered.subquery()),
                         Task.project_id == project_id,
                         Task.state != 'completed',
                         Task.state != 'enrich'))
            .filter(or_(Task.expiration == None, Task.expiration > datetime.utcnow()))
        )
    else:
        query = (
            session.query(Task)
            .filter(and_(~Task.id.in_(answered.subquery()),
                         Task.project_id == project_id,
                         Task.state != 'enrich'))
        )

    query = _set_orderby_desc(query, orderby, desc)
    return _handle_tuples(query.limit(limit).offset(offset).all())


def locked_scheduler(query_factory):
@wraps(query_factory)
def template_get_locked_task(project_id, user_id=None, user_ip=None,
Expand Down Expand Up @@ -850,11 +726,9 @@ def get_project_scheduler(project_id, conn):


def sched_variants():
    """Return the (identifier, display name) pairs of supported schedulers.

    The breadth-first / depth-first variants were removed; only the
    schedulers below remain selectable.
    """
    # NOTE: the diff artifact that interleaved the old and new return
    # lists (two overlapping literals) is collapsed into the single
    # post-merge list.
    return [('default', 'Default'),
            (Schedulers.locked, 'Locked'),
            (Schedulers.user_pref, 'User Preference Scheduler'),
            (Schedulers.task_queue, 'Task Queues')
            ]

Expand All @@ -864,32 +738,3 @@ def randomizable_scheds():
if DEFAULT_SCHEDULER in scheds:
scheds.append('default')
return scheds


def _set_orderby_desc(query, orderby, descending):
    """Set order by to query."""
    if orderby == 'fav_user_ids':
        # Order by how many users favourited the task, not the raw array;
        # coalesce handles NULL arrays (no favourites) as zero.
        n_favs = func.coalesce(func.array_length(Task.fav_user_ids, 1), 0).label('n_favs')
        query = query.add_column(n_favs)
        query = query.order_by(desc("n_favs") if descending else "n_favs")
    else:
        column = getattr(Task, orderby)
        query = query.order_by(column.desc() if descending else column.asc())
    # Stable tie-breaker so pagination is deterministic.
    return query.order_by(Task.id.asc())


def _handle_tuples(data):
    """Handle tuples when query returns several columns."""
    # Queries that add computed columns (e.g. n_favs) yield row tuples
    # whose first element is the Task; plain queries yield DomainObjects.
    return [datum if isinstance(datum, DomainObject) else datum[0]
            for datum in data]
1 change: 0 additions & 1 deletion test/test_api/test_task_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from nose.tools import assert_equal

from pybossa.api.task import TaskAPI
from pybossa.model.counter import Counter
from pybossa.repositories import ProjectRepository
from pybossa.repositories import ResultRepository
from pybossa.repositories import TaskRepository
Expand Down
7 changes: 3 additions & 4 deletions test/test_auditlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,10 @@ def test_project_update_scheduler(self):

owner_id = project.owner.id
owner_name = project.owner.name
data = {'info': {'sched': 'depth_first', 'data_classification': dict(input_data="L4 - public", output_data="L4 - public")}}
data = {'info': {'sched': 'task_queue_scheduler', 'data_classification': dict(input_data="L4 - public", output_data="L4 - public")}}
url = '/api/project/%s?api_key=%s' % (project.id, project.owner.api_key)
self.app.put(url, data=json.dumps(data))
logs = auditlog_repo.filter_by(project_id=project.id)

assert len(logs) == 1, logs
for log in logs:
assert log.user_id == owner_id, log.user_id
Expand All @@ -221,7 +220,7 @@ def test_project_update_two_info_objects(self):

owner_id = project.owner.id
owner_name = project.owner.name
data = {'info': {'sched': 'depth_first', 'task_presenter': 'new', 'data_classification': dict(input_data="L4 - public", output_data="L4 - public")}}
data = {'info': {'sched': 'task_queue_scheduler', 'task_presenter': 'new', 'data_classification': dict(input_data="L4 - public", output_data="L4 - public")}}
attributes = list(data['info'].keys())
url = '/api/project/%s?api_key=%s' % (project.id, project.owner.api_key)
self.app.put(url, data=json.dumps(data))
Expand Down Expand Up @@ -564,7 +563,7 @@ def test_project_task_scheduler(self):

attribute = 'sched'

new_string = 'depth_first'
new_string = 'locked_scheduler'

old_value = 'default'

Expand Down
40 changes: 0 additions & 40 deletions test/test_sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,46 +119,6 @@ def test_anonymous_03_respects_limit_tasks(self):
assert data.get('id'), data


@with_context
def test_newtask_default_orderby(self):
    """Test SCHED depth first works with orderby."""
    project = ProjectFactory.create(info=dict(sched="depth_first"))
    task1 = TaskFactory.create(project=project, fav_user_ids=None)
    task2 = TaskFactory.create(project=project, fav_user_ids=[1,2,3])
    api_key = project.owner.api_key

    url_tmpl = "/api/project/%s/newtask?orderby=%s&desc=%s&api_key=%s"
    # For every orderable column, ascending must serve task1 first and
    # descending must serve task2 first. Request order matches the
    # original test: id, created, fav_user_ids; asc then desc.
    for column in ('id', 'created', 'fav_user_ids'):
        for descending, expected in ((False, task1), (True, task2)):
            res = self.app.get(url_tmpl % (project.id, column, descending, api_key))
            data = json.loads(res.data)
            assert data['id'] == expected.id, data
    # Last response is fav_user_ids descending -> task2's favourites.
    assert data['fav_user_ids'] == task2.fav_user_ids, data


@with_context
def test_user_01_newtask(self):
""" Test SCHED newtask returns a Task for John Doe User"""
Expand Down
70 changes: 0 additions & 70 deletions test/test_sched_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,73 +55,3 @@ def test_get_active_users_lock(self):
time.sleep(EXPIRE_LOCK_DELAY + 1)
count = get_active_user_count(project.id, sentinel.master)
assert not count

# Tests
@with_request_context
@patch('pybossa.api.pwd_manager.ProjectPasswdManager.password_needed')
@patch('pybossa.api.task_run.request')
def test_incremental_tasks(self, mock_request, passwd_needed):
    """ Test incremental SCHED strategy - second TaskRun receives first given answer"""
    # Bypass the project password prompt so newtask requests succeed.
    passwd_needed.return_value = False
    # Fixture presumably creates a project (id 1) with a single task and
    # the incremental scheduler — TODO confirm against create_2's definition.
    self.create_2(sched='incremental')
    mock_request.remote_addr = '127.0.0.0'

    # Del previous TaskRuns
    self.del_task_runs()

    # Register three users; each signout leaves the next registration clean.
    self.register(fullname="John Doe", name="johndoe", password="p4ssw0rd")
    self.signout()
    self.register(fullname="Marie Doe", name="mariedoe", password="dr0wss4p")
    self.signout()
    self.register(fullname="Mario Doe", name="mariodoe", password="dr0wss4p")
    self.signout()
    # signin() with no args presumably signs in the last/default user —
    # TODO confirm which account this authenticates.
    self.signin()

    # Get the only task with no runs!
    res = self.app.get('api/project/1/newtask')
    data = json.loads(res.data)
    print("Task:%s" % data['id'])
    # Check that we received a clean Task
    assert data.get('info'), data
    # First contributor must not see any prior answer.
    assert not data.get('info').get('last_answer')

    # Submit an Answer for the assigned task
    tr = dict(project_id=data['project_id'], task_id=data['id'], info={'answer': 'No'})
    tr = json.dumps(tr)

    res = self.app.post('/api/taskrun', data=tr)

    # No more tasks available for this user!
    # Incremental scheduler never re-serves a task the same user answered.
    res = self.app.get('api/project/1/newtask')
    data = json.loads(res.data)
    assert not data, data

    #### Get the only task now with an answer as Mario!
    self.signout()
    self.signin(email="mariodoe@example.com", password="dr0wss4p")
    res = self.app.get('api/project/1/newtask')
    data = json.loads(res.data)

    # Check that we received a Task with answer
    assert data.get('info'), data
    # The second contributor sees the first contributor's answer.
    assert data.get('info').get('last_answer').get('answer') == 'No'

    # Submit a second Answer as Mario
    tr = dict(project_id=data['project_id'], task_id=data['id'],
              info={'answer': 'No No'})
    tr = json.dumps(tr)

    res = self.app.post('/api/taskrun', data=tr)
    # no anonymous contributions
    assert res.status_code == 200
    self.signout()

    #### Get the only task now with an answer as User2!
    self.signin(email="mariedoe@example.com", password="dr0wss4p")
    res = self.app.get('api/project/1/newtask')
    data = json.loads(res.data)

    # Check that we received a Task with answer
    assert data.get('info'), data
    # last_answer always reflects the most recent TaskRun, not the first.
    assert data.get('info').get('last_answer').get('answer') == 'No No'
Loading
Loading