Skip to content

Commit

Permalink
Support multiple allele states
Browse files Browse the repository at this point in the history
  • Loading branch information
szhan committed Jul 2, 2024
1 parent a6afc23 commit d6550a9
Show file tree
Hide file tree
Showing 2 changed files with 282 additions and 0 deletions.
67 changes: 67 additions & 0 deletions lshmm/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,73 @@ def get_emission_probability_haploid(ref_allele, query_allele, site, emission_ma
return emission_matrix[site, 1]


@jit.numba_njit
def get_emission_matrix_hkylike(mu, kappa=None):
"""
Return an emission probability matrix that allows for
differences between transition and transversion rates.
When `kappa` is set to None, it is equivalent to setting it to 1.
:param float mu: Probability of mutation to any allele.
:param float kappa: Transition-to-transversion rate ratio.
"""
if kappa is not None:
if kappa <= 0:
raise ValueError("Ts/tv ratio must be positive.")
num_sites = len(mu)
num_alleles = 4 # Assume that ACGT are encoded as 0 to 3.
emission_matrix = (
np.zeros((num_sites, num_alleles, num_alleles), dtype=np.float64) - 1
)
for i in range(num_sites):
for j in range(num_alleles):
for k in range(num_alleles):
if j == k:
emission_matrix[i, j, k] = 1 - mu[i]
else:
emission_matrix[i, j, k] = mu[i] / 3
if kappa is not None:
# Transitions: A <-> G, C <-> T.
is_transition_AG = i in [0, 2] and j in [0, 2]
is_transition_CT = i in [1, 3] and j in [1, 3]
if is_transition_AG or is_transition_CT:
emission_matrix[i, j, k] *= kappa
return emission_matrix


@jit.numba_njit
def get_emission_probability_haploid_hkylike(
ref_allele, query_allele, site, emission_matrix
):
"""
Return the emission probability at a specified site for the haploid case,
given an emission probability matrix.
The emission probability matrix is an array of size (m, 4),
where m = number of sites.
:param int ref_allele: Reference allele.
:param int query_allele: Query allele.
:param int site: Site index.
:param numpy.ndarray emission_matrix: Emission probability matrix.
:return: Emission probability.
:rtype: float
"""
if ref_allele == MISSING:
raise ValueError("Reference allele cannot be MISSING.")
if query_allele == NONCOPY:
raise ValueError("Query allele cannot be NONCOPY.")
if emission_matrix.shape[1] != 4 or emission_matrix.shape[2] != 4:
raise ValueError("Emission probability matrix has incorrect shape.")
if ref_allele == NONCOPY:
return 0.0
elif query_allele == MISSING:
return 1.0
else:
return emission_matrix[site, ref_allele, query_allele]


# Functions to assign emission probabilities for diploid LS HMM.
@jit.numba_njit
def get_emission_matrix_diploid(mu, num_sites, num_alleles, scale_mutation_rate):
Expand Down
215 changes: 215 additions & 0 deletions tests/test_nontree_vit_haploid_tstv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import itertools
import pytest

import numpy as np
import numba as nb

from . import lsbase
import lshmm.core as core
import lshmm.vit_haploid as vh


class TestNonTreeViterbiHaploid(lsbase.ViterbiAlgorithmBase):
def verify(self, ts, include_ancestors):
H, queries = self.get_examples_haploid(ts, include_ancestors)
m = H.shape[0]
n = H.shape[1]

r_s = [
np.append([0], np.zeros(m - 1) + 0.01),
np.append([0], np.random.rand(m - 1)),
np.append([0], 1e-5 * (np.random.rand(m - 1) + 0.5) / 2),
np.append([0], np.zeros(m - 1) + 0.2),
np.append([0], np.zeros(m - 1) + 1e-6),
]
mu_s = [
np.zeros(m) + 0.01,
np.random.rand(m) * 0.2,
1e-5 * (np.random.rand(m) + 0.5) / 2,
np.zeros(m) + 0.2,
np.zeros(m) + 1e-6,
]
kappa_s = [1 / 2, 1 / 4, 1, 3 / 2, 2]

for s, r, mu, kappa in itertools.product(queries, r_s, mu_s, kappa_s):
e = core.get_emission_matrix_hkylike(mu, kappa)

V_vs, P_vs, ll_vs = vh.forwards_viterbi_hap_naive(
n=n,
m=m,
H=H,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
path_vs = vh.backwards_viterbi_hap(m=m, V_last=V_vs[m - 1, :], P=P_vs)
ll_check = vh.path_ll_hap(
n=n,
m=m,
H=H,
path=path_vs,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
self.assertAllClose(ll_vs, ll_check)

V_tmp, P_tmp, ll_tmp = vh.forwards_viterbi_hap_naive_vec(
n=n,
m=m,
H=H,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
path_tmp = vh.backwards_viterbi_hap(m=m, V_last=V_tmp[m - 1, :], P=P_tmp)
ll_check = vh.path_ll_hap(
n=n,
m=m,
H=H,
path=path_tmp,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
self.assertAllClose(ll_tmp, ll_check)
self.assertAllClose(ll_vs, ll_tmp)

V_tmp, P_tmp, ll_tmp = vh.forwards_viterbi_hap_naive_low_mem(
n=n,
m=m,
H=H,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
path_tmp = vh.backwards_viterbi_hap(m=m, V_last=V_tmp, P=P_tmp)
ll_check = vh.path_ll_hap(
n=n,
m=m,
H=H,
path=path_tmp,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
self.assertAllClose(ll_tmp, ll_check)
self.assertAllClose(ll_vs, ll_tmp)

V_tmp, P_tmp, ll_tmp = vh.forwards_viterbi_hap_naive_low_mem_rescaling(
n=n,
m=m,
H=H,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
path_tmp = vh.backwards_viterbi_hap(m, V_tmp, P_tmp)
ll_check = vh.path_ll_hap(
n=n,
m=m,
H=H,
path=path_tmp,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
self.assertAllClose(ll_tmp, ll_check)
self.assertAllClose(ll_vs, ll_tmp)

V_tmp, P_tmp, ll_tmp = vh.forwards_viterbi_hap_low_mem_rescaling(
n=n,
m=m,
H=H,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
path_tmp = vh.backwards_viterbi_hap(m=m, V_last=V_tmp, P=P_tmp)
ll_check = vh.path_ll_hap(
n=n,
m=m,
H=H,
path=path_tmp,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
self.assertAllClose(ll_tmp, ll_check)
self.assertAllClose(ll_vs, ll_tmp)

V_tmp, P_tmp, ll_tmp = vh.forwards_viterbi_hap_lower_mem_rescaling(
n=n,
m=m,
H=H,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
path_tmp = vh.backwards_viterbi_hap(m, V_tmp, P_tmp)
ll_check = vh.path_ll_hap(
n=n,
m=m,
H=H,
path=path_tmp,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
self.assertAllClose(ll_tmp, ll_check)
self.assertAllClose(ll_vs, ll_tmp)

(
V_tmp,
V_argmaxes_tmp,
recombs,
ll_tmp,
) = vh.forwards_viterbi_hap_lower_mem_rescaling_no_pointer(
n=n,
m=m,
H=H,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
path_tmp = vh.backwards_viterbi_hap_no_pointer(
m=m,
V_argmaxes=V_argmaxes_tmp,
recombs=nb.typed.List(recombs),
)
ll_check = vh.path_ll_hap(
n=n,
m=m,
H=H,
path=path_tmp,
s=s,
e=e,
r=r,
emission_func=core.get_emission_probability_haploid_hkylike,
)
self.assertAllClose(ll_tmp, ll_check)
self.assertAllClose(ll_vs, ll_tmp)

@pytest.mark.parametrize("include_ancestors", [True, False])
def test_ts_multiallelic_n10_no_recomb(self, include_ancestors):
ts = self.get_ts_multiallelic_n10_no_recomb()
self.verify(ts, include_ancestors)

@pytest.mark.parametrize("num_samples", [8, 16, 32])
@pytest.mark.parametrize("include_ancestors", [True, False])
def test_ts_multiallelic(self, num_samples, include_ancestors):
ts = self.get_ts_multiallelic(num_samples)
self.verify(ts, include_ancestors)

0 comments on commit d6550a9

Please sign in to comment.