Skip to content

Commit

Permalink
RDISCROWD-7806: Remove unused bfs and dfs schedulers (#1013)
Browse files Browse the repository at this point in the history
* remove unused bfs and dfs schedulers

* remove incremental sched
  • Loading branch information
dchhabda authored Dec 11, 2024
1 parent 78d5c56 commit 4b9ddf7
Show file tree
Hide file tree
Showing 8 changed files with 13 additions and 1,070 deletions.
44 changes: 0 additions & 44 deletions pybossa/model/counter.py

This file was deleted.

157 changes: 1 addition & 156 deletions pybossa/sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from pybossa.model import DomainObject
from pybossa.model.task import Task
from pybossa.model.task_run import TaskRun
from pybossa.model.counter import Counter
from pybossa.core import db, sentinel, project_repo, task_repo
from .redis_lock import (LockManager, get_active_user_key, get_user_tasks_key,
get_task_users_key, get_task_id_project_id_key,
Expand Down Expand Up @@ -64,12 +63,8 @@ def new_task(project_id, sched, user_id=None, user_ip=None,
"""Get a new task by calling the appropriate scheduler function."""
sched_map = {
'default': get_locked_task,
'breadth_first': get_breadth_first_task,
'depth_first': get_depth_first_task,
Schedulers.locked: get_locked_task,
'incremental': get_incremental_task,
Schedulers.user_pref: get_user_pref_task,
'depth_first_all': get_depth_first_all_task,
Schedulers.task_queue: get_user_pref_task
}
scheduler = sched_map.get(sched, sched_map['default'])
Expand Down Expand Up @@ -138,125 +133,6 @@ def after_save(task_run, conn):
release_reserve_task_lock_by_id(task_run.project_id, task_run.task_id, uid, TIMEOUT)


def get_breadth_first_task(project_id, user_id=None, user_ip=None,
                           external_uid=None, offset=0, limit=1, orderby='id',
                           desc=False, **kwargs):
    """Get new task(s) which have the least number of task runs.

    Tasks the participant already answered are excluded; the participant is
    identified by user id, ip address, or external uid, in that order of
    precedence.  Expired tasks are skipped.
    """
    # Open tasks for the project: not completed and not awaiting enrichment.
    project_query = session.query(Task.id).filter(Task.project_id == project_id,
                                                  Task.state != 'completed',
                                                  Task.state != 'enrich')
    # Task ids this participant has already contributed a run for.
    if user_id and not user_ip and not external_uid:
        subquery = session.query(TaskRun.task_id).filter_by(project_id=project_id,
                                                            user_id=user_id)
    else:
        if not user_ip:  # pragma: no cover
            user_ip = '127.0.0.1'
        if user_ip and not external_uid:
            subquery = session.query(TaskRun.task_id).filter_by(project_id=project_id,
                                                                user_ip=user_ip)
        else:
            subquery = session.query(TaskRun.task_id).filter_by(project_id=project_id,
                                                                external_uid=external_uid)

    tmp = project_query.except_(subquery)
    # Join against Counter and order by the aggregate run count so the
    # least-answered tasks come first.  Fix: the original chain ended with a
    # dangling backslash continuation into a blank line; the chain is now
    # closed explicitly with parentheses.
    query = (session.query(Task, func.sum(Counter.n_task_runs).label('n_task_runs'))
             .filter(Task.id == Counter.task_id)
             .filter(Counter.task_id.in_(tmp))
             .filter(or_(Task.expiration == None, Task.expiration > datetime.utcnow()))
             .group_by(Task.id)
             .order_by(text('n_task_runs ASC')))

    query = _set_orderby_desc(query, orderby, desc)
    data = query.limit(limit).offset(offset).all()
    return _handle_tuples(data)


def get_depth_first_task(project_id, user_id=None, user_ip=None,
                         external_uid=None, offset=0, limit=1,
                         orderby='priority_0', desc=True, **kwargs):
    """Get a new task for a given project."""
    # Thin wrapper: delegate candidate selection (with the default
    # completed-task filtering) to the shared helper.
    return get_candidate_task_ids(project_id, user_id, user_ip, external_uid,
                                  limit, offset, orderby=orderby, desc=desc)


def get_depth_first_all_task(project_id, user_id=None, user_ip=None,
                             external_uid=None, offset=0, limit=1,
                             orderby='priority_0', desc=True, **kwargs):
    """Get a new task for a given project."""
    # Same as get_depth_first_task but with completed=False, i.e. completed
    # tasks are NOT filtered out of the candidate set.
    return get_candidate_task_ids(project_id, user_id, user_ip, external_uid,
                                  limit, offset, orderby=orderby, desc=desc,
                                  completed=False)


def get_incremental_task(project_id, user_id=None, user_ip=None,
                         external_uid=None, offset=0, limit=1, orderby='id',
                         desc=False, **kwargs):
    """Get a new task for a given project with its last given answer.
    It is an important strategy when dealing with large tasks, as
    transcriptions.
    """
    candidates = get_candidate_task_ids(project_id, user_id, user_ip,
                                        external_uid, limit, offset,
                                        orderby='priority_0', desc=True)
    if not candidates:
        return None
    # Pick one of the remaining candidates at random.
    task = random.choice(candidates)
    # Attach the most recent answer (if any) for this task.
    last_task_run = (session.query(TaskRun)
                     .filter(TaskRun.task_id == task.id)
                     .order_by(TaskRun.finish_time.desc())
                     .first())
    if last_task_run:
        task.info['last_answer'] = last_task_run.info
        # TODO: As discussed in GitHub #53
        # it is necessary to create a lock in the task!
    return [task]


def get_candidate_task_ids(project_id, user_id=None, user_ip=None,
                           external_uid=None, limit=1, offset=0,
                           orderby='priority_0', desc=True, completed=True):
    """Get all available tasks for a given project and user."""
    # Runs already contributed by this participant; identification falls
    # back from user id to ip address to external uid.
    if user_id and not user_ip and not external_uid:
        contributed = session.query(TaskRun.task_id).filter_by(
            project_id=project_id, user_id=user_id)
    else:
        if not user_ip:
            user_ip = '127.0.0.1'
        if user_ip and not external_uid:
            contributed = session.query(TaskRun.task_id).filter_by(
                project_id=project_id, user_ip=user_ip)
        else:
            contributed = session.query(TaskRun.task_id).filter_by(
                project_id=project_id, external_uid=external_uid)

    # Base candidate set: project tasks the participant has not answered
    # and that are not awaiting enrichment.
    query = session.query(Task).filter(
        ~Task.id.in_(contributed.subquery()),
        Task.project_id == project_id,
        Task.state != 'enrich')
    if completed:
        # completed=True means filter out completed (and expired) tasks.
        query = query.filter(Task.state != 'completed').filter(
            or_(Task.expiration == None, Task.expiration > datetime.utcnow()))

    query = _set_orderby_desc(query, orderby, desc)
    return _handle_tuples(query.limit(limit).offset(offset).all())


def locked_scheduler(query_factory):
@wraps(query_factory)
def template_get_locked_task(project_id, user_id=None, user_ip=None,
Expand Down Expand Up @@ -850,11 +726,9 @@ def get_project_scheduler(project_id, conn):


def sched_variants():
    """Return the (identifier, display label) pairs of supported schedulers.

    Fix: the scraped diff fused the removed pre-commit ``return`` (with the
    'breadth_first'/'depth_first' entries) into the new one, leaving invalid
    syntax; this is the post-commit version with only the kept schedulers.
    """
    return [('default', 'Default'),
            (Schedulers.locked, 'Locked'),
            (Schedulers.user_pref, 'User Preference Scheduler'),
            (Schedulers.task_queue, 'Task Queues')
            ]

Expand All @@ -864,32 +738,3 @@ def randomizable_scheds():
if DEFAULT_SCHEDULER in scheds:
scheds.append('default')
return scheds


def _set_orderby_desc(query, orderby, descending):
    """Set order by to query."""
    if orderby == 'fav_user_ids':
        # Order by favorite count; NULL fav lists count as zero.
        n_favs = func.coalesce(func.array_length(Task.fav_user_ids, 1), 0).label('n_favs')
        query = query.add_column(n_favs)
        query = query.order_by(desc("n_favs") if descending else "n_favs")
    else:
        column = getattr(Task, orderby)
        query = query.order_by(column.desc() if descending else column.asc())
    # Secondary sort on id keeps the ordering stable across equal keys.
    return query.order_by(Task.id.asc())


def _handle_tuples(data):
    """Handle tuples when query returns several columns."""
    # Rows from multi-column queries arrive as tuples whose first element is
    # the Task; plain DomainObject rows pass through unchanged.
    return [row if isinstance(row, DomainObject) else row[0] for row in data]
1 change: 0 additions & 1 deletion test/test_api/test_task_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from nose.tools import assert_equal

from pybossa.api.task import TaskAPI
from pybossa.model.counter import Counter
from pybossa.repositories import ProjectRepository
from pybossa.repositories import ResultRepository
from pybossa.repositories import TaskRepository
Expand Down
7 changes: 3 additions & 4 deletions test/test_auditlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,10 @@ def test_project_update_scheduler(self):

owner_id = project.owner.id
owner_name = project.owner.name
data = {'info': {'sched': 'depth_first', 'data_classification': dict(input_data="L4 - public", output_data="L4 - public")}}
data = {'info': {'sched': 'task_queue_scheduler', 'data_classification': dict(input_data="L4 - public", output_data="L4 - public")}}
url = '/api/project/%s?api_key=%s' % (project.id, project.owner.api_key)
self.app.put(url, data=json.dumps(data))
logs = auditlog_repo.filter_by(project_id=project.id)

assert len(logs) == 1, logs
for log in logs:
assert log.user_id == owner_id, log.user_id
Expand All @@ -221,7 +220,7 @@ def test_project_update_two_info_objects(self):

owner_id = project.owner.id
owner_name = project.owner.name
data = {'info': {'sched': 'depth_first', 'task_presenter': 'new', 'data_classification': dict(input_data="L4 - public", output_data="L4 - public")}}
data = {'info': {'sched': 'task_queue_scheduler', 'task_presenter': 'new', 'data_classification': dict(input_data="L4 - public", output_data="L4 - public")}}
attributes = list(data['info'].keys())
url = '/api/project/%s?api_key=%s' % (project.id, project.owner.api_key)
self.app.put(url, data=json.dumps(data))
Expand Down Expand Up @@ -564,7 +563,7 @@ def test_project_task_scheduler(self):

attribute = 'sched'

new_string = 'depth_first'
new_string = 'locked_scheduler'

old_value = 'default'

Expand Down
40 changes: 0 additions & 40 deletions test/test_sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,46 +119,6 @@ def test_anonymous_03_respects_limit_tasks(self):
assert data.get('id'), data


@with_context
def test_newtask_default_orderby(self):
    """Test SCHED depth first works with orderby."""
    project = ProjectFactory.create(info=dict(sched="depth_first"))
    task1 = TaskFactory.create(project=project, fav_user_ids=None)
    task2 = TaskFactory.create(project=project, fav_user_ids=[1,2,3])
    api_key = project.owner.api_key

    def newtask(orderby, descending):
        # Same URL shape as before; %s of a bool renders 'True'/'False'.
        url = "/api/project/%s/newtask?orderby=%s&desc=%s&api_key=%s" % (
            project.id, orderby, descending, api_key)
        return json.loads(self.app.get(url).data)

    # For every supported orderby field, ascending order must return the
    # first-created task and descending order the second.
    for field in ('id', 'created', 'fav_user_ids'):
        data = newtask(field, False)
        assert data['id'] == task1.id, data
        data = newtask(field, True)
        assert data['id'] == task2.id, data

    # The last response (fav_user_ids, desc) must carry task2's fav list.
    assert data['fav_user_ids'] == task2.fav_user_ids, data


@with_context
def test_user_01_newtask(self):
""" Test SCHED newtask returns a Task for John Doe User"""
Expand Down
70 changes: 0 additions & 70 deletions test/test_sched_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,73 +55,3 @@ def test_get_active_users_lock(self):
time.sleep(EXPIRE_LOCK_DELAY + 1)
count = get_active_user_count(project.id, sentinel.master)
assert not count

# Tests
@with_request_context
@patch('pybossa.api.pwd_manager.ProjectPasswdManager.password_needed')
@patch('pybossa.api.task_run.request')
def test_incremental_tasks(self, mock_request, passwd_needed):
    """ Test incremental SCHED strategy - second TaskRun receives first given answer"""
    # Bypass the project password check so plain requests succeed.
    passwd_needed.return_value = False
    self.create_2(sched='incremental')
    mock_request.remote_addr = '127.0.0.0'

    # Del previous TaskRuns
    self.del_task_runs()

    # Register three users; the final signin() leaves the first (John Doe)
    # logged in — presumably signin() defaults to him; verify in helper.
    self.register(fullname="John Doe", name="johndoe", password="p4ssw0rd")
    self.signout()
    self.register(fullname="Marie Doe", name="mariedoe", password="dr0wss4p")
    self.signout()
    self.register(fullname="Mario Doe", name="mariodoe", password="dr0wss4p")
    self.signout()
    self.signin()

    # Get the only task with no runs!
    res = self.app.get('api/project/1/newtask')
    data = json.loads(res.data)
    print("Task:%s" % data['id'])
    # Check that we received a clean Task (no previous answer attached)
    assert data.get('info'), data
    assert not data.get('info').get('last_answer')

    # Submit an Answer for the assigned task
    tr = dict(project_id=data['project_id'], task_id=data['id'], info={'answer': 'No'})
    tr = json.dumps(tr)

    res = self.app.post('/api/taskrun', data=tr)

    # No more tasks available for this user!
    res = self.app.get('api/project/1/newtask')
    data = json.loads(res.data)
    assert not data, data

    #### Get the only task now with an answer as Mario!
    # NOTE(review): email literal redacted by the diff scraper
    # ("[email protected]") — likely Mario's registration email; confirm.
    self.signout()
    self.signin(email="[email protected]", password="dr0wss4p")
    res = self.app.get('api/project/1/newtask')
    data = json.loads(res.data)

    # Check that we received a Task with answer (incremental sched attaches
    # the previous contributor's answer as info['last_answer'])
    assert data.get('info'), data
    assert data.get('info').get('last_answer').get('answer') == 'No'

    # Submit a second Answer as Mario
    tr = dict(project_id=data['project_id'], task_id=data['id'],
              info={'answer': 'No No'})
    tr = json.dumps(tr)

    res = self.app.post('/api/taskrun', data=tr)
    # no anonymous contributions
    assert res.status_code == 200
    self.signout()

    #### Get the only task now with an answer as User2!
    # NOTE(review): email literal redacted by the diff scraper
    # ("[email protected]") — likely Marie's registration email; confirm.
    self.signin(email="[email protected]", password="dr0wss4p")
    res = self.app.get('api/project/1/newtask')
    data = json.loads(res.data)

    # Check that we received a Task with the latest answer attached
    assert data.get('info'), data
    assert data.get('info').get('last_answer').get('answer') == 'No No'
Loading

0 comments on commit 4b9ddf7

Please sign in to comment.