Skip to content

Commit

Permalink
cleans up routes and makes facets more flexible
Browse files Browse the repository at this point in the history
  • Loading branch information
btylerburton committed Nov 18, 2024
1 parent 4917d82 commit 97a1144
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 174 deletions.
72 changes: 38 additions & 34 deletions app/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ def add_organization():
)


@mod.route("/organization/", methods=["GET"])
@mod.route("/organizations/", methods=["GET"])
def view_organizations():
organizations = db.get_all_organizations()
Expand Down Expand Up @@ -472,16 +471,16 @@ def view_harvest_source_data(source_id: str):
count=True,
skip_pagination=True,
source_id=source.id,
facets=["ckan_id != null"],
facets="ckan_id is not null",
)
error_records_count = db.get_harvest_records_by_source(
count=True,
skip_pagination=True,
source_id=source.id,
facets=["status = 'error'"],
facets="status = 'error'",
)
# TODO: wire in paginated jobs query
jobs = db.get_all_harvest_jobs_by_filter({"harvest_source_id": source.id})
# TODO: wire in paginated jobs htmx refresh ui & route
jobs = db.pget_harvest_jobs(facets=f"harvest_source_id = '{source.id}'")
next_job = "N/A"
future_jobs = db.get_new_harvest_jobs_by_source_in_future(source.id)
if len(future_jobs):
Expand Down Expand Up @@ -528,7 +527,6 @@ def view_harvest_source_data(source_id: str):
return render_template("view_source_data.html", data=data)


@mod.route("/harvest_source/", methods=["GET"])
@mod.route("/harvest_sources/", methods=["GET"])
def view_harvest_sources():
sources = db.get_all_harvest_sources()
Expand Down Expand Up @@ -745,18 +743,45 @@ def get_harvest_record(record_id):
def get_harvest_records():
job_id = request.args.get("harvest_job_id")
source_id = request.args.get("harvest_source_id")
page = request.args.get("page")
paginate = request.args.get("paginate", type=bool)
skip_pagination = request.args.get("skip_pagination", type=bool)
count = request.args.get("count", type=bool)
page = request.args.get("page", type=int)
facets = request.args.get("facets", "")
if job_id:
records = db.get_harvest_records_by_job(job_id, page)
if not records:
return "No harvest records found for this harvest job", 404
records = db.get_harvest_records_by_job(
job_id,
page=page,
paginate=paginate,
skip_pagination=skip_pagination,
facets=facets,
)

elif source_id:
records = db.get_harvest_records_by_source(source_id, page)
records = db.get_harvest_records_by_source(
source_id,
page=page,
paginate=paginate,
skip_pagination=skip_pagination,
count=count,
facets=facets,
)
if not records:
return "No harvest records found for this harvest source", 404
else:
records = db.pget_harvest_records(page)
return db._to_dict(records)
records = db.pget_harvest_records(
page=page,
paginate=paginate,
skip_pagination=skip_pagination,
facets=facets,
)

if not records:
return "No harvest records found for this query", 404
elif isinstance(records, int):
return f"{records} records found", 200
else:
return db._to_dict(records)


@mod.route("/harvest_record/<record_id>/raw", methods=["GET"])
Expand Down Expand Up @@ -818,27 +843,6 @@ def get_data_sources():
return render_template("get_data_sources.html", sources=source, organizations=org)


## Test interface, will remove later
@mod.route("/delete_all_records", methods=["DELETE"])
def delete_all_records():
db.delete_all_harvest_records()
return "All harvest records deleted"


## Test interface, will remove later
@mod.route("/add_harvest_job_error", methods=["POST"])
def add_harvest_job_error():
db.add_harvest_job_error(request.json)
return "Added harvest job error"


## Test interface, will remove later
@mod.route("/add_harvest_record_error", methods=["POST"])
def add_harvest_record_error():
err = db.add_harvest_record_error(request.json)
return db._to_dict(err)


def register_routes(app):
app.register_blueprint(mod)
app.register_blueprint(user)
Expand Down
50 changes: 23 additions & 27 deletions database/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@

import ckanapi
from ckanapi import RemoteCKAN
from sqlalchemy import create_engine, func, inspect, or_, select, text
from sqlalchemy import create_engine, func, inspect, select, text
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import scoped_session, sessionmaker
from harvester.utils.general_utils import query_filter_builder

