Skip to content

Commit

Permalink
finish first-pass at signal analysis program and integration with pip…
Browse files Browse the repository at this point in the history
…eline, gerlichlab#337
  • Loading branch information
vreuter committed Nov 24, 2024
1 parent b444a3e commit 8845256
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 42 deletions.
21 changes: 19 additions & 2 deletions bin/cli/run_processing_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import sys
from typing import *

from expression import Result
from expression import Option, Result
from gertils import ExtantFile, ExtantFolder
import pandas as pd
import pypiper
Expand Down Expand Up @@ -41,6 +41,7 @@
from zip_spot_image_files_for_tracing import workflow as run_spot_zipping
from tracing import workflow as run_chromatin_tracing
from locus_spot_visualisation_data_preparation import workflow as get_locus_spot_data_file_src_dst_pairs
from run_processing_pipeline import workflow as signal_analysis_workflow


DECON_STAGE_NAME = "deconvolution"
Expand Down Expand Up @@ -434,10 +435,19 @@ def filter_spots_for_nuclei(rounds_config: ExtantFile, params_config: ExtantFile
class LooptracePipeline(pypiper.Pipeline):
"""Main looptrace processing pipeline"""

def __init__(self, rounds_config: ExtantFile, params_config: ExtantFile, images_folder: ExtantFolder, output_folder: ExtantFolder, **pl_mgr_kwargs: Any) -> None:
def __init__(
self,
rounds_config: ExtantFile,
params_config: ExtantFile,
images_folder: ExtantFolder,
output_folder: ExtantFolder,
signal_config: Option[ExtantFile],
**pl_mgr_kwargs: Any,
) -> None:
self.rounds_config = rounds_config
self.params_config = params_config
self.images_folder = images_folder
self.signal_config = signal_config
super(LooptracePipeline, self).__init__(name=PIPE_NAME, outfolder=str(output_folder.path), **pl_mgr_kwargs)

def stages(self) -> list[pypiper.Stage]:
Expand Down Expand Up @@ -544,6 +554,11 @@ def stages(self) -> list[pypiper.Stage]:
func=run_locus_spot_viewing_prep,
f_kwargs=rounds_params,
),
pypiper.Stage(
name="cross_channel_signal_analysis",
func=signal_analysis_workflow,
f_kwargs={**rounds_params, "maybe_signal_config": self.signal_config},
),
]


