From def9db9aedae80b2641fa5e6e2b645a49e04abc0 Mon Sep 17 00:00:00 2001 From: Vince Reuter Date: Sun, 26 May 2024 23:48:31 +0200 Subject: [PATCH 01/12] splitting out functionality into separate modules --- CHANGELOG.md | 6 + looptrace_loci_vis/__init__.py | 2 +- .../_parse_old_style_no_header.py | 117 +++++++++ looptrace_loci_vis/_types.py | 17 ++ looptrace_loci_vis/point_record.py | 119 +++++++++ looptrace_loci_vis/reader.py | 235 +----------------- 6 files changed, 264 insertions(+), 232 deletions(-) create mode 100644 looptrace_loci_vis/_parse_old_style_no_header.py create mode 100644 looptrace_loci_vis/_types.py create mode 100644 looptrace_loci_vis/point_record.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a9cfc6..e386c3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +### Changed +* This project now uses `pandas` and parses a table-like file (CSV) _with_ header, to support upstream changes in data generation by `looptrace`. 
+* Splitting old functionality out into separate modules + ## [v0.1.0] - 2024-04-20 ### Added diff --git a/looptrace_loci_vis/__init__.py b/looptrace_loci_vis/__init__.py index f16f268..d995f84 100644 --- a/looptrace_loci_vis/__init__.py +++ b/looptrace_loci_vis/__init__.py @@ -1,3 +1,3 @@ """Napari plugin for visualising locus-specific points from looptrace""" -__version__ = "0.1.0" +__version__ = "0.2dev" diff --git a/looptrace_loci_vis/_parse_old_style_no_header.py b/looptrace_loci_vis/_parse_old_style_no_header.py new file mode 100644 index 0000000..f05b139 --- /dev/null +++ b/looptrace_loci_vis/_parse_old_style_no_header.py @@ -0,0 +1,117 @@ +from numpydoc_decorator import doc + +from gertils.geometry import ImagePoint3D +from gertils.types import TimepointFrom0 as Timepoint +from gertils.types import TraceIdFrom0 as TraceId + +from .point_record import PointRecord, expand_along_z +from ._types import LayerParams, QCFailReasons + +CsvRow = list[str] + + +@doc( + summary="Parse records from points which passed QC.", + parameters=dict(rows="Records to parse"), + returns=""" + A pair in which the first element is the array-like of points coordinates, + and the second element is the mapping from attribute name to list of values (1 per point). 
+ """, + notes="https://napari.org/stable/plugins/guides.html#layer-data-tuples", +) +def parse_passed( # noqa: D103 + rows: list[CsvRow], +) -> tuple[list["PointRecord"], list[bool], LayerParams]: + records = [parse_simple_record(r, exp_num_fields=5) for r in rows] + max_z = max(r.get_z_coordinate() for r in records) + points: list["PointRecord"] = [] + center_flags: list[bool] = [] + for rec in records: + new_points, new_flags = expand_along_z(rec, z_max=max_z) + points.extend(new_points) + center_flags.extend(new_flags) + sizes = [1.5 if is_center else 1.0 for is_center in center_flags] + return points, center_flags, {"size": sizes} + + +@doc( + summary="Parse records from points which failed QC.", + parameters=dict(rows="Records to parse"), + returns=""" + A pair in which the first element is the array-like of points coordinates, + and the second element is the mapping from attribute name to list of values (1 per point). + """, + notes="https://napari.org/stable/plugins/guides.html#layer-data-tuples", +) +def parse_failed( # noqa: D103 + rows: list[CsvRow], +) -> tuple[list["PointRecord"], list[bool], LayerParams]: + record_qc_pairs: list[tuple[PointRecord, QCFailReasons]] = [] + for row in rows: + try: + qc = row[InputFileColumn.QC.get] + rec = parse_simple_record(row, exp_num_fields=6) + except IndexError: + logging.exception("Bad row: %s", row) + raise + record_qc_pairs.append((rec, qc)) + max_z = max(r.get_z_coordinate() for r, _ in record_qc_pairs) + points: list["PointRecord"] = [] + center_flags: list[bool] = [] + codes: list[QCFailReasons] = [] + for rec, qc in record_qc_pairs: + new_points, new_flags = expand_along_z(rec, z_max=max_z) + points.extend(new_points) + center_flags.extend(new_flags) + codes.extend([qc] * len(new_points)) + params = { + "size": 0, # Make the point invisible and just use text. 
+ "text": { + "string": "{failCodes}", + "color": DEEP_SKY_BLUE, + }, + "properties": {"failCodes": codes}, + } + return points, center_flags, params + + +@doc( + summary="Parse single-point from a single record (e.g., row from a CSV file).", + parameters=dict( + r="Record (e.g. CSV row) to parse", + exp_num_fields=("The expected number of data fields (e.g., columns) in the record"), + ), + returns=""" + A pair of values in which the first element represents a locus spot's trace ID and timepoint, + and the second element represents the (z, y, x) coordinates of the centroid of the spot fit. + """, +) +def parse_simple_record(r: CsvRow, *, exp_num_fields: int) -> "PointRecord": + """Parse a single line from an input CSV file.""" + if not isinstance(r, list): + raise TypeError(f"Record to parse must be list, not {type(r).__name__}") + if len(r) != exp_num_fields: + raise ValueError(f"Expected record of length {exp_num_fields} but got {len(r)}: {r}") + trace = TraceId(int(r[InputFileColumn.TRACE.get])) + timepoint = Timepoint(int(r[InputFileColumn.TIMEPOINT.get])) + z = float(r[InputFileColumn.Z.get]) + y = float(r[InputFileColumn.Y.get]) + x = float(r[InputFileColumn.X.get]) + point = ImagePoint3D(z=z, y=y, x=x) + return PointRecord(trace_id=trace, timepoint=timepoint, point=point) + + +class InputFileColumn(Enum): + """Indices of the different columns to parse as particular fields""" + + TRACE = 0 + TIMEPOINT = 1 + Z = 2 + Y = 3 + X = 4 + QC = 5 + + @property + def get(self) -> int: + """Alias for the value of this enum member""" + return self.value diff --git a/looptrace_loci_vis/_types.py b/looptrace_loci_vis/_types.py new file mode 100644 index 0000000..91376c0 --- /dev/null +++ b/looptrace_loci_vis/_types.py @@ -0,0 +1,17 @@ +"""Type aliases used broadly""" + +from collections.abc import Callable +from pathlib import Path +from typing import Literal, Union + +from gertils.geometry import ZCoordinate +from gertils.types import PixelArray + +FlatPointRecord = 
list[Union[float, ZCoordinate]] +LayerParams = dict[str, object] +ImageLayer = tuple[PixelArray, LayerParams, Literal["image"]] +PointsLayer = tuple[list[FlatPointRecord], LayerParams, Literal["points"]] +PathLike = str | Path +PathOrPaths = PathLike | list[PathLike] +QCFailReasons = str +Reader = Callable[[PathLike], list[ImageLayer | PointsLayer]] diff --git a/looptrace_loci_vis/point_record.py b/looptrace_loci_vis/point_record.py new file mode 100644 index 0000000..0b5c0d0 --- /dev/null +++ b/looptrace_loci_vis/point_record.py @@ -0,0 +1,119 @@ +"""A single point's record in a file on disk.""" + +import dataclasses +from math import floor +from typing import Union + +import numpy as np +from numpydoc_decorator import doc + +from gertils.geometry import ImagePoint3D, LocatableXY, LocatableZ, ZCoordinate +from gertils.types import TimepointFrom0 as Timepoint +from gertils.types import TraceIdFrom0 as TraceId + +from ._types import FlatPointRecord + + +@doc( + summary="", + parameters=dict( + trace_id="ID of the trace with which the locus spot is associated", + timepoint="Imaging timepoint in from which the point is coming", + point="Coordinates of the centroid of the Gaussian fit to the spot image pixel data", + ), +) +@dataclasses.dataclass(frozen=True, kw_only=True) +class PointRecord(LocatableXY, LocatableZ): # noqa: D101 + trace_id: TraceId + timepoint: Timepoint + point: ImagePoint3D + + def __post_init__(self) -> None: + bads: dict[str, object] = {} + if not isinstance(self.trace_id, TraceId): + bads["trace ID"] = self.trace_id # type: ignore[unreachable] + if not isinstance(self.timepoint, Timepoint): + bads["timepoint"] = self.timepoint # type: ignore[unreachable] + if not isinstance(self.point, ImagePoint3D): + bads["point"] = self.point # type: ignore[unreachable] + if bads: + messages = "; ".join(f"Bad type ({type(v).__name__}) for {k}" for k, v in bads.items()) + raise TypeError(f"Cannot create point record: {messages}") + + @doc(summary="Flatten") + 
def flatten(self) -> FlatPointRecord: + """Create a simple list of components, as a row of layer data.""" + return [ + self.trace_id.get, + self.timepoint.get, + self.get_z_coordinate(), + self.get_y_coordinate(), + self.get_x_coordinate(), + ] + + def get_x_coordinate(self) -> float: # noqa: D102 + return self.point.x + + def get_y_coordinate(self) -> float: # noqa: D102 + return self.point.y + + def get_z_coordinate(self) -> ZCoordinate: # noqa: D102 + return self.point.z + + @doc(summary="Round point position to nearest z-slice") + def with_truncated_z(self) -> "PointRecord": # noqa: D102 + new_z: int = floor(self.get_z_coordinate()) + result: PointRecord = self.with_new_z(new_z) + return result + + @doc( + summary="Replace this instance's point with a copy with updated z.", + parameters=dict(z="New z-coordinate value"), + ) + def with_new_z(self, z: int) -> "PointRecord": # noqa: D102 + pt = ImagePoint3D(x=self.point.x, y=self.point.y, z=z) + return dataclasses.replace(self, point=pt) + + +@doc( + summary="Create ancillary points from main point", + parameters=dict( + r="The record to expand along z-axis", + z_max="The maximum z-coordinate", + ), + returns=""" + List of layer data rows to represent the original point along + entire length of z-axis, paired with flag for each row + indicating whether it's true center or not + """, +) +def expand_along_z( # noqa: D103 + r: PointRecord, *, z_max: Union[float, np.float64] +) -> tuple[list[PointRecord], list[bool]]: + if not isinstance(z_max, int | float | np.float64): + raise TypeError(f"Bad type for z_max: {type(z_max).__name__}") + + r = r.with_truncated_z() + z_center = int(r.get_z_coordinate()) + z_max = int(floor(z_max)) + if not isinstance(z_center, int) or not isinstance(z_max, int): + raise TypeError( + f"Z center and Z max must be int; got {type(z_center).__name__} and" + f" {type(z_max).__name__}" + ) + + # Check that max z and center z make sense together. 
+ if z_max < z_center: + raise ValueError(f"Max z must be at least as great as central z ({z_center})") + + # Build the records and flags of where the center in z really is. + predecessors = [(r.with_new_z(i), False) for i in range(z_center)] + successors = [(r.with_new_z(i), False) for i in range(z_center + 1, z_max + 1)] + points, params = zip(*[*predecessors, (r, True), *successors], strict=False) + + # Each record should give rise to a total of 1 + z_max records, since numbering from 0. + if len(points) != 1 + z_max: + raise RuntimeError( + f"Number of points generated from single spot center isn't as expected! Point={r}, z_max={z_max}, len(points)={len(points)}" + ) + return points, params # type: ignore[return-value] diff --git a/looptrace_loci_vis/reader.py b/looptrace_loci_vis/reader.py index acd9199..dce7bc8 100644 --- a/looptrace_loci_vis/reader.py +++ b/looptrace_loci_vis/reader.py @@ -1,36 +1,21 @@ """Reading locus-specific spots and points data from looptrace for visualisation in napari""" import csv -import dataclasses import logging import os from collections.abc import Callable from enum import Enum -from math import floor from pathlib import Path -from typing import Literal, Optional, Union +from typing import Optional -import numpy as np -from gertils.geometry import ImagePoint3D, LocatableXY, LocatableZ, ZCoordinate from gertils.pathtools import find_multiple_paths_by_fov, get_fov_sort_key -from gertils.types import FieldOfViewFrom1, PixelArray -from gertils.types import TimepointFrom0 as Timepoint -from gertils.types import TraceIdFrom0 as TraceId +from gertils.types import FieldOfViewFrom1 from gertils.zarr_tools import read_zarr from numpydoc_decorator import doc # type: ignore[import-untyped] -__author__ = "Vince Reuter" -__credits__ = ["Vince Reuter"] +from ._parse_old_style_no_header import parse_failed, parse_passed +from ._types import ImageLayer, PathLike, PathOrPaths, PointsLayer, Reader -CsvRow = list[str] -FlatPointRecord = 
list[Union[float, ZCoordinate]] -LayerParams = dict[str, object] -ImageLayer = tuple[PixelArray, LayerParams, Literal["image"]] -PointsLayer = tuple[list[FlatPointRecord], LayerParams, Literal["points"]] -QCFailReasons = str -PathLike = str | Path -PathOrPaths = PathLike | list[PathLike] -Reader = Callable[[PathLike], list[ImageLayer | PointsLayer]] # See: https://davidmathlogic.com/colorblind/ DEEP_SKY_BLUE = "#0C7BDC" @@ -169,215 +154,3 @@ def build_single_file_points_layer(path: PathLike) -> PointsLayer: params = {**static_params, **base_meta, **extra_meta, **shape_meta} return [pt_rec.flatten() for pt_rec in point_records], params, "points" - - -@doc( - summary="Parse records from points which passed QC.", - parameters=dict(rows="Records to parse"), - returns=""" - A pair in which the first element is the array-like of points coordinates, - and the second element is the mapping from attribute name to list of values (1 per point). - """, - notes="https://napari.org/stable/plugins/guides.html#layer-data-tuples", -) -def parse_passed( # noqa: D103 - rows: list[CsvRow], -) -> tuple[list["PointRecord"], list[bool], LayerParams]: - records = [parse_simple_record(r, exp_num_fields=5) for r in rows] - max_z = max(r.get_z_coordinate() for r in records) - points: list["PointRecord"] = [] - center_flags: list[bool] = [] - for rec in records: - new_points, new_flags = expand_along_z(rec, z_max=max_z) - points.extend(new_points) - center_flags.extend(new_flags) - sizes = [1.5 if is_center else 1.0 for is_center in center_flags] - return points, center_flags, {"size": sizes} - - -@doc( - summary="Parse records from points which failed QC.", - parameters=dict(rows="Records to parse"), - returns=""" - A pair in which the first element is the array-like of points coordinates, - and the second element is the mapping from attribute name to list of values (1 per point). 
- """, - notes="https://napari.org/stable/plugins/guides.html#layer-data-tuples", -) -def parse_failed( # noqa: D103 - rows: list[CsvRow], -) -> tuple[list["PointRecord"], list[bool], LayerParams]: - record_qc_pairs: list[tuple[PointRecord, QCFailReasons]] = [] - for row in rows: - try: - qc = row[InputFileColumn.QC.get] - rec = parse_simple_record(row, exp_num_fields=6) - except IndexError: - logging.exception("Bad row: %s", row) - raise - record_qc_pairs.append((rec, qc)) - max_z = max(r.get_z_coordinate() for r, _ in record_qc_pairs) - points: list["PointRecord"] = [] - center_flags: list[bool] = [] - codes: list[QCFailReasons] = [] - for rec, qc in record_qc_pairs: - new_points, new_flags = expand_along_z(rec, z_max=max_z) - points.extend(new_points) - center_flags.extend(new_flags) - codes.extend([qc] * len(new_points)) - params = { - "size": 0, # Make the point invisible and just use text. - "text": { - "string": "{failCodes}", - "color": DEEP_SKY_BLUE, - }, - "properties": {"failCodes": codes}, - } - return points, center_flags, params - - -@doc( - summary="Parse single-point from a single record (e.g., row from a CSV file).", - parameters=dict( - r="Record (e.g. CSV row) to parse", - exp_num_fields=("The expected number of data fields (e.g., columns) in the record"), - ), - returns=""" - A pair of values in which the first element represents a locus spot's trace ID and timepoint, - and the second element represents the (z, y, x) coordinates of the centroid of the spot fit. 
- """, -) -def parse_simple_record(r: CsvRow, *, exp_num_fields: int) -> "PointRecord": - """Parse a single line from an input CSV file.""" - if not isinstance(r, list): - raise TypeError(f"Record to parse must be list, not {type(r).__name__}") - if len(r) != exp_num_fields: - raise ValueError(f"Expected record of length {exp_num_fields} but got {len(r)}: {r}") - trace = TraceId(int(r[InputFileColumn.TRACE.get])) - timepoint = Timepoint(int(r[InputFileColumn.TIMEPOINT.get])) - z = float(r[InputFileColumn.Z.get]) - y = float(r[InputFileColumn.Y.get]) - x = float(r[InputFileColumn.X.get]) - point = ImagePoint3D(z=z, y=y, x=x) - return PointRecord(trace_id=trace, timepoint=timepoint, point=point) - - -@doc( - summary="", - parameters=dict( - trace_id="ID of the trace with which the locus spot is associated", - timepoint="Imaging timepoint in from which the point is coming", - point="Coordinates of the centroid of the Gaussian fit to the spot image pixel data", - ), -) -@dataclasses.dataclass(frozen=True, kw_only=True) -class PointRecord(LocatableXY, LocatableZ): # noqa: D101 - trace_id: TraceId - timepoint: Timepoint - point: ImagePoint3D - - def __post_init__(self) -> None: - bads: dict[str, object] = {} - if not isinstance(self.trace_id, TraceId): - bads["trace ID"] = self.trace_id # type: ignore[unreachable] - if not isinstance(self.timepoint, Timepoint): - bads["timepoint"] = self.timepoint # type: ignore[unreachable] - if not isinstance(self.point, ImagePoint3D): - bads["point"] = self.point # type: ignore[unreachable] - if bads: - messages = "; ".join(f"Bad type ({type(v).__name__}) for {k}" for k, v in bads.items()) - raise TypeError(f"Cannot create point record: {messages}") - - @doc(summary="Flatten") - def flatten(self) -> FlatPointRecord: - """Create a simple list of components, as a row of layer data.""" - return [ - self.trace_id.get, - self.timepoint.get, - self.get_z_coordinate(), - self.get_y_coordinate(), - self.get_x_coordinate(), - ] - - def 
get_x_coordinate(self) -> float: # noqa: D102 - return self.point.x - - def get_y_coordinate(self) -> float: # noqa: D102 - return self.point.y - - def get_z_coordinate(self) -> ZCoordinate: # noqa: D102 - return self.point.z - - @doc(summary="Round point position to nearest z-slice") - def with_truncated_z(self) -> "PointRecord": # noqa: D102 - new_z: int = floor(self.get_z_coordinate()) - result: PointRecord = self.with_new_z(new_z) - return result - - @doc( - summary="Replace this instance's point with a copy with updated z.", - parameters=dict(z="New z-coordinate value"), - ) - def with_new_z(self, z: int) -> "PointRecord": # noqa: D102 - pt = ImagePoint3D(x=self.point.x, y=self.point.y, z=z) - return dataclasses.replace(self, point=pt) - - -@doc( - summary="Create ancillary points from main point", - parameters=dict( - r="The record to expand along z-axis", - z_max="The maximum z-coordinate", - ), - returns=""" - List of layer data rows to represent the original point along - entire length of z-axis, paired with flag for each row - indicating whether it's true center or not - """, -) -def expand_along_z( # noqa: D103 - r: PointRecord, *, z_max: Union[float, np.float64] -) -> tuple[list[PointRecord], list[bool]]: - if not isinstance(z_max, int | float | np.float64): - raise TypeError(f"Bad type for z_max: {type(z_max).__name__}") - - r = r.with_truncated_z() - z_center = int(r.get_z_coordinate()) - z_max = int(floor(z_max)) - if not isinstance(z_center, int) or not isinstance(z_max, int): - raise TypeError( - f"Z center and Z max must be int; got {type(z_center).__name__} and" - f" {type(z_max).__name__}" - ) - - # Check that max z and center z make sense together. - if z_max < z_center: - raise ValueError(f"Max z must be at least as great as central z ({z_center})") - - # Build the records and flags of where the center in z really is. 
- predecessors = [(r.with_new_z(i), False) for i in range(z_center)] - successors = [(r.with_new_z(i), False) for i in range(z_center + 1, z_max + 1)] - points, params = zip(*[*predecessors, (r, True), *successors], strict=False) - - # Each record should give rise to a total of 1 + z_max records, since numbering from 0. - if len(points) != 1 + z_max: - raise RuntimeError( - f"Number of points generated from single spot center isn't as expected! Point={r}, z_max={z_max}, len(points)={len(points)}" - ) - return points, params # type: ignore[return-value] - - -class InputFileColumn(Enum): - """Indices of the different columns to parse as particular fields""" - - TRACE = 0 - TIMEPOINT = 1 - Z = 2 - Y = 3 - X = 4 - QC = 5 - - @property - def get(self) -> int: - """Alias for the value of this enum member""" - return self.value From fafb8561067dd703f47b7e0f644364d09080b524 Mon Sep 17 00:00:00 2001 From: Vince Reuter Date: Mon, 27 May 2024 08:58:26 +0200 Subject: [PATCH 02/12] better organisation of parts and separation of concerns, for reusability; better filename --- looptrace_loci_vis/_const.py | 5 ++ .../_parse_new_style_with_header.py | 34 ++++++++++++ ....py => _parse_old_style_without_header.py} | 49 ++++------------- looptrace_loci_vis/_types.py | 1 + looptrace_loci_vis/reader.py | 55 +++++++++++++++---- 5 files changed, 96 insertions(+), 48 deletions(-) create mode 100644 looptrace_loci_vis/_const.py create mode 100644 looptrace_loci_vis/_parse_new_style_with_header.py rename looptrace_loci_vis/{_parse_old_style_no_header.py => _parse_old_style_without_header.py} (65%) diff --git a/looptrace_loci_vis/_const.py b/looptrace_loci_vis/_const.py new file mode 100644 index 0000000..9b4dddf --- /dev/null +++ b/looptrace_loci_vis/_const.py @@ -0,0 +1,5 @@ +"""Plugin-wide constants""" + +# See: https://davidmathlogic.com/colorblind/ +DEEP_SKY_BLUE = "#0C7BDC" +GOLDENROD = "#FFC20A" diff --git a/looptrace_loci_vis/_parse_new_style_with_header.py 
b/looptrace_loci_vis/_parse_new_style_with_header.py new file mode 100644 index 0000000..e739b14 --- /dev/null +++ b/looptrace_loci_vis/_parse_new_style_with_header.py @@ -0,0 +1,34 @@ +"""Definitions related to parsing table-like file with header""" + +import logging +from typing import Protocol, runtime_checkable + +import pandas as pd + +from gertils.geometry import ImagePoint3D +from gertils.types import TimepointFrom0 as Timepoint +from gertils.types import TraceIdFrom0 as TraceId + +from .point_record import PointRecord +from ._types import LayerParams, PathLike + + +@runtime_checkable +class MappingLike(Protocol): + def __getitem__(k: str) -> object: ... + + +def parse_passed(points_file: PathLike) -> tuple[list[PointRecord], list[bool], LayerParams]: + logging.debug("Reading as QC-pass: %s", points_file) + points_table: pd.DataFrame = pd.read_csv(points_file) + return [parse_simple_record(row) for _, row in points_table.iterrows()] + + +def parse_simple_record(r: MappingLike) -> PointRecord: + trace = TraceId(int(r["traceId"])) + timepoint = Timepoint(int(r["timeIndex"])) + z = float(r["z"]) + y = float(r["y"]) + x = float(r["x"]) + point = ImagePoint3D(z=z, y=y, x=x) + return PointRecord(trace_id=trace, timepoint=timepoint, point=point) diff --git a/looptrace_loci_vis/_parse_old_style_no_header.py b/looptrace_loci_vis/_parse_old_style_without_header.py similarity index 65% rename from looptrace_loci_vis/_parse_old_style_no_header.py rename to looptrace_loci_vis/_parse_old_style_without_header.py index f05b139..29693fc 100644 --- a/looptrace_loci_vis/_parse_old_style_no_header.py +++ b/looptrace_loci_vis/_parse_old_style_without_header.py @@ -1,13 +1,16 @@ +"""Definitions related to parsing simple table-like file with no header""" + +from enum import Enum +import logging + from numpydoc_decorator import doc from gertils.geometry import ImagePoint3D from gertils.types import TimepointFrom0 as Timepoint from gertils.types import TraceIdFrom0 as TraceId -from 
.point_record import PointRecord, expand_along_z -from ._types import LayerParams, QCFailReasons - -CsvRow = list[str] +from .point_record import PointRecord +from ._types import CsvRow, QCFailReasons @doc( @@ -19,19 +22,8 @@ """, notes="https://napari.org/stable/plugins/guides.html#layer-data-tuples", ) -def parse_passed( # noqa: D103 - rows: list[CsvRow], -) -> tuple[list["PointRecord"], list[bool], LayerParams]: - records = [parse_simple_record(r, exp_num_fields=5) for r in rows] - max_z = max(r.get_z_coordinate() for r in records) - points: list["PointRecord"] = [] - center_flags: list[bool] = [] - for rec in records: - new_points, new_flags = expand_along_z(rec, z_max=max_z) - points.extend(new_points) - center_flags.extend(new_flags) - sizes = [1.5 if is_center else 1.0 for is_center in center_flags] - return points, center_flags, {"size": sizes} +def parse_passed_records(rows: list[CsvRow]) -> list[PointRecord]: # noqa: D103 + return [parse_simple_record(r, exp_num_fields=5) for r in rows] @doc( @@ -43,9 +35,7 @@ def parse_passed( # noqa: D103 """, notes="https://napari.org/stable/plugins/guides.html#layer-data-tuples", ) -def parse_failed( # noqa: D103 - rows: list[CsvRow], -) -> tuple[list["PointRecord"], list[bool], LayerParams]: +def parse_failed(rows: list[CsvRow]) -> list[tuple[PointRecord, QCFailReasons]]: # noqa: D103 record_qc_pairs: list[tuple[PointRecord, QCFailReasons]] = [] for row in rows: try: @@ -55,24 +45,7 @@ def parse_failed( # noqa: D103 logging.exception("Bad row: %s", row) raise record_qc_pairs.append((rec, qc)) - max_z = max(r.get_z_coordinate() for r, _ in record_qc_pairs) - points: list["PointRecord"] = [] - center_flags: list[bool] = [] - codes: list[QCFailReasons] = [] - for rec, qc in record_qc_pairs: - new_points, new_flags = expand_along_z(rec, z_max=max_z) - points.extend(new_points) - center_flags.extend(new_flags) - codes.extend([qc] * len(new_points)) - params = { - "size": 0, # Make the point invisible and just use text. 
- "text": { - "string": "{failCodes}", - "color": DEEP_SKY_BLUE, - }, - "properties": {"failCodes": codes}, - } - return points, center_flags, params + return record_qc_pairs @doc( diff --git a/looptrace_loci_vis/_types.py b/looptrace_loci_vis/_types.py index 91376c0..d6213ce 100644 --- a/looptrace_loci_vis/_types.py +++ b/looptrace_loci_vis/_types.py @@ -7,6 +7,7 @@ from gertils.geometry import ZCoordinate from gertils.types import PixelArray +CsvRow = list[str] FlatPointRecord = list[Union[float, ZCoordinate]] LayerParams = dict[str, object] ImageLayer = tuple[PixelArray, LayerParams, Literal["image"]] diff --git a/looptrace_loci_vis/reader.py b/looptrace_loci_vis/reader.py index dce7bc8..efb26c9 100644 --- a/looptrace_loci_vis/reader.py +++ b/looptrace_loci_vis/reader.py @@ -1,9 +1,9 @@ """Reading locus-specific spots and points data from looptrace for visualisation in napari""" +from collections.abc import Callable import csv import logging import os -from collections.abc import Callable from enum import Enum from pathlib import Path from typing import Optional @@ -13,13 +13,10 @@ from gertils.zarr_tools import read_zarr from numpydoc_decorator import doc # type: ignore[import-untyped] -from ._parse_old_style_no_header import parse_failed, parse_passed -from ._types import ImageLayer, PathLike, PathOrPaths, PointsLayer, Reader - - -# See: https://davidmathlogic.com/colorblind/ -DEEP_SKY_BLUE = "#0C7BDC" -GOLDENROD = "#FFC20A" +from ._const import DEEP_SKY_BLUE, GOLDENROD +from ._parse_old_style_without_header import parse_failed_records, parse_passed_records +from .point_record import PointRecord, expand_along_z +from ._types import CsvRow, ImageLayer, LayerParams, PathLike, PathOrPaths, PointsLayer, QCFailReasons, Reader class QCStatus(Enum): @@ -127,14 +124,19 @@ def build_single_file_points_layer(path: PathLike) -> PointsLayer: # Determine how to read and display the points layer to be parsed. 
qc = QCStatus.from_csv_path(path) + read_rows: Callable[[CsvRow], tuple[list[PointRecord], list[bool], LayerParams]] if qc == QCStatus.PASS: logging.debug("Will parse sas QC-pass: %s", path) color = GOLDENROD - read_rows = parse_passed + def read_rows(rows): + records = parse_passed_records(rows) + return records_to_qcpass_layer_data(records) elif qc == QCStatus.FAIL: logging.debug("Will parse as QC-fail: %s", path) color = DEEP_SKY_BLUE - read_rows = parse_failed + def read_rows(rows): + record_qc_pairs = parse_failed_records(rows) + return records_to_qcfail_layer_data(record_qc_pairs) else: do_not_parse(path=path, why="Could not infer QC status", level=logging.ERROR) raise ValueError( @@ -154,3 +156,36 @@ def build_single_file_points_layer(path: PathLike) -> PointsLayer: params = {**static_params, **base_meta, **extra_meta, **shape_meta} return [pt_rec.flatten() for pt_rec in point_records], params, "points" + + +def records_to_qcpass_layer_data(records: list[PointRecord]) -> tuple[list[PointRecord], list[bool], LayerParams]: + max_z = max(r.get_z_coordinate() for r in records) + points: list[PointRecord] = [] + center_flags: list[bool] = [] + for rec in records: + new_points, new_flags = expand_along_z(rec, z_max=max_z) + points.extend(new_points) + center_flags.extend(new_flags) + sizes = [1.5 if is_center else 1.0 for is_center in center_flags] + return points, center_flags, {"size": sizes} + + +def records_to_qcfail_layer_data(record_qc_pairs: list[tuple[PointRecord, QCFailReasons]]) -> tuple[list[PointRecord], list[bool], LayerParams]: + max_z = max(r.get_z_coordinate() for r, _ in record_qc_pairs) + points: list["PointRecord"] = [] + center_flags: list[bool] = [] + codes: list[QCFailReasons] = [] + for rec, qc in record_qc_pairs: + new_points, new_flags = expand_along_z(rec, z_max=max_z) + points.extend(new_points) + center_flags.extend(new_flags) + codes.extend([qc] * len(new_points)) + params = { + "size": 0, # Make the point invisible and just use text. 
+ "text": { + "string": "{failCodes}", + "color": DEEP_SKY_BLUE, + }, + "properties": {"failCodes": codes}, + } + return points, center_flags, params From 14149ab1e46aa4396613affee9db6bf2dba9e866 Mon Sep 17 00:00:00 2001 From: Vince Reuter Date: Mon, 27 May 2024 09:01:21 +0200 Subject: [PATCH 03/12] better function names --- looptrace_loci_vis/_parse_new_style_with_header.py | 2 +- looptrace_loci_vis/_parse_old_style_without_header.py | 4 ++-- looptrace_loci_vis/reader.py | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/looptrace_loci_vis/_parse_new_style_with_header.py b/looptrace_loci_vis/_parse_new_style_with_header.py index e739b14..1a55794 100644 --- a/looptrace_loci_vis/_parse_new_style_with_header.py +++ b/looptrace_loci_vis/_parse_new_style_with_header.py @@ -18,7 +18,7 @@ class MappingLike(Protocol): def __getitem__(k: str) -> object: ... -def parse_passed(points_file: PathLike) -> tuple[list[PointRecord], list[bool], LayerParams]: +def parse_passed_file(points_file: PathLike) -> tuple[list[PointRecord], list[bool], LayerParams]: logging.debug("Reading as QC-pass: %s", points_file) points_table: pd.DataFrame = pd.read_csv(points_file) return [parse_simple_record(row) for _, row in points_table.iterrows()] diff --git a/looptrace_loci_vis/_parse_old_style_without_header.py b/looptrace_loci_vis/_parse_old_style_without_header.py index 29693fc..84b2ad7 100644 --- a/looptrace_loci_vis/_parse_old_style_without_header.py +++ b/looptrace_loci_vis/_parse_old_style_without_header.py @@ -22,7 +22,7 @@ """, notes="https://napari.org/stable/plugins/guides.html#layer-data-tuples", ) -def parse_passed_records(rows: list[CsvRow]) -> list[PointRecord]: # noqa: D103 +def parse_passed_rows(rows: list[CsvRow]) -> list[PointRecord]: # noqa: D103 return [parse_simple_record(r, exp_num_fields=5) for r in rows] @@ -35,7 +35,7 @@ def parse_passed_records(rows: list[CsvRow]) -> list[PointRecord]: # noqa: D103 """, 
notes="https://napari.org/stable/plugins/guides.html#layer-data-tuples", ) -def parse_failed(rows: list[CsvRow]) -> list[tuple[PointRecord, QCFailReasons]]: # noqa: D103 +def parse_failed_rows(rows: list[CsvRow]) -> list[tuple[PointRecord, QCFailReasons]]: # noqa: D103 record_qc_pairs: list[tuple[PointRecord, QCFailReasons]] = [] for row in rows: try: diff --git a/looptrace_loci_vis/reader.py b/looptrace_loci_vis/reader.py index efb26c9..b2a41bd 100644 --- a/looptrace_loci_vis/reader.py +++ b/looptrace_loci_vis/reader.py @@ -14,7 +14,7 @@ from numpydoc_decorator import doc # type: ignore[import-untyped] from ._const import DEEP_SKY_BLUE, GOLDENROD -from ._parse_old_style_without_header import parse_failed_records, parse_passed_records +from ._parse_old_style_without_header import parse_failed_rows, parse_passed_rows from .point_record import PointRecord, expand_along_z from ._types import CsvRow, ImageLayer, LayerParams, PathLike, PathOrPaths, PointsLayer, QCFailReasons, Reader @@ -129,13 +129,13 @@ def build_single_file_points_layer(path: PathLike) -> PointsLayer: logging.debug("Will parse sas QC-pass: %s", path) color = GOLDENROD def read_rows(rows): - records = parse_passed_records(rows) + records = parse_passed_rows(rows) return records_to_qcpass_layer_data(records) elif qc == QCStatus.FAIL: logging.debug("Will parse as QC-fail: %s", path) color = DEEP_SKY_BLUE def read_rows(rows): - record_qc_pairs = parse_failed_records(rows) + record_qc_pairs = parse_failed_rows(rows) return records_to_qcfail_layer_data(record_qc_pairs) else: do_not_parse(path=path, why="Could not infer QC status", level=logging.ERROR) From 9177e5f9cd0bf6657e49720470e04999e947dcd0 Mon Sep 17 00:00:00 2001 From: Vince Reuter Date: Mon, 27 May 2024 16:35:55 +0200 Subject: [PATCH 04/12] better abstractions and implementation sharing for the points parsers --- looptrace_loci_vis/_const.py | 9 +- .../_parse_new_style_with_header.py | 34 ----- .../_parse_old_style_without_header.py | 90 
-------------- looptrace_loci_vis/points_parser.py | 117 ++++++++++++++++++ looptrace_loci_vis/reader.py | 92 ++++++++------ 5 files changed, 175 insertions(+), 167 deletions(-) delete mode 100644 looptrace_loci_vis/_parse_new_style_with_header.py delete mode 100644 looptrace_loci_vis/_parse_old_style_without_header.py create mode 100644 looptrace_loci_vis/points_parser.py diff --git a/looptrace_loci_vis/_const.py b/looptrace_loci_vis/_const.py index 9b4dddf..4fa82c1 100644 --- a/looptrace_loci_vis/_const.py +++ b/looptrace_loci_vis/_const.py @@ -1,5 +1,8 @@ """Plugin-wide constants""" -# See: https://davidmathlogic.com/colorblind/ -DEEP_SKY_BLUE = "#0C7BDC" -GOLDENROD = "#FFC20A" +from enum import Enum + +class PointColor(Enum): + # See: https://davidmathlogic.com/colorblind/ + DEEP_SKY_BLUE = "#0C7BDC" + GOLDENROD = "#FFC20A" diff --git a/looptrace_loci_vis/_parse_new_style_with_header.py b/looptrace_loci_vis/_parse_new_style_with_header.py deleted file mode 100644 index 1a55794..0000000 --- a/looptrace_loci_vis/_parse_new_style_with_header.py +++ /dev/null @@ -1,34 +0,0 @@ -"""Definitions related to parsing table-like file with header""" - -import logging -from typing import Protocol, runtime_checkable - -import pandas as pd - -from gertils.geometry import ImagePoint3D -from gertils.types import TimepointFrom0 as Timepoint -from gertils.types import TraceIdFrom0 as TraceId - -from .point_record import PointRecord -from ._types import LayerParams, PathLike - - -@runtime_checkable -class MappingLike(Protocol): - def __getitem__(k: str) -> object: ... 
- - -def parse_passed_file(points_file: PathLike) -> tuple[list[PointRecord], list[bool], LayerParams]: - logging.debug("Reading as QC-pass: %s", points_file) - points_table: pd.DataFrame = pd.read_csv(points_file) - return [parse_simple_record(row) for _, row in points_table.iterrows()] - - -def parse_simple_record(r: MappingLike) -> PointRecord: - trace = TraceId(int(r["traceId"])) - timepoint = Timepoint(int(r["timeIndex"])) - z = float(r["z"]) - y = float(r["y"]) - x = float(r["x"]) - point = ImagePoint3D(z=z, y=y, x=x) - return PointRecord(trace_id=trace, timepoint=timepoint, point=point) diff --git a/looptrace_loci_vis/_parse_old_style_without_header.py b/looptrace_loci_vis/_parse_old_style_without_header.py deleted file mode 100644 index 84b2ad7..0000000 --- a/looptrace_loci_vis/_parse_old_style_without_header.py +++ /dev/null @@ -1,90 +0,0 @@ -"""Definitions related to parsing simple table-like file with no header""" - -from enum import Enum -import logging - -from numpydoc_decorator import doc - -from gertils.geometry import ImagePoint3D -from gertils.types import TimepointFrom0 as Timepoint -from gertils.types import TraceIdFrom0 as TraceId - -from .point_record import PointRecord -from ._types import CsvRow, QCFailReasons - - -@doc( - summary="Parse records from points which passed QC.", - parameters=dict(rows="Records to parse"), - returns=""" - A pair in which the first element is the array-like of points coordinates, - and the second element is the mapping from attribute name to list of values (1 per point). 
- """, - notes="https://napari.org/stable/plugins/guides.html#layer-data-tuples", -) -def parse_passed_rows(rows: list[CsvRow]) -> list[PointRecord]: # noqa: D103 - return [parse_simple_record(r, exp_num_fields=5) for r in rows] - - -@doc( - summary="Parse records from points which failed QC.", - parameters=dict(rows="Records to parse"), - returns=""" - A pair in which the first element is the array-like of points coordinates, - and the second element is the mapping from attribute name to list of values (1 per point). - """, - notes="https://napari.org/stable/plugins/guides.html#layer-data-tuples", -) -def parse_failed_rows(rows: list[CsvRow]) -> list[tuple[PointRecord, QCFailReasons]]: # noqa: D103 - record_qc_pairs: list[tuple[PointRecord, QCFailReasons]] = [] - for row in rows: - try: - qc = row[InputFileColumn.QC.get] - rec = parse_simple_record(row, exp_num_fields=6) - except IndexError: - logging.exception("Bad row: %s", row) - raise - record_qc_pairs.append((rec, qc)) - return record_qc_pairs - - -@doc( - summary="Parse single-point from a single record (e.g., row from a CSV file).", - parameters=dict( - r="Record (e.g. CSV row) to parse", - exp_num_fields=("The expected number of data fields (e.g., columns) in the record"), - ), - returns=""" - A pair of values in which the first element represents a locus spot's trace ID and timepoint, - and the second element represents the (z, y, x) coordinates of the centroid of the spot fit. 
- """, -) -def parse_simple_record(r: CsvRow, *, exp_num_fields: int) -> "PointRecord": - """Parse a single line from an input CSV file.""" - if not isinstance(r, list): - raise TypeError(f"Record to parse must be list, not {type(r).__name__}") - if len(r) != exp_num_fields: - raise ValueError(f"Expected record of length {exp_num_fields} but got {len(r)}: {r}") - trace = TraceId(int(r[InputFileColumn.TRACE.get])) - timepoint = Timepoint(int(r[InputFileColumn.TIMEPOINT.get])) - z = float(r[InputFileColumn.Z.get]) - y = float(r[InputFileColumn.Y.get]) - x = float(r[InputFileColumn.X.get]) - point = ImagePoint3D(z=z, y=y, x=x) - return PointRecord(trace_id=trace, timepoint=timepoint, point=point) - - -class InputFileColumn(Enum): - """Indices of the different columns to parse as particular fields""" - - TRACE = 0 - TIMEPOINT = 1 - Z = 2 - Y = 3 - X = 4 - QC = 5 - - @property - def get(self) -> int: - """Alias for the value of this enum member""" - return self.value diff --git a/looptrace_loci_vis/points_parser.py b/looptrace_loci_vis/points_parser.py new file mode 100644 index 0000000..2407ec6 --- /dev/null +++ b/looptrace_loci_vis/points_parser.py @@ -0,0 +1,117 @@ +"""Abstractions related to points parsing""" + +from collections.abc import Iterable, Sized +import csv +from enum import Enum +from typing import Generic, Protocol, TypeVar + +import pandas as pd + +from gertils.geometry import ImagePoint3D +from gertils.types import TimepointFrom0 as Timepoint +from gertils.types import TraceIdFrom0 as TraceId + +from .point_record import PointRecord +from ._types import CsvRow, PathLike, QCFailReasons + +Input = TypeVar("Input", contravariant=True) +I1 = TypeVar("I1") +I2 = TypeVar("I2", bound=Sized) + + +class MappingLike(Protocol): + def __getitem__(key: str) -> object: ... + + +class PointsParser(Protocol, Generic[Input]): + + def parse_all_qcpass(self, data: Input) -> list[PointRecord]: ... 
+ + def parse_all_qcfail(self, data: Input) -> list[tuple[PointRecord, QCFailReasons]]: ... + + +class IterativePointsParser(Generic[I1, I2], PointsParser[I1]): + + def _gen_records(self, data: I1) -> Iterable[I2]: ... + + def _parse_single_qcpass_record(self, record: I2) -> PointRecord: ... + + def _parse_single_qcfail_record(self, record: I2) -> tuple[PointRecord, QCFailReasons]: ... + + def parse_all_qcpass(self, data: I1) -> list[PointRecord]: + return [self._parse_single_qcpass_record(r) for r in self._gen_records(data)] + + def parse_all_qcfail(self, data: I1) -> list[tuple[PointRecord, QCFailReasons]]: + return [self._parse_single_qcfail_record(r) for r in self._gen_records(data)] + + +class HeadedTraceTimePointParser(IterativePointsParser[PathLike, MappingLike]): + + TIME_INDEX_COLUMN = "timeIndex" + + def _gen_records(self, data: PathLike) -> pd.DataFrame: + return pd.read_csv(data) + + def _parse_single_qcpass_record(self, record: MappingLike) -> PointRecord: + trace = TraceId(int(record["traceId"])) + timepoint = Timepoint(int(record[self.TIME_INDEX_COLUMN])) + z = float(record["z"]) + y = float(record["y"]) + x = float(record["x"]) + point = ImagePoint3D(z=z, y=y, x=x) + return PointRecord(trace_id=trace, timepoint=timepoint, point=point) + + def _parse_single_qcfail_record(self, record: MappingLike) -> tuple[PointRecord, QCFailReasons]: + """A fail record parses the same as a pass one, just with one additional field for QC fail reasons.""" + pt_rec = self._parse_single_qcpass_record(record) + fail_code = record["failCode"] + return pt_rec, fail_code + + +class HeadlessTraceTimePointParser(IterativePointsParser[PathLike, CsvRow]): + """Parser for input file with no header, and field for trace ID and timepoint in addition to coordinates""" + + class InputFileColumn(Enum): + """Indices of the different columns to parse as particular fields""" + TRACE = 0 + TIMEPOINT = 1 + Z = 2 + Y = 3 + X = 4 + QC = 5 + + @property + def get(self) -> int: + """Alias for 
the value of this enum member""" + return self.value + + def __init__(self) -> None: + super().__init__() + self._number_of_columns = sum(1 for _ in self.InputFileColumn) + + def _parse_single_record(self, r: CsvRow, *, exp_len: int) -> PointRecord: + if not isinstance(r, list): + raise TypeError(f"Record to parse must be list, not {type(r).__name__}") + if len(r) != exp_len: + raise ValueError(f"Expected record of length {exp_len} but got {len(r)}: {r}") + trace = TraceId(int(r[self.InputFileColumn.TRACE.get])) + timepoint = Timepoint(int(r[self.InputFileColumn.TIMEPOINT.get])) + z = float(r[self.InputFileColumn.Z.get]) + y = float(r[self.InputFileColumn.Y.get]) + x = float(r[self.InputFileColumn.X.get]) + point = ImagePoint3D(z=z, y=y, x=x) + return PointRecord(trace_id=trace, timepoint=timepoint, point=point) + + def _gen_records(self, data: PathLike) -> Iterable[CsvRow]: + with open(data, newline="") as fh: # noqa: PTH123 + rows = list(csv.reader(fh)) + return rows + + def _parse_single_qcpass_record(self, record: CsvRow) -> PointRecord: + return self._parse_single_record(record, exp_len=self._number_of_columns - 1) + + def _parse_single_qcfail_record(self, record: CsvRow) -> tuple[PointRecord, QCFailReasons]: + pt_rec = self._parse_single_record(record, exp_len=self._number_of_columns) + fail_code = record[self.InputFileColumn.QC.get] + return pt_rec, fail_code + diff --git a/looptrace_loci_vis/reader.py b/looptrace_loci_vis/reader.py index b2a41bd..f2f81a4 100644 --- a/looptrace_loci_vis/reader.py +++ b/looptrace_loci_vis/reader.py @@ -1,7 +1,6 @@ """Reading locus-specific spots and points data from looptrace for visualisation in napari""" from collections.abc import Callable -import csv import logging import os from enum import Enum @@ -13,10 +12,10 @@ from gertils.zarr_tools import read_zarr from numpydoc_decorator import doc # type: ignore[import-untyped] -from ._const import DEEP_SKY_BLUE, GOLDENROD -from ._parse_old_style_without_header import 
parse_failed_rows, parse_passed_rows +from .points_parser import HeadedTraceTimePointParser, HeadlessTraceTimePointParser, PointsParser from .point_record import PointRecord, expand_along_z -from ._types import CsvRow, ImageLayer, LayerParams, PathLike, PathOrPaths, PointsLayer, QCFailReasons, Reader +from ._const import PointColor +from ._types import ImageLayer, LayerParams, PathLike, PathOrPaths, PointsLayer, QCFailReasons, Reader class QCStatus(Enum): @@ -43,16 +42,6 @@ def filename_extension(self) -> str: # noqa: D102 return f".qc{self.value}.csv" -def do_not_parse(*, path: PathLike, why: str, level: int = logging.DEBUG) -> None: - """Log a message about why a path can't be parsed.""" - logging.log( - level, - "%s, cannot be read as looptrace locus-specific points: %s", - why, - path, - ) - - @doc( summary="Read and display locus-specific spots from looptrace.", parameters=dict(path="Path from which to parse layers"), @@ -63,19 +52,19 @@ def get_reader(path: PathOrPaths) -> Optional[Reader]: # noqa: D103 if not isinstance(path, str | Path): return None if not os.path.isdir(path): # noqa: PTH112 - do_not_parse(path=path, why="Not a folder/directory") + _do_not_parse(path=path, why="Not a folder/directory") return None path_by_fov: dict[FieldOfViewFrom1, list[Path]] = find_multiple_paths_by_fov( path, extensions=(".zarr", *(qc.filename_extension for qc in QCStatus)) ) if len(path_by_fov) != 1: - do_not_parse( + _do_not_parse( path=path, why=f"Not exactly 1 FOV found, but rather {len(path_by_fov)}, found" ) return None fov, files = next(iter(path_by_fov.items())) if len(files) != 3: # noqa: PLR2004 - do_not_parse( + _do_not_parse( path=path, why=f"Not exactly 3 files, but rather {len(files)}, found for {fov}" ) return None @@ -88,7 +77,7 @@ def get_reader(path: PathOrPaths) -> Optional[Reader]: # noqa: D103 fail_path = path_by_status.pop(QCStatus.FAIL) pass_path = path_by_status.pop(QCStatus.PASS) except KeyError: - do_not_parse(path=path, why="Could not find 1 
each of QC status (pass/fail)") + _do_not_parse(path=path, why="Could not find 1 each of QC status (pass/fail)") return None if len(path_by_status) != 0: raise RuntimeError(f"Extra QC status/path pairs! {path_by_status}") @@ -102,7 +91,7 @@ def get_reader(path: PathOrPaths) -> Optional[Reader]: # noqa: D103 potential_zarr.suffix != ".zarr" or get_fov_sort_key(potential_zarr, extension=".zarr") != fov ): - do_not_parse(path=path, why=f"Could not find ZARR for FOV {fov}") + _do_not_parse(path=path, why=f"Could not find ZARR for FOV {fov}") return None def parse(_): # type: ignore[no-untyped-def] # noqa: ANN202 ANN001 @@ -122,38 +111,45 @@ def build_single_file_points_layer(path: PathLike) -> PointsLayer: "n_dimensional": False, } - # Determine how to read and display the points layer to be parsed. qc = QCStatus.from_csv_path(path) - read_rows: Callable[[CsvRow], tuple[list[PointRecord], list[bool], LayerParams]] + + # Determine how to read and display the points layer to be parsed. + # First, determine the parsing strategy based on file header. + parser: PointsParser[PathLike] + read_file: Callable[[PathLike], list[PointRecord]] + process_records: Callable[[list[PointRecord]], tuple[list[PointRecord], list[bool], LayerParams]] + if _has_header(path): + logging.debug("Will parse has having header: %s", path) + parser = HeadedTraceTimePointParser + else: + logging.debug("Will parse as headless: %s", path) + parser = HeadlessTraceTimePointParser + # Then, determine the functions to used based on inferred QC status. 
if qc == QCStatus.PASS: logging.debug("Will parse sas QC-pass: %s", path) - color = GOLDENROD - def read_rows(rows): - records = parse_passed_rows(rows) - return records_to_qcpass_layer_data(records) + color = PointColor.GOLDENROD + read_file = parser.parse_all_qcpass + process_records = records_to_qcpass_layer_data elif qc == QCStatus.FAIL: logging.debug("Will parse as QC-fail: %s", path) - color = DEEP_SKY_BLUE - def read_rows(rows): - record_qc_pairs = parse_failed_rows(rows) - return records_to_qcfail_layer_data(record_qc_pairs) + color = PointColor.DEEP_SKY_BLUE + read_file = parser.parse_all_qcfail + process_records = records_to_qcfail_layer_data else: - do_not_parse(path=path, why="Could not infer QC status", level=logging.ERROR) + _do_not_parse(path=path, why="Could not infer QC status", level=logging.ERROR) raise ValueError( f"Despite undertaking parse, file from which QC status could not be parsed was encountered: {path}" ) + + # Use the information gleaned from filename and from file header to determine point color and to read data. 
+ color_meta = {"edge_color": color.value, "face_color": color.value} + base_point_records = read_file(path) + point_records, center_flags, extra_meta = process_records(base_point_records) - base_meta = {"edge_color": color, "face_color": color} - - with open(path, newline="") as fh: # noqa: PTH123 - rows = list(csv.reader(fh)) - point_records, center_flags, extra_meta = read_rows(rows) if not point_records: logging.warning("No data rows parsed!") - shape_meta = { - "symbol": ["*" if is_center else "o" for is_center in center_flags], - } - params = {**static_params, **base_meta, **extra_meta, **shape_meta} + shape_meta = {"symbol": ["*" if is_center else "o" for is_center in center_flags]} + params = {**static_params, **color_meta, **extra_meta, **shape_meta} return [pt_rec.flatten() for pt_rec in point_records], params, "points" @@ -184,8 +180,24 @@ def records_to_qcfail_layer_data(record_qc_pairs: list[tuple[PointRecord, QCFail "size": 0, # Make the point invisible and just use text. "text": { "string": "{failCodes}", - "color": DEEP_SKY_BLUE, + "color": PointColor.DEEP_SKY_BLUE.value, }, "properties": {"failCodes": codes}, } return points, center_flags, params + + +def _do_not_parse(*, path: PathLike, why: str, level: int = logging.DEBUG) -> None: + """Log a message about why a path can't be parsed.""" + logging.log( + level, + "%s, cannot be read as looptrace locus-specific points: %s", + why, + path, + ) + + +def _has_header(path: PathLike) -> bool: + with open(path, "r") as fh: # noqa: PTH123 + header = fh.readline() + return HeadedTraceTimePointParser.TIME_INDEX_COLUMN in header From 77ee95b6fa17b508f00b3b212967addecacd0388 Mon Sep 17 00:00:00 2001 From: Vince Reuter Date: Mon, 27 May 2024 17:03:06 +0200 Subject: [PATCH 05/12] first-pass at full support for looptrace v0.5.0 --- looptrace_loci_vis/points_parser.py | 73 +++++++++++++++++------------ tests/test_locus_points_smoke.py | 18 +++---- 2 files changed, 52 insertions(+), 39 deletions(-) diff --git 
a/looptrace_loci_vis/points_parser.py b/looptrace_loci_vis/points_parser.py index 2407ec6..4538729 100644 --- a/looptrace_loci_vis/points_parser.py +++ b/looptrace_loci_vis/points_parser.py @@ -25,45 +25,55 @@ def __getitem__(key: str) -> object: ... class PointsParser(Protocol, Generic[Input]): - def parse_all_qcpass(self, data: Input) -> list[PointRecord]: ... + @classmethod + def parse_all_qcpass(cls, data: Input) -> list[PointRecord]: ... - def parse_all_qcfail(self, data: Input) -> list[tuple[PointRecord, QCFailReasons]]: ... + @classmethod + def parse_all_qcfail(cls, data: Input) -> list[tuple[PointRecord, QCFailReasons]]: ... class IterativePointsParser(Generic[I1, I2], PointsParser[I1]): - def _gen_records(self, data: I1) -> Iterable[I2]: ... + @classmethod + def _gen_records(cls, data: I1) -> Iterable[I2]: ... - def _parse_single_qcpass_record(self, record: I2) -> PointRecord: ... + @classmethod + def _parse_single_qcpass_record(cls, record: I2) -> PointRecord: ... - def _parse_single_qcfail_record(self, record: I2) -> tuple[PointRecord, QCFailReasons]: ... + @classmethod + def _parse_single_qcfail_record(cls, record: I2) -> tuple[PointRecord, QCFailReasons]: ... 
- def parse_all_qcpass(self, data: I1) -> list[PointRecord]: - return [self._parse_single_qcpass_record(r) for r in self._gen_records(data)] + @classmethod + def parse_all_qcpass(cls, data: I1) -> list[PointRecord]: + return [cls._parse_single_qcpass_record(r) for r in cls._gen_records(data)] - def parse_all_qcfail(self, data: I1) -> list[tuple[PointRecord, QCFailReasons]]: - return [self._parse_single_qcfail_record(r) for r in self._gen_records(data)] + @classmethod + def parse_all_qcfail(cls, data: I1) -> list[tuple[PointRecord, QCFailReasons]]: + return [cls._parse_single_qcfail_record(r) for r in cls._gen_records(data)] class HeadedTraceTimePointParser(IterativePointsParser[PathLike, MappingLike]): TIME_INDEX_COLUMN = "timeIndex" - def _gen_records(self, data: PathLike) -> pd.DataFrame: + @classmethod + def _gen_records(cls, data: PathLike) -> pd.DataFrame: return pd.read_csv(data) - def _parse_single_qcpass_record(self, record: MappingLike) -> PointRecord: + @classmethod + def _parse_single_qcpass_record(cls, record: MappingLike) -> PointRecord: trace = TraceId(int(record["traceId"])) - timepoint = Timepoint(int(record[self.TIME_INDEX_COLUMN])) + timepoint = Timepoint(int(record[cls.TIME_INDEX_COLUMN])) z = float(record["z"]) y = float(record["y"]) x = float(record["x"]) point = ImagePoint3D(z=z, y=y, x=x) return PointRecord(trace_id=trace, timepoint=timepoint, point=point) - def _parse_single_qcfail_record(self, record: MappingLike) -> tuple[PointRecord, QCFailReasons]: + @classmethod + def _parse_single_qcfail_record(cls, record: MappingLike) -> tuple[PointRecord, QCFailReasons]: """A fail record parses the same as a pass one, just with one additional field for QC fail reasons.""" - pt_rec = self._parse_single_qcpass_record(record) + pt_rec = cls._parse_single_qcpass_record(record) fail_code = record["failCode"] return pt_rec, fail_code @@ -85,33 +95,34 @@ def get(self) -> int: """Alias for the value of this enum member""" return self.value - def 
__init__(self) -> None: - super().__init__() - self._number_of_columns = sum(1 for _ in self.InputFileColumn) + _number_of_columns = sum(1 for _ in InputFileColumn) - def _parse_single_record(self, r: CsvRow, *, exp_len: int) -> PointRecord: + @classmethod + def _parse_single_record(cls, r: CsvRow, *, exp_len: int) -> PointRecord: if not isinstance(r, list): raise TypeError(f"Record to parse must be list, not {type(r).__name__}") if len(r) != exp_len: raise ValueError(f"Expected record of length {exp_len} but got {len(r)}: {r}") - trace = TraceId(int(r[self.InputFileColumn.TRACE.get])) - timepoint = Timepoint(int(r[self.InputFileColumn.TIMEPOINT.get])) - z = float(r[self.InputFileColumn.Z.get]) - y = float(r[self.InputFileColumn.Y.get]) - x = float(r[self.InputFileColumn.X.get]) + trace = TraceId(int(r[cls.InputFileColumn.TRACE.get])) + timepoint = Timepoint(int(r[cls.InputFileColumn.TIMEPOINT.get])) + z = float(r[cls.InputFileColumn.Z.get]) + y = float(r[cls.InputFileColumn.Y.get]) + x = float(r[cls.InputFileColumn.X.get]) point = ImagePoint3D(z=z, y=y, x=x) return PointRecord(trace_id=trace, timepoint=timepoint, point=point) - def _gen_records(self, data: PathLike) -> Iterable[CsvRow]: + @classmethod + def _gen_records(cls, data: PathLike) -> Iterable[CsvRow]: with open(data, newline="") as fh: # noqa: PTH123 rows = list(csv.reader(fh)) return rows - def _parse_single_qcpass_record(self, record: CsvRow) -> PointRecord: - return self._parse_single_record(record, exp_len=self._number_of_columns - 1) - - def _parse_single_qcfail_record(self, record: CsvRow) -> tuple[PointRecord, QCFailReasons]: - pt_rec = self._parse_single_record(record, exp_len=self._number_of_columns) - fail_code = record[self.InputFileColumn.QC.get] + @classmethod + def _parse_single_qcpass_record(cls, record: CsvRow) -> PointRecord: + return cls._parse_single_record(record, exp_len=cls._number_of_columns - 1) + + @classmethod + def _parse_single_qcfail_record(cls, record: CsvRow) -> 
tuple[PointRecord, QCFailReasons]: + pt_rec = cls._parse_single_record(record, exp_len=cls._number_of_columns) + fail_code = record[cls.InputFileColumn.QC.get] return pt_rec, fail_code - diff --git a/tests/test_locus_points_smoke.py b/tests/test_locus_points_smoke.py index 851afd4..d0e1e66 100644 --- a/tests/test_locus_points_smoke.py +++ b/tests/test_locus_points_smoke.py @@ -1,10 +1,8 @@ """Smoketests for locus-specific points""" from math import ceil - -import pytest - -from looptrace_loci_vis.reader import parse_failed +from looptrace_loci_vis.points_parser import HeadlessTraceTimePointParser +from looptrace_loci_vis.reader import records_to_qcfail_layer_data FAIL_LINES_SAMPLE = """0,13,5.880338307654485,12.20211975317036,10.728294496728491,S 0,17,10.594366532607864,10.95875680073854,20.711938561802768,R;S;xy;z @@ -19,9 +17,12 @@ """ -@pytest.mark.parametrize("keep_line_ends", [False, True]) -def test_failed_sample_line_count(keep_line_ends): - lines = FAIL_LINES_SAMPLE.splitlines(keepends=keep_line_ends) +def test_failed_sample_line_count(tmp_path): + lines = FAIL_LINES_SAMPLE.splitlines(keepends=True) + data_file = tmp_path / "spots.qcfail.csv" + with data_file.open(mode="w") as fh: + for data_line in lines: + fh.write(data_line) exp_line_count = 10 assert len(lines) == exp_line_count, f"Expected {exp_line_count} lines but got {len(lines)}" z_field = 2 @@ -29,7 +30,8 @@ def test_failed_sample_line_count(keep_line_ends): exp_z_ceil = 11 assert obs_z_ceil == exp_z_ceil, f"Expected max Z of {exp_z_ceil} but got {obs_z_ceil}" exp_record_count = exp_z_ceil * exp_line_count - records, _, _ = parse_failed([l.split(",") for l in lines]) # noqa: E741 + init_recs = HeadlessTraceTimePointParser.parse_all_qcfail(data_file) # noqa: E741 + records, _, _ = records_to_qcfail_layer_data(init_recs) assert ( len(records) == exp_record_count ), f"Expected {exp_record_count} records but got {len(records)}" From e5cd4ea86a42bec443ac386d875fa3b1603af0bf Mon Sep 17 00:00:00 2001 
From: Vince Reuter Date: Mon, 27 May 2024 17:14:29 +0200 Subject: [PATCH 06/12] fix record generation for header file case to be DataFrame rows, not whole table --- looptrace_loci_vis/points_parser.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/looptrace_loci_vis/points_parser.py b/looptrace_loci_vis/points_parser.py index 4538729..3396e71 100644 --- a/looptrace_loci_vis/points_parser.py +++ b/looptrace_loci_vis/points_parser.py @@ -57,8 +57,9 @@ class HeadedTraceTimePointParser(IterativePointsParser[PathLike, MappingLike]): TIME_INDEX_COLUMN = "timeIndex" @classmethod - def _gen_records(cls, data: PathLike) -> pd.DataFrame: - return pd.read_csv(data) + def _gen_records(cls, data: PathLike) -> Iterable[MappingLike]: + for _, row in pd.read_csv(data).iterrows(): + yield row @classmethod def _parse_single_qcpass_record(cls, record: MappingLike) -> PointRecord: From a63fda2508bc254bc36e3810dabfe64ab2d8d612 Mon Sep 17 00:00:00 2001 From: Vince Reuter Date: Tue, 28 May 2024 12:32:48 +0200 Subject: [PATCH 07/12] adjust to updated looptrace output column name; https://github.com/gerlichlab/looptrace/commit/0646ab6673d97eda74e3101320c6f726b81a8529 --- looptrace_loci_vis/points_parser.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/looptrace_loci_vis/points_parser.py b/looptrace_loci_vis/points_parser.py index 3396e71..3fe51b7 100644 --- a/looptrace_loci_vis/points_parser.py +++ b/looptrace_loci_vis/points_parser.py @@ -63,7 +63,7 @@ def _gen_records(cls, data: PathLike) -> Iterable[MappingLike]: @classmethod def _parse_single_qcpass_record(cls, record: MappingLike) -> PointRecord: - trace = TraceId(int(record["traceId"])) + trace = TraceId(int(record["traceIndex"])) timepoint = Timepoint(int(record[cls.TIME_INDEX_COLUMN])) z = float(record["z"]) y = float(record["y"]) From cfa87c6d13cf0a2df1e905628f962be11235c4ef Mon Sep 17 00:00:00 2001 From: Vince Reuter Date: Thu, 30 May 2024 18:09:01 +0200 Subject: [PATCH 08/12] v0.2.0 
prep --- CHANGELOG.md | 4 ++-- looptrace_loci_vis/__init__.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e386c3c..43dcfda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,10 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased +## [v0.2.0] - 2024-05-30 ### Changed -* This project now uses `pandas` and parses a table-like file (CSV) _with_ header, to support upstream changes in data generation by `looptrace`. +* This project now can use `pandas` and parses a table-like file (CSV) _with_ header, to support upstream changes in data generation by `looptrace`. * Splitting old functionality out into separate modules ## [v0.1.0] - 2024-04-20 diff --git a/looptrace_loci_vis/__init__.py b/looptrace_loci_vis/__init__.py index d995f84..f44e686 100644 --- a/looptrace_loci_vis/__init__.py +++ b/looptrace_loci_vis/__init__.py @@ -1,3 +1,3 @@ """Napari plugin for visualising locus-specific points from looptrace""" -__version__ = "0.2dev" +__version__ = "0.2.0" From 140eb401af4d788666aa9a8349832ca78a3f20c5 Mon Sep 17 00:00:00 2001 From: Vince Reuter Date: Thu, 30 May 2024 18:56:27 +0200 Subject: [PATCH 09/12] pass ruff check --- looptrace_loci_vis/_const.py | 1 + looptrace_loci_vis/point_record.py | 3 +-- looptrace_loci_vis/points_parser.py | 38 +++++++++++++++-------------- looptrace_loci_vis/reader.py | 26 ++++++++++++++------ pyproject.toml | 1 + tests/test_locus_points_smoke.py | 3 ++- 6 files changed, 43 insertions(+), 29 deletions(-) diff --git a/looptrace_loci_vis/_const.py b/looptrace_loci_vis/_const.py index 4fa82c1..1313976 100644 --- a/looptrace_loci_vis/_const.py +++ b/looptrace_loci_vis/_const.py @@ -2,6 +2,7 @@ from enum import Enum + class PointColor(Enum): # See: 
https://davidmathlogic.com/colorblind/ DEEP_SKY_BLUE = "#0C7BDC" diff --git a/looptrace_loci_vis/point_record.py b/looptrace_loci_vis/point_record.py index 0b5c0d0..6c139e3 100644 --- a/looptrace_loci_vis/point_record.py +++ b/looptrace_loci_vis/point_record.py @@ -5,11 +5,10 @@ from typing import Union import numpy as np -from numpydoc_decorator import doc - from gertils.geometry import ImagePoint3D, LocatableXY, LocatableZ, ZCoordinate from gertils.types import TimepointFrom0 as Timepoint from gertils.types import TraceIdFrom0 as TraceId +from numpydoc_decorator import doc from ._types import FlatPointRecord diff --git a/looptrace_loci_vis/points_parser.py b/looptrace_loci_vis/points_parser.py index 3fe51b7..8fd160f 100644 --- a/looptrace_loci_vis/points_parser.py +++ b/looptrace_loci_vis/points_parser.py @@ -1,39 +1,40 @@ """Abstractions related to points parsing""" -from collections.abc import Iterable, Sized import csv +from collections.abc import Iterable, Sized from enum import Enum from typing import Generic, Protocol, TypeVar import pandas as pd - from gertils.geometry import ImagePoint3D from gertils.types import TimepointFrom0 as Timepoint from gertils.types import TraceIdFrom0 as TraceId -from .point_record import PointRecord from ._types import CsvRow, PathLike, QCFailReasons +from .point_record import PointRecord Input = TypeVar("Input", contravariant=True) I1 = TypeVar("I1") I2 = TypeVar("I2", bound=Sized) -class MappingLike(Protocol): - def __getitem__(key: str) -> object: ... +class MappingLike(Protocol): # noqa: D101 + def __getitem__(self, key: str) -> object: ... class PointsParser(Protocol, Generic[Input]): + """Something capable of parsing a QC-pass or -fail CSV file""" @classmethod - def parse_all_qcpass(cls, data: Input) -> list[PointRecord]: ... - + def parse_all_qcpass(cls, data: Input) -> list[PointRecord]: ... # noqa: D102 + @classmethod - def parse_all_qcfail(cls, data: Input) -> list[tuple[PointRecord, QCFailReasons]]: ... 
+ def parse_all_qcfail(cls, data: Input) -> list[tuple[PointRecord, QCFailReasons]]: ... # noqa: D102 class IterativePointsParser(Generic[I1, I2], PointsParser[I1]): - + """Something that yields records, each of type I2 from value of type I1, to parse QC-pass/-fail points""" + @classmethod def _gen_records(cls, data: I1) -> Iterable[I2]: ... @@ -44,15 +45,16 @@ def _parse_single_qcpass_record(cls, record: I2) -> PointRecord: ... def _parse_single_qcfail_record(cls, record: I2) -> tuple[PointRecord, QCFailReasons]: ... @classmethod - def parse_all_qcpass(cls, data: I1) -> list[PointRecord]: + def parse_all_qcpass(cls, data: I1) -> list[PointRecord]: # noqa: D102 return [cls._parse_single_qcpass_record(r) for r in cls._gen_records(data)] - + @classmethod - def parse_all_qcfail(cls, data: I1) -> list[tuple[PointRecord, QCFailReasons]]: + def parse_all_qcfail(cls, data: I1) -> list[tuple[PointRecord, QCFailReasons]]: # noqa: D102 return [cls._parse_single_qcfail_record(r) for r in cls._gen_records(data)] class HeadedTraceTimePointParser(IterativePointsParser[PathLike, MappingLike]): + """Something capable of parsing a headed CSV of QC-pass/-fail points records""" TIME_INDEX_COLUMN = "timeIndex" @@ -60,7 +62,7 @@ class HeadedTraceTimePointParser(IterativePointsParser[PathLike, MappingLike]): def _gen_records(cls, data: PathLike) -> Iterable[MappingLike]: for _, row in pd.read_csv(data).iterrows(): yield row - + @classmethod def _parse_single_qcpass_record(cls, record: MappingLike) -> PointRecord: trace = TraceId(int(record["traceIndex"])) @@ -81,9 +83,10 @@ def _parse_single_qcfail_record(cls, record: MappingLike) -> tuple[PointRecord, class HeadlessTraceTimePointParser(IterativePointsParser[PathLike, CsvRow]): """Parser for input file with no header, and field for trace ID and timepoint in addition to coordinates""" - + class InputFileColumn(Enum): """Indices of the different columns to parse as particular fields""" + TRACE = 0 TIMEPOINT = 1 Z = 2 @@ -115,14 +118,13 @@ 
def _parse_single_record(cls, r: CsvRow, *, exp_len: int) -> PointRecord: @classmethod def _gen_records(cls, data: PathLike) -> Iterable[CsvRow]: with open(data, newline="") as fh: # noqa: PTH123 - rows = list(csv.reader(fh)) - return rows - + return list(csv.reader(fh)) + @classmethod def _parse_single_qcpass_record(cls, record: CsvRow) -> PointRecord: return cls._parse_single_record(record, exp_len=cls._number_of_columns - 1) - @classmethod + @classmethod def _parse_single_qcfail_record(cls, record: CsvRow) -> tuple[PointRecord, QCFailReasons]: pt_rec = cls._parse_single_record(record, exp_len=cls._number_of_columns) fail_code = record[cls.InputFileColumn.QC.get] diff --git a/looptrace_loci_vis/reader.py b/looptrace_loci_vis/reader.py index f2f81a4..e8d11f2 100644 --- a/looptrace_loci_vis/reader.py +++ b/looptrace_loci_vis/reader.py @@ -1,8 +1,8 @@ """Reading locus-specific spots and points data from looptrace for visualisation in napari""" -from collections.abc import Callable import logging import os +from collections.abc import Callable from enum import Enum from pathlib import Path from typing import Optional @@ -12,10 +12,18 @@ from gertils.zarr_tools import read_zarr from numpydoc_decorator import doc # type: ignore[import-untyped] -from .points_parser import HeadedTraceTimePointParser, HeadlessTraceTimePointParser, PointsParser -from .point_record import PointRecord, expand_along_z from ._const import PointColor -from ._types import ImageLayer, LayerParams, PathLike, PathOrPaths, PointsLayer, QCFailReasons, Reader +from ._types import ( + ImageLayer, + LayerParams, + PathLike, + PathOrPaths, + PointsLayer, + QCFailReasons, + Reader, +) +from .point_record import PointRecord, expand_along_z +from .points_parser import HeadedTraceTimePointParser, HeadlessTraceTimePointParser, PointsParser class QCStatus(Enum): @@ -112,7 +120,7 @@ def build_single_file_points_layer(path: PathLike) -> PointsLayer: } qc = QCStatus.from_csv_path(path) - + # Determine how to read 
and display the points layer to be parsed. # First, determine the parsing strategy based on file header. parser: PointsParser[PathLike] @@ -140,7 +148,7 @@ def build_single_file_points_layer(path: PathLike) -> PointsLayer: raise ValueError( f"Despite undertaking parse, file from which QC status could not be parsed was encountered: {path}" ) - + # Use the information gleaned from filename and from file header to determine point color and to read data. color_meta = {"edge_color": color.value, "face_color": color.value} base_point_records = read_file(path) @@ -155,6 +163,7 @@ def build_single_file_points_layer(path: PathLike) -> PointsLayer: def records_to_qcpass_layer_data(records: list[PointRecord]) -> tuple[list[PointRecord], list[bool], LayerParams]: + """Extend the given records partially through a z-stack, designate appropriately as central-plane or not.""" max_z = max(r.get_z_coordinate() for r in records) points: list[PointRecord] = [] center_flags: list[bool] = [] @@ -163,10 +172,11 @@ def records_to_qcpass_layer_data(records: list[PointRecord]) -> tuple[list[Point points.extend(new_points) center_flags.extend(new_flags) sizes = [1.5 if is_center else 1.0 for is_center in center_flags] - return points, center_flags, {"size": sizes} + return points, center_flags, {"size": sizes} def records_to_qcfail_layer_data(record_qc_pairs: list[tuple[PointRecord, QCFailReasons]]) -> tuple[list[PointRecord], list[bool], LayerParams]: + """Extend the given records partially through a z-stack, designate appropriately as central-plane or not; also set fail codes text.""" max_z = max(r.get_z_coordinate() for r, _ in record_qc_pairs) points: list["PointRecord"] = [] center_flags: list[bool] = [] @@ -198,6 +208,6 @@ def _do_not_parse(*, path: PathLike, why: str, level: int = logging.DEBUG) -> No def _has_header(path: PathLike) -> bool: - with open(path, "r") as fh: # noqa: PTH123 + with open(path) as fh: # noqa: PTH123 header = fh.readline() return 
HeadedTraceTimePointParser.TIME_INDEX_COLUMN in header diff --git a/pyproject.toml b/pyproject.toml index 7e2dcb0..c3845e1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -122,6 +122,7 @@ ignore = [ "N818", # Exception should be named with an Error suffix "PLR0913", # Too many arguments in function definition "D401", # First line of docstring should be in imperative mood + "PLC0105", # This suggests suffixing a type variable with variance type (e.g., co/contra) # Imports and type annotations "ANN003", # Missing type annotation for kwargs diff --git a/tests/test_locus_points_smoke.py b/tests/test_locus_points_smoke.py index d0e1e66..f97f017 100644 --- a/tests/test_locus_points_smoke.py +++ b/tests/test_locus_points_smoke.py @@ -1,6 +1,7 @@ """Smoketests for locus-specific points""" from math import ceil + from looptrace_loci_vis.points_parser import HeadlessTraceTimePointParser from looptrace_loci_vis.reader import records_to_qcfail_layer_data @@ -30,7 +31,7 @@ def test_failed_sample_line_count(tmp_path): exp_z_ceil = 11 assert obs_z_ceil == exp_z_ceil, f"Expected max Z of {exp_z_ceil} but got {obs_z_ceil}" exp_record_count = exp_z_ceil * exp_line_count - init_recs = HeadlessTraceTimePointParser.parse_all_qcfail(data_file) # noqa: E741 + init_recs = HeadlessTraceTimePointParser.parse_all_qcfail(data_file) records, _, _ = records_to_qcfail_layer_data(init_recs) assert ( len(records) == exp_record_count From 83bcd4a8b4a897966e02062857d9707bcad4f9cd Mon Sep 17 00:00:00 2001 From: Vince Reuter Date: Thu, 30 May 2024 18:57:13 +0200 Subject: [PATCH 10/12] apply ruff formatting --- looptrace_loci_vis/points_parser.py | 12 ++++++------ looptrace_loci_vis/reader.py | 12 +++++++++--- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/looptrace_loci_vis/points_parser.py b/looptrace_loci_vis/points_parser.py index 8fd160f..9e8bfaf 100644 --- a/looptrace_loci_vis/points_parser.py +++ b/looptrace_loci_vis/points_parser.py @@ -18,7 +18,7 @@ I2 = TypeVar("I2", 
bound=Sized) -class MappingLike(Protocol): # noqa: D101 +class MappingLike(Protocol): # noqa: D101 def __getitem__(self, key: str) -> object: ... @@ -26,10 +26,10 @@ class PointsParser(Protocol, Generic[Input]): """Something capable of parsing a QC-pass or -fail CSV file""" @classmethod - def parse_all_qcpass(cls, data: Input) -> list[PointRecord]: ... # noqa: D102 + def parse_all_qcpass(cls, data: Input) -> list[PointRecord]: ... # noqa: D102 @classmethod - def parse_all_qcfail(cls, data: Input) -> list[tuple[PointRecord, QCFailReasons]]: ... # noqa: D102 + def parse_all_qcfail(cls, data: Input) -> list[tuple[PointRecord, QCFailReasons]]: ... # noqa: D102 class IterativePointsParser(Generic[I1, I2], PointsParser[I1]): @@ -45,11 +45,11 @@ def _parse_single_qcpass_record(cls, record: I2) -> PointRecord: ... def _parse_single_qcfail_record(cls, record: I2) -> tuple[PointRecord, QCFailReasons]: ... @classmethod - def parse_all_qcpass(cls, data: I1) -> list[PointRecord]: # noqa: D102 + def parse_all_qcpass(cls, data: I1) -> list[PointRecord]: # noqa: D102 return [cls._parse_single_qcpass_record(r) for r in cls._gen_records(data)] @classmethod - def parse_all_qcfail(cls, data: I1) -> list[tuple[PointRecord, QCFailReasons]]: # noqa: D102 + def parse_all_qcfail(cls, data: I1) -> list[tuple[PointRecord, QCFailReasons]]: # noqa: D102 return [cls._parse_single_qcfail_record(r) for r in cls._gen_records(data)] @@ -117,7 +117,7 @@ def _parse_single_record(cls, r: CsvRow, *, exp_len: int) -> PointRecord: @classmethod def _gen_records(cls, data: PathLike) -> Iterable[CsvRow]: - with open(data, newline="") as fh: # noqa: PTH123 + with open(data, newline="") as fh: # noqa: PTH123 return list(csv.reader(fh)) @classmethod diff --git a/looptrace_loci_vis/reader.py b/looptrace_loci_vis/reader.py index e8d11f2..9249e81 100644 --- a/looptrace_loci_vis/reader.py +++ b/looptrace_loci_vis/reader.py @@ -125,7 +125,9 @@ def build_single_file_points_layer(path: PathLike) -> PointsLayer: # 
First, determine the parsing strategy based on file header. parser: PointsParser[PathLike] read_file: Callable[[PathLike], list[PointRecord]] - process_records: Callable[[list[PointRecord]], tuple[list[PointRecord], list[bool], LayerParams]] + process_records: Callable[ + [list[PointRecord]], tuple[list[PointRecord], list[bool], LayerParams] + ] if _has_header(path): logging.debug("Will parse has having header: %s", path) parser = HeadedTraceTimePointParser @@ -162,7 +164,9 @@ def build_single_file_points_layer(path: PathLike) -> PointsLayer: return [pt_rec.flatten() for pt_rec in point_records], params, "points" -def records_to_qcpass_layer_data(records: list[PointRecord]) -> tuple[list[PointRecord], list[bool], LayerParams]: +def records_to_qcpass_layer_data( + records: list[PointRecord], +) -> tuple[list[PointRecord], list[bool], LayerParams]: """Extend the given records partially through a z-stack, designate appropriately as central-plane or not.""" max_z = max(r.get_z_coordinate() for r in records) points: list[PointRecord] = [] @@ -175,7 +179,9 @@ def records_to_qcpass_layer_data(records: list[PointRecord]) -> tuple[list[Point return points, center_flags, {"size": sizes} -def records_to_qcfail_layer_data(record_qc_pairs: list[tuple[PointRecord, QCFailReasons]]) -> tuple[list[PointRecord], list[bool], LayerParams]: +def records_to_qcfail_layer_data( + record_qc_pairs: list[tuple[PointRecord, QCFailReasons]], +) -> tuple[list[PointRecord], list[bool], LayerParams]: """Extend the given records partially through a z-stack, designate appropriately as central-plane or not; also set fail codes text.""" max_z = max(r.get_z_coordinate() for r, _ in record_qc_pairs) points: list["PointRecord"] = [] From ca20e9fca361258e9ab03eba83a55931d63f3146 Mon Sep 17 00:00:00 2001 From: Vince Reuter Date: Thu, 30 May 2024 19:18:30 +0200 Subject: [PATCH 11/12] satisfy ruff and mypy --- looptrace_loci_vis/point_record.py | 2 +- looptrace_loci_vis/points_parser.py | 25 
+++++++++++++++++++------ looptrace_loci_vis/reader.py | 9 ++------- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/looptrace_loci_vis/point_record.py b/looptrace_loci_vis/point_record.py index 6c139e3..2d31081 100644 --- a/looptrace_loci_vis/point_record.py +++ b/looptrace_loci_vis/point_record.py @@ -8,7 +8,7 @@ from gertils.geometry import ImagePoint3D, LocatableXY, LocatableZ, ZCoordinate from gertils.types import TimepointFrom0 as Timepoint from gertils.types import TraceIdFrom0 as TraceId -from numpydoc_decorator import doc +from numpydoc_decorator import doc # type: ignore[import-untyped] from ._types import FlatPointRecord diff --git a/looptrace_loci_vis/points_parser.py b/looptrace_loci_vis/points_parser.py index 9e8bfaf..4cf2a4e 100644 --- a/looptrace_loci_vis/points_parser.py +++ b/looptrace_loci_vis/points_parser.py @@ -1,5 +1,6 @@ """Abstractions related to points parsing""" +import abc import csv from collections.abc import Iterable, Sized from enum import Enum @@ -18,17 +19,23 @@ I2 = TypeVar("I2", bound=Sized) -class MappingLike(Protocol): # noqa: D101 +class MappingLike(Protocol, Sized): # noqa: D101 + @abc.abstractmethod def __getitem__(self, key: str) -> object: ... + @abc.abstractmethod + def __len__(self) -> int: ... + class PointsParser(Protocol, Generic[Input]): """Something capable of parsing a QC-pass or -fail CSV file""" @classmethod + @abc.abstractmethod def parse_all_qcpass(cls, data: Input) -> list[PointRecord]: ... # noqa: D102 @classmethod + @abc.abstractmethod def parse_all_qcfail(cls, data: Input) -> list[tuple[PointRecord, QCFailReasons]]: ... # noqa: D102 @@ -36,12 +43,15 @@ class IterativePointsParser(Generic[I1, I2], PointsParser[I1]): """Something that yields records, each of type I2 from value of type I1, to parse QC-pass/-fail points""" @classmethod + @abc.abstractmethod def _gen_records(cls, data: I1) -> Iterable[I2]: ... 
@classmethod + @abc.abstractmethod def _parse_single_qcpass_record(cls, record: I2) -> PointRecord: ... @classmethod + @abc.abstractmethod def _parse_single_qcfail_record(cls, record: I2) -> tuple[PointRecord, QCFailReasons]: ... @classmethod @@ -65,11 +75,11 @@ def _gen_records(cls, data: PathLike) -> Iterable[MappingLike]: @classmethod def _parse_single_qcpass_record(cls, record: MappingLike) -> PointRecord: - trace = TraceId(int(record["traceIndex"])) - timepoint = Timepoint(int(record[cls.TIME_INDEX_COLUMN])) - z = float(record["z"]) - y = float(record["y"]) - x = float(record["x"]) + trace = TraceId(int(record["traceIndex"])) # type: ignore[call-overload] + timepoint = Timepoint(int(record[cls.TIME_INDEX_COLUMN])) # type: ignore[call-overload] + z = float(record["z"]) # type: ignore[arg-type] + y = float(record["y"]) # type: ignore[arg-type] + x = float(record["x"]) # type: ignore[arg-type] point = ImagePoint3D(z=z, y=y, x=x) return PointRecord(trace_id=trace, timepoint=timepoint, point=point) @@ -78,6 +88,9 @@ def _parse_single_qcfail_record(cls, record: MappingLike) -> tuple[PointRecord, """A fail record parses the same as a pass one, just with one additional field for QC fail reasons.""" pt_rec = cls._parse_single_qcpass_record(record) fail_code = record["failCode"] + if not isinstance(fail_code, str): + raise TypeError(f"failCode is not str, but {type(fail_code).__name__}") + fail_code: str = str(fail_code) # type: ignore[no-redef] return pt_rec, fail_code diff --git a/looptrace_loci_vis/reader.py b/looptrace_loci_vis/reader.py index 9249e81..9e01c88 100644 --- a/looptrace_loci_vis/reader.py +++ b/looptrace_loci_vis/reader.py @@ -2,7 +2,6 @@ import logging import os -from collections.abc import Callable from enum import Enum from pathlib import Path from typing import Optional @@ -124,10 +123,6 @@ def build_single_file_points_layer(path: PathLike) -> PointsLayer: # Determine how to read and display the points layer to be parsed. 
# First, determine the parsing strategy based on file header. parser: PointsParser[PathLike] - read_file: Callable[[PathLike], list[PointRecord]] - process_records: Callable[ - [list[PointRecord]], tuple[list[PointRecord], list[bool], LayerParams] - ] if _has_header(path): logging.debug("Will parse has having header: %s", path) parser = HeadedTraceTimePointParser @@ -143,8 +138,8 @@ def build_single_file_points_layer(path: PathLike) -> PointsLayer: elif qc == QCStatus.FAIL: logging.debug("Will parse as QC-fail: %s", path) color = PointColor.DEEP_SKY_BLUE - read_file = parser.parse_all_qcfail - process_records = records_to_qcfail_layer_data + read_file = parser.parse_all_qcfail  # type: ignore[assignment] + process_records = records_to_qcfail_layer_data  # type: ignore[assignment] else: _do_not_parse(path=path, why="Could not infer QC status", level=logging.ERROR) raise ValueError( From 7457a79bd694c3b26c2c6aedd561578a5b89b549 Mon Sep 17 00:00:00 2001 From: Vince Reuter Date: Thu, 30 May 2024 19:27:29 +0200 Subject: [PATCH 12/12] use and apply newer ruff; fix version to prevent local/CI mismatch --- looptrace_loci_vis/reader.py | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/looptrace_loci_vis/reader.py b/looptrace_loci_vis/reader.py index 9e01c88..6c7d852 100644 --- a/looptrace_loci_vis/reader.py +++ b/looptrace_loci_vis/reader.py @@ -179,7 +179,7 @@ def records_to_qcfail_layer_data( ) -> tuple[list[PointRecord], list[bool], LayerParams]: """Extend the given records partially through a z-stack, designate appropriately as central-plane or not; also set fail codes text.""" max_z = max(r.get_z_coordinate() for r, _ in record_qc_pairs) - points: list["PointRecord"] = [] + points: list[PointRecord] = [] center_flags: list[bool] = [] codes: list[QCFailReasons] = [] for rec, qc in record_qc_pairs: diff --git a/pyproject.toml b/pyproject.toml index c3845e1..dda80b5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,7 @@ 
looptrace-loci-vis = "looptrace_loci_vis:napari.yaml" [project.optional-dependencies] formatting = [ "codespell >= 2.2.4", - "ruff >= 0.3", + "ruff >= 0.4.6", ] linting = [ "mypy >= 1.0.1",