Skip to content

Commit

Permalink
backing up what we have so we can use it on loq on tuesday
Browse files Browse the repository at this point in the history
  • Loading branch information
rerpha committed Nov 29, 2024
1 parent 594df6e commit f2243a9
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 1 deletion.
132 changes: 132 additions & 0 deletions manual_system_tests/continuous_scan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""Demonstration plan showing basic bluesky functionality."""

import os
import uuid

Check failure on line 4 in manual_system_tests/continuous_scan.py

View workflow job for this annotation

GitHub Actions / call-linter-workflow / ruff

Ruff (F401)

manual_system_tests/continuous_scan.py:4:8: F401 `uuid` imported but unused
from pathlib import Path
from typing import Generator

import bluesky.plan_stubs as bps
import bluesky.plans as bp

Check failure on line 9 in manual_system_tests/continuous_scan.py

View workflow job for this annotation

GitHub Actions / call-linter-workflow / ruff

Ruff (F401)

manual_system_tests/continuous_scan.py:9:25: F401 `bluesky.plans` imported but unused
import matplotlib
import matplotlib.pyplot as plt
from bluesky.callbacks import LiveFitPlot, LiveTable
from bluesky.plan_stubs import trigger_and_read
from bluesky.preprocessors import subs_decorator, run_decorator, monitor_during_decorator, contingency_wrapper

Check failure on line 14 in manual_system_tests/continuous_scan.py

View workflow job for this annotation

GitHub Actions / call-linter-workflow / ruff

Ruff (F401)

manual_system_tests/continuous_scan.py:14:66: F401 `bluesky.preprocessors.monitor_during_decorator` imported but unused

Check failure on line 14 in manual_system_tests/continuous_scan.py

View workflow job for this annotation

GitHub Actions / call-linter-workflow / ruff

Ruff (E501)

manual_system_tests/continuous_scan.py:14:101: E501 Line too long (110 > 100)
from bluesky.utils import Msg
from ophyd_async.plan_stubs import ensure_connected

from ibex_bluesky_core.callbacks.file_logger import HumanReadableFileCallback
from ibex_bluesky_core.callbacks.fitting import LiveFit
from ibex_bluesky_core.callbacks.fitting.fitting_utils import Linear, Gaussian

Check failure on line 20 in manual_system_tests/continuous_scan.py

View workflow job for this annotation

GitHub Actions / call-linter-workflow / ruff

Ruff (F401)

manual_system_tests/continuous_scan.py:20:63: F401 `ibex_bluesky_core.callbacks.fitting.fitting_utils.Linear` imported but unused
from ibex_bluesky_core.callbacks.plotting import LivePlot
from ibex_bluesky_core.devices import get_pv_prefix
from ibex_bluesky_core.devices.block import block_rw_rbv, block_r, block_mot

Check failure on line 23 in manual_system_tests/continuous_scan.py

View workflow job for this annotation

GitHub Actions / call-linter-workflow / ruff

Ruff (F401)

manual_system_tests/continuous_scan.py:23:45: F401 `ibex_bluesky_core.devices.block.block_rw_rbv` imported but unused
from ibex_bluesky_core.devices.simpledae import SimpleDae

Check failure on line 24 in manual_system_tests/continuous_scan.py

View workflow job for this annotation

GitHub Actions / call-linter-workflow / ruff

Ruff (F401)

manual_system_tests/continuous_scan.py:24:49: F401 `ibex_bluesky_core.devices.simpledae.SimpleDae` imported but unused
from ibex_bluesky_core.devices.simpledae.controllers import (
RunPerPointController,

Check failure on line 26 in manual_system_tests/continuous_scan.py

View workflow job for this annotation

GitHub Actions / call-linter-workflow / ruff

Ruff (F401)

manual_system_tests/continuous_scan.py:26:5: F401 `ibex_bluesky_core.devices.simpledae.controllers.RunPerPointController` imported but unused
)
from ibex_bluesky_core.devices.simpledae.reducers import (
GoodFramesNormalizer,

Check failure on line 29 in manual_system_tests/continuous_scan.py

View workflow job for this annotation

GitHub Actions / call-linter-workflow / ruff

Ruff (F401)

manual_system_tests/continuous_scan.py:29:5: F401 `ibex_bluesky_core.devices.simpledae.reducers.GoodFramesNormalizer` imported but unused
)
from ibex_bluesky_core.devices.simpledae.waiters import GoodFramesWaiter
from ibex_bluesky_core.run_engine import get_run_engine

NUM_POINTS: int = 3

Check failure on line 34 in manual_system_tests/continuous_scan.py

View workflow job for this annotation

GitHub Actions / call-linter-workflow / ruff

Ruff (I001)

manual_system_tests/continuous_scan.py:3:1: I001 Import block is un-sorted or un-formatted


def continuous_scan_plan(dave: float) -> Generator[Msg, None, None]:
"""Manual system test which moves a block and reads the DAE.
Prerequisites:
- A block named "mot" pointing at MOT:MTR0101.RBV
- A galil IOC in RECSIM mode with MTRCTRL=1 so that the above PV is available
- A DAE in a setup which can begin and end simulated runs.
Expected result:
- A sensible-looking LiveTable has been displayed
* Shows mot stepping from 0 to 10 in 3 evenly-spaced steps
- A plot has been displayed (in IBEX if running in the IBEX gui, in a QT window otherwise)
* Should plot "noise" on the Y axis from the simulated DAE
* Y axis should be named "normalized counts"
* X axis should be named "mot"
- The DAE was started and ended once per point
- The DAE waited for at least 500 good frames at each point
"""
prefix = get_pv_prefix()
bob = block_mot("bob")
alice = block_r(float, "alice")
mot = block_r(float, "mot")
_, ax = plt.subplots()
lf = LiveFit(
Gaussian.fit(), y=alice.name, x=bob.name
)

yield from ensure_connected(bob, alice, mot)

@subs_decorator(
[
HumanReadableFileCallback(
Path("C:\\") / "instrument" / "var" / "logs" / "bluesky" / "output_files",
[
alice.name,
bob.name
],
),
LiveTable(
[
alice.name,
bob.name
]
),
LiveFitPlot(livefit=lf, ax=ax, update_every=100),
LivePlot(
y=alice.name,
x=bob.name,
marker="x",
linestyle="none",
ax=ax,
),
]
)
def _inner() -> Generator[Msg, None, None]:

@run_decorator(md={})
def polling_plan():
yield from bps.create()
reading = yield from bps.read(bob)
yield from bps.read(alice)
yield from bps.save()

# start the ramp
status = yield from bps.abs_set(bob, dave, wait=False)
while not status.done:
yield from bps.checkpoint()
yield from bps.clear_checkpoint()

yield from bps.create()
new_reading = yield from bps.read(bob)
yield from bps.read(alice)

if new_reading[bob.name]["value"] == reading[bob.name]["value"]:
yield from bps.drop()
else:
reading = new_reading
yield from bps.save()

# take a 'post' data point
yield from trigger_and_read([bob, alice])

return (yield from polling_plan())

def _stop_motor(e):
yield from bps.stop(bob)

yield from contingency_wrapper(_inner(), except_plan=_stop_motor)


if __name__ == "__main__" and not os.environ.get("FROM_IBEX") == "True":
matplotlib.use("qtagg")
plt.ion()
RE = get_run_engine()
RE(continuous_scan_plan())
input("Plan complete, press return to close plot and exit")
18 changes: 17 additions & 1 deletion src/ibex_bluesky_core/devices/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
StandardReadableFormat,
observe_value,
)
from ophyd_async.epics.core import epics_signal_r, epics_signal_rw
from ophyd_async.epics.core import epics_signal_r, epics_signal_rw, epics_signal_w
from ophyd_async.epics.motor import Motor

from ibex_bluesky_core.devices import get_pv_prefix
Expand Down Expand Up @@ -319,12 +319,28 @@ def __init__(
# However, we also have motor record aliases for :SP and :SP:RBV, which *don't* get mangled
# by GWBLOCK in that way. So by pointing at CS:SB:blockname:SP:RBV rather than
# CS:SB:blockname here, we avoid a write access exception when moving a motor block.
self.block_mot_stop = epics_signal_w(int, f"{prefix}CS:SB:{block_name}:SP:RBV.STOP")
super().__init__(f"{prefix}CS:SB:{block_name}:SP:RBV", name=block_name)

def __repr__(self) -> str:
"""Debug representation of this block."""
return f"{self.__class__.__name__}(name={self.name})"

async def pause(self):
print("HIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII")
dmov = await self.motor_done_move.get_value()
if not dmov:
logger.warning(f"stopping motor: {self}")
await self.stop()
raise ValueError("Cannot pause a motor when it's moving")

async def stop(self, success=False):
print(f"stopping {self}")
await self.motor_stop.trigger(wait=False)
print("trying block_mot_stop.set(1)")
await self.block_mot_stop.set(1, wait=False)
print(f"stopped {self}")


def block_r(datatype: Type[T], block_name: str) -> BlockR[T]:
"""Get a local read-only block for the current instrument.
Expand Down

0 comments on commit f2243a9

Please sign in to comment.