Skip to content

Commit

Permalink
Use genetic map
Browse files Browse the repository at this point in the history
  • Loading branch information
szhan committed Sep 22, 2023
1 parent 55f990b commit 44c7203
Showing 1 changed file with 36 additions and 18 deletions.
54 changes: 36 additions & 18 deletions python/tests/beagle.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def convert_to_genetic_map_position(pos):
return pos / 1e6 # Convert to Mb


def get_weights(genotyped_pos, ungenotyped_pos):
def get_weights(genotyped_pos, ungenotyped_pos, genetic_map=None):
"""
Get weights at ungenotyped markers in the query haplotype, which are used in
linear interpolation of HMM state probabilities at ungenotyped markers.
Expand All @@ -74,6 +74,7 @@ def get_weights(genotyped_pos, ungenotyped_pos):
:param numpy.ndarray genotyped_pos: Site positions of genotyped markers.
:param numpy.ndarray ungenotyped_pos: Site positions of ungenotyped markers.
:param msprime.RateMap genetic_map: Genetic map.
:return: Weights and associated marker interval start indices.
:rtype: tuple(numpy.ndarray, numpy.ndarray)
"""
Expand All @@ -85,8 +86,12 @@ def get_weights(genotyped_pos, ungenotyped_pos):
assert len(np.union1d(genotyped_pos, ungenotyped_pos)) == m + x
# Set min genetic distance.
MIN_CM_DIST = 0.005
genotyped_cm = convert_to_genetic_map_position(genotyped_pos)
ungenotyped_cm = convert_to_genetic_map_position(ungenotyped_pos)
if genetic_map is None:
genotyped_cm = convert_to_genetic_map_position(genotyped_pos)
ungenotyped_cm = convert_to_genetic_map_position(ungenotyped_pos)
else:
genotyped_cm = genetic_map.get_cumulative_mass(genotyped_pos)
ungenotyped_cm = genetic_map.get_cumulative_mass(ungenotyped_pos)
# Calculate weights for ungenotyped markers.
weights = np.zeros(x, dtype=np.float32)
# Identify genotype markers k and k + 1 sandwiching marker i.
Expand Down Expand Up @@ -131,7 +136,7 @@ def get_mismatch_prob(pos, miscall_rate):
return mu


def get_switch_prob(pos, h, ne):
def get_switch_prob(pos, h, ne, genetic_map=None):
"""
Set switch probabilities at genotyped markers.
Expand All @@ -143,14 +148,18 @@ def get_switch_prob(pos, h, ne):
:param numpy.ndarray pos: Site positions.
:param int h: Number of reference haplotypes.
:param float ne: Effective population size.
:param msprime.RateMap genetic_map: Genetic map.
:return: Switch probabilities.
:rtype: numpy.ndarray
"""
assert pos[0] > 0, "Site positions are not 1-based."
assert isinstance(h, int), "Number of reference haplotypes is not an integer."
assert isinstance(ne, float), "Effective population size is not a float."
# Get genetic distance between adjacent markers.
cm = convert_to_genetic_map_position(pos)
if genetic_map is None:
cm = convert_to_genetic_map_position(pos)
else:
cm = genetic_map.get_cumulative_mass(pos)
d = np.zeros(len(pos), dtype=np.float32)
d[0] = cm[0]
d[1:] = cm[1:] - cm[:-1]
Expand Down Expand Up @@ -225,7 +234,7 @@ def compute_backward_probability_matrix(ref_h, query_h, rho, mu):
:return: Backward probability matrix.
:rtype: numpy.ndarray
"""
alleles = np.arange(3) # ACGT encoding
alleles = np.arange(4) # ACGT
assert np.all(
np.isin(ref_h, alleles)
), "Reference haplotypes have non-ACGT alleles."
Expand Down Expand Up @@ -313,7 +322,7 @@ def _compute_state_probability_matrix(fm, bm, ref_h, query_h, rho, mu):
backward haplotype probability matrix.
:rtype: tuple(numpy.ndarray, numpy.ndarray, numpy.ndarray)
"""
alleles = np.arange(3) # ACGT encoding
alleles = np.arange(4) # ACGT encoding
m = fm.shape[0]
h = fm.shape[1]
assert (m, h) == bm.shape
Expand Down Expand Up @@ -353,7 +362,7 @@ def _compute_state_probability_matrix(fm, bm, ref_h, query_h, rho, mu):


