diff --git a/projection/camera_funcs.py b/projection/camera_funcs.py index bb9edd0..4f49c9f 100755 --- a/projection/camera_funcs.py +++ b/projection/camera_funcs.py @@ -8,12 +8,14 @@ class CameraModel(): + '''Holds the camera model and filter specific variables ''' - holds the camera model and filter specific - variables - ''' - def __init__(self, filt): + def __init__(self, filt: int): + """Initialize the camera and load the corresponding variables from the SPICE kernel pool + + :param filt: The filter number (0 for blue, 1 for green and 2 for red) + """ self.filter = filt self.id = CAMERA_IDS[filt] @@ -32,25 +34,12 @@ def __init__(self, filt): self.iframe_delay = spice.gdpool( 'INS%s_INTERFRAME_DELTA' % self.id, 0, 32)[0] - ''' - functions to obtain positions in JUNOCAM frame - see: https://naif.jpl.nasa.gov/pub/naif/JUNO/kernels/ik/juno_junocam_v03.ti - ''' + def pix2vec(self, px: list[float]) -> np.ndarray: + '''Convert from pixel coordinate to vector in the JUNO_JUNOCAM reference frame. See: https://naif.jpl.nasa.gov/pub/naif/JUNO/kernels/ik/juno_junocam_v03.ti - def pix2vec(self, px): - ''' - Convert from pixel coordinate to vector in the - JUNO_JUNOCAM reference frame - - Parameters - ---------- - px : array-like - x and y position of pixel centers in the camera - - Output - ------ - v : numpy.ndarray - vector in the JUNO_JUNOCAM reference frame + :param px: x and y position of pixel centers in the camera + + :return: vector in the JUNO_JUNOCAM reference frame ''' camx = px[0] - self.cx camy = px[1] - self.cy @@ -58,21 +47,12 @@ def pix2vec(self, px): v = np.asarray([cam[0], cam[1], self.f1]) return v - def undistort(self, c): - ''' - Removes the barrel distortion in the JunoCam image - - Parameters - ---------- - c : array-like - x and y position of pixel centers in the camera - - Output - ------ - xd : float - x position of the pixel after removing barrel distortion - yd : float - y position of the pixel after removing barrel distortion + def undistort(self, c: list[float]) -> tuple[float]: + ''' Removes the barrel distortion in the JunoCam image + + :param c: x and y position of pixel centers in the camera + + :return: tuple containing the x- and y-position of the pixel after removing barrel distortion ''' xd, yd = c[0], c[1] for i in range(5): @@ -82,45 +62,26 @@ def undistort(self, c): yd = c[1] / dr return (xd, yd) - def distort(self, c): - ''' - Adds barrel distortion to the image - - Parameters - ---------- - c : array-like - x and y position of undistorted pixel centers in the camera - - Output - ------ - xd : float - x position of the pixel after adding barrel distortion - yd : float - y position of the pixel after adding barrel distortion + def distort(self, c: list[float]) -> tuple[float]: + '''Adds barrel distortion to the image + + :param c: x and y position of undistorted pixel centers in the camera + + :return: x- and y- position of the pixel after adding barrel distortion ''' xd, yd = c[0], c[1] r2 = (xd**2 + yd**2) dr = 1 + self.k1 * r2 + self.k2 * r2 * r2 xd *= dr yd *= dr - return [xd, yd] + return (xd, yd) - def vec2pix(self, v): - ''' - Convert a vector in the JUNO_JUNOCAM reference frame - to pixel coordinates on the plate - - Parameters - ---------- - v : array-like - vector in the JUNO_JUNOCAM reference frame - - Output - ------ - x : float - x-center of the pixel in the plate - y : float - y-center of the pixel in the plate + def vec2pix(self, v: list[float]) -> tuple[float]: + '''Convert a vector in the JUNO_JUNOCAM reference frame to pixel coordinates on the plate + + :param v: vector in the JUNO_JUNOCAM reference frame + + :return: x- and y-center of the pixel in the plate ''' alpha = v[2] / self.f1 cam = [v[0] / alpha, v[1] / alpha] diff --git a/projection/frameletdata.py b/projection/frameletdata.py index 97a5b78..8040dde 100644 --- a/projection/frameletdata.py +++ b/projection/frameletdata.py @@ -12,7 +12,14 @@ class FrameletData: + """Holds information and methods pertaining to all the framelets in the image + """ def __init__(self, metadata: dict, imgfolder: str): + """Initialize the structure by reading in the metadata and separating the framelets in the image + + :param metadata: input image metadata + :param imgfolder: path to the folder where the image is stored + """ self.nframelets = int(metadata["LINES"] / FRAME_HEIGHT) # number of RGB frames self.nframes = int(self.nframelets / 3) @@ -63,7 +70,11 @@ def __init__(self, metadata: dict, imgfolder: str): self.fullimg = np.concatenate([frame.rawimg for frame in self.framelets], axis=0) - def get_backplane(self, num_procs: int): + def get_backplane(self, num_procs: int) -> None: + """Retrieve backplane information for all framelets (i.e., get pixel coordinates in the mid-plane frame and also incidence/emission angles) + + :param num_procs: number of processors to use for multi-threaded processing + """ tmid = self.tmid if num_procs > 1: @@ -84,42 +95,61 @@ def get_backplane(self, num_procs: int): for frame in tqdm.tqdm(self.framelets): frame.project_to_midplane(tmid) - def update_jitter(self, jitter: float): + def update_jitter(self, jitter: float) -> None: + """Update the jitter in the spacecraft clock for each framelet + + :param jitter: the new jitter value in seconds + """ self.jitter = jitter for framelet in self.framelets: framelet.jitter = jitter @property def tmid(self) -> float: + """The mid-plane clock time""" return np.mean([frame.et for frame in self.framelets]) @property def image(self) -> np.ndarray: + """The concatenated illumination-corrected image in all the framelets""" return np.stack([frame.image for frame in self.framelets], axis=0).reshape((self.nframes, 3, FRAME_HEIGHT, FRAME_WIDTH)) @property def coords(self) -> np.ndarray: + """The concatenated coordinate of each pixel in the mid-plane frame for each pixel in the camera frame""" return np.stack([frame.coords for frame in self.framelets], axis=0).reshape((self.nframes, 3, FRAME_HEIGHT, FRAME_WIDTH, 2)) @property def emission(self) -> np.ndarray: + """The concatenated emission angles for each pixel in the mid-plane frame""" return np.stack([frame.emission for frame in self.framelets], axis=0).reshape((self.nframes, 3, FRAME_HEIGHT, FRAME_WIDTH)) @property def incidence(self) -> np.ndarray: + """The concatenated incidence angles for each pixel in the mid-plane frame""" return np.stack([frame.incidence for frame in self.framelets], axis=0).reshape((self.nframes, 3, FRAME_HEIGHT, FRAME_WIDTH)) @property def longitude(self) -> np.ndarray: + """The concatenated Sys III longitude values for each pixel in the mid-plane frame""" return np.stack([frame.lon for frame in self.framelets], axis=0).reshape((self.nframes, 3, FRAME_HEIGHT, FRAME_WIDTH)) @property def latitude(self) -> np.ndarray: + """The concatenated planetographic latitude value for each pixel in the mid-plane frame""" return np.stack([frame.lat for frame in self.framelets], axis=0).reshape((self.nframes, 3, FRAME_HEIGHT, FRAME_WIDTH)) class Framelet: + """Holds information and methods for a single framelet in the JunoCam image""" def __init__(self, start_et: float, frame_delay: float, frame_no: int, color: int, img: np.ndarray): + """Initialize the framelet with relevant spacecraft data and camera information + + :param start_et: the spacecraft clock time for the start of the exposure in ET + :param frame_delay: the inter-frame delay for this camera + :param frame_no: the index of the 3-color subframe (i.e., the 4th framelet is blue frame #2) + :param color: the filter for this framelet (i.e., 0 for blue, 1 for green and 2 for red) + """ self.start_et = start_et self.frame_delay = frame_delay self.frame_no = frame_no @@ -130,12 +160,17 @@ def __init__(self, start_et: float, frame_delay: float, frame_no: int, color: in @property def et(self) -> float: + """Get the observation time for this framelet""" jitter = 0 if self.jitter is None else self.jitter return ( self.start_et + self.camera.time_bias + jitter + (self.frame_delay + self.camera.iframe_delay) * self.frame_no ) - def project_to_midplane(self, tmid: float) -> None: + def project_to_midplane(self, tmid: float): + """Get the backplane information at the mid-plane frame for this framelet. Returns self for multi-processing + + :param tmid: the spacecraft clock at the mid-plane frame + """ coords = np.nan * np.zeros((FRAME_HEIGHT, FRAME_WIDTH, 2)) lat = np.nan * np.zeros((FRAME_HEIGHT, FRAME_WIDTH)) lon = np.nan * np.zeros((FRAME_HEIGHT, FRAME_WIDTH)) @@ -187,19 +222,12 @@ def project_to_midplane(self, tmid: float) -> None: def decompand(image: np.ndarray) -> np.ndarray: - """ - Decompands the image from the 8-bit in the public release + """Decompands the image from the 8-bit in the public release to the original 12-bit shot by JunoCam - Parameters - ---------- - image : numpy.ndarray - 8-bit input image + :param image: 8-bit input image - Outputs - ------- - data : numpy.ndarray - Original 12-bit image + :return: Original 12-bit image """ data = np.array(255 * image, dtype=np.uint8) ny, nx = data.shape diff --git a/projection/projector.py b/projection/projector.py index a41b1a1..d9ad1c3 100755 --- a/projection/projector.py +++ b/projection/projector.py @@ -17,17 +17,15 @@ class Projector: """ The main projector class that determines the surface intercept points of each pixel in a JunoCam image - - Methods - ------- - load_kernels: Determines and loads the required SPICE kernels - for processing - process_n_c : Projects an individual framelet in the JunoCam raw image - process : Driver for the projection that handles - parallel processing """ - def __init__(self, imagefolder, meta, kerneldir="."): + def __init__(self, imagefolder: str, meta: str, kerneldir: str = "./kernels/"): + """Initializes the Projector + + :param imagefolder: Path to the folder containing the image files + :param meta: Path to the metadata JSON file + :param kerneldir: Path to folder where SPICE kernels will be stored, defaults to "./" + """ with open(meta, "r") as metafile: metadata = json.load(metafile) @@ -43,7 +41,11 @@ def __init__(self, imagefolder, meta, kerneldir="."): self.find_jitter(jitter_max=120) - def load_kernels(self, KERNEL_DATAFOLDER): + def load_kernels(self, KERNEL_DATAFOLDER: str) -> None: + """Get the kernels for the current spacecraft time and load them + + :param KERNEL_DATAFOLDER: path to the folder where kernels are stored + """ self.kernels = [] kernels = get_kernels(KERNEL_DATAFOLDER, self.start_utc) for kernel in kernels: @@ -51,7 +53,14 @@ def load_kernels(self, KERNEL_DATAFOLDER): spice.furnsh(kernel) self.kernels.append(kernel) - def find_jitter(self, jitter_max=25, plot=False, threshold=80): + def find_jitter(self, jitter_max: float = 25, threshold: float = 80, plot: bool = False) -> None: + """Find the best jitter value to the spacecraft camera time + + + :param jitter_max: Maximum value to search for the jitter [millseconds], defaults to 25 + :param threshold: percentile value to use to find the planet's limb, defaults to 80 + :param plot: boolean flag for whether to plot the intermediate steps for debugging, defaults to False + """ threshold = np.percentile(self.framedata.fullimg.flatten(), threshold) for nci in range(self.framedata.nframes * 3): @@ -153,7 +162,14 @@ def find_jitter(self, jitter_max=25, plot=False, threshold=80): plt.plot(limbs_jcam[:, 0], limbs_jcam[:, 1], "g.", markersize=0.1) plt.show() - def get_limb(self, eti, cami): + def get_limb(self, eti: float, cami: CameraModel) -> np.ndarray: + """Find the pixel positions of the planet's limb for a given camera frame at a specific time + + :param eti: spacecraft clock time in seconds + :param cami: the camera which is used for observing + + :return: the pixel positions of the limb of the planet in the camera frame (shape: (npix, 2)) + """ METHOD = "TANGENT/ELLIPSOID" CORLOC = "ELLIPSOID LIMB" @@ -184,7 +200,17 @@ def get_limb(self, eti, cami): return limbs_jcam - def process(self, nside=512, num_procs=8, apply_correction='ls', n_neighbor=5, minneart_k=1.25): + def process(self, nside: int = 512, num_procs: int = 8, apply_correction: str = 'ls', n_neighbor: int = 5, minneart_k: float = 1.25) -> np.ndarray: + """Processes the current image into a HEALPix map of a given resolution. Also applies lightning correction as needed. + + :param nside: resolution of the HEALPix map. See https://healpy.readthedocs.io/en/latest/tutorial.html, defaults to 512 + :param num_proces: number of cores to use for projection. Set to 1 to disable multithreading, defaults to 8 + :param apply_correction: the choice of lightning correction to apply. Choose betwen 'ls' for Lommel-Seeliger, 'minneart' for Minneart and 'none' for no correction, defaults to 'ls' + :param n_neighbor: the number of nearest neighbours to use for interpolating. Increase to get more details at the cost of performance, defaults to 5 + :param minneart_k: the index for Minneart correction. Only used when apply_correction='minneart', defaults to 1.25 + + :return: the HEALPix map of the projected image (shape: (npixels, 3), where npixels is the corresponding image size for a given n_side) + """ print(f"Projecting {self.fname} to a HEALPix grid with n_side={nside}") self.framedata.get_backplane(num_procs) @@ -195,7 +221,14 @@ def process(self, nside=512, num_procs=8, apply_correction='ls', n_neighbor=5, m return map - def apply_correction(self, correction_type, minneart_k=1.25): + def apply_correction(self, correction_type: str, minneart_k: float = 1.25) -> None: + """Apply the requested illumination correction + + This function updates the framelet's `image` variable in-place and does not return a value + + :param apply_correction: the choice of lightning correction to apply. Choose betwen 'ls' for Lommel-Seeliger, 'minneart' for Minneart and 'none' for no correction, defaults to 'ls' + :param minneart_k: the index for Minneart correction. Only used when apply_correction='minneart', defaults to 1.25 + """ if correction_type == 'ls': print("Applying Lommel-Seeliger correction") for frame in self.framedata.framelets: @@ -210,14 +243,26 @@ def apply_correction(self, correction_type, minneart_k=1.25): frame.image = frame.rawimg / frame.fluxcal @property - def framecoords(self): + def framecoords(self) -> np.ndarray: + """Get the coordinates of each pixel in the current camera frame in the midplane frame + """ return np.transpose(self.framedata.coords, (1, 0, 2, 3, 4)).reshape(3, -1, 2) @property - def imagevalues(self): + def imagevalues(self) -> np.ndarray: + """Get the illumination corrected image values from each frame + """ return np.transpose(self.framedata.image, (1, 0, 2, 3)).reshape(3, -1) - def project_to_healpix(self, nside, n_neighbor=4, min_dist=25): + def project_to_healpix(self, nside: int, n_neighbor: int = 4, max_dist: int = 25) -> np.ndarray: + """Project the current image to a HEALPix map + + :param nside: resolution of the HEALPix map. See https://healpy.readthedocs.io/en/latest/tutorial.html + :param n_neighbor: the number of nearest neighbours to use for interpolating. Increase to get more details at the cost of performance, defaults to 5 + :param max_dist: the largest distance between neighbours to use for interpolation, defaults to 25 pixels + + :return: the HEALPix map of the projected image (shape: (npixels, 3), where npixels is the corresponding image size for a given n_side) + """ # get the image extents in pixel coordinate space # clip half a pixel to avoid edge artifacts x0 = np.nanmin(self.framecoords[:, :, 0]) + 0.5 @@ -245,14 +290,19 @@ def project_to_healpix(self, nside, n_neighbor=4, min_dist=25): pixel_inds = hp.ang2pix(nside, longrid[inds], latgrid[inds], lonlat=True) # finally, project the image onto the healpix grid - m = create_image_from_grid(self.framecoords, self.imagevalues, pix, pixel_inds, longrid.shape, n_neighbor=n_neighbor, min_dist=min_dist) + m = create_image_from_grid(self.framecoords, self.imagevalues, pixel_inds, pix, longrid.shape, n_neighbor=n_neighbor, max_dist=max_dist) return m -def apply_lommel_seeliger(imgvals, incidence, emission): - ''' - Apply the Lommel-Seeliger correction for incidence +def apply_lommel_seeliger(imgvals: np.ndarray, incidence: np.ndarray, emission: np.ndarray) -> np.ndarray: + '''Apply the Lommel-Seeliger correction for incidence + + :param imgvals: the raw image values + :param incidence: the incidence angles (in radians) for each pixel in `imgvals` + :param emission: the emission angles (in radians) for each pixel in `imgvals` + + :return: the corrected image values with the same shape as `imgvals` ''' # apply Lommel-Seeliger correction mu0 = np.cos(incidence) @@ -264,7 +314,16 @@ def apply_lommel_seeliger(imgvals, incidence, emission): return imgvals -def apply_minneart(imgvals, incidence, emission, k=1.25): +def apply_minneart(imgvals: np.ndarray, incidence: np.ndarray, emission: np.ndarray, k: float = 1.25) -> np.ndarray: + """Apply the Minneart illumination correction + + :param imgvals: the raw image values + :param incidence: the incidence angles (in radians) for each pixel in `imgvals` + :param emission: the emission angles (in radians) for each pixel in `imgvals` + :param minnaert_k: the index for Minneart correction, defaults to 1.25 + + :return: the corrected image values with the same shape as `imgvals` + """ # apply Minneart correction mu0 = np.cos(incidence) mu = np.cos(emission) @@ -276,13 +335,24 @@ def apply_minneart(imgvals, incidence, emission, k=1.25): return imgvals -def create_image_from_grid(coords, imgvals, pix, inds, img_shape, n_neighbor=5, min_dist=25.): +def create_image_from_grid(coords: np.ndarray, imgvals: np.ndarray, inds: np.ndarray, pix: np.ndarray, + img_shape: tuple[int], n_neighbor: int = 5, max_dist: float = 25.): ''' Reproject an irregular spaced image onto a regular grid from a list of coordinate locations and corresponding image values. This uses an inverse lookup-table defined by `pix`, where pix gives the coordinates in the original image where the corresponding pixel coordinate on the new image should be. The coordinate on the new image is given by - the inds variable. + the `inds`. + + :param coords: the pixel coordinates in the original image + :param imgvals: the image values corresponding to coords + :param inds: the coordinate on the new image where we need to interpolate + :param pix: the coordinate in the original image corresponding to each pixel in inds + :param img_shape: the shape of the new image + :param n_neighbor: the number of nearest neighbours to use for interpolating. Increase to get more details at the cost of performance, defaults to 5 + :param max_dist: the largest distance between neighbours to use for interpolation, defaults to 25 pixels + + :return: the interpolated new image of shape `img_shape` where every pixel at `inds` has corresponding values interpolated from `imgvals` ''' nchannels, ncoords, _ = coords.shape @@ -294,7 +364,7 @@ def create_image_from_grid(coords, imgvals, pix, inds, img_shape, n_neighbor=5, dist, indi = neighbors.kneighbors(pix, n_neighbor) weight = 1. / (dist + 1.e-16) weight = weight / np.sum(weight, axis=1, keepdims=True) - weight[dist > min_dist] = 0. + weight[dist > max_dist] = 0. newvals[n, :] = np.sum(np.take(imgvals[n][mask], indi, axis=0) * weight, axis=1) diff --git a/projection/spice_utils.py b/projection/spice_utils.py index ee7b7bf..cfae106 100644 --- a/projection/spice_utils.py +++ b/projection/spice_utils.py @@ -8,14 +8,28 @@ BASEURL = 'https://naif.jpl.nasa.gov/pub/naif/JUNO/kernels/' -def fetch_kernels_from_https(path, pattern): +def fetch_kernels_from_https(path: str, pattern: str) -> list[str]: + """Fetch kernels from URL matching a given pattern + + :param path: URL at which to search for kernels (will parse HTML links in this URL) + :param pattern: file-name pattern to search for relevant links + + :return: list of files that match the given pattern + """ with requests.get(path) as response: soup = BeautifulSoup(response.text, 'html.parser') kernels_all = [a['href'] for a in soup.find('pre').find_all('a')] return fnmatch.filter(kernels_all, pattern) -def check_and_download_kernels(kernels, KERNEL_DATAFOLDER): +def check_and_download_kernels(kernels: list[str], KERNEL_DATAFOLDER: str) -> list[str]: + """Check whether the list of kernels are in the local directory and download them if needed. + + :param kernels: list of kernel filenames (relative to the root URL) + :param KERNEL_DATAFOLDER: path to the local kernel directory to store downloaded kernels + + :return: list of local paths to the kernels + """ kernel_fnames = [] for kernel in kernels: if not os.path.exists(os.path.join(KERNEL_DATAFOLDER, kernel)): @@ -25,7 +39,12 @@ def check_and_download_kernels(kernels, KERNEL_DATAFOLDER): return kernel_fnames -def download_kernel(kernel, KERNEL_DATAFOLDER): +def download_kernel(kernel: str, KERNEL_DATAFOLDER: str) -> None: + """Download a given kernel to the local folder + + :param kernel: URL path to the kernel + :param KERNEL_DATAFOLDER: root folder to store the downloaded kernel + """ link = os.path.join(BASEURL, kernel) file_name = os.path.join(KERNEL_DATAFOLDER, kernel) with open(file_name, "wb") as f: @@ -43,7 +62,14 @@ def download_kernel(kernel, KERNEL_DATAFOLDER): pbar.update(len(data)) -def get_kernels(KERNEL_DATAFOLDER, start_utc): +def get_kernels(KERNEL_DATAFOLDER: str, start_utc: float) -> list[str]: + """Fetch all relevant kernels for JunoCam at a given time + + :param KERNEL_DATAFOLDER: path to the local kernel directory to store downloaded kernels + :param start_utc: the spacecraft clock time in start_utc + + :return: list of local paths to the kernels + """ if not os.path.exists(KERNEL_DATAFOLDER): os.mkdir(KERNEL_DATAFOLDER)