diff --git a/snputils/ancestry/genobj/local.py b/snputils/ancestry/genobj/local.py index f767f09..35268bc 100644 --- a/snputils/ancestry/genobj/local.py +++ b/snputils/ancestry/genobj/local.py @@ -3,10 +3,12 @@ import numpy as np import copy import warnings -from typing import Union, List, Dict, Sequence, Optional +from typing import Union, List, Dict, Sequence, Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from snputils.snp.genobj.snpobj import SNPObject from .base import AncestryObject -from snputils.snp.genobj.snpobj import SNPObject log = logging.getLogger(__name__) @@ -17,10 +19,10 @@ class LocalAncestryObject(AncestryObject): """ def __init__( self, - haplotypes: List[str], + haplotypes: List[str], lai: np.ndarray, - samples: Optional[List[str]] = None, - ancestry_map: Optional[Dict[str, str]] = None, + samples: Optional[List[str]] = None, + ancestry_map: Optional[Dict[str, str]] = None, window_sizes: Optional[np.ndarray] = None, centimorgan_pos: Optional[np.ndarray] = None, chromosomes: Optional[np.ndarray] = None, @@ -28,14 +30,14 @@ def __init__( ) -> None: """ Args: - haplotypes (list of str of length n_haplotypes): + haplotypes (list of str of length n_haplotypes): A list of unique haplotype identifiers. lai (array of shape (n_windows, n_haplotypes)): A 2D array containing local ancestry inference values, where each row represents a genomic window, and each column corresponds to a haplotype phase for each sample. - samples (list of str of length n_samples, optional): + samples (list of str of length n_samples, optional): A list of unique sample identifiers. - ancestry_map (dict of str to str, optional): + ancestry_map (dict of str to str, optional): A dictionary mapping ancestry codes to region names. window_sizes (array of shape (n_windows,), optional): An array specifying the number of SNPs in each genomic window. @@ -508,7 +510,7 @@ def convert_to_snp_level( ) -> 'SNPObject': """ Convert `self` into a `snputils.snp.genobj.SNPObject` SNP-level Local Ancestry Information (LAI), - with optional support for Single Nucleotide Polymorphism (SNP) data. + with optional support for SNP data. If SNP positions (`variants_pos`) and chromosomes (`variants_chrom`) are not specified, the method generates SNPs uniformly across the start and end positions of each genomic window. Otherwise, the provided SNP @@ -535,9 +537,11 @@ def convert_to_snp_level( An array containing the Phred-scaled quality score for each SNP. Returns: - SNPObject: + **SNPObject**: A `SNPObject` containing SNP-level ancestry data, along with optional metadata. """ + from snputils.snp.genobj.snpobj import SNPObject + # Extract attributes from SNPObject if provided if snpobject is not None: variants_chrom = variants_chrom if variants_chrom is not None else snpobject.variants_chrom diff --git a/snputils/snp/genobj/snpobj.py b/snputils/snp/genobj/snpobj.py index 3f37acd..02fd769 100644 --- a/snputils/snp/genobj/snpobj.py +++ b/snputils/snp/genobj/snpobj.py @@ -4,7 +4,11 @@ import copy import warnings import re -from typing import Any, Union, Tuple, List, Sequence, Dict, Optional +from typing import Any, Union, Tuple, List, Sequence, Dict, Optional, TYPE_CHECKING +from scipy.stats import mode + +if TYPE_CHECKING: + from snputils.ancestry.genobj.local import LocalAncestryObject log = logging.getLogger(__name__) @@ -50,8 +54,9 @@ def __init__( An array containing the chromosomal positions for each SNP. variants_qual (array of shape (n_snps,), optional): An array containing the Phred-scaled quality score for each SNP. - calldata_lai (array of shape (n_snps, n_samples, 2)): - An array containing the ancestry for each SNP. + calldata_lai (array, optional): + An array containing the ancestry for each SNP. This array can be either 2D with shape + `(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2). ancestry_map (dict of str to str, optional): A dictionary mapping ancestry codes to region names. """ @@ -1364,6 +1369,153 @@ def set_empty_to_missing(self, inplace: bool = False) -> Optional['SNPObject']: snpobj.variants_id[snpobj.variants_id == ''] = '.' return snpobj + def convert_to_window_level( + self, + window_size: Optional[int] = None, + physical_pos: Optional[np.ndarray] = None, + chromosomes: Optional[np.ndarray] = None, + window_sizes: Optional[np.ndarray] = None, + laiobj: Optional['LocalAncestryObject'] = None + ) -> 'LocalAncestryObject': + """ + Aggregate the `calldata_lai` attribute into genomic windows within a + `snputils.ancestry.genobj.LocalAncestryObject`. + + **Options for defining windows (in order of precedence):** + + 1. **Fixed window size**: + - Use `window_size` to specify how many SNPs go into each window. The last window on each + chromosome may be larger if SNPs are not evenly divisible by the size. + + 2. **Custom start and end positions**: + - Provide `physical_pos` (2D array of shape (n_windows, 2)) as the [start, end] base-pair + coordinates for each window. + - If `chromosomes` is not provided and `self` has exactly one chromosome, all windows are + assumed to belong to that chromosome. + - If multiple chromosomes exist but `chromosomes` is missing, an error will be raised. + - Optionally, provide `window_sizes` to store the SNP count per-window. + + 3. **Matching existing windows**: + - Reuse window definitions (`physical_pos`, `chromosomes`, `window_sizes`) from an existing `laiobj`. + + Args: + window_size (int, optional): + Number of SNPs in each window if defining fixed-size windows. If the total number of + SNPs in a chromosome is not evenly divisible by the window size, the last window on that + chromosome will include all remaining SNPs and therefore be larger than the specified size. + physical_pos (array of shape (n_windows, 2), optional): + A 2D array containing the start and end physical positions for each window. + chromosomes (array of shape (n_windows,), optional): + An array with chromosome numbers corresponding to each genomic window. + window_sizes (array of shape (n_windows,), optional): + An array specifying the number of SNPs in each genomic window. + laiobj (LocalAncestryObject, optional): + A reference `LocalAncestryObject` from which to copy existing window definitions. + + Returns: + **LocalAncestryObject:** + A LocalAncestryObject containing window-level ancestry data. + """ + from snputils.ancestry.genobj.local import LocalAncestryObject + + if window_size is None and physical_pos is None and laiobj is None: + raise ValueError("One of `window_size`, `physical_pos`, or `laiobj` must be provided.") + + # 1. Fixed window size + if window_size is not None: + physical_pos = [] # Boundaries [start, end] of each window + chromosomes = [] # Chromosome for each window + window_sizes = [] # Number of SNPs for each window + for chrom in self.unique_chrom: + # Extract indices corresponding to this chromosome + mask_chrom = (self.variants_chrom == chrom) + # Subset to this chromosome + pos_chrom = self.variants_pos[mask_chrom] + # Number of SNPs for this chromosome + n_snps_chrom = pos_chrom.size + + # Initialize the start of the first window with the position of the first SNP + current_start = self.variants_pos[0] + + # Number of full windows with exactly `window_size` SNPs + n_full_windows = n_snps_chrom // window_size + + # Build all but the last window + for i in range(n_full_windows-1): + current_end = self.variants_pos[(i+1) * window_size - 1] + physical_pos.append([current_start, current_end]) + chromosomes.append(chrom) + window_sizes.append(window_size) + current_start = self.variants_pos[(i+1) * window_size] + + # Build the last window + current_end = self.variants_pos[-1] + physical_pos.append([current_start, current_end]) + chromosomes.append(chrom) + window_sizes.append(n_snps_chrom - ((n_full_windows - 1) * window_size)) + + physical_pos = np.array(physical_pos) + chromosomes = np.array(chromosomes) + window_sizes = np.array(window_sizes) + + # 2. Custom start and end positions + elif physical_pos is not None: + # Check if there is exactly one chromosome + if chromosomes is None: + unique_chrom = self.unique_chrom + if len(unique_chrom) == 1: + # We assume all windows belong to this single chromosome + single_chrom = unique_chrom[0] + chromosomes = np.array([single_chrom] * physical_pos.shape[0]) + else: + raise ValueError("Multiple chromosomes detected, but `chromosomes` was not provided.") + + # 3. Match existing windows to a reference laiobj + elif laiobj is not None: + physical_pos = laiobj.physical_pos + chromosomes = laiobj.chromosomes + window_sizes = laiobj.window_sizes + + # Allocate an output LAI array + n_windows = physical_pos.shape[0] + n_samples = self.n_samples + if len(self.calldata_lai.shape) == 3: + lai = np.zeros((n_windows, n_samples, 2)) + else: + lai = np.zeros((n_windows, n_samples*2)) + + # For each window, find the relevant SNPs and compute the mode of the ancestries + for i, ((start, end), chrom) in enumerate(zip(physical_pos, chromosomes)): + snps_mask = ( + (self.variants_chrom == chrom) & + (self.variants_pos >= start) & + (self.variants_pos <= end) + ) + if np.any(snps_mask): + lai_mask = self.calldata_lai[snps_mask, ...] + mode_ancestries = mode(lai_mask, axis=0, nan_policy='omit').mode + lai[i] = mode_ancestries + else: + lai[i] = np.nan + + # Generate haplotype labels, e.g. "Sample1.0", "Sample1.1" + haplotypes = [f"{sample}.{i}" for sample in self.samples for i in range(2)] + + # If original data was (n_snps, n_samples, 2), flatten to (n_windows, n_samples*2) + if len(self.calldata_lai.shape) == 3: + lai = lai.reshape(n_windows, -1) + + # Aggregate into a LocalAncestryObject + return LocalAncestryObject( + haplotypes=haplotypes, + lai=lai, + samples=self.samples, + ancestry_map=self.ancestry_map, + window_sizes=window_sizes, + physical_pos=physical_pos, + chromosomes=chromosomes + ) + def save(self, file: Union[str, Path]) -> None: """ Save the data stored in `self` to a specified file.