diff --git a/src/eddymotion/dmri.py b/src/eddymotion/data/dmri.py similarity index 100% rename from src/eddymotion/dmri.py rename to src/eddymotion/data/dmri.py diff --git a/src/eddymotion/data/pet.py b/src/eddymotion/data/pet.py new file mode 100644 index 00000000..22328d73 --- /dev/null +++ b/src/eddymotion/data/pet.py @@ -0,0 +1,170 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright 2022 The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +"""PET data representation.""" +from collections import namedtuple +from pathlib import Path +from tempfile import mkdtemp + +import attr +import h5py +import nibabel as nb +import numpy as np +from nitransforms.linear import Affine + + +def _data_repr(value): + if value is None: + return "None" + return f"<{'x'.join(str(v) for v in value.shape)} ({value.dtype})>" + + +@attr.s(slots=True) +class PET: + """Data representation structure for dMRI data.""" + + dataobj = attr.ib(default=None, repr=_data_repr) + """A numpy ndarray object for the data array, without *b=0* volumes.""" + affine = attr.ib(default=None, repr=_data_repr) + """Best affine for RAS-to-voxel conversion of coordinates (NIfTI header).""" + brainmask = attr.ib(default=None, repr=_data_repr) + """A boolean ndarray object containing a corresponding brainmask.""" + timepoints = attr.ib(default=None, repr=_data_repr) + """A 1D numpy array with the timing of each sample.""" + em_affines = attr.ib(default=None) + """ + List of :obj:`nitransforms.linear.Affine` objects that bring + PET timepoints into alignment. + """ + _filepath = attr.ib( + factory=lambda: Path(mkdtemp()) / "em_cache.h5", + repr=False, + ) + """A path to an HDF5 file to store the whole dataset.""" + + def __len__(self): + """Obtain the number of high-*b* orientations.""" + return self.dataobj.shape[-1] + + def set_transform(self, index, affine, order=3): + """Set an affine, and update data object and gradients.""" + reference = namedtuple("ImageGrid", ("shape", "affine"))( + shape=self.dataobj.shape[:3], affine=self.affine + ) + xform = Affine(matrix=affine, reference=reference) + + if not Path(self._filepath).exists(): + self.to_filename(self._filepath) + + # read original PET + with h5py.File(self._filepath, "r") as in_file: + root = in_file["/0"] + dframe = np.asanyarray(root["dataobj"][..., index]) + + dmoving = nb.Nifti1Image(dframe, self.affine, None) + + # resample and update orientation at index + self.dataobj[..., index] = np.asanyarray( + xform.apply(dmoving, order=order).dataobj, + dtype=self.dataobj.dtype, + ) + + # update transform + if self.em_affines is None: + self.em_affines = [None] * len(self) + + self.em_affines[index] = xform + + def to_filename(self, filename, compression=None, compression_opts=None): + """Write an HDF5 file to disk.""" + filename = Path(filename) + if not filename.name.endswith(".h5"): + filename = filename.parent / f"{filename.name}.h5" + + with h5py.File(filename, "w") as out_file: + out_file.attrs["Format"] = "EMC/PET" + out_file.attrs["Version"] = np.uint16(1) + root = out_file.create_group("/0") + root.attrs["Type"] = "pet" + for f in attr.fields(self.__class__): + if f.name.startswith("_"): + continue + + value = getattr(self, f.name) + if value is not None: + root.create_dataset( + f.name, + data=value, + compression=compression, + compression_opts=compression_opts, + ) + + def to_nifti(self, filename, insert_b0=False): + """Write a NIfTI 1.0 file to disk.""" + nii = nb.Nifti1Image(self.dataobj, self.affine, None) + nii.header.set_xyzt_units("mm") + nii.to_filename(filename) + + @classmethod + def from_filename(cls, filename): + """Read an HDF5 file from disk.""" + with h5py.File(filename, "r") as in_file: + root = in_file["/0"] + data = { + k: np.asanyarray(v) for k, v in root.items() if not k.startswith("_") + } + return cls(**data) + + +def load( + filename, + brainmask_file=None, + volume_timings=None, + volume_spacings=None, +): + """Load PET data.""" + filename = Path(filename) + if filename.name.endswith(".h5"): + return PET.from_filename(filename) + + img = nb.load(filename) + retval = PET( + dataobj=img.get_fdata(dtype="float32"), + affine=img.affine, + ) + + if volume_timings is not None: + retval.timepoints = np.array(volume_timings) + elif volume_spacings: + x = np.array([ + np.sum(volume_spacings[:i]) + for i in range(1, len(volume_spacings) + 1) + ]) + retval.timepoints = x - x[0] + else: + raise RuntimeError("Volume timings are necessary") + + if brainmask_file: + mask = nb.load(brainmask_file) + retval.brainmask = np.asanyarray(mask.dataobj) + + return retval diff --git a/src/eddymotion/model.py b/src/eddymotion/model.py index f4aed755..3078cb03 100644 --- a/src/eddymotion/model.py +++ b/src/eddymotion/model.py @@ -271,6 +271,82 @@ def predict(self, gradient, **kwargs): return self._data +class PETModel: + """A PET imaging realignment model based on B-Spline approximation.""" + + __slots__ = ("_t", "_x0", "_x1", "_order", "_coeff", "_mask", "_shape") + + def __init__(self, timepoints, n_ctrl=None, mask=None, order=3, **kwargs): + """ + Create the B-Spline interpolating matrix. + + Parameters: + ----------- + timepoints : :obj:`list` + The timing (in sec) of each PET volume. + E.g., ``[20., 40., 60., 120., 180., 240., 360., 480., 600., + 900., 1200., 1800., 2400., 3000.]`` + + n_ctrl : :obj:`int` + Number of B-Spline control points. If `None`, then one control point every + six timepoints will be used. The less control points, the smoother is the + model. + + """ + self._order = order + self._mask = mask + + x = np.array(timepoints) + self._x0 = x[0] + x -= self._x0 + + self._x1 = x[-1] + x /= self._x1 + + # Calculate index coordinates in the B-Spline grid + n_ctrl = n_ctrl or (len(timepoints) // 6) + 1 + x *= n_ctrl + + # B-Spline knots + self._t = np.linspace(-2.0, float(n_ctrl) + 2.0, n_ctrl + 4) + + def fit(self, data, timepoints, *args, **kwargs): + """Do nothing.""" + from scipy.interpolate import BSpline + from scipy.sparse.linalg import cg + + self._shape = data.shape[:3] + + # Convert data into V (voxels) x T (timepoints) + data = ( + data.reshape((-1, data.shape[-1])) + if self._mask is None else data[self._mask] + ) + + x = (np.array(timepoints) - self._x0) / self._x1 + A = BSpline.design_matrix(x, self._t, k=self._order) + + self._coeff, _ = cg(A.T @ A, A.T @ data) + + def predict(self, timepoint, **kwargs): + """Return the *b=0* map.""" + from scipy.interpolate import BSpline + + x = (timepoint - self._x0) / self._x1 + A = BSpline.design_matrix(x, self._t, k=self._order) + + # A is 1 (num. timepoints) x C (num. coeff) + # self._coeff is C (num. coeff) x V (num. voxels) + predicted = (A @ self._coeff).T + + if self._mask is None: + return predicted.reshape(self._shape) + + retval = np.zeros(self._shape, dtype="float32") + retval[self._mask] = predicted + return retval + + class DTIModel(BaseModel): """A wrapper of :obj:`dipy.reconst.dti.TensorModel`.""" diff --git a/test/test_dmri.py b/test/test_dmri.py index 26fa6b5c..f907b5c1 100644 --- a/test/test_dmri.py +++ b/test/test_dmri.py @@ -23,7 +23,7 @@ """Unit tests exercising the dMRI data structure.""" import pytest import numpy as np -from eddymotion.dmri import load +from eddymotion.data.dmri import load def test_load(datadir, tmp_path): diff --git a/test/test_estimator.py b/test/test_estimator.py index 496e9540..b592fe57 100644 --- a/test/test_estimator.py +++ b/test/test_estimator.py @@ -30,7 +30,7 @@ from nipype.interfaces.ants.registration import Registration from pkg_resources import resource_filename as pkg_fn -from eddymotion.dmri import DWI +from eddymotion.data.dmri import DWI @pytest.mark.parametrize("r_x", [0.0, 0.1, 0.3]) diff --git a/test/test_integration.py b/test/test_integration.py index fd51f311..a7ac01d6 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -26,7 +26,7 @@ import nitransforms as nt import numpy as np -from eddymotion.dmri import DWI +from eddymotion.data.dmri import DWI from eddymotion.estimator import EddyMotionEstimator diff --git a/test/test_model.py b/test/test_model.py index 6f45a72d..22c9b75f 100644 --- a/test/test_model.py +++ b/test/test_model.py @@ -25,7 +25,7 @@ import pytest from eddymotion import model -from eddymotion.dmri import DWI +from eddymotion.data.dmri import DWI def test_trivial_model():