From d845519baf2d9eda94206d65d8baa24fe0fe9ea7 Mon Sep 17 00:00:00 2001 From: Kenneth Yang <82800265+kjy5@users.noreply.github.com> Date: Wed, 31 Jul 2024 13:30:02 -0700 Subject: [PATCH] 362 new scale mpm bindings (#374) * Read bindings, draft write, not real export yet * Put TODO * Version bump * Move manipulator and handle system getting stuck * WIP set depth * Better movement code * Fix some linting errors, update aquarium * Switch to local coordinate system * Merged depth binding and position * Update python module launch instructions * Version bump * MPM echoing like other platforms. Need to limit HTTP debug * Restrict debug log level to just ephys link * Switch step mode * Correct movement direction * fixed position update and target coordinates for depth * Progress with duplicated coordinate. * Revert "Progress with duplicated coordinate." This reverts commit 3eed59b3ccf27f49e4315b353c4b1051d498208e. * Dual axis movement does not work * MPM movement working * Apply poll limit to position * Auto format code --------- Co-authored-by: kjy5 --- README.md | 7 +- pyproject.toml | 1 + scripts/move_tester.py | 11 +- src/ephys_link/__about__.py | 2 +- src/ephys_link/__main__.py | 2 +- src/ephys_link/back_end/platform_handler.py | 61 +++-- src/ephys_link/back_end/server.py | 2 +- src/ephys_link/bindings/fake_bindings.py | 11 +- src/ephys_link/bindings/mpm_bindings.py | 278 ++++++++++++++++++++ src/ephys_link/bindings/ump_4_bindings.py | 44 ++-- src/ephys_link/util/base_bindings.py | 21 +- src/ephys_link/util/common.py | 20 +- src/ephys_link/util/console.py | 11 +- 13 files changed, 390 insertions(+), 81 deletions(-) create mode 100644 src/ephys_link/bindings/mpm_bindings.py diff --git a/README.md b/README.md index 8c2a905..6ef071c 100644 --- a/README.md +++ b/README.md @@ -80,13 +80,12 @@ window instead of `localhost`. pip install ephys-link ``` -Import the modules you need and launch the server. +Import main and run (this will launch the setup GUI). ```python -from ephys_link.server import Server +from ephys_link.__main__ import main -server = Server() -server.launch("sensapex", args.proxy_address, 8081) +main() ``` ## Install for Development diff --git a/pyproject.toml b/pyproject.toml index e54499d..b91cbf6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,6 +61,7 @@ exclude = ["/.github", "/.idea"] [tool.hatch.envs.default] python = "3.12" dependencies = [ + "mypy", "coverage[toml]>=6.5", "pytest", ] diff --git a/scripts/move_tester.py b/scripts/move_tester.py index 2c1ad9f..b595c40 100644 --- a/scripts/move_tester.py +++ b/scripts/move_tester.py @@ -1,15 +1,16 @@ from asyncio import run -from vbl_aquarium.models.ephys_link import SetPositionRequest +from vbl_aquarium.models.ephys_link import EphysLinkOptions, SetDepthRequest from vbl_aquarium.models.unity import Vector4 from ephys_link.back_end.platform_handler import PlatformHandler from ephys_link.util.console import Console c = Console(enable_debug=True) -p = PlatformHandler("ump-4", c) -target = Vector4() -# target = Vector4(x=10, y=10, z=10, w=10) +p = PlatformHandler(EphysLinkOptions(type="pathfinder-mpm"), c) +# target = Vector4() +target = Vector4(x=7.5, y=7.5, z=7.5, w=7.5) -print(run(p.set_position(SetPositionRequest(manipulator_id="6", position=target, speed=5))).to_json_string()) +# print(run(p.set_position(SetPositionRequest(manipulator_id="A", position=target, speed=5))).to_json_string()) +print(run(p.set_depth(SetDepthRequest(manipulator_id="A", depth=7.5, speed=0.15))).to_json_string()) print("Done!") diff --git a/src/ephys_link/__about__.py b/src/ephys_link/__about__.py index 0016201..8b3e3d8 100644 --- a/src/ephys_link/__about__.py +++ b/src/ephys_link/__about__.py @@ -1 +1 @@ -__version__ = "2.0.0b3" +__version__ = "2.0.0b5" diff --git a/src/ephys_link/__main__.py b/src/ephys_link/__main__.py index 7184a5e..ddb1284 100644 --- a/src/ephys_link/__main__.py +++ b/src/ephys_link/__main__.py @@ -31,7 +31,7 @@ def main() -> None: console = Console(enable_debug=options.debug) # 3. Instantiate the Platform Handler with the appropriate platform bindings. - platform_handler = PlatformHandler(options.type, console) + platform_handler = PlatformHandler(options, console) # 4. Instantiate the Emergency Stop service. diff --git a/src/ephys_link/back_end/platform_handler.py b/src/ephys_link/back_end/platform_handler.py index 1d21986..bbe51ee 100644 --- a/src/ephys_link/back_end/platform_handler.py +++ b/src/ephys_link/back_end/platform_handler.py @@ -12,6 +12,7 @@ from vbl_aquarium.models.ephys_link import ( AngularResponse, BooleanStateResponse, + EphysLinkOptions, GetManipulatorsResponse, PositionalResponse, SetDepthRequest, @@ -25,6 +26,7 @@ from ephys_link.__about__ import __version__ from ephys_link.bindings.fake_bindings import FakeBindings +from ephys_link.bindings.mpm_bindings import MPMBinding from ephys_link.bindings.ump_4_bindings import Ump4Bindings from ephys_link.util.base_bindings import BaseBindings from ephys_link.util.common import vector4_to_array @@ -34,21 +36,21 @@ class PlatformHandler: """Handler for platform commands.""" - def __init__(self, platform_type: str, console: Console) -> None: + def __init__(self, options: EphysLinkOptions, console: Console) -> None: """Initialize platform handler. - :param platform_type: Platform type to initialize bindings from. - :type platform_type: str + :param options: CLI options. + :type options: EphysLinkOptions """ - # Store the platform type. - self._platform_type = platform_type + # Store the CLI options. + self._options = options # Store the console. self._console = console # Define bindings based on platform type. - self._bindings = self._match_platform_type(platform_type) + self._bindings = self._match_platform_type(options) # Record which IDs are inside the brain. self._inside_brain: set[str] = set() @@ -56,21 +58,23 @@ def __init__(self, platform_type: str, console: Console) -> None: # Generate a Pinpoint ID for proxy usage. self._pinpoint_id = str(uuid4())[:8] - def _match_platform_type(self, platform_type: str) -> BaseBindings: + def _match_platform_type(self, options: EphysLinkOptions) -> BaseBindings: """Match the platform type to the appropriate bindings. - :param platform_type: Platform type. - :type platform_type: str + :param options: CLI options. + :type options: EphysLinkOptions :returns: Bindings for the specified platform type. :rtype: :class:`ephys_link.util.base_bindings.BaseBindings` """ - match platform_type: + match options.type: case "ump-4": return Ump4Bindings() + case "pathfinder-mpm": + return MPMBinding(options.mpm_port) case "fake": return FakeBindings() case _: - error_message = f'Platform type "{platform_type}" not recognized.' + error_message = f'Platform type "{options.type}" not recognized.' self._console.critical_print(error_message) raise ValueError(error_message) @@ -99,7 +103,7 @@ def get_platform_type(self) -> str: :returns: Platform type config identifier (see CLI options for examples). :rtype: str """ - return self._platform_type + return str(self._options.type) # Manipulator commands. @@ -111,7 +115,7 @@ async def get_manipulators(self) -> GetManipulatorsResponse: """ try: manipulators = await self._bindings.get_manipulators() - num_axes = await self._bindings.get_num_axes() + num_axes = await self._bindings.get_axes_count() dimensions = self._bindings.get_dimensions() except Exception as e: self._console.exception_error_print("Get Manipulators", e) @@ -198,16 +202,16 @@ async def set_position(self, request: SetPositionRequest) -> PositionalResponse: # Return error if movement did not reach target within tolerance. for index, axis in enumerate(vector4_to_array(final_unified_position - request.position)): - # End once index is greater than the number of axes. - if index >= await self._bindings.get_num_axes(): + # End once index is the number of axes. + if index == await self._bindings.get_axes_count(): break # Check if the axis is within the movement tolerance. - if abs(axis) > await self._bindings.get_movement_tolerance(): + if abs(axis) > self._bindings.get_movement_tolerance(): error_message = ( f"Manipulator {request.manipulator_id} did not reach target" f" position on axis {list(Vector4.model_fields.keys())[index]}." - f"Requested: {request.position}, got: {final_unified_position}." + f" Requested: {request.position}, got: {final_unified_position}." ) self._console.error_print("Set Position", error_message) return PositionalResponse(error=error_message) @@ -226,24 +230,27 @@ async def set_depth(self, request: SetDepthRequest) -> SetDepthResponse: :rtype: :class:`vbl_aquarium.models.ephys_link.DriveToDepthResponse` """ try: - # Create a position based on the new depth. - current_platform_position = await self._bindings.get_position(request.manipulator_id) - current_unified_position = self._bindings.platform_space_to_unified_space(current_platform_position) - target_unified_position = current_unified_position.model_copy(update={"w": request.depth}) - target_platform_position = self._bindings.unified_space_to_platform_space(target_unified_position) - # Move to the new depth. - final_platform_position = await self._bindings.set_position( + final_platform_depth = await self._bindings.set_depth( manipulator_id=request.manipulator_id, - position=target_platform_position, + depth=self._bindings.unified_space_to_platform_space(Vector4(w=request.depth)).w, speed=request.speed, ) - final_unified_position = self._bindings.platform_space_to_unified_space(final_platform_position) + final_unified_depth = self._bindings.platform_space_to_unified_space(Vector4(w=final_platform_depth)).w + + # Return error if movement did not reach target within tolerance. + if abs(final_unified_depth - request.depth) > self._bindings.get_movement_tolerance(): + error_message = ( + f"Manipulator {request.manipulator_id} did not reach target depth." + f" Requested: {request.depth}, got: {final_unified_depth}." + ) + self._console.error_print("Set Depth", error_message) + return SetDepthResponse(error=error_message) except Exception as e: self._console.exception_error_print("Set Depth", e) return SetDepthResponse(error=self._console.pretty_exception(e)) else: - return SetDepthResponse(depth=final_unified_position.w) + return SetDepthResponse(depth=final_unified_depth) async def set_inside_brain(self, request: SetInsideBrainRequest) -> BooleanStateResponse: """Mark a manipulator as inside the brain or not. diff --git a/src/ephys_link/back_end/server.py b/src/ephys_link/back_end/server.py index 584b0b9..9ead2e3 100644 --- a/src/ephys_link/back_end/server.py +++ b/src/ephys_link/back_end/server.py @@ -12,7 +12,7 @@ SetInsideBrainRequest, SetPositionRequest, ) -from vbl_aquarium.models.generic import VBLBaseModel +from vbl_aquarium.utils.vbl_base_model import VBLBaseModel from ephys_link.back_end.platform_handler import PlatformHandler from ephys_link.util.common import PORT, check_for_updates, server_preamble diff --git a/src/ephys_link/bindings/fake_bindings.py b/src/ephys_link/bindings/fake_bindings.py index 0187fd0..8a2a292 100644 --- a/src/ephys_link/bindings/fake_bindings.py +++ b/src/ephys_link/bindings/fake_bindings.py @@ -1,6 +1,7 @@ from vbl_aquarium.models.unity import Vector3, Vector4 from ephys_link.util.base_bindings import BaseBindings +from ephys_link.util.common import array_to_vector4 class FakeBindings(BaseBindings): @@ -22,11 +23,11 @@ def __init__(self) -> None: async def get_manipulators(self) -> list[str]: return list(map(str, range(8))) - async def get_num_axes(self) -> int: + async def get_axes_count(self) -> int: return 4 def get_dimensions(self) -> Vector4: - return Vector4(x=20, y=20, z=20, w=20) + return array_to_vector4([20] * 4) async def get_position(self, manipulator_id: str) -> Vector4: return self._positions[int(manipulator_id)] @@ -37,13 +38,17 @@ async def get_angles(self, manipulator_id: str) -> Vector3: async def get_shank_count(self, _: str) -> int: return 1 - async def get_movement_tolerance(self) -> float: + def get_movement_tolerance(self) -> float: return 0.001 async def set_position(self, manipulator_id: str, position: Vector4, _: float) -> Vector4: self._positions[int(manipulator_id)] = position return position + async def set_depth(self, manipulator_id: str, depth: float, _: float) -> float: + self._positions[int(manipulator_id)].w = depth + return depth + async def stop(self, _: str) -> None: pass diff --git a/src/ephys_link/bindings/mpm_bindings.py b/src/ephys_link/bindings/mpm_bindings.py new file mode 100644 index 0000000..31dd41e --- /dev/null +++ b/src/ephys_link/bindings/mpm_bindings.py @@ -0,0 +1,278 @@ +"""Bindings for New Scale Pathfinder MPM HTTP server platform. + +MPM works slightly differently than the other platforms since it operates in stereotactic coordinates. +This means exceptions need to be made for its API. + +Usage: Instantiate MPMBindings to interact with the New Scale Pathfinder MPM HTTP server platform. +""" + +from asyncio import get_running_loop, sleep +from json import dumps +from typing import Any + +from requests import JSONDecodeError, get, put +from vbl_aquarium.models.unity import Vector3, Vector4 + +from ephys_link.util.base_bindings import BaseBindings +from ephys_link.util.common import scalar_mm_to_um, vector4_to_array + + +class MPMBinding(BaseBindings): + """Bindings for New Scale Pathfinder MPM HTTP server platform.""" + + # Valid New Scale manipulator IDs + VALID_MANIPULATOR_IDS = ( + "A", + "B", + "C", + "D", + "E", + "F", + "G", + "H", + "I", + "J", + "K", + "L", + "M", + "N", + "O", + "P", + "Q", + "R", + "S", + "T", + "U", + "V", + "W", + "X", + "Y", + "Z", + "AA", + "AB", + "AC", + "AD", + "AE", + "AF", + "AG", + "AH", + "AI", + "AJ", + "AK", + "AL", + "AM", + "AN", + ) + + # Movement polling preferences. + UNCHANGED_COUNTER_LIMIT = 10 + POLL_INTERVAL = 0.2 + + # Speed preferences (mm/s to use coarse mode). + COARSE_SPEED_THRESHOLD = 0.1 + INSERTION_SPEED_LIMIT = 9_000 + + def __init__(self, port: int) -> None: + """Initialize connection to MPM HTTP server. + + :param port: Port number for MPM HTTP server. + :type port: int + """ + self._url = f"http://localhost:{port}" + self._movement_stopped = False + + async def get_manipulators(self) -> list[str]: + return [manipulator["Id"] for manipulator in (await self._query_data())["ProbeArray"]] + + async def get_axes_count(self) -> int: + return 3 + + def get_dimensions(self) -> Vector4: + return Vector4(x=15, y=15, z=15, w=15) + + async def get_position(self, manipulator_id: str) -> Vector4: + manipulator_data = await self._manipulator_data(manipulator_id) + stage_z = manipulator_data["Stage_Z"] + + await sleep(self.POLL_INTERVAL) # Wait for the stage to stabilize. + + return Vector4( + x=manipulator_data["Stage_X"], + y=manipulator_data["Stage_Y"], + z=stage_z, + w=stage_z, + ) + + async def get_angles(self, manipulator_id: str) -> Vector3: + manipulator_data = await self._manipulator_data(manipulator_id) + + # Apply PosteriorAngle to Polar to get the correct angle. + adjusted_polar = manipulator_data["Polar"] - (await self._query_data())["PosteriorAngle"] + + return Vector3( + x=adjusted_polar if adjusted_polar > 0 else 360 + adjusted_polar, + y=manipulator_data["Pitch"], + z=manipulator_data["ShankOrientation"], + ) + + async def get_shank_count(self, manipulator_id: str) -> int: + return int((await self._manipulator_data(manipulator_id))["ShankCount"]) + + def get_movement_tolerance(self) -> float: + return 0.01 + + async def set_position(self, manipulator_id: str, position: Vector4, speed: float) -> Vector4: + # Keep track of the previous position to check if the manipulator stopped advancing. + current_position = await self.get_position(manipulator_id) + previous_position = current_position + unchanged_counter = 0 + + # Set step mode based on speed. + await self._put_request( + { + "PutId": "ProbeStepMode", + "Probe": self.VALID_MANIPULATOR_IDS.index(manipulator_id), + "StepMode": 0 if speed > self.COARSE_SPEED_THRESHOLD else 1, + } + ) + + # Send move request. + await self._put_request( + { + "PutId": "ProbeMotion", + "Probe": self.VALID_MANIPULATOR_IDS.index(manipulator_id), + "Absolute": 1, + "Stereotactic": 0, + "AxisMask": 7, + "X": position.x, + "Y": position.y, + "Z": position.z, + } + ) + + # Wait for the manipulator to reach the target position or be stopped or stuck. + while ( + not self._movement_stopped + and not self._is_vector_close(current_position, position) + and unchanged_counter < self.UNCHANGED_COUNTER_LIMIT + ): + # Wait for a short time before checking again. + await sleep(self.POLL_INTERVAL) + + # Update current position. + current_position = await self.get_position(manipulator_id) + + # Check if manipulator is not moving. + if self._is_vector_close(previous_position, current_position): + # Position did not change. + unchanged_counter += 1 + else: + # Position changed. + unchanged_counter = 0 + previous_position = current_position + + # Reset movement stopped flag. + self._movement_stopped = False + + # Return the final position. + return await self.get_position(manipulator_id) + + async def set_depth(self, manipulator_id: str, depth: float, speed: float) -> float: + # Keep track of the previous depth to check if the manipulator stopped advancing unexpectedly. + current_depth = (await self.get_position(manipulator_id)).w + previous_depth = current_depth + unchanged_counter = 0 + + # Send move request. + # Convert mm/s to um/min and cap speed at the limit. + await self._put_request( + { + "PutId": "ProbeInsertion", + "Probe": self.VALID_MANIPULATOR_IDS.index(manipulator_id), + "Distance": scalar_mm_to_um(current_depth - depth), + "Rate": min(scalar_mm_to_um(speed) * 60, self.INSERTION_SPEED_LIMIT), + } + ) + + # Wait for the manipulator to reach the target depth or be stopped or get stuck. + while not self._movement_stopped and not abs(current_depth - depth) <= self.get_movement_tolerance(): + # Wait for a short time before checking again. + await sleep(self.POLL_INTERVAL) + + # Get the current depth. + current_depth = (await self.get_position(manipulator_id)).w + + # Check if manipulator is not moving. + if abs(previous_depth - current_depth) <= self.get_movement_tolerance(): + # Depth did not change. + unchanged_counter += 1 + else: + # Depth changed. + unchanged_counter = 0 + previous_depth = current_depth + + # Reset movement stopped flag. + self._movement_stopped = False + + # Return the final depth. + return float((await self.get_position(manipulator_id)).w) + + async def stop(self, manipulator_id: str) -> None: + request = {"PutId": "ProbeStop", "Probe": self.VALID_MANIPULATOR_IDS.index(manipulator_id)} + await self._put_request(request) + self._movement_stopped = True + + def platform_space_to_unified_space(self, platform_space: Vector4) -> Vector4: + # unified <- platform + # +x <- -x + # +y <- +z + # +z <- +y + # +w <- -w + + return Vector4( + x=self.get_dimensions().x - platform_space.x, + y=platform_space.z, + z=platform_space.y, + w=self.get_dimensions().w - platform_space.w, + ) + + def unified_space_to_platform_space(self, unified_space: Vector4) -> Vector4: + # platform <- unified + # +x <- -x + # +y <- +z + # +z <- +y + # +w <- -w + + return Vector4( + x=self.get_dimensions().x - unified_space.x, + y=unified_space.z, + z=unified_space.y, + w=self.get_dimensions().w - unified_space.w, + ) + + # Helper functions. + async def _query_data(self) -> Any: + try: + return (await get_running_loop().run_in_executor(None, get, self._url)).json() + except ConnectionError as connectionError: + error_message = f"Unable to connect to MPM HTTP server: {connectionError}" + raise RuntimeError(error_message) from connectionError + except JSONDecodeError as jsonDecodeError: + error_message = f"Unable to decode JSON response from MPM HTTP server: {jsonDecodeError}" + raise ValueError(error_message) from jsonDecodeError + + async def _manipulator_data(self, manipulator_id: str) -> Any: + probe_data = (await self._query_data())["ProbeArray"] + for probe in probe_data: + if probe["Id"] == manipulator_id: + return probe + + # If we get here, that means the manipulator doesn't exist. + error_message = f"Manipulator {manipulator_id} not found." + raise ValueError(error_message) + + async def _put_request(self, request: dict[str, Any]) -> None: + await get_running_loop().run_in_executor(None, put, self._url, dumps(request)) + + def _is_vector_close(self, target: Vector4, current: Vector4) -> bool: + return all(abs(axis) <= self.get_movement_tolerance() for axis in vector4_to_array(target - current)[:3]) diff --git a/src/ephys_link/bindings/ump_4_bindings.py b/src/ephys_link/bindings/ump_4_bindings.py index 6060529..62fd12c 100644 --- a/src/ephys_link/bindings/ump_4_bindings.py +++ b/src/ephys_link/bindings/ump_4_bindings.py @@ -9,7 +9,14 @@ from vbl_aquarium.models.unity import Vector3, Vector4 from ephys_link.util.base_bindings import BaseBindings -from ephys_link.util.common import RESOURCES_PATH, array_to_vector4, mm_to_um, mmps_to_umps, um_to_mm, vector4_to_array +from ephys_link.util.common import ( + RESOURCES_PATH, + array_to_vector4, + scalar_mm_to_um, + um_to_mm, + vector4_to_array, + vector_mm_to_um, +) class Ump4Bindings(BaseBindings): @@ -28,7 +35,7 @@ def __init__(self) -> None: async def get_manipulators(self) -> list[str]: return list(map(str, self._ump.list_devices())) - async def get_num_axes(self) -> int: + async def get_axes_count(self) -> int: return 4 def get_dimensions(self) -> Vector4: @@ -55,29 +62,17 @@ async def get_shank_count(self, _: str) -> int: error_message = "UMP-4 does not support getting shank count" raise AttributeError(error_message) - async def get_movement_tolerance(self) -> float: + def get_movement_tolerance(self) -> float: return 0.001 async def set_position(self, manipulator_id: str, position: Vector4, speed: float) -> Vector4: - """Set the position of the manipulator. - - Waits using Asyncio until the movement is finished. This assumes the application is running in an event loop. - - :param manipulator_id: Manipulator ID. - :type manipulator_id: str - :param position: Platform space position to set the manipulator to (mm). - :type position: Vector4 - :param speed: Speed to move the manipulator to the position (mm/s). - :type speed: float - :returns: Final position of the manipulator in platform space (mm). - :rtype: Vector4 - :raises RuntimeError: If the movement is interrupted. - """ # Convert position to micrometers. - target_position_um = mm_to_um(position) + target_position_um = vector_mm_to_um(position) # Request movement. - movement = self._get_device(manipulator_id).goto_pos(vector4_to_array(target_position_um), mmps_to_umps(speed)) + movement = self._get_device(manipulator_id).goto_pos( + vector4_to_array(target_position_um), scalar_mm_to_um(speed) + ) # Wait for movement to finish. await get_running_loop().run_in_executor(None, movement.finished_event.wait) @@ -89,6 +84,17 @@ async def set_position(self, manipulator_id: str, position: Vector4, speed: floa return um_to_mm(array_to_vector4(movement.last_pos)) + async def set_depth(self, manipulator_id: str, depth: float, speed: float) -> float: + # Augment current position with depth. + current_position = await self.get_position(manipulator_id) + new_platform_position = current_position.model_copy(update={"w": depth}) + + # Make the movement. + final_platform_position = await self.set_position(manipulator_id, new_platform_position, speed) + + # Return the final depth. + return float(final_platform_position.w) + async def stop(self, manipulator_id: str) -> None: self._get_device(manipulator_id).stop() diff --git a/src/ephys_link/util/base_bindings.py b/src/ephys_link/util/base_bindings.py index bfa3354..54c32c7 100644 --- a/src/ephys_link/util/base_bindings.py +++ b/src/ephys_link/util/base_bindings.py @@ -25,7 +25,7 @@ async def get_manipulators(self) -> list[str]: """ @abstractmethod - async def get_num_axes(self) -> int: + async def get_axes_count(self) -> int: """Get the number of axes for the current platform. :returns: Number of axes. @@ -76,7 +76,7 @@ async def get_shank_count(self, manipulator_id: str) -> int: """ @abstractmethod - async def get_movement_tolerance(self) -> float: + def get_movement_tolerance(self) -> float: """Get the tolerance for how close the final position must be to the target position in a movement (mm). :returns: Movement tolerance (mm). @@ -88,7 +88,6 @@ async def set_position(self, manipulator_id: str, position: Vector4, speed: floa """Set the position of a manipulator. This will directly set the position in the original platform space. - Unified space coordinates will need to be converted to platform space. For 3-axis manipulators, the first 3 values of the position will be used. :param manipulator_id: Manipulator ID. @@ -101,6 +100,22 @@ async def set_position(self, manipulator_id: str, position: Vector4, speed: floa :rtype: Vector4 """ + @abstractmethod + async def set_depth(self, manipulator_id: str, depth: float, speed: float) -> float: + """Set the depth of a manipulator. + + This will directly set the depth stage in the original platform space. + + :param manipulator_id: Manipulator ID. + :type manipulator_id: str + :param depth: Depth to set the manipulator to (mm). + :type depth: float + :param speed: Speed to move the manipulator to the depth (mm/s). + :type speed: float + :returns: Final depth of the manipulator in platform space (mm). + :rtype: float + """ + @abstractmethod async def stop(self, manipulator_id: str) -> None: """Stop a manipulator.""" diff --git a/src/ephys_link/util/common.py b/src/ephys_link/util/common.py index d523d1a..d9c7c2f 100644 --- a/src/ephys_link/util/common.py +++ b/src/ephys_link/util/common.py @@ -53,23 +53,23 @@ def check_for_updates() -> None: # Unit conversions -def mmps_to_umps(mmps: float) -> float: - """Convert millimeters per second to micrometers per second. +def scalar_mm_to_um(mm: float) -> float: + """Convert scalar values of millimeters to micrometers. - :param mmps: Speed in millimeters per second. - :type mmps: float - :returns: Speed in micrometers per second. + :param mm: Scalar value in millimeters. + :type mm: float + :returns: Scalar value in micrometers. :rtype: float """ - return mmps * 1_000 + return mm * 1_000 -def mm_to_um(mm: Vector4) -> Vector4: - """Convert millimeters to micrometers. +def vector_mm_to_um(mm: Vector4) -> Vector4: + """Convert vector values of millimeters to micrometers. - :param mm: Length in millimeters. + :param mm: Vector in millimeters. :type mm: Vector4 - :returns: Length in micrometers. + :returns: Vector in micrometers. :rtype: Vector4 """ return mm * 1_000 diff --git a/src/ephys_link/util/console.py b/src/ephys_link/util/console.py index b259c48..5cdffe0 100644 --- a/src/ephys_link/util/console.py +++ b/src/ephys_link/util/console.py @@ -27,7 +27,7 @@ def __init__(self, *, enable_debug: bool) -> None: basicConfig( format="%(message)s", datefmt="[%I:%M:%S %p]", - handlers=[RichHandler(rich_tracebacks=True)], + handlers=[RichHandler(rich_tracebacks=True, markup=True)], ) self._log = getLogger("rich") self._log.setLevel(DEBUG if enable_debug else INFO) @@ -71,7 +71,7 @@ def critical_print(self, msg: str) -> None: :param msg: Critical message to print. :type msg: str """ - self._log.critical(f"[b i red]{msg}", extra={"markup": True}) + self._log.critical(f"[b i red]{msg}") @staticmethod def pretty_exception(exception: Exception) -> str: @@ -92,9 +92,7 @@ def exception_error_print(self, label: str, exception: Exception) -> None: :param exception: Exception to print. :type exception: Exception """ - self._log.exception( - f"[b magenta]{label}:[/] [magenta]{Console.pretty_exception(exception)}", extra={"markup": True} - ) + self._log.exception(f"[b magenta]{label}:[/] [magenta]{Console.pretty_exception(exception)}") # Helper methods. def _repeatable_log(self, log_type: int, label: str, message: str) -> None: @@ -124,10 +122,9 @@ def _repeatable_log(self, log_type: int, label: str, message: str) -> None: self._log.log( self._last_message[0], f"{self._last_message[1]}:[/] {self._last_message[2]}[/] x {self._repeat_counter}", - extra={"markup": True}, ) self._repeat_counter = 0 # Log new message. - self._log.log(log_type, f"{label}:[/] {message}", extra={"markup": True}) + self._log.log(log_type, f"{label}:[/] {message}") self._last_message = message_set