-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathconvertHeaders.py
127 lines (105 loc) · 4.54 KB
/
convertHeaders.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import argparse
import gzip
# Rewrite as illumina header
def convertIllumina(elements_dict):
template_str = "@{instrument}:{run_number}:{flowcell_id}:{lane}:{tile}:{x_pos}:{y_pos} {read}:N:0:{sample_barcode}\n"
formatted_string = template_str.format(instrument = elements_dict["instrument"],
run_number = elements_dict["run_number"],
flowcell_id = elements_dict["flowcell-id"],
lane = elements_dict["lane"],
tile = elements_dict["tile"],
x_pos = elements_dict["x-pos"],
y_pos = elements_dict["y-pos"],
read = elements_dict["read_no"],
sample_barcode = elements_dict["sample_barcode"])
return(formatted_string)
# Collect elements from BGI format
# Please note that you will need to attach the sample barcode sequence
# to the headers of the input FASTQ file
# eg.
def parseBGI(header):
# Extract sample barcode sequence
if header[-10] == "_":
# Get sample barcode
header_split1 = header.split("_")
sample_barcode = header_split1[-1]
header = header.replace("_" + sample_barcode, "")
# Get basic information from split
# Takes into account presence of sample name
# Even if the sample name has underscores, it will take the
# last set of values which will be the header information
if "_" in header:
header = header.split("_")
header_split1 = header[-1]
else:
header_split1 = header
header_split2 = header_split1.split("/")
paired_read_direction = int(header_split2[1])
# Get rest of the information from the header
seq_header = header_split2[0]
seq_header = seq_header.replace("@", "")
# First 10 charcters of sequence header, after sample number
# are the flowcell number
flowcell_id = seq_header[:10]
# Followed by flowcell
lane_id = seq_header[10:12]
lane_id = int(lane_id.replace("L", ""))
# Get location of C (x location)
x_id = seq_header[12:16]
x_id = int(x_id.replace("C", ""))
# Get location of R (y location)
y_id = seq_header[16:20]
y_id = int(y_id.replace("R", ""))
# Get run number (last part of the string)
tile_number = int(seq_header[20:])
# Pack into dictionary
elements_dict = {"instrument": "MGISEQ-2000", "run_number": 1, "sample_barcode": sample_barcode, "lane": lane_id, "x-pos": x_id, "y-pos": y_id, "tile": tile_number, "flowcell-id": flowcell_id, "read_no": paired_read_direction}
return elements_dict
else:
print("Sample index barcode not detected. Please check you have attached the barcode with attachBarcodes.sh.")
sys.exit()
# Parses the entry
def parseLine(x):
if x.startswith("@"):
elements_dict = parseBGI(x)
converted_header = convertIllumina(elements_dict)
return(converted_header)
else:
return(x)
def open_file(input_file, mode = "rb"):
if ".gz" in input_file:
input_fastq = gzip.open(input_file, mode)
else:
input_fastq = open(input_file, mode)
return(input_fastq)
# Retrieves arguments inputted by user
def parseArguments(args):
# Retrieve from parser
input_file = args.input
output_file = args.output
# Extract barcode from filename
return input_file, output_file
# Sets up arguments for user input
def setupParser():
# Set up argument parser
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input", action = "store", help = "fastq.gz file generated by BGI platform.", type = str, required = True)
parser.add_argument("-o", "--output", action = "store", help = "Name of output converted file.", type = str, required = True)
args = parser.parse_args()
return(args)
# Code starts here
if __name__ == "__main__":
args = setupParser()
input_file, output_file = parseArguments(args)
input_fastq = open_file(input_file, "rt")
output_fastq = open_file(output_file, "wt")
for line in input_fastq:
parsed_line = parseLine(line)
output_fastq.write(parsed_line)
input_fastq.close()
output_fastq.close()
print("Conversion of %s complete!" % input_file)