Skip to content

Commit

Permalink
Perform duplicate check using checksum / task payload
Browse files Browse the repository at this point in the history
  • Loading branch information
dchhabda committed Dec 24, 2024
1 parent dac3580 commit 70f0a6d
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 39 deletions.
5 changes: 5 additions & 0 deletions pybossa/api/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
from pybossa.auditlogger import AuditLogger
from pybossa.data_access import ensure_user_assignment_to_project, set_default_amp_store
from sqlalchemy.orm.base import _entity_descriptor
from pybossa.cache import delete_memoized
from pybossa.cache.projects import get_project_data

auditlogger = AuditLogger(auditlog_repo, caller='api')

Expand Down Expand Up @@ -205,3 +207,6 @@ def _select_attributes(self, data):
else:
data = self._filter_private_data(data)
return data

def _after_save(self, original_data, instance):
delete_memoized(get_project_data, instance.id)
8 changes: 7 additions & 1 deletion pybossa/api/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
import copy
from pybossa.task_creator_helper import get_task_expiration
from pybossa.model import make_timestamp
from pybossa.task_creator_helper import generate_checksum
from pybossa.cache.projects import get_project_data



class TaskAPI(APIBase):
Expand Down Expand Up @@ -90,7 +93,10 @@ def _preprocess_post_data(self, data):
hdfs_task = any([val.startswith("/fileproxy/hdfs/") for val in info.values() if isinstance(val, str)])
if hdfs_task:
raise BadRequest("Invalid task payload. HDFS is not supported")
duplicate = task_repo.find_duplicate(project_id=project_id, info=info)
project = get_project_data(project_id)
dup_checksum = generate_checksum(project=project, task=data)
data["dup_checksum"] = dup_checksum
duplicate = task_repo.find_duplicate(project_id=project_id, info=info, dup_checksum=dup_checksum)
if duplicate:
message = {
'reason': 'DUPLICATE_TASK',
Expand Down
8 changes: 4 additions & 4 deletions pybossa/importers/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from pybossa.task_creator_helper import set_gold_answers, upload_files_priv, get_task_expiration
from pybossa.data_access import data_access_levels
from pybossa.model import make_timestamp
from pybossa.task_creator_helper import generate_checksum


def validate_s3_bucket(task, *args):
Expand Down Expand Up @@ -211,15 +212,14 @@ def create_tasks(self, task_repo, project, importer=None, **form_data):
# As tasks are getting created, pass current date as create_date
create_date = make_timestamp()
task_data['expiration'] = get_task_expiration(task_data.get('expiration'), create_date)

task = Task(project_id=project.id, n_answers=n_answers)
dup_checksum = generate_checksum(project=project, task=task_data)
task = Task(project_id=project.id, n_answers=n_answers, dup_checksum=dup_checksum)
[setattr(task, k, v) for k, v in task_data.items()]

gold_answers = task_data.pop('gold_answers', None)
set_gold_answers(task, gold_answers)

found = task_repo.find_duplicate(project_id=project.id,
info=task.info)
info=task.info, dup_checksum=task.dup_checksum)
if found is not None:
continue
if not validator.validate(task):
Expand Down
37 changes: 24 additions & 13 deletions pybossa/repositories/task_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from sqlalchemy import or_
from sqlalchemy.sql import case as sqlalchemy_case
from pybossa.task_creator_helper import get_task_expiration
from pybossa.task_creator_helper import generate_checksum
import time


Expand Down Expand Up @@ -185,7 +184,6 @@ def save(self, element, clean_project=True):
# set task default expiration
if element.__class__.__name__ == "Task":
element.expiration = get_task_expiration(element.expiration, make_timestamp())
element.dup_checksum = generate_checksum(element)
self.db.session.add(element)
self.db.session.commit()
if clean_project:
Expand Down Expand Up @@ -497,22 +495,35 @@ def update_priority(self, project_id, priority, filters):
self.db.session.commit()
cached_projects.clean_project(project_id)

def find_duplicate(self, project_id, info):
def find_duplicate(self, project_id, info, dup_checksum=None):
"""
Find a task id in the given project with the project info using md5
index on info column casted as text. Md5 is used to avoid key size
limitations in BTree indices
"""
sql = text('''
SELECT task.id as task_id
FROM task
WHERE task.project_id=:project_id
AND task.state='ongoing'
AND md5(task.info::text)=md5(((:info)::jsonb)::text)
''')
info = json.dumps(info, allow_nan=False)
row = self.db.session.execute(
sql, dict(info=info, project_id=project_id)).first()

# when the task payload contains a dup_checksum value, perform the duplicate
# check based on the checksum instead of comparing the entire task payload info
if dup_checksum:
sql = text('''
SELECT task.id as task_id
FROM task
WHERE task.project_id=:project_id
AND task.dup_checksum=:dup_checksum
''')
row = self.db.session.execute(
sql, dict(project_id=project_id, dup_checksum=dup_checksum)).first()
else:
sql = text('''
SELECT task.id as task_id
FROM task
WHERE task.project_id=:project_id
AND task.state='ongoing'
AND md5(task.info::text)=md5(((:info)::jsonb)::text)
''')
info = json.dumps(info, allow_nan=False)
row = self.db.session.execute(
sql, dict(info=info, project_id=project_id)).first()
if row:
return row[0]

