Skip to content

Commit

Permalink
Merge pull request #81 from broadinstitute/dpark-dev
Browse files Browse the repository at this point in the history
increase JVM memory for butterfly, improve a bunch of reports on assembly / depletion.
  • Loading branch information
dpark01 committed Jan 29, 2015
2 parents eaec753 + 46966d1 commit dad82ba
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 236 deletions.
7 changes: 5 additions & 2 deletions assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def parser_trim_rmdup_subsamp(parser=argparse.ArgumentParser()):
__commands__.append(('trim_rmdup_subsamp', parser_trim_rmdup_subsamp))


def assemble_trinity(inBam, outFasta, clipDb, n_reads=100000, outReads=None):
def assemble_trinity(inBam, outFasta, clipDb, n_reads=100000, outReads=None, JVMmemory=None):
''' This step runs the Trinity assembler.
First trim reads with trimmomatic, rmdup with prinseq,
and random subsample to no more than 100k reads.
Expand All @@ -110,7 +110,7 @@ def assemble_trinity(inBam, outFasta, clipDb, n_reads=100000, outReads=None):
trim_rmdup_subsamp_reads(inBam, clipDb, subsamp_bam, n_reads=n_reads)
subsampfq = list(map(util.file.mkstempfname, ['.subsamp.1.fastq', '.subsamp.2.fastq']))
tools.picard.SamToFastqTool().execute(subsamp_bam, subsampfq[0], subsampfq[1])
tools.trinity.TrinityTool().execute(subsampfq[0], subsampfq[1], outFasta)
tools.trinity.TrinityTool().execute(subsampfq[0], subsampfq[1], outFasta, JVMmemory=JVMmemory)
os.unlink(subsampfq[0])
os.unlink(subsampfq[1])

Expand All @@ -128,6 +128,9 @@ def parser_assemble_trinity(parser=argparse.ArgumentParser()):
help='Subsample reads to no more than this many pairs. (default %(default)s)')
parser.add_argument('--outReads', default=None,
help='Save the trimmomatic/prinseq/subsamp reads to a BAM file')
parser.add_argument('--JVMmemory',
default=tools.trinity.TrinityTool.jvmMemDefault,
help='JVM virtual memory size (default: %(default)s)')
util.cmd.common_args(parser, (('loglevel',None), ('version',None), ('tmpDir',None)))
util.cmd.attach_main(parser, assemble_trinity, split_args=True)
return parser
Expand Down
2 changes: 1 addition & 1 deletion docs/install.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Additionally, in order to use the **pipeline infrastructure**, Python 3.4
is required (Python 2 is not supported) and you must install snakemake
as well::

pip install snakemake==3.2 yappi=0.94
pip install snakemake==3.2.1 yappi==0.94

However, most of the real functionality is encapsulated in the command line
tools, which can be used without any of the pipeline infrastructure.
Expand Down
3 changes: 0 additions & 3 deletions pipes/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ rule all:
os.path.join(config["dataDir"], config["subdirs"]["interhost"], 'ref_guided.fasta'),
os.path.join(config["dataDir"], config["subdirs"]["interhost"], 'ref_guided.vcf.gz'),
# create summary reports
config["reportsDir"]+'/summary.coverage_ref.txt.gz',
config["reportsDir"]+'/summary.coverage_self.txt.gz',
config["reportsDir"]+'/summary.fastqc.txt',
config["reportsDir"]+'/summary.bamstats.txt',
config["reportsDir"]+'/summary.spike_count.txt'
params: LSF="-N"
run:
Expand Down
23 changes: 7 additions & 16 deletions pipes/rules/assembly.rules
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,14 @@ rule all_assemble:
# create BAMs of aligned reads to own consensus
expand("{dataDir}/{subdir}/{sample}.bam",
dataDir=config["dataDir"], subdir=config["subdirs"]["align_self"],
sample=read_samples_file(config["samples_assembly"])),
# assembly reports too
expand("{reportsDir}/assembly/{sample}.txt",
reportsDir=config["reportsDir"],
sample=read_samples_file(config["samples_assembly"]))
params: LSF="-N"

rule all_assemble_failures:
input:
expand("{dataDir}/{subdir}/{sample}.fasta",
dataDir=config["dataDir"], subdir=config["subdirs"]["assembly"],
sample=read_samples_file(config.get("samples_assembly_failures"))),
expand("{reportsDir}/assembly/{sample}.txt",
reportsDir=config["reportsDir"],
sample=read_samples_file(config["samples_assembly"]))
sample=read_samples_file(config.get("samples_assembly_failures")))
params: LSF="-N"


Expand All @@ -45,7 +38,7 @@ rule assemble_trinity:
'''
input: config["dataDir"]+'/'+config["subdirs"]["per_sample"]+'/{sample}.taxfilt.bam'
output: config["tmpDir"] +'/'+config["subdirs"]["assembly"]+'/{sample}.assembly1-trinity.fasta'
resources: mem=3
resources: mem=4
params: LSF=config.get('LSF_queues', {}).get('short', '-W 4:00'),
n_reads="100000",
logid="{sample}",
Expand Down Expand Up @@ -80,8 +73,7 @@ rule orient_and_impute:
multi-chromosome genomes.
'''
input: config["tmpDir"]+'/'+config["subdirs"]["assembly"]+'/{sample}.assembly1-trinity.fasta'
output: config["tmpDir"]+'/'+config["subdirs"]["assembly"]+'/{sample}.assembly2-vfat.fasta',
config["tmpDir"]+'/'+config["subdirs"]["assembly"]+'/{sample}.assembly3-modify.fasta'
output: config["tmpDir"]+'/'+config["subdirs"]["assembly"]+'/{sample}.assembly3-modify.fasta'
resources: mem=3
params: LSF=config.get('LSF_queues', {}).get('short', '-W 4:00'),
refGenome=config["ref_genome"],
Expand All @@ -90,10 +82,11 @@ rule orient_and_impute:
renamed_prefix="",
replace_length="55",
logid="{sample}",
vfat_fasta=config["tmpDir"]+'/'+config["subdirs"]["assembly"]+'/{sample}.assembly2-vfat.fasta',
subsamp_bam=config["tmpDir"]+'/'+config["subdirs"]["assembly"]+'/{sample}.subsamp.bam'
run:
shell("{config[binDir]}/assembly.py order_and_orient {input} {params.refGenome} {output[0]} --inReads {params.subsamp_bam}")
shell("{config[binDir]}/assembly.py impute_from_reference {output[0]} {params.refGenome} {output[1]} --newName {params.renamed_prefix}{wildcards.sample} --replaceLength {params.replace_length} --minLength {params.length} --minUnambig {params.min_unambig}")
shell("{config[binDir]}/assembly.py order_and_orient {input} {params.refGenome} {params.vfat_fasta} --inReads {params.subsamp_bam}")
shell("{config[binDir]}/assembly.py impute_from_reference {params.vfat_fasta} {params.refGenome} {output} --newName {params.renamed_prefix}{wildcards.sample} --replaceLength {params.replace_length} --minLength {params.length} --minUnambig {params.min_unambig}")

rule refine_assembly_1:
''' This a first pass refinement step where we take the VFAT assembly,
Expand Down Expand Up @@ -148,8 +141,7 @@ rule map_reads_to_self:
input: config["dataDir"]+'/'+config["subdirs"]["per_sample"]+'/{sample}.cleaned.bam',
config["dataDir"]+'/'+config["subdirs"]["assembly"]+'/{sample}.fasta'
output: config["dataDir"]+'/'+config["subdirs"]["align_self"]+'/{sample}.bam',
config["dataDir"]+'/'+config["subdirs"]["align_self"]+'/{sample}.mapped.bam',
config["reportsDir"]+'/assembly/{sample}.txt'
config["dataDir"]+'/'+config["subdirs"]["align_self"]+'/{sample}.mapped.bam'
resources: mem=4
params: LSF=config.get('LSF_queues', {}).get('short', '-W 4:00'),
logid="{sample}",
Expand All @@ -158,4 +150,3 @@ rule map_reads_to_self:
makedirs([os.path.join(config["dataDir"], config["subdirs"]["align_self"]),
os.path.join(config["reportsDir"], "assembly")])
shell("{config[binDir]}/read_utils.py align_and_fix {input} --outBamAll {output[0]} --outBamFiltered {output[1]} --novoalign_options '{params.novoalign_options}'")
shell("{config[binDir]}/reports.py assembly_stats {wildcards.sample} {output[2]} --assembly_dir {config[dataDir]}/{config[subdirs][assembly]} --assembly_tmp {config[tmpDir]}/{config[subdirs][assembly]} --align_dir {config[dataDir]}/{config[subdirs][align_self]}")
110 changes: 1 addition & 109 deletions pipes/rules/reports.rules
Original file line number Diff line number Diff line change
Expand Up @@ -10,78 +10,10 @@ import os, os.path, gzip, shutil

