Skip to content

Commit

Permalink
Add helpers for external functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom-Willemsen committed Nov 5, 2024
1 parent 541ec39 commit 700f27a
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 0 deletions.
7 changes: 7 additions & 0 deletions doc/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ Reference documentation

fitting/*

.. toctree::
:maxdepth: 2
:caption: Plans & plan stubs
:glob:

plans/*

.. toctree::
:maxdepth: 2
:caption: Preprocessors
Expand Down
112 changes: 112 additions & 0 deletions doc/plans/plans_intro.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Introduction to plans

A *plan* in bluesky is a sequence of messages which instruct the bluesky run engine to
do things - for example to set or read some hardware. A minimal example plan is:

```python
from bluesky.utils import Msg

my_plan = [
Msg("open_run"),
Msg("close_run"),
]
```

In practice, bluesky plans are usually implemented as python generators, which allows much more flexibility:

```python
from bluesky.utils import Msg

def my_plan():
yield Msg("open_run")
yield Msg("close_run")
```

Bluesky provides some helper methods which can be used to compose larger plans, using python's `yield from`
syntax:

```python
import bluesky.plan_stubs as bps

def my_plan():
yield from bps.open_run()
yield from bps.close_run()
```

## Calling external code

All interaction with the "outside world" should be via bluesky messages, and **not** directly called from
within a plan. For example, the following is **bad**:

```python
import bluesky.plan_stubs as bps
from genie_python import genie as g

def bad_plan():
yield from bps.open_run()
g.cset("foo", 123) # This is bad - do not do this
yield from bps.close_run()
```

```{danger}
External I/O - including most `genie_python` or `inst` functions - should not be done directly in a plan,
as it will break:
- Rewindability (for example, the ability to interrupt a scan and then later seamlessly continue it)
- Simulation (the `cset` above would be executed during a simulation)
- Ability to emit documents
- Ability to use bluesky signals
- Error handling
- ...
```

In the above case, a good plan, which uses bluesky messages correctly, would be:

```python
import bluesky.plan_stubs as bps
from ophyd_async.plan_stubs import ensure_connected
from ibex_bluesky_core.devices.block import block_rw

foo = block_rw(float, "foo")

def good_plan():
yield from ensure_connected(foo)
yield from bps.open_run()
yield from bps.mv(foo, 123)
yield from bps.close_run()
```

In most cases, functionality has been provided natively in bluesky to allow control natively within bluesky.
If the functionality you want to use is not available in bluesky, a number of options are available:

### Ask IBEX development team to implement the desired functionality

The IBEX team can help to implement cases that do not yet have a clear bluesky mechanism.

### Create a custom bluesky `Msg` for the relevant functionality

```{note}
This is an advanced use-case. Please ask the IBEX team for assistance if needed.
```

The bluesky `RunEngine` can be taught new messages, and told how to handle them. The handlers must be
coroutines (declared using `async def`):

```python
from bluesky.utils import Msg
from ibex_bluesky_core.run_engine import get_run_engine
import bluesky.plan_stubs as bps
import asyncio

RE = get_run_engine()

async def _my_clever_inst_script_handler(msg: Msg):
# Usual caveats of asyncio.to_thread apply: won't be cancellable, ...
return await asyncio.to_thread(my_clever_inst_script, *msg.args, **msg.kwargs)

RE.register_command("my_clever_inst_script", _my_clever_inst_script_handler)

def better_plan():
yield from bps.open_run(...)
yield Msg("my_clever_inst_script", 1, 2, 3)
yield from bps.close_run(...)
```
45 changes: 45 additions & 0 deletions src/ibex_bluesky_core/plan_stubs/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,46 @@
"""Core plan stubs."""

from typing import Callable, Generator, ParamSpec, TypeVar

import bluesky.plan_stubs as bps
from bluesky.utils import Msg

P = ParamSpec("P")
T = TypeVar("T")


CALL_SYNC_MSG_KEY = "ibex_bluesky_core_call_sync"


def call_sync(
func: Callable[P, T], *args: P.args, clear_checkpoint: bool = True, **kwargs: P.kwargs
) -> Generator[Msg, None, T]:
"""Call a synchronous user function in a plan, and returns the result of that call.
Attempts to guard against the most common pitfalls of naive implementations, for example:
- Blocking the whole event loop
- Breaking keyboard interrupt handling
It does not necessarily guard against all possible cases, and as such it is *recommended* to
use native bluesky functionality wherever possible in preference to this.
The wrapped function will be run in a different thread.
By default, this will clear any active checkpoints before running the external code, because
in general the external code is not safe to re-run later once it has started (e.g. it may have
done relative sets, or may have started some external process). This means that if a plan is
interrupted at any point between a call_sync and the next checkpoint, the plan cannot be
resumed. If the wrapped function *is* safe to call multiple times, after partially or completely
running (i.e. the wrapped function is idempotent, even after interruption part-way), then the
clear_checkpoint flag may be set to False to disable this mechanism.
Args:
func: A callable to run.
args: Arbitrary arguments to be passed to the wrapped function
kwargs: Arbitrary keyword arguments to be passed to the wrapped function
clear_checkpoint: set to False to omit clear_checkpoint message
"""
if clear_checkpoint:
yield from bps.clear_checkpoint()
return (yield Msg(CALL_SYNC_MSG_KEY, *args, obj=func, **kwargs))
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

__all__ = ["get_run_engine"]

from ibex_bluesky_core.plan_stubs import CALL_SYNC_MSG_KEY
from ibex_bluesky_core.preprocessors import add_rb_number_processor
from ibex_bluesky_core.run_engine._msg_handlers import call_sync_handler


class _DuringTask(DuringTask):
Expand Down Expand Up @@ -89,6 +91,8 @@ def get_run_engine() -> RunEngine:
log_callback = DocLoggingCallback()
RE.subscribe(log_callback)

RE.register_command(CALL_SYNC_MSG_KEY, call_sync_handler)

RE.preprocessors.append(functools.partial(bpp.plan_mutator, msg_proc=add_rb_number_processor))

return RE
65 changes: 65 additions & 0 deletions src/ibex_bluesky_core/run_engine/_msg_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Private helper module for run engine message handlers.
Not intended for user use.
"""

import asyncio
import ctypes
import threading
from typing import Any

from bluesky.utils import Msg


class _ExternalFunctionInterrupted(KeyboardInterrupt):
pass


async def call_sync_handler(msg: Msg) -> Any: # noqa: ANN401
func = msg.obj
ret = None
done_event = asyncio.Event()
loop = asyncio.get_running_loop()

def _wrapper() -> Any: # noqa: ANN401
try:
nonlocal ret
ret = func(*msg.args, **msg.kwargs)
except _ExternalFunctionInterrupted:
pass # Suppress stack traces from our special interruption exception.
finally:
loop.call_soon_threadsafe(done_event.set)

worker_thread = threading.Thread(target=_wrapper, name="external_function_worker")
worker_thread.start()

try:
# Wait until done event is set.
# Ensure we're not blocking the whole event loop while waiting.
await done_event.wait()
except (KeyboardInterrupt, asyncio.CancelledError):
# We got interrupted while the external function was running.
#
# A few options:
# - We could hang until the external function returns (not ideal, in principle it could
# be a rather long-running external function, so would prevent the interrupt from working)
# - Interrupt but don't actually kill the thread, this leads to a misleading result where
# a user gets the shell back but the task is still running in the background
# - Hack around with ctypes to inject an injection into the thread running the external
# function. This is generally frowned upon as a bad idea (tm), but may be the "least bad"
# solution to running potentially-blocking user code within a plan.
#
# A few notes on PyThreadState_SetAsyncExc:
# - It is used by bluesky here, for a similar case:
# https://github.com/bluesky/bluesky/blob/v1.13.0a4/src/bluesky/run_engine.py#L1074
# - The documentation for this function includes the line
# "To prevent naive misuse, you must write your own C extension to call this."
# (Like bluesky, I have cheated by using ctypes instead of C)
# - Choosing to raise KeyboardInterrupt rather than a custom exception, as that is the
# exception type which is most likely to have some kind
#
ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_ulong(worker_thread.ident), ctypes.py_object(_ExternalFunctionInterrupted)
)
raise
return ret

0 comments on commit 700f27a

Please sign in to comment.