-
Notifications
You must be signed in to change notification settings - Fork 0
/
extract_fasta_around_a_position.py
executable file
·95 lines (71 loc) · 3.57 KB
/
extract_fasta_around_a_position.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# -*- coding: utf-8 -*-
"""
Created on Tue Dec 3 08:26:27 2013
@author: kalexiou
"""
import os.path as op
import os
from collections import OrderedDict as od
import subprocess as subp
import streamlit as st
import pysam
import pandas as pd
def infile2dict(file_contents):
chr2info = od()
for line in file_contents:
if line.startswith('#'):
continue
line = line.rstrip('\n\r').split('\t')
id_, chrom, pos = line[:3]
if chrom not in chr2info.keys():
pos2rest = od()
pos2rest[pos + '#' + id_] = line[3:]
chr2info[chrom] = pos2rest
return chr2info
def extract_fasta(fasta, range_, allele_info, infile_dict=od()):
output_list = list()
out_df = None
error_message = None
header = list()
for k, pos_dict in infile_dict.items():
for p, rest in pos_dict.items():
position, snpid = p.split('#')
ref = pysam.FastaFile(fasta)
string_coord = int(position) - 1
fasta_start_coord = int(string_coord) - range_
fasta_end_coord = int(string_coord) + range_
try:
l_seq = ref.fetch(k, fasta_start_coord,
string_coord) # using position because the right end of the string is open ( ref[start - end) )
genome_ref = ref.fetch(k, string_coord, string_coord + 1)
r_seq = ref.fetch(k, int(position), fasta_end_coord + 1)
if allele_info == 'yes':
header = ['#id', 'chrom', 'pos', 'genome_ref_allele', 'ref', 'alt', 'sequence']
refallele = rest[0].upper()
altallele = rest[1].upper()
if len(refallele) == len(altallele) and len(refallele) == 1: # it is a snp
output_list.append([snpid, k, position, genome_ref,refallele, altallele,
l_seq + '[' + refallele + '/' + altallele + ']' + r_seq])
else:
right_seq = ''
if len(refallele) > 1 and len(altallele) > 1:
st.error('Site {0}:{1} is a MNP'.format(k, position))
continue
elif (len(refallele) > 1 or len(altallele) > 1) and len(refallele) > len(
altallele): # it is a deletion
right_seq = ref.fetch(k, string_coord + len(refallele), fasta_end_coord + len(refallele))
elif (len(refallele) != 1 or len(altallele) != 1) and len(refallele) < len(
altallele): # it is an insertion
right_seq = r_seq
output_list.append([snpid, k, position, genome_ref, refallele, altallele,
l_seq + '[' + refallele + '/' + altallele + ']' + right_seq])
if allele_info == 'no':
header = ['#id', 'chrom', 'pos', 'genome_ref_allele', 'sequence']
output_list.append([snpid, k, position, genome_ref,
l_seq + '@' + r_seq])
out_df = pd.DataFrame(data=output_list, columns=header)
except KeyError:
error_message = "It seems that the input file's chromosome or protein IDs are not found in the " \
"fasta file selected above. Make sure that you have selected the correct fasta file " \
"from the drop-down menu."
return out_df, error_message