rule all_reports:
input:
config["reportsDir"]+'/summary.coverage_ref.txt.gz',
config["reportsDir"]+'/summary.coverage_self.txt.gz',
config["reportsDir"]+'/summary.fastqc.txt',
config["reportsDir"]+'/summary.bamstats.txt',
config["reportsDir"]+'/summary.spike_count.txt'
params: LSF="-N"

#-----------BAMSTATS-------------------

def bamstats_input_file(wildcards):
    ''' Resolve the input BAM for the bamstats_report rule from the
        {sample}.{adjective} wildcards.
        'raw'/'cleaned' BAMs live under the depletion subdir; adjectives
        prefixed 'self_' / 'ref_' select the align_self / align_ref subdirs.
        Any other adjective raises.
    '''
    adj = wildcards.adjective
    if adj in ('raw', 'cleaned'):
        subdir = "depletion"
    else:
        if adj.startswith('self_'):
            subdir = "align_self"
        elif adj.startswith('ref_'):
            subdir = "align_ref"
        else:
            raise Exception()  # unrecognized adjective prefix
        # Strip the alignment-stage suffix down to the token used in the
        # BAM filename; '_align' means the bare '{sample}.bam' with no token.
        if adj.endswith('_mapped') or adj.endswith('_rmdup') or adj.endswith('_realigned'):
            adj = adj.split('_')[-1]
        elif adj.endswith('_align'):
            adj = None
        else:
            raise Exception()  # unrecognized adjective suffix
    if adj==None:
        bamfile = wildcards.sample + '.bam'
    else:
        bamfile = '.'.join([wildcards.sample, adj, 'bam'])
    return os.path.join(config["dataDir"], config["subdirs"][subdir], bamfile)

