From 563bd6cd03a1a204a6314431ff7b3b1713b6b932 Mon Sep 17 00:00:00 2001 From: Philipp Schlegel Date: Sun, 29 Sep 2024 17:56:57 +0100 Subject: [PATCH] read_mesh: switch to using BaseReader subclass --- navis/io/mesh_io.py | 283 +++++++++++++++++++++++--------------------- 1 file changed, 150 insertions(+), 133 deletions(-) diff --git a/navis/io/mesh_io.py b/navis/io/mesh_io.py index fb5a0c3f..19ee47ff 100644 --- a/navis/io/mesh_io.py +++ b/navis/io/mesh_io.py @@ -12,13 +12,13 @@ # GNU General Public License for more details. import os +import io -import multiprocessing as mp import trimesh as tm -from pathlib import Path from typing import Union, Iterable, Optional, Dict, Any from typing_extensions import Literal +from urllib3 import HTTPResponse from .. import config, utils, core from . import base @@ -26,28 +26,118 @@ # Set up logging logger = config.get_logger(__name__) +# Mesh files can have all sort of extensions +DEFAULT_FMT = "{name}.{file_ext}" + +# Mesh extensions supported by trimesh +MESH_LOAD_EXT = tuple(tm.exchange.load.mesh_loaders.keys()) +MESH_WRITE_EXT = tuple(tm.exchange.export._mesh_exporters.keys()) + + +class MeshReader(base.BaseReader): + def __init__( + self, + output: str, + fmt: str = DEFAULT_FMT, + attrs: Optional[Dict[str, Any]] = None, + ): + super().__init__( + fmt=fmt, + attrs=attrs, + file_ext=MESH_LOAD_EXT, + name_fallback="MESH", + read_binary=True, + ) + self.output = output + + def format_output(self, x): + # This function replaces the BaseReader.format_output() + # This is to avoid trying to convert multiple (image, header) to NeuronList + if self.output == "trimesh": + return x + elif x: + return core.NeuronList(x) + else: + return core.NeuronList([]) + + @base.handle_errors + def read_buffer( + self, f, attrs: Optional[Dict[str, Any]] = None + ) -> Union[tm.Trimesh, "core.Volume", "core.MeshNeuron"]: + """Read buffer into mesh. + + Parameters + ---------- + f : IO + Readable buffer (must be bytes). + attrs : dict | None + Arbitrary attributes to include in the neurons. + + Returns + ------- + Trimesh | MeshNeuron | Volume + + """ + if isinstance(f, HTTPResponse): + f = io.StringIO(f.content) + + if isinstance(f, bytes): + f = io.BytesIO(f) + + # We need to tell trimesh what file type we are reading + if "file" not in attrs: + raise KeyError( + f'Unable to parse file type. "file" not in attributes: {attrs}' + ) + + file_type = attrs["file"].split(".")[-1] + + mesh = tm.load_mesh(f, file_type=file_type) + + if self.output == "trimesh": + return mesh + elif self.output == "volume": + return core.Volume(mesh.vertices, mesh.faces, **attrs) + + # Turn into a MeshNeuron + n = core.MeshNeuron(mesh) + + # Try adding properties one-by-one. If one fails, we'll keep track of it + # in the `.meta` attribute + meta = {} + for k, v in attrs.items(): + try: + n._register_attr(k, v) + except (AttributeError, ValueError, TypeError): + meta[k] = v + + if meta: + n.meta = meta + + return n -def read_mesh(f: Union[str, Iterable], - include_subdirs: bool = False, - parallel: Union[bool, int] = 'auto', - output: Union[Literal['neuron'], - Literal['volume'], - Literal['trimesh']] = 'neuron', - errors: Union[Literal['raise'], - Literal['log'], - Literal['ignore']] = 'log', - limit: Optional[int] = None, - **kwargs) -> 'core.NeuronObject': - """Create Neuron/List from mesh. + +def read_mesh( + f: Union[str, Iterable], + include_subdirs: bool = False, + parallel: Union[bool, int] = "auto", + output: Union[Literal["neuron"], Literal["volume"], Literal["trimesh"]] = "neuron", + errors: Literal["raise", "log", "ignore"] = "raise", + limit: Optional[int] = None, + fmt: str = "{name}.", + **kwargs, +) -> "core.NeuronObject": + """Load mesh file into Neuron/List. This is a thin wrapper around `trimesh.load_mesh` which supports most - common formats (obj, ply, stl, etc.). + commonly used formats (obj, ply, stl, etc.). Parameters ---------- f : str | iterable - Filename(s) or folder. If folder must include file - extension (e.g. `my/dir/*.ply`). + Filename(s) or folder. If folder should include file + extension (e.g. `my/dir/*.ply`) otherwise all + mesh files in the folder will be read. include_subdirs : bool, optional If True and `f` is a folder, will also search subdirectories for meshes. @@ -59,9 +149,10 @@ def read_mesh(f: Union[str, Iterable], neurons. Integer will be interpreted as the number of cores (otherwise defaults to `os.cpu_count() - 2`). output : "neuron" | "volume" | "trimesh" - Determines function's output. See Returns. + Determines function's output - see `Returns`. errors : "raise" | "log" | "ignore" - If "log" or "ignore", errors will not be raised. + If "log" or "ignore", errors will not be raised and the + mesh will be skipped. Can result in empty output. limit : int | str | slice | list, optional When reading from a folder or archive you can use this parameter to restrict the which files read: @@ -81,19 +172,24 @@ def read_mesh(f: Union[str, Iterable], Returns ------- - navis.MeshNeuron + MeshNeuron If `output="neuron"` (default). - navis.Volume + Volume If `output="volume"`. - trimesh.Trimesh - If `output='trimesh'`. - navis.NeuronList + Trimesh + If `output="trimesh"`. + NeuronList If `output="neuron"` and import has multiple meshes will return NeuronList of MeshNeurons. list If `output!="neuron"` and import has multiple meshes will return list of Volumes or Trimesh. + See Also + -------- + [`navis.read_precomputed`][] + Read meshes and skeletons from Neuroglancer's precomputed format. + Examples -------- @@ -114,101 +210,19 @@ def read_mesh(f: Union[str, Iterable], >>> nl = navis.read_mesh('mesh.obj', output='volume') # doctest: +SKIP """ - utils.eval_param(output, name='output', - allowed_values=('neuron', 'volume', 'trimesh')) - - # If is directory, compile list of filenames - if isinstance(f, str) and '*' in f: - f, ext = f.split('*') - f = Path(f).expanduser() - - if not f.is_dir(): - raise ValueError(f'{f} does not appear to exist') - - if not include_subdirs: - f = list(f.glob(f'*{ext}')) - else: - f = list(f.rglob(f'*{ext}')) - - if limit: - f = f[:limit] - - if utils.is_iterable(f): - # Do not use if there is only a small batch to import - if isinstance(parallel, str) and parallel.lower() == 'auto': - if len(f) < 100: - parallel = False - - if parallel: - # Do not swap this as `isinstance(True, int)` returns `True` - if isinstance(parallel, (bool, str)): - n_cores = os.cpu_count() - 2 - else: - n_cores = int(parallel) - - with mp.Pool(processes=n_cores) as pool: - results = pool.imap(_worker_wrapper, [dict(f=x, - output=output, - errors=errors, - include_subdirs=include_subdirs, - parallel=False) for x in f], - chunksize=1) - - res = list(config.tqdm(results, - desc='Importing', - total=len(f), - disable=config.pbar_hide, - leave=config.pbar_leave)) - - else: - # If not parallel just import the good 'ole way: sequentially - res = [read_mesh(x, - include_subdirs=include_subdirs, - output=output, - errors=errors, - parallel=parallel, - **kwargs) - for x in config.tqdm(f, desc='Importing', - disable=config.pbar_hide, - leave=config.pbar_leave)] - - if output == 'neuron': - return core.NeuronList([r for r in res if r]) - - return res - - try: - # Open the file - fname = '.'.join(os.path.basename(f).split('.')[:-1]) - mesh = tm.load_mesh(f) - - if output == 'trimesh': - return mesh - - attrs = {'name': fname, 'origin': f} - attrs.update(kwargs) - if output == 'volume': - return core.Volume(mesh.vertices, mesh.faces, **attrs) - else: - return core.MeshNeuron(mesh, **attrs) - except BaseException as e: - msg = f'Error reading file {fname}.' - if errors == 'raise': - raise ImportError(msg) from e - elif errors == 'log': - logger.error(f'{msg}: {e}') - return - + utils.eval_param( + output, name="output", allowed_values=("neuron", "volume", "trimesh") + ) -def _worker_wrapper(kwargs): - """Helper for importing meshes using multiple processes.""" - return read_mesh(**kwargs) + reader = MeshReader(fmt=fmt, output=output, errors=errors, attrs=kwargs) + return reader.read_any(f, include_subdirs, parallel, limit=limit) -def write_mesh(x: Union['core.NeuronList', 'core.MeshNeuron', 'core.Volume', 'tm.Trimesh'], - filepath: Optional[str] = None, - filetype: str = None, - ) -> None: +def write_mesh( + x: Union["core.NeuronList", "core.MeshNeuron", "core.Volume", "tm.Trimesh"], + filepath: Optional[str] = None, + filetype: str = None, +) -> None: """Export meshes (MeshNeurons, Volumes, Trimeshes) to disk. Under the hood this is using trimesh to export meshes. @@ -264,41 +278,44 @@ def write_mesh(x: Union['core.NeuronList', 'core.MeshNeuron', 'core.Volume', 'tm >>> navis.write_mesh(nl, tmp_dir / 'meshes.zip', filetype='obj') """ - ALLOWED_FILETYPES = ('stl', 'ply', 'obj') if filetype is not None: - utils.eval_param(filetype, name='filetype', allowed_values=ALLOWED_FILETYPES) + utils.eval_param(filetype, name="filetype", allowed_values=MESH_WRITE_EXT) else: # See if we can get filetype from filepath if filepath is not None: - for f in ALLOWED_FILETYPES: - if str(filepath).endswith(f'.{f}'): + for f in MESH_WRITE_EXT: + if str(filepath).endswith(f".{f}"): filetype = f break if not filetype: - raise ValueError('Must provide mesh type either explicitly via ' - '`filetype` variable or implicitly via the ' - 'file extension in `filepath`') + raise ValueError( + "Must provide mesh type either explicitly via " + "`filetype` variable or implicitly via the " + "file extension in `filepath`" + ) - writer = base.Writer(_write_mesh, ext=f'.{filetype}') + writer = base.Writer(_write_mesh, ext=f".{filetype}") - return writer.write_any(x, - filepath=filepath) + return writer.write_any(x, filepath=filepath) -def _write_mesh(x: Union['core.MeshNeuron', 'core.Volume', 'tm.Trimesh'], - filepath: Optional[str] = None) -> None: +def _write_mesh( + x: Union["core.MeshNeuron", "core.Volume", "tm.Trimesh"], + filepath: Optional[str] = None, +) -> None: """Write single mesh to disk.""" if filepath and os.path.isdir(filepath): if isinstance(x, core.MeshNeuron): if not x.id: - raise ValueError('Neuron(s) must have an ID when destination ' - 'is a folder') - filepath = os.path.join(filepath, f'{x.id}') + raise ValueError( + "Neuron(s) must have an ID when destination " "is a folder" + ) + filepath = os.path.join(filepath, f"{x.id}") elif isinstance(x, core.Volume): - filepath = os.path.join(filepath, f'{x.name}') + filepath = os.path.join(filepath, f"{x.name}") else: - raise ValueError(f'Unable to generate filename for {type(x)}') + raise ValueError(f"Unable to generate filename for {type(x)}") if isinstance(x, core.MeshNeuron): mesh = x.trimesh