forked from broadinstitute/viral-ngs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbroad_utils.py
executable file
·129 lines (101 loc) · 3.98 KB
/
broad_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python
"""
Utilities for getting sequences out of the Broad walk-up sequencing pipeline.
These utilities are probably not of much use outside the Broad.
"""
__author__ = "[email protected]"
__commands__ = []
import argparse
import logging
import os
import os.path
import json
import glob
import util.cmd
import util.file
log = logging.getLogger(__name__)
# ==========================================
# *** get stuff from Picard json file ***
# ==========================================
def get_json_from_picard(picardDir):
''' for example, /seq/walkup/picard/{flowcell_minus_first_char} '''
analysisDir = max(
(os.path.getmtime(os.path.join(picardDir, d)), d) for d in os.listdir(picardDir)
if os.path.isdir(os.path.join(picardDir, d)))[1]
jsonfile = list(glob.glob(os.path.join(picardDir, analysisDir, 'info', 'logs', '*.json')))
if len(jsonfile) != 1:
raise Exception("error")
return jsonfile[0]
def get_run_date(jsonfile):
with open(jsonfile, 'rt') as inf:
runDate = json.load(inf)['workflow']['runDate']
return runDate
def get_bustard_dir(jsonfile):
with open(jsonfile, 'rt') as inf:
bustard = json.load(inf)['workflow']['runFolder']
return bustard
def parser_get_bustard_dir(parser=argparse.ArgumentParser()):
parser.add_argument('inDir', help='Picard directory')
util.cmd.common_args(parser, (('loglevel', 'ERROR'),))
util.cmd.attach_main(parser, main_get_bustard_dir)
return parser
def main_get_bustard_dir(args):
'Find the basecalls directory from a Picard directory'
print(get_bustard_dir(get_json_from_picard(args.inDir)))
return 0
__commands__.append(('get_bustard_dir', parser_get_bustard_dir))
def parser_get_run_date(parser=argparse.ArgumentParser()):
parser.add_argument('inDir', help='Picard directory')
util.cmd.common_args(parser, (('loglevel', 'ERROR'),))
util.cmd.attach_main(parser, main_get_run_date)
return parser
def main_get_run_date(args):
'Find the sequencing run date from a Picard directory'
print(get_run_date(get_json_from_picard(args.inDir)))
return 0
__commands__.append(('get_run_date', parser_get_run_date))
# ===============
# *** misc ***
# ===============
def iterate_wells(runfile):
for lane in util.file.read_tabfile_dict(runfile):
for well in util.file.read_tabfile_dict(lane['barcode_file']):
yield (lane, well)
def get_all_samples(runfile):
return list(sorted(set(well['sample'] for lane, well in iterate_wells(runfile))))
def get_all_libraries(runfile):
return list(sorted(set(well['sample'] + '.l' + well['library_id_per_sample'] for lane, well in iterate_wells(
runfile))))
def get_run_id(well):
run_id = well['sample']
if well.get('library_id_per_sample'):
run_id += '.l' + well['library_id_per_sample']
if well.get('run_id_per_library'):
run_id += '.r' + well['run_id_per_library']
return run_id
def get_all_runs(runfile):
return list(sorted(get_run_id(well) + '.' + lane['flowcell'] + '.' + lane['lane'] for lane, well in iterate_wells(
runfile)))
def parser_get_all_names(parser=argparse.ArgumentParser()):
parser.add_argument('type', help='Type of name', choices=['samples', 'libraries', 'runs'])
parser.add_argument('runfile', help='File with seq run information')
util.cmd.common_args(parser, (('loglevel', 'ERROR'),))
util.cmd.attach_main(parser, main_get_all_names)
return parser
def main_get_all_names(args):
'Get all samples'
if args.type == 'samples':
method = get_all_samples
elif args.type == 'libraries':
method = get_all_libraries
elif args.type == 'runs':
method = get_all_runs
for s in method(args.runfile):
print(s)
return 0
__commands__.append(('get_all_names', parser_get_all_names))
# =======================
def full_parser():
return util.cmd.make_parser(__commands__, __doc__)
if __name__ == '__main__':
util.cmd.main_argparse(__commands__, __doc__)