rule bamstats_report:
input: bamstats_input_file
output: config["reportsDir"]+'/bamstats/{sample}.{adjective}.txt'
params: LSF='-W 0:30 -sp 40',
logid="{sample}-{adjective}",
tmpf_report=config["reportsDir"]+'/bamstats/{sample}.{adjective}.tmp.txt'
run:
makedirs(config["reportsDir"]+'/bamstats')
shell("use BamTools; bamtools stats -insert -in {input} > {params.tmpf_report}")
out = []
out.append(['Sample', wildcards.sample])
out.append(['BAM', wildcards.adjective])
with open(params.tmpf_report, 'rt') as inf:
for line in inf:
row = line.strip().split(':')
if len(row)==2 and row[1]:
k,v = [x.strip() for x in row]
k = k.strip("'")
mo = re.match(r'^(\d+)\s+\(\S+\)$', v)
if mo:
v = mo.group(1)
out.append([k,v])
with open(output[0], 'wt') as outf:
for row in out:
outf.write('\t'.join(row)+'\n')
os.unlink(params.tmpf_report)

rule consolidate_bamstats:
input:
expand("{{dir}}/bamstats/{sample}.{adjective}.txt",
adjective=['raw','cleaned',
'ref_align', 'ref_mapped', 'ref_rmdup', 'ref_realigned'],
sample=read_samples_file(config["samples_per_run"])) + \
expand("{{dir}}/bamstats/{sample}.{adjective}.txt",
adjective=['self_align', 'self_mapped', 'self_rmdup', 'self_realigned'],
sample=read_samples_file(config["samples_assembly"]))
output: '{dir}/summary.bamstats.txt'
params: logid="all"
shell: "{config[binDir]}/reports.py consolidate_bamstats {input} {output}"


#-----------FASTQC---------------------