Expand All @@ -553,6 +568,7 @@ def parse_cli(args: Iterable[str]) -> argparse.Namespace:
parser.add_argument("--params-config", type=ExtantFile.from_string, required=True, help="Path to the parameters configuration file")
parser.add_argument("-I", "--images-folder", type=ExtantFolder.from_string, required=True, help="Path to the root folder with imaging data to process")
parser.add_argument("--pypiper-folder", type=ExtantFolder.from_string, required=True, help="Path to folder for pypiper output")
parser.add_argument("--signal-config", type=ExtantFile.from_string, help="Path to signal analysis config file, if using that feature")
parser.add_argument(NO_TEE_LOGS_OPTNAME, action="store_true", help="Do not tee logging output from pypiper manager")
parser = pypiper.add_pypiper_args(
parser,
Expand All @@ -566,6 +582,7 @@ def init(opts: argparse.Namespace) -> LooptracePipeline:
kwargs = {
"rounds_config": opts.rounds_config,
"params_config": opts.params_config,
"signal_config": Option.of_obj(opts.signal_config),
"images_folder": opts.images_folder,
"output_folder": opts.pypiper_folder,
}
Expand Down
159 changes: 119 additions & 40 deletions bin/cli/run_signal_analysis.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,60 @@
"""Analyze pixel value statistics in regions of interest, across channels."""

import argparse
from collections import Counter
from collections import Counter, defaultdict
from dataclasses import dataclass
from enum import Enum
import json
import logging
from operator import itemgetter
from pathlib import Path
import sys
from typing import Iterable, Mapping, Optional, TypeAlias, TypeVar
from typing import TYPE_CHECKING, Iterable, Mapping, TypeAlias, TypeVar

from expression import Option, Result, compose, snd
from expression.collections import Seq, seq
from expression.core import result
from expression.core import option, result
from gertils import ExtantFile, compute_pixel_statistics
from gertils.geometry import ImagePoint3D
from gertils.pixel_value_statistics import Numeric as PixelStatValue
from gertils.types import ImagingChannel
if TYPE_CHECKING:
import numpy.typing as npt
import pandas as pd
from spotfishing.roi_tools import get_centroid_from_record

from looptrace import FIELD_OF_VIEW_COLUMN
from looptrace.ImageHandler import ImageHandler
from looptrace.SpotPicker import SpotPicker
from looptrace.utilities import find_first_option, get_either, wrap_exception, wrap_error_message

_A = TypeVar("_")

SIGNAL_CHANNEL_COLUMN = "signalChannel"


def _parse_cmdl(cmdl: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Analyze pixel value statistics in regions of interest, across channels.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument()
parser.add_argument(
"--rounds-config",
required=True,
type=ExtantFile.from_string,
help="Path to the imaging rounds configuration file",
)
parser.add_argument(
"--params-config",
required=True,
type=ExtantFile.from_string,
help="Path to the looptrace parameters configuration file",
)
parser.add_argument(
"--signal-config",
type=ExtantFile.from_string,
help="Path to the configuration file declaring ROI diameters and channels to analyse",
)
return parser.parse_args(cmdl)


Expand All @@ -35,7 +63,7 @@ class RoiType(Enum):
Regional = "spots_for_voxels_definition_file"

@property
def image_handler_attribute(self) -> str:
def file_attribute_on_image_handler(self) -> str:
return self.value

@classmethod
Expand Down Expand Up @@ -93,6 +121,79 @@ def from_mapping(cls, data: Mapping[str, object]) -> Result["AnalyticalSpecifica
)))


# TODO: refactor with https://github.com/gerlichlab/gertils/issues/32
def run_cross_channel_signal_analysis(
*,
img: npt.ArrayLike,
roi_diameter: int,
channels: Iterable[ImagingChannel],
points: Iterable[ImagePoint3D],
) -> Iterable[dict[str, PixelStatValue]]:
for pt in points:
yield compute_pixel_statistics(
img=img,
pt=pt,
channels=channels,
diameter=roi_diameter,
channel_column=SIGNAL_CHANNEL_COLUMN,
)


def workflow(*, rounds_config: ExtantFile, params_config: ExtantFile, maybe_signal_config: Option[ExtantFile]) -> None:
match maybe_signal_config:
case option.Option(tag="none", none=_):
logging.info("No signal config file, skipping cross-channel signal analysis")
return
case option.Option(tag="some", some=signal_config_file):
conf_path: Path = signal_config_file.path
logging.info("Parsing signal analysis configuration: %s", conf_path)
with conf_path.open(mode="r") as fh:
conf_data = json.load(fh)
if not isinstance(conf_data, list):
raise TypeError(f"Parsed signal config data (from {conf_path}) is {type(conf_data).__name__}")
try:
analysis_specs: list[AnalyticalSpecification] = list(map(AnalyticalSpecification.from_mapping, conf_data))
except (TypeError, ValueError) as e:
logging.error("Failed to parse analytical specifications (from %s): %s", conf_path, e)
raise

H = ImageHandler(rounds_config=rounds_config, params_config=params_config)
S = SpotPicker(H)

for spec in analysis_specs:
# Get the ROIs of this type.
roi_type: RoiType = spec.roi_type
logging.info("Analyzing signal for ROI type '%s'", roi_type.name)
rois_file: Path = getattr(H, roi_type.file_attribute_on_image_handler)
all_rois: pd.DataFrame = pd.read_csv(rois_file, index_col=False)

