Skip to content

Commit

Permalink
Merge pull request #17 from AI-sandbox/feature/PSW-112-implement-meth…
Browse files Browse the repository at this point in the history
…od-to-convert-to-window-level

Add mehod to go from SNP-level (SNPObject) to window-level ancestry information (LocalAncestryObject)
  • Loading branch information
salcc authored Jan 10, 2025
2 parents bbb7aa7 + ea90d72 commit 2b01d03
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 13 deletions.
24 changes: 14 additions & 10 deletions snputils/ancestry/genobj/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import numpy as np
import copy
import warnings
from typing import Union, List, Dict, Sequence, Optional
from typing import Union, List, Dict, Sequence, Optional, TYPE_CHECKING

if TYPE_CHECKING:
from snputils.snp.genobj.snpobj import SNPObject

from .base import AncestryObject
from snputils.snp.genobj.snpobj import SNPObject

log = logging.getLogger(__name__)

Expand All @@ -17,25 +19,25 @@ class LocalAncestryObject(AncestryObject):
"""
def __init__(
self,
haplotypes: List[str],
haplotypes: List[str],
lai: np.ndarray,
samples: Optional[List[str]] = None,
ancestry_map: Optional[Dict[str, str]] = None,
samples: Optional[List[str]] = None,
ancestry_map: Optional[Dict[str, str]] = None,
window_sizes: Optional[np.ndarray] = None,
centimorgan_pos: Optional[np.ndarray] = None,
chromosomes: Optional[np.ndarray] = None,
physical_pos: Optional[np.ndarray] = None
) -> None:
"""
Args:
haplotypes (list of str of length n_haplotypes):
haplotypes (list of str of length n_haplotypes):
A list of unique haplotype identifiers.
lai (array of shape (n_windows, n_haplotypes)):
A 2D array containing local ancestry inference values, where each row represents a
genomic window, and each column corresponds to a haplotype phase for each sample.
samples (list of str of length n_samples, optional):
samples (list of str of length n_samples, optional):
A list of unique sample identifiers.
ancestry_map (dict of str to str, optional):
ancestry_map (dict of str to str, optional):
A dictionary mapping ancestry codes to region names.
window_sizes (array of shape (n_windows,), optional):
An array specifying the number of SNPs in each genomic window.
Expand Down Expand Up @@ -508,7 +510,7 @@ def convert_to_snp_level(
) -> 'SNPObject':
"""
Convert `self` into a `snputils.snp.genobj.SNPObject` SNP-level Local Ancestry Information (LAI),
with optional support for Single Nucleotide Polymorphism (SNP) data.
with optional support for SNP data.
If SNP positions (`variants_pos`) and chromosomes (`variants_chrom`) are not specified, the method generates
SNPs uniformly across the start and end positions of each genomic window. Otherwise, the provided SNP
Expand All @@ -535,9 +537,11 @@ def convert_to_snp_level(
An array containing the Phred-scaled quality score for each SNP.
Returns:
SNPObject:
**SNPObject**:
A `SNPObject` containing SNP-level ancestry data, along with optional metadata.
"""
from snputils.snp.genobj.snpobj import SNPObject

# Extract attributes from SNPObject if provided
if snpobject is not None:
variants_chrom = variants_chrom if variants_chrom is not None else snpobject.variants_chrom
Expand Down
158 changes: 155 additions & 3 deletions snputils/snp/genobj/snpobj.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import copy
import warnings
import re
from typing import Any, Union, Tuple, List, Sequence, Dict, Optional
from typing import Any, Union, Tuple, List, Sequence, Dict, Optional, TYPE_CHECKING
from scipy.stats import mode

