forked from Illumina/GTCtoVCF
-
Notifications
You must be signed in to change notification settings - Fork 0
/
GenotypeFormat.py
334 lines (273 loc) · 12.6 KB
/
GenotypeFormat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
from itertools import combinations_with_replacement
from vcf.parser import _Format
from IlluminaBeadArrayFiles import code2genotype
CHANNEL_MAP = {"A": "T", "T": "A", "C": "G", "G": "C"}
def get_expected_ploidy(gender, chrom):
"""
Determine expected ploidy of call based on sample's gender and chromosome. Unknown genders are processed as diploid.
Args:
gender (string): M,F or U
chrom (string): chromosome, PAR values should be represented as XY
Returns
int: value of expected ploidy, currently set at 1 or 2
"""
if (gender == b"M" and chrom == "X") or chrom == "Y" or chrom == "MT":
return 1
return 2
def format_vcf_genotype(vcf_allele1_char, vcf_allele2_char, ploidy):
"""
Create a VCF representation of the genotype based on alleles. Format appropriately if haploid.
Args:
vcf_allele1_char (string): 0,1,2 etc.
vcf_allele2_char (string): 0,1,2, etc.
vcf_record (vcf._Record): Record for the entry analyzed
ploidy(int): Expected ploidy.
Returns
string: String representation of genotype (e.g., "0/1"), adjusted for haploid calls if applicable
"""
assert ploidy < 3
# haploid calls
if ploidy == 1:
if vcf_allele1_char == vcf_allele2_char:
return str(vcf_allele1_char)
vcf_genotype = ""
if vcf_allele2_char < vcf_allele1_char:
vcf_genotype = str(vcf_allele2_char) + "/" + str(vcf_allele1_char)
else:
vcf_genotype = str(vcf_allele1_char) + "/" + str(vcf_allele2_char)
return vcf_genotype
def convert_ab_genotype_to_nucleotide(ab_genotype, plus_strand_alleles):
"""
Convert an integer genotype to nucleotide genotype on the plus strand
Args:
ab_genotype (int): 0 (no call), 1 (homozygous A), 2 (het), or 3 (homozygous B)
plus_strand_alleles (tuple(string)): Tuple of length 2 with assay alleles (e.g., from SNP column of manifest)
Returns
tuple(string): Tuple of length 2 with nucleotide alleles on plus strand (e.g., ('A', 'C')). NC is ('-', '-')
"""
if ab_genotype == 0:
return ('-', '-')
return tuple([plus_strand_alleles[0] if ab_allele == "A" else plus_strand_alleles[1] for ab_allele in code2genotype[ab_genotype]])
def convert_indel_genotype_to_vcf(nucleotide_genotypes, vcf_record, is_deletion, ploidy):
"""
For indel, convert indel SNP genotype (e.g., I/D) into VCF genotype (e.g, 0/1)
Args:
nucleotide_genotypes (string,string): SNP genotype from manifest (e.g., ('D', 'I'))
vcf_record (vcf._Record): Corresponding VCF record (define reference and alternate allele)
is_deletion (bool): Whether the BPM record that produced the nucleotide genotypes is a reference deletion
ploidy (int): Expected ploidy
Returns:
string: VCF genotype (e.g, "0/1")
"""
if not nucleotide_genotypes:
return format_vcf_genotype(".", ".", ploidy)
if len(nucleotide_genotypes) > 1:
for nucleotide_genotype in nucleotide_genotypes:
if nucleotide_genotype != nucleotide_genotypes[0]:
return format_vcf_genotype(".", ".", ploidy)
nucleotide_genotype = nucleotide_genotypes[0]
if is_deletion:
vcf_allele1_char = "0" if nucleotide_genotype[0] == "I" else "1"
vcf_allele2_char = "0" if nucleotide_genotype[1] == "I" else "1"
else:
vcf_allele1_char = "1" if nucleotide_genotype[0] == "I" else "0"
vcf_allele2_char = "1" if nucleotide_genotype[1] == "I" else "0"
vcf_genotype = format_vcf_genotype(
vcf_allele1_char, vcf_allele2_char, ploidy)
return vcf_genotype
def convert_nucleotide_genotype_to_vcf(nucleotide_genotype, vcf_record, ploidy):
"""
Convert a nucleotide genotype (on the plus strand) to a vcf genotype. For indels,
use "convert_indel_genotype_to_vcf"
Args:
nucleotide_genotype (tuple(string)): Tuple of length 2. Each element is nucleotide allele
No call should be represented as ("-", "-")
ploidy(int): value of estimated ploidy.
Returns
string: VCF genotype (e.g, "0/1")
"""
assert len(nucleotide_genotype) == 2
if nucleotide_genotype[0] == "-" and nucleotide_genotype[1] == "-":
return format_vcf_genotype(".", ".", ploidy)
try:
vcf_allele1_char = vcf_record.alleles.index(nucleotide_genotype[0])
vcf_allele2_char = vcf_record.alleles.index(nucleotide_genotype[1])
except:
raise Exception(
"Could not index alleles in VCF record " + vcf_record.ID)
vcf_genotype = format_vcf_genotype(
vcf_allele1_char, vcf_allele2_char, ploidy)
return vcf_genotype
class RecordCombiner(object):
"""
Class to take in a group of BPM records and output a combined genotype
"""
def __init__(self, bpm_records, genotypes, logger):
"""
Create new RecordCombiner
Args:
bpm_records (list(BPMRecord)): Group of BPM records for a single site (typically just one)
genotypes (list(genotypes)): List of all genotypes in GTC file as integers
Returns:
RecordCombiner
"""
self._bpm_records = bpm_records
self._genotypes = genotypes
self._logger = logger
def _generate_possible_genotypes(self):
"""
From the alleles in the BPM records, enumerate all possible genotypes
at this site
Args:
None
Returns:
list(list(string)) - A list of lists of length 2. Each inner list represents a possible
genotype at this site in terms of pair of nucleotide alleles on the plus strand
"""
alleles = set()
for record in self._bpm_records:
alleles.update(record.plus_strand_alleles)
return list(combinations_with_replacement(alleles, 2))
def _record_inconsistent_with_genotype(self, record, genotype):
"""Check if a particular BPM record is inconsitent with a given
genotype. Genotype should be tuple of alleles on plus strand.
Args:
record (BPMRecord)
genotype (tuple(string)) : Tuple of length 2 where each element is nucleotide allele
on plus strand
Returns:
bool
"""
# record_genotype is an integer (0 - NC, 1 - AA, 2 - AB, 3 - BB)
record_int_genotype = self._genotypes[record.index_num]
if record_int_genotype == 0:
return False
plus_strand_alleles = record.plus_strand_alleles
record_plus_genotype = convert_ab_genotype_to_nucleotide(
record_int_genotype, plus_strand_alleles)
for allele in record_plus_genotype:
# check for alleles that must be present in a consistent genotype
consistent_alleles = []
consistent_alleles.append(allele)
if record.assay_type == 0: # Inf II
consistent_alleles.append(CHANNEL_MAP[allele])
if not any([consistent_allele in genotype for consistent_allele in consistent_alleles]):
return True
# check for alleles that must be absent in a consistent genotype
absent_alleles = []
if record_int_genotype == 1 or record_int_genotype == 3: # homozygous
# for example, if assay is A/C Inf II and genotype is AA, real
# genotype can not contain C or G
absent_allele = plus_strand_alleles[0] if record_int_genotype == 3 else plus_strand_alleles[1]
absent_alleles.append(absent_allele)
if record.assay_type == 0: # Inf II
absent_alleles.append(CHANNEL_MAP[absent_allele])
if any([absent_allele in genotype for absent_allele in absent_alleles]):
return True
return False
def _filter_inconsistent_genotypes(self, possible_genotypes):
"""Filter the list of possible genotypes to remove
those that are inconsitent with any BPM record in this group
Args
possible_genotypes (list(list(string)) - List of possible genotypes. Each possible genotype is a list of
length 2 where each string is a nucleotide on the plus strand
Returns
list(list(string)) - List of genotypes consitent with assay data. Each remaining genotype is a list of
length 2 where each string is a nucleotide on the plus strand
"""
idx2inconsistent = [False] * len(possible_genotypes)
for idx in range(len(possible_genotypes)):
for record in self._bpm_records:
if self._record_inconsistent_with_genotype(record, possible_genotypes[idx]):
idx2inconsistent[idx] = True
break
return [genotype for (genotype, is_inconsistent) in zip(possible_genotypes, idx2inconsistent) if not is_inconsistent]
def combine_genotypes(self):
"""
Generate the combined genotype from all assays at this site
Args:
None
Returns:
(string, string): The combined genotype (on the plus strand) at this site (e.g., ("A", "C") ) No call is ("-", "-")
"""
possible_genotypes = self._generate_possible_genotypes()
# filter any genotypes which may be ambiguous due to the presence of
# InfII assays
allowable_genotypes = self._filter_inconsistent_genotypes(
possible_genotypes)
# if only one consistent genotype, then that is the genotype. Otherwise, the genotype
# is ambiguous (more than 1) or inconsistent (less than 1) and return a
# no-call
if len(allowable_genotypes) == 1:
return allowable_genotypes[0]
return ("-", "-")
def combine_names(self):
"""
Generate the combined name for thi sgroup of records
Args:
None
Returns:
string: The combined names
"""
record_names = []
for record in self._bpm_records:
record_names.append(record.name)
return ",".join(sorted(record_names))
class GenotypeFormat(object):
"""
Generate GT format information for VCF
"""
def __init__(self, logger, gender, genotypes):
self._gender = gender
self._genotypes = genotypes
self._logger = logger
@staticmethod
def get_id():
return "GT"
@staticmethod
def get_description():
return "Genotype"
@staticmethod
def get_format_obj():
return _Format(GenotypeFormat.get_id(), 1, "String", GenotypeFormat.get_description())
def generate_sample_format_info(self, bpm_records, vcf_record, sample_name):
"""
Get the sample genotype
Args:
bpm_records (list(BPMRecord)): List of BPM records
vcf_record (vcf._Record): Corresponding VCF record
sample_name (string): The sample name
Returns:
string: GT sample format string (e.g., "0/1")
"""
# use chrom from bpm_records where PAR chromsomes have not been
# converted
ploidy = get_expected_ploidy(self._gender, bpm_records[0].chromosome)
if any(record.is_indel() for record in bpm_records):
assert all(record.is_indel() for record in bpm_records)
nucleotide_genotypes = []
for record in bpm_records:
int_genotype = self._genotypes[record.index_num]
if int_genotype != 0:
nucleotide_genotypes.append(convert_ab_genotype_to_nucleotide(
int_genotype, record.plus_strand_alleles))
vcf_genotype = convert_indel_genotype_to_vcf(
nucleotide_genotypes, vcf_record, bpm_records[0].is_deletion, ploidy)
else:
if len(bpm_records) > 1:
combiner = RecordCombiner(
bpm_records, self._genotypes, self._logger)
nucleotide_genotype = combiner.combine_genotypes()
vcf_genotype = convert_nucleotide_genotype_to_vcf(
nucleotide_genotype, vcf_record, ploidy)
vcf_record.ID = combiner.combine_names()
else:
sample_genotype = self._genotypes[bpm_records[0].index_num]
if sample_genotype == 0:
nucleotide_genotype = ('-', '-')
else:
nucleotide_genotype = convert_ab_genotype_to_nucleotide(
sample_genotype, bpm_records[0].plus_strand_alleles)
vcf_genotype = convert_nucleotide_genotype_to_vcf(
nucleotide_genotype, vcf_record, ploidy)
return vcf_genotype