diff --git a/CHANGELOG.md b/CHANGELOG.md index 7109a88..6c7c6a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,32 @@ +# toasty 0.12.0 (2021-11-01) + +- Both toasty's AstroPix/Djangoplicity pipeline and the `wwt_data_formats` + module had WCS handling bugs that canceled each other out. The data-formats + bug was fixed in release 0.10.2 of that package, which caused the Toasty bug + to become apparent. Fix that (reported by @astrodavid10, #65; fixed by @pkgw, + #66). +- Fixes and features needed to process the SELENE Kaguya TC dataset (@pkgw, + #63). Unfortunately these changes are lacking corresponding documentation: + - Add a U8 image mode. + - Add APIs to filter out subtrees when sampling TOAST pyramids. + - Add proper support for the planetary TOAST coordinate system, which is + rotated 180 degrees in longitude from the celestial one. + - Add support for JPEG2000 images. + - Add support for chunked TOAST tiling. + - Add a chunked plate-carree TOAST sampler. + - Fix out-of-date data when updating PIL-based images. + - Improve multiprocessing implementations to avoid race conditions on exit and + operate more robustly in multi-node (HPC) contexts. + - Add the ability for `toasty transform` (and underlying APIs) to emit the + transformed data into a separate pyramid; i.e. create a tree of only JPG + files from a tree of NPY files. + - Add `toasty transform u8-to-rgb`. + - Don't create every directory when removing lockfiles. +- Fix FITS file update on Windows (#67, @imbasimba). +- Improve FITS heuristics to ignore binary tables and other HDUs without a + defined shape (#62, @imbasimba). + + # toasty 0.11.0 (2021-09-17) - Fix up `toasty tile-study` to handle FITS files properly (@pkgw, #61). The diff --git a/README.md b/README.md index 9ea1e85..bc8e6ae 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ and [PyPI](https://pypi.org/project/toasty/#history). - [pytest] to run the test suite - [PyYAML] - [tqdm] -- [wwt_data_formats] >= 0.7 +- [wwt_data_formats] >= 0.10.2 [Astronomy Visualization Metadata]: https://virtualastronomy.org/avm_metadata.php [astropy]: https://www.astropy.org/ diff --git a/ci/azure-build-and-test.yml b/ci/azure-build-and-test.yml index 04e57a8..11357f2 100644 --- a/ci/azure-build-and-test.yml +++ b/ci/azure-build-and-test.yml @@ -97,6 +97,7 @@ jobs: \conda install -y \ cython \ filelock \ + glymur \ healpy \ numpy \ openexr-python \ diff --git a/docs/api/toasty.image.ImageMode.rst b/docs/api/toasty.image.ImageMode.rst index e8e8550..2289fb1 100644 --- a/docs/api/toasty.image.ImageMode.rst +++ b/docs/api/toasty.image.ImageMode.rst @@ -15,6 +15,7 @@ ImageMode ~ImageMode.F64 ~ImageMode.RGB ~ImageMode.RGBA + ~ImageMode.U8 .. rubric:: Methods Summary @@ -31,6 +32,7 @@ ImageMode .. autoattribute:: F64 .. autoattribute:: RGB .. autoattribute:: RGBA + .. autoattribute:: U8 .. rubric:: Methods Documentation diff --git a/docs/api/toasty.samplers.ChunkedPlateCarreeSampler.rst b/docs/api/toasty.samplers.ChunkedPlateCarreeSampler.rst new file mode 100644 index 0000000..8b850c4 --- /dev/null +++ b/docs/api/toasty.samplers.ChunkedPlateCarreeSampler.rst @@ -0,0 +1,29 @@ +ChunkedPlateCarreeSampler +========================= + +.. currentmodule:: toasty.samplers + +.. autoclass:: ChunkedPlateCarreeSampler + :show-inheritance: + + .. rubric:: Attributes Summary + + .. autosummary:: + + ~ChunkedPlateCarreeSampler.n_chunks + + .. rubric:: Methods Summary + + .. autosummary:: + + ~ChunkedPlateCarreeSampler.filter + ~ChunkedPlateCarreeSampler.sampler + + .. 
rubric:: Attributes Documentation + + .. autoattribute:: n_chunks + + .. rubric:: Methods Documentation + + .. automethod:: filter + .. automethod:: sampler diff --git a/docs/api/toasty.toast.ToastCoordinateSystem.rst b/docs/api/toasty.toast.ToastCoordinateSystem.rst new file mode 100644 index 0000000..4b7d23f --- /dev/null +++ b/docs/api/toasty.toast.ToastCoordinateSystem.rst @@ -0,0 +1,19 @@ +ToastCoordinateSystem +===================== + +.. currentmodule:: toasty.toast + +.. autoclass:: ToastCoordinateSystem + :show-inheritance: + + .. rubric:: Attributes Summary + + .. autosummary:: + + ~ToastCoordinateSystem.ASTRONOMICAL + ~ToastCoordinateSystem.PLANETARY + + .. rubric:: Attributes Documentation + + .. autoattribute:: ASTRONOMICAL + .. autoattribute:: PLANETARY diff --git a/docs/api/toasty.toast.count_tiles_matching_filter.rst b/docs/api/toasty.toast.count_tiles_matching_filter.rst new file mode 100644 index 0000000..3e1aac0 --- /dev/null +++ b/docs/api/toasty.toast.count_tiles_matching_filter.rst @@ -0,0 +1,6 @@ +count_tiles_matching_filter +=========================== + +.. currentmodule:: toasty.toast + +.. autofunction:: count_tiles_matching_filter diff --git a/docs/api/toasty.toast.create_single_tile.rst b/docs/api/toasty.toast.create_single_tile.rst new file mode 100644 index 0000000..3e82567 --- /dev/null +++ b/docs/api/toasty.toast.create_single_tile.rst @@ -0,0 +1,6 @@ +create_single_tile +================== + +.. currentmodule:: toasty.toast + +.. autofunction:: create_single_tile diff --git a/docs/api/toasty.toast.generate_tiles_filtered.rst b/docs/api/toasty.toast.generate_tiles_filtered.rst new file mode 100644 index 0000000..0bada75 --- /dev/null +++ b/docs/api/toasty.toast.generate_tiles_filtered.rst @@ -0,0 +1,6 @@ +generate_tiles_filtered +======================= + +.. currentmodule:: toasty.toast + +.. autofunction:: generate_tiles_filtered diff --git a/docs/api/toasty.toast.sample_layer_filtered.rst b/docs/api/toasty.toast.sample_layer_filtered.rst new file mode 100644 index 0000000..c3ce16f --- /dev/null +++ b/docs/api/toasty.toast.sample_layer_filtered.rst @@ -0,0 +1,6 @@ +sample_layer_filtered +===================== + +.. currentmodule:: toasty.toast + +.. autofunction:: sample_layer_filtered diff --git a/docs/api/toasty.toast.toast_tile_get_coords.rst b/docs/api/toasty.toast.toast_tile_get_coords.rst new file mode 100644 index 0000000..c47aeff --- /dev/null +++ b/docs/api/toasty.toast.toast_tile_get_coords.rst @@ -0,0 +1,6 @@ +toast_tile_get_coords +===================== + +.. currentmodule:: toasty.toast + +.. autofunction:: toast_tile_get_coords diff --git a/setup.py b/setup.py index 7dd44df..b99e3dc 100644 --- a/setup.py +++ b/setup.py @@ -9,107 +9,101 @@ import os from setuptools import setup, Extension + def get_long_desc(): in_preamble = True lines = [] - with open('README.md', 'rt', encoding='utf8') as f: + with open("README.md", "rt", encoding="utf8") as f: for line in f: if in_preamble: - if line.startswith('<!--'): + if line.startswith("<!--"): in_preamble = False else: - if line.startswith('<!--'): + if line.startswith("<!--"): break else: lines.append(line) - lines.append(''' + lines.append( + """ For more information, including installation instructions, please visit [the project homepage].
[the project homepage]: https://toasty.readthedocs.io/ -''') - return ''.join(lines) +""" + ) + return "".join(lines) setup_args = dict( - name = 'toasty', # cranko project-name - version = '0.11.0', # cranko project-version - description = 'Generate TOAST image tile pyramids from existing image data', - long_description = get_long_desc(), - long_description_content_type = 'text/markdown', - url = 'https://toasty.readthedocs.io/', - license = 'MIT', - platforms = 'Linux, Mac OS X', - - author = 'Chris Beaumont, AAS WorldWide Telescope Team', - author_email = 'wwt@aas.org', - - classifiers = [ - 'Intended Audience :: Science/Research', - 'License :: OSI Approved :: MIT License', - 'Operating System :: OS Independent', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Topic :: Scientific/Engineering :: Astronomy', - 'Topic :: Scientific/Engineering :: Visualization', + name="toasty", # cranko project-name + version="0.12.0", # cranko project-version + description="Generate TOAST image tile pyramids from existing image data", + long_description=get_long_desc(), + long_description_content_type="text/markdown", + url="https://toasty.readthedocs.io/", + license="MIT", + platforms="Linux, Mac OS X", + author="Chris Beaumont, AAS WorldWide Telescope Team", + author_email="wwt@aas.org", + classifiers=[ + "Intended Audience :: Science/Research", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Topic :: Scientific/Engineering :: Astronomy", + "Topic :: Scientific/Engineering :: Visualization", ], - - packages = [ - 'toasty', - 'toasty.pipeline', - 'toasty.tests', + packages=[ + "toasty", + "toasty.pipeline", + "toasty.tests", ], - include_package_data = True, - - entry_points = { - 'console_scripts': [ - 'toasty=toasty.cli:entrypoint', + include_package_data=True, + entry_points={ + "console_scripts": [ + "toasty=toasty.cli:entrypoint", ], }, - - install_requires = [ - 'filelock>=3', - 'numpy>=1.7', - 'pillow>=7.0', - 'PyYAML>=5.0', - 'tqdm>=4.0', - 'wwt_data_formats>=0.10.0', + install_requires=[ + "filelock>=3", + "numpy>=1.7", + "pillow>=7.0", + "PyYAML>=5.0", + "tqdm>=4.0", + "wwt_data_formats>=0.10.2", ], - - extras_require = { - 'test': [ - 'coveralls', - 'pytest-cov', + extras_require={ + "test": [ + "coveralls", + "pytest-cov", ], - 'docs': [ - 'astropy', - 'astropy-sphinx-theme', - 'numpydoc', - 'sphinx', - 'sphinx-automodapi', + "docs": [ + "astropy", + "astropy-sphinx-theme", + "numpydoc", + "sphinx", + "sphinx-automodapi", ], }, - - cmdclass = { - 'build_ext': build_ext, + cmdclass={ + "build_ext": build_ext, }, - - ext_modules = [ - Extension('toasty._libtoasty', ['toasty/_libtoasty.pyx']), + ext_modules=[ + Extension("toasty._libtoasty", ["toasty/_libtoasty.pyx"]), ], - - include_dirs = [ + include_dirs=[ np.get_include(), - ] + ], ) -for e in setup_args['ext_modules']: - e.cython_directives = {'language_level': '3'} +for e in setup_args["ext_modules"]: + e.cython_directives = {"language_level": "3"} -if __name__ == '__main__': +if __name__ == "__main__": setup(**setup_args) diff --git a/toasty/builder.py b/toasty/builder.py index 7e477a8..e6371e1 100644 --- a/toasty/builder.py +++ b/toasty/builder.py @@ -85,8 +85,8 @@ def prepare_study_tiling(self, image): 
tiling : `toasty.study.StudyTiling` The prepared tiling information - Remarks - ------- + Notes + ----- After calling this method, you should set up the WCS for the tiled imagery, using :meth:`default_tiled_study_astrometry` as a backstop if no real information is available. Then use :meth:`execute_study_tiling` diff --git a/toasty/cli.py b/toasty/cli.py index 754f6c1..b4b759b 100644 --- a/toasty/cli.py +++ b/toasty/cli.py @@ -539,7 +539,7 @@ def transform_getparser(parser): '--start', metavar = 'DEPTH', type = int, - help = 'The depth of the pyramid layer to start the cascade', + help = 'How deep in the tile pyramid to proceed with the transformation', ) parser.add_argument( '--clip', '-c', @@ -548,10 +548,39 @@ def transform_getparser(parser): default = 1.0, help = 'The level at which to clip the floating-point data', ) + parser.add_argument( + '--outdir', + metavar = 'PATH', + help = 'Output transformed data into this directory, instead of operating in-place', + ) parser.add_argument( 'pyramid_dir', metavar = 'DIR', - help = 'The directory containing the tile pyramid to cascade', + help = 'The directory containing the tile pyramid to transform', ) + + parser = subparsers.add_parser('u8-to-rgb') + parser.add_argument( + '--parallelism', '-j', + metavar = 'COUNT', + type = int, + help = 'The parallelization level (default: use all CPUs; specify `1` to force serial processing)', + ) + parser.add_argument( + '--start', + metavar = 'DEPTH', + type = int, + help = 'How deep in the tile pyramid to proceed with the transformation', + ) + parser.add_argument( + '--outdir', + metavar = 'PATH', + help = 'Output transformed data into this directory, instead of operating in-place', + ) + parser.add_argument( + 'pyramid_dir', + metavar = 'DIR', + help = 'The directory containing the tile pyramid to transform', ) @@ -563,11 +592,31 @@ def transform_impl(settings): return if settings.transform_command == 'fx3-to-rgb': - from .transform import f16x3_to_rgb pio = PyramidIO(settings.pyramid_dir) + pio_out = None + + if settings.outdir is not None: + pio_out = PyramidIO(settings.outdir) + + from .transform import f16x3_to_rgb f16x3_to_rgb( pio, settings.start, clip = settings.clip, + pio_out = pio_out, + parallel = settings.parallelism, + cli_progress = True, + ) + elif settings.transform_command == 'u8-to-rgb': + pio = PyramidIO(settings.pyramid_dir) + pio_out = None + + if settings.outdir is not None: + pio_out = PyramidIO(settings.outdir) + + from .transform import u8_to_rgb + u8_to_rgb( + pio, settings.start, + pio_out = pio_out, parallel = settings.parallelism, cli_progress = True, ) diff --git a/toasty/collection.py b/toasty/collection.py index 557f708..38edacd 100644 --- a/toasty/collection.py +++ b/toasty/collection.py @@ -71,9 +71,12 @@ def _load(self, actually_load_data): hdu = hdul[self._hdu_index] else: for hdu in hdul: - if len(hdu.shape) > 1: + if (hasattr(hdu, 'shape') and len(hdu.shape) > 1 + and type(hdu) is not fits.hdu.table.BinTableHDU): break + if type(hdu) is fits.hdu.table.BinTableHDU: + raise Exception(f'cannot process input `{fits_path}`: did not find any HDU with image data') wcs = WCS(hdu.header) shape = hdu.shape diff --git a/toasty/image.py b/toasty/image.py index 9f2c274..fe82b5b 100644 --- a/toasty/image.py +++ b/toasty/image.py @@ -26,6 +26,7 @@ import numpy as np import warnings import sys +import os try: from astropy.io import fits @@ -81,11 +82,13 @@ def get_format_vertical_parity_sign(format): return -1 def _array_to_mode(array): - if array.ndim == 2 and
array.dtype.kind == 'f': - if array.dtype.itemsize == 4: + if array.ndim == 2: + if array.dtype.kind == 'f' and array.dtype.itemsize == 4: return ImageMode.F32 - elif array.dtype.itemsize == 8: + elif array.dtype.kind == 'f' and array.dtype.itemsize == 8: return ImageMode.F64 + elif array.dtype.kind == 'u' and array.dtype.itemsize == 1: + return ImageMode.U8 elif array.ndim == 3: if array.shape[2] == 3: if array.dtype.kind == 'f' and array.itemsize == 2: @@ -128,13 +131,17 @@ class ImageMode(Enum): stored in the OpenEXR file format. """ + U8 = 'U8' + @classmethod def from_array_info(cls, shape, dtype): - if len(shape) == 2 and dtype.kind == 'f': - if dtype.itemsize == 4: + if len(shape) == 2: + if dtype.kind == 'f' and dtype.itemsize == 4: return cls.F32 - elif dtype.itemsize == 8: + elif dtype.kind == 'f' and dtype.itemsize == 8: return cls.F64 + elif dtype.kind == 'u' and dtype.itemsize == 1: + return cls.U8 elif len(shape) == 3: if shape[2] == 3: if dtype.kind == 'f' and dtype.itemsize == 2: @@ -181,6 +188,8 @@ def make_maskable_buffer(self, buf_height, buf_width): arr = np.empty((buf_height, buf_width), dtype=np.float64) elif self == ImageMode.F16x3: arr = np.empty((buf_height, buf_width, 3), dtype=np.float16) + elif self == ImageMode.U8: + arr = np.empty((buf_height, buf_width), dtype=np.uint8) else: raise Exception('unhandled mode in make_maskable_buffer()') @@ -591,15 +600,16 @@ def load_path(self, path): # TODO: decide how to handle multiple HDUs # TODO: decide how to handle non-TAN projections - hdu = fits.open(path)[0] - arr = hdu.data - if ASTROPY_INSTALLED: - from astropy.wcs import WCS - wcs = WCS(hdu.header) - else: - wcs = None + with fits.open(path) as hdul: + arr = hdul[0].data + if ASTROPY_INSTALLED: + from astropy.wcs import WCS + wcs = WCS(hdul[0].header) + else: + wcs = None - return Image.from_array(arr, wcs=wcs, default_format='fits') + img = Image.from_array(arr, wcs=wcs, default_format='fits') + return img # Special handling for Photoshop files, used for some very large mosaics # with transparency (e.g. the PHAT M31/M33 images). @@ -724,6 +734,13 @@ def from_array(cls, array, wcs=None, default_format=None): _validate_format('default_format', default_format) + # Windows systems ('nt') cannot close a file while there are any variables pointing + # to data within the opened file. Therefore we have to copy the entire array from + # the opened file. In other, more permissive operating systems, pointing to the + # file data is ok. + if os.name == 'nt': + array = np.copy(array) + array = np.atleast_2d(array) inst = cls() @@ -805,7 +822,7 @@ def default_format(self): if self._default_format is None: if self.mode in (ImageMode.RGB, ImageMode.RGBA): return 'png' - elif self.mode in (ImageMode.F32, ImageMode.F16x3): + elif self.mode in (ImageMode.F32, ImageMode.F64, ImageMode.F16x3, ImageMode.U8): return 'npy' else: return self._default_format @@ -949,14 +966,18 @@ def fill_into_maskable_buffer(self, buffer, iy_idx, ix_idx, by_idx, bx_idx): i = self.asarray() b = buffer.asarray() + # Ensure that we don't try to use the PIL representation of the buffer anymore, + # since it will be out-of-date. 
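+ # Nulling the cached object forces any later PIL-based access to rebuild it + # from the freshly updated array.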
+ + buffer._pil = None + if self.mode == ImageMode.RGB: b.fill(0) b[by_idx,bx_idx,:3] = i[iy_idx,ix_idx] b[by_idx,bx_idx,3] = 255 - elif self.mode == ImageMode.RGBA: + elif self.mode in (ImageMode.RGBA, ImageMode.U8): b.fill(0) b[by_idx,bx_idx] = i[iy_idx,ix_idx] - elif self.mode in (ImageMode.F32, ImageMode.F16x3): + elif self.mode in (ImageMode.F32, ImageMode.F64, ImageMode.F16x3): b.fill(np.nan) b[by_idx,bx_idx] = i[iy_idx,ix_idx] else: @@ -989,6 +1010,10 @@ def update_into_maskable_buffer(self, buffer, iy_idx, ix_idx, by_idx, bx_idx): i = self.asarray() b = buffer.asarray() + # Ensure that we don't try to use the PIL representation of the buffer anymore, + # since it will be out-of-date. + buffer._pil = None + sub_b = b[by_idx,bx_idx] sub_i = i[iy_idx,ix_idx] @@ -999,15 +1024,22 @@ def update_into_maskable_buffer(self, buffer, iy_idx, ix_idx, by_idx, bx_idx): valid = (sub_i[...,3] != 0) valid = np.broadcast_to(valid[...,None], sub_i.shape) np.putmask(sub_b, valid, sub_i) - elif self.mode == ImageMode.F32: + elif self.mode in (ImageMode.F32, ImageMode.F64): valid = ~np.isnan(sub_i) np.putmask(sub_b, valid, sub_i) elif self.mode == ImageMode.F16x3: valid = ~np.any(np.isnan(sub_i), axis=2) valid = np.broadcast_to(valid[...,None], sub_i.shape) np.putmask(sub_b, valid, sub_i) + elif self.mode == ImageMode.U8: + # zero is our maskval, so here's a convenient way to get pretty good + # update semantics. It will behave unusually if two buffers overlap + # and disagree on their non-zero pixel values: instead of the second + # buffer "winning", we'll effectively get a mix-and-match of which + # buffer "wins", biased towards the brighter values. + np.maximum(sub_b, sub_i, out=sub_b) else: - raise Exception('unhandled mode in update_into_maskable_buffer') + raise Exception(f'unhandled mode `{self.mode}` in update_into_maskable_buffer') def save(self, path_or_stream, format=None, mode=None): """ @@ -1116,7 +1148,7 @@ def clear(self): with NaNs. """ - if self._mode in (ImageMode.RGB, ImageMode.RGBA): + if self._mode in (ImageMode.RGB, ImageMode.RGBA, ImageMode.U8): self.asarray().fill(0) elif self._mode in (ImageMode.F32, ImageMode.F64, ImageMode.F16x3): self.asarray().fill(np.nan) diff --git a/toasty/jpeg2000.py b/toasty/jpeg2000.py new file mode 100644 index 0000000..405567f --- /dev/null +++ b/toasty/jpeg2000.py @@ -0,0 +1,141 @@ +# -*- mode: python; coding: utf-8 -*- +# Copyright 2021 the AAS WorldWide Telescope project +# Licensed under the MIT License. + +""" +Special handling for JPEG2000 files. + +This is very primitive support. Implemented for: + +https://planetarymaps.usgs.gov/mosaic/Lunar_Kaguya_TC_Ortho_Global_4096ppd_v02.jp2.html + +JPEG2000 files can be tiled. We refer to these tiles as "chunks" to try to avoid +confusion with Toasty's tiling process (although perhaps this just makes things +worse). +""" + +from __future__ import absolute_import, division, print_function + +__all__ = ''' +ChunkedJPEG2000Reader +HAS_JPEG2000 +'''.split() + +try: + import glymur + HAS_JPEG2000 = True +except ImportError: + HAS_JPEG2000 = False + + +class ChunkedJPEG2000Reader(object): + """ + A simple reader for tiled JPEG2000 files. + + Parameters + ---------- + path : path-like + The path of the JP2 input file + fake_data : optional bool + The default is false. If true, no data are read from the file. Instead + an array of 255's is returned. This can be useful for testing, where in + my large test case (the Kaguya JP2) it can take 15 minutes to read a + single tile from the file.
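+ + Examples + -------- + A minimal usage sketch; the file name here is hypothetical:: + + from toasty.jpeg2000 import ChunkedJPEG2000Reader + + reader = ChunkedJPEG2000Reader('big_mosaic.jp2') + + for ichunk in range(reader.n_chunks): + x, y, width, height = reader.chunk_spec(ichunk) + data = reader.chunk_data(ichunk) # (height, width) or (height, width, n_components)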
+ """ + + def __init__(self, path, fake_data=False): + if not HAS_JPEG2000: + raise Exception('you must install the `glymur` Python package to process JPEG2000 files') + + self._jp2 = glymur.Jp2k(path) + self._tile_shape = None + self._fake_data = fake_data + + for seg in self._jp2.codestream.segment: + if seg.marker_id == 'SIZ': + self._tile_shape = (seg.ytsiz, seg.xtsiz) + + @property + def shape(self): + """ + A tuple of either ``(height, width)`` or ``(height, width, n_components)``. + """ + return self._jp2.shape + + @property + def n_chunks(self): + """ + The number of chunks (tiles) inside the JPEG2000 file. + """ + th, tw = self._tile_shape + return ((self._jp2.shape[0] + th - 1) // th) * ((self._jp2.shape[1] + tw - 1) // tw) + + def chunk_spec(self, ichunk): + """ + Get information about an image chunk that can be loaded. + + Parameters + ---------- + ichunk : :class:`int` + The index of the chunk to query, between 0 and ``self.n_chunks - 1`` + (inclusive). + + Returns + ------- + Tuple of ``(x, y, width, height)``, where the coordinates refer to + the top-left corner of the chunk relative to the overall image + (0-based). + """ + if ichunk < 0 or ichunk >= self.n_chunks: + raise ValueError(f'got ichunk={ichunk!r}; should be between 0 and {self.n_chunks-1}') + + th, tw = self._tile_shape + gh, gw = self._jp2.shape[:2] + chunks_per_row = (gw + tw - 1) // tw + icol = ichunk // chunks_per_row + irow = ichunk % chunks_per_row + + x0 = tw * irow + x1 = min(tw * (irow + 1), gw) + chunk_width = x1 - x0 + + y0 = th * icol + y1 = min(th * (icol + 1), gh) + chunk_height = y1 - y0 + + return x0, y0, chunk_width, chunk_height + + def chunk_data(self, ichunk): + """ + Load an image chunk. + + Parameters + ---------- + ichunk : :class:`int` + The index of the chunk to query, between 0 and ``self.n_chunks - 1`` + (inclusive). + + Returns + ------- + A numpy array of image data whose overall shape depends on the image + format (see :attr:`shape`), but whose width and height will agree with + the chunk specification. + + Notes + ----- + This can be extremely slow. With the large chunks used by the Kaguya JPEG2000 + file, it can take ~15 minutes to load a typical chunk. + """ + # The only way that the glymur package makes it possible to read + # explicitly tile-by-tile is through a deprecated API, but if we use the + # newer indexing API, it seems that we get the same effect (including + # performance) with the following approach. 
+ x0, y0, w, h = self.chunk_spec(ichunk) + + if self._fake_data: + import numpy as np + a = np.empty((h, w), dtype=np.uint8) + a.fill(255) + return a + + return self._jp2[y0:y0+h,x0:x0+w] diff --git a/toasty/merge.py b/toasty/merge.py index 9ae5a50..97ebc9f 100644 --- a/toasty/merge.py +++ b/toasty/merge.py @@ -175,7 +175,8 @@ def _cascade_images_parallel(pio, start, merger, cli_progress, parallel): """ import multiprocessing as mp - from .pyramid import Pos + from queue import Empty + from .pyramid import Pos, pos_parent # The dispatcher process keeps track of finished tiles (reported in # `done_queue`) and notifies workers when new tiles are ready to process @@ -185,25 +186,7 @@ def _cascade_images_parallel(pio, start, merger, cli_progress, parallel): n_todo = pyramid.depth2tiles(first_level_to_do) ready_queue = mp.Queue() done_queue = mp.Queue(maxsize = 2 * parallel) - - dispatcher = mp.Process( - target=_mp_cascade_dispatcher, - args=(done_queue, ready_queue, n_todo, cli_progress) - ) - dispatcher.start() - - # The workers pick up tiles that are ready to process and do the merging. - - workers = [] - - for _ in range(parallel): - w = mp.Process( - target=_mp_cascade_worker, - args=(done_queue, ready_queue, pio, merger), - ) - w.daemon = True - w.start() - workers.append(w) + done_event = mp.Event() # Seed the queue of ready tiles. We use generate_pos to try to seed the # queue in an order that will get us to generate higher-level tiles as early @@ -215,21 +198,20 @@ def _cascade_images_parallel(pio, start, merger, cli_progress, parallel): if pos.n == first_level_to_do: ready_queue.put(pos) - # Wait for the magic to happen. - - dispatcher.join() + # The workers pick up tiles that are ready to process and do the merging. - for w in workers: - w.join() + workers = [] + for _ in range(parallel): + w = mp.Process( + target=_mp_cascade_worker, + args=(done_queue, ready_queue, done_event, pio, merger), + ) + w.daemon = True + w.start() + workers.append(w) -def _mp_cascade_dispatcher(done_queue, ready_queue, n_todo, cli_progress): - """ - Monitor for tiles that are finished processing and enqueue ones that - become ready for processing (= all of their children are processed). - """ - from queue import Empty - from .pyramid import pos_parent + # Start dispatching tiles readiness = {} @@ -264,13 +246,20 @@ def _mp_cascade_dispatcher(done_queue, ready_queue, n_todo, cli_progress): else: readiness[ppos] = flags + # All done! + ready_queue.close() + ready_queue.join_thread() + done_event.set() + + for w in workers: + w.join() if cli_progress: print() -def _mp_cascade_worker(done_queue, ready_queue, pio, merger): +def _mp_cascade_worker(done_queue, ready_queue, done_event, pio, merger): """ Process tiles that are ready. """ @@ -287,10 +276,10 @@ def _mp_cascade_worker(done_queue, ready_queue, pio, merger): while True: try: pos = ready_queue.get(True, timeout=1) - except (OSError, ValueError, Empty): - # OSError or ValueError => queue closed. This signal seems not to - # cross multiprocess lines, though. - break + except Empty: + if done_event.is_set(): + break + continue # By construction, the children of this tile have all already been # processed. diff --git a/toasty/multi_tan.py b/toasty/multi_tan.py index ce1f241..828550d 100644 --- a/toasty/multi_tan.py +++ b/toasty/multi_tan.py @@ -6,9 +6,9 @@ Generate tiles from a collection of images on a common TAN projection.
""" -__all__ = ''' +__all__ = """ MultiTanProcessor -'''.split() +""".split() from astropy.wcs import WCS import numpy as np @@ -18,16 +18,20 @@ from .study import StudyTiling MATCH_HEADERS = [ - 'CTYPE1', 'CTYPE2', - 'CRVAL1', 'CRVAL2', - 'CDELT1', 'CDELT2', + "CTYPE1", + "CTYPE2", + "CRVAL1", + "CRVAL2", + "CDELT1", + "CDELT2", ] # In my sample set, some of these files vary minutely from one header to the # next, so we save them but don't demand exact matching. We should use # np.isclose() or whatever it is. SAVE_HEADERS = [ - 'PC1_1', 'PC2_2', + "PC1_1", + "PC2_2", ] @@ -62,7 +66,6 @@ class MultiTanProcessor(object): def __init__(self, collection): self._collection = collection - def compute_global_pixelization(self, builder): """ Read the input images to determine the global pixelation of this data @@ -96,21 +99,33 @@ def compute_global_pixelization(self, builder): for h in SAVE_HEADERS: ref_headers[h] = header[h] - if ref_headers['CTYPE1'] != 'RA---TAN': - raise Exception('all inputs must be in a TAN projection, but {} is not'.format(input_id)) - if ref_headers['CTYPE2'] != 'DEC--TAN': - raise Exception('all inputs must be in a TAN projection, but {} is not'.format(input_id)) + if ref_headers["CTYPE1"] != "RA---TAN": + raise Exception( + "all inputs must be in a TAN projection, but {} is not".format( + desc.collection_id + ) + ) + if ref_headers["CTYPE2"] != "DEC--TAN": + raise Exception( + "all inputs must be in a TAN projection, but {} is not".format( + desc.collection_id + ) + ) else: for h in MATCH_HEADERS: expected = ref_headers[h] observed = header[h] if observed != expected: - raise Exception('inputs are not on uniform WCS grid; in file {}, expected ' - 'value {} for header {} but observed {}'.format(input_id, expected, h, observed)) + raise Exception( + "inputs are not on uniform WCS grid; in file {}, expected " + "value {} for header {} but observed {}".format( + desc.collection_id, expected, h, observed + ) + ) - this_crpix1 = header['CRPIX1'] - 1 - this_crpix2 = header['CRPIX2'] - 1 + this_crpix1 = header["CRPIX1"] - 1 + this_crpix2 = header["CRPIX2"] - 1 mtdesc = MultiTanDescriptor() mtdesc.ident = desc.collection_id @@ -140,8 +155,8 @@ def compute_global_pixelization(self, builder): height = int(global_crymax - global_crymin) + 1 self._tiling = StudyTiling(width, height) - ref_headers['CRPIX1'] = this_crpix1 + 1 + (mtdesc.crxmin - global_crxmin) - ref_headers['CRPIX2'] = this_crpix2 + 1 + (mtdesc.crymin - global_crymin) + ref_headers["CRPIX1"] = this_crpix1 + 1 + (mtdesc.crxmin - global_crxmin) + ref_headers["CRPIX2"] = this_crpix2 + 1 + (mtdesc.crymin - global_crymin) wcs = WCS(ref_headers) self._tiling.apply_to_imageset(builder.imgset) @@ -163,7 +178,9 @@ def compute_global_pixelization(self, builder): # tiles we'll need to process. if desc.imax < desc.imin or desc.jmax < desc.jmin: - raise Exception(f'segment {desc.ident} maps to zero size in the global mosaic') + raise Exception( + f"segment {desc.ident} maps to zero size in the global mosaic" + ) desc.sub_tiling = self._tiling.compute_for_subimage( desc.imin, @@ -176,7 +193,6 @@ def compute_global_pixelization(self, builder): return self # chaining convenience - def tile(self, pio, parallel=None, cli_progress=False, **kwargs): """ Tile the input images into the deepest layer of the pyramid. 
@@ -196,6 +212,7 @@ def tile(self, pio, parallel=None, cli_progress=False, **kwargs): """ from .par_util import resolve_parallelism + parallel = resolve_parallelism(parallel) if parallel > 1: @@ -207,7 +224,6 @@ def tile(self, pio, parallel=None, cli_progress=False, **kwargs): # that were generated. pio.clean_lockfiles(self._tiling._tile_levels) - def _tile_serial(self, pio, cli_progress, **kwargs): tile_parity_sign = pio.get_default_vertical_parity_sign() @@ -217,7 +233,15 @@ def _tile_serial(self, pio, cli_progress, **kwargs): if image.get_parity_sign() != tile_parity_sign: image.flip_parity() - for pos, width, height, image_x, image_y, tile_x, tile_y in desc.sub_tiling.generate_populated_positions(): + for ( + pos, + width, + height, + image_x, + image_y, + tile_x, + tile_y, + ) in desc.sub_tiling.generate_populated_positions(): # Tiling coordinate systems are always negative (top-down) # parity, with the Y=0 tile at the top. But the actual data # buffers might be positive (bottoms-up) parity -- this is @@ -235,25 +259,31 @@ def _tile_serial(self, pio, cli_progress, **kwargs): iy_idx = slice(image_y, image_y + height) by_idx = slice(tile_y, tile_y + height) - with pio.update_image(pos, masked_mode=image.mode, default='masked') as basis: - image.update_into_maskable_buffer(basis, iy_idx, ix_idx, by_idx, bx_idx) + with pio.update_image( + pos, masked_mode=image.mode, default="masked" + ) as basis: + image.update_into_maskable_buffer( + basis, iy_idx, ix_idx, by_idx, bx_idx + ) progress.update(1) if cli_progress: print() - def _tile_parallel(self, pio, cli_progress, parallel, **kwargs): import multiprocessing as mp # Start up the workers - queue = mp.Queue(maxsize = 2 * parallel) + done_event = mp.Event() + queue = mp.Queue(maxsize=2 * parallel) workers = [] for _ in range(parallel): - w = mp.Process(target=_mp_tile_worker, args=(queue, pio, kwargs)) + w = mp.Process( + target=_mp_tile_worker, args=(queue, done_event, pio, kwargs) + ) w.daemon = True w.start() workers.append(w) @@ -265,16 +295,20 @@ def _tile_parallel(self, pio, cli_progress, parallel, **kwargs): queue.put((image, desc)) progress.update(1) - queue.close() + # Finish up + + queue.close() + queue.join_thread() + done_event.set() - for w in workers: - w.join() + for w in workers: + w.join() if cli_progress: print() -def _mp_tile_worker(queue, pio, kwargs): +def _mp_tile_worker(queue, done_event, pio, _kwargs): """ Process the tiles that have been enqueued. """ @@ -286,17 +320,25 @@ def _mp_tile_worker(queue, pio, kwargs): try: # un-pickling WCS objects always triggers warnings right now with warnings.catch_warnings(): - warnings.simplefilter('ignore') + warnings.simplefilter("ignore") image, desc = queue.get(True, timeout=1) - except (OSError, ValueError, Empty): - # OSError or ValueError => queue closed. This signal seems not to - # cross multiprocess lines, though.
- break + except Empty: + if done_event.is_set(): + break + continue if image.get_parity_sign() != tile_parity_sign: image.flip_parity() - for pos, width, height, image_x, image_y, tile_x, tile_y in desc.sub_tiling.generate_populated_positions(): + for ( + pos, + width, + height, + image_x, + image_y, + tile_x, + tile_y, + ) in desc.sub_tiling.generate_populated_positions(): if tile_parity_sign == 1: image_y = image.height - (image_y + height) tile_y = 256 - (tile_y + height) @@ -306,5 +348,7 @@ iy_idx = slice(image_y, image_y + height) by_idx = slice(tile_y, tile_y + height) - with pio.update_image(pos, masked_mode=image.mode, default='masked') as basis: + with pio.update_image( + pos, masked_mode=image.mode, default="masked" + ) as basis: image.update_into_maskable_buffer(basis, iy_idx, ix_idx, by_idx, bx_idx) diff --git a/toasty/multi_wcs.py b/toasty/multi_wcs.py index 85fd1b9..5ab8da6 100644 --- a/toasty/multi_wcs.py +++ b/toasty/multi_wcs.py @@ -241,11 +241,12 @@ def _tile_parallel(self, pio, reproject_function, cli_progress, parallel, **kwar # Start up the workers + done_event = mp.Event() queue = mp.Queue(maxsize = 2 * parallel) workers = [] for _ in range(parallel): - w = mp.Process(target=_mp_tile_worker, args=(queue, pio, reproject_function, kwargs)) + w = mp.Process(target=_mp_tile_worker, args=(queue, done_event, pio, reproject_function, kwargs)) w.daemon = True w.start() workers.append(w) @@ -257,16 +258,20 @@ def _tile_parallel(self, pio, reproject_function, cli_progress, parallel, **kwar queue.put((image, desc, self._combined_wcs)) progress.update(1) - queue.close() + # Wrap up - for w in workers: - w.join() + queue.close() + queue.join_thread() + done_event.set() + + for w in workers: + w.join() if cli_progress: print() -def _mp_tile_worker(queue, pio, reproject_function, kwargs): +def _mp_tile_worker(queue, done_event, pio, reproject_function, kwargs): """ Process the tiles that have been enqueued. """ @@ -280,10 +285,10 @@ def _mp_tile_worker(queue, pio, reproject_function, kwargs): with warnings.catch_warnings(): warnings.simplefilter('ignore') image, desc, combined_wcs = queue.get(True, timeout=10) - except (OSError, ValueError, Empty) as e: - # OSError or ValueError => queue closed. This signal seems not to - # cross multiprocess lines, though.
- break + except Empty: + if done_event.is_set(): + break + continue input_array = image.asarray() diff --git a/toasty/pipeline/astropix.py b/toasty/pipeline/astropix.py index 2663f9f..bc1880e 100644 --- a/toasty/pipeline/astropix.py +++ b/toasty/pipeline/astropix.py @@ -15,10 +15,10 @@ """ -__all__ = ''' +__all__ = """ AstroPixImageSource AstroPixCandidateInput -'''.split() +""".split() import codecs from datetime import datetime @@ -34,7 +34,7 @@ EXTENSION_REMAPPING = { - 'jpeg': 'jpg', + "jpeg": "jpg", } @@ -47,15 +47,14 @@ class AstroPixImageSource(ImageSource): @classmethod def get_config_key(cls): - return 'astropix' + return "astropix" @classmethod def deserialize(cls, data): inst = cls() - inst._json_query_url = data['json_query_url'] + inst._json_query_url = data["json_query_url"] return inst - def query_candidates(self): with requests.get(self._json_query_url, stream=True) as resp: feed_data = json.load(resp.raw) @@ -63,16 +62,15 @@ def query_candidates(self): for item in feed_data: yield AstroPixCandidateInput(item) - def fetch_candidate(self, unique_id, cand_data_stream, cachedir): - with codecs.getreader('utf8')(cand_data_stream) as text_stream: + with codecs.getreader("utf8")(cand_data_stream) as text_stream: info = json.load(text_stream) - lower_id = info['image_id'].lower() - global_id = info['publisher_id'] + '_' + lower_id + lower_id = info["image_id"].lower() + global_id = info["publisher_id"] + "_" + lower_id - if info['resource_url'] and len(info['resource_url']): - source_url = info['resource_url'] + if info["resource_url"] and len(info["resource_url"]): + source_url = info["resource_url"] else: # Original image not findable. Get the best version available from # AstroPix. @@ -101,38 +99,40 @@ def fetch_candidate(self, unique_id, cand_data_stream, cachedir): ##else: ## best_astropix_size = 320 - source_url = 'http://astropix.ipac.caltech.edu/archive/%s/%s/%s_original.jpg' % ( - urlquote(info['publisher_id']), - urlquote(lower_id), - urlquote(global_id), + source_url = ( + "http://astropix.ipac.caltech.edu/archive/%s/%s/%s_original.jpg" + % ( + urlquote(info["publisher_id"]), + urlquote(lower_id), + urlquote(global_id), + ) ) # Now ready to download the image. - ext = source_url.rsplit('.', 1)[-1].lower() + ext = source_url.rsplit(".", 1)[-1].lower() ext = EXTENSION_REMAPPING.get(ext, ext) with requests.get(source_url, stream=True) as resp: if not resp.ok: - raise Exception(f'error downloading {source_url}: {resp.status_code}') + raise Exception(f"error downloading {source_url}: {resp.status_code}") - with open(os.path.join(cachedir, 'image.' + ext), 'wb') as f: + with open(os.path.join(cachedir, "image." + ext), "wb") as f: shutil.copyfileobj(resp.raw, f) - def process(self, unique_id, cand_data_stream, cachedir, builder): # Set up the metadata. - with codecs.getreader('utf8')(cand_data_stream) as text_stream: + with codecs.getreader("utf8")(cand_data_stream) as text_stream: info = json.load(text_stream) - if info['resource_url'] and len(info['resource_url']): - ext = info['resource_url'].rsplit('.', 1)[-1].lower() + if info["resource_url"] and len(info["resource_url"]): + ext = info["resource_url"].rsplit(".", 1)[-1].lower() ext = EXTENSION_REMAPPING.get(ext, ext) else: - ext = 'jpg' + ext = "jpg" - img_path = os.path.join(cachedir, 'image.' + ext) + img_path = os.path.join(cachedir, "image." + ext) md = AstroPixMetadata(info) # Load up the image. 
@@ -148,10 +148,10 @@ def process(self, unique_id, cand_data_stream, cachedir, builder): md.as_wcs_headers(img.width, img.height), img.width, img.height, - place = builder.place, + place=builder.place, ) - builder.set_name(info['title']) + builder.set_name(info["title"]) builder.imgset.credits_url = md.get_credit_url() builder.cascade() @@ -164,39 +164,42 @@ class AstroPixCandidateInput(CandidateInput): def __init__(self, json_dict): self._json = json_dict - self._lower_id = self._json['image_id'].lower() - self._global_id = self._json['publisher_id'] + '_' + self._lower_id + self._lower_id = self._json["image_id"].lower() + self._global_id = self._json["publisher_id"] + "_" + self._lower_id def get_unique_id(self): - return self._global_id.replace('/', '_') + return self._global_id.replace("/", "_") def save(self, stream): # First check that this input is usable. The NRAO feed contains an # item like this, and based on my investigations they are just not # usable right now because the server APIs don't work. So: skip any # like this. - if '/' in self._json['image_id']: - raise NotActionableError('AstroPix images with "/" in their IDs aren\'t retrievable') + if "/" in self._json["image_id"]: + raise NotActionableError( + 'AstroPix images with "/" in their IDs aren\'t retrievable' + ) # TODO? A few NRAO images have SIN projection. Try to recover them? - if self._json['wcs_projection'] != 'TAN': - raise NotActionableError('cannot ingest images in non-TAN projections') + if self._json["wcs_projection"] != "TAN": + raise NotActionableError("cannot ingest images in non-TAN projections") - with codecs.getwriter('utf8')(stream) as text_stream: + with codecs.getwriter("utf8")(stream) as text_stream: json.dump(self._json, text_stream, ensure_ascii=False, indent=2) ASTROPIX_FLOAT_ARRAY_KEYS = [ - 'wcs_reference_dimension', # NB: should be ints, but sometimes expressed with decimal points - 'wcs_reference_pixel', - 'wcs_reference_value', - 'wcs_scale', + "wcs_reference_dimension", # NB: should be ints, but sometimes expressed with decimal points + "wcs_reference_pixel", + "wcs_reference_value", + "wcs_scale", ] ASTROPIX_FLOAT_SCALAR_KEYS = [ - 'wcs_rotation', + "wcs_rotation", ] + class AstroPixMetadata(object): """ Metadata derived from AstroPix query results. @@ -210,7 +213,9 @@ class AstroPixMetadata(object): wcs_projection = None # ex: 'TAN' wcs_reference_dimension = None # ex: [7416.0, 4320.0] wcs_reference_value = None # ex: [187, 12.3] - wcs_reference_pixel = None # ex: [1000.4, 1000.7]; from examples, this seems to be 1-based + wcs_reference_pixel = ( + None # ex: [1000.4, 1000.7]; from examples, this seems to be 1-based + ) wcs_rotation = None # ex: -0.07 (deg, presumably) wcs_scale = None # ex: [-6e-7, 6e-7] @@ -228,26 +233,31 @@ def __init__(self, json_dict): for k, v in json_dict.items(): setattr(self, k, v) - def as_wcs_headers(self, width, height): + """ + The metadata here are essentially AVM headers. As described in + `Builder.apply_avm_info()`, the data that we've seen in the wild are a + bit wonky with regards to parity: the metadata essentially correspond to + FITS-like parity, and we need to flip them to JPEG-like parity. See also + very similar code in `djangoplicity.py`. 
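+ + Concretely, the flip below maps pixel row ``y`` to ``height + 1 - y``: that + negates the second column of the CD matrix and reflects CRPIX2 about the + image midline.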
+ """ headers = {} - #headers['RADECSYS'] = self.wcs_coordinate_frame # causes Astropy warnings - headers['CTYPE1'] = 'RA---' + self.wcs_projection - headers['CTYPE2'] = 'DEC--' + self.wcs_projection - headers['CRVAL1'] = self.wcs_reference_value[0] - headers['CRVAL2'] = self.wcs_reference_value[1] + # headers['RADECSYS'] = self.wcs_coordinate_frame # causes Astropy warnings + headers["CTYPE1"] = "RA---" + self.wcs_projection + headers["CTYPE2"] = "DEC--" + self.wcs_projection + headers["CRVAL1"] = self.wcs_reference_value[0] + headers["CRVAL2"] = self.wcs_reference_value[1] # See Calabretta & Greisen (2002; DOI:10.1051/0004-6361:20021327), eqn 186 - crot = np.cos(self.wcs_rotation * np.pi / 180) srot = np.sin(self.wcs_rotation * np.pi / 180) lam = self.wcs_scale[1] / self.wcs_scale[0] - headers['PC1_1'] = crot - headers['PC1_2'] = -lam * srot - headers['PC2_1'] = srot / lam - headers['PC2_2'] = crot + pc1_1 = crot + pc1_2 = -lam * srot + pc2_1 = srot / lam + pc2_2 = crot # If we couldn't get the original image, the pixel density used for # the WCS parameters may not match the image resolution that we have @@ -263,22 +273,26 @@ def as_wcs_headers(self, width, height): factor0 = width / self.wcs_reference_dimension[0] factor1 = height / self.wcs_reference_dimension[1] - headers['CRPIX1'] = (self.wcs_reference_pixel[0] - 0.5) * factor0 + 0.5 - headers['CRPIX2'] = (self.wcs_reference_pixel[1] - 0.5) * factor1 + 0.5 - headers['CDELT1'] = self.wcs_scale[0] / factor0 + headers["CRPIX1"] = (self.wcs_reference_pixel[0] - 0.5) * factor0 + 0.5 + headers["CRPIX2"] = (self.wcs_reference_pixel[1] - 0.5) * factor1 + 0.5 - # We need the negation here to properly express the parity of this - # JPEG-type image in WCS: - headers['CDELT2'] = -self.wcs_scale[1] / factor1 + # Now finalize and apply the parity flip - return headers + cdelt1 = self.wcs_scale[0] / factor0 + cdelt2 = self.wcs_scale[1] / factor1 + headers["CD1_1"] = cdelt1 * pc1_1 + headers["CD1_2"] = -cdelt1 * pc1_2 + headers["CD2_1"] = cdelt2 * pc2_1 + headers["CD2_2"] = -cdelt2 * pc2_2 + headers["CRPIX2"] = height + 1 - headers["CRPIX2"] + return headers def get_credit_url(self): if self.reference_url: return self.reference_url - return 'http://astropix.ipac.caltech.edu/image/%s/%s' % ( + return "http://astropix.ipac.caltech.edu/image/%s/%s" % ( urlquote(self.publisher_id), urlquote(self.image_id), ) diff --git a/toasty/pipeline/djangoplicity.py b/toasty/pipeline/djangoplicity.py index 82da5bd..e56fdbb 100644 --- a/toasty/pipeline/djangoplicity.py +++ b/toasty/pipeline/djangoplicity.py @@ -6,10 +6,10 @@ Support for loading images from a Djangoplicity database. 
""" -__all__ = ''' +__all__ = """ DjangoplicityImageSource DjangoplicityCandidateInput -'''.split() +""".split() import codecs from contextlib import contextmanager @@ -40,29 +40,32 @@ class DjangoplicityImageSource(ImageSource): @classmethod def get_config_key(cls): - return 'djangoplicity' - + return "djangoplicity" @classmethod def deserialize(cls, data): inst = cls() - inst._base_url = data['base_url'] - inst._channel_name = data['channel_name'] - inst._search_page_name = data.get('search_page_name', 'page') - inst._force_insecure_tls = data.get('force_insecure_tls', True) # TODO: migrate to false + inst._base_url = data["base_url"] + inst._channel_name = data["channel_name"] + inst._search_page_name = data.get("search_page_name", "page") + inst._force_insecure_tls = data.get( + "force_insecure_tls", True + ) # TODO: migrate to false return inst @contextmanager def make_request(self, url, stream=False, none404=False): """force_insecure_tls is for noirlab.edu""" - with requests.get(url, stream=stream, verify=not self._force_insecure_tls) as resp: + with requests.get( + url, stream=stream, verify=not self._force_insecure_tls + ) as resp: if none404 and resp.status_code == 404: yield None return if not resp.ok: - raise Exception(f'error fetching url `{url}`: {resp.status_code}') + raise Exception(f"error fetching url `{url}`: {resp.status_code}") if stream: # By default, `resp.raw` does not perform content decoding. @@ -82,14 +85,17 @@ def query_candidates(self): page_num = 1 while True: - url = self._base_url + f'archive/search/{self._search_page_name}/{page_num}/?type=Observation' - print(f'requesting {url} ...') + url = ( + self._base_url + + f"archive/search/{self._search_page_name}/{page_num}/?type=Observation" + ) + print(f"requesting {url} ...") with self.make_request(url, stream=True, none404=True) as resp: if resp is None: break # got a 404 -- all done - text_stream = codecs.getreader('utf8')(resp.raw) + text_stream = codecs.getreader("utf8")(resp.raw) json_lines = [] # Cf. stream=True in make_request -- skip the zero-length result @@ -100,29 +106,30 @@ def query_candidates(self): for line in text_stream: if not len(json_lines): - if 'var images = [' in line: - json_lines.append('[') - elif '];' in line: - json_lines.append(']') + if "var images = [" in line: + json_lines.append("[") + elif "];" in line: + json_lines.append("]") break else: json_lines.append(line) if not len(json_lines): - raise Exception(f'error processing url {url}: no "var images" data found') + raise Exception( + f'error processing url {url}: no "var images" data found' + ) # This is really a JS literal, but YAML is compatible enough. # JSON does *not* work because the dict keys here aren't quoted. 
- data = yaml.safe_load(''.join(json_lines)) + data = yaml.safe_load("".join(json_lines)) for item in data: yield DjangoplicityCandidateInput(item) page_num += 1 - def fetch_candidate(self, unique_id, cand_data_stream, cachedir): - url = self._base_url + urlquote(unique_id) + '/api/json/' + url = self._base_url + urlquote(unique_id) + "/api/json/" with self.make_request(url) as resp: info = json.loads(resp.content) @@ -131,39 +138,40 @@ def fetch_candidate(self, unique_id, cand_data_stream, cachedir): fullsize_url = None - for resource in info['Resources']: - if resource.get('ResourceType') == 'Original': - fullsize_url = resource['URL'] + for resource in info["Resources"]: + if resource.get("ResourceType") == "Original": + fullsize_url = resource["URL"] break if fullsize_url is None: - raise Exception(f'error processing {unique_id}: can\'t identify \"fullsize original\" image URL') + raise Exception( + f'error processing {unique_id}: can\'t identify "fullsize original" image URL' + ) - ext = fullsize_url.rsplit('.', 1)[-1].lower() - info['toasty_image_extension'] = ext + ext = fullsize_url.rsplit(".", 1)[-1].lower() + info["toasty_image_extension"] = ext # Validate that there's actually WCS we can use - if not isinstance(info.get('Spatial.CoordsystemProjection', None), str): - raise NotActionableError('image does not have full WCS') + if not isinstance(info.get("Spatial.CoordsystemProjection", None), str): + raise NotActionableError("image does not have full WCS") # Download it with self.make_request(fullsize_url, stream=True) as resp: - with open(os.path.join(cachedir, 'image.' + ext), 'wb') as f: + with open(os.path.join(cachedir, "image." + ext), "wb") as f: shutil.copyfileobj(resp.raw, f) - with open(os.path.join(cachedir, 'metadata.json'), 'wt', encoding='utf8') as f: + with open(os.path.join(cachedir, "metadata.json"), "wt", encoding="utf8") as f: json.dump(info, f, ensure_ascii=False, indent=2) - def process(self, unique_id, cand_data_stream, cachedir, builder): # Set up the metadata. - with open(os.path.join(cachedir, 'metadata.json'), 'rt', encoding='utf8') as f: + with open(os.path.join(cachedir, "metadata.json"), "rt", encoding="utf8") as f: info = json.load(f) - img_path = os.path.join(cachedir, 'image.' + info['toasty_image_extension']) + img_path = os.path.join(cachedir, "image." + info["toasty_image_extension"]) md = DjangoplicityMetadata(info) # Load up the image. 
@@ -179,24 +187,24 @@ def process(self, unique_id, cand_data_stream, cachedir, builder): md.as_wcs_headers(img.width, img.height), img.width, img.height, - place = builder.place, + place=builder.place, ) - builder.set_name(info['Title']) - builder.imgset.credits_url = info['ReferenceURL'] - builder.imgset.credits = html.escape(info['Credit']) - builder.place.description = html.escape(info['Description']) + builder.set_name(info["Title"]) + builder.imgset.credits_url = info["ReferenceURL"] + builder.imgset.credits = html.escape(info["Credit"]) + builder.place.description = html.escape(info["Description"]) # Annotation metadata - pub_dt = datetime.fromisoformat(info['Date']) + pub_dt = datetime.fromisoformat(info["Date"]) if pub_dt.tzinfo is None: pub_dt = pub_dt.replace(tzinfo=timezone.utc) amd = { - 'channel': self._channel_name, - 'itemid': unique_id, - 'publishedUTCISO8601': pub_dt.isoformat(), + "channel": self._channel_name, + "itemid": unique_id, + "publishedUTCISO8601": pub_dt.isoformat(), } builder.place.annotation = json.dumps(amd) @@ -214,10 +222,10 @@ def __init__(self, info): self._info = info def get_unique_id(self): - return self._info['id'] + return self._info["id"] def save(self, stream): - with codecs.getwriter('utf8')(stream) as text_stream: + with codecs.getwriter("utf8")(stream) as text_stream: json.dump(self._info, text_stream, ensure_ascii=False, indent=2) @@ -228,31 +236,38 @@ def __init__(self, metadata): self.metadata = metadata def as_wcs_headers(self, width, height): + """ + The metadata here are essentially AVM headers. As described in + `Builder.apply_avm_info()`, the data that we've seen in the wild are a + bit wonky with regards to parity: the metadata essentially correspond to + FITS-like parity, and we need to flip them to JPEG-like parity. See also + very similar code in `astropix.py`. 
+ """ headers = {} - #headers['RADECSYS'] = self.wcs_coordinate_frame # causes Astropy warnings - headers['CTYPE1'] = 'RA---' + self.metadata['Spatial.CoordsystemProjection'] - headers['CTYPE2'] = 'DEC--' + self.metadata['Spatial.CoordsystemProjection'] - headers['CRVAL1'] = float(self.metadata['Spatial.ReferenceValue'][0]) - headers['CRVAL2'] = float(self.metadata['Spatial.ReferenceValue'][1]) + # headers['RADECSYS'] = self.wcs_coordinate_frame # causes Astropy warnings + headers["CTYPE1"] = "RA---" + self.metadata["Spatial.CoordsystemProjection"] + headers["CTYPE2"] = "DEC--" + self.metadata["Spatial.CoordsystemProjection"] + headers["CRVAL1"] = float(self.metadata["Spatial.ReferenceValue"][0]) + headers["CRVAL2"] = float(self.metadata["Spatial.ReferenceValue"][1]) # See Calabretta & Greisen (2002; DOI:10.1051/0004-6361:20021327), eqn 186 - crot = np.cos(float(self.metadata['Spatial.Rotation']) * np.pi / 180) - srot = np.sin(float(self.metadata['Spatial.Rotation']) * np.pi / 180) - scale0 = float(self.metadata['Spatial.Scale'][0]) + crot = np.cos(float(self.metadata["Spatial.Rotation"]) * np.pi / 180) + srot = np.sin(float(self.metadata["Spatial.Rotation"]) * np.pi / 180) + scale0 = float(self.metadata["Spatial.Scale"][0]) # Seen in noao-02274; guessing how to handle this - if not self.metadata['Spatial.Scale'][1]: + if not self.metadata["Spatial.Scale"][1]: scale1 = np.abs(scale0) else: - scale1 = float(self.metadata['Spatial.Scale'][1]) + scale1 = float(self.metadata["Spatial.Scale"][1]) lam = scale1 / scale0 - headers['PC1_1'] = crot - headers['PC1_2'] = -lam * srot - headers['PC2_1'] = srot / lam - headers['PC2_2'] = crot + pc1_1 = crot + pc1_2 = -lam * srot + pc2_1 = srot / lam + pc2_2 = crot # If we couldn't get the original image, the pixel density used for # the WCS parameters may not match the image resolution that we have @@ -265,15 +280,24 @@ def as_wcs_headers(self, width, height): # should map to [W' + 0.5, H' + 0.5] (where the primed quantities are # the new width and height). - factor0 = width / float(self.metadata['Spatial.ReferenceDimension'][0]) - factor1 = height / float(self.metadata['Spatial.ReferenceDimension'][1]) - - headers['CRPIX1'] = (float(self.metadata['Spatial.ReferencePixel'][0]) - 0.5) * factor0 + 0.5 - headers['CRPIX2'] = (float(self.metadata['Spatial.ReferencePixel'][1]) - 0.5) * factor1 + 0.5 - headers['CDELT1'] = scale0 / factor0 - - # We need the negation here to properly express the parity of this - # JPEG-type image in WCS: - headers['CDELT2'] = -scale1 / factor1 + factor0 = width / float(self.metadata["Spatial.ReferenceDimension"][0]) + factor1 = height / float(self.metadata["Spatial.ReferenceDimension"][1]) + + headers["CRPIX1"] = ( + float(self.metadata["Spatial.ReferencePixel"][0]) - 0.5 + ) * factor0 + 0.5 + headers["CRPIX2"] = ( + float(self.metadata["Spatial.ReferencePixel"][1]) - 0.5 + ) * factor1 + 0.5 + + # Now finalize and apply the parity flip. 
+ + cdelt1 = scale0 / factor0 + cdelt2 = scale1 / factor1 + headers["CD1_1"] = cdelt1 * pc1_1 + headers["CD1_2"] = -cdelt1 * pc1_2 + headers["CD2_1"] = cdelt2 * pc2_1 + headers["CD2_2"] = -cdelt2 * pc2_2 + headers["CRPIX2"] = height + 1 - headers["CRPIX2"] return headers diff --git a/toasty/pyramid.py b/toasty/pyramid.py index ec4d41f..b910319 100644 --- a/toasty/pyramid.py +++ b/toasty/pyramid.py @@ -220,13 +220,15 @@ def __init__(self, base_dir, scheme='L/Y/YX', default_format=None): self._default_format = default_format - def tile_path(self, pos, format=None): + def tile_path(self, pos, format=None, makedirs=True): """Get the path for a tile, creating its containing directories. Parameters ---------- pos : Pos The tile to get a path for. + makedirs : optional :class:`bool` + If True (the default), create containing directories. Returns ------- Notes ----- - This function does I/O itself — it creates the parent directories - containing the tile path. It is not an error for the parent - directories to already exist. - + In its default mode this function does I/O itself — it creates the + parent directories containing the tile path. It is not an error for + the parent directories to already exist. """ + level = str(pos.n) ix = str(pos.x) iy = str(pos.y) - return self._tile_path(level, ix, iy, format=format) + return self._tile_path(level, ix, iy, format=format, makedirs=makedirs) - def _tile_path_LsYsYX(self, level, ix, iy, format=None): + def _tile_path_LsYsYX(self, level, ix, iy, format=None, makedirs=True): d = os.path.join(self._base_dir, level, iy) - - # We can't use the `exist_ok` kwarg because it's not available in Python 2. - try: - os.makedirs(d) - except OSError as e: - if e.errno != 17: - raise # not EEXIST - + if makedirs: + os.makedirs(d, exist_ok=True) return os.path.join(d, '{}_{}.{}'.format(iy, ix, format or self._default_format)) - def _tile_path_LXY(self, level, ix, iy, format=None): - # We can't use the `exist_ok` kwarg because it's not available in Python 2. - try: - os.makedirs(self._base_dir) - except OSError as e: - if e.errno != 17: - raise # not EEXIST + def _tile_path_LXY(self, level, ix, iy, format=None, makedirs=True): + if makedirs: + os.makedirs(self._base_dir, exist_ok=True) return os.path.join( self._base_dir, @@ -364,11 +356,15 @@ def write_image(self, pos, image, format=None, mode=None): @contextmanager def update_image(self, pos, default='none', masked_mode=None, format=None): - from filelock import FileLock + # Plain FileLock doesn't work in HPC contexts, where we might be running + # multiple processes on different hosts simultaneously. But it might be + # more efficient in the non-HPC context? Should maybe choose the right + # one automagically.
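+ # (SoftFileLock relies only on the existence of the lock file itself, so it + # also works across hosts on a shared filesystem; the tradeoff is that a + # crashed process can leave a stale lock behind.)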
+        from filelock import SoftFileLock

         p = self.tile_path(pos)

-        with FileLock(p + '.lock'):
+        with SoftFileLock(p + '.lock'):
             img = self.read_image(
                 pos,
                 default=default,
@@ -398,7 +394,7 @@ def clean_lockfiles(self, level):
         for x in range(0, 2**level):
             for y in range(0, 2**level):
                 pos = Pos(level, x, y)
-                p = self.tile_path(pos) + '.lock'
+                p = self.tile_path(pos, makedirs=False) + '.lock'

                 try:
                     os.unlink(p)
diff --git a/toasty/samplers.py b/toasty/samplers.py
index 408d00a..388cc4a 100644
--- a/toasty/samplers.py
+++ b/toasty/samplers.py
@@ -22,6 +22,7 @@ from __future__ import absolute_import, division, print_function

 __all__ = '''
+ChunkedPlateCarreeSampler
 plate_carree_sampler
 plate_carree_galactic_sampler
 plate_carree_ecliptic_sampler
@@ -323,3 +324,191 @@ def vec2pix(lon, lat):
         return data[iy, ix]

     return vec2pix
+
+
+class ChunkedPlateCarreeSampler(object):
+    """
+    Setup for TOAST sampling of a chunked plate-carree image.
+
+    This only works with the ChunkedJPEG2000Reader image right now, but in
+    principle we could extend to accept other chunked formats.
+
+    Assume a typical image coordinate system, where (x,y) = (0,0) is the
+    top-left edge of the image. The "global" coordinate system refers to the
+    un-chunked image, which is probably too big to fit into memory.
+
+    In the planetary plate carree projection, the (global) X and Y axes
+    measure longitude and latitude, respectively. The left edge of the X=0
+    column has lon = -pi, while the right edge of the X=(W-1) column has lon =
+    +pi. The top edge of the Y=0 row has lat = +pi/2, and the bottom edge of
+    the Y=(H-1) row has lat = -pi/2.
+    """
+
+    def __init__(self, chunked_image, planetary=False):
+        self._image = chunked_image
+
+        assert planetary, 'XXX non-planetary plate carree not implemented'
+
+        self.sx = 2 * np.pi / self._image.shape[1]  # radians per pixel, X direction
+        self.sy = np.pi / self._image.shape[0]  # ditto, for the Y direction
+
+    @property
+    def n_chunks(self):
+        return self._image.n_chunks
+
+    def _chunk_bounds(self, ichunk):
+        cx, cy, cw, ch = self._image.chunk_spec(ichunk)
+
+        # Note that the following are all intended to be coordinates at
+        # pixel edges, in the appropriate direction, not pixel centers.
+        lon_l = self.sx * cx - np.pi
+        lon_r = self.sx * (cx + cw) - np.pi
+        lat_u = 0.5 * np.pi - self.sy * cy
+        lat_d = 0.5 * np.pi - self.sy * (cy + ch)  # note: lat_u > lat_d
+
+        return lon_l, lon_r, lat_d, lat_u
+
+    def filter(self, ichunk):
+        """
+        Get a TOAST tile-filter function for the specified chunk.
+
+        Parameters
+        ----------
+        ichunk : :class:`int`
+            The index of the chunk to query, between 0 and ``self.n_chunks - 1``
+            (inclusive).
+
+        Returns
+        -------
+        A callable object, ``filter(tile) -> bool``, suitable for use as a tile
+        filter function.
+        """
+        chunk_lon_min1, chunk_lon_max1, chunk_lat_min1, chunk_lat_max1 = self._chunk_bounds(ichunk)
+
+        def latlon_tile_filter(tile):
+            """
+            Return true if this tile might contain data from this chunk of the
+            source image.
+            """
+            # Need to copy the outer arguments since we're going to modify them.
+            chunk_lon_min = chunk_lon_min1
+            chunk_lon_max = chunk_lon_max1
+            chunk_lat_min = chunk_lat_min1
+            chunk_lat_max = chunk_lat_max1
+
+            corner_lonlats = np.asarray(tile.corners)  # shape (4, 2)
+
+            # Latitudes are easy -- no wrapping.
+
+            tile_lat_min = np.min(corner_lonlats[:,1])
+            tile_lat_max = np.max(corner_lonlats[:,1])
+
+            if chunk_lat_min > tile_lat_max:
+                return False
+            if chunk_lat_max < tile_lat_min:
+                return False
+
+            # Can't reject based on latitudes. Longitudes are trickier.
+            #
+            # Step 1: If we hit one of the poles, longitudes become
+            # ~meaningless, so all we can really do (in our simplistic approach
+            # here) is accept the tile. We compare to a threshold just under
+            # pi/2 to account for inevitable roundoff. Note that if we don't
+            # apply the lat-bounds filter first, every single chunk will accept
+            # tiles at the poles, even if the chunk is right on the equator.
+
+            if tile_lat_max > 1.5707963 or tile_lat_min < -1.5707963:
+                return True
+
+            # Step 2: get a good coverage range for tile longitudes. Some tiles
+            # span a full pi in longitude, and sometimes the "corners"
+            # longitudes span all sorts of values from -2pi to 2pi. So just
+            # shuffle them around until we get a self-consistent min/max.
+
+            lons = corner_lonlats[:,0]
+            keep_going = True
+
+            while keep_going:
+                keep_going = False
+
+                imin = np.argmin(lons)
+                tile_lon_min = lons[imin]
+                tile_lon_max = np.max(lons)
+
+                if tile_lon_max - tile_lon_min > np.pi:
+                    keep_going = True
+                    lons[imin] += 2 * np.pi
+
+            # Step 3: Now, shuffle the chunk longitudes by +/-2pi so that they
+            # line up with the tile longitude bounds. We know that we'll
+            # ultimately be comparing chunk_lon_min to tile_lon_max, and
+            # chunk_lon_max to tile_lon_min, so we can treat those pairs
+            # individually.
+
+            while chunk_lon_min - tile_lon_max > np.pi:
+                chunk_lon_min -= 2 * np.pi
+
+            while tile_lon_max - chunk_lon_min > np.pi:
+                chunk_lon_min += 2 * np.pi
+
+            while chunk_lon_max - tile_lon_min > np.pi:
+                chunk_lon_max -= 2 * np.pi
+
+            while tile_lon_min - chunk_lon_max > np.pi:
+                chunk_lon_max += 2 * np.pi
+
+            # Finally, we can reject this tile if it lies beyond the boundaries
+            # of the chunk we're considering.
+
+            if chunk_lon_min > tile_lon_max:
+                return False
+            if chunk_lon_max < tile_lon_min:
+                return False
+
+            # Well, we couldn't reject it, so we must accept:
+            return True
+
+        return latlon_tile_filter
+
+    def sampler(self, ichunk):
+        """
+        Get a TOAST sampler function for the specified chunk.
+
+        Parameters
+        ----------
+        ichunk : :class:`int`
+            The index of the chunk to query, between 0 and ``self.n_chunks - 1``
+            (inclusive).
+
+        Returns
+        -------
+        A callable object, ``sampler(lon, lat) -> data``, suitable for use as a
+        tile sampler function.
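+
+        Notes
+        -----
+        The returned callable is meant to be paired with the tile filter from
+        :meth:`filter` for the same ``ichunk``, e.g. when driving
+        :func:`toasty.toast.sample_layer_filtered` one chunk at a time.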
+ """ + from .image import Image + + chunk_lon_min, chunk_lon_max, chunk_lat_min, chunk_lat_max = self._chunk_bounds(ichunk) + data = self._image.chunk_data(ichunk) + data_img = Image.from_array(data) + buffer = data_img.mode.make_maskable_buffer(256, 256) + biy, bix = np.indices((256, 256)) + + ny, nx = data.shape[:2] + dx = nx / (chunk_lon_max - chunk_lon_min) # pixels per radian in the X direction + dy = ny / (chunk_lat_max - chunk_lat_min) # ditto, for the Y direction + lon0 = chunk_lon_min + 0.5 / dx # longitudes of the centers of the pixels with ix = 0 + lat0 = chunk_lat_max - 0.5 / dy # latitudes of the centers of the pixels with iy = 0 + + def plate_carree_planet_sampler(lon, lat): + lon = (lon + np.pi) % (2 * np.pi) - np.pi # ensure in range [-pi, pi] + ix = (lon - lon0) * dx + ix = np.round(ix).astype(int) + ok = (ix >= 0) & (ix < nx) + + iy = (lat0 - lat) * dy # *assume* in range [-pi/2, pi/2] + iy = np.round(iy).astype(int) + ok &= (iy >= 0) & (iy < ny) + + data_img.fill_into_maskable_buffer(buffer, iy[ok], ix[ok], biy[ok], bix[ok]) + return buffer.asarray() + + return plate_carree_planet_sampler diff --git a/toasty/tests/Equirectangular_projection_SW-tweaked.jp2 b/toasty/tests/Equirectangular_projection_SW-tweaked.jp2 new file mode 100644 index 0000000..3246a98 Binary files /dev/null and b/toasty/tests/Equirectangular_projection_SW-tweaked.jp2 differ diff --git a/toasty/tests/test_toast.py b/toasty/tests/test_toast.py index 8905222..7adcc67 100644 --- a/toasty/tests/test_toast.py +++ b/toasty/tests/test_toast.py @@ -1,11 +1,10 @@ # -*- mode: python; coding: utf-8 -*- -# Copyright 2013-2020 Chris Beaumont and the AAS WorldWide Telescope project +# Copyright 2013-2021 Chris Beaumont and the AAS WorldWide Telescope project # Licensed under the MIT License. from __future__ import absolute_import, division, print_function import os -from xml.dom.minidom import parseString from tempfile import mkstemp, mkdtemp from shutil import rmtree @@ -28,8 +27,9 @@ from . import test_path from .. import cli, toast from ..image import ImageLoader +from ..jpeg2000 import ChunkedJPEG2000Reader, HAS_JPEG2000 from ..pyramid import Pos, PyramidIO -from ..toast import sample_layer +from ..toast import sample_layer, sample_layer_filtered from ..transform import f16x3_to_rgb @@ -156,18 +156,42 @@ def setup_method(self, method): def teardown_method(self, method): rmtree(self.base) - def verify_level1(self, ref='earth_toasted_sky', format=None, expected_2d=False): + def verify_level1( + self, + ref='earth_toasted_sky', + format=None, + expected_2d=False, + drop_alpha=False, + planetary=False, + ): for n, x, y in [(1, 0, 0), (1, 0, 1), (1, 1, 0), (1, 1, 1)]: - ref_path = test_path(ref, str(n), str(y), "%i_%i.png" % (y, x)) + if planetary: + # to transform sky to planetary, we need to mirror the global + # image horizontally. So here we mirror on a tile level, and + # below we mirror on a pixel level. 
+ ref_x = 1 - x + warn = f'; this is mirrored from input position ({n},{x},{y}) for planetary mode' + else: + ref_x = x + warn = '' + + ref_path = test_path(ref, str(n), str(y), "%i_%i.png" % (y, ref_x)) expected = ImageLoader().load_path(ref_path).asarray() + if planetary: + expected = expected[:,::-1] + pos = Pos(n=n, x=x, y=y) observed = self.pio.read_image(pos, format=format).asarray() if expected_2d: expected = expected.mean(axis=2) - image_test(expected, observed, 'Failed for %s' % ref_path) + if drop_alpha: + assert observed.shape[2] == 4 + observed = observed[...,:3] + + image_test(expected, observed, 'Failed for %s%s' % (ref_path, warn)) def test_plate_carree(self): from ..samplers import plate_carree_sampler @@ -185,15 +209,38 @@ def test_plate_carree_ecliptic(self): self.verify_level1(ref='tess') @pytest.mark.skipif('not HAS_OPENEXR') - def test_earth_plate_caree_exr(self): + def test_earth_plate_carree_exr(self): from ..samplers import plate_carree_sampler img = ImageLoader().load_path(test_path('Equirectangular_projection_SW-tweaked.exr')) sampler = plate_carree_sampler(img.asarray()) sample_layer(self.pio, sampler, 1, format='npy') - f16x3_to_rgb(self.pio, 1, parallel=1) + f16x3_to_rgb(self.pio, 1, parallel=1, out_format='png') self.verify_level1() + @pytest.mark.skipif('not HAS_JPEG2000') + def test_earth_plate_carree_jpeg2000_chunked_planetary(self): + from ..samplers import ChunkedPlateCarreeSampler + from ..toast import ToastCoordinateSystem + + img = ChunkedJPEG2000Reader(test_path('Equirectangular_projection_SW-tweaked.jp2')) + # this currently (2021 Oct) only supports planetary coordinates: + chunker = ChunkedPlateCarreeSampler(img, planetary=True) + + for ichunk in range(chunker.n_chunks): + tile_filter = chunker.filter(ichunk) + sampler = chunker.sampler(ichunk) + sample_layer_filtered( + self.pio, + tile_filter, + sampler, + 1, # depth + coordsys=ToastCoordinateSystem.PLANETARY, + parallel=1, + ) + + self.verify_level1(drop_alpha=True, planetary=True) + @pytest.mark.skipif('not HAS_ASTRO') def test_healpix_equ(self): from ..samplers import healpix_fits_file_sampler diff --git a/toasty/toast.py b/toasty/toast.py index 0eb1dc3..a9bedbb 100644 --- a/toasty/toast.py +++ b/toasty/toast.py @@ -1,57 +1,56 @@ # -*- mode: python; coding: utf-8 -*- -# Copyright 2013-2020 Chris Beaumont and the AAS WorldWide Telescope project +# Copyright 2013-2021 Chris Beaumont and the AAS WorldWide Telescope project # Licensed under the MIT License. """Computations for the TOAST projection scheme and tile pyramid format. -TODO this all needs to be ported to modern Toasty infrastructure and -wwt_data_formats. +For all TOAST maps, the north pole is in the dead center of the virtual image +square, the south pole is at all four of its corners, and the equator is a +diamond connecting the midpoints of the four sides of the square. (See Figure 3 +of McGlynn+ 2019, DOI:10.3847/1538-4365/aaf79e). + +For TOAST maps of the sky, the line of RA (lon) = 0 in the northern hemisphere +extends from the center of the square to the right, as in the Figure 3 mentioned +above. The RA = 90 line goes from the center up, and so on counter-clockwise +around the square. + +For TOAST planetary maps, the lon = 0 line in the northern hemisphere extends +from the center of the square to the *left*. The lon = 90 line extends +downwards, and increasing longitude results in counter-clockwise motion around +the square as for sky maps. In other words, the longitudinal orientation is +rotated by 180 degrees. 
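+
+:class:`ToastCoordinateSystem` captures this distinction; the tiling and
+sampling APIs in this module take an optional ``coordsys`` argument that
+defaults to the astronomical convention.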
""" from __future__ import absolute_import, division, print_function __all__ = ''' +count_tiles_matching_filter +create_single_tile generate_tiles +generate_tiles_filtered sample_layer +sample_layer_filtered Tile +ToastCoordinateSystem toast_pixel_for_point toast_tile_area toast_tile_for_point +toast_tile_get_coords '''.split() -from collections import defaultdict, namedtuple -import os -import logging +from collections import namedtuple +from enum import Enum import numpy as np from tqdm import tqdm from ._libtoasty import subsample, mid from .image import Image -from .pyramid import Pos, depth2tiles, is_subtile, pos_parent, tiles_at_depth +from .pyramid import Pos, tiles_at_depth HALFPI = 0.5 * np.pi THREEHALFPI = 1.5 * np.pi TWOPI = 2 * np.pi -Tile = namedtuple('Tile', 'pos corners increasing') - -_level1_lonlats = [ - [np.radians(c) for c in row] - for row in [ - [(0, -90), (90, 0), (0, 90), (180, 0)], - [(90, 0), (0, -90), (0, 0), (0, 90)], - [(0, 90), (0, 0), (0, -90), (270, 0)], - [(180, 0), (0, 90), (270, 0), (0, -90)], - ] -] - -LEVEL1_TILES = [ - Tile(Pos(n=1, x=0, y=0), _level1_lonlats[0], True), - Tile(Pos(n=1, x=1, y=0), _level1_lonlats[1], False), - Tile(Pos(n=1, x=1, y=1), _level1_lonlats[2], True), - Tile(Pos(n=1, x=0, y=1), _level1_lonlats[3], False), -] - def _arclength(lat1, lon1, lat2, lon2): """Compute the length of an arc along the great circle defined by spherical @@ -79,6 +78,47 @@ def _spherical_triangle_area(lat1, lon1, lat2, lon2, lat3, lon3): return e +class ToastCoordinateSystem(Enum): + """ + Different TOAST coordinate systems that are in use. + """ + + ASTRONOMICAL = 'astronomical' + """The default TOAST coordinate system, where the ``lat = lon = 0`` point + lies at the middle right edge of the TOAST projection square.""" + + PLANETARY = 'planetary' + """The planetary TOAST coordinate system. This is rotated 180 degrees in + longitude from the astronomical system, such that the ``lat = lon = 0`` + point lies at the middle left edge of the TOAST projection square.""" + + +Tile = namedtuple('Tile', 'pos corners increasing') + +_level1_astronomical_lonlats = np.radians([ + [(0, -90), (90, 0), (0, 90), (180, 0)], + [(90, 0), (0, -90), (0, 0), (0, 90)], + [(180, 0), (0, 90), (270, 0), (0, -90)], + [(0, 90), (0, 0), (0, -90), (270, 0)], +]) +_level1_astronomical_lonlats.flags.writeable = False + + +def _create_level1_tiles(coordsys): + lonlats = _level1_astronomical_lonlats + + if coordsys == ToastCoordinateSystem.PLANETARY: + lonlats = lonlats.copy() + lonlats[...,0] = (lonlats[...,0] + np.pi) % TWOPI + + return [ + Tile(Pos(n=1, x=0, y=0), lonlats[0], True), + Tile(Pos(n=1, x=1, y=0), lonlats[1], False), + Tile(Pos(n=1, x=0, y=1), lonlats[2], False), + Tile(Pos(n=1, x=1, y=1), lonlats[3], True), + ] + + def toast_tile_area(tile): """Calculate the area of a TOAST tile in steradians. @@ -195,7 +235,7 @@ def _toast_tile_containment_score(tile, lat, lon): return upper + right + lower + left -def toast_tile_for_point(depth, lat, lon): +def toast_tile_for_point(depth, lat, lon, coordsys=ToastCoordinateSystem.ASTRONOMICAL): """ Identify the TOAST tile at a given depth that contains the given point. @@ -209,8 +249,11 @@ def toast_tile_for_point(depth, lat, lon): The latitude (declination) of the point, in radians. lon : number The longitude (RA) of the point, in radians. This value must - have already been normalied to lie within the range [0, 2pi] + have already been normalized to lie within the range [0, 2pi] (inclusive on both ends.) 
+    coordsys : optional :class:`ToastCoordinateSystem`
+        The TOAST coordinate system to use. Default is
+        :attr:`ToastCoordinateSystem.ASTRONOMICAL`.

     Returns
     -------
@@ -223,7 +266,7 @@
     if depth == 0:
         return Tile(Pos(n=0, x=0, y=0), (None, None, None, None), False)

-    for tile in LEVEL1_TILES:
+    for tile in _create_level1_tiles(coordsys):
         if _toast_tile_containment_score(tile, lat, lon) == 0.:
             break

@@ -251,7 +294,32 @@
     return tile


-def toast_pixel_for_point(depth, lat, lon):
+def toast_tile_get_coords(tile):
+    """
+    Get the coordinates of the pixel centers of a TOAST Tile.
+
+    Parameters
+    ----------
+    tile : :class:`Tile`
+        A TOAST tile.
+
+    Returns
+    -------
+    A tuple ``(lons, lats)`` of 256x256 arrays giving the longitudes and
+    latitudes of the tile pixel centers, in radians.
+    """
+
+    return subsample(
+        tile.corners[0],
+        tile.corners[1],
+        tile.corners[2],
+        tile.corners[3],
+        256,
+        tile.increasing,
+    )
+
+
+def toast_pixel_for_point(depth, lat, lon, coordsys=ToastCoordinateSystem.ASTRONOMICAL):
     """
     Identify the pixel within a TOAST tile at a given depth that contains the
     given point.
@@ -266,8 +334,11 @@
         The latitude (declination) of the point, in radians.
     lon : number
         The longitude (RA) of the point, in radians. This value must
-        have already been normalied to lie within the range [0, 2pi]
+        have already been normalized to lie within the range [0, 2pi]
         (inclusive on both ends.)
+    coordsys : optional :class:`ToastCoordinateSystem`
+        The TOAST coordinate system to use. Default is
+        :attr:`ToastCoordinateSystem.ASTRONOMICAL`.

     Returns
     -------
@@ -278,20 +349,12 @@
     coordinates of the pixels nearest the specified coordinates *lat* and
     *lon*.
     """
-    tile = toast_tile_for_point(depth, lat, lon)
+    tile = toast_tile_for_point(depth, lat, lon, coordsys=coordsys)

     # Now that we have the tile, get its pixel locations and identify the pixel
     # that is closest to the input position.

-    lons, lats = subsample(
-        tile.corners[0],
-        tile.corners[1],
-        tile.corners[2],
-        tile.corners[3],
-        256,
-        tile.increasing,
-    )
-
+    lons, lats = toast_tile_get_coords(tile)
     dist2 = (lons - lon)**2 + (lats - lat)**2
     min_y, min_x = np.unravel_index(np.argmin(dist2), (256, 256))

@@ -339,7 +402,7 @@
     return tile, x0 + x, y0 + y


-def _postfix_corner(tile, depth, bottom_only):
+def _postfix_corner(tile, depth, filter, bottom_only):
     """
     Yield subtiles of a given tile, in postfix (deepest-first) order.

@@ -349,6 +412,9 @@
         Parameters of the current tile.
     depth : int
         The depth to descend to.
+    filter : function(Tile)->bool
+        A filter function; only tiles for which the function returns True will
+        be investigated.
     bottom_only : bool
         If True, only yield tiles at max_depth.

@@ -357,8 +423,11 @@
     if n > depth:
         return

+    if n > 1 and not filter(tile):
+        return
+
     for child in _div4(tile):
-        for item in _postfix_corner(child, depth, bottom_only):
+        for item in _postfix_corner(child, depth, filter, bottom_only):
             yield item

     if n == depth or not bottom_only:
@@ -389,7 +458,48 @@
     ]


-def generate_tiles(depth, bottom_only=True):
+def create_single_tile(pos, coordsys=ToastCoordinateSystem.ASTRONOMICAL):
+    """
+    Create a single TOAST tile.
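+
+    The tile is computed by walking down from the appropriate level-1 tile,
+    subdividing at each level until the requested position ``pos`` is reached.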
+
+    Parameters
+    ----------
+    pos : :class:`~toasty.pyramid.Pos`
+        The position of the tile that will be created. The depth of the
+        tile must be at least 1.
+    coordsys : optional :class:`ToastCoordinateSystem`
+        The TOAST coordinate system to use. Default is
+        :attr:`ToastCoordinateSystem.ASTRONOMICAL`.
+
+    Returns
+    -------
+    :class:`Tile`
+
+    Notes
+    -----
+    This function should only be used for one-off investigations and debugging.
+    It is much more efficient to use :func:`generate_tiles` for bulk computations.
+    """
+
+    if pos.n == 0:
+        raise ValueError('cannot create a Tile for the n=0 tile')
+
+    children = _create_level1_tiles(coordsys)
+    cur_n = 0
+
+    while True:
+        cur_n += 1
+        ix = (pos.x >> (pos.n - cur_n)) & 0x1
+        iy = (pos.y >> (pos.n - cur_n)) & 0x1
+        tile = children[iy * 2 + ix]
+
+        if cur_n == pos.n:
+            return tile
+
+        children = _div4(tile)
+
+
+def generate_tiles(depth, bottom_only=True, coordsys=ToastCoordinateSystem.ASTRONOMICAL):
     """Generate a pyramid of TOAST tiles in deepest-first order.

     Parameters
@@ -398,22 +508,95 @@ def generate_tiles(depth, bottom_only=True):
         The tile depth to recurse to.
     bottom_only : bool
         If True, then only the lowest tiles will be yielded.
+    coordsys : optional :class:`ToastCoordinateSystem`
+        The TOAST coordinate system to use. Default is
+        :attr:`ToastCoordinateSystem.ASTRONOMICAL`.
+
+    Yields
+    ------
+    tile : Tile
+        An individual tile to process. Tiles are yielded deepest-first.
+
+        The ``n = 0`` depth is not included.
+
+    """
+    return generate_tiles_filtered(depth, lambda t: True, bottom_only, coordsys=coordsys)
+
+
+def generate_tiles_filtered(depth, filter, bottom_only=True, coordsys=ToastCoordinateSystem.ASTRONOMICAL):
+    """Generate a pyramid of TOAST tiles in deepest-first order, filtering out subtrees.
+
+    Parameters
+    ----------
+    depth : int
+        The tile depth to recurse to.
+    filter : function(Tile)->bool
+        A filter function; only tiles for which the function returns True will
+        be investigated.
+    bottom_only : optional bool
+        If True, then only the lowest tiles will be yielded.
+    coordsys : optional :class:`ToastCoordinateSystem`
+        The TOAST coordinate system to use. Default is
+        :attr:`ToastCoordinateSystem.ASTRONOMICAL`.

     Yields
     ------
     tile : Tile
-        An individual tile to process. Tiles are yield deepest-first.
+        An individual tile to process. Tiles are yielded deepest-first.

         The ``n = 0`` depth is not included.

     """
-    for t in LEVEL1_TILES:
-        for item in _postfix_corner(t, depth, bottom_only):
+    for t in _create_level1_tiles(coordsys):
+        for item in _postfix_corner(t, depth, filter, bottom_only):
             yield item


-def sample_layer(pio, sampler, depth, parallel=None, cli_progress=False,
-                 format=None):
+def count_tiles_matching_filter(depth, filter, bottom_only=True, coordsys=ToastCoordinateSystem.ASTRONOMICAL):
+    """
+    Count the number of tiles matching a filter.
+
+    Parameters
+    ----------
+    depth : int
+        The tile depth to recurse to.
+    filter : function(Tile)->bool
+        A filter function; only tiles for which the function returns True will
+        be investigated.
+    bottom_only : bool
+        If True, then only the lowest tiles will be processed.
+    coordsys : optional :class:`ToastCoordinateSystem`
+        The TOAST coordinate system to use. Default is
+        :attr:`ToastCoordinateSystem.ASTRONOMICAL`.
+
+    Returns
+    -------
+    The number of tiles matching the filter. Even if ``bottom_only`` is false,
+    the ``n = 0`` tile is not counted.
+
+    Notes
+    -----
+    This function's call signature and tree-exploration semantics match
+    :func:`generate_tiles_filtered`.
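+
+    Examples
+    --------
+    As a quick sanity check, an always-true filter admits every tile, so with
+    the default ``bottom_only=True`` the count at depth ``d`` is just ``4**d``::
+
+        from toasty.toast import count_tiles_matching_filter
+
+        n = count_tiles_matching_filter(3, lambda tile: True)
+        assert n == 4 ** 3  # 64 tiles in the bottom layer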
+ """ + # With a generic filter function, brute force is our only option: + n = 0 + + for _tile in generate_tiles_filtered(depth, filter, bottom_only=bottom_only, coordsys=coordsys): + n += 1 + + return n + + +def sample_layer( + pio, + sampler, + depth, + coordsys=ToastCoordinateSystem.ASTRONOMICAL, + format=None, + parallel=None, + cli_progress=False, +): """Generate a layer of the TOAST tile pyramid through direct sampling. Parameters @@ -428,6 +611,12 @@ def sample_layer(pio, sampler, depth, parallel=None, cli_progress=False, number of tiles in each layer is ``4**depth``. Each tile is 256×256 TOAST pixels, so the resolution of the pixelization at which the data will be sampled is a refinement level of ``2**(depth + 8)``. + coordsys : optional :class:`ToastCoordinateSystem` + The TOAST coordinate system to use. Default is + :attr:`ToastCoordinateSystem.ASTRONOMICAL`. + format : optional :class:`str` + If provided, override the default data storage format of *pio* with the + named format, one of the values in ``toasty.image.SUPPORTED_FORMATS``. parallel : integer or None (the default) The level of parallelization to use. If unspecified, defaults to using all CPUs. If the OS does not support fork-based multiprocessing, @@ -435,28 +624,21 @@ def sample_layer(pio, sampler, depth, parallel=None, cli_progress=False, forced. Pass ``1`` to force serial processing. cli_progress : optional boolean, defaults False If true, a progress bar will be printed to the terminal using tqdm. - """ + from .par_util import resolve_parallelism parallel = resolve_parallelism(parallel) if parallel > 1: - _sample_layer_parallel(pio, format, sampler, depth, cli_progress, parallel) + _sample_layer_parallel(pio, format, sampler, depth, coordsys, cli_progress, parallel) else: - _sample_layer_serial(pio, format, sampler, depth, cli_progress) + _sample_layer_serial(pio, format, sampler, depth, coordsys, cli_progress) -def _sample_layer_serial(pio, format, sampler, depth, cli_progress): +def _sample_layer_serial(pio, format, sampler, depth, coordsys, cli_progress): with tqdm(total=tiles_at_depth(depth), disable=not cli_progress) as progress: - for tile in generate_tiles(depth, bottom_only=True): - lon, lat = subsample( - tile.corners[0], - tile.corners[1], - tile.corners[2], - tile.corners[3], - 256, - tile.increasing, - ) + for tile in generate_tiles(depth, bottom_only=True, coordsys=coordsys): + lon, lat = toast_tile_get_coords(tile) sampled_data = sampler(lon, lat) pio.write_image(tile.pos, Image.from_array(sampled_data), format=format) progress.update(1) @@ -465,63 +647,186 @@ def _sample_layer_serial(pio, format, sampler, depth, cli_progress): print() -def _sample_layer_parallel(pio, format, sampler, depth, cli_progress, parallel): +def _sample_layer_parallel(pio, format, sampler, depth, coordsys, cli_progress, parallel): import multiprocessing as mp + done_event = mp.Event() queue = mp.Queue(maxsize = 2 * parallel) - dispatcher = mp.Process(target=_mp_sample_dispatcher, args=(queue, depth, cli_progress)) - dispatcher.start() - workers = [] for _ in range(parallel): - w = mp.Process(target=_mp_sample_worker, args=(queue, pio, sampler, format)) + w = mp.Process(target=_mp_sample_worker, args=(queue, done_event, pio, sampler, format)) w.daemon = True w.start() workers.append(w) - dispatcher.join() + # Send out tiles: + + with tqdm(total=tiles_at_depth(depth), disable=not cli_progress) as progress: + for tile in generate_tiles(depth, bottom_only=True, coordsys=coordsys): + queue.put(tile) + progress.update(1) + + # OK, 
we're done! + + queue.close() + queue.join_thread() + done_event.set() for w in workers: w.join() + if cli_progress: + print() + -def _mp_sample_dispatcher(queue, depth, cli_progress): +def _mp_sample_worker(queue, done_event, pio, sampler, format): """ - Generate and enqueue the tiles that need to be processed. + Process tiles on the queue. """ - with tqdm(total=tiles_at_depth(depth), disable=not cli_progress) as progress: - for tile in generate_tiles(depth, bottom_only=True): - queue.put(tile) + from queue import Empty + + while True: + try: + tile = queue.get(True, timeout=1) + except Empty: + if done_event.is_set(): + break + continue + + lon, lat = toast_tile_get_coords(tile) + sampled_data = sampler(lon, lat) + pio.write_image(tile.pos, Image.from_array(sampled_data), format=format) + + +def sample_layer_filtered( + pio, + tile_filter, + sampler, + depth, + coordsys=ToastCoordinateSystem.ASTRONOMICAL, + parallel=None, + cli_progress=False, +): + """Populate a subset of a layer of the TOAST tile pyramid through direct sampling. + + Parameters + ---------- + pio : :class:`toasty.pyramid.PyramidIO` + A :class:`~toasty.pyramid.PyramidIO` instance to manage the I/O with + the tiles in the tile pyramid. + tile_filter : callable + A tile filtering function, suitable for passing to + :func:`toasty.toast.generate_tiles_filtered`. + sampler : callable + The sampler callable that will produce data for tiling. + depth : int + The depth of the layer of the TOAST tile pyramid to generate. The + number of tiles in each layer is ``4**depth``. Each tile is 256×256 + TOAST pixels, so the resolution of the pixelization at which the + data will be sampled is a refinement level of ``2**(depth + 8)``. + coordsys : optional :class:`ToastCoordinateSystem` + The TOAST coordinate system to use. Default is + :attr:`ToastCoordinateSystem.ASTRONOMICAL`. + parallel : integer or None (the default) + The level of parallelization to use. If unspecified, defaults to using + all CPUs. If the OS does not support fork-based multiprocessing, + parallel processing is not possible and serial processing will be + forced. Pass ``1`` to force serial processing. + cli_progress : optional boolean, defaults False + If true, a progress bar will be printed to the terminal using tqdm. + """ + + from .par_util import resolve_parallelism + parallel = resolve_parallelism(parallel) + + if parallel > 1: + _sample_filtered_parallel(pio, tile_filter, sampler, depth, coordsys, cli_progress, parallel) + else: + _sample_filtered_serial(pio, tile_filter, sampler, depth, coordsys, cli_progress) + + +def _sample_filtered_serial(pio, tile_filter, sampler, depth, coordsys, cli_progress): + n_todo = count_tiles_matching_filter(depth, tile_filter, bottom_only=True, coordsys=coordsys) + + with tqdm(total=n_todo, disable=not cli_progress) as progress: + for tile in generate_tiles_filtered(depth, tile_filter, bottom_only=True, coordsys=coordsys): + lon, lat = toast_tile_get_coords(tile) + sampled_data = sampler(lon, lat) + img = Image.from_array(sampled_data) + + with pio.update_image(tile.pos, masked_mode=img.mode, default='masked') as basis: + img.update_into_maskable_buffer(basis, slice(None), slice(None), slice(None), slice(None)) + progress.update(1) if cli_progress: print() + # do not clean lockfiles, for HPC contexts where we're processing different + # chunks in parallel. 
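+    # (One of those lockfiles may be held by a chunk still being processed on
+    # another node; PyramidIO.clean_lockfiles can be used for an explicit
+    # cleanup pass once every chunk has finished.)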
+
+
+def _sample_filtered_parallel(pio, tile_filter, sampler, depth, coordsys, cli_progress, parallel):
+    import multiprocessing as mp
+
+    n_todo = count_tiles_matching_filter(depth, tile_filter, bottom_only=True, coordsys=coordsys)
+
+    done_event = mp.Event()
+    queue = mp.Queue(maxsize = 2 * parallel)
+    workers = []
+
+    for _ in range(parallel):
+        w = mp.Process(target=_mp_sample_filtered, args=(queue, done_event, pio, sampler))
+        w.daemon = True
+        w.start()
+        workers.append(w)
+
+    # Here we go:
+
+    with tqdm(total=n_todo, disable=not cli_progress) as progress:
+        for tile in generate_tiles_filtered(depth, tile_filter, bottom_only=True, coordsys=coordsys):
+            queue.put(tile)
+            progress.update(1)
+
+    # OK, we're done!
+    queue.close()
+    queue.join_thread()
+    done_event.set()
+
+    for w in workers:
+        w.join()
+
+    if cli_progress:
+        print()
+
+    # do not clean lockfiles, for HPC contexts where we're processing different
+    # chunks in parallel.


-def _mp_sample_worker(queue, pio, sampler, format):
+def _mp_sample_filtered(queue, done_event, pio, sampler):
     """
     Process tiles on the queue.
+
+    We use ``pio.update_image`` in case we are in an HPC-type context where
+    other chunks may be trying to populate the same tiles as us at the same
+    time.
     """
+
     from queue import Empty

     while True:
         try:
             tile = queue.get(True, timeout=1)
-        except (OSError, ValueError, Empty):
-            # OSError or ValueError => queue closed. This signal seems not to
-            # cross multiprocess lines, though.
-            break
+        except Empty:
+            if done_event.is_set():
+                break
+            continue

-        lon, lat = subsample(
-            tile.corners[0],
-            tile.corners[1],
-            tile.corners[2],
-            tile.corners[3],
-            256,
-            tile.increasing,
-        )
+        lon, lat = toast_tile_get_coords(tile)
         sampled_data = sampler(lon, lat)
-        pio.write_image(tile.pos, Image.from_array(sampled_data), format=format)
+        img = Image.from_array(sampled_data)
+
+        with pio.update_image(tile.pos, masked_mode=img.mode, default='masked') as basis:
+            img.update_into_maskable_buffer(basis, slice(None), slice(None), slice(None), slice(None))
diff --git a/toasty/transform.py b/toasty/transform.py
index e7df27a..8616629 100644
--- a/toasty/transform.py
+++ b/toasty/transform.py
@@ -1,5 +1,5 @@
 # -*- mode: python; coding: utf-8 -*-
-# Copyright 2020 the AAS WorldWide Telescope project
+# Copyright 2020-2021 the AAS WorldWide Telescope project
 # Licensed under the MIT License.
""" @@ -9,6 +9,7 @@ __all__ = ''' f16x3_to_rgb +u8_to_rgb '''.split() from astropy import visualization as viz @@ -19,43 +20,40 @@ from .pyramid import depth2tiles, generate_pos -def f16x3_to_rgb(pio, start_depth, clip=1, parallel=None, cli_progress=False): - transform = viz.SqrtStretch() + viz.ManualInterval(0, clip) - _float_to_rgb(pio, start_depth, ImageMode.F16x3, transform, parallel=parallel, cli_progress=cli_progress) - +# Generalized transform machinery, with both serial and parallel support -def _float_to_rgb(pio, depth, read_mode, transform, parallel=None, cli_progress=False): +def _do_a_transform(pio, depth, make_buf, do_one, pio_out=None, parallel=None, cli_progress=False): from .par_util import resolve_parallelism parallel = resolve_parallelism(parallel) + if pio_out is None: + pio_out = pio + if parallel > 1: - _float_to_rgb_parallel(pio, depth, read_mode, transform, cli_progress, parallel) + _transform_parallel(pio, pio_out, depth, make_buf, do_one, cli_progress, parallel) else: - _float_to_rgb_serial(pio, depth, read_mode, transform, cli_progress) + buf = make_buf() + with tqdm(total=depth2tiles(depth), disable=not cli_progress) as progress: + for pos in generate_pos(depth): + do_one(buf, pos, pio, pio_out) + progress.update(1) -def _float_to_rgb_serial(pio, depth, read_mode, transform, cli_progress): - buf = np.empty((256, 256, 4), dtype=np.uint8) + if cli_progress: + print() - with tqdm(total=depth2tiles(depth), disable=not cli_progress) as progress: - for pos in generate_pos(depth): - _float_to_rgb_do_one(buf, pos, pio, read_mode, transform) - progress.update(1) - if cli_progress: - print() - - -def _float_to_rgb_parallel(pio, depth, read_mode, transform, cli_progress, parallel): +def _transform_parallel(pio_in, pio_out, depth, make_buf, do_one, cli_progress, parallel): import multiprocessing as mp # Start up the workers + done_event = mp.Event() queue = mp.Queue(maxsize = 16 * parallel) workers = [] for _ in range(parallel): - w = mp.Process(target=_float_to_rgb_mp_worker, args=(queue, pio, read_mode, transform)) + w = mp.Process(target=_transform_mp_worker, args=(queue, done_event, pio_in, pio_out, make_buf, do_one)) w.daemon = True w.start() workers.append(w) @@ -67,40 +65,80 @@ def _float_to_rgb_parallel(pio, depth, read_mode, transform, cli_progress, paral queue.put(pos) progress.update(1) - queue.close() + # All done - for w in workers: - w.join() + queue.close() + queue.join_thread() + done_event.set() + + for w in workers: + w.join() if cli_progress: print() -def _float_to_rgb_mp_worker(queue, pio, read_mode, transform): +def _transform_mp_worker(queue, done_event, pio_in, pio_out, make_buf, do_one): """ Do the colormapping. """ from queue import Empty - buf = np.empty((256, 256, 4), dtype=np.uint8) + buf = make_buf() while True: try: pos = queue.get(True, timeout=1) - except (OSError, ValueError, Empty): - # OSError or ValueError => queue closed. This signal seems not to - # cross multiprocess lines, though. - break + except Empty: + if done_event.is_set(): + break + continue - _float_to_rgb_do_one(buf, pos, pio, read_mode, transform) + do_one(buf, pos, pio_in, pio_out) -def _float_to_rgb_do_one(buf, pos, pio, read_mode, transform): - """ - Do one float-to-RGB job. This problem is embarassingly parallel so we can - share code between the serial and parallel implementations. 
- """ - img = pio.read_image(pos, format='npy') +# float-to-RGB(A), with a generalized float-to-unit transform + +def f16x3_to_rgb(pio, start_depth, clip=1, **kwargs): + transform = viz.SqrtStretch() + viz.ManualInterval(0, clip) + _float_to_rgb(pio, start_depth, transform, **kwargs) + + +def _float_to_rgb(pio_in, depth, transform, out_format='png', **kwargs): + make_buf = lambda: np.empty((256, 256, 3), dtype=np.uint8) + do_one = lambda buf, pos, pio_in, pio_out: _float_to_rgb_do_one(buf, pos, pio_in, pio_out, transform, out_format) + _do_a_transform(pio_in, depth, make_buf, do_one, **kwargs) + + +def _float_to_rgb_do_one(buf, pos, pio_in, pio_out, transform, out_format): + img = pio_in.read_image(pos, format='npy') + if img is None: + return + + mapped = transform(img.asarray()) + + if mapped.ndim == 2: + # TODO: allow real colormaps + valid = np.isfinite(mapped) + mapped = mapped.reshape((256, 256, 1)) + else: + valid = np.all(np.isfinite(mapped), axis=2) + + mapped[~valid] = 0 + mapped = np.clip(mapped * 255, 0, 255).astype(np.uint8) + buf[...] = mapped + rgb = Image.from_array(buf) + pio_out.write_image(pos, rgb, format=out_format, mode=ImageMode.RGB) + + +def _float_to_rgba(pio_in, depth, transform, **kwargs): + make_buf = lambda: np.empty((256, 256, 4), dtype=np.uint8) + do_one = lambda buf, pos, pio_in, pio_out: _float_to_rgba_do_one(buf, pos, pio_in, pio_out, transform) + _do_a_transform(pio_in, depth, make_buf, do_one, **kwargs) + + +def _float_to_rgba_do_one(buf, pos, pio_in, pio_out, transform): + img = pio_in.read_image(pos, format='npy') if img is None: return @@ -120,5 +158,22 @@ def _float_to_rgb_do_one(buf, pos, pio, read_mode, transform): buf[...,:3] = mapped buf[...,3] = 255 * valid + rgba = Image.from_array(buf) + pio_out.write_image(pos, rgba, format='png', mode=ImageMode.RGBA) + + +# u8-to-RGB + +def u8_to_rgb(pio, depth, **kwargs): + make_buf = lambda: np.empty((256, 256, 3), dtype=np.uint8) + _do_a_transform(pio, depth, make_buf, _u8_to_rgb_do_one, **kwargs) + + +def _u8_to_rgb_do_one(buf, pos, pio_in, pio_out): + img = pio_in.read_image(pos, format='npy') + if img is None: + return + + buf[...] = img.asarray()[...,np.newaxis] rgb = Image.from_array(buf) - pio.write_image(pos, rgb, format='png', mode=ImageMode.RGB) + pio_out.write_image(pos, rgb, format='jpg', mode=ImageMode.RGB)