# Build up the records for this ROI type, for all FOVs.
by_raw_channel: Mapping[int, list[dict]] = defaultdict
for fov, img in S.iter_fov_img_pairs():
logging.info(f"Analysing signal for FOV: {fov}")
rois: pd.DataFrame = all_rois[all_rois[FIELD_OF_VIEW_COLUMN] == fov]
logging.debug("ROI count: %d", rois.shape[0])
for _, r in rois.iterrows():
pt: ImagePoint3D = get_centroid_from_record(r)
for stats in compute_pixel_statistics(
img=img,
pt=pt,
channels=spec.channels,
diameter=spec.roi_diameter,
channel_column=SIGNAL_CHANNEL_COLUMN,
):
ch: int = stats[SIGNAL_CHANNEL_COLUMN]
by_raw_channel[ch].append({**r.to_dict(), **stats})

# Write the output file for this ROI type, across all FOVs.
for raw_channel, records in sorted(by_raw_channel.items(), key=itemgetter(0)):
stats_frame: pd.DataFrame = pd.DataFrame(records)
fn = f"signal_analysis__rois_{roi_type.name}__channel_{raw_channel}.csv"
outfile: Path = Path(H.analysis_path) / fn
logging.info("Writing output file: %s", outfile)
stats_frame.to_csv(outfile, index=False)


def _ensure_unique(items: Iterable[_A]) -> Result[set[_A], str]:
try:
counts = Counter(items)
Expand All @@ -102,14 +203,6 @@ def _ensure_unique(items: Iterable[_A]) -> Result[set[_A], str]:
return Result.Ok(set(counts.keys())) if not repeats else Result.Error(f"{len(repeats)} repeated item(s); counts: {repeats}")


def _parse_roi_diameter(obj: object) -> Result[int, str]:
match obj:
case int():
return Result.Ok(obj)
case _:
return Result.Error(f"ROI diameter must be integer, not {type(obj).__name__} ({obj})")


def _parse_imaging_channels(channels: object) -> Result[list[ImagingChannel], Seq[str]]:
State: TypeAlias = Result[Seq[ImagingChannel], Seq[str]]

Expand All @@ -130,6 +223,14 @@ def proc1(acc: State, obj: object) -> State:
.map(lambda seq: seq.to_list())


def _parse_roi_diameter(obj: object) -> Result[int, str]:
match obj:
case int():
return Result.Ok(obj)
case _:
return Result.Error(f"ROI diameter must be integer, not {type(obj).__name__} ({obj})")


@wrap_error_message("Getting list from object")
@wrap_exception(TypeError)
def _list_from_object(obj: object) -> Result[list[object], str]:
Expand All @@ -142,35 +243,13 @@ def _unsafe_parse_imaging_channel(obj: object) -> Result[ImagingChannel, str]:
return ImagingChannel(obj)


def run_cross_channel_signal_analysis(
rounds_config: ExtantFile,
params_config: ExtantFile,
signal_config: Optional[ExtantFile],
) -> None:
if signal_config is None:
logging.info("No signal config file, skipping cross-channel signal analysis")
return
H = ImageHandler(rounds_config=rounds_config, params_config=params_config)
logging.info("Reading ROIs file: %s", str(H.region))
rois = pd.read_csv(H.spots_for_voxels_definition_file, index_col=False)
# TODO: apply drift correction (at least coarse for now)
compute_pixel_statistics(
# TODO: image
# TODO: point
channels=H.iter_signal_analysis_channels(),
diameter=H.signal_analysis_roi_diameter,
channel_column="signalChannel",
)
# TODO: write to output files
logging.info("Done with cross-channel signal analysis")


def workflow() -> None:
pass


def main(cmdl: list[str]) -> None:
opts: argparse.Namespace = _parse_cmdl(cmdl)
workflow(
rounds_config=opts.rounds_config,
params_config=opts.params_config,
maybe_signal_config=Option.of_obj(opts.signal_config),
)


if __name__ == "__main__":
Expand Down

0 comments on commit 8845256

Please sign in to comment.