Expand Down
18 changes: 9 additions & 9 deletions pybossa/task_creator_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,29 +181,29 @@ def read_encrypted_file(store, project, bucket, key_name):
return decrypted, key


def generate_checksum(task):
def generate_checksum(project, task):
from pybossa.cache.projects import get_project_data
from pybossa.core import private_required_fields

if not (task and isinstance(task.info, dict)):
if not (task and isinstance(task, dict) and "info" in task):
return

project = get_project_data(task.project_id)
if not project:
return

task_info = task["info"]
dup_task_config = project.info.get("duplicate_task_check")
if not dup_task_config:
return

dup_fields_configured = dup_task_config.get("duplicate_fields", [])
# include all task.info fields with no field configured under duplicate_fields
# when no fields are configured under duplicate_fields, include all task_info fields

secret = get_encryption_key(project) if current_app.config.get("PRIVATE_INSTANCE") else None
cipher = AESWithGCM(secret) if secret else None
task_contents = {}
if current_app.config.get("PRIVATE_INSTANCE"):
for field, value in task.info.items():
for field, value in task_info.items():
if field in private_required_fields:
continue
if field.endswith("__upload_url"):
Expand All @@ -216,19 +216,19 @@ def generate_checksum(task):
current_app.logger.info("duplicate task checksum error parsing task payload for project %d, %s", project.id, str(e))
return
elif field == "private_json__encrypted_payload":
encrypted_content = task.info.get("private_json__encrypted_payload")
encrypted_content = task_info.get("private_json__encrypted_payload")
content = cipher.decrypt(encrypted_content) if cipher else encrypted_content
content = json.loads(content)
task_contents.update(content)
else:
task_contents[field] = value
else:
# public instance has all task fields under task.info
task_contents = task.info
# public instance has all task fields under task_info
task_contents = task_info

checksum_fields = task_contents.keys() if not dup_fields_configured else dup_fields_configured
checksum_payload = {field:task_contents[field] for field in checksum_fields}
current_app.logger.info("Project %d duplicate check fields %s", task.project_id, str(list(checksum_fields)))
current_app.logger.info("Project %d duplicate check fields %s", project.id, str(list(checksum_fields)))
checksum = hashlib.sha256()
checksum.update(json.dumps(checksum_payload, sort_keys=True).encode("utf-8"))
checksum_value = checksum.hexdigest()
Expand Down
26 changes: 14 additions & 12 deletions test/test_web.py
Original file line number Diff line number Diff line change
Expand Up @@ -10308,9 +10308,6 @@ def test_generate_checksum_errors(self, mock_read_enc_file, mock_get_enc_key):
subadmin = UserFactory.create(subadmin=True)
self.signin_user(subadmin)

# no checksum returned for missing/none task
assert generate_checksum(task=None) == None

# exception handled for bad priv data under file
# so that no checksum is returned
project = ProjectFactory.create(
Expand All @@ -10321,6 +10318,10 @@ def test_generate_checksum_errors(self, mock_read_enc_file, mock_get_enc_key):
"duplicate_fields": ["a", "c"]
}
})

# no checksum returned for missing/none task
assert generate_checksum(project=project, task=None) == None

task_info = {
"priv_data__upload_url": f"/fileproxy/encrypted/bcosv2-dev/testbucket/{project.id}/path/contents.txt"
}
Expand All @@ -10330,7 +10331,7 @@ def test_generate_checksum_errors(self, mock_read_enc_file, mock_get_enc_key):
)

# with bad data under file, exception was handled and no checksum returned
checksum = generate_checksum(task=task)
checksum = generate_checksum(project=project, task=task)
assert not checksum

@with_context
Expand All @@ -10357,7 +10358,7 @@ def test_generate_checksum_missing_project(self, mock_project):
)

# with missing project no checksum returned
checksum = generate_checksum(task=task)
checksum = generate_checksum(project=None, task=task)
assert not checksum

@with_context
Expand All @@ -10384,12 +10385,11 @@ def test_generate_checksum_public_data(self):

task = TaskFactory.create(
project=project,
info={"a": 1, "b": 2, "c": 3}
info={"a": 1, "b": 2, "c": 3},
dup_checksum=expected_checksum
)
# confirm task payload populated with checksum generated
task.dup_checksum == expected_checksum
checksum = generate_checksum(task)
assert checksum == expected_checksum
assert task.dup_checksum == expected_checksum, task.dup_checksum

# project w/o duplicate checksum configured gets
# tasks created with null checksum value
Expand Down Expand Up @@ -10438,8 +10438,9 @@ def test_checksum_private_data_files(self, mock_read_enc_file, mock_get_enc_key)
project=project,
info=task_info
)
checksum = generate_checksum(task)
assert checksum == expected_checksum
task_payload = {"id": task.id, "info": task.info}
checksum = generate_checksum(project= project, task=task_payload)
assert checksum == expected_checksum, checksum

@with_context
@patch("pybossa.task_creator_helper.get_encryption_key")
Expand Down Expand Up @@ -10489,7 +10490,8 @@ def test_generate_checksum_encrypted_payload(self, mock_get_enc_key):
project=project,
info=task_info
)
checksum = generate_checksum(task)
task_payload = {"id": task.id, "info": task.info}
checksum = generate_checksum(project=project, task=task_payload)
assert checksum == expected_checksum

class TestWebUserMetadataUpdate(web.Helper):
Expand Down

0 comments on commit 70f0a6d

Please sign in to comment.