diff --git a/CHANGES.rst b/CHANGES.rst index 2bba42d1f..dd6abefd3 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -35,11 +35,13 @@ Fixes * lineup2() should work with low intensity peaks. * lineup2() would raise ZeroDivideError in some cases. +* Increase minimum aps-dm-api version to 8. Maintenance ----------- * Code format conforms to 'ruff'. +* Add additional support for APS Data Management API. 1.6.18 ****** diff --git a/apstools/devices/aps_data_management.py b/apstools/devices/aps_data_management.py index d8bf7fb47..de1da4f74 100644 --- a/apstools/devices/aps_data_management.py +++ b/apstools/devices/aps_data_management.py @@ -22,7 +22,6 @@ ~DM_WorkflowConnector -from: https://github.com/APS-1ID-MPE/hexm-bluesky/blob/main/instrument/devices/data_management.py """ __all__ = """ @@ -30,26 +29,23 @@ """.split() import logging -import os import time -logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) # allow any log content at this level -logger.info(__file__) - from ophyd import Component from ophyd import Device from ophyd import Signal from ..utils import run_in_thread -DM_STATION_NAME = str(os.environ.get("DM_STATION_NAME", "terrier")).lower() +logger = logging.getLogger(__name__) + NOT_AVAILABLE = "-n/a-" NOT_RUN_YET = "not_run" +POLLING_PERIOD_S = 1.0 REPORT_PERIOD_DEFAULT = 10 REPORT_PERIOD_MIN = 1 STARTING = "running" -TIMEOUT_DEFAULT = 180 # TODO: Consider removing the timeout feature +TIMEOUT_DEFAULT = 180 # TODO: Consider removing/renaming the timeout feature class DM_WorkflowConnector(Device): @@ -88,9 +84,9 @@ class DM_WorkflowConnector(Device): """ job = None # DM processing job (must update during workflow execution) - _api = None # DM common API + _api = None # DM processing API - owner = Component(Signal, value=DM_STATION_NAME, kind="config") + owner = Component(Signal, value="", kind="config") workflow = Component(Signal, value="") workflow_args = {} @@ -101,7 +97,7 @@ class DM_WorkflowConnector(Device): stage_id = Component(Signal, value=NOT_RUN_YET) status = Component(Signal, value=NOT_RUN_YET) - polling_period = Component(Signal, value=0.1, kind="config") + polling_period = Component(Signal, value=POLLING_PERIOD_S, kind="config") reporting_period = Component(Signal, value=REPORT_PERIOD_DEFAULT, kind="config") concise_reporting = Component(Signal, value=True, kind="config") @@ -127,9 +123,11 @@ def __init__(self, name=None, workflow=None, **kwargs): if name is None: raise KeyError("Must provide value for 'name'.") super().__init__(name=name) + if workflow is not None: self.workflow.put(workflow) self.workflow_args.update(kwargs) + self.owner.put(self.api.username) def put_if_different(self, signal, value): """Put ophyd signal only if new value is different.""" @@ -187,7 +185,12 @@ def report_status(self, t_offset=None): self.report_processing_stages() def start_workflow(self, workflow="", timeout=TIMEOUT_DEFAULT, **kwargs): - """Kickoff a DM workflow with optional wait & timeout.""" + """ + Kickoff a DM workflow with optional reporting timeout. + + The reporting process will continue until the workflow ends or the + timeout period is exceeded. It does not affect the actual workflow. 
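+
+        EXAMPLE (an illustrative sketch; the workflow name, the ``filePath``
+        argument, and the timeout value are placeholders, not defaults)::
+
+            connector = DM_WorkflowConnector(name="dm_workflow")
+            # Report for up to one hour; the workflow itself keeps
+            # running even after reporting stops.
+            connector.start_workflow("example-01", timeout=3600, filePath="/path/to/data")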
+ """ if workflow == "": workflow = self.workflow.get() else: @@ -224,7 +227,11 @@ def _cleanup(): @run_in_thread def _run_DM_workflow_thread(): - logger.info("run DM workflow: %s with timeout=%s s", self.workflow.get(), timeout) + logger.info( + "run DM workflow: %s with reporting time limit=%s s", + self.workflow.get(), + timeout, + ) self.job = self.api.startProcessingJob( workflowOwner=self.owner.get(), workflowName=workflow, @@ -266,7 +273,13 @@ def _run_DM_workflow_thread(): self.status.subscribe(_reporter) _run_DM_workflow_thread() - def run_as_plan(self, workflow="", wait=True, timeout=TIMEOUT_DEFAULT, **kwargs): + def run_as_plan( + self, + workflow: str = "", + wait: bool = True, + timeout: int = TIMEOUT_DEFAULT, + **kwargs, + ): """Run the DM workflow as a bluesky plan.""" from bluesky import plan_stubs as bps diff --git a/apstools/devices/lakeshore_controllers.py b/apstools/devices/lakeshore_controllers.py index b4f739602..86e587acf 100644 --- a/apstools/devices/lakeshore_controllers.py +++ b/apstools/devices/lakeshore_controllers.py @@ -26,12 +26,9 @@ from ophyd import Signal from ..synApps import AsynRecord +from ..utils import HOUR from . import PVPositionerSoftDoneWithStop -SECOND = 1 -MINUTE = 60 * SECOND -HOUR = 60 * MINUTE - class LakeShore336_LoopControl(PVPositionerSoftDoneWithStop): """ diff --git a/apstools/devices/measComp_usb_ctr_support.py b/apstools/devices/measComp_usb_ctr_support.py index dfb222a72..055d83880 100644 --- a/apstools/devices/measComp_usb_ctr_support.py +++ b/apstools/devices/measComp_usb_ctr_support.py @@ -120,13 +120,19 @@ class MeasCompCtr(Device): # https://github.com/epics-modules/measComp/blob/master/measCompApp/Db/measCompDevice.template model_name = Component(EpicsSignalRO, "ModelName", kind="config", string=True) model_number = Component(EpicsSignalRO, "ModelNumber", kind="config") - firmware_version = Component(EpicsSignalRO, "FirmwareVersion", kind="config", string=True) + firmware_version = Component( + EpicsSignalRO, "FirmwareVersion", kind="config", string=True + ) unique_id = Component(EpicsSignalRO, "UniqueID", kind="config", string=True) ul_version = Component(EpicsSignalRO, "ULVersion", kind="config", string=True) - driver_version = Component(EpicsSignalRO, "DriverVersion", kind="config", string=True) + driver_version = Component( + EpicsSignalRO, "DriverVersion", kind="config", string=True + ) poll_time_ms = Component(EpicsSignalRO, "PollTimeMS", kind="config") poll_sleep_ms = Component(EpicsSignalRO, "PollSleepMS", kind="config") - last_error_message = Component(EpicsSignalRO, "LastErrorMessage", kind="config", string=True) + last_error_message = Component( + EpicsSignalRO, "LastErrorMessage", kind="config", string=True + ) # https://github.com/epics-modules/measComp/blob/master/measCompApp/Db/USBCTR.substitutions long_in = Component(EpicsSignalRO, "Li") diff --git a/apstools/devices/tests/test_aps_data_management.py b/apstools/devices/tests/test_aps_data_management.py index 4d1859187..b983628d9 100644 --- a/apstools/devices/tests/test_aps_data_management.py +++ b/apstools/devices/tests/test_aps_data_management.py @@ -7,9 +7,6 @@ import pytest from .. 
import DM_WorkflowConnector -from ..aps_data_management import DM_STATION_NAME - -# from dm.common.exceptions.dmException import DmException @pytest.mark.parametrize("wf_name", ["a_workflow_name"]) @@ -18,7 +15,7 @@ def test_object(wf_name): assert wf is not None assert wf.workflow.get() == wf_name - assert wf.owner.get() == DM_STATION_NAME + assert wf.owner.get() in ("", None) try: # Force a test that dm package can be imported. @@ -35,6 +32,8 @@ def test_object(wf_name): "Connection refused" in err_str or "invalid literal for int() with base 10" in err_str + or + "Invalid owner name provided" in err_str ) # fmt: on diff --git a/apstools/utils/__init__.py b/apstools/utils/__init__.py index a65ce1c22..1744d0bca 100644 --- a/apstools/utils/__init__.py +++ b/apstools/utils/__init__.py @@ -1,9 +1,35 @@ from ._core import TableStyle - from .aps_data_management import dm_setup - +from .aps_data_management import build_run_metadata_dict +from .aps_data_management import dm_add_workflow +from .aps_data_management import dm_api_cat +from .aps_data_management import dm_api_daq +from .aps_data_management import dm_api_dataset_cat +from .aps_data_management import dm_api_ds +from .aps_data_management import dm_api_file +from .aps_data_management import dm_api_filecat +from .aps_data_management import dm_api_proc +from .aps_data_management import dm_file_ready_to_process +from .aps_data_management import dm_get_daqs +from .aps_data_management import dm_get_experiment_datadir_active_daq +from .aps_data_management import dm_get_experiment_file +from .aps_data_management import dm_get_experiment_path +from .aps_data_management import dm_get_experiments +from .aps_data_management import dm_get_workflow +from .aps_data_management import dm_source_environ +from .aps_data_management import dm_start_daq +from .aps_data_management import dm_station_name +from .aps_data_management import dm_stop_daq +from .aps_data_management import dm_update_workflow +from .aps_data_management import dm_upload +from .aps_data_management import get_workflow_last_stage +from .aps_data_management import share_bluesky_metadata_with_dm +from .aps_data_management import validate_experiment_dataDirectory +from .aps_data_management import wait_dm_upload +from .aps_data_management import DEFAULT_UPLOAD_TIMEOUT +from .aps_data_management import DEFAULT_UPLOAD_POLL_PERIOD +from .aps_data_management import DM_WorkflowCache from .apsu_controls_subnet import warn_if_not_aps_controls_subnet - from .catalog import copy_filtered_catalog from .catalog import findCatalogsInNamespace from .catalog import getCatalog @@ -63,6 +89,22 @@ from .spreadsheet import ExcelDatabaseFileBase from .spreadsheet import ExcelDatabaseFileGeneric from .spreadsheet import ExcelReadError +from .time_constants import DAY +from .time_constants import HOUR +from .time_constants import MINUTE +from .time_constants import SECOND +from .time_constants import WEEK +from .time_constants import ts2iso + +# ----------------------------------------------------------------------------- +# :author: Pete R. Jemian +# :email: jemian@anl.gov +# :copyright: (c) 2017-2024, UChicago Argonne, LLC +# +# Distributed under the terms of the Argonne National Laboratory Open Source License. +# +# The full license is in the file LICENSE.txt, distributed with this software. +# ----------------------------------------------------------------------------- # ----------------------------------------------------------------------------- # :author: Pete R. 
Jemian diff --git a/apstools/utils/aps_data_management.py b/apstools/utils/aps_data_management.py index a314f3a78..7f9805289 100644 --- a/apstools/utils/aps_data_management.py +++ b/apstools/utils/aps_data_management.py @@ -2,41 +2,694 @@ """ Setup for for this beam line's APS Data Management Python API client. -This setup must be done before the first DM_WorkflowConnector() object is -created. The ``setup_file`` is the bash shell script provided by the APS Data -Management for the user's account. +FIRST + +The ``dm_setup(setup_file)`` function **must** be called first, +before any other calls to the ``dm`` package. The ``setup_file`` +argument is the bash script that activates the APS Data Management +conda environment for the workstation. That file contains definitions +of environment variables needed by the functions below. + +.. autosummary:: + + ~dm_setup + +.. DEVELOPERS NOTE + Do not import 'dm' or any of its children at the global level in + this file. This allows the file to be imported in a Python + environment that does not have the 'aps-dm-api' package installed. + If any of the functions are called (that attempt to import from 'dm', + those imports will raise exceptions as they are called.) + +FUNCTIONS + +.. autosummary:: + + ~build_run_metadata_dict + ~dm_add_workflow + ~dm_api_cat + ~dm_api_daq + ~dm_api_dataset_cat + ~dm_api_ds + ~dm_api_file + ~dm_api_filecat + ~dm_api_proc + ~dm_file_ready_to_process + ~dm_get_daqs + ~dm_get_experiment_datadir_active_daq + ~dm_get_experiment_file + ~dm_get_experiment_path + ~dm_get_experiments + ~dm_get_workflow + ~dm_source_environ + ~dm_start_daq + ~dm_station_name + ~dm_stop_daq + ~dm_update_workflow + ~dm_upload + ~get_workflow_last_stage + ~share_bluesky_metadata_with_dm + ~validate_experiment_dataDirectory + ~wait_dm_upload + ~DM_WorkflowCache + +:see: https://git.aps.anl.gov/DM/dm-docs/-/wikis/DM/Beamline-Services/Workflow-Processing-Service """ +__all__ = """ + build_run_metadata_dict + dm_add_workflow + dm_api_cat + dm_api_daq + dm_api_dataset_cat + dm_api_ds + dm_api_file + dm_api_filecat + dm_api_proc + dm_file_ready_to_process + dm_get_daqs + dm_get_experiment_datadir_active_daq + dm_get_experiment_file + dm_get_experiment_path + dm_get_experiments + dm_get_workflow + dm_setup + dm_source_environ + dm_start_daq + dm_station_name + dm_stop_daq + dm_update_workflow + dm_upload + get_workflow_last_stage + share_bluesky_metadata_with_dm + validate_experiment_dataDirectory + wait_dm_upload + DEFAULT_UPLOAD_TIMEOUT + DEFAULT_UPLOAD_POLL_PERIOD + DM_WorkflowCache +""".split() + +import datetime +import json import logging -import os import pathlib +import time +from os import environ + +import pyRestTable + +from bluesky import plan_stubs as bps + +from .time_constants import MINUTE +from .time_constants import SECOND +from .time_constants import ts2iso logger = logging.getLogger(__name__) +DEFAULT_PERIOD = 10 * SECOND +DEFAULT_WAIT = True +DEFAULT_DM_EXPERIMENT_KEYS = """ + id name startDate experimentType experimentStation +""".split() +DEFAULT_UPLOAD_TIMEOUT = 10 * MINUTE +DEFAULT_UPLOAD_POLL_PERIOD = 30 * SECOND +DM_SETUP_FILE = None +WORKFLOW_DONE_STATES = "done failed timeout aborted".split() +DM_ENV_SOURCED = False + def dm_setup(setup_file): """ - Configure data management from its bash setup script. + Name the APS Data Management bash script that activates its conda environment. The return result defines the ``BDP_WORKFLOW_OWNER`` symbol. 
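+
+    EXAMPLE (illustrative; the path to the DM setup script is site-specific
+    and shown here only as an assumption)::
+
+        from apstools.utils import dm_setup
+
+        workflow_owner = dm_setup("/home/dm/etc/dm.setup.sh")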
""" - env_var_file = pathlib.Path(setup_file) - logger.info("APS DM environment file: %s", str(env_var_file)) - env_vars = {} - with open(env_var_file) as script: - for line in script.readlines(): - if not line.startswith("export "): - continue - key, value = line.strip().split()[-1].split("=") - env_vars[key] = value + global DM_SETUP_FILE - # Assumed env_var_file is a bash script. What if it is not? - key = "DM_STATION_NAME" - if key not in env_vars and key not in os.environ: - raise KeyError(f"Did not find expected {key!r}") + if not pathlib.Path(setup_file).exists(): + DM_SETUP_FILE = None + raise FileExistsError(f"{setup_file=} does not exist.") + DM_SETUP_FILE = setup_file - os.environ.update(env_vars) - bdp_workflow_owner = os.environ["DM_STATION_NAME"].lower() + dm_source_environ() + bdp_workflow_owner = environ["DM_STATION_NAME"].lower() logger.info("APS DM workflow owner: %s", bdp_workflow_owner) return bdp_workflow_owner + + +def build_run_metadata_dict(user_md: dict, **dm_kwargs) -> dict: + """Return a dictionary for use as Bluesky run metadata.""" + _md = { + "title": "title placeholder", + "description": "description placeholder", + "datetime": str(datetime.datetime.now()), + "data_management": dm_kwargs, + } + _md.update(user_md) + return _md + + +def validate_experiment_dataDirectory(dm_experiment_name: str): + """These bluesky plans use the experiment's 'dataDirectory'.""" + # Check that named experiment actually exists now. + # Raises dm.ObjectNotFound if does not exist. + experiment = dm_api_ds().getExperimentByName(dm_experiment_name) + if "dataDirectory" not in experiment: + # Cannot test that it exists since bluesky user might not have + # access to that file system or permission to read that directory. + raise KeyError(f"{dm_experiment_name=!r} does not have a 'dataDirectory'.") + + +def dm_add_workflow(workflow_file): + """Add APS Data Management workflow from file.""" + return dm_api_proc().addWorkflow(json.loads(open(workflow_file).read())) + + +def dm_get_workflow(workflow_name: str): + """Get named APS Data Management workflow.""" + api = dm_api_proc() + return api.getWorkflowByName(api.username, workflow_name) + + +def dm_update_workflow(workflow_file): + """Update APS Data Management workflow from file.""" + return dm_api_proc().updateWorkflow(json.loads(open(workflow_file).read())) + + +def dm_api_cat(): + """Return the APS Data Management Catalog API object.""" + from dm import CatApiFactory + + dm_source_environ() + return CatApiFactory.getRunCatApi() + + +def dm_api_dataset_cat(): + """Return the APS Data Management Dataset Metadata Catalog API object.""" + from dm import CatApiFactory + + dm_source_environ() + return CatApiFactory.getDatasetCatApi() + + +def dm_api_filecat(): + """Return the APS Data Management Metadata Catalog Service API object.""" + from dm import CatApiFactory + + dm_source_environ() + return CatApiFactory.getFileCatApi() + + +def dm_api_daq(): + """Return the APS Data Management Data Acquisition API object.""" + from dm import DaqApiFactory + + dm_source_environ() + api = DaqApiFactory.getExperimentDaqApi() + return api + + +def dm_api_ds(): + """Return the APS Data Management Data Storage API object.""" + from dm import DsApiFactory + + dm_source_environ() + api = DsApiFactory.getExperimentDsApi() + return api + + +def dm_api_file(): + """Return the APS Data Management File API object.""" + from dm import DsApiFactory + + dm_source_environ() + api = DsApiFactory.getFileDsApi() + return api + + +def dm_api_proc(): + """Return the APS 
Data Management Processing API object.""" + from dm import ProcApiFactory + + dm_source_environ() + api = ProcApiFactory.getWorkflowProcApi() + return api + + +def dm_get_daqs(experimentName: str): + """ + Return list of APS Data Management DAQ(s) for this experiment. + + PARAMETERS + + experimentName *str*: + Name of the APS Data Management experiment. + + RETURNS + + List of matching DAQ dictionaries. + """ + api = dm_api_daq() + # fmt: off + return [ + daq + for daq in api.listDaqs() + if daq.get("experimentName") == experimentName + ] + # fmt: on + + +def dm_file_ready_to_process( + experimentFilePath: str, # path (abs or rel) to a file + experimentName: str, + compression: str = "", + retrieveMd5Sum: bool = False, +) -> bool: + """ + Does DM determine the named file is ready for processing? + """ + return ( + dm_api_file() + .statFile(experimentFilePath, experimentName, compression, retrieveMd5Sum) + .get("readyForProcessing", False) + ) + + +def dm_source_environ(): + """ + Add APS Data Management environment variable definitions to this process. + + This function reads the bash script, searching for lines that start with + "export ". Such lines define bash shell environment variables in the bash + script. This function adds those environment variables to the current + environment. + + BASH COMMAND SUGGESTIONS:: + + source /home/dm/etc/dm.setup.sh + + source ~/DM/etc/dm.setup.sh + + The suggestions follow a pattern: ``${DM_ROOT}/etc/dm.setup.sh`` where + ``DM_ROOT`` is the location of the DM tools as installed in the current user + account. + """ + global DM_ENV_SOURCED + + if DM_ENV_SOURCED: + return + + if DM_SETUP_FILE is None: + raise ValueError("DM setup file is undefined. First call 'dm_setup(setup_file)'.") + + def chop(text): + return text.strip().split()[-1].split("=") + + # fmt: off + ev = { + chop(line)[0]: chop(line)[-1] + for line in open(DM_SETUP_FILE).readlines() + if line.startswith("export ") + } + if len(ev) == 0: + raise KeyError( + f"No environment variable definitions found in: {DM_SETUP_FILE}" + ) + environ.update(ev) + DM_ENV_SOURCED = True + # fmt: on + + +def dm_start_daq(experimentName: str, dataDirectory: str, **daqInfo): + """ + Start APS DM data acquisition (real-time directory monitoring and file upload). + + PARAMETERS + + experimentName *str*: + Name of the APS Data Management experiment. + dataDirectory: + data directory URL + daqInfo *dict*: + Dictionary of optional metadata (key/value pairs) describing data acquisition. + See https://git.aps.anl.gov/DM/dm-docs/-/wikis/DM/Beamline-Services/API-Reference/DAQ-Service#dm.daq_web_service.api.experimentDaqApi.ExperimentDaqApi.startDaq + for details. + + RETURNS + + - daqInfo dictionary + + """ + ret_daqInfo = dm_api_daq().startDaq(experimentName, dataDirectory, daqInfo) + return ret_daqInfo + + +def dm_stop_daq(experimentName: str, dataDirectory: str): + """ + Stop APS DM data acquisition (real-time directory monitoring and file upload). + + PARAMETERS + + experimentName *str*: + Name of the APS Data Management experiment. + dataDirectory: + data directory URL + + """ + dm_api_daq().stopDaq(experimentName, dataDirectory) + + +def dm_station_name(): + """Return the APS Data Management station name or ``None`` if not found.""" + dm_source_environ() + nm = environ.get("DM_STATION_NAME") + if nm is not None: + return str(nm).lower() + + +def dm_upload(experimentName: str, dataDirectory: str, **daqInfo): + """ + Start APS DM data acquisition file upload. 
+ + PARAMETERS + + experimentName *str*: + Name of the APS Data Management experiment. + dataDirectory: + data directory URL + daqInfo *dict*: + Dictionary of optional metadata (key/value pairs) describing data acquisition. + See https://git.aps.anl.gov/DM/dm-docs/-/wikis/DM/Beamline-Services/API-Reference/DAQ-Service#dm.daq_web_service.api.experimentDaqApi.ExperimentDaqApi.startDaq + for details. + + .. seealso:: The ``wait_dm_upload()`` function in this module. + """ + dm_api_daq().upload(experimentName, dataDirectory, daqInfo) + + +def wait_dm_upload( + experiment_name: str, + experiment_file: str, + timeout: float = DEFAULT_UPLOAD_TIMEOUT, + poll_period: float = DEFAULT_UPLOAD_POLL_PERIOD, +): + """ + (bluesky plan) Wait for APS DM data acquisition to upload a file. + + PARAMETERS + + - experiment_name *str*: Name of the APS Data Management experiment. + - experiment_file *str* Name (and path) of file in DM. + - timeout *float*: Number of seconds to wait before raising a 'TimeoutError'. + - poll_period *float*: Number of seconds to wait before check DM again. + + RAISES + + - TimeoutError: if DM does not identify file within 'timeout' (seconds). + + """ + from dm import ObjectNotFound + + t0 = time.time() + deadline = t0 + timeout + yield from bps.null() # now, it's a bluesky plan + + while time.time() <= deadline: + try: + # either this succeeds or raises an exception + dm_get_experiment_file(experiment_name, experiment_file) + return # if successful + except ObjectNotFound: + yield from bps.sleep(poll_period) + + raise TimeoutError( + f"No such file={experiment_file!r} found" f" in DM {experiment_name=!r}" f" after {time.time()-t0:.1f} s." + ) + + +# def dm_add_experiment(experiment_name, typeName=None, **kwargs): +# """Create a new experiment. (Use sparingly, if ever.)""" +# typeName = typeName or "BDP" # TODO: generalize, TEST, XPCS8, ... +# dm_api_ds().addExperiment(experiment_name, typeName=typeName, **kwargs) + + +# def dm_delete_experiment(reference): +# """Delete ALL of an existing experiment. (No recovering from this!)""" +# api = dm_api_ds() +# if isinstance(reference, int): +# api.deleteExperimentById(reference) +# elif isinstance(reference, str): +# api.deleteExperimentByName(reference) + + +def dm_get_experiment_file(experiment_name: str, experiment_file: str): + """ + Get experiment file. + + PARAMETERS + + experiment_name *str*: + Name of the APS Data Management experiment. The experiment must exist. + + experiment_file *str*: + Name (with path) of the experiment file. + + RETURNS + + FileMetadata object. + + RAISES + + - InvalidRequest – in case experiment name or file path have not been provided + - AuthorizationError – in case user is not authorized to manage DM station + - ObjectNotFound – in case file with a given path does not exist + - DmException – in case of any other errors + """ + api = dm_api_filecat() + return api.getExperimentFile(experiment_name, experiment_file) + + +def dm_get_experiment_path(experiment_name: str): + """ + Return the storageDirectory for the named APS Data Management experiment as a path. + + PARAMETERS + + experiment_name *str*: + Name of the APS Data Management experiment. The experiment must exist. + + RETURNS + + Data directory for the experiment, as pathlib.Path object. + + RAISES + + dm.ObjectNotFound: + When experiment is not found. 
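+
+    EXAMPLE (illustrative; the experiment name is a placeholder)::
+
+        path = dm_get_experiment_path("my-experiment-2024-1")
+        if path is not None:
+            print(path)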
+ """ + api = dm_api_ds() + path = api.getExperimentByName(experiment_name).get("storageDirectory") + if path is not None: + path = pathlib.Path(path) + return path + + +def dm_get_experiments(keys=DEFAULT_DM_EXPERIMENT_KEYS, table=False, default_value="-na-"): + """ + Get the most recent APS Data Management experiments (for the current station). + + Return result as either a list or a pyRestTable object (see ``table``). + + PARAMETERS: + + keys *[str]*: + Data keys to be shown in the table. + table *bool*: + If ``False`` (default), return a Python list. + If ``True``, return a pyRestTable ``Table()`` object. + default_value *str*: + Table value if no data available for that key. + """ + experiments = dm_api_ds().getExperimentsByStation() + if table and len(experiments) > 0: + if not isinstance(keys, (list, tuple)) or len(keys) == 0: + keys = experiments[0].DEFAULT_KEY_LIST + tbl = pyRestTable.Table() + tbl.labels = keys + for entry in experiments: + row = [] + for key in keys: + value = entry.data.get(key, default_value) + if isinstance(value, dict): + value = value.get("description", value) + # do this in steps, value might be modified (ts2iso, for example) + row.append(value) + # datetime + tbl.addRow(row) + return tbl + else: + return experiments + + +def get_workflow_last_stage(workflow_name): + """ + Return the name of the last stage in the named APS Data Management workflow. + """ + return list(dm_get_workflow(workflow_name)["stages"])[-1] + + +def share_bluesky_metadata_with_dm(experimentName: str, workflow_name: str, run, should_raise: bool = False): + """ + Once a bluesky run ends, share its metadata with APS DM. + + Only upload if we have a workflow. + """ + import uuid + + from dm import InvalidArgument + + api = dm_api_dataset_cat() + + datasetInfo = { + "experimentName": experimentName, + "datasetName": f"run_uid8_{run.metadata['start']['uid'][:8]}", # first part of run uid + "workflow_name": workflow_name, + "time_iso8601": ts2iso(run.metadata.get("start", {}).get("time", 0)), + "bluesky_metadata": {k: getattr(run, k).metadata for k in run}, # all streams + # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + "_id": str(uuid.uuid4()), # FIXME: dm must fix this upstream + } + + try: + dm_md = api.addExperimentDataset(datasetInfo) + logger.debug("Metadata shared to DM: %s", dm_md) + return dm_md + except InvalidArgument as ex: + logger.error(ex) + if should_raise: + raise ex + + +class DM_WorkflowCache: + """ + Keep track of one or more APS Data Management workflows for bluesky plans. + + .. autosummary:: + + ~define_workflow + ~print_cache_summary + ~report_dm_workflow_output + ~wait_workflows + ~_update_processing_data + """ + + cache = {} + + def define_workflow(self, key: str, connector: object): + """ + Add a DM_WorkflowConnector object to be managed. + + PARAMETERS + + key *str*: + Identifying text for this workflow object. + connector *object*: + Instance of DM_WorkflowConnector. 
+ """ + if key in self.cache: + raise KeyError(f"Key already exists: {key!r}") + # TODO: validate connector + self.cache[key] = connector + + def print_cache_summary(self, title: str = "Summary"): + """Summarize (in a table) the DM workflows in the cache.""" + table = pyRestTable.Table() + table.labels = "# process status runTime started id".split() + for i, k in enumerate(self.cache, start=1): + v = self.cache[k] + job_id = v.job_id.get() + started = ts2iso(v.start_time) + table.addRow((i, k, v.status.get(), v.run_time.get(), started, job_id[:7])) + print(f"\n{title}\n{table}") + + def report_dm_workflow_output(self, final_stage_id: str): + """ + Print a final (summary) report about a single DM workflow. + + PARAMETERS + + final_stage_id *str*: + Text key of the last stage in the workflow. + """ + for wf in self.cache.values(): + job = wf.getJob() + stage = job.getWorkflowStage(final_stage_id) # example: "06-DONE" + for process in stage.get("childProcesses", {}).values(): + for key in "stdOut stdErr".split(): + value = str(process.get(key, "")).strip() + if len(value): + print(f"{final_stage_id} {key}:\n{value}") + print("~" * 50) + + def wait_workflows(self, period: float = DEFAULT_PERIOD, wait: bool = DEFAULT_WAIT): + """ + (plan) Wait (if ``True``) for existing workflows to finish. + + PARAMETERS + + period *float*: + Time between reports while waiting for all workflows to finish processing. + Default: 10 seconds. + wait *bool*: + Should RE wait for all workflows to finish? + Default: ``True`` + """ + print(f"DEBUG: wait_workflows(): waiting={wait}") + if wait: + print(f"Waiting for all previous workflows ({len(self.cache)}) to finish...") + for workflow in self.cache.values(): + # wait for each workflow to end + while workflow.status.get() not in WORKFLOW_DONE_STATES: + print(f"Waiting for {workflow=}") + yield from bps.sleep(period) + + def _update_processing_data(self): + """Update all the workflows in the cache (from the DM server).""" + for wf in self.cache.values(): + wf._update_processing_data() + + +def dm_daq_wait_upload_plan(id: str, period: float = DEFAULT_PERIOD): + """plan: Wait for DAQ uploads to finish.""" + api = dm_api_daq() + uploadInfo = api.getUploadInfo(id) + uploadStatus = uploadInfo.get("status") + while uploadStatus not in "done failed skipped aborted aborting".split(): + yield from bps.sleep(period) + uploadInfo = api.getUploadInfo(id) + uploadStatus = uploadInfo.get("status") + logger.debug("DM DAQ upload info: %s", uploadInfo) + if uploadStatus != "done": + raise ValueError( + f"DM DAQ upload status: {uploadStatus!r}, {id=!r}." + f" Processing error message(s): {uploadInfo['errorMessage']}." + ) + + +def dm_get_experiment_datadir_active_daq(experiment_name: str, data_directory: str): + """ + Return the daqInfo dict for the active DAQ, or 'None'. 
+ """ + from dm.common.constants import dmProcessingStatus + + active_statuses = ( + dmProcessingStatus.DM_PROCESSING_STATUS_PENDING, + dmProcessingStatus.DM_PROCESSING_STATUS_RUNNING, + ) + for daq_info in dm_api_daq().listDaqs(): + if daq_info.get("experimentName") == experiment_name: + if daq_info.get("dataDirectory") == data_directory: + if daq_info.get("status") in active_statuses: + return daq_info + return None diff --git a/apstools/utils/tests/test_aps_data_management.py b/apstools/utils/tests/test_aps_data_management.py index 3d7dd676a..436e875df 100644 --- a/apstools/utils/tests/test_aps_data_management.py +++ b/apstools/utils/tests/test_aps_data_management.py @@ -1,11 +1,75 @@ +""" +Test the APS Data Management utility functions. +""" + import pytest import pathlib +import tempfile +import uuid from .. import dm_setup +from .. import dm_source_environ +from .. import aps_data_management as adm + + +@pytest.fixture() +def tmpfile(): + tempdir = pathlib.Path(tempfile.mkdtemp()) + yield tempdir / f"file_{str(uuid.uuid4())[:7]}" + + +def test_dm_setup_raises(tmpfile): + # reset, to be safe + adm.DM_ENV_SOURCED = False + assert not adm.DM_ENV_SOURCED + + # Test with a non-existing file name. + # tmpfile does not exist yet since nothing was written to it. + with pytest.raises(FileExistsError) as exinfo: + dm_setup(tmpfile) + assert "does not exist" in str(exinfo) + assert adm.DM_SETUP_FILE is None + assert not adm.DM_ENV_SOURCED + + # Test with a file that has no 'export' statements. This Python file, for example. + with pytest.raises(KeyError) as exinfo: + dm_setup(__file__) + assert "No environment variable definitions found" in str(exinfo) + assert adm.DM_SETUP_FILE == str(pathlib.Path(__file__)) + assert not adm.DM_ENV_SOURCED + + bash_script = tmpfile + with open(bash_script, "w") as f: + f.write("export EXAMPLE=example\n") + with pytest.raises(KeyError) as exinfo: + dm_setup(bash_script) + assert adm.DM_ENV_SOURCED + assert adm.DM_SETUP_FILE == tmpfile + assert adm.environ.get("EXAMPLE") == "example" + assert "DM_STATION_NAME" in str(exinfo) + + # Finally, call with a setup that does not raise an exception. + # Adds the minimum required (to pass) 'export' statement. + value = "apstools_test" + with open(bash_script, "a") as f: + f.write(f"export DM_STATION_NAME={value}\n") + adm.DM_ENV_SOURCED = False + assert not adm.DM_ENV_SOURCED + dm_setup(bash_script) + assert adm.DM_ENV_SOURCED + assert adm.DM_SETUP_FILE == tmpfile + assert adm.environ.get("DM_STATION_NAME") == value + +def test_dm_source_environ_raises(tmpfile): + # reset, to be safe + adm.DM_ENV_SOURCED = False + assert not adm.DM_ENV_SOURCED -def test_util(): - test_bash_file = pathlib.Path(__file__) - with pytest.raises(KeyError) as exc: - dm_setup(test_bash_file) - assert "DM_STATION_NAME" in str(exc.value) + # tmpfile does not exist yet since nothing was written to it. + with pytest.raises(FileExistsError) as exinfo: + dm_setup(tmpfile) + with pytest.raises(ValueError) as exinfo: + dm_source_environ() + assert not adm.DM_ENV_SOURCED + assert "file is undefined" in str(exinfo) diff --git a/apstools/utils/time_constants.py b/apstools/utils/time_constants.py new file mode 100644 index 000000000..983a29997 --- /dev/null +++ b/apstools/utils/time_constants.py @@ -0,0 +1,37 @@ +""" +Define symbols used by other modules to define time (seconds). + +.. 
autosummary:: + + ~DAY + ~HOUR + ~MINUTE + ~SECOND + ~WEEK + ~ts2iso + +""" + +__all__ = """DAY HOUR MINUTE SECOND WEEK ts2iso""".split() + +import datetime + +#: One second of time (the base unit). +SECOND = 1 + +#: 60 seconds (in seconds) +MINUTE = 60 * SECOND + +#: 60 minutes (in seconds) +HOUR = 60 * MINUTE + +#: 24 hours (in seconds) +DAY = 24 * HOUR + +#: 7 days (in seconds) +WEEK = 7 * DAY + + +def ts2iso(ts: float, sep: str = " ") -> str: + """Convert Python timestamp (float) to IS8601 time in current time zone.""" + return datetime.datetime.fromtimestamp(ts).isoformat(sep=sep) diff --git a/docs/source/api/_utils.rst b/docs/source/api/_utils.rst index 957a6b60c..79e23e54e 100644 --- a/docs/source/api/_utils.rst +++ b/docs/source/api/_utils.rst @@ -11,6 +11,23 @@ here. Utilities by Activity ---------------------- +.. _utils.aps_dm: + +APS Data Management +++++++++++++++++++++++ + +.. autosummary:: + + ~apstools.utils.aps_data_management.dm_setup + ~apstools.utils.aps_data_management.dm_api_cat + ~apstools.utils.aps_data_management.dm_api_daq + ~apstools.utils.aps_data_management.dm_api_dataset_cat + ~apstools.utils.aps_data_management.dm_api_ds + ~apstools.utils.aps_data_management.dm_api_file + ~apstools.utils.aps_data_management.dm_api_filecat + ~apstools.utils.aps_data_management.dm_api_proc + ~apstools.utils.aps_data_management.DM_WorkflowCache + .. _utils.finding: Finding @@ -68,6 +85,7 @@ Other Utilities ~apstools.utils.plot.trim_plot_lines ~apstools.utils.misc.trim_string_for_EPICS ~apstools.utils.misc.unix + ~apstools.utils.time_constants.ts2iso .. _utils.general: @@ -133,6 +151,9 @@ Submodules .. automodule:: apstools.utils.aps_data_management :members: + :private-members: + :show-inheritance: + :inherited-members: .. automodule:: apstools.utils.apsu_controls_subnet :members: @@ -184,3 +205,6 @@ Submodules .. automodule:: apstools.utils.spreadsheet :members: + +.. automodule:: apstools.utils.time_constants + :members: diff --git a/docs/source/examples/de_aps_data_management.md b/docs/source/examples/de_aps_data_management.md new file mode 100644 index 000000000..40f57a6d5 --- /dev/null +++ b/docs/source/examples/de_aps_data_management.md @@ -0,0 +1,282 @@ +# Integration with APS Data Management (DM) + +> The APS Data Management System is a system for gathering together experimental +> data, metadata about the experiment and providing users access to the data +> based on a users role. + +APS beamlines have been using DM to process acquired data (with DM workflows). +See the [documentation](https://git.aps.anl.gov/DM/dm-docs/-/wikis/DM/HowTos/Getting-Started) for more details. This document describes a way to integrate DM with the Bluesky framework. Before the integration is described, a brief overview of DM... + +## DM Overview + +For integration with the Bluesky framework, we provide a minimal explanation for these parts of DM: + +- experiment +- DAQ +- workflow +- upload + +### experiment + +> The experiment is the central object for creating entries in the Data +> Management System. + +Typically, a DM experiment will manage all data for a single proposal/ESAF at a +beamline. Beamline staff will create the DM experiments they will need. + +The **name** of the experiment will be used by bluesky to coordinate acquired data and runs with DM. + +An experiment will create data storage with permissions for the beamline data +acquisition account and all the users listed in the proposal. 
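+
+From a bluesky (IPython) session, the DM experiments known for the current
+station can be reviewed with utilities added in this release. A minimal sketch
+(assuming `dm_setup()` has already been called; the experiment name is only a
+placeholder):
+
+```py
+from apstools.utils import dm_api_ds, dm_get_experiments
+
+# table of the most recent experiments for this station
+print(dm_get_experiments(table=True))
+
+# details for one experiment, by name (it must already exist in DM)
+experiment = dm_api_ds().getExperimentByName("my-experiment-2024-1")
+```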
+
+More info [here](https://git.aps.anl.gov/DM/dm-docs/-/wikis/DM/HowTos/Getting-Started#getting-some-data-into-the-system)
+
+### DAQ
+
+> ... overall purpose of this system is to get data into the system and provide
+> access control to that data.
+
+In DM, a *DAQ* monitors a local file directory tree (on a specific workstation)
+and copies any new content to the DM experiment's data storage.
+
+More info [here](https://git.aps.anl.gov/DM/dm-docs/-/wikis/DM/HowTos/Getting-Started#getting-files-into-data-management)
+
+### workflow
+
+> A workflow is simply a set of defined steps that will process data.
+
+A typical workflow moves acquired data to computation hosts (local or remote),
+runs the computation applications, then moves any results to a final
+destination. A workflow is capable of other activities, such as communicating
+back to EPICS.
+
+More info [here](https://git.aps.anl.gov/DM/dm-docs/-/wikis/DM/HowTos/Getting-Started#workflows-and-processing-data)
+
+### upload
+
+In addition to automatic file copying with a DAQ, DM provides for single file
+(or whole directory) uploads to the experiment storage. This can be used for
+content that is not in the local data directory trees monitored by a DAQ.
+
+In *file* mode, files are processed (transfer, catalog, invoke workflow, etc.)
+one by one. In *directory* mode, there is a single transfer of the whole
+directory, after which all files are cataloged, etc.
+
+The advantage of file mode is that you can track progress and see and recover
+from errors more easily. The advantage of directory mode is that it is more
+efficient, especially for a large number of small files, where each new
+transfer connection is expensive relative to the data transfer time.
+
+## Bluesky integration
+
+Many beamlines operate with software that provides some general setup for the
+next user group. Then, the user runs various plans to align the instrument and
+sample, then to collect the scientific data.
+
+### Setup User
+
+Activities could include creating local storage directories, identifying
+proposal and/or ESAF numbers, ...
+
+The `setup_user()` procedure is a convenient place to enter the name of the DM
+`experiment` to be used for the user's data collection. Bluesky should remember
+this name so that the user does not need to supply it for any of their data
+collection activities. For example:
+
+```py
+def setup_user(dm_experiment_name: str = ""):
+    yield from bps.mv(dm_experiment, dm_experiment_name)
+```
+
+where
+
+```py
+from ophyd import Signal
+dm_experiment = Signal(name="dm_experiment", value="")
+```
+
+#### Start a DAQ if needed
+
+Bluesky might direct some data acquisition to write data into local file
+storage (such as area detector image files). A DAQ can be started by
+`setup_user()` to copy new files to the DM experiment.
+For example:
+
+```py
+from apstools.utils import dm_get_experiment_datadir_active_daq, dm_start_daq
+
+def setup_user(dm_experiment_name: str = ""):
+    yield from bps.mv(dm_experiment, dm_experiment_name)
+
+    # local directory tree to monitor
+    data_directory = "/some/path/to/data/"
+    # DM experiment subdirectory for upload
+    dm_daq_directory = "something"
+    daq = dm_get_experiment_datadir_active_daq(
+        dm_experiment_name, data_directory)
+    if daq is None:
+        daq = dm_start_daq(
+            dm_experiment_name,
+            data_directory,
+            destDirectory=dm_daq_directory
+        )
+    # remember this for later
+    yield from bps.mv(dm_daq_id, daq["id"])
+```
+
+where
+
+```py
+dm_daq_id = Signal(name="dm_daq_id", value="")
+```
+
+This can quickly become more complicated if more than one DAQ is needed.
+
+The value for `dm_daq_directory` is to be decided by the software (called in
+the workflow) that processes the data.
+
+#### Upload a file if needed
+
+A file may need to be uploaded during the `setup_user()` plan. Here's an
+example:
+
+```py
+import pathlib
+from apstools.utils import dm_upload
+
+def setup_user(
+    dm_experiment_name: str = "",
+    upload_file: str = "",
+):
+    yield from bps.mv(dm_experiment, dm_experiment_name)
+
+    # upload a file
+    # DM experiment subdirectory for upload
+    p = pathlib.Path(upload_file)
+    dm_upload_directory = "something"
+    dm_upload(
+        dm_experiment_name,
+        str(p.parent),  # the directory name
+        experimentFilePath=p.name,  # the file name
+        destDirectory=dm_upload_directory,
+    )
+```
+
+Like `dm_daq_directory` above, the value for `dm_upload_directory` is to be
+decided by the software (called in the workflow) that processes the data.
+
+### Data collection plan(s)
+
+Integration of DM with bluesky plans depends on the type of scan to be executed
+and how the workflow will interact.
+
+Two general possibilities come to mind:
+
+- file-based collection and workflow
+- streaming-based collection and workflow
+
+In either case, the `apstools.devices.DM_WorkflowConnector` is an ophyd-style
+`Device` that is used to coordinate with a DM workflow. Create a connector:
+
+```py
+from apstools.devices import DM_WorkflowConnector
+
+    # ... later, in the bluesky plan ...
+    # REPLACE "dm_workflow" with the name of the workflow to be used.
+    # This could be a keyword argument to the plan!
+    # The reporting settings could also be user-selectable keyword arguments.
+    dm_workflow = DM_WorkflowConnector(name="dm_workflow")
+    yield from bps.mv(
+        dm_workflow.concise_reporting, False,  # True: for fewer details
+        dm_workflow.reporting_period, 60,  # for periodic reports (s)
+    )
+```
+
+Start the workflow (with the `dm_workflow` object) from the data acquisition
+plan, as a bluesky plan. This way, the startup does not block the RunEngine
+from its periodic responsibilities.
+
+```py
+    yield from dm_workflow.run_as_plan(
+        workflow=workflow_name,
+        wait=dm_wait,
+        timeout=999_999_999,  # seconds (aka forever)
+        # all kwargs after this line are DM argsDict content
+        filePath=dm_directory,  # like dm_daq_directory
+        experiment=dm_experiment.get(),
+        # ... any other keyword args needed by the plan ...
+    )
+```
+
+Here `dm_directory` is (like `dm_daq_directory` and `dm_upload_directory`
+above) the experiment subdirectory where the workflow expects to find the
+files.
+
+Note that the `timeout` parameter is for the background process that watches
+the workflow's progress. If the timeout is reached, the reporting stops but the
+workflow itself is unaffected.
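+
+Helpers are also available for the run-metadata steps that appear in the
+outlines below. A minimal sketch, assuming `run` is the completed `BlueskyRun`
+from the databroker catalog, `workflow_name` and `dm_experiment` are the same
+names used above, and the title text is a placeholder:
+
+```py
+from apstools.utils import build_run_metadata_dict
+from apstools.utils import share_bluesky_metadata_with_dm
+
+# metadata dictionary for the bluesky run; the keyword arguments are
+# recorded under the "data_management" key
+md = build_run_metadata_dict(
+    {"title": "example measurement"},
+    workflow=workflow_name,
+    experiment=dm_experiment.get(),
+)
+
+# after the run completes, catalog its metadata with the DM experiment
+share_bluesky_metadata_with_dm(dm_experiment.get(), workflow_name, run)
+```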
+
+Any keywords expected by the plan should be included as user-selectable
+arguments of the data collection plan, or determined by the plan itself.
+
+#### File-based
+
+In a *file* data collection and workflow, data is acquired first and stored
+into files on local storage. Upload of the files is managed either by a DAQ or
+by direct call(s) to `apstools.utils.dm_upload()`. If a DAQ is used, the
+bluesky plan should wait until the DAQ reports that the expected file upload(s)
+have completed. Then, bluesky tells DM to start the workflow and monitors it
+until it completes.
+
+It is up to the workflow whether more than one instance of the same workflow
+can execute at the same time. Some workflows expect specific files to exist and
+may not tolerate their manipulation by another workflow running at the same
+time. DM can be configured to control this with a scheduling queue.
+Alternatively, this decision process could be built into the bluesky plan.
+
+The general outline:
+
+1. Bluesky plan
+   1. assembles run metadata dictionary
+   2. prepares instrument for data collection
+   3. initiates data collection
+   4. waits for data collection to finish
+   5. waits for any DAQs to complete expected uploads (if required)
+   6. waits for any existing workflows to finish (if required)
+   7. starts DM workflow
+   8. monitors workflow in the background (periodic reports)
+   9. uploads run metadata to DM
+2. DM workflow
+   1. execute workflow steps as programmed
+
+See this example file-based bluesky data acquisition [plan](https://github.com/APS-1ID-MPE/hexm-bluesky/blob/a0b12fcf392b12b3d498dab070aee1f535614b24/instrument/plans/bdp202403.py#L77-L248).
+
+#### Streaming-based
+
+In a *streaming* data collection and workflow, the workflow must be started
+first so it can set up the tools to receive data that will be streamed. A
+common tool to use for this interprocess communication is EPICS PVAccess.
+PVAccess is preferred since it can communicate structured data. It may also be
+easier to communicate across network boundaries than with EPICS ChannelAccess.
+
+Without a detailed description or code, here is an outline of a possible
+streaming-based data collection and workflow with bluesky and DM.
+
+1. Bluesky plan
+   1. creates a PVA object for reports from the workflow
+   2. starts DM workflow, passing the name of its PVA object
+   3. assembles run metadata dictionary
+   4. prepares instrument for streaming data collection
+   5. connects with any PVA objects from workflow, as needed
+   6. waits for DM workflow to become ready
+2. DM workflow
+   1. execute workflow steps as programmed
+      1. connects with bluesky PVA object
+      2. creates its own PVA object for commands from workflow
+      3. prepares itself for streaming data
+      4. signals to bluesky that it is ready
+3. data acquisition stream(s)
+   1. bluesky initiates data collection
+   2. DM workflow receives data
+   3. either process signals the other while data is being collected
+4. data collection finishes
+   1. either DM workflow or bluesky signals
+   2. bluesky reports on further progress of workflow
+5. DM workflow finishes
+6. Bluesky uploads run metadata to DM
diff --git a/environment.yml b/environment.yml
index 07f8cea52..5479a2e2f 100644
--- a/environment.yml
+++ b/environment.yml
@@ -35,7 +35,7 @@ dependencies:
   - spec2nexus
   - sphinx >=5
-  - aps-dm-api >=5  # linux-64  osx-64
+  - aps-dm-api >=8  # linux-64  osx-64
   # - pip:
   #   - bluesky_live