diff --git a/hera_librarian/models/search.py b/hera_librarian/models/search.py new file mode 100644 index 0000000..0bf9668 --- /dev/null +++ b/hera_librarian/models/search.py @@ -0,0 +1,93 @@ +""" +Pydantic models for the search endpoint. +""" + +from datetime import datetime +from pathlib import Path +from typing import Optional + +from pydantic import BaseModel, Field, RootModel + +from hera_librarian.deletion import DeletionPolicy + + +class FileSearchRequest(BaseModel): + """ + Represents a file search request. + """ + + name: Optional[str] = None + "The name of the file to search for." + create_time_window: Optional[tuple[datetime, ...]] = Field( + default=None, min_length=2, max_length=2 + ) + "The time window to search for files in. This is a tuple of two datetimes, the first being the start and the second being the end. Note that the datetimes should be in UTC." + uploader: Optional[str] = None + "The uploader to search for." + source: Optional[str] = None + "The source to search for." + max_results: int = 64 + "The maximum number of results to return." + + +class InstanceSearchResponse(BaseModel): + """ + Represents an instance in the file search response. + """ + + path: Path + "The path of the instance." + deletion_policy: DeletionPolicy + "The deletion policy of the instance." + created_time: datetime + "The time the instance was created." + available: bool + "Whether or not the instance is available." + + +class RemoteInstanceSearchResponse(BaseModel): + """ + Represents a remote instance in the file search response. + """ + + librarian_name: str + "The name of the librarian that this instance lives on." + copy_time: datetime + "The time at which this instance was copied to the remote librarian." + + +class FileSearchResponse(BaseModel): + """ + Represents a file search response. + """ + + name: str + "The name of the file." + create_time: datetime + "The time the file was created." + size: int + "The size of the file in bytes." + checksum: str + "The checksum of the file." + uploader: str + "The uploader of the file." + source: str + "The source of the file." + instances: list[InstanceSearchResponse] + "The instances of the file." + remote_instances: list[RemoteInstanceSearchResponse] + "The remote instances of the file." + + +FileSearchResponses = RootModel[list[FileSearchResponse]] + + +class FileSearchFailedResponse(BaseModel): + """ + Represents a file search failure response. + """ + + reason: str + "The reason why the search failed." + suggested_remedy: str + "A suggested remedy for the failure." diff --git a/hera_librarian/models/stores.py b/hera_librarian/models/stores.py deleted file mode 100644 index 871369f..0000000 --- a/hera_librarian/models/stores.py +++ /dev/null @@ -1,12 +0,0 @@ -""" -Pydantic model for store information. -""" - -from pydantic import BaseModel - -class StoreRequest(BaseModel): - """ - Pydantic model for store information. 
- """ - - stores: list["CoreStore"] \ No newline at end of file diff --git a/librarian_server/__init__.py b/librarian_server/__init__.py index d88b0fe..e4f8ec3 100644 --- a/librarian_server/__init__.py +++ b/librarian_server/__init__.py @@ -18,10 +18,11 @@ def main() -> FastAPI: log.debug("Adding API router.") - from .api import upload_router, ping_router, clone_router + from .api import upload_router, ping_router, clone_router, search_router app.include_router(upload_router) app.include_router(ping_router) app.include_router(clone_router) + app.include_router(search_router) return app \ No newline at end of file diff --git a/librarian_server/api/__init__.py b/librarian_server/api/__init__.py index 1fc4046..5bbab80 100644 --- a/librarian_server/api/__init__.py +++ b/librarian_server/api/__init__.py @@ -7,4 +7,5 @@ from .upload import router as upload_router from .ping import router as ping_router -from .clone import router as clone_router \ No newline at end of file +from .clone import router as clone_router +from .search import router as search_router \ No newline at end of file diff --git a/librarian_server/api/search.py b/librarian_server/api/search.py new file mode 100644 index 0000000..2b4b4e0 --- /dev/null +++ b/librarian_server/api/search.py @@ -0,0 +1,103 @@ +""" +Contains endpoints for searching the files uploaded to the librarian. +""" + +from fastapi import APIRouter, Depends, Response, status +from sqlalchemy import select +from sqlalchemy.orm import Session + +from hera_librarian.models.search import (FileSearchFailedResponse, + FileSearchRequest, + FileSearchResponse, + FileSearchResponses, + InstanceSearchResponse, + RemoteInstanceSearchResponse) + +from ..database import yield_session +from ..logger import log +from ..orm.file import File +from ..orm.instance import Instance, RemoteInstance +from ..settings import server_settings + +router = APIRouter(prefix="/api/v2/search") + + +@router.post("/file", response_model=FileSearchResponses | FileSearchFailedResponse) +def file( + request: FileSearchRequest, + response: Response, + session: Session = Depends(yield_session), +): + """ + Searches for files in the librarian. + + Possible response codes: + + 200 - OK. Search completed successfully. + 404 - No file found to match search criteria. + """ + + log.debug(f"Received file search request: {request}") + + # Start to build our query. + query = select(File) + + if request.name is not None: + query = query.where(File.name == request.name) + + if request.create_time_window is not None: + query = query.where(File.create_time >= request.create_time_window[0]) + query = query.where(File.create_time <= request.create_time_window[1]) + + if request.uploader is not None: + query = query.where(File.uploader == request.uploader) + + if request.source is not None: + query = query.where(File.source == request.source) + + query.order_by(File.create_time) + query.limit(max(min(request.max_results, server_settings.max_search_results), 0)) + + # Execute the query. + results = session.execute(query).scalars().all() + + if len(results) == 0: + log.debug(f"No files found. Returning error.") + response.status_code = status.HTTP_404_NOT_FOUND + return FileSearchFailedResponse( + reason="No files found.", + suggested_remedy="Check that you are searching for the correct file.", + ) + + # Build the response. 
+ respond_files = [] + + for result in results: + respond_files.append( + FileSearchResponse( + name=result.name, + create_time=result.create_time, + size=result.size, + checksum=result.checksum, + uploader=result.uploader, + source=result.source, + instances=[ + InstanceSearchResponse( + path=instance.path, + deletion_policy=instance.deletion_policy, + created_time=instance.created_time, + available=instance.available, + ) + for instance in result.instances + ], + remote_instances=[ + RemoteInstanceSearchResponse( + librarian_name=remote_instance.librarian_name, + copy_time=remote_instance.copy_time, + ) + for remote_instance in result.remote_instances + ], + ) + ) + + return FileSearchResponses(respond_files) diff --git a/librarian_server/search.py b/librarian_server/search.py deleted file mode 100644 index 597fd8f..0000000 --- a/librarian_server/search.py +++ /dev/null @@ -1,1317 +0,0 @@ -# -*- mode: python; coding: utf-8 -*- -# Copyright 2016-2017 the HERA Collaboration -# Licensed under the BSD License. - -"""Searches of the database - -This code will likely need a lot of expansion, but we'll start simple. - -""" - - - -__all__ = str(''' -compile_search -StandingOrder -queue_standing_order_copies -register_standing_order_checkin -''').split() - -import datetime -import json -import logging -import os.path -import six -import sys -import time - -from flask import Response, flash, redirect, render_template, request, url_for - -from . import app, db, is_primary_server, logger -from .webutil import ServerError, json_api, login_required, optional_arg, required_arg - - -# The search parser. We save searches in a (hopefully) simple JSON format. The -# format is documented in `docs/Searching.md`. KEEP THE DOCS UPDATED! - -class _AttributeTypes(object): - string = 's' - int = 'i' - float = 'f' - - -AttributeTypes = _AttributeTypes() - - -class GenericSearchCompiler(object): - """A simple singleton class that helps with compiling searches. The only state - that we manage is the list of search clauses, which can be extended - dynamically to support different types of attributes that searchable - things possess. - - """ - - def __init__(self): - self.clauses = { - 'and': self._do_and, - 'or': self._do_or, - 'none-of': self._do_none_of, - 'always-true': self._do_always_true, - 'always-false': self._do_always_false, - } - - def compile(self, search): - """Compile a search that is specified as a JSON-like data structure. - - The `search` must be a dict, which is interpreted as a set of clauses - that are ANDed logically. - - """ - if isinstance(search, dict): - return self._compile_clause('and', search) - - raise ServerError('can\'t parse search: data must ' - 'be in dict format; got %s', search.__class__.__name__) - - def _compile_clause(self, name, payload): - impl = self.clauses.get(name) - if impl is None: - raise ServerError('can\'t parse search: unrecognized clause %r' % name) - return impl(name, payload) - - # Framework for doing searches on general attributes of database items. 
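
Stepping back to the new endpoint defined above (POST /api/v2/search/file): the following is a minimal client-side sketch of exercising it with the new Pydantic models. The server address, the example file name, and the use of the requests library are illustrative assumptions only; the actual client wrapper is not part of this diff.

    import requests  # assumed HTTP client, not part of this change

    from hera_librarian.models.search import FileSearchRequest, FileSearchResponses

    SERVER = "http://localhost:21106"  # hypothetical librarian address

    # Build a typed request; fields left unset are simply not used as filters.
    request = FileSearchRequest(name="example_file.h5", max_results=16)

    reply = requests.post(
        f"{SERVER}/api/v2/search/file",
        data=request.model_dump_json(),
        headers={"Content-Type": "application/json"},
    )

    if reply.status_code == 200:
        files = FileSearchResponses.model_validate_json(reply.content)
        for found in files.root:
            print(found.name, found.size, [str(inst.path) for inst in found.instances])
    else:
        # A 404 carries a FileSearchFailedResponse body with reason and suggested_remedy.
        print("Search failed:", reply.json())

Because FileSearchResponses is a RootModel wrapping a plain list, a successful reply is just a JSON array of file records.
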
- - def _add_attributes(self, cls, attr_info): - from functools import partial - - for attr_name, attr_type, attr_getter in attr_info: - clause_name = attr_name.replace('_', '-') - - if attr_getter is None: - attr_getter = partial(getattr, cls, attr_name) - - if attr_type == AttributeTypes.string: - self.clauses[clause_name + - '-is-exactly'] = partial(self._do_str_is_exactly, attr_getter) - self.clauses[clause_name + '-is-not'] = partial(self._do_str_is_not, attr_getter) - self.clauses[clause_name + '-matches'] = partial(self._do_str_matches, attr_getter) - elif attr_type == AttributeTypes.int: - self.clauses[clause_name + - '-is-exactly'] = partial(self._do_int_is_exactly, attr_getter) - self.clauses[clause_name + '-is-not'] = partial(self._do_int_is_not, attr_getter) - self.clauses[clause_name + - '-greater-than'] = partial(self._do_num_greater_than, attr_getter) - self.clauses[clause_name + - '-less-than'] = partial(self._do_num_less_than, attr_getter) - self.clauses[clause_name + - '-in-range'] = partial(self._do_num_in_range, attr_getter) - self.clauses[clause_name + - '-not-in-range'] = partial(self._do_num_not_in_range, attr_getter) - elif attr_type == AttributeTypes.float: - self.clauses[clause_name + - '-greater-than'] = partial(self._do_num_greater_than, attr_getter) - self.clauses[clause_name + - '-less-than'] = partial(self._do_num_less_than, attr_getter) - self.clauses[clause_name + - '-in-range'] = partial(self._do_num_in_range, attr_getter) - self.clauses[clause_name + - '-not-in-range'] = partial(self._do_num_not_in_range, attr_getter) - - def _do_str_matches(self, attr_getter, clause_name, payload): - if not isinstance(payload, str): - raise ServerError('can\'t parse "%s" clause: contents must be text, ' - 'but got %s', clause_name, payload.__class__.__name__) - return attr_getter().like(payload) - - def _do_str_is_exactly(self, attr_getter, clause_name, payload): - if not isinstance(payload, str): - raise ServerError('can\'t parse "%s" clause: contents must be text, ' - 'but got %s', clause_name, payload.__class__.__name__) - return (attr_getter() == payload) - - def _do_str_is_not(self, attr_getter, clause_name, payload): - if not isinstance(payload, str): - raise ServerError('can\'t parse "%s" clause: contents must be text, ' - 'but got %s', clause_name, payload.__class__.__name__) - return (attr_getter() != payload) - - def _do_int_is_exactly(self, attr_getter, clause_name, payload): - if not isinstance(payload, int): - raise ServerError('can\'t parse "%s" clause: contents must be an integer, ' - 'but got %s', clause_name, payload.__class__.__name__) - return (attr_getter() == payload) - - def _do_int_is_not(self, attr_getter, clause_name, payload): - if not isinstance(payload, int): - raise ServerError('can\'t parse "%s" clause: contents must be an integer, ' - 'but got %s', clause_name, payload.__class__.__name__) - return (attr_getter() != payload) - - def _do_num_greater_than(self, attr_getter, clause_name, payload): - if not isinstance(payload, (int, float)): - raise ServerError('can\'t parse "%s" clause: contents must be numeric, ' - 'but got %s', clause_name, payload.__class__.__name__) - return (attr_getter() > payload) - - def _do_num_less_than(self, attr_getter, clause_name, payload): - if not isinstance(payload, (int, float)): - raise ServerError('can\'t parse "%s" clause: contents must be numeric, ' - 'but got %s', clause_name, payload.__class__.__name__) - return (attr_getter() < payload) - - def _do_num_in_range(self, attr_getter, clause_name, payload): 
- if (not isinstance(payload, list) or - len(payload) != 2 or - not isinstance(payload[0], (int, float)) or - not isinstance(payload[1], (int, float))): - raise ServerError('can\'t parse "%s" clause: contents must be a list of two numbers, ' - 'but got %s', clause_name, payload.__class__.__name__) - - v1, v2 = payload - if v1 > v2: - v1, v2 = v2, v1 - - from sqlalchemy import and_ - value = attr_getter() - return and_(value >= v1, value <= v2) - - def _do_num_not_in_range(self, attr_getter, clause_name, payload): - if (not isinstance(payload, list) or - len(payload) != 2 or - not isinstance(payload[0], (int, float)) or - not isinstance(payload[1], (int, float))): - raise ServerError('can\'t parse "%s" clause: contents must be a list of two numbers, ' - 'but got %s', clause_name, payload.__class__.__name__) - - v1, v2 = payload - if v1 > v2: - v1, v2 = v2, v1 - - from sqlalchemy import or_ - value = attr_getter() - return or_(value < v1, value > v2) - - # Custom, generic clauses. - - def _do_and(self, clause_name, payload): - if not isinstance(payload, dict) or not len(payload): - raise ServerError('can\'t parse "%s" clause: contents must be a dict, ' - 'but got %s', clause_name, payload.__class__.__name__) - from sqlalchemy import and_ - return and_(*[self._compile_clause(*t) for t in payload.items()]) - - def _do_or(self, clause_name, payload): - if not isinstance(payload, dict) or not len(payload): - raise ServerError('can\'t parse "%s" clause: contents must be a dict, ' - 'but got %s', clause_name, payload.__class__.__name__) - from sqlalchemy import or_ - return or_(*[self._compile_clause(*t) for t in payload.items()]) - - def _do_none_of(self, clause_name, payload): - if not isinstance(payload, dict) or not len(payload): - raise ServerError('can\'t parse "%s" clause: contents must be a dict, ' - 'but got %s', clause_name, payload.__class__.__name__) - from sqlalchemy import not_, or_ - return not_(or_(*[self._compile_clause(*t) for t in payload.items()])) - - def _do_always_true(self, clause_name, payload): - """We just ignore the payload.""" - from sqlalchemy import literal - return literal(True) - - def _do_always_false(self, clause_name, payload): - """We just ignore the payload.""" - from sqlalchemy import literal - return literal(False) - - -# Searches for observing sessions - -def _session_get_id(): - from .observation import ObservingSession - return ObservingSession.id - - -def _session_get_duration(): - """There is a "duration" property on the ObservingSession class, but it - computes its result in Python code using math, which means that it doesn't - work within an SQL query. Empirically, we get a silent failure to match - any files if we try to search that way. 
- - """ - from .observation import ObservingSession - return (ObservingSession.stop_time_jd - ObservingSession.start_time_jd) - - -def _session_get_num_obs(): - from sqlalchemy import func - from .observation import Observation, ObservingSession - return (db.session.query(func.count(Observation.obsid)) - .filter(Observation.session_id == ObservingSession.id).as_scalar()) - - -def _session_get_num_files(): - from sqlalchemy import func - from .file import File - from .observation import Observation, ObservingSession - return (db.session.query(func.count(File.name)) - .filter(File.obsid == Observation.obsid) - .filter(Observation.session_id == ObservingSession.id).as_scalar()) - - -def _session_get_age(): - from astropy.time import Time - from .observation import ObservingSession - return (Time.now().jd - ObservingSession.stop_time_jd) - - -simple_session_attrs = [ - ('session_id', AttributeTypes.int, _session_get_id), - ('start_time_jd', AttributeTypes.float, None), - ('stop_time_jd', AttributeTypes.float, None), - ('duration', AttributeTypes.float, _session_get_duration), - ('num_obs', AttributeTypes.int, _session_get_num_obs), - ('num_files', AttributeTypes.int, _session_get_num_files), - ('age', AttributeTypes.float, _session_get_age), -] - - -class ObservingSessionSearchCompiler(GenericSearchCompiler): - def __init__(self): - from .observation import ObservingSession - super(ObservingSessionSearchCompiler, self).__init__() - self._add_attributes(ObservingSession, simple_session_attrs) - - self.clauses['no-file-has-event'] = self._do_no_file_has_event - - def _do_no_file_has_event(self, clause_name, payload): - if not isinstance(payload, str): - raise ServerError('can\'t parse "%s" clause: contents must be text, ' - 'but got %s', clause_name, payload.__class__.__name__) - - from sqlalchemy import func - from .file import File, FileEvent - from .observation import Observation, ObservingSession - - # This feels awfully gross, but it works. - - return (db.session.query(func.count(File.name)) - .filter(File.obsid == Observation.obsid) - .filter(Observation.session_id == ObservingSession.id) - .join(FileEvent) - .filter(FileEvent.type == payload, - File.name == FileEvent.name).as_scalar() == 0) - - -the_session_search_compiler = ObservingSessionSearchCompiler() - - -# Searches for observations - -def _obs_get_duration(): - """There is a "duration" property on the Observation class, but it computes - its result in Python code using math, which means that it doesn't work - within an SQL query. Empirically, we get a silent failure to match any - files if we try to search that way. 
- - """ - from .observation import Observation - return (Observation.stop_time_jd - Observation.start_time_jd) - - -def _obs_get_num_files(): - from sqlalchemy import func - from .file import File - from .observation import Observation - return db.session.query(func.count(File.name)).filter(File.obsid == Observation.obsid).as_scalar() - - -def _obs_get_total_size(): - from sqlalchemy import func - from .file import File - from .observation import Observation - return db.session.query(func.sum(File.size)).filter(File.obsid == Observation.obsid).as_scalar() - - -simple_obs_attrs = [ - ('obsid', AttributeTypes.int, None), - ('start_time_jd', AttributeTypes.float, None), - ('stop_time_jd', AttributeTypes.float, None), - ('start_lst_hr', AttributeTypes.float, None), - ('session_id', AttributeTypes.int, None), - ('duration', AttributeTypes.float, _obs_get_duration), - ('num_files', AttributeTypes.int, _obs_get_num_files), - ('total_size', AttributeTypes.int, _obs_get_total_size), -] - - -class ObservationSearchCompiler(GenericSearchCompiler): - def __init__(self): - from .observation import Observation - super(ObservationSearchCompiler, self).__init__() - self._add_attributes(Observation, simple_obs_attrs) - - -the_obs_search_compiler = ObservationSearchCompiler() - - -# Searches for files - -def _file_get_num_instances(): - from sqlalchemy import func - from .file import File, FileInstance - return db.session.query(func.count()).filter(FileInstance.name == File.name).as_scalar() - - -simple_file_attrs = [ - ('name', AttributeTypes.string, None), - ('type', AttributeTypes.string, None), - ('source', AttributeTypes.string, None), - ('size', AttributeTypes.int, None), - ('obsid', AttributeTypes.int, None), - ('num-instances', AttributeTypes.int, _file_get_num_instances), -] - - -class FileSearchCompiler(GenericSearchCompiler): - def __init__(self): - from .file import File - super(FileSearchCompiler, self).__init__() - self._add_attributes(File, simple_file_attrs) - self.clauses['obs-matches'] = self._do_obs_matches - - self.clauses['name-like'] = self.clauses['name-matches'] # compat alias - self.clauses['source-is'] = self.clauses['source-is-exactly'] # compat alias - - self.clauses['obsid-is-null'] = self._do_obsid_is_null - - # These are technically properties of Observations, not Files, but - # users aren't going to want to jump through extra hoops to query for - # them, so we proxy the query clauses. - - from functools import partial - for pfx in ('start-time-jd', 'stop-time-jd', 'start-lst-hr', 'session-id'): - for cname in six.iterkeys(the_obs_search_compiler.clauses): - if cname.startswith(pfx): - self.clauses[cname] = self._do_obs_sub_query - - # I named these in a very ... weird way. 
- self.clauses['not-older-than'] = self._do_not_older_than - self.clauses['not-newer-than'] = self._do_not_newer_than - - def _do_obsid_is_null(self, clause_name, payload): - """We just ignore the payload.""" - from .file import File - return (File.obsid == None) - - def _do_not_older_than(self, clause_name, payload): - if not isinstance(payload, (int, float)): - raise ServerError('can\'t parse "%s" clause: contents must be ' - 'numeric, but got %s', clause_name, payload.__class__.__name__) - - from .file import File - cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=payload) - return (File.create_time > cutoff) - - def _do_not_newer_than(self, clause_name, payload): - if not isinstance(payload, (int, float)): - raise ServerError('can\'t parse "%s" clause: contents must be ' - 'numeric, but got %s', clause_name, payload.__class__.__name__) - - from .file import File - cutoff = datetime.datetime.utcnow() - datetime.timedelta(days=payload) - return (File.create_time < cutoff) - - def _do_obs_matches(self, clause_name, payload): - from .observation import Observation - from .file import File - - matched_obsids = (db.session.query(Observation.obsid) - .filter(the_obs_search_compiler.compile(payload))) - return File.obsid.in_(matched_obsids) - - def _do_obs_sub_query(self, clause_name, payload): - from .observation import Observation - from .file import File - - matched_obsids = (db.session.query(Observation.obsid) - .filter(the_obs_search_compiler._compile_clause(clause_name, payload))) - return File.obsid.in_(matched_obsids) - - -the_file_search_compiler = FileSearchCompiler() - - -def compile_search(search_string, query_type='files'): - """This function returns a query on the File table that will return the File - items matching the search. - - """ - from .file import File, FileInstance - from .observation import Observation, ObservingSession - from .store import Store - - # As a convenience, we strip out #-delimited comments from the input text. - # The default JSON parser doesn't accept them, but they're nice for users. - - def filter_comments(): - for line in search_string.splitlines(): - yield line.split('#', 1)[0] - - search_string = '\n'.join(filter_comments()) - - # Parse JSON. - - try: - search = json.loads(search_string) - except Exception as e: - app.log_exception(sys.exc_info()) - raise ServerError('can\'t parse search as JSON: %s', e) - - # Offload to the helper classes. - - if query_type == 'files': - return File.query.filter(the_file_search_compiler.compile(search)) - elif query_type == 'names': - return db.session.query(File.name).filter(the_file_search_compiler.compile(search)) - elif query_type == 'obs': - return Observation.query.filter(the_obs_search_compiler.compile(search)) - elif query_type == 'sessions': - return ObservingSession.query.filter(the_session_search_compiler.compile(search)) - elif query_type == 'instances-stores': - # The following syntax gives us a LEFT OUTER JOIN which is what we want to - # get (at most) one instance for each File of interest. - return (db.session.query(FileInstance, File, Store) - .join(Store) - .join(File, isouter=True) - .filter(the_file_search_compiler.compile(search))) - elif query_type == 'instances': - return (db.session.query(FileInstance) - .join(File, isouter=True) - .filter(the_file_search_compiler.compile(search))) - else: - raise ServerError('unhandled query_type %r', query_type) - - -# "Standing orders" to copy files from one Librarian to another. 
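
An aside on the search format itself, before the standing-order machinery: the JSON clause documents compiled by the deleted code above (clauses such as "source-is" and "not-older-than") map onto the new typed FileSearchRequest roughly as sketched below; the source value is purely illustrative.

    from datetime import datetime, timedelta, timezone

    from hera_librarian.models.search import FileSearchRequest

    # Old-style document handled by the deleted compile_search():
    #   { "source-is": "correlator", "not-older-than": 14 }   # days
    now = datetime.now(timezone.utc)
    request = FileSearchRequest(
        source="correlator",                                 # exact match on File.source
        create_time_window=(now - timedelta(days=14), now),  # UTC (start, end)
    )

Pattern-style clauses such as "name-matches" have no direct equivalent here; the new endpoint matches file names exactly.
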
- -stord_logger = logging.getLogger('librarian.standingorders') - - -class StandingOrder(db.Model): - """A StandingOrder describes a rule for copying data from this Librarian to - another. We save a search and a destination. When new files match that - search, we automatically start copying them to the destination. We create - a FileEvent with a name based on the name of the StandingOrder to mark - when a file has successfully been copied. - - It is assumed that the relevant search has some time limit applied so that - only files created in the last (e.g.) 7 days match. - - """ - __tablename__ = 'standing_order' - - id = db.Column(db.Integer, primary_key=True, autoincrement=True) - name = db.Column(db.String(64), unique=True, nullable=False) - search = db.Column(db.Text, nullable=False) - conn_name = db.Column(db.String(64), nullable=False) - - def __init__(self, name, search, conn_name): - self.name = name - self.search = search - self.conn_name = conn_name - self._validate() - - def _validate(self): - """Check that this object's fields follow our invariants. - - """ - compile_search(self.search) # will raise a ServerError if there's a problem. - - @property - def event_type(self): - return 'standing_order_succeeded:' + self.name - - def get_files_to_copy(self): - """Generate a list of files that ought to be copied, according to the - specifications of this StandingOrder. - - """ - from .file import File, FileEvent - - # The core query is something freeform specified by the user. - - query = compile_search(self.search) - - # We then layer on a check that the files don't have the specified - # marker event. - - already_done = (db.session.query(File.name) - .filter(FileEvent.name == File.name, - FileEvent.type == self.event_type)) - query = query.filter(~File.name.in_(already_done)) - - # Finally we filter out files that already have copy tasks associated - # with this standing order, exceping those tasks that encountered an - # error. - - from .store import UploaderTask - from .bgtasks import the_task_manager - - already_launched = set(os.path.basename(t.store_path) - for t in the_task_manager.tasks - if (isinstance(t, UploaderTask) and - self.name == t.standing_order_name and - t.exception is None)) - - for file in query: - if file.name not in already_launched: - yield file - - def maybe_launch_copies(self): - """Launch any file copy operations that need to happen according to this - StandingOrder's specification. - - """ - from .store import launch_copy_by_file_name - stord_logger.debug('evaluating standing order %s', self.name) - - for file in self.get_files_to_copy(): - stord_logger.debug('got a hit: %s', file.name) - if launch_copy_by_file_name(file.name, self.conn_name, - standing_order_name=self.name, no_instance='return'): - stord_logger.warn('standing order %s should copy file %s to %s, but no instances ' - 'of it are available', self.name, file.name, self.conn_name) - - -# A simple little manager for running standing orders. We have a timeout to -# not evaluate them that often ... in the current setup, evaluating certain -# orders can be quite hard on the database. - -MIN_STANDING_ORDER_INTERVAL = 1200 # seconds -DEFAULT_STANDING_ORDER_DELAY = 90 # seconds - - -def _launch_copy_timeout(): - stord_logger.debug('timeout invoked') - - if the_standing_order_manager.maybe_launch_copies(): - # The checks actually ran. - the_standing_order_manager.launch_queued = False - else: - # We didn't run the checks because we did so recently. 
If a new file - # was uploaded we want to make sure that it's eventually checked, so - # re-queue ourselves to run again. - from tornado.ioloop import IOLoop - stord_logger.debug('re-scheduling timeout') - IOLoop.current().call_later(DEFAULT_STANDING_ORDER_DELAY, _launch_copy_timeout) - - -class StandingOrderManager(object): - """A simple, singleton class for managing our standing orders. - - Other folks should primarily access the manager via the - `queue_standing_order_copies` function. That function *queues* a command - to examine our standing orders and launch any needed copy commands, with a - default delay of 90 seconds. The delay is in place since uploads of files - to the Librarian are likely to occur in batches, but it's easiest to just - command the manager to "do its thing" whenever a file is uploaded. The - delay makes it so that when we actually look for files to copy, there's - probably a bunch of them ready to go, not just the very first one that was - uploaded. - - """ - last_check = 0 - launch_queued = False - - def maybe_launch_copies(self): - """Returns True unless nothing happened because we've run a search recently. - - """ - now = time.time() - - if now - self.last_check < MIN_STANDING_ORDER_INTERVAL: - return False # Don't evaluate too often - - # Check if there are any restrictions on what we do with standing - # orders. TODO: it's been requested that we also add time constraints - # on the uploads (Github issue #23). - - mode = app.config.get('standing_order_mode', 'normal') - - if mode == 'disabled': - stord_logger.debug('not checking standing orders: explicitly disabled') - return True - elif mode == 'nighttime': - # Hack: qmaster is now on UTC = SAST - 2, so our definition of - # "night" is a bit different than you might expect. Our intent is - # 8pm-8am (actual) local time. - hour = time.localtime(now).tm_hour - if hour >= 6 and hour < 18: - stord_logger.debug('not checking standing orders: "nighttime" ' - 'mode and hour = %d', hour) - return True - elif mode != 'normal': - stord_logger.warn('unrecognized standing_order_mode %r; treating as "normal"', mode) - mode = 'normal' - - stord_logger.debug('running searches') - self.last_check = now - - for storder in StandingOrder.query.all(): - storder.maybe_launch_copies() - - return True - - def queue_launch_copy(self): - """Queue a main-thread callback to check whether we need to launch any copies - associated with our standing orders. - - """ - stord_logger.debug('called queue_launch_copy') - if self.launch_queued: - return - - self.launch_queued = True - from tornado.ioloop import IOLoop - stord_logger.debug('timeout actually scheduled') - IOLoop.current().call_later(DEFAULT_STANDING_ORDER_DELAY, _launch_copy_timeout) - - -the_standing_order_manager = StandingOrderManager() - - -def queue_standing_order_copies(): - # Only the primary server processes standing orders. - if not is_primary_server(): - stord_logger.debug('not checking standing orders -- not primary server process') - return - - stord_logger.debug('queueing check of standing orders') - the_standing_order_manager.queue_launch_copy() - - -def register_standing_order_checkin(): - """Create a Tornado PeriodicCallback that will periodically check the - standing orders to see if there's anything to do. - - Since we know all events related to files, in theory this shouldn't be - needed, but in practice this can't hurt. - - The timeout for the callback is measured in milliseconds, so we queue an - evaluation every 10 minutes. 
- - """ - from tornado import ioloop - - cb = ioloop.PeriodicCallback(queue_standing_order_copies, 60 * 10 * 1000) - cb.start() - return cb - - -# The local-disk staging system for the NRAO Librarian. In a sense this code -# isn't super relevant to searches, but the search system is how it gets -# launched, and it's not obvious to me that there's a better place to put it. - -from . import bgtasks - - -class StagerTask(bgtasks.BackgroundTask): - """Object that manages the task of staging files from one disk to - another on the machine that the Librarian server is running on. - - This functionality is extremely specialized to the NRAO Librarian, which - runs on a machine called `herastore01` that is equipped with both large - local RAID arrays, where the HERA data are stored, and a mount of a Lustre - network filesystem, where users do their data processing. This "staging" - functionality allows users to have the server copy data over to Lustre - as quick as possible. - - """ - - def __init__(self, dest, stage_info, bytes, user, chown_command): - """Arguments: - - dest (str) - The destination directory, which should exist. - stage_info - Iterable of `(store_prefix, parent_dirs, name)`. - bytes (integer) - Number of bytes to be staged. - user (str) - The name of the user that the files will be chowned to. - chown_command (list of str) - Beginning of the command line that will be used to change - file ownership after staging is complete. - - """ - self.dest = dest - self.stage_info = stage_info - self.user = user - self.chown_command = chown_command - self.desc = 'stage %d bytes to %s' % (bytes, dest) - - import os.path - import time - self.t_start = time.time() - - # In principle, we could execute multiple stage operations to the same - # destination directory at the same time, but the files that we use to - # report progress don't have unique names, so it wouldn't be possible - # to understand whether individual operations succeeded or failed. We - # therefore only allow one stage at once, using the STAGING-IN-PROGRESS - # file as a lock. - # - # Relatedly, if a stage has already been executed to this directory, - # any lingering STAGING-SUCCEEDED/STAGING-ERRORS files will get - # chowned when this operation completes. The chown happens happens - # *before* we write the new result files, so the when we try to do so - # we get an EPERM. Prevent this by blowing away preexisting result - # files. 
- - from errno import EEXIST, ENOENT - - try: - flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY - fd = os.open(os.path.join(dest, 'STAGING-IN-PROGRESS'), flags, 0o666) - except OSError as e: - if e.errno == EEXIST: - raise Exception( - 'a staging operation into directory "%s" is already in progress' % dest) - raise - else: - with os.fdopen(fd, 'wt') as f: - print(self.t_start, file=f) - - for base in ['STAGING-SUCCEEDED', 'STAGING-ERRORS']: - try: - os.unlink(os.path.join(dest, base)) - except OSError as e: - if e.errno != ENOENT: - raise - - self.failures = [] - - def thread_function(self): - import os - import subprocess - from .misc import copyfiletree, ensure_dirs_gw - - for store_prefix, parent_dirs, name in self.stage_info: - source = os.path.join(store_prefix, parent_dirs, name) - dest_pfx = os.path.join(self.dest, parent_dirs) - dest = os.path.join(self.dest, parent_dirs, name) - - try: - ensure_dirs_gw(dest_pfx) - except Exception as e: - self.failures.append((dest_pfx, str(e))) - - try: - copyfiletree(source, dest) - except Exception as e: - self.failures.append((dest, str(e))) - - if len(self.failures): - raise Exception('failures while attempting to create and copy files') - - # Now change ownership of the files. - - argv = self.chown_command + [ - '-u', self.user, - '-R', # <= recursive - '-d', self.dest, - ] - - subprocess.check_output( - argv, - stdin=open(os.devnull, 'rb'), - stderr=subprocess.STDOUT, - shell=False, - close_fds=True, - ) - - def wrapup_function(self, retval, exc): - import time - self.t_stop = time.time() - - if exc is not None or len(self.failures): - with open(os.path.join(self.dest, 'STAGING-ERRORS'), 'wt') as f: - if exc is not None: - print('Unhandled exception:', exc, file=f) - - for destpath, e in self.failures: - print('For %s: %s' % (destpath, e), file=f) - - outcome_desc = 'FAILED' - log_func = logger.warn - else: - with open(os.path.join(self.dest, 'STAGING-SUCCEEDED'), 'wt') as f: - print(self.t_stop, file=f) - - outcome_desc = 'finished' - log_func = logger.info - - try: - os.unlink(os.path.join(self.dest, 'STAGING-IN-PROGRESS')) - except Exception as e: - # NOTE: app.log_exception() does not work here since we're not in a - # request-handling context. - logger.exception('couldn\'t remove staging-in-progress indicator for %r', self.dest) - - log_func('local-disk staging into %s %s: duration %.1fs', - self.dest, outcome_desc, self.t_stop - self.t_start) - - -def launch_stage_operation(user, search, stage_dest): - """Shared code to prep and launch a local-disk staging operation. - - user - The user that will own the files in the end. This function validates - that specified username is in fact a valid one on the system, but - does not (and cannot) verify that the invoker is who they say they - are. - search - A SQLAlchemy search for File objects that the user wants to stage. - stage_dest - The user-specified destination for the staging operation. - Returns - A tuple `(final_dest_dir, n_instances, n_bytes)`. - - """ - import os.path - import pwd - from .file import File, FileInstance - from .misc import ensure_dirs_gw - from .store import Store - - lds_info = app.config['local_disk_staging'] - - # Valid username? - - try: - pwd.getpwnam(user) - except KeyError: - raise Exception('staging user name \"%s\" was not recognized by the system' % user) - - # Validate and make the destination directory; let exception handling deal - # with it if there's a problem. 
- dest = os.path.realpath(stage_dest) - if not dest.startswith(lds_info['dest_prefix']): - raise Exception('staging destination must resolve to a subdirectory of \"%s\"; ' - 'input \"%s\" resolved to \"%s\" instead' % (lds_info['dest_prefix'], - stage_dest, dest)) - ensure_dirs_gw(dest) - - info = list(search.filter( - Store.ssh_host == lds_info['ssh_host'], - Store.available, - )) - - n_bytes = 0 - - for inst, file, store in info: - n_bytes += file.size - - # Quasi-hack: don't try to stage multiple instances of the same - # file, since that will break if the "file" is a directory. - stage_info = [] - seen_names = set() - - for inst, file, store in info: - if inst.name not in seen_names: - seen_names.add(inst.name) - stage_info.append((store.path_prefix, inst.parent_dirs, inst.name)) - - bgtasks.submit_background_task(StagerTask( - dest, stage_info, n_bytes, user, lds_info['chown_command'])) - - return dest, len(info), n_bytes - - -# Web user interface - -@app.route('/standing-orders') -@login_required -def standing_orders(): - q = StandingOrder.query.order_by(StandingOrder.name.asc()) - - return render_template( - 'standing-order-listing.html', - title='Standing Orders', - storders=q, - ) - - -@app.route('/standing-orders/') -@login_required -def specific_standing_order(name): - storder = StandingOrder.query.filter(StandingOrder.name == name).first() - if storder is None: - flash('No such standing order "%s"' % name) - return redirect(url_for('standing_orders')) - - try: - cur_files = list(storder.get_files_to_copy()) - except Exception as e: - app.log_exception(sys.exc_info()) - flash('Cannot run this order’s search: %s' % e) - cur_files = [] - - return render_template( - 'standing-order-individual.html', - title='Standing Order %s' % (storder.name), - storder=storder, - cur_files=cur_files, - ) - - -default_search = """{ - "name-matches": "any-file-named-like-%-this", - "not-older-than": 14 # days -}""" - - -@app.route('/standing-orders//create', methods=['POST']) -@login_required -def create_standing_order(ignored_name): - """Note that we ignore the order name and instead takes its value from the - POST data; this is basically an implementation/consistency thing. 
- - """ - name = required_arg(request.form, str, 'name') - - try: - if not len(name): - raise Exception('order name may not be empty') - - storder = StandingOrder(name, default_search, 'undefined-connection') - storder._validate() - db.session.add(storder) - - try: - db.session.commit() - except SQLAlchemyError: - db.session.rollback() - app.log_exception(sys.exc_info()) - raise Exception('failed to commit information to database; see logs for details') - except Exception as e: - flash('Cannot create "%s": %s' % (name, e)) - return redirect(url_for('standing_orders')) - - return redirect(url_for('standing_orders') + '/' + name) - - -@app.route('/standing-orders//update', methods=['POST']) -@login_required -def update_standing_order(name): - storder = StandingOrder.query.filter(StandingOrder.name == name).first() - if storder is None: - flash('No such standing order "%s"' % name) - return redirect(url_for('standing_orders')) - - new_name = required_arg(request.form, str, 'name') - new_conn = required_arg(request.form, str, 'conn') - new_search = required_arg(request.form, str, 'search') - - try: - storder.name = new_name - storder.conn_name = new_conn - storder.search = new_search - storder._validate() - db.session.merge(storder) - - try: - db.session.commit() - except SQLAlchemyError: - db.session.rollback() - app.log_exception(sys.exc_info()) - raise Exception('failed to commit update to database; see logs for details') - except Exception as e: - flash('Cannot update "%s": %s' % (name, e)) - return redirect(url_for('standing_orders')) - - # There might be new things to look at! - queue_standing_order_copies() - - flash('Updated standing order "%s"' % new_name) - return redirect(url_for('standing_orders')) - - -@app.route('/standing-orders//delete', methods=['POST']) -@login_required -def delete_standing_order(name): - storder = StandingOrder.query.filter(StandingOrder.name == name).first() - if storder is None: - flash('No such standing order "%s"' % name) - return redirect(url_for('standing_orders')) - - db.session.delete(storder) - - try: - db.session.commit() - except SQLAlchemyError: - db.session.rollback() - app.log_exception(sys.exc_info()) - raise ServerError('failed to commit deletion to database; see logs for details') - - flash('Deleted standing order "%s"' % name) - return redirect(url_for('standing_orders')) - - -# Web interface to searches outside of the standing order system - -sample_file_search = '{ "name-matches": "%12345%.uv" }' - - -@app.route('/search-files', methods=['GET', 'POST']) -@login_required -def search_files(): - return render_template( - 'search-files.html', - title='Search Files', - sample_search=sample_file_search, - ) - - -sample_obs_search = '{ "duration-less-than": 0.003 }' - - -@app.route('/search-obs', methods=['GET', 'POST']) -@login_required -def search_obs(): - return render_template( - 'search-obs.html', - title='Search Observations', - sample_search=sample_obs_search, - ) - - -sample_session_search = '{ "session-id-is-exactly": 1171209640 }' - - -@app.route('/search-sessions', methods=['GET', 'POST']) -@login_required -def search_sessions(): - return render_template( - 'search-sessions.html', - title='Search Observing Sessions', - sample_search=sample_session_search, - ) - - -# These formats are defined in templates/search-*.html: -file_name_format = 'Raw text with file names' -full_path_format = 'Raw text with full instance paths' -human_file_format = 'List of files' -human_obs_format = 'List of observations' -human_session_format = 'List of 
sessions' -stage_the_files_human_format = 'stage-the-files-human' - - -@app.route('/search', methods=['GET', 'POST']) -@login_required -def execute_search_ui(): - """The user-facing version of the search feature. - - Note that we perform no verification of the `stage_user` parameter! - (Besides checking that it corresponds to a real system user.) This is - incredibly lame but I'm not keen to build a real login system here. This - means that we let users perform "file giveaways". I believe that this can - be a security threat, but because the files that are given away are ones - that come out of the Librarian, I think the most nefarious thing that can - happen is denial-of-service by filling up someone else's quota. The chown - script deployed at NRAO has safety checks in place to prevent giveaways to - user accounts that are not HERA-using humans. - - """ - if len(request.form): - reqdata = request.form - else: - reqdata = request.args - - query_type = required_arg(reqdata, str, 'type') - search_text = required_arg(reqdata, str, 'search') - output_format = optional_arg(reqdata, str, 'output_format', human_file_format) - stage_user = optional_arg(reqdata, str, 'stage_user', '') - stage_dest_suffix = optional_arg(reqdata, str, 'stage_dest_suffix', '') - for_humans = True - - if output_format == full_path_format: - for_humans = False - query_type = 'names' - elif output_format == file_name_format: - for_humans = False - elif output_format == human_file_format: - for_humans = True - elif output_format == human_obs_format: - for_humans = True - elif output_format == human_session_format: - for_humans = True - elif output_format == stage_the_files_human_format: - for_humans = True - query_type = 'instances-stores' - if request.method == 'GET': - return Response('Staging requires a POST operation', status=400) - if not len(stage_user): - return Response('Stage-files command did not specify the username', status=400) - else: - return Response('Illegal search output type %r' % (output_format, ), status=400) - - status = 200 - - if for_humans: - mimetype = 'text/html' - else: - mimetype = 'text/plain' - - try: - search = compile_search(search_text, query_type=query_type) - - if output_format == full_path_format: - from .file import FileInstance - instances = FileInstance.query.filter(FileInstance.name.in_(search)) - text = '\n'.join(i.full_path_on_store() for i in instances) - elif output_format == file_name_format: - text = '\n'.join(f.name for f in search) - elif output_format == human_file_format: - files = list(search) - - text = render_template( - 'search-results-file.html', - title='Search Results: %d Files' % len(files), - search_text=search_text, - files=files, - error_message=None, - ) - elif output_format == human_obs_format: - obs = list(search) - text = render_template( - 'search-results-obs.html', - title='Search Results: %d Observations' % len(obs), - search_text=search_text, - obs=obs, - error_message=None, - ) - elif output_format == human_session_format: - sess = list(search) - text = render_template( - 'search-results-session.html', - title='Search Results: %d Sessions' % len(sess), - search_text=search_text, - sess=sess, - error_message=None, - ) - elif output_format == stage_the_files_human_format: - # This will DTRT if stage_dest_suffix is empty: - dest_prefix = app.config['local_disk_staging']['dest_prefix'] - stage_dest = os.path.join(dest_prefix, stage_user, stage_dest_suffix) - - try: - final_dest, n_instances, n_bytes = launch_stage_operation( - stage_user, search, 
stage_dest) - error_message = None - except Exception as e: - app.log_exception(sys.exc_info()) - final_dest = '(ignored)' - n_instances = n_bytes = 0 - error_message = str(e) - - text = render_template( - 'stage-launch-report.html', - title='Staging Results', - search_text=search_text, - final_dest=final_dest, - n_instances=n_instances, - n_bytes=n_bytes, - error_message=error_message, - ) - else: - raise ServerError('internal logic failure mishandled output format') - except Exception as e: - app.log_exception(sys.exc_info()) - status = 400 - - if for_humans: - text = render_template( - 'search-results-file.html', - title='Search Results: Error', - search_text=search_text, - files=[], - error_message=str(e), - ) - else: - text = 'Search resulted in error: %s' % e - - return Response(text, status=status, mimetype=mimetype) - - -stage_the_files_json_format = 'stage-the-files-json' -session_listing_json_format = 'session-listing-json' -file_listing_json_format = 'file-listing-json' -instance_listing_json_format = 'instance-listing-json' -obs_listing_json_format = 'obs-listing-json' - - -@app.route('/api/search', methods=['GET', 'POST']) -@json_api -def execute_search_api(args, sourcename=None): - """JSON API version of the search facility. - - Note that we perform no verification of the `stage_user` parameter! - (Besides checking that it corresponds to a real system user.) This is - incredibly lame but I'm not keen to build a real login system here. - - """ - search_text = required_arg(args, str, 'search') - output_format = required_arg(args, str, 'output_format') - stage_user = optional_arg(args, str, 'stage_user', '') - stage_dest = optional_arg(args, str, 'stage_dest', '') - - if output_format == stage_the_files_json_format: - query_type = 'instances-stores' - - if request.method == 'GET': - raise ServerError('staging requires a POST operation') - if not len(stage_dest): - raise ServerError('stage-files search did not specify destination directory') - if 'local_disk_staging' not in app.config: - raise ServerError('this Librarian does not support local-disk staging') - elif output_format == session_listing_json_format: - query_type = 'sessions' - elif output_format == file_listing_json_format: - query_type = 'files' - elif output_format == instance_listing_json_format: - query_type = 'instances' - elif output_format == obs_listing_json_format: - query_type = 'obs' - else: - raise ServerError('illegal search output type %r', output_format) - - search = compile_search(search_text, query_type=query_type) - - if output_format == stage_the_files_json_format: - final_dest, n_instances, n_bytes = launch_stage_operation(stage_user, search, stage_dest) - return dict( - destination=final_dest, - n_instances=n_instances, - n_bytes=n_bytes, - ) - elif output_format == session_listing_json_format: - return dict( - results=[sess.to_dict() for sess in search], - ) - elif output_format == file_listing_json_format: - return dict( - results=[files.to_dict() for files in search], - ) - elif output_format == instance_listing_json_format: - return dict( - results=[instance.to_dict() for instance in search], - ) - elif output_format == obs_listing_json_format: - return dict( - results=[obs.to_dict() for obs in search], - ) - else: - raise ServerError('internal logic failure mishandled output format') diff --git a/librarian_server/settings.py b/librarian_server/settings.py index 8ece347..095245e 100644 --- a/librarian_server/settings.py +++ b/librarian_server/settings.py @@ -68,6 +68,8 @@ class 
ServerSettings(BaseSettings): alembic_config_path: str = "." alembic_path: str = "alembic" + max_search_results: int = 64 + @classmethod def from_file(cls, config_path: Path | str) -> "ServerSettings": """ diff --git a/librarian_server/store.py b/librarian_server/store.py deleted file mode 100644 index bb7ea3b..0000000 --- a/librarian_server/store.py +++ /dev/null @@ -1,722 +0,0 @@ -# -*- mode: python; coding: utf-8 -*- -# Copyright 2016 the HERA Collaboration -# Licensed under the BSD License. - -"""Stores. - -So this gets a bit complicated. The `hera_librarian package`, which is used by -both the server and clients, includes a Store class, since Librarian clients -access stores directly by SSH'ing into them. However, here in the server, we -also have database records for every store. I *think* it will not make things -too complicated and crazy to do the multiple inheritance thing we do below, so -that we get the functionality of the `hera_librarian.store.Store` class while -also making our `ServerStore` objects use the SQLAlchemy ORM. If this turns -out to be a dumb idea, we should have the ORM-Store class just be a thin -wrapper that can easily be turned into a `hera_librarian.store.Store` -instance. - -""" - - - -__all__ = str(''' -Store -UploaderTask -OffloaderTask -''').split() - -import os.path - -from hera_librarian.base_store import BaseStore - -from . import app, db, logger -from .webutil import ServerError, json_api, login_required, optional_arg, required_arg - -from .orm.storemetadata import StoreMetadata -from .stores import CoreStore - -class Store: - pass - - -# RPC API - -@app.route('/api/register_instances', methods=['GET', 'POST']) -@json_api -def register_instances(args, sourcename=None): - """In principle, this RPC call is similar to what `initiate_upload` and - `complete_upload` do. However, this function should be called when files - have magically appeared on a store rather than being "uploaded" from some - external source. There is no consistency checking and no staging, and we - always attempt to infer the files' key properties. - - If you are SCP'ing a file to a store, you should be using the - `complete_upload` call, likely via the - `hera_librarian.LibrarianClient.upload_file` routine, rather than this - function. - - Because this API call is most sensibly initiated from a store, the caller - already goes to the work of gathering the basic file info (MD5, size, - etc.) that we're going to need in our inference step. See - `scripts/add_obs_librarian.py` for the implementation. - - """ - store_name = required_arg(args, str, 'store_name') - file_info = required_arg(args, dict, 'file_info') - - from .file import File, FileInstance - - store = Store.get_by_name(store_name) # ServerError if failure - slashed_prefix = store.path_prefix + '/' - - # Sort the files to get the creation times to line up. - - for full_path in sorted(file_info.keys()): - if not full_path.startswith(slashed_prefix): - raise ServerError('file path %r should start with "%s"', - full_path, slashed_prefix) - - # Do we already know about this instance? If so, just ignore it. - - store_path = full_path[len(slashed_prefix):] - parent_dirs = os.path.dirname(store_path) - name = os.path.basename(store_path) - - instance = FileInstance.query.get((store.id, parent_dirs, name)) - if instance is not None: - continue - - # OK, we have to create some stuff. 
- - file = File.get_inferring_info(store, store_path, sourcename, - info=file_info[full_path]) - inst = FileInstance(store, parent_dirs, name) - db.session.add(inst) - db.session.add(file.make_instance_creation_event(inst, store)) - - try: - db.session.commit() - except SQLAlchemyError: - db.session.rollback() - app.log_exception(sys.exc_info()) - raise ServerError('failed to commit new records to database; see logs for details') - - # Finally, trigger a look at our standing orders. - - from .search import queue_standing_order_copies - queue_standing_order_copies() - - return {} - - -# File uploads and copies -- maybe this should be separated into its own file? - -from . import bgtasks - - -class UploaderTask(bgtasks.BackgroundTask): - """Object that manages the task of copying a file to another Librarian. - - If `known_staging_store` and `known_staging_subdir` are not None, the copy - will be launched assuming that files have already been staged at a known - location at the final destination. This is useful if files have been - copied from one Librarian site to another outside of the Librarian - framework. - - Parameters - ---------- - store : BaseStore object - A BaseStore object corresponding to the originating store. - conn_name : str - The name of the connection to use, as defined in ~/.hl_client.cfg. - rec_info : dict - A dictionary containing database information for the file to be - transferred. - store_path : str - The full path to the file in the local store. - remote_store_path : str, optional - The path to place the file in the destination store. This may be None, - in which case we will request the same "store path" as the FileInstance - in this Librarian. - standing_order_name : str, optional - The standing order corresponding to this upload task. - known_staging_store : str, optional - The store corresponding to the already-uploaded file. Must be specified - if `known_staging_subdir` is specified. - known_staging_subdir : str, optional - The target directory corresponding to the already-uploaded file. Must by - specified if `known_staging_store` is specified. - use_globus : bool, optional - Specify whether to try to use globus to transfer files. - client_id : str, optional - The globus client ID to use for the transfer. - transfer_token : str, optional - The globus transfer token to use for the transfer. - source_endpoint_id : str, optional - The globus endpoint ID of the source store. May be omitted, in which - case we assume it is a "personal" (as opposed to public) client. 
- """ - t_start = None - t_finish = None - - def __init__( - self, - store, - conn_name, - rec_info, - store_path, - remote_store_path, - standing_order_name=None, - known_staging_store=None, - known_staging_subdir=None, - use_globus=False, - client_id=None, - transfer_token=None, - source_endpoint_id=None, - ): - self.store = store - self.conn_name = conn_name - self.rec_info = rec_info - self.store_path = store_path - self.remote_store_path = remote_store_path - self.standing_order_name = standing_order_name - self.known_staging_store = known_staging_store - self.known_staging_subdir = known_staging_subdir - self.use_globus = use_globus - self.client_id = client_id - self.transfer_token = transfer_token - self.source_endpoint_id = source_endpoint_id - - self.desc = 'upload %s:%s to %s:%s' % (store.name, store_path, - conn_name, remote_store_path or '') - - if standing_order_name is not None: - self.desc += ' (standing order "%s")' % standing_order_name - - def thread_function(self): - import time - self.t_start = time.time() - self.store.upload_file_to_other_librarian( - self.conn_name, - self.rec_info, - self.store_path, - self.remote_store_path, - known_staging_store=self.known_staging_store, - known_staging_subdir=self.known_staging_subdir, - use_globus=self.use_globus, - client_id=self.client_id, - transfer_token=self.transfer_token, - source_endpoint_id=self.source_endpoint_id, - ) - self.t_finish = time.time() - - def wrapup_function(self, retval, exc): - # In principle, we might want different integer error codes if there are - # specific failure modes that we want to be able to analyze without - # parsing the error messages. At the time being, we just use "1" to mean - # that some exception happened. An "error" code of 0 always means success. - - if exc is None: - logger.info('upload of %s:%s => %s:%s succeeded', - self.store.name, self.store_path, self.conn_name, - self.remote_store_path or self.store_path) - error_code = 0 - error_message = 'success' - else: - logger.warn('upload of %s:%s => %s:%s FAILED: %s', - self.store.name, self.store_path, self.conn_name, - self.remote_store_path or self.store_path, exc) - error_code = 1 - error_message = str(exc) - - from .file import File - file = File.query.get(os.path.basename(self.store_path)) - - if error_code != 0: - dt = rate = None - else: - dt = self.t_finish - self.t_start # seconds - dt_eff = max(dt, 0.5) # avoid div-by-zero just in case - rate = file.size / (dt_eff * 1024.) # kilobytes/sec (AKA kB/s) - - - db.session.add(file.make_copy_finished_event(self.conn_name, self.remote_store_path, - error_code, error_message, duration=dt, - average_rate=rate)) - - if self.standing_order_name is not None and error_code == 0: - # XXX keep this name synched with that in search.py:StandingOrder - type = 'standing_order_succeeded:' + self.standing_order_name - db.session.add(file.make_generic_event(type)) - - if error_code == 0: - logger.info('transfer of %s:%s: duration %.1f s, average rate %.1f kB/s', - self.store.name, self.store_path, dt, rate) - - try: - db.session.commit() - except SQLAlchemyError: - db.session.rollback() - app.log_exception(sys.exc_info()) - raise ServerError('failed to commit completion events to database') - - -def launch_copy_by_file_name( - file_name, - connection_name, - remote_store_path=None, - standing_order_name=None, - no_instance='raise', - known_staging_store=None, - known_staging_subdir=None, -): - """Launch a copy of a file to a remote Librarian. 
- - A ServerError will be raised if no instance of the file is available. - - The copy will be registered as a "background task" that the server will - execute in a separate thread. If the server crashes, information about the - background task will be lost. - - If `remote_store_path` is None, we request that the instance be located in - whatever "store path" was used by the instance we locate. - - If `no_instance` is "raise", an exception is raised if no instance of the - file is available on this location. If it is "return", we return True. - Other values are not allowed. - - If `known_staging_store` and `known_staging_subdir` are not None, the copy - will be launched assuming that files have already been staged at a known - location at the final destination. This is useful if files have been - copied from one Librarian site to another outside of the Librarian - framework. - - """ - # Find a local instance of the file - - from .file import FileInstance - inst = FileInstance.query.filter(FileInstance.name == file_name).first() - if inst is None: - if no_instance == 'raise': - raise ServerError('cannot upload %s: no local file instances with that name', file_name) - elif no_instance == 'return': - return True - else: - raise ValueError('unknown value for no_instance: %r' % (no_instance, )) - - file = inst.file - - # Gather up information describing the database records that the other - # Librarian will need. - - from .misc import gather_records - rec_info = gather_records(file) - - # Figure out if we should try to use globus or not - if app.config.get("use_globus", False): - source_endpoint_id = app.config.get("globus_endpoint_id", None) - try: - client_id = app.config["globus_client_id"] - transfer_token = app.config["globus_transfer_token"] - use_globus = True - except KeyError: - client_id = None - transfer_token = None - use_globus = False - else: - use_globus = False - client_id = None - transfer_token = None - source_endpoint_id = None - - # Launch the background task. We need to convert the Store to a base object since - # the background task can't access the database. - basestore = inst.store_object.convert_to_base_object() - bgtasks.submit_background_task( - UploaderTask( - basestore, - connection_name, - rec_info, - inst.store_path, - remote_store_path, - standing_order_name, - known_staging_store=known_staging_store, - known_staging_subdir=known_staging_subdir, - use_globus=use_globus, - client_id=client_id, - transfer_token=transfer_token, - source_endpoint_id=source_endpoint_id, - ) - ) - - # Remember that we launched this copy. - db.session.add(file.make_copy_launched_event(connection_name, remote_store_path)) - - try: - db.session.commit() - except SQLAlchemyError: - db.session.rollback() - app.log_exception(sys.exc_info()) - raise ServerError('failed to commit copy-launch event to database') - - -@app.route('/api/launch_file_copy', methods=['GET', 'POST']) -@json_api -def launch_file_copy(args, sourcename=None): - """Launch a copy of a file to a remote store. 
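    The parsed `args` for a typical request look like the following (the values
    shown are assumptions, not real records)::

        args = {
            'file_name': 'example_file.uvh5',
            'connection_name': 'remote-librarian',  # as defined in ~/.hl_client.cfg
            'remote_store_path': None,              # optional; default: same store path
        }

    `known_staging_store` and `known_staging_subdir` may also be supplied, but
    only together.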
- - """ - file_name = required_arg(args, str, 'file_name') - connection_name = required_arg(args, str, 'connection_name') - remote_store_path = optional_arg(args, str, 'remote_store_path') - known_staging_store = optional_arg(args, str, 'known_staging_store') - known_staging_subdir = optional_arg(args, str, 'known_staging_subdir') - - if (known_staging_store is None) ^ (known_staging_subdir is None): - raise ServerError('if known_staging_store is provided, known_staging_subdir must be ' - 'too, and vice versa') - - launch_copy_by_file_name(file_name, connection_name, remote_store_path, - known_staging_store=known_staging_store, - known_staging_subdir=known_staging_subdir) - return {} - - -@app.route('/api/gather_file_record', methods=['GET', 'POST']) -@json_api -def gather_file_record(args, sourcename=None): - """Get the record info for a file. - - """ - from .file import File - from .misc import gather_records - - file_name = required_arg(args, str, "file_name") - file = File.query.get(file_name) - if file is None: - raise ServerError('no file with that name found') - - rec_info = gather_records(file) - - return rec_info - - -@app.route('/api/create_file_record', methods=['GET', 'POST']) -@json_api -def create_file_record(args, sourcename=None): - """Create file records. - - """ - from .misc import create_records - - create_records(args, sourcename) - - return {} - - -# Offloading files. This functionality was developed for a time when we had to -# use the RTP "still" machines as temporary emergency Librarian stores. After -# the emergency was over, we wanted to transfer their files back to the main -# storage "pot" machine and deactivate the temporary stores. - -class InstanceOffloadInfo (object): - def __init__(self, file_instance): - self.parent_dirs = file_instance.parent_dirs - self.name = file_instance.name - self.success = False - - -class OffloaderTask (bgtasks.BackgroundTask): - """Object that manages the task of offloading file instances from one store to - another, staying on this Librarian. - - """ - - def __init__(self, source_store, dest_store, staging_dir, instance_info): - self.source_store = source_store - self.dest_store = dest_store - self.staging_dir = staging_dir - self.instance_info = instance_info - self.desc = 'offload ~%d instances from %s to %s' \ - % (len(instance_info), source_store.name, dest_store.name) - - def thread_function(self): - # I think it's better to just let the thread crash if anything goes - # wrong, rather than catching exceptions for each file. The offload - # operation is one that should be reliable; if something surprising - # happens, the cautious course of action is to stop trying to futz - # with things. - - for i, info in enumerate(self.instance_info): - # It's conceivable that we could be attempting to move two - # instances of the same file. In that case, their basenames would - # clash in our staging directory. Therefore we mix in the index of - # the instance_info item to uniquify things. - - sourcepath = os.path.join(info.parent_dirs, info.name) - stagepath = os.path.join(self.staging_dir, str(i) + '_' + info.name) - self.source_store.upload_file_to_local_store(sourcepath, self.dest_store, stagepath) - info.success = True - - def wrapup_function(self, retval, exc): - from .file import DeletionPolicy, FileInstance - - # Yay, we can access the database again! We need it to delete all of - # the instances that we *successfully* copied. We also need to turn - # the stores back into a DB-ified objects to do what we need to do. 
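        # (For clarity: the worker thread was handed plain BaseStore objects made
        # with convert_to_base_object(); Store.get_by_name() now looks the
        # corresponding ORM rows back up so that we can query and commit below.)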
- - source_store = Store.get_by_name(self.source_store.name) - dest_store = Store.get_by_name(self.dest_store.name) - - if exc is None: - logger.info('instance offload %s => %s succeeded', - source_store.name, dest_store.name) - else: - # If the thread crashed, our state information should still be - # reasonable, and we might as well complete any offloads that may - # have actually copied successfully. So we pretty much ignore the - # fact that an exception occurred. - logger.warn('instance offload %s => %s FAILED: %s', - source_store.name, dest_store.name, exc) - - # For all successful copies, we need to un-stage the file in the usual - # way. If that worked, we mark the original instance as being - # deleteable. The command-line client give the user a query that will - # safely remove thee redundant instances using the standard deletion - # mechanism. - # - # Here we *are* paranoid about exceptions. - - pmode = app.config.get('permissions_mode', 'readonly') - need_chmod = (pmode == 'readonly') - - for i, info in enumerate(self.instance_info): - desc_name = '%s:%s/%s' % (source_store.name, info.parent_dirs, info.name) - - if not info.success: - logger.warn('offload thread did not succeed on instance %s', desc_name) - continue - - try: - source_inst = FileInstance.query.get((source_store.id, info.parent_dirs, info.name)) - except Exception as e: - logger.warn('offloader wrapup: no instance %s; already deleted?', desc_name) - continue - - stagepath = os.path.join(self.staging_dir, str(i) + '_' + source_inst.name) - - try: - dest_store.process_staged_file(stagepath, source_inst.store_path, - 'direct', source_inst.deletion_policy) - except Exception as e: - logger.warn('offloader failed to complete upload of %s', - source_inst.descriptive_name()) - continue - - # If we're still here, the copy succeeded and the destination - # store has a shiny new instance. Mark the source instance as - # deleteable. - - logger.info('offloader: marking "%s" for deletion', source_inst.descriptive_name()) - source_inst.deletion_policy = DeletionPolicy.ALLOWED - db.session.add(source_inst.file.make_generic_event('instance_deletion_policy_changed', - store_name=source_inst.store_object.name, - parent_dirs=source_inst.parent_dirs, - new_policy=DeletionPolicy.ALLOWED, - context='offload')) - - try: - db.session.commit() - except SQLAlchemyError: - db.session.rollback() - app.log_exception(sys.exc_info()) - logger.error('offloader: failed to commit db changes; continuing') - - # Finally, we can blow away the staging directory. - - logger.info('offloader: processing complete; clearing staging directory "%s"', self.staging_dir) - dest_store._delete(self.staging_dir) - - -OFFLOAD_BATCH_SIZE = 200 - - -@app.route('/api/initiate_offload', methods=['GET', 'POST']) -@json_api -def initiate_offload(args, sourcename=None): - """Launch a task to offload file instances from one store to another. - - This launches a background task that copies file instances from a source - store to a destination store, then marks the source instances for - deletion. If the source store is out of instances, it is marked as - unavailable. Repeated calls, combined with appropriate deletion commands, - will therefore eventually drain the source store of all its contents so - that it can be shut down. - - To keep each task reasonably-sized, there is a limit to the number of - files that may be offloaded in each call to this API. Just keep calling it - until the source store is emptied. 
- until the source store is drained.
The actual number of instances - transferred in each batch is unpredictable because instances may be added - to or removed from the store while the offload operation is running. - - Note that this API just launches the background task and returns quickly, - so it can't provide the caller with any information about whether the - offload operation is successful. You need to look at the Librarian logs or - task monitoring UI to check that. - - This API is motivated by a time when we needed to create some temporary - stores to provide emergency backstop disk space. Once the emergency was - over, we wanted to shut down these temporary stores. - - Due to this origin, this API is quite limited: for instance, you cannot - choose *which* file instances to offload in each call. - - """ - source_store_name = required_arg(args, str, 'source_store_name') - dest_store_name = required_arg(args, str, 'dest_store_name') - - from sqlalchemy import func - from sqlalchemy.orm import aliased - from .file import FileInstance - - source_store = Store.get_by_name(source_store_name) # ServerError if failure - dest_store = Store.get_by_name(dest_store_name) - - # Gather information about instances in the source store that we'll try to - # transfer. Background tasks can't access the database, so we need to - # pre-collect this information. We want instances this store that do not - # correspond to files that have instances on other stores, which results in - # some moderately messy SQL. - - inst_alias = aliased(FileInstance) - - n_other_stores = (db.session.query(func.count()) - .filter(inst_alias.name == FileInstance.name) - .filter(inst_alias.store != source_store.id) - .as_scalar()) - - q = (FileInstance.query - .filter(FileInstance.store == source_store.id) - .filter(n_other_stores == 0) - .limit(OFFLOAD_BATCH_SIZE)) - - info = [InstanceOffloadInfo(i) for i in q] - - # If no such instances exist, mark the store as unavailable, essentially - # clearing it for deletion, and return. - - if not len(info): - source_store.available = False - - try: - db.session.commit() - except SQLAlchemyError: - db.session.rollback() - app.log_exception(sys.exc_info()) - raise ServerError('offload: failed to mark store as unavailable') - - return {'outcome': 'store-shut-down'} - - # Otherwise, we're going to launch an offloader task. Create a staging - # directory and fire off the task. - - staging_dir = dest_store._create_tempdir('offloader') - base_source = source_store.convert_to_base_object() # again: can't access DB - base_dest = dest_store.convert_to_base_object() - - bgtasks.submit_background_task(OffloaderTask( - base_source, base_dest, staging_dir, info)) - - return {'outcome': 'task-launched', 'instance-count': len(info)} - - -@app.route('/stores//make-available', methods=['POST']) -@login_required -def make_store_available(name): - try: - store = Store.get_by_name(name) - except ServerError as e: - flash(str(e)) - return redirect(url_for('stores')) - - store.available = True - - try: - db.session.commit() - except SQLAlchemyError: - db.session.rollback() - app.log_exception(sys.exc_info()) - flash('Failed to update database?! 
See server logs for details.') - return redirect(url_for('stores')) - - flash('Marked store "%s" as available' % store.name) - return redirect(url_for('stores') + '/' + store.name) - - -@app.route('/stores//make-unavailable', methods=['POST']) -@login_required -def make_store_unavailable(name): - try: - store = Store.get_by_name(name) - except ServerError as e: - flash(str(e)) - return redirect(url_for('stores')) - - store.available = False - - try: - db.session.commit() - except SQLAlchemyError: - db.session.rollback() - app.log_exception(sys.exc_info()) - flash('Failed to update database?! See server logs for details.') - return redirect(url_for('stores')) - - flash('Marked store "%s" as unavailable' % store.name) - return redirect(url_for('stores') + '/' + store.name) - - -# Web user interface - -@app.route('/stores') -@login_required -def stores(): - q = Store.query.order_by(Store.name.asc()) - return render_template( - 'store-listing.html', - title='Stores', - stores=q - ) - - -@app.route('/stores/') -@login_required -def specific_store(name): - from sqlalchemy import func - - try: - store = Store.get_by_name(name) - except ServerError as e: - flash(str(e)) - return redirect(url_for('stores')) - - from .file import FileInstance - num_instances = (db.session.query(func.count()) - .filter(FileInstance.store == store.id) - .scalar()) - - if store.available: - toggle_action = 'make-unavailable' - toggle_description = 'Make unavailable' - else: - toggle_action = 'make-available' - toggle_description = 'Make available' - - return render_template( - 'store-individual.html', - title='Store %s' % (store.name), - store=store, - num_instances=num_instances, - toggle_action=toggle_action, - toggle_description=toggle_description, - ) diff --git a/tests/conftest.py b/tests/conftest.py index 9d6289c..eaeb4ae 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -320,3 +320,65 @@ def test_server_with_missing_file(test_server, test_orm): session.commit() session.close() + + + +@pytest.fixture(scope="function") +def test_server_with_many_files(test_server, test_orm): + """ + Test server with a valid file and instance in the store. + """ + + session = test_server[1]() + + store = session.query(test_orm.StoreMetadata).first() + + data = random.randbytes(1024) + + file_names = [f"many_server_example_file_{x}.txt" for x in range(128)] + + for file_name in file_names: + file = test_orm.File.new_file( + filename=file_name, + size=len(data), + checksum=hashlib.md5(data).hexdigest(), + uploader="test", + source="test", + ) + + # Create the file in the store + path = store.store_manager._resolved_path_store(Path(file.name)) + + with open(path, "wb") as handle: + handle.write(data) + + instance = test_orm.Instance.new_instance( + path=path, + file=file, + store=store, + deletion_policy="ALLOWED", + ) + + session.add_all([file, instance]) + + session.commit() + + session.close() + + yield test_server + + # Now delete those items from the database. + + session = test_server[1]() + + for file_name in file_names: + file = session.get(test_orm.File, file_name) + instance = file.instances[0] + + session.delete(file) + session.delete(instance) + + session.commit() + session.close() + + path.unlink() diff --git a/tests/server_unit_test/test_search.py b/tests/server_unit_test/test_search.py new file mode 100644 index 0000000..520cf82 --- /dev/null +++ b/tests/server_unit_test/test_search.py @@ -0,0 +1,85 @@ +""" +Test the search endpoint. 
+""" + +import datetime + +from hera_librarian.models.search import (FileSearchFailedResponse, + FileSearchRequest, + FileSearchResponse, + FileSearchResponses) + + +def test_search_by_filename(test_server_with_many_files, test_client): + request = FileSearchRequest(name="many_server_example_file_0.txt") + + response = test_client.post( + "/api/v2/search/file", + headers={"Content-Type": "application/json"}, + content=request.model_dump_json(), + ) + + assert response.status_code == 200 + + response = FileSearchResponses.model_validate_json(response.content) + + +def test_search_by_created_time(test_server_with_many_files, test_client): + request = FileSearchRequest( + create_time_window=( + datetime.datetime.utcnow() - datetime.timedelta(days=1), + datetime.datetime.utcnow(), + ) + ) + + response = test_client.post( + "/api/v2/search/file", + headers={"Content-Type": "application/json"}, + content=request.model_dump_json(), + ) + + assert response.status_code == 200 + + response = FileSearchResponses.model_validate_json(response.content) + + +def test_search_by_source(test_server_with_many_files, test_client): + request = FileSearchRequest(source="test") + + response = test_client.post( + "/api/v2/search/file", + headers={"Content-Type": "application/json"}, + content=request.model_dump_json(), + ) + + assert response.status_code == 200 + + response = FileSearchResponses.model_validate_json(response.content) + + +def test_search_by_uploader(test_server_with_many_files, test_client): + request = FileSearchRequest(uploader="test") + + response = test_client.post( + "/api/v2/search/file", + headers={"Content-Type": "application/json"}, + content=request.model_dump_json(), + ) + + assert response.status_code == 200 + + response = FileSearchResponses.model_validate_json(response.content) + + +def test_failed_search(test_server_with_many_files, test_client): + request = FileSearchRequest(name="this_file_does_not_exist") + + response = test_client.post( + "/api/v2/search/file", + headers={"Content-Type": "application/json"}, + content=request.model_dump_json(), + ) + + assert response.status_code == 404 + + response = FileSearchFailedResponse.model_validate_json(response.content)