if TYPE_CHECKING:
from snputils.ancestry.genobj.local import LocalAncestryObject

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,8 +54,9 @@ def __init__(
An array containing the chromosomal positions for each SNP.
variants_qual (array of shape (n_snps,), optional):
An array containing the Phred-scaled quality score for each SNP.
calldata_lai (array of shape (n_snps, n_samples, 2)):
An array containing the ancestry for each SNP.
calldata_lai (array, optional):
An array containing the ancestry for each SNP. This array can be either 2D with shape
`(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2).
ancestry_map (dict of str to str, optional):
A dictionary mapping ancestry codes to region names.
"""
Expand Down Expand Up @@ -1364,6 +1369,153 @@ def set_empty_to_missing(self, inplace: bool = False) -> Optional['SNPObject']:
snpobj.variants_id[snpobj.variants_id == ''] = '.'
return snpobj

def convert_to_window_level(
self,
window_size: Optional[int] = None,
physical_pos: Optional[np.ndarray] = None,
chromosomes: Optional[np.ndarray] = None,
window_sizes: Optional[np.ndarray] = None,
laiobj: Optional['LocalAncestryObject'] = None
) -> 'LocalAncestryObject':
"""
Aggregate the `calldata_lai` attribute into genomic windows within a
`snputils.ancestry.genobj.LocalAncestryObject`.
**Options for defining windows (in order of precedence):**
1. **Fixed window size**:
- Use `window_size` to specify how many SNPs go into each window. The last window on each
chromosome may be larger if SNPs are not evenly divisible by the size.
2. **Custom start and end positions**:
- Provide `physical_pos` (2D array of shape (n_windows, 2)) as the [start, end] base-pair
coordinates for each window.
- If `chromosomes` is not provided and `self` has exactly one chromosome, all windows are
assumed to belong to that chromosome.
- If multiple chromosomes exist but `chromosomes` is missing, an error will be raised.
- Optionally, provide `window_sizes` to store the SNP count per-window.
3. **Matching existing windows**:
- Reuse window definitions (`physical_pos`, `chromosomes`, `window_sizes`) from an existing `laiobj`.
Args:
window_size (int, optional):
Number of SNPs in each window if defining fixed-size windows. If the total number of
SNPs in a chromosome is not evenly divisible by the window size, the last window on that
chromosome will include all remaining SNPs and therefore be larger than the specified size.
physical_pos (array of shape (n_windows, 2), optional):
A 2D array containing the start and end physical positions for each window.
chromosomes (array of shape (n_windows,), optional):
An array with chromosome numbers corresponding to each genomic window.
window_sizes (array of shape (n_windows,), optional):
An array specifying the number of SNPs in each genomic window.
laiobj (LocalAncestryObject, optional):
A reference `LocalAncestryObject` from which to copy existing window definitions.
Returns:
**LocalAncestryObject:**
A LocalAncestryObject containing window-level ancestry data.
"""
from snputils.ancestry.genobj.local import LocalAncestryObject

if window_size is None and physical_pos is None and laiobj is None:
raise ValueError("One of `window_size`, `physical_pos`, or `laiobj` must be provided.")

# 1. Fixed window size
if window_size is not None:
physical_pos = [] # Boundaries [start, end] of each window
chromosomes = [] # Chromosome for each window
window_sizes = [] # Number of SNPs for each window
for chrom in self.unique_chrom:
# Extract indices corresponding to this chromosome
mask_chrom = (self.variants_chrom == chrom)
# Subset to this chromosome
pos_chrom = self.variants_pos[mask_chrom]
# Number of SNPs for this chromosome
n_snps_chrom = pos_chrom.size

# Initialize the start of the first window with the position of the first SNP
current_start = self.variants_pos[0]

# Number of full windows with exactly `window_size` SNPs
n_full_windows = n_snps_chrom // window_size

# Build all but the last window
for i in range(n_full_windows-1):
current_end = self.variants_pos[(i+1) * window_size - 1]
physical_pos.append([current_start, current_end])
chromosomes.append(chrom)
window_sizes.append(window_size)
current_start = self.variants_pos[(i+1) * window_size]

# Build the last window
current_end = self.variants_pos[-1]
physical_pos.append([current_start, current_end])
chromosomes.append(chrom)
window_sizes.append(n_snps_chrom - ((n_full_windows - 1) * window_size))

physical_pos = np.array(physical_pos)
chromosomes = np.array(chromosomes)
window_sizes = np.array(window_sizes)

# 2. Custom start and end positions
elif physical_pos is not None:
# Check if there is exactly one chromosome
if chromosomes is None:
unique_chrom = self.unique_chrom
if len(unique_chrom) == 1:
# We assume all windows belong to this single chromosome
single_chrom = unique_chrom[0]
chromosomes = np.array([single_chrom] * physical_pos.shape[0])
else:
raise ValueError("Multiple chromosomes detected, but `chromosomes` was not provided.")

# 3. Match existing windows to a reference laiobj
elif laiobj is not None:
physical_pos = laiobj.physical_pos
chromosomes = laiobj.chromosomes
window_sizes = laiobj.window_sizes

# Allocate an output LAI array
n_windows = physical_pos.shape[0]
n_samples = self.n_samples
if len(self.calldata_lai.shape) == 3:
lai = np.zeros((n_windows, n_samples, 2))
else:
lai = np.zeros((n_windows, n_samples*2))

# For each window, find the relevant SNPs and compute the mode of the ancestries
for i, ((start, end), chrom) in enumerate(zip(physical_pos, chromosomes)):
snps_mask = (
(self.variants_chrom == chrom) &
(self.variants_pos >= start) &
(self.variants_pos <= end)
)
if np.any(snps_mask):
lai_mask = self.calldata_lai[snps_mask, ...]
mode_ancestries = mode(lai_mask, axis=0, nan_policy='omit').mode
lai[i] = mode_ancestries
else:
lai[i] = np.nan

# Generate haplotype labels, e.g. "Sample1.0", "Sample1.1"
haplotypes = [f"{sample}.{i}" for sample in self.samples for i in range(2)]

# If original data was (n_snps, n_samples, 2), flatten to (n_windows, n_samples*2)
if len(self.calldata_lai.shape) == 3:
lai = lai.reshape(n_windows, -1)

# Aggregate into a LocalAncestryObject
return LocalAncestryObject(
haplotypes=haplotypes,
lai=lai,
samples=self.samples,
ancestry_map=self.ancestry_map,
window_sizes=window_sizes,
physical_pos=physical_pos,
chromosomes=chromosomes
)

def save(self, file: Union[str, Path]) -> None:
"""
Save the data stored in `self` to a specified file.
Expand Down

0 comments on commit 2b01d03

Please sign in to comment.