Skip to content

Commit

Permalink
Support multiallelic states
Browse files Browse the repository at this point in the history
  • Loading branch information
szhan committed Jul 1, 2024
1 parent e88a86c commit 834bfe0
Show file tree
Hide file tree
Showing 2 changed files with 245 additions and 30 deletions.
60 changes: 30 additions & 30 deletions lshmm/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,36 +302,6 @@ def get_emission_probability_haploid(ref_allele, query_allele, site, emission_ma
return emission_matrix[site, 1]


@jit.numba_njit
def get_emission_probability_haploid_hkylike(ref_allele, query_allele, site, emission_matrix):
"""
Return the emission probability at a specified site for the haploid case,
given an emission probability matrix.
The emission probability matrix is an array of size (m, 4),
where m = number of sites.
:param int ref_allele: Reference allele.
:param int query_allele: Query allele.
:param int site: Site index.
:param numpy.ndarray emission_matrix: Emission probability matrix.
:return: Emission probability.
:rtype: float
"""
if ref_allele == MISSING:
raise ValueError("Reference allele cannot be MISSING.")
if query_allele == NONCOPY:
raise ValueError("Query allele cannot be NONCOPY.")
if emission_matrix.shape[1] != 4 or emission_matrix.shape[2] != 4:
raise ValueError("Emission probability matrix has incorrect shape.")
if ref_allele == NONCOPY:
return 0.0
elif query_allele == MISSING:
return 1.0
else:
return emission_matrix[site, ref_allele, query_allele]


@jit.numba_njit
def get_emission_matrix_hkylike(mu, kappa=None):
"""
Expand Down Expand Up @@ -365,6 +335,36 @@ def get_emission_matrix_hkylike(mu, kappa=None):
return emission_matrix


@jit.numba_njit
def get_emission_probability_haploid_hkylike(ref_allele, query_allele, site, emission_matrix):
"""
Return the emission probability at a specified site for the haploid case,
given an emission probability matrix.
The emission probability matrix is an array of size (m, 4),
where m = number of sites.
:param int ref_allele: Reference allele.
:param int query_allele: Query allele.
:param int site: Site index.
:param numpy.ndarray emission_matrix: Emission probability matrix.
:return: Emission probability.
:rtype: float
"""
if ref_allele == MISSING:
raise ValueError("Reference allele cannot be MISSING.")
if query_allele == NONCOPY:
raise ValueError("Query allele cannot be NONCOPY.")
if emission_matrix.shape[1] != 4 or emission_matrix.shape[2] != 4:
raise ValueError("Emission probability matrix has incorrect shape.")
if ref_allele == NONCOPY:
return 0.0
elif query_allele == MISSING:
return 1.0
else:
return emission_matrix[site, ref_allele, query_allele]


# Functions to assign emission probabilities for diploid LS HMM.
@jit.numba_njit
def get_emission_matrix_diploid(mu, num_sites, num_alleles, scale_mutation_rate):
Expand Down
215 changes: 215 additions & 0 deletions tests/test_nontree_vit_haploid_multi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import itertools
import pytest

import numpy as np
import numba as nb

from . import lsbase
import lshmm.core as core
import lshmm.vit_haploid as vh


class TestNonTreeViterbiHaploid(lsbase.ViterbiAlgorithmBase):
def verify(self, ts, include_ancestors):
H, queries = self.get_examples_haploid(ts, include_ancestors)
m = H.shape[0]
n = H.shape[1]

r_s = [
np.append([0], np.zeros(m - 1) + 0.01),
np.append([0], np.random.rand(m - 1)),
np.append([0], 1e-5 * (np.random.rand(m - 1) + 0.5) / 2),
np.append([0], np.zeros(m - 1) + 0.2),
np.append([0], np.zeros(m - 1) + 1e-6),
]
mu_s = [
np.zeros(m) + 0.01,
np.random.rand(m) * 0.2,
1e-5 * (np.random.rand(m) + 0.5) / 2,
np.zeros(m) + 0.2,
np.zeros(m) + 1e-6,
]
kappa_s = [1/2, 1/4, 1, 3/2, 2]

for s, r, mu, kappa in itertools.product(queries, r_s, mu_s, kappa_s):
e = core.get_emission_matrix_hkylike(mu, kappa)

V_vs, P_vs, ll_vs = vh.forwards_viterbi_hap_naive(
n=n,
m=m,
H=H,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
path_vs = vh.backwards_viterbi_hap(m=m, V_last=V_vs[m - 1, :], P=P_vs)
ll_check = vh.path_ll_hap(
n=n,
m=m,
H=H,
path=path_vs,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
self.assertAllClose(ll_vs, ll_check)

V_tmp, P_tmp, ll_tmp = vh.forwards_viterbi_hap_naive_vec(
n=n,
m=m,
H=H,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
path_tmp = vh.backwards_viterbi_hap(m=m, V_last=V_tmp[m - 1, :], P=P_tmp)
ll_check = vh.path_ll_hap(
n=n,
m=m,
H=H,
path=path_tmp,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
self.assertAllClose(ll_tmp, ll_check)
self.assertAllClose(ll_vs, ll_tmp)

V_tmp, P_tmp, ll_tmp = vh.forwards_viterbi_hap_naive_low_mem(
n=n,
m=m,
H=H,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
path_tmp = vh.backwards_viterbi_hap(m=m, V_last=V_tmp, P=P_tmp)
ll_check = vh.path_ll_hap(
n=n,
m=m,
H=H,
path=path_tmp,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
self.assertAllClose(ll_tmp, ll_check)
self.assertAllClose(ll_vs, ll_tmp)

V_tmp, P_tmp, ll_tmp = vh.forwards_viterbi_hap_naive_low_mem_rescaling(
n=n,
m=m,
H=H,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
path_tmp = vh.backwards_viterbi_hap(m, V_tmp, P_tmp)
ll_check = vh.path_ll_hap(
n=n,
m=m,
H=H,
path=path_tmp,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
self.assertAllClose(ll_tmp, ll_check)
self.assertAllClose(ll_vs, ll_tmp)

V_tmp, P_tmp, ll_tmp = vh.forwards_viterbi_hap_low_mem_rescaling(
n=n,
m=m,
H=H,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
path_tmp = vh.backwards_viterbi_hap(m=m, V_last=V_tmp, P=P_tmp)
ll_check = vh.path_ll_hap(
n=n,
m=m,
H=H,
path=path_tmp,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
self.assertAllClose(ll_tmp, ll_check)
self.assertAllClose(ll_vs, ll_tmp)

V_tmp, P_tmp, ll_tmp = vh.forwards_viterbi_hap_lower_mem_rescaling(
n=n,
m=m,
H=H,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
path_tmp = vh.backwards_viterbi_hap(m, V_tmp, P_tmp)
ll_check = vh.path_ll_hap(
n=n,
m=m,
H=H,
path=path_tmp,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
self.assertAllClose(ll_tmp, ll_check)
self.assertAllClose(ll_vs, ll_tmp)

(
V_tmp,
V_argmaxes_tmp,
recombs,
ll_tmp,
) = vh.forwards_viterbi_hap_lower_mem_rescaling_no_pointer(
n=n,
m=m,
H=H,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
path_tmp = vh.backwards_viterbi_hap_no_pointer(
m=m,
V_argmaxes=V_argmaxes_tmp,
recombs=nb.typed.List(recombs),
)
ll_check = vh.path_ll_hap(
n=n,
m=m,
H=H,
path=path_tmp,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
self.assertAllClose(ll_tmp, ll_check)
self.assertAllClose(ll_vs, ll_tmp)

@pytest.mark.parametrize("include_ancestors", [True, False])
def test_ts_multiallelic_n10_no_recomb(self, include_ancestors):
ts = self.get_ts_multiallelic_n10_no_recomb()
self.verify(ts, include_ancestors)

@pytest.mark.parametrize("num_samples", [8, 16, 32])
@pytest.mark.parametrize("include_ancestors", [True, False])
def test_ts_multiallelic(self, num_samples, include_ancestors):
ts = self.get_ts_multiallelic(num_samples)
self.verify(ts, include_ancestors)

0 comments on commit 834bfe0

Please sign in to comment.