From a72eb62f13818842c513eac1b8879573b065bd6a Mon Sep 17 00:00:00 2001 From: averagehat Date: Thu, 21 Jan 2016 15:54:06 -0500 Subject: [PATCH 1/4] added pydoit example stuff --- pydoit/doit_ex.py | 94 +++++++++++++++++++++++++++++++++++++++++ pydoit/helpers.py | 82 +++++++++++++++++++++++++++++++++++ pydoit/requirements.txt | 2 + 3 files changed, 178 insertions(+) create mode 100644 pydoit/doit_ex.py create mode 100644 pydoit/helpers.py create mode 100644 pydoit/requirements.txt diff --git a/pydoit/doit_ex.py b/pydoit/doit_ex.py new file mode 100644 index 0000000..76caeab --- /dev/null +++ b/pydoit/doit_ex.py @@ -0,0 +1,94 @@ +from fn import _ +from helpers import D, F, fifo, to_fifo, group_fastqs +#NOTE: +''' + they're dropping python2 support in next release + it's by modification time + the output will always have to be created as a fifo, but not the input + to wrap "targets" in fifo + 'task_dep' does exist +''' + +class Opts(object): pass +opts = Opts() +opts.cutadapt = { 'headcrop' : 33 } +opts.bwa = { 'threads' : 1, 'keep_temp' : False} +opts.ref = F("ref.fasta") + +'''example tasks''' +def task_index_ref(): + return { 'file_dep' : [opts.ref], + 'actions' : ['bwa index %(dependencies)'], + 'targets' : [opts.ref.fai] } +def task_merge_bam(): + return { 'task_dep' : 'mapping'} + + +DIR="." +R1s, R2s, unpaireds = group_fastqs(DIR) +R1, R2, unpaired, paired = map(F, ["R1", "R2", "unpaired", "paired"]) + +forward = compose(sorted, partial(glob1, pattern="*_R1_*")) +reverse = compose(sorted, partial(glob1, pattern="*_R2_*")) +def unpaired(dir): return set(glob1(dir, '*')) - set(forward(dir) + reverse(dir)) +def group_fastqs(dir): return forward(dir), reverse(dir), unpaired(dir) + +merge_files = D({ + 'actions' : ['cat %(dependencies) %(targets)']}) + +# _.cutdadapt = lambda x: x.cudadapt +task_R1 = lambda: merge_files.assoc(file_dep=map(_.cutadapt, R1s), targets=R1.fastq) +task_R2 = lambda: merge_files.assoc(file_dep=map(_.cutadapt, R2s), targets=R2.fastq) +task_unpaired = lambda: merge_files.assoc(file_dep=map(_.cutadapt, unpaireds), targetsunpaired.fastq) + + +mapping = D({'file_dep' : [opts.ref], + 'actions' : [bwa_map, [], opts.bwa]}) + +mapping_up = mapping.apply(file_dep = _ + [unpaired.fastq]).assoc(targets=[unpaired.bam]) +mapping_paired = mapping.apply(file_dep = _ + [R1.fastq, R2.fastq]).assoc(targets=[paried.bam]) +task_maping_up = lambda : mapping_up +task_maping_paired = lambda : mapping_paired + + + +@fifo('targets') +def bwa_map(dependencies, targets, threads, keep_temp): + sh.bwa.mem(*dependencies, t=threads, _out=targets[0]) + +R1s, R2s, unpaireds = group_fastqs(files) + +def task_run_cutadapt(): + for fqs in zip(R1s, R2s) + unpaireds: + yield gen_cutdapt(fqs) + +def gen_cutdapt(*fqs): + func = cutadapt_paired if len(fqs) > 1 else cutadapt_up + return { + 'file_dep' : fqs, + 'targets' : map(_.cutadapt, fqs), + 'actions' : [to_fifo(func, 'targets'), [], opts.cutadapt]} + +def cutadapt_paired(dependencies, targets, quality): + sh.cutadapt(o=targets[0], p=targets[1], q="{0},{0}".format(quality), *dependencies) + +def cutadapt_up(dependencies, targets, quality): + sh.cutadapt(q=quality, o=targets[0], *dependencies) + + #'actions': ["cutadapt %(dependencies)"] # not work + +#fwd, rev <- fwd, rev : cutadapt_paired + + +def task_sff2fastq(): + convert_sff = lambda i,o: SeqIO.convert(i, 'sff', o, 'fastq') + convert_sff = to_fifo(convert_sff, 'o') + for sff in sffs: + yield {'target' : [swap_ext(sff, 'fastq')], + 'file_deps' : [sff], + 'actions' : convert_sff} +''' +actions take kwargs. so add the dict associated with a task (e.g., ngs_filter) +dynamically, based on func name # +dy of task-creators are executed even if the task is not going to be executed. +''' diff --git a/pydoit/helpers.py b/pydoit/helpers.py new file mode 100644 index 0000000..3494da2 --- /dev/null +++ b/pydoit/helpers.py @@ -0,0 +1,82 @@ +import os +from glob import glob# ls = partial(glob.glob1, ".") +from functools import wraps + + +class F (str): + """represents a file""" + def __div__(self, other): + return F(os.path.join(self, other)) + + def __rdiv__(self, other): + return F(os.path.join(other, self)) + + def __getattr__(self, ext): + try: + return super(self.__class__, self).__getattr__(ext) + except: + return F('{0}.{1}'.format(self, ext)) + +def assoc_all(d, **kwargs): + dict = d.copy() + for k,v in kwargs.items(): + dict[k] = v + return dict + +def update_all(d, **kwargs): + dict = d.copy() + for k,f in kwargs.items(): + dict[k] = f(dict[k]) + return dict + +class D (dict): + """dictionary with extra methods""" + + def assoc(self, **kwargs): + return assoc_all(self, **kwargs) + + def apply(self, **kwargs): + return update_all(self, **kwargs) + +def to_fifo(func, *argnames): #works as composition + code = func.func_code + names = code.co_varnames[:code.co_argcount] + @wraps(func) + def decorated(*args,**kwargs): + for argname in argnames: + argval = kwargs.get(argname, args[names.index(argname)]) +# try: +# argval = args[names.index(argname)] +# except ValueError: +# argval = kwargs[argname] + if hasattr(argval, '__iter__'): + for f in argval: + os.mkfifo(f) + else: + os.mkfifo(argval) + + return func(*args, **kwargs) + return decorated + +#decorator +def fifo(*argnames): + def decorated(func): + return to_fifo(func, *argnames) + return decorated + +fwd, rev = glob("*_R1_*.fastq"), glob("*_R2_*.fastq") +ext = lambda s: s.split('.')[-1] +swap_ext = lambda s, ext: '.'.join(s.split('.')[:-1] + [ext]) + + +#{ +# cutadapt_up : filter_up, +# filter_up : get_input_up, +# cutadapt_paired : filter_paired, +# filter_paired : get_input_paired, +# bwa_up : cutadapt_up, +# bwa_paired : cutadapt_paired, +# samtools_merge : [bwa_up, bwa_paired], +# samtools_index : samtools_merge, +# freebayes : [samtools_index +# diff --git a/pydoit/requirements.txt b/pydoit/requirements.txt new file mode 100644 index 0000000..e1ced4b --- /dev/null +++ b/pydoit/requirements.txt @@ -0,0 +1,2 @@ +doit +fn From 390cb4e790de139c924790c8e243d3e7917e9201 Mon Sep 17 00:00:00 2001 From: averagehat Date: Thu, 21 Jan 2016 15:58:23 -0500 Subject: [PATCH 2/4] little fixes --- pydoit/doit_ex.py | 16 ++++++---------- pydoit/helpers.py | 8 ++++++-- pydoit/requirements.txt | 1 + 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/pydoit/doit_ex.py b/pydoit/doit_ex.py index 76caeab..9dd5e13 100644 --- a/pydoit/doit_ex.py +++ b/pydoit/doit_ex.py @@ -24,22 +24,18 @@ def task_merge_bam(): return { 'task_dep' : 'mapping'} -DIR="." -R1s, R2s, unpaireds = group_fastqs(DIR) -R1, R2, unpaired, paired = map(F, ["R1", "R2", "unpaired", "paired"]) - -forward = compose(sorted, partial(glob1, pattern="*_R1_*")) -reverse = compose(sorted, partial(glob1, pattern="*_R2_*")) -def unpaired(dir): return set(glob1(dir, '*')) - set(forward(dir) + reverse(dir)) -def group_fastqs(dir): return forward(dir), reverse(dir), unpaired(dir) - +''' examples of programmatically created tasks ''' merge_files = D({ 'actions' : ['cat %(dependencies) %(targets)']}) # _.cutdadapt = lambda x: x.cudadapt task_R1 = lambda: merge_files.assoc(file_dep=map(_.cutadapt, R1s), targets=R1.fastq) task_R2 = lambda: merge_files.assoc(file_dep=map(_.cutadapt, R2s), targets=R2.fastq) -task_unpaired = lambda: merge_files.assoc(file_dep=map(_.cutadapt, unpaireds), targetsunpaired.fastq) +task_unpaired = lambda: merge_files.assoc(file_dep=map(_.cutadapt, unpaireds), targets=unpaired.fastq) + +DIR="." +R1s, R2s, unpaireds = group_fastqs(DIR) +R1, R2, unpaired, paired = map(F, ["R1", "R2", "unpaired", "paired"]) mapping = D({'file_dep' : [opts.ref], diff --git a/pydoit/helpers.py b/pydoit/helpers.py index 3494da2..ff10ccb 100644 --- a/pydoit/helpers.py +++ b/pydoit/helpers.py @@ -1,7 +1,11 @@ import os from glob import glob# ls = partial(glob.glob1, ".") -from functools import wraps - +from functools import wraps, partial +from toolz import compose +forward = compose(sorted, partial(glob1, pattern="*_R1_*")) +reverse = compose(sorted, partial(glob1, pattern="*_R2_*")) +def _unpaired(dir): return set(glob1(dir, '*')) - set(forward(dir) + reverse(dir)) +def group_fastqs(dir): return forward(dir), reverse(dir), _unpaired(dir) class F (str): """represents a file""" diff --git a/pydoit/requirements.txt b/pydoit/requirements.txt index e1ced4b..0848421 100644 --- a/pydoit/requirements.txt +++ b/pydoit/requirements.txt @@ -1,2 +1,3 @@ doit fn +toolz From 940fd5d8a4937777e6b8e8afafcb34cf10c1cb20 Mon Sep 17 00:00:00 2001 From: michaelpanciera Date: Fri, 22 Jan 2016 09:35:33 -0500 Subject: [PATCH 3/4] fixed cat command --- pydoit/doit_ex.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pydoit/doit_ex.py b/pydoit/doit_ex.py index 9dd5e13..d16bc8f 100644 --- a/pydoit/doit_ex.py +++ b/pydoit/doit_ex.py @@ -26,7 +26,7 @@ def task_merge_bam(): ''' examples of programmatically created tasks ''' merge_files = D({ - 'actions' : ['cat %(dependencies) %(targets)']}) + 'actions' : ['cat %(dependencies) > %(targets)']}) # _.cutdadapt = lambda x: x.cudadapt task_R1 = lambda: merge_files.assoc(file_dep=map(_.cutadapt, R1s), targets=R1.fastq) @@ -69,8 +69,7 @@ def cutadapt_paired(dependencies, targets, quality): sh.cutadapt(o=targets[0], p=targets[1], q="{0},{0}".format(quality), *dependencies) def cutadapt_up(dependencies, targets, quality): - sh.cutadapt(q=quality, o=targets[0], *dependencies) - + sh.cutadapt(q=quality, o=targets[0], *dependencies) #'actions': ["cutadapt %(dependencies)"] # not work #fwd, rev <- fwd, rev : cutadapt_paired From 28c615a6114b624ff399816c642bff79f04d3361 Mon Sep 17 00:00:00 2001 From: michaelpanciera Date: Fri, 22 Jan 2016 11:00:08 -0500 Subject: [PATCH 4/4] reorganized and extended example --- pydoit/doit_ex.py | 132 ++++++++++++++++++++++++++++++---------------- 1 file changed, 88 insertions(+), 44 deletions(-) diff --git a/pydoit/doit_ex.py b/pydoit/doit_ex.py index d16bc8f..5e9fb73 100644 --- a/pydoit/doit_ex.py +++ b/pydoit/doit_ex.py @@ -1,5 +1,6 @@ from fn import _ -from helpers import D, F, fifo, to_fifo, group_fastqs +from helpers import D, F, fifo, to_fifo, group_fastqs, swap_ext +from glob import glob1 #NOTE: ''' they're dropping python2 support in next release @@ -9,79 +10,122 @@ 'task_dep' does exist ''' +'''Options''' class Opts(object): pass opts = Opts() opts.cutadapt = { 'headcrop' : 33 } opts.bwa = { 'threads' : 1, 'keep_temp' : False} +opts.tagreads = { 'CA' : "Foo"} +opts.freebayes = { 'observations' : 5, 'haplotype_length' : 50} + +'''Input files''' +DIR="." opts.ref = F("ref.fasta") +R1s, R2s, unpaireds = group_fastqs(DIR) +R1, R2, unpaired, paired, merged, consensus = map(F, ["R1", "R2", "unpaired", "paired", "merged", "consensus"]) +sffs = glob1(DIR, "*.sff") +unpaireds += [swap_ext(sff, 'fastq') for sff in sffs] '''example tasks''' def task_index_ref(): - return { 'file_dep' : [opts.ref], - 'actions' : ['bwa index %(dependencies)'], - 'targets' : [opts.ref.fai] } -def task_merge_bam(): - return { 'task_dep' : 'mapping'} + return { 'targets' : [opts.ref.fai], + 'file_dep' : [opts.ref], + 'actions' : ['bwa index %(dependencies)'] } + +def task_sff2fastq(): + @fifo('o') + def convert_sff(i, o): SeqIO.convert(i, 'sff', o, 'fastq') + + for sff in sffs: + yield {'target' : [swap_ext(sff, 'fastq')], + 'file_deps' : [sff], + 'actions' : convert_sff} + +''' examples of programmatically created tasks +tasks must be wrapped in functions starting with `task_` +(so using lambda: dict) ''' + + +def cutadapt_paired(dependencies, targets, quality): + sh.cutadapt(o=targets[0], p=targets[1], q="{0},{0}".format(quality), *dependencies) + +def cutadapt_up(dependencies, targets, quality): + sh.cutadapt(q=quality, o=targets[0], *dependencies) + #'actions': ["cutadapt %(dependencies)"] # not work + +def gen_cutdapt(fqs): + func = cutadapt_paired if len(fqs) > 1 else cutadapt_up + return { 'targets' : map(_.cutadapt, fqs), + 'file_dep' : fqs, + 'actions' : [to_fifo(func, 'targets'), [], opts.cutadapt] } + +def task_run_cutadapt(): + for fqs in zip(R1s, R2s) + unpaireds: + yield gen_cutdapt(fqs) -''' examples of programmatically created tasks ''' merge_files = D({ 'actions' : ['cat %(dependencies) > %(targets)']}) # _.cutdadapt = lambda x: x.cudadapt +# `.assoc` is like `.update` but returns a new dictionary without changing the old one task_R1 = lambda: merge_files.assoc(file_dep=map(_.cutadapt, R1s), targets=R1.fastq) task_R2 = lambda: merge_files.assoc(file_dep=map(_.cutadapt, R2s), targets=R2.fastq) task_unpaired = lambda: merge_files.assoc(file_dep=map(_.cutadapt, unpaireds), targets=unpaired.fastq) -DIR="." -R1s, R2s, unpaireds = group_fastqs(DIR) -R1, R2, unpaired, paired = map(F, ["R1", "R2", "unpaired", "paired"]) +@fifo('targets') +def bwa_map(dependencies, targets, threads, keep_temp): + '''`dependencies` and `targets` are lists auto-provided by + pydoit with `file_deps` and `targets` from the task dictionary. + The keyword arguments are provided by the dictionary in the + third element of `'actions'`, in this case `opts.bwa`''' + sh.bwa.mem(dependencies, t=threads, _out=targets[0]) mapping = D({'file_dep' : [opts.ref], - 'actions' : [bwa_map, [], opts.bwa]}) + 'actions' : [bwa_map, [], opts.bwa]}) #opts.bwa here sends the kwargs to `bwa_map` + +#`.apply(key=func)` applies the func to the value at that key mapping_up = mapping.apply(file_dep = _ + [unpaired.fastq]).assoc(targets=[unpaired.bam]) mapping_paired = mapping.apply(file_dep = _ + [R1.fastq, R2.fastq]).assoc(targets=[paried.bam]) task_maping_up = lambda : mapping_up -task_maping_paired = lambda : mapping_paired - +task_maping_paired = lambda : mapping_paired +def task_merge_bam(): + return { 'targets' : [unsorted.bam] + 'file_dep' : [paired.bam, unpaired.bam], #'task_dep' : 'mapping', + 'actions' : ['mkfifo %(targets)', 'samtools merge %(targets) %(dependencies)']} -@fifo('targets') -def bwa_map(dependencies, targets, threads, keep_temp): - sh.bwa.mem(*dependencies, t=threads, _out=targets[0]) - -R1s, R2s, unpaireds = group_fastqs(files) - -def task_run_cutadapt(): - for fqs in zip(R1s, R2s) + unpaireds: - yield gen_cutdapt(fqs) - -def gen_cutdapt(*fqs): - func = cutadapt_paired if len(fqs) > 1 else cutadapt_up - return { - 'file_dep' : fqs, - 'targets' : map(_.cutadapt, fqs), - 'actions' : [to_fifo(func, 'targets'), [], opts.cutadapt]} - -def cutadapt_paired(dependencies, targets, quality): - sh.cutadapt(o=targets[0], p=targets[1], q="{0},{0}".format(quality), *dependencies) - -def cutadapt_up(dependencies, targets, quality): - sh.cutadapt(q=quality, o=targets[0], *dependencies) - #'actions': ["cutadapt %(dependencies)"] # not work +def make_task(target, dep, action): + return { 'targets' : [target], 'file_dep' : [dep], 'actions' : [action] } -#fwd, rev <- fwd, rev : cutadapt_paired +task_sort_bam = lambda : make_task(merged.bam, unsorted.bam, 'mkfifo %(targets) && samtools sort %(dependencies) > %(targets)') +task_index_bam = lambda : make_task(merged.bam.bai, merged.bam, 'samtools index %(dependencies)') +task_tag_bam = lambda : { 'file_dep' : [merged.bam], + 'actions' : ['tagreads %(dependencies) --CN %s' % opts.tagreads['CN'] ] } +# this is screwed up because the tagged bam file may not be sorted. Should swap out `tagreads` for +# something that creates a new file, it's simpler +# also that doesn't play well with fifo +@fifo('targets') +def run_freebayes(dependencies, targets, haplotype_length, observations): + sh.freebayes(dependencies[0], f=dependencies[1], + haplotype_length=haplotype_length, C=observations, _out=targets[0]) + +def task_freebayes(): + return { 'targets' : [merged.bam.vcf] + 'file_dep' : [merged.bam, opts.ref, ref.fasta.fai] + 'task_dep' : ['tag_bam'] + 'actions' : [run_freebayes, [], opts.freebayes] } + +def task_consensus(): + def TODO(dependecies, targets): + pass + return { 'targets' : [consensus.fasta] + 'file_dep' : [merged.bam.vcf, opts.ref] + 'actions' : [TODO] } -def task_sff2fastq(): - convert_sff = lambda i,o: SeqIO.convert(i, 'sff', o, 'fastq') - convert_sff = to_fifo(convert_sff, 'o') - for sff in sffs: - yield {'target' : [swap_ext(sff, 'fastq')], - 'file_deps' : [sff], - 'actions' : convert_sff} ''' actions take kwargs. so add the dict associated with a task (e.g., ngs_filter) dynamically, based on func name #