diff --git a/docs/apis.rst b/docs/apis.rst index a5ac3df..b0f483c 100644 --- a/docs/apis.rst +++ b/docs/apis.rst @@ -13,6 +13,14 @@ ExecutionEngine .. autoclass:: exengine.kernel.ex_future.ExecutionFuture :members: +.. autoclass:: exengine.device_types.Device + :members: + +.. autoclass:: exengine.kernel.ex_event_base.ExecutorEvent + :members: + +.. autoclass:: exengine.kernel.notification_base.Notification + :members: Data @@ -29,18 +37,3 @@ Data .. autoclass:: exengine.kernel.data_handler.DataHandler :members: - -Base classes for extending ExEngine -=================================== - -.. autoclass:: exengine.device_types.Device - :members: - -.. autoclass:: exengine.kernel.ex_event_base.ExecutorEvent - :members: - -.. autoclass:: exengine.kernel.notification_base.Notification - :members: - -.. autoclass:: exengine.kernel.data_storage_base.DataStorage - :members: diff --git a/docs/extending.rst b/docs/extending.rst index f224d90..a7385b9 100644 --- a/docs/extending.rst +++ b/docs/extending.rst @@ -10,4 +10,5 @@ Extending ExEngine extending/add_devices extending/add_events extending/add_notifications - extending/add_storage \ No newline at end of file + extending/add_storage + extending/threading \ No newline at end of file diff --git a/docs/extending/add_devices.rst b/docs/extending/add_devices.rst index c6b16b0..b2ed1ef 100644 --- a/docs/extending/add_devices.rst +++ b/docs/extending/add_devices.rst @@ -199,6 +199,17 @@ then open ``_build/html/index.html`` in a web browser to view the documentation. Advanced Topics ----------------- +Thread Safety and Execution Control +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +ExEngine provides powerful threading capabilities for device implementations, ensuring thread safety and allowing fine-grained control over execution threads. Key features include: + +Automatic thread safety for device methods and attribute access. +The ability to specify execution threads for devices, methods, or events using the @on_thread decorator. +Options to bypass the executor for non-hardware-interacting methods or attributes. + +For a comprehensive guide on ExEngine's threading capabilities, including detailed explanations and usage examples, please refer to the :ref:threading section. + + What inheritance from ``Device`` provides ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/docs/extending/threading.rst b/docs/extending/threading.rst new file mode 100644 index 0000000..cfe79b7 --- /dev/null +++ b/docs/extending/threading.rst @@ -0,0 +1,192 @@ +.. _threading: + +Threading in ExEngine +===================== + +ExEngine simplifies multi-threaded hardware control by managing threading complexities. This approach allows for high-performance applications without requiring developers to handle concurrency at every level. + +This page provides an overview of ExEngine's threading management and its effective use. + +The Challenge: Balancing Simplicity and Performance +--------------------------------------------------- + +In hardware control applications, there's often a mismatch between simple user code and complex device interactions. Ideally, hardware control should be as straightforward as: + +.. code-block:: python + + some_device = SomeDevice() + some_device.take_action() + value = some_device.read_value() + +While this works for single-threaded applications, it can cause issues in multi-threaded environments. For example, a user interface thread and a separate control logic thread might simultaneously access a device. If the device wasn't explicitly designed for multi-threading (i.e. using locks or other synchronization mechanisms), this can lead to hard-to-diagnose bugs. + +Common solutions like single-threaded event loops can limit performance, while implementing thread safety in each device increases complexity. + + +ExEngine's Solution for Thread-Safe Device Control +-------------------------------------------------- + +ExEngine addresses the challenge of thread-safe device control by routing all method calls and attribute accesses of ``Device`` objects through a common thread pool managed by the ``ExecutionEngine``. In other words, when a user calls a method on a device, sets an attribute, or gets an attribute, the call is automatically routed to the ``ExecutionEngine`` for execution. + +This allows for simple, seemingly single-threaded user code. That is, users can methods and set attributes in the normal way (e.g. ``device.some_method()``, ``device.some_attribute = value``), from any thread, but the actual execution happens on a thread managed by the executor. + +This approach ensures thread safety when using devices from multiple contexts without requiring explicit synchronization in user or device code. + +**Low-level Implementation Details** + +.. toggle:: + + While understanding the underlying mechanics isn't essential for regular usage, here's a brief overview: + + The core of this solution lies in the ``DeviceMetaclass``, which wraps all methods and set/get operations on attributes classes inheriting from ``Device`` subclasses. + + When a method is called or an attribute is accessed, instead of executing directly, a corresponding event (like ``MethodCallEvent`` or ``GetAttrEvent``) is created and submitted to the ``ExecutionEngine``. The calling thread blocks until the event execution is complete, maintaining the illusion of synchronous operation. + + In other words, calling a function like: + + .. code-block:: python + + some_device.some_method(arg1, arg2) + + Gets automatically transformed into a ``MethodCallEvent`` object, which is then submitted to the ``ExecutionEngine`` for execution, and its result is returned to the calling thread. + + .. code-block:: python + + some_event = MethodCallEvent(method_name="some_method", + args=(arg1, arg2), + kwargs={}, + instance=some_device) + future = ExecutionEngine.get_instance().submit(event) + # Wait for it to complete on the executor thread + result = future.await_execution() + + + + On an executor thread, the event's ``execute`` method is called: + + .. code-block:: python + + def execute(self): + method = getattr(self.instance, self.method_name) + return method(*self.args, **self.kwargs) + + + This process ensures that all device interactions occur on managed threads, preventing concurrent access issues while maintaining a simple API for users. + + +Direct Use of the ExecutionEngine +--------------------------------- + +While device operations are automatically routed through the ExecutionEngine, users can also submit complex events directly: + +.. code-block:: python + + future = engine.submit(event) + +By default, this executes on the ExecutionEngine's primary thread. + +ExEngine also supports named threads for task-specific execution: + +.. code-block:: python + + engine.submit(readout_event, thread_name="DetectorThread") + engine.submit(control_event, thread_name="HardwareControlThread") + + +The ExecutionEngine automatically creates the specified threads as needed. You don't need to manually create or manage these threads. + +This feature enables logical separation of asynchronous tasks. For instance: + +- One thread can be dedicated to detector readouts +- Another can manage starting, stopping, and controlling other hardware + +Using named threads enhances organization and can improve performance in multi-task scenarios. + + + +Using the @on_thread Decorator +------------------------------ + +ExEngine provides a powerful ``@on_thread`` decorator that allows you to specify which thread should execute a particular event, device, or method. This feature gives you fine-grained control over thread assignment without complicating your code. + +Importing the Decorator +^^^^^^^^^^^^^^^^^^^^^^^ + +To use the ``@on_thread`` decorator, import it from ExEngine: + +```python +from exengine import on_thread +``` + +Decorating Events +^^^^^^^^^^^^^^^^^ + +You can use ``@on_thread`` to specify which thread should execute an event: + +.. code-block:: python + + @on_thread("CustomEventThread") + class MyEvent(ExecutorEvent): + def execute(self): + # This will always run on "CustomEventThread" + ... + +Decorating Devices +^^^^^^^^^^^^^^^^^^ + +When applied to a device class, ``@on_thread`` sets the default thread for all methods of that device: + +.. code-block:: python + + @on_thread("DeviceThread") + class MyDevice(Device): + def method1(self): + # This will run on "DeviceThread" + ... + + def method2(self): + # This will also run on "DeviceThread" + ... + +Decorating Methods +^^^^^^^^^^^^^^^^^^ + +You can also apply ``@on_thread`` to individual methods within a device: + +.. code-block:: python + + class MyDevice(Device): + @on_thread("Method1Thread") + def method1(self): + # This will run on "Method1Thread" + ... + + @on_thread("Method2Thread") + def method2(self): + # This will run on "Method2Thread" + ... + + +Priority and Overriding +^^^^^^^^^^^^^^^^^^^^^^^ + +When both a class and a method have ``@on_thread`` decorators, the method-level decorator takes precedence: + +.. code-block:: python + + @on_thread("DeviceThread") + class MyDevice(Device): + def method1(self): + # This will run on "DeviceThread" + ... + + @on_thread("SpecialThread") + def method2(self): + # This will run on "SpecialThread", overriding the class-level decorator + ... + + + +While ``@on_thread`` provides great flexibility, be mindful of potential overhead from excessive thread switching. Use it judiciously, especially for frequently called methods. + + diff --git a/docs/index.rst b/docs/index.rst index e9526df..297d874 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -12,9 +12,9 @@ Key Features: 2. **Adaptable to Multiple Frontends**: Compatible with GUIs, scripts, networked automated labs, and AI-integrated microscopy -3. **Powerful Threading Capabilities**: Utilities for parallelization, asynchronous execution, and complex, multi-device workflows. +3. :ref:`Powerful Threading Capabilities `: Utilities for parallelization, asynchronous execution, and complex, multi-device workflows. -4. **Modality Agnostic**: Adaptable to diverse microscopy techniques thanks to general purpose design. +4. **Modality Agnosticism**: Adaptable to diverse microscopy techniques thanks to general purpose design. 5. **Modular, Reusable Device Instructions**: Building blocks that can be combined to create complex workflows, in order to promote code reuse and simplify experiment design diff --git a/docs/usage.rst b/docs/usage.rst index 35bf8d9..ce62f01 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -9,4 +9,4 @@ Usage usage/installation usage/backends - usage/key_features \ No newline at end of file + usage/key_features diff --git a/src/exengine/__init__.py b/src/exengine/__init__.py index 1b3cc6d..d393708 100644 --- a/src/exengine/__init__.py +++ b/src/exengine/__init__.py @@ -6,3 +6,4 @@ from ._version import __version__, version_info from .kernel.executor import ExecutionEngine +from .kernel.threading_decorator import on_thread diff --git a/src/exengine/integration_tests/test_imports.py b/src/exengine/integration_tests/test_imports.py index 87eb27c..3fb5af5 100644 --- a/src/exengine/integration_tests/test_imports.py +++ b/src/exengine/integration_tests/test_imports.py @@ -36,6 +36,11 @@ def test_mm_imports(): except ImportError as e: pytest.fail(f"Import failed for MicroManagerDevice: {e}") +def test_onthread_import(): + try: + from exengine import on_thread + except ImportError as e: + pytest.fail(f"Import failed for MicroManagerDevice: {e}") def test_ndstorage_imports(): try: diff --git a/src/exengine/integration_tests/test_preferred_thread_annotations.py b/src/exengine/integration_tests/test_preferred_thread_annotations.py new file mode 100644 index 0000000..808143d --- /dev/null +++ b/src/exengine/integration_tests/test_preferred_thread_annotations.py @@ -0,0 +1,218 @@ +import pytest +import threading +import functools +from exengine.kernel.device import Device +from exengine.kernel.ex_event_base import ExecutorEvent +from exengine.kernel.executor import ExecutionEngine +from exengine import on_thread +from exengine.kernel.executor import _MAIN_THREAD_NAME + + + +class ThreadRecordingEvent(ExecutorEvent): + def execute(self): + return threading.current_thread().name + + +@on_thread("CustomEventThread") +class DecoratedEvent(ThreadRecordingEvent): + pass + + +class TestDevice(Device): + + def __init__(self, name): + super().__init__(name, no_executor_attrs=('_attribute', 'set_attribute_thread', + 'get_attribute_thread', 'regular_method_thread', + 'decorated_method_thread')) + self._attribute = 123 + + @property + def attribute(self): + self.get_attribute_thread = threading.current_thread().name + return self._attribute + + @attribute.setter + def attribute(self, value): + self.set_attribute_thread = threading.current_thread().name + self._attribute = value + + def regular_method(self): + self.regular_method_thread = threading.current_thread().name + + @on_thread("CustomMethodThread") + def decorated_method(self): + self.decorated_method_thread = threading.current_thread().name + + +@on_thread("CustomDeviceThread") +class CustomThreadTestDevice(Device): + + def __init__(self, name): + super().__init__(name, no_executor_attrs=('_attribute', + 'set_attribute_thread', 'get_attribute_thread', + 'regular_method_thread', 'decorated_method_thread')) + self._attribute = 123 + + @property + def attribute(self): + self.get_attribute_thread = threading.current_thread().name + return self._attribute + + @attribute.setter + def attribute(self, value): + self.set_attribute_thread = threading.current_thread().name + self._attribute = value + + def regular_method(self): + self.regular_method_thread = threading.current_thread().name + +@pytest.fixture() +def engine(): + engine = ExecutionEngine() + yield engine + engine.shutdown() + +############################################################ +# Event tests +############################################################ + +def test_undecorated_event(engine): + """ + Test that an undecorated event executes on the main executor thread. + """ + event = ThreadRecordingEvent() + future = engine.submit(event) + result = future.await_execution() + assert result == _MAIN_THREAD_NAME + +def test_decorated_event(engine): + """ + Test that a decorated event executes on the specified custom thread. + """ + event = DecoratedEvent() + future = engine.submit(event) + result = future.await_execution() + assert result == "CustomEventThread" + + +############################################################ +# Device tests +############################################################ +def test_device_attribute_access(engine): + """ + Test that device attribute access runs on the main thread when nothing else specified. + """ + device = TestDevice("TestDevice") + device.attribute = 'something' + assert device.set_attribute_thread == _MAIN_THREAD_NAME + +def test_device_regular_method_access(engine): + """ + Test that device method access runs on the main thread when nothing else specified. + """ + device = TestDevice("TestDevice") + device.regular_method() + assert device.regular_method_thread == _MAIN_THREAD_NAME + +def test_device_decorated_method_access(engine): + """ + Test that device method access runs on the main thread when nothing else specified. + """ + device = TestDevice("TestDevice") + device.decorated_method() + assert device.decorated_method_thread == "CustomMethodThread" + +def test_custom_thread_device_attribute_access(engine): + """ + Test that device attribute access runs on the custom thread when specified. + """ + custom_device = CustomThreadTestDevice("CustomDevice") + custom_device.attribute = 'something' + assert custom_device.set_attribute_thread == "CustomDeviceThread" + +def test_custom_thread_device_property_access(engine): + """ + Test that device property access runs on the custom thread when specified. + """ + custom_device = CustomThreadTestDevice("CustomDevice") + custom_device.attribute = 'something' + assert custom_device.set_attribute_thread == "CustomDeviceThread" + + f = custom_device.attribute + assert custom_device.get_attribute_thread == "CustomDeviceThread" + + +@on_thread("OuterThread") +class OuterThreadDevice(Device): + def __init__(self, name, inner_device): + super().__init__(name) + self.inner_device = inner_device + self.outer_thread = None + + def outer_method(self): + self.outer_thread = threading.current_thread().name + self.inner_device.inner_method() + + +@on_thread("InnerThread") +class InnerThreadDevice(Device): + def __init__(self, name): + super().__init__(name) + self.inner_thread = None + + def inner_method(self): + self.inner_thread = threading.current_thread().name + + +def test_nested_thread_switch(engine): + """ + Test that nested calls to methods with different thread specifications + result in correct thread switches at each level. + """ + inner_device = InnerThreadDevice("InnerDevice") + outer_device = OuterThreadDevice("OuterDevice", inner_device) + + class OuterEvent(ExecutorEvent): + def execute(self): + outer_device.outer_method() + + event = OuterEvent() + + engine.submit(event).await_execution() + + assert outer_device.outer_thread == "OuterThread" + assert inner_device.inner_thread == "InnerThread" + + +def another_decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + return wrapper + + +class MultiDecoratedDevice(Device): + @on_thread("Thread1") + @another_decorator + def method1(self): + return threading.current_thread().name + + @another_decorator + @on_thread("Thread2") + def method2(self): + return threading.current_thread().name + + +def test_multiple_decorators(engine): + """ + Test that the thread decorator works correctly when combined with other decorators. + """ + device = MultiDecoratedDevice("MultiDevice") + + class MultiEvent(ExecutorEvent): + def execute(self): + return device.method1(), device.method2() + + assert engine.submit(MultiEvent()).await_execution() == ("Thread1", "Thread2") \ No newline at end of file diff --git a/src/exengine/integration_tests/work_in_progress/sandbox_test_micromanager_device.py b/src/exengine/integration_tests/work_in_progress/sandbox_test_micromanager_device.py deleted file mode 100644 index a632f62..0000000 --- a/src/exengine/integration_tests/work_in_progress/sandbox_test_micromanager_device.py +++ /dev/null @@ -1,42 +0,0 @@ -from exengine.kernel.data_coords import DataCoordinates -import os -from exengine.kernel.executor import ExecutionEngine -from exengine.kernel.ex_event_base import DataHandler -from exengine.backends.micromanager.mm_device_implementations import MicroManagerCamera -from storage_backends.ndtiff_and_ndram.NDTiffandRAM import NDRAMStorage -from exengine.events.detector_events import StartCapture, ReadoutData -from mmpycorex import create_core_instance, terminate_core_instances, get_default_install_location - -mm_install_dir = get_default_install_location() -config_file = os.path.join(mm_install_dir, 'MMConfig_demo.cfg') -create_core_instance(mm_install_dir, config_file, - buffer_size_mb=1024, max_memory_mb=1024, # set these low for github actions - python_backend=True, - debug=False) - - -executor = ExecutionEngine() - - - -camera = MicroManagerCamera() - -num_images = 100 -# Create a data handle to manage the handoff of data from the camera to the storage backend -storage = NDRAMStorage() -data_handler = DataHandler(storage=storage) - -start_capture_event = StartCapture(num_images=num_images, camera=camera) -readout_images_event = ReadoutData(number=num_images, camera=camera, - data_coordinate_iterator=[DataCoordinates(time=t) for t in range(num_images)], - data_handler=data_handler) -executor.submit(start_capture_event) -future = executor.submit(readout_images_event) - -# Wait for all images to be readout -future.await_execution() -executor.check_exceptions() - -data_handler.finish() -executor.shutdown() -terminate_core_instances() diff --git a/src/exengine/integration_tests/work_in_progress/sbox.py b/src/exengine/integration_tests/work_in_progress/sbox.py deleted file mode 100644 index cb27667..0000000 --- a/src/exengine/integration_tests/work_in_progress/sbox.py +++ /dev/null @@ -1,48 +0,0 @@ -from mmpycorex import create_core_instance, terminate_core_instances, get_default_install_location -from exengine.kernel.data_coords import DataCoordinates -from exengine.backends.micromanager.mm_device_implementations import MicroManagerCamera -import os -from exengine.kernel.executor import ExecutionEngine -from exengine.kernel.ex_event_base import DataHandler -from storage_backends.ndtiff_and_ndram.NDTiffandRAM import NDRAMStorage -from exengine.events.detector_events import StartCapture, ReadoutData - -mm_install_dir = get_default_install_location() -config_file = os.path.join(mm_install_dir, 'MMConfig_demo.cfg') -create_core_instance(mm_install_dir, config_file, - buffer_size_mb=1024, max_memory_mb=1024, # set these low for github actions - python_backend=True, - debug=False) - - -executor = ExecutionEngine() - - - -camera = MicroManagerCamera() - -num_images = 100 -data_handler = DataHandler(storage=NDRAMStorage()) - -start_capture_event = StartCapture(num_images=num_images, camera=camera) -readout_images_event = ReadoutData(number=num_images, camera=camera, - image_coordinate_iterator=[DataCoordinates(time=t) for t in range(num_images)], - data_handler=data_handler) -executor.submit(start_capture_event) -future = executor.submit(readout_images_event) - -future.await_execution() - -data_handler.finish() - -executor.shutdown() -terminate_core_instances() - - - -# # print all threads that are still a -# import threading -# -# for thread in threading.enumerate(): -# print(thread) -# pass \ No newline at end of file diff --git a/src/exengine/kernel/device.py b/src/exengine/kernel/device.py index 735788f..c932e76 100644 --- a/src/exengine/kernel/device.py +++ b/src/exengine/kernel/device.py @@ -6,6 +6,7 @@ from typing import Any, Dict, Callable, Sequence, Optional, List, Tuple, Iterable, Union from weakref import WeakSet from dataclasses import dataclass +import types from .ex_event_base import ExecutorEvent from .executor import ExecutionEngine @@ -54,7 +55,7 @@ def _thread_start(self, *args, **kwargs): # Call this function to initialize the thread patching if not hasattr(threading.Thread, '_monkey_patched_start'): _python_debugger_active, _within_executor_threads, _original_thread_start = _initialize_thread_patching() - _no_executor_attrs = ['_name', '_no_executor', '_no_executor_attrs'] + _no_executor_attrs = ['_name', '_no_executor', '_no_executor_attrs', '_thread_name'] @dataclass @@ -108,14 +109,29 @@ def wrap_for_executor(attr_name, attr_value): if hasattr(attr_value, '_wrapped_for_executor'): return attr_value + # Add this block to handle properties + if isinstance(attr_value, property): + return property( + fget=DeviceMetaclass.wrap_for_executor(f"{attr_name}_getter", attr_value.fget) if attr_value.fget else None, + fset=DeviceMetaclass.wrap_for_executor(f"{attr_name}_setter", attr_value.fset) if attr_value.fset else None, + fdel=DeviceMetaclass.wrap_for_executor(f"{attr_name}_deleter", attr_value.fdel) if attr_value.fdel else None, + doc=attr_value.__doc__ + ) + @wraps(attr_value) def wrapper(self: 'Device', *args: Any, **kwargs: Any) -> Any: if attr_name in _no_executor_attrs or self._no_executor: return attr_value(self, *args, **kwargs) if DeviceMetaclass._is_reroute_exempted_thread(): return attr_value(self, *args, **kwargs) + # check for method-level preferred thread name first, then class-level + thread_name = getattr(attr_value, '_thread_name', None) or getattr(self, '_thread_name', None) + if ExecutionEngine.on_any_executor_thread(): + # check for device-level preferred thread + if thread_name is None or threading.current_thread().name == thread_name: + return attr_value(self, *args, **kwargs) event = MethodCallEvent(method_name=attr_name, args=args, kwargs=kwargs, instance=self) - return ExecutionEngine.get_instance().submit(event).await_execution() + return ExecutionEngine.get_instance().submit(event, thread_name=thread_name).await_execution() wrapper._wrapped_for_executor = True return wrapper @@ -133,8 +149,7 @@ def is_debugger_thread(): @staticmethod def _is_reroute_exempted_thread() -> bool: - return (DeviceMetaclass.is_debugger_thread() or ExecutionEngine.on_any_executor_thread() or - threading.current_thread() in _within_executor_threads) + return (DeviceMetaclass.is_debugger_thread() or threading.current_thread() in _within_executor_threads) @staticmethod def find_in_bases(bases, method_name): @@ -147,10 +162,12 @@ def __new__(mcs, name: str, bases: tuple, attrs: dict) -> Any: new_attrs = {} for attr_name, attr_value in attrs.items(): if not attr_name.startswith('_'): - if callable(attr_value): + if isinstance(attr_value, property): # Property + new_attrs[attr_name] = mcs.wrap_for_executor(attr_name, attr_value) + elif callable(attr_value): # Regular method new_attrs[attr_name] = mcs.wrap_for_executor(attr_name, attr_value) - else: - pass + else: # Attribute + new_attrs[attr_name] = attr_value else: new_attrs[attr_name] = attr_value @@ -174,18 +191,26 @@ def __getattribute__(self: 'Device', name: str) -> Any: return object.__getattribute__(self, name) if DeviceMetaclass._is_reroute_exempted_thread(): return getattribute_with_fallback(self, name) - else: - event = GetAttrEvent(attr_name=name, instance=self, method=getattribute_with_fallback) - return ExecutionEngine.get_instance().submit(event).await_execution() + thread_name = getattr(self, '_thread_name', None) + if ExecutionEngine.on_any_executor_thread(): + # check for device-level preferred thread + if thread_name is None or threading.current_thread().name == thread_name: + return getattribute_with_fallback(self, name) + event = GetAttrEvent(attr_name=name, instance=self, method=getattribute_with_fallback) + return ExecutionEngine.get_instance().submit(event, thread_name=thread_name).await_execution() def __setattr__(self: 'Device', name: str, value: Any) -> None: if name in _no_executor_attrs or self._no_executor: - original_setattr(self, name, value) - elif DeviceMetaclass._is_reroute_exempted_thread(): - original_setattr(self, name, value) - else: - event = SetAttrEvent(attr_name=name, value=value, instance=self, method=original_setattr) - ExecutionEngine.get_instance().submit(event).await_execution() + return original_setattr(self, name, value) + if DeviceMetaclass._is_reroute_exempted_thread(): + return original_setattr(self, name, value) + thread_name = getattr(self, '_thread_name', None) + if ExecutionEngine.on_any_executor_thread(): + # Check for device-level preferred thread + if thread_name is None or threading.current_thread().name == thread_name: + return original_setattr(self, name, value) + event = SetAttrEvent(attr_name=name, value=value, instance=self, method=original_setattr) + ExecutionEngine.get_instance().submit(event, thread_name=thread_name).await_execution() new_attrs['__getattribute__'] = __getattribute__ new_attrs['__setattr__'] = __setattr__ diff --git a/src/exengine/kernel/ex_event_base.py b/src/exengine/kernel/ex_event_base.py index 49f5114..5cd0d1c 100644 --- a/src/exengine/kernel/ex_event_base.py +++ b/src/exengine/kernel/ex_event_base.py @@ -42,12 +42,15 @@ class ExecutorEvent(ABC, metaclass=_ExecutorEventMeta): # Base events just have an event executed event. Subclasses can also add their own lists # of notifications types, and the metaclass will merge them into one big list notification_types: ClassVar[List[Type[Notification]]] = [EventExecutedNotification] + _thread_name: Optional[str] = None def __init__(self, *args, **kwargs): super().__init__() self._num_retries_on_exception = 0 self._finished = False self._initialized = False + # Check for method-level preferred thread name first, then class-level + self._thread_name = getattr(self.execute, '_thread_name', None) or getattr(self.__class__, '_thread_name', None) def _pre_execution(self, engine) -> ExecutionFuture: """ diff --git a/src/exengine/kernel/executor.py b/src/exengine/kernel/executor.py index ccd6a66..7defde8 100644 --- a/src/exengine/kernel/executor.py +++ b/src/exengine/kernel/executor.py @@ -15,6 +15,8 @@ from .data_handler import DataHandler +_MAIN_THREAD_NAME = 'MainExecutorThread' +_ANONYMOUS_THREAD_NAME = 'AnonymousExecutorThread' class MultipleExceptions(Exception): def __init__(self, exceptions: List[Exception]): @@ -34,7 +36,7 @@ def __new__(cls, *args, **kwargs): cls._instance = super().__new__(cls) return cls._instance - def __init__(self, num_threads=1): + def __init__(self): self._exceptions = queue.Queue() self._devices = {} self._notification_queue = queue.Queue() @@ -46,9 +48,8 @@ def __init__(self, num_threads=1): with self._lock: if not hasattr(self, '_initialized'): - self._thread_managers = [] - for _ in range(num_threads): - self._start_new_thread() + self._thread_managers = {} + self._start_new_thread(_MAIN_THREAD_NAME) self._initialized = True def subscribe_to_notifications(self, subscriber: Callable[[Notification], None], @@ -143,7 +144,7 @@ def on_main_executor_thread(cls): """ Check if the current thread is an executor thread """ - return threading.current_thread() is ExecutionEngine.get_instance()._thread_managers[0] + return threading.current_thread().name is _MAIN_THREAD_NAME @classmethod def on_any_executor_thread(cls): @@ -153,8 +154,8 @@ def on_any_executor_thread(cls): and threading.current_thread().execution_engine_thread) return result - def _start_new_thread(self): - self._thread_managers.append(_ExecutionThreadManager()) + def _start_new_thread(self, name): + self._thread_managers[name] = _ExecutionThreadManager(name) def set_debug_mode(self, debug): ExecutionEngine._debug = debug @@ -176,7 +177,7 @@ def check_exceptions(self): else: raise MultipleExceptions(exceptions) - def submit(self, event_or_events: Union[ExecutorEvent, Iterable[ExecutorEvent]], + def submit(self, event_or_events: Union[ExecutorEvent, Iterable[ExecutorEvent]], thread_name=None, transpile: bool = True, prioritize: bool = False, use_free_thread: bool = False, data_handler: DataHandler = None) -> Union[ExecutionFuture, Iterable[ExecutionFuture]]: """ @@ -195,6 +196,10 @@ def submit(self, event_or_events: Union[ExecutorEvent, Iterable[ExecutorEvent]], event_or_events : Union[ExecutorEvent, Iterable[ExecutorEvent]] A single ExecutorEvent or an iterable of ExecutorEvents to be submitted. + thread_name : str, optional (default=None) + Name of the thread to submit the event to. If None, the thread is determined by the + 'use_free_thread' parameter. + transpile : bool, optional (default=True) If True and multiple events are submitted, attempt to optimize them for better performance. This may result in events being combined or reorganized. @@ -204,7 +209,7 @@ def submit(self, event_or_events: Union[ExecutorEvent, Iterable[ExecutorEvent]], Useful for system-wide changes affecting other events, like hardware adjustments. use_free_thread : bool, optional (default=False) - If True, execute the event(s) on an available thread with an empty queue, creating a execution_engine one if necessary. + If True, execute the event(s) on an available thread with an empty queue, creating a new thread if needed. Useful for operations like cancelling or stopping events awaiting signals. If False, execute on the primary thread. @@ -224,11 +229,6 @@ def submit(self, event_or_events: Union[ExecutorEvent, Iterable[ExecutorEvent]], - Use 'prioritize' for critical system changes that should occur before other queued events. - 'use_free_thread' is essential for operations that need to run independently, like cancellation events. """ - - # global ExecutorEvent - # if isinstance(ExecutorEvent, str): - # # runtime import to avoid circular imports - # from .ex_event_base import ExecutorEvent if isinstance(event_or_events, ExecutorEvent): event_or_events = [event_or_events] @@ -236,29 +236,46 @@ def submit(self, event_or_events: Union[ExecutorEvent, Iterable[ExecutorEvent]], # TODO: transpile events pass - futures = tuple(self._submit_single_event(event, use_free_thread, prioritize) + futures = tuple(self._submit_single_event(event, thread_name or getattr(event_or_events[0], '_thread_name', None), + use_free_thread, prioritize) for event in event_or_events) if len(futures) == 1: return futures[0] return futures - def _submit_single_event(self, event: ExecutorEvent, use_free_thread: bool = False, prioritize: bool = False): + def _submit_single_event(self, event: ExecutorEvent, thread_name=None, use_free_thread: bool = False, + prioritize: bool = False): """ Submit a single event for execution """ future = event._pre_execution(self) if use_free_thread: need_new_thread = True - for thread in self._thread_managers: - if thread.is_free(): - thread.submit_event(event) - need_new_thread = False - break + if thread_name is not None: + warnings.warn("thread_name may be ignored when use_free_thread is True") + # Iterate through main thread and anonymous threads + if self._thread_managers[_MAIN_THREAD_NAME].is_free(): + self._thread_managers[_MAIN_THREAD_NAME].submit_event(event, prioritize=prioritize) + need_new_thread = False + else: + for tname in self._thread_managers.keys(): + if tname.startswith(_ANONYMOUS_THREAD_NAME) and self._thread_managers[tname].is_free(): + self._thread_managers[tname].submit_event(event, prioritize=prioritize) + need_new_thread = False + break if need_new_thread: - self._start_new_thread() - self._thread_managers[-1].submit_event(event) + num_anon_threads = len([tname for tn in self._thread_managers.keys() if + tn.startswith(_ANONYMOUS_THREAD_NAME)]) + anonymous_thread_name = _ANONYMOUS_THREAD_NAME + str(num_anon_threads) + self._start_new_thread(anonymous_thread_name) + self._thread_managers[anonymous_thread_name].submit_event(event) else: - self._thread_managers[0].submit_event(event, prioritize=prioritize) + if thread_name is not None: + if thread_name not in self._thread_managers: + self._start_new_thread(thread_name) + self._thread_managers[thread_name].submit_event(event, prioritize=prioritize) + else: + self._thread_managers[_MAIN_THREAD_NAME].submit_event(event, prioritize=prioritize) return future @@ -270,9 +287,9 @@ def shutdown(self): # TODO: add explicit shutdowns for devices here? self._devices = None self._shutdown_event.set() - for thread in self._thread_managers: + for thread in self._thread_managers.values(): thread.shutdown() - for thread in self._thread_managers: + for thread in self._thread_managers.values(): thread.join() # Make sure the notification thread is stopped diff --git a/src/exengine/kernel/test/test_executor.py b/src/exengine/kernel/test/test_executor.py index cb46e48..9689cdb 100644 --- a/src/exengine/kernel/test/test_executor.py +++ b/src/exengine/kernel/test/test_executor.py @@ -11,9 +11,9 @@ import time -@pytest.fixture(scope="module") +@pytest.fixture() def execution_engine(): - engine = ExecutionEngine(num_threads=2) + engine = ExecutionEngine() yield engine engine.shutdown() @@ -22,11 +22,13 @@ def execution_engine(): # Tests for automated rerouting of method calls to the ExecutionEngine to executor threads ############################################################################################# counter = 1 -class MockDevice(Device): +class TestDevice(Device): def __init__(self): global counter - super().__init__(name=f'mock_device_{counter}') + super().__init__(name=f'mock_device_{counter}', no_executor_attrs=('property_getter_monitor', 'property_setter_monitor')) counter += 1 + self.property_getter_monitor = False + self.property_setter_monitor = False self._test_attribute = None def test_method(self): @@ -44,27 +46,40 @@ def get_test_attribute(self): assert threading.current_thread().execution_engine_thread return self._test_attribute + @property + def test_property(self): + assert ExecutionEngine.on_any_executor_thread() + self.property_getter_monitor = True + return self._test_attribute + + @test_property.setter + def test_property(self, value): + assert ExecutionEngine.on_any_executor_thread() + self.property_setter_monitor = True + self._test_attribute = value + + def test_device_method_execution(execution_engine): - mock_device = MockDevice() + mock_device = TestDevice() result = mock_device.test_method() assert result is True def test_device_attribute_setting(execution_engine): - mock_device = MockDevice() + mock_device = TestDevice() mock_device.set_test_attribute("test_value") result = mock_device.get_test_attribute() assert result == "test_value" def test_device_attribute_direct_setting(execution_engine): - mock_device = MockDevice() + mock_device = TestDevice() mock_device.direct_set_attribute = "direct_test_value" assert mock_device.direct_set_attribute == "direct_test_value" def test_multiple_method_calls(execution_engine): - mock_device = MockDevice() + mock_device = TestDevice() result1 = mock_device.test_method() mock_device.set_test_attribute("test_value") @@ -73,6 +88,17 @@ def test_multiple_method_calls(execution_engine): assert result1 is True assert result2 == "test_value" +def test_device_property_getter(execution_engine): + mock_device = TestDevice() + + _ = mock_device.test_property + assert mock_device.property_getter_monitor + +def test_device_property_setter(execution_engine): + mock_device = TestDevice() + + mock_device.test_property = "test_value" + assert mock_device.property_setter_monitor ####################################################### # Tests for internal threads in Devices @@ -204,8 +230,11 @@ def create_sync_event(start_event, finish_event): event.executed = False event.executed_time = None event.execute_count = 0 + event.executed_thread_name = None + event._thread_name = None def execute(): + event.executed_thread_name = threading.current_thread().name start_event.set() # Signal that the execution has started finish_event.wait() # Wait for the signal to finish event.executed_time = time.time() @@ -366,4 +395,113 @@ def test_single_execution_with_free_thread(execution_engine): assert event1.executed assert event2.executed assert event1.execute_count == 1 - assert event2.execute_count == 1 \ No newline at end of file + assert event2.execute_count == 1 + + +####################################################### +# Tests for named thread functionalities ############## +####################################################### + +from exengine.kernel.executor import _MAIN_THREAD_NAME +from exengine.kernel.executor import _ANONYMOUS_THREAD_NAME + + +def test_submit_to_main_thread(execution_engine): + """ + Test submitting an event to the main thread. + """ + start_event = threading.Event() + finish_event = threading.Event() + event = create_sync_event(start_event, finish_event) + + future = execution_engine.submit(event) + start_event.wait() + finish_event.set() + + assert event.executed_thread_name == _MAIN_THREAD_NAME + +def test_submit_to_new_anonymous_thread(execution_engine): + """ + Test that submitting an event with use_free_thread=True creates a new anonymous thread if needed. + """ + start_event1 = threading.Event() + finish_event1 = threading.Event() + event1 = create_sync_event(start_event1, finish_event1) + + start_event2 = threading.Event() + finish_event2 = threading.Event() + event2 = create_sync_event(start_event2, finish_event2) + + # Submit first event to main thread + execution_engine.submit(event1) + start_event1.wait() + + # Submit second event with use_free_thread=True + future2 = execution_engine.submit(event2, use_free_thread=True) + start_event2.wait() + + finish_event1.set() + finish_event2.set() + + assert event1.executed_thread_name == _MAIN_THREAD_NAME + assert event2.executed_thread_name.startswith(_ANONYMOUS_THREAD_NAME) + assert len(execution_engine._thread_managers) == 2 # Main thread + 1 anonymous thread + +def test_multiple_anonymous_threads(execution_engine): + """ + Test creation of multiple anonymous threads when submitting multiple events with use_free_thread=True. + """ + events = [] + start_events = [] + finish_events = [] + num_events = 5 + + for _ in range(num_events): + start_event = threading.Event() + finish_event = threading.Event() + event = create_sync_event(start_event, finish_event) + events.append(event) + start_events.append(start_event) + finish_events.append(finish_event) + + futures = [execution_engine.submit(event, use_free_thread=True) for event in events] + + for start_event in start_events: + start_event.wait() + + for finish_event in finish_events: + finish_event.set() + + thread_names = set(event.executed_thread_name for event in events) + assert len(thread_names) == num_events # Each event should be on a different thread + assert all(name.startswith(_ANONYMOUS_THREAD_NAME) or name == _MAIN_THREAD_NAME for name in thread_names) + assert len(execution_engine._thread_managers) == num_events # num_events anonymous threads + +def test_reuse_named_thread(execution_engine): + """ + Test that submitting multiple events to the same named thread reuses that thread. + """ + thread_name = "custom_thread" + events = [] + start_events = [] + finish_events = [] + num_events = 3 + + for _ in range(num_events): + start_event = threading.Event() + finish_event = threading.Event() + event = create_sync_event(start_event, finish_event) + events.append(event) + start_events.append(start_event) + finish_events.append(finish_event) + + futures = [execution_engine.submit(event, thread_name=thread_name) for event in events] + + for finish_event in finish_events: + finish_event.set() + + for start_event in start_events: + start_event.wait() + + assert all(event.executed_thread_name == thread_name for event in events) + assert len(execution_engine._thread_managers) == 2 # Main thread + 1 custom named thread \ No newline at end of file diff --git a/src/exengine/kernel/threading_decorator.py b/src/exengine/kernel/threading_decorator.py new file mode 100644 index 0000000..3a4f420 --- /dev/null +++ b/src/exengine/kernel/threading_decorator.py @@ -0,0 +1,21 @@ +import functools +from typing import Optional, Union, Type, Callable + +def on_thread(thread_name: Optional[str] = None): + """ + Decorator to specify the preferred thread name for a class or method. + """ + def decorator(obj: Union[Type, Callable]): + if isinstance(obj, type): + # It's a class + obj._thread_name = thread_name + return obj + else: + # It's a method or function + @functools.wraps(obj) + def wrapper(*args, **kwargs): + return obj(*args, **kwargs) + wrapper._thread_name = thread_name + return wrapper + + return decorator \ No newline at end of file