from .models import (
HarvestJob,
Expand Down Expand Up @@ -224,8 +225,9 @@ def _clear_harvest_records():

ckan_ids = [record.ckan_id for record in records if record.ckan_id is not None]
error_records = [record for record in records if record.status == "error"]
jobs_in_progress = self.get_all_harvest_jobs_by_filter(
{"harvest_source_id": source.id, "status": "in_progress"}
jobs_in_progress = self.pget_harvest_jobs(
facets=f"harvest_source_id = '{source.id}', 'status' = 'in_progress'",
paginate=False,
)

# Ensure no jobs are in progress
Expand Down Expand Up @@ -325,10 +327,6 @@ def add_harvest_job(self, job_data):
def get_harvest_job(self, job_id):
return self.db.query(HarvestJob).filter_by(id=job_id).first()

def get_all_harvest_jobs_by_filter(self, filter):
harvest_jobs = self.db.query(HarvestJob).filter_by(**filter).all()
return [job for job in harvest_jobs or []]

def get_first_harvest_jobs_by_filter(self, filter):
harvest_job = (
self.db.query(HarvestJob)
Expand Down Expand Up @@ -361,11 +359,6 @@ def get_new_harvest_jobs_by_source_in_future(self, source_id):
)
return [job for job in harvest_jobs or []]

def get_harvest_jobs_by_faceted_filter(self, attr, values):
query_list = [getattr(HarvestJob, attr) == value for value in values]
harvest_jobs = self.db.query(HarvestJob).filter(or_(*query_list)).all()
return [job for job in harvest_jobs]

def update_harvest_job(self, job_id, updates):
try:
job = self.db.get(HarvestJob, job_id)
Expand Down Expand Up @@ -602,29 +595,32 @@ def verify_user(self, usr_data):
#### PAGINATED QUERIES ####
@count
@paginate
def pget_harvest_jobs(self, filter=text(""), **kwargs):
return self.db.query(HarvestJob).filter(filter)
def pget_harvest_jobs(self, facets="", **kwargs):
facet_string = query_filter_builder(None, facets)
return self.db.query(HarvestJob).filter(text(facet_string))

@count
@paginate
def pget_harvest_records(self, filter=text(""), **kwargs):
return self.db.query(HarvestRecord).filter(filter)
def pget_harvest_records(self, facets="", **kwargs):
return self.db.query(HarvestRecord).filter(text(facets))

@count
@paginate
def pget_harvest_job_errors(self, filter=text(""), **kwargs):
return self.db.query(HarvestJobError).filter(filter)
def pget_harvest_job_errors(self, facets="", **kwargs):
return self.db.query(HarvestJobError).filter(text(facets))

@count
@paginate
def pget_harvest_record_errors(self, filter=text(""), **kwargs):
return self.db.query(HarvestRecordError).filter(filter)
def pget_harvest_record_errors(self, facets="", **kwargs):
return self.db.query(HarvestRecordError).filter(text(facets))

#### FACETED BUILDER QUERIES ####
def get_harvest_records_by_job(self, job_id, facets=[], **kwargs):
filter_string = " AND ".join([f"harvest_job_id = '{job_id}'"] + facets)
return self.pget_harvest_records(filter=text(filter_string), **kwargs)
#### FILTERED BUILDER QUERIES ####
def get_harvest_records_by_job(self, job_id, facets="", **kwargs):
facet_string = query_filter_builder(f"harvest_job_id = '{job_id}'", facets)
return self.pget_harvest_records(facets=facet_string, **kwargs)

def get_harvest_records_by_source(self, source_id, facets=[], **kwargs):
filter_string = " AND ".join([f"harvest_source_id = '{source_id}'"] + facets)
return self.pget_harvest_records(filter=text(filter_string), **kwargs)
def get_harvest_records_by_source(self, source_id, facets="", **kwargs):
facet_string = query_filter_builder(
f"harvest_source_id = '{source_id}'", facets
)
return self.pget_harvest_records(facets=facet_string, **kwargs)
5 changes: 3 additions & 2 deletions harvester/lib/load_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,9 @@ def trigger_manual_job(self, source_id):
"""manual trigger harvest job,
takes a source_id"""
source = interface.get_harvest_source(source_id)
jobs_in_progress = interface.get_all_harvest_jobs_by_filter(
{"harvest_source_id": source.id, "status": "in_progress"}
jobs_in_progress = interface.pget_harvest_jobs(
facets=f"harvest_source_id = '{source.id}', status = 'in_progress'",
paginate=False,
)
if len(jobs_in_progress):
return f"Can't trigger harvest. Job {jobs_in_progress[0].id} already in progress." # noqa E501
Expand Down
18 changes: 18 additions & 0 deletions harvester/utils/general_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,21 @@ def download_waf(files):
output.append({"url": file, "content": download_file(file, ".xml")})

return output


def query_filter_builder(base, facets):
    """Builds an AND-joined filter string from a base clause and extra facets

    :param base str - base filter clause, placed first in the result;
        None or an empty string means there is no base clause
    :param facets str - comma separated string of extra filter clauses;
        None or an empty string means there are no extra facets
    :return str - the non-blank clauses joined with " AND ", or "" when
        there are none

    Every clause is stripped of surrounding whitespace and blank clauses are
    dropped, so inputs such as "a = 1, b = 2", ",a = 1" or "a = 1, ," never
    produce a leading, trailing or doubled " AND ".

    NOTE: facets are split on every comma, so a clause whose value itself
    contains a comma cannot be expressed. Clauses are concatenated verbatim
    with no escaping or validation; callers must not pass untrusted input.
    """
    # Treat the base clause as just the first candidate so that a missing
    # base and a missing first facet are handled by the same blank filter.
    candidates = [base or ""]
    candidates.extend((facets or "").split(","))
    clauses = [candidate.strip() for candidate in candidates]
    return " AND ".join(clause for clause in clauses if clause)
1 change: 0 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,6 @@ def interface_with_multiple_sources(
return interface_with_fixture_json


## MISC
@pytest.fixture
def interface_with_multiple_jobs(interface_no_jobs, source_data_dcatus):
statuses = ["new", "in_progress", "complete", "error"]
Expand Down
24 changes: 14 additions & 10 deletions tests/integration/app/test_load_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,9 @@ def test_manual_job_doesnt_affect_scheduled_jobs(
assert source_data_dcatus["frequency"] == "daily"
assert jobs[0].date_created == datetime.now() + timedelta(days=1)

jobs = interface_no_jobs.get_all_harvest_jobs_by_filter(
{"harvest_source_id": source_data_dcatus["id"]}
source_id = source_data_dcatus["id"]
jobs = interface_no_jobs.pget_harvest_jobs(
facets=f"harvest_source_id = '{source_id}'"
)
assert len(jobs) == 2
assert jobs[0].date_created == datetime.now() + timedelta(days=1)
Expand All @@ -212,8 +213,9 @@ def test_dont_create_new_job_if_job_already_in_progress(
load_manager = LoadManager()
load_manager.schedule_first_job(source_data_dcatus["id"])
message = load_manager.trigger_manual_job(source_data_dcatus["id"])
new_job = interface_no_jobs.get_all_harvest_jobs_by_filter(
{"harvest_source_id": source_data_dcatus["id"], "status": "in_progress"}
source_id = source_data_dcatus["id"]
new_job = interface_no_jobs.pget_harvest_jobs(
facets=f"harvest_source_id = '{source_id}', status = 'in_progress'"
)
assert message == f"Updated job {new_job[0].id} to in_progress"
message = load_manager.trigger_manual_job(source_data_dcatus["id"])
Expand All @@ -222,8 +224,8 @@ def test_dont_create_new_job_if_job_already_in_progress(
== f"Can't trigger harvest. Job {new_job[0].id} already in progress."
)

jobs = interface_no_jobs.get_all_harvest_jobs_by_filter(
{"harvest_source_id": source_data_dcatus["id"]}
jobs = interface_no_jobs.pget_harvest_jobs(
facets=f"harvest_source_id = '{source_id}'"
)

assert len(jobs) == 2
Expand Down Expand Up @@ -251,8 +253,9 @@ def test_assert_env_var_changes_task_size(
assert start_task_mock.call_args[0][4] == "1536"

# clear out in progress jobs
jobs = interface_no_jobs.get_all_harvest_jobs_by_filter(
{"harvest_source_id": source_data_dcatus["id"]}
source_id = source_data_dcatus["id"]
jobs = interface_no_jobs.pget_harvest_jobs(
facets=f"harvest_source_id = '{source_id}'"
)
interface_no_jobs.delete_harvest_job(jobs[0].id)

Expand Down Expand Up @@ -283,8 +286,9 @@ def test_trigger_cancel_job(
load_manager = LoadManager()
load_manager.trigger_manual_job(source_data_dcatus["id"])

jobs = interface_no_jobs.get_all_harvest_jobs_by_filter(
{"harvest_source_id": source_data_dcatus["id"]}
source_id = source_data_dcatus["id"]
jobs = interface_no_jobs.pget_harvest_jobs(
facets=f"harvest_source_id = '{source_id}'"
)

task_guid_val = "3a24b55a02b0-eb7b-4eeb-9f45-645cedd3d93b"
Expand Down
Loading

1 comment on commit 97a1144

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests Skipped Failures Errors Time
2 0 💤 0 ❌ 0 🔥 6.072s ⏱️

Please sign in to comment.