Merge pull request #938 from BCDA-APS/872-aps-dm-branch2
Hoist Data Management code from XPCS and HEDM
prjemian authored Mar 18, 2024
2 parents a2e23bf + c287f92 commit a165d24
Showing 11 changed files with 1,164 additions and 51 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
@@ -35,11 +35,13 @@ Fixes

* lineup2() should work with low intensity peaks.
* lineup2() would raise ZeroDivisionError in some cases.
* Increase minimum aps-dm-api version to 8.

Maintenance
-----------

* Code format conforms to 'ruff'.
* Add additional support for APS Data Management API.

1.6.18
******
41 changes: 27 additions & 14 deletions apstools/devices/aps_data_management.py
@@ -22,34 +22,30 @@
~DM_WorkflowConnector
from: https://github.com/APS-1ID-MPE/hexm-bluesky/blob/main/instrument/devices/data_management.py
"""

__all__ = """
DM_WorkflowConnector
""".split()

import logging
import os
import time

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # allow any log content at this level
logger.info(__file__)

from ophyd import Component
from ophyd import Device
from ophyd import Signal

from ..utils import run_in_thread

DM_STATION_NAME = str(os.environ.get("DM_STATION_NAME", "terrier")).lower()
logger = logging.getLogger(__name__)

NOT_AVAILABLE = "-n/a-"
NOT_RUN_YET = "not_run"
POLLING_PERIOD_S = 1.0
REPORT_PERIOD_DEFAULT = 10
REPORT_PERIOD_MIN = 1
STARTING = "running"
TIMEOUT_DEFAULT = 180 # TODO: Consider removing the timeout feature
TIMEOUT_DEFAULT = 180 # TODO: Consider removing/renaming the timeout feature


class DM_WorkflowConnector(Device):
@@ -88,9 +84,9 @@ class DM_WorkflowConnector(Device):
"""

job = None # DM processing job (must update during workflow execution)
_api = None # DM common API
_api = None # DM processing API

owner = Component(Signal, value=DM_STATION_NAME, kind="config")
owner = Component(Signal, value="", kind="config")
workflow = Component(Signal, value="")
workflow_args = {}

@@ -101,7 +97,7 @@ class DM_WorkflowConnector(Device):
stage_id = Component(Signal, value=NOT_RUN_YET)
status = Component(Signal, value=NOT_RUN_YET)

polling_period = Component(Signal, value=0.1, kind="config")
polling_period = Component(Signal, value=POLLING_PERIOD_S, kind="config")
reporting_period = Component(Signal, value=REPORT_PERIOD_DEFAULT, kind="config")
concise_reporting = Component(Signal, value=True, kind="config")

@@ -127,9 +123,11 @@ def __init__(self, name=None, workflow=None, **kwargs):
if name is None:
raise KeyError("Must provide value for 'name'.")
super().__init__(name=name)

if workflow is not None:
self.workflow.put(workflow)
self.workflow_args.update(kwargs)
self.owner.put(self.api.username)

def put_if_different(self, signal, value):
"""Put ophyd signal only if new value is different."""
@@ -187,7 +185,12 @@ def report_status(self, t_offset=None):
self.report_processing_stages()

def start_workflow(self, workflow="", timeout=TIMEOUT_DEFAULT, **kwargs):
"""Kickoff a DM workflow with optional wait & timeout."""
"""
Kickoff a DM workflow with optional reporting timeout.
The reporting process will continue until the workflow ends or the
timeout period is exceeded. It does not affect the actual workflow.
"""
if workflow == "":
workflow = self.workflow.get()
else:
@@ -224,7 +227,11 @@ def _cleanup():

@run_in_thread
def _run_DM_workflow_thread():
logger.info("run DM workflow: %s with timeout=%s s", self.workflow.get(), timeout)
logger.info(
"run DM workflow: %s with reporting time limit=%s s",
self.workflow.get(),
timeout,
)
self.job = self.api.startProcessingJob(
workflowOwner=self.owner.get(),
workflowName=workflow,
@@ -266,7 +273,13 @@ def _run_DM_workflow_thread():
self.status.subscribe(_reporter)
_run_DM_workflow_thread()

def run_as_plan(self, workflow="", wait=True, timeout=TIMEOUT_DEFAULT, **kwargs):
def run_as_plan(
self,
workflow: str = "",
wait: bool = True,
timeout: int = TIMEOUT_DEFAULT,
**kwargs,
):
"""Run the DM workflow as a bluesky plan."""
from bluesky import plan_stubs as bps

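For orientation, a brief usage sketch of the connector changed above (not part of this commit). It assumes a working APS Data Management environment; the workflow name and the filePath keyword are hypothetical placeholders, not names taken from the diff.

```python
# Sketch only -- assumes a configured DM station; "example-workflow" and
# "filePath" are hypothetical placeholders.
from apstools.devices import DM_WorkflowConnector

dm_workflow = DM_WorkflowConnector(name="dm_workflow", workflow="example-workflow")

# Interactive use: kick off the workflow and report status until it ends or
# the reporting time limit expires.  As the docstring above notes, 'timeout'
# only bounds the status reporting, not the DM job itself.
# dm_workflow.start_workflow(timeout=180, filePath="/path/to/data")

# Bluesky use: run the same workflow as a plan with the RunEngine.
# RE(dm_workflow.run_as_plan(wait=True, timeout=180, filePath="/path/to/data"))
```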
5 changes: 1 addition & 4 deletions apstools/devices/lakeshore_controllers.py
@@ -26,12 +26,9 @@
from ophyd import Signal

from ..synApps import AsynRecord
from ..utils import HOUR
from . import PVPositionerSoftDoneWithStop

SECOND = 1
MINUTE = 60 * SECOND
HOUR = 60 * MINUTE


class LakeShore336_LoopControl(PVPositionerSoftDoneWithStop):
"""
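The module-level SECOND/MINUTE/HOUR definitions removed above are now supplied by the shared time_constants module re-exported from apstools.utils (see the __init__.py changes below). A minimal check of the equivalence, assuming time_constants keeps the same second-based convention as the removed lines:

```python
# Sketch only: the shared constants replace the per-module definitions.
from apstools.utils import HOUR, MINUTE, SECOND

# Same relationships as the lines removed from lakeshore_controllers.py:
assert SECOND == 1
assert MINUTE == 60 * SECOND
assert HOUR == 60 * MINUTE
```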
7 changes: 3 additions & 4 deletions apstools/devices/tests/test_aps_data_management.py
@@ -7,9 +7,6 @@
import pytest

from .. import DM_WorkflowConnector
from ..aps_data_management import DM_STATION_NAME

# from dm.common.exceptions.dmException import DmException


@pytest.mark.parametrize("wf_name", ["a_workflow_name"])
@@ -18,7 +15,7 @@ def test_object(wf_name):
assert wf is not None

assert wf.workflow.get() == wf_name
assert wf.owner.get() == DM_STATION_NAME
assert wf.owner.get() in ("", None)

try:
# Force a test that dm package can be imported.
@@ -35,6 +32,8 @@
"Connection refused" in err_str
or
"invalid literal for int() with base 10" in err_str
or
"Invalid owner name provided" in err_str
)
# fmt: on

48 changes: 45 additions & 3 deletions apstools/utils/__init__.py
@@ -1,9 +1,35 @@
from ._core import TableStyle

from .aps_data_management import dm_setup

from .aps_data_management import build_run_metadata_dict
from .aps_data_management import dm_add_workflow
from .aps_data_management import dm_api_cat
from .aps_data_management import dm_api_daq
from .aps_data_management import dm_api_dataset_cat
from .aps_data_management import dm_api_ds
from .aps_data_management import dm_api_file
from .aps_data_management import dm_api_filecat
from .aps_data_management import dm_api_proc
from .aps_data_management import dm_file_ready_to_process
from .aps_data_management import dm_get_daqs
from .aps_data_management import dm_get_experiment_datadir_active_daq
from .aps_data_management import dm_get_experiment_file
from .aps_data_management import dm_get_experiment_path
from .aps_data_management import dm_get_experiments
from .aps_data_management import dm_get_workflow
from .aps_data_management import dm_source_environ
from .aps_data_management import dm_start_daq
from .aps_data_management import dm_station_name
from .aps_data_management import dm_stop_daq
from .aps_data_management import dm_update_workflow
from .aps_data_management import dm_upload
from .aps_data_management import get_workflow_last_stage
from .aps_data_management import share_bluesky_metadata_with_dm
from .aps_data_management import validate_experiment_dataDirectory
from .aps_data_management import wait_dm_upload
from .aps_data_management import DEFAULT_UPLOAD_TIMEOUT
from .aps_data_management import DEFAULT_UPLOAD_POLL_PERIOD
from .aps_data_management import DM_WorkflowCache
from .apsu_controls_subnet import warn_if_not_aps_controls_subnet

from .catalog import copy_filtered_catalog
from .catalog import findCatalogsInNamespace
from .catalog import getCatalog
@@ -63,6 +89,22 @@
from .spreadsheet import ExcelDatabaseFileBase
from .spreadsheet import ExcelDatabaseFileGeneric
from .spreadsheet import ExcelReadError
from .time_constants import DAY
from .time_constants import HOUR
from .time_constants import MINUTE
from .time_constants import SECOND
from .time_constants import WEEK
from .time_constants import ts2iso

# -----------------------------------------------------------------------------
# :author: Pete R. Jemian
# :email: [email protected]
# :copyright: (c) 2017-2024, UChicago Argonne, LLC
#
# Distributed under the terms of the Argonne National Laboratory Open Source License.
#
# The full license is in the file LICENSE.txt, distributed with this software.
# -----------------------------------------------------------------------------

# -----------------------------------------------------------------------------
# :author: Pete R. Jemian
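A short sketch of how the hoisted Data Management helpers can now be imported from apstools.utils (not part of the commit). Only names listed in the diff above are used; their call signatures are not shown in this commit, so the commented calls are illustrative assumptions.

```python
# Sketch only: names are taken from the import list added above; the commented
# calls are assumptions because their signatures are not shown in this diff.
from apstools.utils import dm_api_proc, dm_get_workflow, dm_station_name
from apstools.utils import ts2iso

# These require a configured APS Data Management environment:
# print(dm_station_name())                   # station name from the DM setup
# proc_api = dm_api_proc()                   # DM processing API object
# wf = dm_get_workflow("example-workflow")   # hypothetical workflow name

# Timestamp helper added alongside the time constants:
# print(ts2iso(1710720000))                  # timestamp -> ISO-8601 text (assumed)
```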
(Diffs for the remaining 6 changed files are not shown here.)
