diff --git a/src/exengine/__init__.py b/src/exengine/__init__.py index d393708..b4fa3de 100644 --- a/src/exengine/__init__.py +++ b/src/exengine/__init__.py @@ -7,3 +7,10 @@ from .kernel.executor import ExecutionEngine from .kernel.threading_decorator import on_thread + +__all__ = [ + "ExecutionEngine", + "on_thread", + "__version__", + "version_info", +] diff --git a/src/exengine/backends/micromanager/mm_device_implementations.py b/src/exengine/backends/micromanager/mm_device_implementations.py index d716ad1..bf5df59 100644 --- a/src/exengine/backends/micromanager/mm_device_implementations.py +++ b/src/exengine/backends/micromanager/mm_device_implementations.py @@ -5,11 +5,14 @@ from exengine.device_types import (Detector, TriggerableSingleAxisPositioner, TriggerableDoubleAxisPositioner) from exengine.kernel.device import Device from mmpycorex import Core -import numpy as np +import numpy.typing as npt import pymmcore import time from concurrent.futures import ThreadPoolExecutor -from typing import List, Union, Iterable, Tuple +from typing import Union, Iterable, Tuple, TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Any @@ -29,7 +32,7 @@ def __init__(self, name=None, _validatename=True): self._core_noexec = Core() if _validatename: loaded_devices = self._core_noexec.get_loaded_devices() - if name is not None and not name in loaded_devices: + if name is not None and name not in loaded_devices: raise Exception(f'Device with name {name} not found') if name is None and len(loaded_devices) > 1: raise ValueError("Multiple Stage device_implementations found, must specify device name") @@ -73,7 +76,7 @@ def __dir__(self): print(f"Warning: Failed to retrieve device properties: {e}") return sorted(attributes) - def get_allowed_property_values(self, property_name: str) -> List[str]: + def get_allowed_property_values(self, property_name: str) -> list[str]: return self._core_noexec.get_allowed_property_values(self._device_name_noexec, property_name) def is_property_read_only(self, property_name: str) -> bool: @@ -128,7 +131,7 @@ def set_position(self, position: float) -> None: def get_position(self) -> float: return self._core_noexec.get_position(self._device_name_noexec) - def set_position_sequence(self, positions: np.ndarray) -> None: + def set_position_sequence(self, positions: npt.NDArray["Any"]) -> None: if not self._core_noexec.is_stage_sequenceable(self._device_name_noexec): raise ValueError("Stage does not support sequencing") max_length = self._core_noexec.get_stage_sequence_max_length(self._device_name_noexec) @@ -175,7 +178,7 @@ def set_position(self, x: float, y: float) -> None: def get_position(self) -> Tuple[float, float]: return self._core_noexec.get_xy_position(self._device_name_noexec) - def set_position_sequence(self, positions: np.ndarray) -> None: + def set_position_sequence(self, positions: npt.NDArray["Any"]) -> None: if not self._core_noexec.is_xy_stage_sequenceable(self._device_name_noexec): raise ValueError("Stage does not support sequencing") max_length = self._core_noexec.get_xy_stage_sequence_max_length(self._device_name_noexec) @@ -262,14 +265,14 @@ def stop(self) -> None: def is_stopped(self) -> bool: return not self._core_noexec.is_sequence_running(self._device_name_noexec) and not self._snap_active - def pop_data(self, timeout=None) -> Tuple[np.ndarray, dict]: + def pop_data(self, timeout=None) -> Tuple[npt.NDArray["Any"], dict]: if self._frame_count != 1: md = pymmcore.Metadata() start_time = time.time() while True: try: pix = self._core_noexec.pop_next_image_md(0, 0, md) - except IndexError as e: + except IndexError: pix = None if pix is not None: break diff --git a/src/exengine/backends/micromanager/test/test_mm_camera.py b/src/exengine/backends/micromanager/test/test_mm_camera.py index fd80ddc..db469b5 100644 --- a/src/exengine/backends/micromanager/test/test_mm_camera.py +++ b/src/exengine/backends/micromanager/test/test_mm_camera.py @@ -1,8 +1,6 @@ import pytest import time -import os import itertools -from mmpycorex import create_core_instance, terminate_core_instances, get_default_install_location from exengine.kernel.executor import ExecutionEngine from exengine.kernel.data_handler import DataHandler from exengine.kernel.data_coords import DataCoordinates @@ -33,7 +31,7 @@ def capture_images(num_images, executor, camera): executor.submit([start_capture_event, readout_images_event]) - while not {'time': num_images - 1} in storage: + while {'time': num_images - 1} not in storage: time.sleep(1) data_handler.finish() @@ -60,11 +58,11 @@ def test_continuous_capture(executor, camera): storage = NDRAMStorage() data_handler = DataHandler(storage=storage) - start_capture_event = StartContinuousCapture(camera=camera) + start_capture_event = StartContinuousCapture(detector=camera) readout_images_event = ReadoutData(detector=camera, data_coordinates_iterator=(DataCoordinates(time=t) for t in itertools.count()), data_handler=data_handler) - stop_capture_event = StopCapture(camera=camera) + stop_capture_event = StopCapture(detector=camera) _, readout_future, _ = executor.submit([start_capture_event, readout_images_event, stop_capture_event]) time.sleep(2) diff --git a/src/exengine/base_classes.py b/src/exengine/base_classes.py index 2c44948..b796dd1 100644 --- a/src/exengine/base_classes.py +++ b/src/exengine/base_classes.py @@ -2,4 +2,15 @@ from .kernel.ex_event_capabilities import DataProducing, Stoppable, Abortable from .kernel.ex_event_base import ExecutorEvent from .kernel.device import Device -from .kernel.data_storage_base import DataStorage \ No newline at end of file +from .kernel.data_storage_base import DataStorage + +__all__ = [ + "Notification", + "NotificationCategory", + "DataProducing", + "Stoppable", + "Abortable", + "ExecutorEvent", + "Device", + "DataStorage" +] \ No newline at end of file diff --git a/src/exengine/conftest.py b/src/exengine/conftest.py index 4ea6c65..1f63961 100644 --- a/src/exengine/conftest.py +++ b/src/exengine/conftest.py @@ -1,13 +1,5 @@ import os -import sys -import shutil -import warnings - import pytest -import re -import glob - -import socket from mmpycorex import (download_and_install_mm, find_existing_mm_install, create_core_instance, terminate_core_instances, get_default_install_location) diff --git a/src/exengine/data.py b/src/exengine/data.py index b9f0b15..4696e40 100644 --- a/src/exengine/data.py +++ b/src/exengine/data.py @@ -2,4 +2,10 @@ Convenience file for imports """ from .kernel.data_coords import DataCoordinates, DataCoordinatesIterator -from .kernel.data_handler import DataHandler \ No newline at end of file +from .kernel.data_handler import DataHandler + +__all__ = [ + "DataCoordinates", + "DataCoordinatesIterator", + "DataHandler" +] \ No newline at end of file diff --git a/src/exengine/device_types.py b/src/exengine/device_types.py index 143ec89..47d6928 100644 --- a/src/exengine/device_types.py +++ b/src/exengine/device_types.py @@ -3,9 +3,9 @@ """ from abc import abstractmethod -from typing import Tuple -import numpy as np +from typing import Tuple, Any from .kernel.device import Device +import numpy.typing as npt # TODO: could replace hard coded classes with @@ -31,7 +31,7 @@ class TriggerableSingleAxisPositioner(SingleAxisPositioner): A special type of positioner that can accept a sequence of positions to move to when provided external TTL triggers """ @abstractmethod - def set_position_sequence(self, positions: np.ndarray) -> None: + def set_position_sequence(self, positions: npt.NDArray[Any]) -> None: ... @abstractmethod @@ -54,13 +54,13 @@ def set_position(self, x: float, y: float) -> None: ... @abstractmethod - def get_position(self) -> Tuple[float, float]: + def get_position(self) -> "Tuple[float, float]": ... class TriggerableDoubleAxisPositioner(DoubleAxisPositioner): @abstractmethod - def set_position_sequence(self, positions: np.ndarray) -> None: + def set_position_sequence(self, positions: npt.NDArray[Any]) -> None: ... @abstractmethod @@ -101,7 +101,7 @@ def is_stopped(self) -> bool: ... @abstractmethod - def pop_data(self, timeout=None) -> Tuple[np.ndarray, dict]: + def pop_data(self, timeout=None) -> Tuple[npt.NDArray[Any], dict[str, Any]]: """ Get the next image and metadata from the camera buffer. If timeout is None, this function will block until an image is available. If timeout is a number, this function will block for that many seconds before returning diff --git a/src/exengine/events/detector_events.py b/src/exengine/events/detector_events.py index 91d1388..7342c03 100644 --- a/src/exengine/events/detector_events.py +++ b/src/exengine/events/detector_events.py @@ -12,7 +12,7 @@ class DataAcquiredNotification(Notification[DataCoordinates]): category = NotificationCategory.Data - description = "Data has been acquired by a camera or other data-producing device and is now available" + description = "Data has been acquired by a detector or other data-producing device and is now available" # payload is the data coordinates of the acquired data class ReadoutData(Stoppable, DataProducing, ExecutorEvent): @@ -57,8 +57,8 @@ def execute(self) -> None: # if detector is a string, look it up in the device registry self.detector: Detector = (self.detector if isinstance(self.detector, Detector) else ExecutionEngine.get_device(self.detector)) - # TODO a more efficient way to do this is with callbacks from the camera - # but this is not currently implemented, at least for Micro-Manager cameras + # TODO a more efficient way to do this is with callbacks from the detector + # but this is not currently implemented, at least for Micro-Manager cameras image_counter = itertools.count() if self.num_blocks is None else range(self.num_blocks) for image_number, image_coordinates in zip(image_counter, self.data_coordinate_iterator): while True: @@ -108,19 +108,19 @@ class StartContinuousCapture(ExecutorEvent): Tell Detector device to start capturing images continuously, until a stop signal is received """ - def __init__(self, camera: Optional[Detector] = None): + def __init__(self, detector: Optional[Detector] = None): super().__init__() - self.camera = camera + self.detector = detector def execute(self): """ - Capture images from the camera + Capture images from the detector """ try: - self.camera.arm() - self.camera.start() + self.detector.arm() + self.detector.start() except Exception as e: - self.camera.stop() + self.detector.stop() raise e class StopCapture(ExecutorEvent): @@ -128,9 +128,9 @@ class StopCapture(ExecutorEvent): Tell Detector device to start capturing data continuously, until a stop signal is received """ - def __init__(self, camera: Optional[Detector] = None): + def __init__(self, detector: Optional[Detector] = None): super().__init__() - self.camera = camera + self.detector = detector def execute(self): - self.camera.stop() + self.detector.stop() diff --git a/src/exengine/events/multi_d_events.py b/src/exengine/events/multi_d_events.py index e4a45fb..67eab35 100644 --- a/src/exengine/events/multi_d_events.py +++ b/src/exengine/events/multi_d_events.py @@ -5,7 +5,7 @@ from exengine.kernel.data_coords import DataCoordinates from exengine.events.property_events import (SetPropertiesEvent) from exengine.events.positioner_events import SetTriggerable1DPositionsEvent, SetPosition1DEvent -from typing import Union, List, Iterable, Optional +from typing import Union, Iterable, Optional import numpy as np import copy from itertools import chain @@ -16,7 +16,7 @@ def flatten(lst): def multi_d_acquisition_events( num_time_points: int = None, - time_interval_s: Union[float, List[float]] = 0, + time_interval_s: Union[float, list[float]] = 0, z_start: float = None, z_end: float = None, z_step: float = None, @@ -27,7 +27,7 @@ def multi_d_acquisition_events( xy_positions: Iterable = None, xyz_positions: Iterable = None, - position_labels: List[str] = None, + position_labels: list[str] = None, order: str = "tpcz", sequence: str = None, # should be "zc", "cz", "tzc", etc camera: Optional[Union[Detector, str]] = None, @@ -64,7 +64,7 @@ def multi_d_acquisition_events( has_zsteps = False if any([z_start, z_step, z_end]): - if not None in [z_start, z_step, z_end]: + if None not in [z_start, z_step, z_end]: has_zsteps = True else: raise ValueError('All of z_start, z_step, and z_end must be provided') diff --git a/src/exengine/events/positioner_events.py b/src/exengine/events/positioner_events.py index 28607f9..c375eb3 100644 --- a/src/exengine/events/positioner_events.py +++ b/src/exengine/events/positioner_events.py @@ -1,10 +1,13 @@ -from typing import List, Union, Tuple, Optional, SupportsFloat -import numpy as np +from typing import Union, Tuple, Optional, SupportsFloat, TYPE_CHECKING +import numpy.typing as npt from exengine.kernel.ex_event_base import ExecutorEvent from exengine.device_types import (DoubleAxisPositioner, SingleAxisPositioner, TriggerableSingleAxisPositioner, TriggerableDoubleAxisPositioner) +if TYPE_CHECKING: + from typing import Any + class SetPosition2DEvent(ExecutorEvent): """ @@ -23,7 +26,7 @@ class SetTriggerable2DPositionsEvent(ExecutorEvent): Set the position of a movable device """ - def __init__(self, device: Optional[TriggerableDoubleAxisPositioner], positions: Union[List[Tuple[float, float]], np.ndarray]): + def __init__(self, device: Optional[TriggerableDoubleAxisPositioner], positions: Union[list[Tuple[float, float]], npt.NDArray["Any"]]): super().__init__() self.device = device self.positions = positions @@ -49,7 +52,7 @@ class SetTriggerable1DPositionsEvent(ExecutorEvent): Send a sequence of positions to a 1D positioner that will be triggered by TTL pulses """ - def __init__(self, device: Optional[TriggerableSingleAxisPositioner], positions: Union[List[float], np.ndarray]): + def __init__(self, device: Optional[TriggerableSingleAxisPositioner], positions: Union[list[float], npt.NDArray["Any"]]): super().__init__() self.device = device self.positions = positions diff --git a/src/exengine/events/property_events.py b/src/exengine/events/property_events.py index eb5cb1b..db87685 100644 --- a/src/exengine/events/property_events.py +++ b/src/exengine/events/property_events.py @@ -1,5 +1,4 @@ -from typing import Any, Iterable, Tuple, Union, List -from dataclasses import dataclass +from typing import Any, Iterable, Tuple, Union from exengine.kernel.device import Device from exengine.kernel.executor import ExecutionEngine from exengine.kernel.ex_event_base import ExecutorEvent diff --git a/src/exengine/examples/implicit_vs_explicit_excutor.py b/src/exengine/examples/implicit_vs_explicit_excutor.py index 4fafe2e..74564ac 100644 --- a/src/exengine/examples/implicit_vs_explicit_excutor.py +++ b/src/exengine/examples/implicit_vs_explicit_excutor.py @@ -1,27 +1,34 @@ -from mmpycorex import create_core_instance, download_and_install_mm, terminate_core_instances +from mmpycorex import create_core_instance, terminate_core_instances from exengine.kernel.executor import ExecutionEngine -from exengine.backends.micromanager.mm_device_implementations import MicroManagerCamera, MicroManagerSingleAxisStage +from exengine.kernel.notification_base import Notification +from exengine.backends.micromanager.mm_device_implementations import MicroManagerSingleAxisStage from exengine.events.positioner_events import SetPosition1DEvent +def event_complete(notification: Notification) -> None: + print(f"Event complete, notification: {notification.category} - {notification.description} - {notification.payload}") # download_and_install_mm() # If needed # Start Micro-Manager core instance with Demo config -create_core_instance() +try: + create_core_instance() -executor = ExecutionEngine() -z_stage = MicroManagerSingleAxisStage() + executor = ExecutionEngine() + z_stage = MicroManagerSingleAxisStage() -# This occurs on the executor thread. The event is submitted to the executor and its result is awaited, -# meaning the call will block until the method is executed. -z_stage.set_position(100, thread='device_setting_thread') -# it is equivalent to: -executor.submit(SetPosition1DEvent(position=100, device=z_stage)).await_execution() + executor.subscribe_to_notifications(event_complete) + # explicit + z_stage.set_position(100) + # it is equivalent to: + # executor.submit(SetPosition1DEvent(position=100, device=z_stage), thread_name='device_setting_thread').await_execution() + # but the execution thread is the main thread + # implicit + # start capture first; we use await execution in order to make sure that the camera has finished acquisition + executor.submit(SetPosition1DEvent(position=100, device=z_stage), thread_name='device_setting_thread') -executor.submit(SetPosition1DEvent(position=100, device=z_stage), thread='device_setting_thread') -executor.submit(ReadoutImages(), thread='readout_thread') - - - -executor.shutdown() \ No newline at end of file + executor.shutdown() + terminate_core_instances() +except Exception as e: + print(f"An error occurred: {e}") + terminate_core_instances() \ No newline at end of file diff --git a/src/exengine/examples/micromanager_example.py b/src/exengine/examples/micromanager_example.py index 1a65e96..c1f8420 100644 --- a/src/exengine/examples/micromanager_example.py +++ b/src/exengine/examples/micromanager_example.py @@ -1,11 +1,14 @@ -from mmpycorex import create_core_instance, download_and_install_mm, terminate_core_instances +from mmpycorex import create_core_instance, terminate_core_instances from exengine.kernel.executor import ExecutionEngine from exengine.kernel.data_coords import DataCoordinates -from exengine.kernel.ex_event_base import DataHandler +from exengine.kernel.notification_base import Notification, NotificationCategory +from exengine.data import DataHandler from exengine.backends.micromanager.mm_device_implementations import MicroManagerCamera, MicroManagerSingleAxisStage -from storage_backends.ndtiff_and_ndram.NDTiffandRAM import NDRAMStorage +from exengine.storage_backends.ndtiff_and_ndram import NDRAMStorage from exengine.events.detector_events import StartCapture, ReadoutData +def callback(notification: Notification) -> None: + print(f"Notification received: {notification.category} - {notification.payload}") # download_and_install_mm() # If needed # Start Micro-Manager core instance with Demo config @@ -13,20 +16,21 @@ executor = ExecutionEngine() - # Create Micro-Manager Devices camera = MicroManagerCamera() z_stage = MicroManagerSingleAxisStage() - # Capture 100 images on the camera -num_images = 100 +num_images = 5 data_handler = DataHandler(storage=NDRAMStorage()) start_capture_event = StartCapture(num_blocks=num_images, detector=camera) readout_images_event = ReadoutData(num_blocks=num_images, detector=camera, data_coordinates_iterator=[DataCoordinates(time=t) for t in range(num_images)], data_handler=data_handler) + +executor.subscribe_to_notifications(callback, NotificationCategory.Data) + executor.submit(start_capture_event) future = executor.submit(readout_images_event) @@ -35,8 +39,6 @@ data_handler.finish() - - executor.shutdown() terminate_core_instances() diff --git a/src/exengine/examples/using_devices.py b/src/exengine/examples/using_devices.py index 1a76323..5987563 100644 --- a/src/exengine/examples/using_devices.py +++ b/src/exengine/examples/using_devices.py @@ -1,4 +1,4 @@ -from mmpycorex import create_core_instance, download_and_install_mm, terminate_core_instances +from mmpycorex import create_core_instance, terminate_core_instances from exengine.kernel.executor import ExecutionEngine from exengine.backends.micromanager.mm_device_implementations import MicroManagerCamera, MicroManagerSingleAxisStage @@ -9,12 +9,10 @@ executor = ExecutionEngine() - # Create Micro-Manager Devices camera = MicroManagerCamera() z_stage = MicroManagerSingleAxisStage() - # By default, setting/getting attributes and calling methods occure on the main executor thread # This sets the property of the Micro-Manager camera object camera.Exposure = 100 @@ -26,4 +24,5 @@ print(image) -executor.shutdown() \ No newline at end of file +executor.shutdown() +terminate_core_instances() \ No newline at end of file diff --git a/src/exengine/integration_tests/test_events_and_notifications.py b/src/exengine/integration_tests/test_events_and_notifications.py index 9c17d6d..28438e8 100644 --- a/src/exengine/integration_tests/test_events_and_notifications.py +++ b/src/exengine/integration_tests/test_events_and_notifications.py @@ -5,9 +5,6 @@ from exengine import ExecutionEngine from exengine.kernel.ex_event_base import ExecutorEvent from exengine.kernel.notification_base import Notification, NotificationCategory -from dataclasses import dataclass -from typing import List -import time class TestNotification(Notification[str]): @@ -25,7 +22,7 @@ class YetAnotherTestNotification(Notification[float]): class NotificationEmittingEvent(ExecutorEvent): notification_types = [TestNotification, AnotherTestNotification, YetAnotherTestNotification] - def __init__(self, notifications_to_emit: List[Notification]): + def __init__(self, notifications_to_emit: list[Notification]): super().__init__() self.notifications_to_emit = notifications_to_emit diff --git a/src/exengine/integration_tests/work_in_progress/_test_multid_events_low_level.py b/src/exengine/integration_tests/work_in_progress/_test_multid_events_low_level.py index 4c8cc55..8d21d24 100644 --- a/src/exengine/integration_tests/work_in_progress/_test_multid_events_low_level.py +++ b/src/exengine/integration_tests/work_in_progress/_test_multid_events_low_level.py @@ -3,10 +3,8 @@ construction of low-level acquisition events """ -import os import numpy as np import pytest -from mmpycorex import create_core_instance, terminate_core_instances, get_default_install_location from exengine.backends.micromanager.mm_device_implementations import ( MicroManagerSingleAxisStage, MicroManagerDevice, MicroManagerXYStage, MicroManagerCamera ) diff --git a/src/exengine/kernel/data_handler.py b/src/exengine/kernel/data_handler.py index a3aee94..5b2681b 100644 --- a/src/exengine/kernel/data_handler.py +++ b/src/exengine/kernel/data_handler.py @@ -1,7 +1,7 @@ import threading import queue from typing import Any, Dict, Tuple, Callable, Union, Optional -import numpy as np +import numpy.typing as npt from pydantic.types import JsonValue from dataclasses import dataclass @@ -25,7 +25,7 @@ def peek(self): # make a dataclass to hold the data, metadata, future, and boolean flag for whether the data has been processed @dataclass class _DataMetadataFutureHolder: - data: np.ndarray + data: npt.NDArray["Any"] metadata: Dict future: Optional["ExecutionFuture"] processed: bool = False @@ -50,9 +50,9 @@ class DataHandler: # and may create another for processing data def __init__(self, storage: DataStorage, - process_function: Callable[[DataCoordinates, np.ndarray, JsonValue], - Optional[Union[DataCoordinates, np.ndarray, JsonValue, - Tuple[DataCoordinates, np.ndarray, JsonValue]]]] = None, + process_function: Callable[[DataCoordinates, npt.NDArray["Any"], JsonValue], + Optional[Union[DataCoordinates, npt.NDArray["Any"], JsonValue, + Tuple[DataCoordinates, npt.NDArray["Any"], JsonValue]]]] = None, _executor=None): # delayed import to avoid circular imports if _executor is None: @@ -176,7 +176,7 @@ def await_completion(self): self._storage_thread.join() def get(self, coordinates: DataCoordinates, return_data=True, return_metadata=True, processed=None, - ) -> Optional[Tuple[np.ndarray, JsonValue]]: + ) -> Optional[Tuple[npt.NDArray["Any"], JsonValue]]: """ Get an image and associated metadata. If they are present, either in the intake queue or the storage_backends queue (if it exists), return them. If not present, get them from the storage_backends object. If not present there, return None @@ -203,7 +203,7 @@ def get(self, coordinates: DataCoordinates, return_data=True, return_metadata=Tr return data, metadata - def put(self, coordinates: Any, image: np.ndarray, metadata: Dict, execution_future: Optional["ExecutionFuture"]): + def put(self, coordinates: Any, image: npt.NDArray["Any"], metadata: Dict, execution_future: Optional["ExecutionFuture"]): """ Hand off this image to the data handler. It will handle handoff to the storage_backends object and image processing if requested, as well as providing temporary access to the image and metadata as it passes through this diff --git a/src/exengine/kernel/data_storage_base.py b/src/exengine/kernel/data_storage_base.py index 1715a09..a8ccc4b 100644 --- a/src/exengine/kernel/data_storage_base.py +++ b/src/exengine/kernel/data_storage_base.py @@ -2,10 +2,10 @@ Protocol for storage_backends class that acquisitions ultimate write to where the acquisition data ultimately gets stored """ -from typing import Union, Dict +from typing import Union, Dict, Any from abc import ABC from .data_coords import DataCoordinates -import numpy as np +import numpy.typing as npt from pydantic.types import JsonValue class DataStorage(ABC): @@ -18,11 +18,11 @@ def __contains__(self, data_coordinates: Union[DataCoordinates, Dict[str, Union[ """Check if item is in the container.""" ... - def __getitem__(self, data_coordinates: Union[DataCoordinates, Dict[str, Union[int, str]]]) -> np.ndarray: + def __getitem__(self, data_coordinates: Union[DataCoordinates, Dict[str, Union[int, str]]]) -> npt.NDArray[Any]: """ Read a single data corresponding to the given coordinates. Same as get_data() """ ... - def get_data(self, data_coordinates: Union[DataCoordinates, Dict[str, Union[int, str]]]) -> np.ndarray: + def get_data(self, data_coordinates: Union[DataCoordinates, Dict[str, Union[int, str]]]) -> npt.NDArray[Any]: """ Read a single data corresponding to the given coordinates """ @@ -37,7 +37,7 @@ def get_metadata(self, data_coordinates: Union[DataCoordinates, Dict[str, Union[ # TODO: one alternative to saying you have to make the data immediately available is to have a callback # that is called when the data is available. This would allow for disk-backed storage_backends to write the data # to disk before calling the callback. - def put(self, data_coordinates: Union[DataCoordinates, Dict[str, Union[int, str]]], data: np.ndarray, + def put(self, data_coordinates: Union[DataCoordinates, Dict[str, Union[int, str]]], data: npt.NDArray[Any], metadata: JsonValue): """ Add data and corresponding metadata to the dataset. Once this method has been called, the data and metadata @@ -48,7 +48,7 @@ def put(self, data_coordinates: Union[DataCoordinates, Dict[str, Union[int, str] ---------- data_coordinates : DataCoordinates or dict Coordinates of the data - data : np.ndarray + data : npt.NDArray Data to be stored metadata : dict Metadata associated with the data @@ -79,14 +79,14 @@ def close(self): # pass # @abstractmethod -# def get_image_coordinates_list(self) -> List[Dict[str, Union[int, str]]]: +# def get_image_coordinates_list(self) -> list[Dict[str, Union[int, str]]]: # """ # Return a list of the coordinates (e.g. {'channel': 'DAPI', 'z': 0, 'time': 0}) of every image in the dataset # # Returns # ------- # list -# List of image coordinates +# list of image coordinates # """ # pass # @@ -117,7 +117,7 @@ def close(self): # # # @abstractmethod -# def as_array(self, axes: List[str] = None, stitched: bool = False, +# def as_array(self, axes: list[str] = None, stitched: bool = False, # **kwargs: Union[int, str]) -> 'dask.array': # """ # Create one big Dask array with last two axes as y, x and preceding axes depending on data. @@ -132,7 +132,7 @@ def close(self): # Parameters # ---------- # axes : list, optional -# List of axes names over which to iterate and merge into a stacked array. +# list of axes names over which to iterate and merge into a stacked array. # If None, all axes will be used in PTCZYX order (Default value = None). # stitched : bool, optional # If True and tiles were acquired in a grid, lay out adjacent tiles next to one another diff --git a/src/exengine/kernel/device.py b/src/exengine/kernel/device.py index c932e76..2f452da 100644 --- a/src/exengine/kernel/device.py +++ b/src/exengine/kernel/device.py @@ -3,10 +3,9 @@ """ from abc import ABCMeta, ABC from functools import wraps -from typing import Any, Dict, Callable, Sequence, Optional, List, Tuple, Iterable, Union +from typing import Any, Dict, Callable, Sequence, Optional, Tuple, Iterable, Union from weakref import WeakSet from dataclasses import dataclass -import types from .ex_event_base import ExecutorEvent from .executor import ExecutionEngine @@ -270,7 +269,7 @@ def __init__(self, name: str, no_executor: bool = False, no_executor_attrs: Sequ self._name = name - def get_allowed_property_values(self, property_name: str) -> Optional[List[str]]: + def get_allowed_property_values(self, property_name: str) -> Optional[list[str]]: return None # By default, any value is allowed def is_property_read_only(self, property_name: str) -> bool: diff --git a/src/exengine/kernel/ex_event_base.py b/src/exengine/kernel/ex_event_base.py index cb9a699..6e6347c 100644 --- a/src/exengine/kernel/ex_event_base.py +++ b/src/exengine/kernel/ex_event_base.py @@ -1,9 +1,8 @@ import warnings -from typing import Optional, Any,ClassVar, Type, List, Dict, Union, Iterable, Callable +from typing import Optional, Any, ClassVar, Type, Callable from abc import ABC, abstractmethod, ABCMeta import weakref import inspect -from functools import partial from .notification_base import Notification from .notification_base import EventExecutedNotification @@ -41,7 +40,7 @@ def __new__(mcs, name, bases, attrs): class ExecutorEvent(ABC, metaclass=_ExecutorEventMeta): # Base events just have an event executed event. Subclasses can also add their own lists # of notifications types, and the metaclass will merge them into one big list - notification_types: ClassVar[List[Type[Notification]]] = [EventExecutedNotification] + notification_types: ClassVar[list[Type[Notification]]] = [EventExecutedNotification] _thread_name: Optional[str] = None def __init__(self, *args, **kwargs): diff --git a/src/exengine/kernel/ex_event_capabilities.py b/src/exengine/kernel/ex_event_capabilities.py index 8be0437..b2babbf 100644 --- a/src/exengine/kernel/ex_event_capabilities.py +++ b/src/exengine/kernel/ex_event_capabilities.py @@ -1,16 +1,16 @@ """ Additional functionalities that can be added to ExecutorEvents """ -from dataclasses import dataclass, field -from typing import Dict, Union, Iterable +from typing import Dict, Union, Iterable, TYPE_CHECKING import itertools -import numpy as np -import warnings +import numpy.typing as npt from .data_coords import DataCoordinates, DataCoordinatesIterator from .data_handler import DataHandler +if TYPE_CHECKING: + from typing import Any class DataProducing: @@ -37,7 +37,7 @@ def image_generator(): self._data_handler = data_handler - def put_data(self, data_coordinates: DataCoordinates, image: np.ndarray, metadata: Dict): + def put_data(self, data_coordinates: DataCoordinates, image: npt.NDArray["Any"], metadata: Dict): """ Put data into the output queue """ diff --git a/src/exengine/kernel/ex_future.py b/src/exengine/kernel/ex_future.py index d50fa3b..bff351d 100644 --- a/src/exengine/kernel/ex_future.py +++ b/src/exengine/kernel/ex_future.py @@ -1,10 +1,8 @@ -from typing import Union, Optional, Any, Dict, Tuple, Sequence, Set, TypeVar, Type, Iterable +from typing import Union, Optional, Any, Dict, Tuple, Sequence, Set import threading import warnings from .data_coords import DataCoordinates, DataCoordinatesIterator from .notification_base import Notification -import numpy as np -from dataclasses import field from typing import TYPE_CHECKING diff --git a/src/exengine/kernel/executor.py b/src/exengine/kernel/executor.py index 7250286..26ae708 100644 --- a/src/exengine/kernel/executor.py +++ b/src/exengine/kernel/executor.py @@ -6,7 +6,7 @@ from typing import Deque import warnings import traceback -from typing import Union, Iterable, List, Callable, Any, Type +from typing import Union, Iterable, Callable, Type import queue import inspect @@ -14,13 +14,11 @@ from .ex_event_base import ExecutorEvent, AnonymousCallableEvent from .ex_future import ExecutionFuture -from .data_handler import DataHandler - _MAIN_THREAD_NAME = 'MainExecutorThread' _ANONYMOUS_THREAD_NAME = 'AnonymousExecutorThread' class MultipleExceptions(Exception): - def __init__(self, exceptions: List[Exception]): + def __init__(self, exceptions: list[Exception]): self.exceptions = exceptions messages = [f"{type(e).__name__}: {''.join(traceback.format_exception(type(e), e, e.__traceback__))}" for e in exceptions] super().__init__("Multiple exceptions occurred:\n" + "\n".join(messages)) @@ -41,8 +39,8 @@ def __init__(self): self._exceptions = queue.Queue() self._devices = {} self._notification_queue = queue.Queue() - self._notification_subscribers: List[Callable[[Notification], None]] = [] - self._notification_subscriber_filters: List[Union[NotificationCategory, Type]] = [] + self._notification_subscribers: list[Callable[[Notification], None]] = [] + self._notification_subscriber_filters: list[Union[NotificationCategory, Type]] = [] self._notification_lock = threading.Lock() self._notification_thread = None self._shutdown_event = threading.Event() diff --git a/src/exengine/kernel/notification_base.py b/src/exengine/kernel/notification_base.py index b47343a..9a724d2 100644 --- a/src/exengine/kernel/notification_base.py +++ b/src/exengine/kernel/notification_base.py @@ -32,17 +32,17 @@ class Notification(ABC, Generic[TNotificationPayload]): For example: - @dataclass - class DataAcquired(Notification[DataCoordinates]): - - # Define the category and description of the notification shared by all instances of this class - category = NotificationCategory.Data - description = "Data has been acquired by a camera or other data-producing device and is now available" - - # payload is the data coordinates of the acquired - - # Create an instance of the notification - notification = DataAcquired(payload=DataCoordinates(t=1, y=2, channel="DAPI")) + >>> @dataclass + >>> class DataAcquired(Notification[DataCoordinates]): + >>> + >>> # Define the category and description of the notification shared by all instances of this class + >>> category = NotificationCategory.Data + >>> description = "Data has been acquired by a camera or other data-producing device and is now available" + >>> + >>> # payload is the data coordinates of the acquired + >>> + >>> # Create an instance of the notification + >>> notification = DataAcquired(payload=DataCoordinates(t=1, y=2, channel="DAPI")) """ timestamp: datetime = field(default_factory=datetime.now, init=False) diff --git a/src/exengine/kernel/test/test_data_handler.py b/src/exengine/kernel/test/test_data_handler.py index 388df80..9988ff4 100644 --- a/src/exengine/kernel/test/test_data_handler.py +++ b/src/exengine/kernel/test/test_data_handler.py @@ -3,8 +3,9 @@ """ import time import pytest +import numpy.typing as npt import numpy as np -from typing import Dict +from typing import Dict, TYPE_CHECKING from unittest.mock import Mock from exengine.kernel.executor import ExecutionEngine @@ -12,17 +13,20 @@ from exengine.kernel.data_coords import DataCoordinates from exengine.kernel.data_storage_base import DataStorage +if TYPE_CHECKING: + from typing import Any + class MockDataStorage(DataStorage): def __init__(self): self.data = {} self.metadata = {} self.finished = False - def put(self, coords: DataCoordinates, image: np.ndarray, metadata: Dict): + def put(self, coords: DataCoordinates, image: npt.NDArray["Any"], metadata: Dict): self.data[coords] = image self.metadata[coords] = metadata - def get_data(self, coords: DataCoordinates) -> np.ndarray: + def get_data(self, coords: DataCoordinates) -> npt.NDArray["Any"]: return self.data.get(coords) def get_metadata(self, coords: DataCoordinates) -> Dict: diff --git a/src/exengine/notifications.py b/src/exengine/notifications.py index 8469328..d6f43fe 100644 --- a/src/exengine/notifications.py +++ b/src/exengine/notifications.py @@ -2,4 +2,10 @@ Convenience file for imports """ from .kernel.notification_base import NotificationCategory -from .kernel.notification_base import EventExecutedNotification, DataStoredNotification \ No newline at end of file +from .kernel.notification_base import EventExecutedNotification, DataStoredNotification + +__all__ = [ + "NotificationCategory", + "EventExecutedNotification", + "DataStoredNotification" +] \ No newline at end of file