def _interpolate_allele_probabilities(
fwd_hap_probs, bwd_hap_probs, genotyped_pos, ungenotyped_pos
fwd_hap_probs, bwd_hap_probs, genotyped_pos, ungenotyped_pos, genetic_map=None
):
"""
WARN: This function is part of an attempt to faithfully reimplement
Expand All @@ -372,6 +381,7 @@ def _interpolate_allele_probabilities(
:param numpy.ndarray bwd_hap_probs: Backward haplotype probability matrix.
:param numpy.ndarray genotyped_pos: Site positions of genotyped markers.
:param numpy.ndarray ungenotyped_pos: Site positions of ungenotyped markers.
:param msprime.RateMap genetic_map: Genetic map.
:return: Interpolated allele probabilities.
:rtype: numpy.ndarray
"""
Expand All @@ -381,9 +391,9 @@ def _interpolate_allele_probabilities(
assert (m, 2) == bwd_hap_probs.shape
genotyped_cm = convert_to_genetic_map_position(genotyped_pos)
assert m == len(genotyped_cm)
imputed_cm = convert_to_genetic_map_position(ungenotyped_pos)
assert x == len(imputed_cm)
weights, _ = get_weights(genotyped_cm, imputed_cm)
ungenotyped_cm = convert_to_genetic_map_position(ungenotyped_pos)
assert x == len(ungenotyped_cm)
weights, _ = get_weights(genotyped_pos, ungenotyped_pos, genetic_map=genetic_map)
assert x == len(weights)
i_hap_probs = np.zeros((x, 2), dtype=np.float32)
for i in np.arange(x):
Expand All @@ -394,7 +404,7 @@ def _interpolate_allele_probabilities(
k = m - 2
else:
k = np.max(np.where(ungenotyped_pos[i] > genotyped_pos)[0])
# Vectorised over the allelic states
# Vectorised over the allelic states.
i_hap_probs[i, :] = weights[i] * bwd_hap_probs[k, :]
i_hap_probs[i, :] += (1 - weights[i]) * fwd_hap_probs[k, :]
return i_hap_probs
Expand Down Expand Up @@ -433,7 +443,9 @@ def compute_state_probability_matrix(fm, bm):
return sm


def interpolate_allele_probabilities(sm, ref_h, genotyped_pos, ungenotyped_pos):
def interpolate_allele_probabilities(
sm, ref_h, genotyped_pos, ungenotyped_pos, genetic_map=None
):
"""
Compute the interpolated allele probabilities at ungenotyped markers of
a query haplotype following Equation 1 of BB2016.
Expand All @@ -451,6 +463,7 @@ def interpolate_allele_probabilities(sm, ref_h, genotyped_pos, ungenotyped_pos):
:param numpy.ndarray ref_h: Reference haplotypes subsetted to ungenotyped markers.
:param numpy.ndarray genotyped_pos: Site positions at genotyped markers.
:param numpy.ndarray ungenotyped_pos: Site positions at ungenotyped markers.
:param msprime.RateMap genetic_map: Genetic map.
:return: Interpolated allele probabilities.
:rtype: numpy.ndarray
"""
Expand All @@ -462,7 +475,9 @@ def interpolate_allele_probabilities(sm, ref_h, genotyped_pos, ungenotyped_pos):
assert m == len(genotyped_pos)
x = len(ungenotyped_pos)
assert (x, h) == ref_h.shape
weights, marker_interval_start = get_weights(genotyped_pos, ungenotyped_pos)
weights, marker_interval_start = get_weights(
genotyped_pos, ungenotyped_pos, genetic_map=genetic_map
)
assert x == len(weights)
assert x == len(marker_interval_start)
p = np.zeros((x, len(alleles)), dtype=np.float32)
Expand Down Expand Up @@ -495,16 +510,18 @@ def get_map_alleles(allele_probs):
:return: Imputed alleles and their associated probabilities.
:rtype: tuple(numpy.ndarray, numpy.ndarray)
"""
assert np.all(allele_probs >= 0), "Some allele probabilities are negative."
assert not np.any(np.isnan(allele_probs)), "Some allele probabilities are NaN."
assert np.all(allele_probs >= 0)
assert not np.any(np.isnan(allele_probs))
imputed_alleles = np.argmax(allele_probs, axis=1)
max_allele_probs = np.max(allele_probs, axis=1)
assert len(imputed_alleles) == allele_probs.shape[0]
assert len(max_allele_probs) == allele_probs.shape[0]
return (imputed_alleles, max_allele_probs)


def run_beagle(ref_h, query_h, pos, miscall_rate=0.0001, ne=1e6, debug=False):
def run_beagle(
ref_h, query_h, pos, miscall_rate=0.0001, ne=1e6, genetic_map=None, debug=False
):
"""
Run a simplified version of the BEAGLE 4.1 imputation algorithm
based on Equation 1 of BB2016.
Expand All @@ -522,6 +539,7 @@ def run_beagle(ref_h, query_h, pos, miscall_rate=0.0001, ne=1e6, debug=False):
:param numpy.ndarray pos: Site positions of all the markers.
:param float miscall_rate: Allelic miscall rate.
:param int ne: Effective population size.
:param msprime.RateMap genetic_map: Genetic map.
:param bool debug: Whether to print intermediate results.
:return: Imputed alleles and their associated probabilities.
:rtype: tuple(numpy.ndarray, numpy.ndarray)
Expand Down Expand Up @@ -594,7 +612,7 @@ def run_beagle(ref_h, query_h, pos, miscall_rate=0.0001, ne=1e6, debug=False):
# Interpolate allele probabilities at ungenotyped markers.
alleles = np.arange(4) # ACGT
i_allele_probs = interpolate_allele_probabilities(
sm, ref_h_ungenotyped, genotyped_pos, ungenotyped_pos
sm, ref_h_ungenotyped, genotyped_pos, ungenotyped_pos, genetic_map=genetic_map
)
if debug:
print("Interpolated allele probabilities")
Expand Down

0 comments on commit 44c7203

Please sign in to comment.