-
Notifications
You must be signed in to change notification settings - Fork 1
/
fw_count.py
54 lines (45 loc) · 2.05 KB
/
fw_count.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from fireworks import Firework, LaunchPad, Workflow, ScriptTask
from firetasks import TrimTask, AlignTask, SortTask, CountTask
import seq_functions
import argparse
import os
import collections
import yaml
def main(sequencing_directory, library_prefix, num_libraries, raw_data_dir):
    """Submit one FireWorks workflow that sorts and then counts each library.

    For every library directory a ``SortTask`` Firework and a ``CountTask``
    Firework are created, with the count step registered as a child of the
    sort step.  All Fireworks are gathered into a single ``Workflow`` that is
    added to the LaunchPad described by ``my_launchpad.yaml`` (resolved
    relative to the current working directory).

    Args:
        sequencing_directory: Directory that contains the library
            directories.
        library_prefix: Name prefix of each library directory; directories
            are ``<library_prefix>1`` through ``<library_prefix>N``.
        num_libraries: Number of library directories (N) to process.
        raw_data_dir: Accepted for interface compatibility; not used by
            this function.
    """
    # safe_load only builds plain Python objects, needs no explicit Loader
    # (required by newer PyYAML for yaml.load), and the ``with`` block makes
    # sure the config file handle is closed.
    with open("my_launchpad.yaml") as launchpad_file:
        lpad = LaunchPad(**yaml.safe_load(launchpad_file))

    workflow_fireworks = []
    workflow_dependencies = collections.defaultdict(list)

    # Library directories are numbered starting at 1.
    library_dirs = [
        os.path.join(sequencing_directory, library_prefix + str(i + 1))
        for i in range(num_libraries)
    ]
    subdirs = ['unzipped', 'trimmed', 'aligned', 'bammed', 'sorted', 'counted', 'pythonized']

    for library_dir in library_dirs:
        # NOTE(review): presumably creates the listed subdirectories under
        # library_dir -- confirm against seq_functions.make_directories.
        seq_functions.make_directories(library_dir, subdirs)
        library_name = os.path.basename(library_dir)

        # The Firework name doubles as the queue job name.
        name = "Sort_%s" % library_name
        fw_sort = Firework(
            [
                SortTask(
                    library_path=library_dir,
                    aligned_name="aligned",
                    bammed_name="bammed",
                    sorted_name="sorted",
                )
            ],
            name=name,
            spec={"_queueadapter": {"job_name": name}},
        )
        workflow_fireworks.append(fw_sort)

        name = "Count_%s" % library_name
        fw_count = Firework(
            [
                CountTask(
                    library_path=library_dir,
                    aligned_name="aligned",
                    bammed_name="bammed",
                    counted_name="counted",
                )
            ],
            name=name,
            spec={"_queueadapter": {"job_name": name}},
        )
        workflow_fireworks.append(fw_count)

        # Register the count Firework as a child of the sort Firework.
        workflow_dependencies[fw_sort].append(fw_count)

    # One workflow covering every library is submitted after the loop.
    lpad.add_wf(
        Workflow(workflow_fireworks, links_dict=workflow_dependencies)
    )
if __name__ == "__main__":
    # Command-line entry point: parse the options and forward them to main().
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "sequencing_directory",
        type=str,
        help="Directory to operate on",
    )
    cli.add_argument(
        "--library_prefix",
        type=str,
        default="library",
        help="Prefix for library subdirectories",
    )
    cli.add_argument(
        "--num_libraries",
        type=int,
        default=10,
        help="Number of libraries to process",
    )
    cli.add_argument(
        "--raw_data_dir",
        type=str,
        default="Raw_Data",
        help="Raw data directory name",
    )
    options = cli.parse_args()
    main(
        options.sequencing_directory,
        options.library_prefix,
        options.num_libraries,
        options.raw_data_dir,
    )