Expand All @@ -106,46 +38,6 @@ rule consolidate_fastqc:
params: logid="all"
shell: "{config[binDir]}/reports.py consolidate_fastqc {input} {output}"

#-----------COVERAGE-------------------

def coverage_input_files(wildcards):
    ''' Return the (reference fasta, realigned-reads BAM) pair for the
        coverage_report rule: 'ref' pairs the configured reference genome
        with the align_ref reads, 'self' pairs the sample's own assembly
        with the align_self reads. Any other adjective raises.
    '''
    if wildcards.adjective == 'ref':
        ref = config["ref_genome"]
        reads = os.path.join(config["dataDir"], config["subdirs"]["align_ref"], wildcards.sample + '.realigned.bam')
    elif wildcards.adjective == 'self':
        ref = os.path.join(config["dataDir"], config["subdirs"]["assembly"], wildcards.sample + '.fasta')
        reads = os.path.join(config["dataDir"], config["subdirs"]["align_self"], wildcards.sample + '.realigned.bam')
    else:
        raise Exception()  # adjective must be 'ref' or 'self'
    return (ref, reads)

rule coverage_report:
input: coverage_input_files
output: config["reportsDir"]+'/coverage/{sample}.coverage_{adjective}.txt'
resources: mem=3
params: LSF=config.get('LSF_queues', {}).get('short', '-W 4:00') + " -sp 40",
logid="{sample}-{adjective}"
run:
makedirs(os.path.join(config["reportsDir"], 'coverage'))
shell("/idi/sabeti-scratch/kandersen/bin/bedtools/bedtools genomecov -d -ibam {input[1]} -g {input[0]} > {output}")

def consolidate_coverage_inputs(wildcards):
    ''' List the per-sample coverage reports feeding the consolidated
        summary: 'ref' spans every sample in the run, 'self' only the
        samples that were assembled. Any other adjective raises.
    '''
    if wildcards.adjective == 'ref':
        samples = config['samples_per_run']
    elif wildcards.adjective == 'self':
        samples = config['samples_assembly']
    else:
        raise Exception()  # adjective must be 'ref' or 'self'
    return [wildcards.dir+'/coverage/'+sample+'.coverage_'+wildcards.adjective+'.txt'
        for sample in read_samples_file(samples)]

rule consolidate_coverage:
input: consolidate_coverage_inputs
output: '{dir}/summary.coverage_{adjective}.txt.gz'
params: LSF=config.get('LSF_queues', {}).get('short', '-W 4:00'),
logid="all"
shell: "{config[binDir]}/reports.py consolidate_coverage {input} {wildcards.adjective} {output}"


#-----------SPIKE-INS------------------

Expand All @@ -171,5 +63,5 @@ rule consolidate_spike_count:
sample=read_samples_file(config["samples_per_run"]))
output: '{dir}/summary.spike_count.txt'
params: logid="all"
shell: "{config[binDir]}/reports.py consolidate_spike_count {input} {output}"
shell: "{config[binDir]}/reports.py consolidate_spike_count {wildcards.dir}/spike_count {output}"

7 changes: 6 additions & 1 deletion pipes/rules/reports_only
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,10 @@ rule assembly_report:
resources: mem=2
params: LSF=config.get('LSF_queues', {}).get('short', '-W 4:00'),
logid="{sample}"
shell: "{config[binDir]}/reports.py assembly_stats {wildcards.sample} {output} --assembly_dir {config[dataDir]}/{config[subdirs][assembly]} --assembly_tmp {config[tmpDir]}/{config[subdirs][assembly]} --align_dir {config[dataDir]}/{config[subdirs][align_self]}"
shell: "{config[binDir]}/reports.py assembly_stats {wildcards.sample} {output}" \
" --assembly_dir {config[dataDir]}/{config[subdirs][assembly]}" \
" --assembly_tmp {config[tmpDir]}/{config[subdirs][assembly]}" \
" --align_dir {config[dataDir]}/{config[subdirs][align_self]}" \
" --reads_dir {config[dataDir]}/{config[subdirs][per_sample]}" \
" --raw_reads_dir {config[dataDir]}/{config[subdirs][source]}"

Loading

0 comments on commit dad82ba

Please sign in to comment.