diff --git a/changelog.md b/changelog.md
index 78bb9b10..fb9e89aa 100644
--- a/changelog.md
+++ b/changelog.md
@@ -11,10 +11,13 @@
 ### New components

 - `Kraken2`: Taxonomic identification on FastQ files
+- `insert_size`: calculates the insert size of a sample by mapping the reads
+back to an assembly

 ### Bug fixes

-- Fix bug in `momps`component related to added in the introduction of the clear input parameter
+- Fix bug in the `momps` component introduced with the addition of the clear
+input option
 - Fixed bug with the `-ft` parameters not retrieving the dockerhub tags for all the components.
 - Fixed bug in the `megahit` process where the fastg mode would break the process
@@ -25,7 +28,13 @@ position in the `nextflow run` command inside the .nextflow.log file.

 ### Minor/Other changes

-- Added option to `dengue_typing` to retrive closest referece sequence and link it
+- Added `insert_size` to the `innuca` recipe
+- `integrity_coverage` now checks the integrity of the compressed read files with the
+appropriate software.
+- `mlst` component now has its own process template
+- `assembly_mapping` now verifies the percentage of mapped reads, issuing a quality
+control warning when it falls below 95%
+- Added option to `dengue_typing` to retrieve closest reference sequence and link it
 with a secondary channel into `mafft`
 - New version of DEN-IM recipe
 - Now prints an ordered list of components
diff --git a/flowcraft/generator/components/mapping.py b/flowcraft/generator/components/mapping.py
index c6fa751f..71b318cd 100644
--- a/flowcraft/generator/components/mapping.py
+++ b/flowcraft/generator/components/mapping.py
@@ -5,7 +5,8 @@

 class Bowtie(Process):
-    """bowtie2 to align short paired-end sequencing reads to long reference sequences
+    """
+    bowtie2 process to align short paired-end sequencing reads to long reference sequences

     This process is set with:

@@ -13,7 +14,7 @@ class Bowtie(Process):
         - ``output_type``: bam
         - ``ptype``: mapping

-    """
+    """

     def __init__(self, **kwargs):

@@ -65,7 +66,8 @@ def __init__(self, **kwargs):

 class RetrieveMapped(Process):
-    """Samtools process to to align short paired-end sequencing reads to
+    """
+    Samtools process to retrieve short paired-end sequencing reads mapped to
     long reference sequences

     This process is set with:

@@ -74,7 +76,7 @@ def __init__(self, **kwargs):
         - ``output_type``: fastq
         - ``ptype``: mapping

-    """
+    """

     def __init__(self, **kwargs):

@@ -108,3 +110,64 @@ def __init__(self, **kwargs):
         self.status_channels = [
             "retrieve_mapped"
         ]
+
+
+class InsertSize(Process):
+    """
+    Determines the sequencing insert size by mapping the reads
+    back to an assembly file
+
+    This process is set with:
+
+        - ``input_type``: fasta
+        - ``output_type``: None
+        - ``ptype``: mapping
+
+    It contains one **secondary channel link end**:
+
+        - ``__fastq`` (alias: ``_LAST_fastq``): Receives the FastQ files
+        from the last process with ``fastq`` output type.
+
+    """
+
+    def __init__(self, **kwargs):
+
+        super().__init__(**kwargs)
+
+        self.input_type = "fasta"
+        self.output_type = None
+
+        self.link_end.append({"link": "__fastq", "alias": "_LAST_fastq"})
+
+        self.params = {
+            "distribution_plot": {
+                "default": "false",
+                "description": "Produces a distribution plot of the insert sizes."
+            },
+            "clearInput": {
+                "default": "false",
+                "description":
+                    "Permanently removes temporary input files. This option "
+                    "is only useful to remove temporary files in large "
+                    "workflows and prevents nextflow's resume functionality. "
+                    "Use with caution."
+ } + } + + self.directives = { + "assembly_mapping_statistics": { + "container": "flowcraft/bowtie2_samtools", + "version": "1.0.0-1", + "memory": "{1.Gb*task.cpus*task.attempt}", + "cpus": 1 + }, + "insert_size": { + "container": "flowcraft/plotly", + "version": "3.5.0-1" + } + } + + self.status_channels = [ + "assembly_mapping_statistics", + "insert_size" + ] diff --git a/flowcraft/generator/components/mlst.py b/flowcraft/generator/components/mlst.py index dba1bfdf..b8388b7f 100644 --- a/flowcraft/generator/components/mlst.py +++ b/flowcraft/generator/components/mlst.py @@ -27,7 +27,8 @@ def __init__(self, **kwargs): self.output_type = "fasta" self.directives = {"mlst": { - "container": "ummidock/mlst", + "container": "flowcraft/mlst", + "version": "2.15.1-1" }} self.params = { diff --git a/flowcraft/generator/components/reads_quality_control.py b/flowcraft/generator/components/reads_quality_control.py index 6e029956..f539b01c 100644 --- a/flowcraft/generator/components/reads_quality_control.py +++ b/flowcraft/generator/components/reads_quality_control.py @@ -45,6 +45,11 @@ def __init__(self, **kwargs): self.link_start.extend(["SIDE_phred", "SIDE_max_len"]) + self.directives = {"integrity_coverage": { + "container": "flowcraft/integrity_coverage", + "version": "1.0-1" + }} + class CheckCoverage(Process): """Process template interface for additional integrity_coverage process @@ -112,7 +117,7 @@ def __init__(self, **kwargs): "cpus": 4, "memory": "'1GB'", "container": "flowcraft/true_coverage", - "version": "3.2-1" + "version": "3.3-1" } } diff --git a/flowcraft/generator/recipes/innuca.py b/flowcraft/generator/recipes/innuca.py index e59e4f03..99b317a1 100644 --- a/flowcraft/generator/recipes/innuca.py +++ b/flowcraft/generator/recipes/innuca.py @@ -26,7 +26,8 @@ def __init__(self): "spades " \ "process_spades " \ "pilon " \ - "mlst " + "mlst " \ + "insert_size" # Recipe parameters and directives self.directives = { diff --git a/flowcraft/generator/templates/insert_size.nf b/flowcraft/generator/templates/insert_size.nf new file mode 100644 index 00000000..ada6eef9 --- /dev/null +++ b/flowcraft/generator/templates/insert_size.nf @@ -0,0 +1,72 @@ +IN_plot{{ pid }} = params.distribution_plot{{ param_id }} ? 
"True" : "False" + + +process assembly_mapping_statistics_{{ pid }} { + + // Send POST request to platform + {% include "post.txt" ignore missing %} + + tag { sample_id } + + input: + set sample_id, file(assembly), file(fastq) from {{ input_channel }}.join(_LAST_fastq_{{ pid }}) + + output: + set sample_id, file("samtools_stats.txt") into IN_insert_size_{{ pid }} + {% with task_name="assembly_mapping_statistics" %} + {%- include "compiler_channels.txt" ignore missing -%} + {% endwith %} + + script: + """ + { + echo [DEBUG] BUILDING BOWTIE INDEX FOR ASSEMBLY: $assembly >> .command.log 2>&1 + bowtie2-build --threads ${task.cpus} $assembly genome_index >> .command.log 2>&1 + + echo [DEBUG] MAPPING READS FROM $fastq >> .command.log 2>&1 + bowtie2 -q --very-fast --threads ${task.cpus} -x genome_index -1 ${fastq[0]} -2 ${fastq[1]} \ + --fr -I 0 -X 2000 --no-discordant --no-mixed --no-unal -S alignment.sam >> .command.log 2>&1 + + echo [DEBUG] GET STATISTICS FROM SAM: alignment.sam + samtools stats alignment.sam > samtools_stats.txt + + if [ -f "alignment.sam" ] && [ -f "samtools_stats.txt" ] + then + echo pass > .status + else + echo fail > .status + fi + + echo -n "" > .report.json + echo -n "" > .versions + } || { + echo fail > .status + } + """ +} + + +process insert_size_{{ pid }} { + + // Send POST request to platform + {% include "post.txt" ignore missing %} + + tag { sample_id } + + publishDir "results/assembly/insert_size_{{ pid }}/" + + input: + set sample_id, file(sam_stats) from IN_insert_size_{{ pid }} + val plot from IN_plot{{ pid }} + + output: + file ("*insert_size_report.tab") + file ("*insert_size_distribution.html") optional true + {% with task_name="insert_size" %} + {%- include "compiler_channels.txt" ignore missing -%} + {% endwith %} + + script: + template "insert_size.py" + +} \ No newline at end of file diff --git a/flowcraft/generator/templates/mlst.nf b/flowcraft/generator/templates/mlst.nf index efe0b72e..0d62caaa 100644 --- a/flowcraft/generator/templates/mlst.nf +++ b/flowcraft/generator/templates/mlst.nf @@ -1,3 +1,9 @@ +// If a species is not provided, it bypasses the species verification +if (params.mlstSpecies{{ param_id }} == null){ + IN_expected_species_{{ pid }} = Channel.value("PASS") +} else { + IN_expected_species_{{ pid }} = Channel.value(params.mlstSpecies{{ param_id }}) +} process mlst_{{ pid }} { @@ -10,6 +16,7 @@ process mlst_{{ pid }} { input: set sample_id, file(assembly) from {{ input_channel }} + val expected_species from IN_expected_species_{{ pid }} output: file '*.mlst.txt' into LOG_mlst_{{ pid }} @@ -19,30 +26,8 @@ process mlst_{{ pid }} { {% endwith %} script: - """ - { - expectedSpecies=${params.mlstSpecies{{ param_id }}} - mlst $assembly >> ${sample_id}.mlst.txt - mlstSpecies=\$(cat *.mlst.txt | cut -f2) - json_str="{'expectedSpecies':\'\$expectedSpecies\',\ - 'species':'\$mlstSpecies',\ - 'st':'\$(cat *.mlst.txt | cut -f3)',\ - 'tableRow':[{'sample':'${sample_id}','data':[\ - {'header':'MLST species','value':'\$mlstSpecies','table':'typing'},\ - {'header':'MLST ST','value':'\$(cat *.mlst.txt | cut -f3)','table':'typing'}]}]}" - echo \$json_str > .report.json - - if [ ! 
\$mlstSpecies = \$expectedSpecies ];
-    then
-        printf fail > .status
-    else
-        printf pass > .status
-    fi
-
-    } || {
-        printf fail > .status
-    }
-    """
+    template "run_mlst.py"
+
 }

 process compile_mlst_{{ pid }} {
diff --git a/flowcraft/templates/fastqc_report.py b/flowcraft/templates/fastqc_report.py
index debe03d1..e7c6ffe3 100644
--- a/flowcraft/templates/fastqc_report.py
+++ b/flowcraft/templates/fastqc_report.py
@@ -446,13 +446,13 @@ def check_summary_health(summary_file, **kwargs):

     # Store the summary categories that cannot fail. If they fail, do not
     # proceed with this sample
-    fail_sensitive = kwargs.get("fail_sensitive", [
+    fail_if_fail = kwargs.get("fail_if_fail", [
         "Per base sequence quality",
         "Overrepresented sequences",
         "Sequence Length Distribution",
         "Per sequence GC content"
     ])
-    logger.debug("Fail sensitive categories: {}".format(fail_sensitive))
+    logger.debug("Must not fail categories: {}".format(fail_if_fail))

     # Store summary categories that must pass. If they do not, do not proceed
     # with that sample
@@ -462,15 +462,17 @@ def check_summary_health(summary_file, **kwargs):
     ])
     logger.debug("Must pass categories: {}".format(must_pass))

-    warning_fail_sensitive = kwargs.get("warning_fail_sensitive", [
+    warning_if_warning = kwargs.get("warning_if_warning", [
         "Per base sequence quality",
         "Overrepresented sequences",
     ])
+    logger.debug("Warning categories: {}".format(warning_if_warning))

-    warning_must_pass = kwargs.get("warning_must_pass", [
+    warning_if_fail = kwargs.get("warning_if_fail", [
         "Per base sequence content"
     ])
+    logger.debug("Warning if fail categories: {}".format(warning_if_fail))

     # Get summary dictionary
     summary_info = get_summary(summary_file)
@@ -486,31 +488,29 @@ def check_summary_health(summary_file, **kwargs):

         logger.debug("Assessing category {} with result {}".format(cat, test))

-        # FAILURES
-        # Check for fail sensitive
-        if cat in fail_sensitive and test == "FAIL":
+        # Check for must not fail
+        if cat in fail_if_fail and test == "FAIL":
             health = False
-            failed.append("{}:{}".format(cat, test))
-            logger.error("Category {} failed a fail sensitive "
+            failed.append("{}: {}".format(cat, test))
+            logger.error("Category {} failed a must not fail "
                          "category".format(cat))

         # Check for must pass
         if cat in must_pass and test != "PASS":
             health = False
-            failed.append("{}:{}".format(cat, test))
+            failed.append("{}: {}".format(cat, test))
             logger.error("Category {} failed a must pass category".format(
                 cat))

         # WARNINGS
-        # Check for fail sensitive
-        if cat in warning_fail_sensitive and test == "FAIL":
-            warning.append("Failed category: {}".format(cat))
-            logger.warning("Category {} flagged at a fail sensitive "
+        if cat in warning_if_warning and test == "WARN":
+            warning.append("{}: {}".format(cat, test))
+            logger.warning("Category {} flagged as a warning "
                            "category".format(cat))

-        if cat in warning_must_pass and test != "PASS":
-            warning.append("Did not pass category: {}".format(cat))
-            logger.warning("Category {} flagged at a must pass "
+        if cat in warning_if_fail and test == "FAIL":
+            warning.append("{}: {}".format(cat, test))
+            logger.warning("Category {} flagged as a warning if fail "
                            "category".format(cat))

     # Passed all tests
diff --git a/flowcraft/templates/insert_size.py b/flowcraft/templates/insert_size.py
new file mode 100644
index 00000000..642d279f
--- /dev/null
+++ b/flowcraft/templates/insert_size.py
@@ -0,0 +1,172 @@
+#!/usr/bin/env python3
+
+"""
+Purpose
+-------
+
+This module is intended to calculate the insert size from sam files
+resulting from
mapping the read data back to the assembly.
+
+Expected input
+--------------
+
+The following variables are expected whether using NextFlow or the
+:py:func:`main` executor.
+
+- ``sample_id`` : Sample Identification string.
+    - e.g.: ``'SampleA'``
+- ``sam_stats`` : Path to the samtools stats report.
+    - e.g.: ``'samtools_stats.txt'``
+- ``plot`` : "True"/"False" string to draw the insert size distribution plot
+
+Generated output
+----------------
+
+
+Code documentation
+------------------
+
+"""
+
+__version__ = "1.0.1"
+__build__ = "09012019"
+__template__ = "insert_size-nf"
+
+import os
+import sys
+import plotly.offline as plot_off
+import plotly.graph_objs as graph_obj
+
+from flowcraft_utils.flowcraft_base import get_logger, MainWrapper
+
+logger = get_logger(__file__)
+
+
+if __file__.endswith(".command.sh"):
+    SAMPLE_ID = '$sample_id'
+    SAM_STATS = '$sam_stats'
+    PLOT = '$plot'
+    logger.debug("Running {} with parameters:".format(
+        os.path.basename(__file__)))
+    logger.debug("SAMPLE_ID: {}".format(SAMPLE_ID))
+    logger.debug("SAM_STATS: {}".format(SAM_STATS))
+    logger.debug("PLOT: {}".format(PLOT))
+
+
+def prepare_insert_size_distribution(samtools_stats):
+    """
+    Collect the data to produce a distribution plot of the insert sizes
+
+    Parameters
+    ----------
+    samtools_stats : str
+        Path to the samtools stats file
+
+    Returns
+    -------
+    x : list
+        List of x axis values
+    y : list
+        List of y axis values
+    """
+
+    x = []
+    y = []
+    with open(samtools_stats, 'rt') as reader:
+        for line in reader:
+            if line.startswith('IS'):
+                line = line.split('\t')
+                if int(line[2]) > 0:
+                    x.append(int(line[1]))
+                    y.append(int(line[2]))
+
+    return x, y
+
+
+def draw_plot(sam_stats, sample_id):
+
+    x_values, y_values = prepare_insert_size_distribution(samtools_stats=sam_stats)
+
+    # Converting absolute counts to frequencies
+    total_counts = sum(y_values)
+    y_values = [float(y) / total_counts for y in y_values]
+
+    plot_trace = graph_obj.Scatter(name='',
+                                   x=x_values,
+                                   y=y_values,
+                                   mode='lines',
+                                   line=dict(color='rgb(0, 0, 0)')
+                                   )
+
+    plot_off.plot({"data": [plot_trace],
+                   "layout": graph_obj.Layout(title="{} Insert Size Distribution".format(sample_id),
+                                              xaxis=dict(title="Insert size"),
+                                              yaxis=dict(title="Frequency"))
+                   },
+                  show_link=False,
+                  output_type="file",
+                  filename="{}_insert_size_distribution.html".format(sample_id),
+                  include_plotlyjs=True,
+                  auto_open=False
+                  )
+
+
+@MainWrapper
+def main(sample_id, sam_stats, plot):
+    """
+    Parse the samtools stats output file and get the insert size average and
+    standard deviation
+
+    Parameters
+    ----------
+    sample_id: str
+        Sample name
+    sam_stats : str
+        Path to the samtools stats file
+    plot: str
+        "True" or "False" string indicating whether the insert size
+        distribution plot should be drawn
+
+    Returns
+    -------
+    statistics : dict
+        Dictionary with the statistics.
Keys are the statistic descriptions and values are the corresponding values
+    """
+
+    statistics = {}
+
+    with open(sam_stats, "rt") as reader:
+        counter = 0
+
+        for line in reader:
+
+            if counter < 2:
+
+                if line.startswith("SN"):
+                    line = line.split("\t")
+
+                    if line[1] == "insert size average:":
+                        statistics["insert_size"] = round(float(line[2]), 0)
+                        counter += 1
+
+                    elif line[1] == "insert size standard deviation:":
+                        statistics["insert_size_sd"] = float(line[2])
+                        counter += 1
+            else:
+                break
+
+    with open("{}_insert_size_report.tab".format(sample_id), "wt") as writer:
+        writer.write("#" + "\\t".join(sorted(statistics.keys())) + "\\n")
+        writer.write("\\t".join([str(statistics[k]) for k in sorted(statistics.keys())]) + "\\n")
+
+    # TODO - .report.json for webApp
+
+    if plot == "True":
+        logger.debug("Generating insert size distribution plot.")
+        draw_plot(sam_stats, sample_id)
+
+
+if __name__ == "__main__":
+
+    main(SAMPLE_ID, SAM_STATS, PLOT)
+
diff --git a/flowcraft/templates/integrity_coverage.py b/flowcraft/templates/integrity_coverage.py
index 71007ba6..41a39f8b 100755
--- a/flowcraft/templates/integrity_coverage.py
+++ b/flowcraft/templates/integrity_coverage.py
@@ -74,16 +74,19 @@

 """

-__version__ = "1.0.1"
-__build__ = "03082018"
+__version__ = "1.0.2"
+__build__ = "08012019"
 __template__ = "integrity_coverage-nf"

+import sys
 import os
 import bz2
 import gzip
 import json
 import zipfile
+import subprocess
+from subprocess import PIPE
 from itertools import chain

 from flowcraft_utils.flowcraft_base import get_logger, MainWrapper
@@ -126,6 +129,12 @@
     "zip": zipfile.ZipFile
 }

+CTEST = {
+    "gz": ["gzip", "-t"],
+    "bz2": ["bzip2", "-t"],
+    "zip": ["zip", "-T"]
+}
+
 MAGIC_DICT = {
     b"\\x1f\\x8b\\x08": "gz",
     b"\\x42\\x5a\\x68": "bz2",
@@ -180,6 +189,42 @@ def guess_file_compression(file_path, magic_dict=None):
     return None


+def uncompress_fastq(fastq_files, compression_type):
+    """Method to test fastq integrity using the appropriate shell command.
+
+    It uses the compression format determined in :py:func:`guess_file_compression`
+    and tests the integrity with the appropriate command. The command is stored
+    in the :py:data:`CTEST` dictionary. If the exit code is different from 0, the
+    file is corrupted and the method returns ``False``, otherwise it returns ``True``.
+
+    Parameters
+    ----------
+    fastq_files : list
+        List containing the paths to the input files.
+    compression_type : str
+        File compression format.
+
+    Returns
+    -------
+    passed : bool
+        If the uncompress test fails, returns ``False``; otherwise,
+        returns ``True``.
+    """
+
+    for file_path in fastq_files:
+
+        compression_command = CTEST[compression_type]
+        cli = compression_command + [file_path]
+
+        p = subprocess.Popen(cli, stdout=PIPE)
+        p.communicate()
+
+        if p.returncode > 0:
+            return False
+
+    return True
+
+
 def get_qual_range(qual_str):
     """ Get range of the Unicode encode range for a given string of characters.
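Review note: the `CTEST` lookup above drives the new integrity test in `uncompress_fastq`. Below is a minimal standalone sketch of the same pattern, assuming `gzip`, `bzip2` and `zip` are on the PATH as in the flowcraft containers; the file name is a hypothetical placeholder used only for illustration.

```python
import subprocess

# Same command table as the CTEST dictionary added in integrity_coverage.py
CTEST = {
    "gz": ["gzip", "-t"],
    "bz2": ["bzip2", "-t"],
    "zip": ["zip", "-T"]
}


def test_integrity(file_path, compression_type):
    """Return True when the compressed file passes the integrity test."""
    cli = CTEST[compression_type] + [file_path]
    p = subprocess.Popen(cli, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    p.communicate()
    # A non-zero exit code means the file is truncated or corrupted
    return p.returncode == 0


if __name__ == "__main__":
    # 'sample_1.fastq.gz' is a placeholder path for illustration
    print(test_integrity("sample_1.fastq.gz", "gz"))
```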
@@ -268,6 +313,7 @@ def main(sample_id, fastq_pair, gsize, minimum_coverage, opts):
     gmin, gmax = 99, 0
     encoding = []
     phred = None
+    ftype = None

     # Information for coverage estimation
     chars = 0
@@ -296,8 +342,6 @@ def main(sample_id, fastq_pair, gsize, minimum_coverage, opts):
                         "uncompressed file".format(fastq))
             file_objects.append(open(fastq))

-    logger.info("Starting FastQ file parsing")
-
     # The '*_encoding' file stores a string with the encoding ('Sanger')
     # If no encoding is guessed, 'None' should be stored
     # The '*_phred' file stores a string with the phred score ('33')
@@ -315,6 +359,13 @@ def main(sample_id, fastq_pair, gsize, minimum_coverage, opts):
             open(".fail", "w") as fail_fh:

         try:
+
+            logger.info("Testing the integrity of the compressed files")
+
+            if ftype is not None and not uncompress_fastq(fastq_pair, ftype):
+                raise EOFError
+
+            logger.info("Starting FastQ file parsing")

             # Iterate over both pair files sequentially using itertools.chain
             for i, line in enumerate(chain(*file_objects)):
diff --git a/flowcraft/templates/metaspades.py b/flowcraft/templates/metaspades.py
index 80a4d4db..4502b854 100644
--- a/flowcraft/templates/metaspades.py
+++ b/flowcraft/templates/metaspades.py
@@ -212,7 +212,7 @@ def main(sample_id, fastq_pair, max_len, kmer, clear):
     logger.info("Finished metaSPAdes subprocess with STDOUT:\\n"
                 "======================================\\n{}".format(stdout))

-    logger.info("Fished metaSPAdes subprocesswith STDERR:\\n"
+    logger.info("Finished metaSPAdes subprocess with STDERR:\\n"
                 "======================================\\n{}".format(stderr))

     logger.info("Finished metaSPAdes with return code: {}".format(
         p.returncode))
diff --git a/flowcraft/templates/process_assembly.py b/flowcraft/templates/process_assembly.py
index 23aab3e5..9d3e2750 100644
--- a/flowcraft/templates/process_assembly.py
+++ b/flowcraft/templates/process_assembly.py
@@ -479,6 +479,7 @@ def main(sample_id, assembly_file, gsize, opts, assembler):
     with open(".warnings", "w") as warn_fh:
         t_80 = gsize * 1000000 * 0.8
         t_150 = gsize * 1000000 * 1.5
+
         # Check if assembly size of the first assembly is lower than 80% of the
         # estimated genome size. If True, redo the filtering without the
         # k-mer coverage filter
@@ -499,13 +500,22 @@ def main(sample_id, assembly_file, gsize, opts, assembler):
             logger.debug("Checking updated assembly length: "
                          "{}".format(assembly_len))
             if assembly_len < t_80:
-
-                warn_msg = "Assembly size smaller than the minimum" \
-                           " threshold of 80% of expected genome size: {}".format(
-                                assembly_len)
-                logger.warning(warn_msg)
-                warn_fh.write(warn_msg)
-                fails = warn_msg
+                # The assembly size is still lower than 80% of the
+                # estimated genome size. Redoing the filtering without
+                # the k-mer coverage filter and the length filter.
+                assembly_obj.filter_contigs()
+
+                assembly_len = assembly_obj.get_assembly_length()
+                logger.debug("Checking updated assembly length: "
+                             "{}".format(assembly_len))
+
+                if assembly_len < t_80:
+                    warn_msg = "Assembly size smaller than the minimum" \
+                               " threshold of 80% of expected genome size: {}".format(
+                                    assembly_len)
+                    logger.warning(warn_msg)
+                    warn_fh.write(warn_msg)
+                    fails = warn_msg

         if assembly_len > t_150:

@@ -536,9 +546,11 @@ def main(sample_id, assembly_file, gsize, opts, assembler):
                 "{}.old".format(assembly_file)))
     assembly_obj.write_assembly("{}_proc.fasta".format(
         os.path.splitext(assembly_file)[0]))
+
     # Write report
     output_report = "{}.report.csv".format(sample_id)
     assembly_obj.write_report(output_report)
+
     # Write json report
     with open(".report.json", "w") as json_report:
         json_dic = {
diff --git a/flowcraft/templates/process_assembly_mapping.py b/flowcraft/templates/process_assembly_mapping.py
index 6e892341..7d4869bb 100644
--- a/flowcraft/templates/process_assembly_mapping.py
+++ b/flowcraft/templates/process_assembly_mapping.py
@@ -303,7 +303,7 @@ def filter_bam(coverage_info, bam_file, min_coverage, output_bam):

 def check_filtered_assembly(coverage_info, coverage_bp, minimum_coverage,
                             genome_size, contig_size, max_contigs,
-                            sample_id):
+                            sample_id, total_reads, total_mapped_reads):
     """Checks whether a filtered assembly passes a size threshold

     Given a minimum coverage threshold, this function evaluates whether an
@@ -327,12 +327,16 @@ def check_filtered_assembly(coverage_info, coverage_bp, minimum_coverage,
         Expected genome size.
     contig_size : dict
         Dictionary with the len of each contig. Contig headers as keys and
-        the corresponding lenght as values.
+        the corresponding length as values.
     max_contigs : int
         Maximum threshold for contig number. A warning is issued if this
         threshold is crossed.
sample_id : str
         Id or name of the current sample
+    total_reads: int
+        Number of reads in the sample
+    total_mapped_reads: int
+        Number of reads that mapped to the assembly

     Returns
     -------

     """

+    min_mapping = 0.95
+
     # Get size of assembly after filtering contigs below minimum_coverage
     assembly_len = sum([v for k, v in contig_size.items()
                         if coverage_info[k]["cov"] >= minimum_coverage])
@@ -368,6 +374,25 @@ def check_filtered_assembly(coverage_info, coverage_bp, minimum_coverage,
     with open(".warnings", "w") as warn_fh, \
             open(".report.json", "w") as json_report:

+        logger.debug("Checking mapping statistics")
+
+        logger.debug("Number of mapped reads: {}".format(total_mapped_reads))
+        logger.debug("Total number of reads: {}".format(total_reads))
+
+        if total_mapped_reads > 0 and total_reads > 0:
+            mapped_ratio = round(float(total_mapped_reads) / float(total_reads), 2)
+            mapped_percentage = round(mapped_ratio * 100, 2)
+            if mapped_ratio >= min_mapping:
+                logger.debug("Mapped reads: {}%".format(mapped_percentage))
+            else:
+                warn_msg = "Mapped reads: {}% (lower than {}%)".format(
+                    mapped_percentage, min_mapping * 100)
+                logger.warning(warn_msg)
+                warn_fh.write(warn_msg)
+                fails.append("Mapped reads: {}%".format(mapped_percentage))
+        else:
+            fails.append("No reads were mapped.")
+
         logger.debug("Checking assembly size after filtering : {}".format(
             assembly_len))

@@ -556,6 +581,76 @@ def get_assembly_size(assembly_file):
     return assembly_size, contig_size


+def get_mapping_statistics(mapping_file):
+    """Using samtools flagstat, obtains the mapping statistics from a
+    mapping file.
+
+    Parameters
+    ----------
+    mapping_file : str
+        Path to mapping file (sam or bam).
+
+    Returns
+    -------
+    total_reads: int
+        Number of reads in the sample
+    total_mapped_reads: int
+        Number of reads that map to the assembly file
+    """
+
+    cli = ["samtools", "flagstat", mapping_file]
+
+    logger.debug("Running samtools flagstat subprocess with command: {}".format(
+        cli))
+
+    p = subprocess.Popen(cli, stdout=PIPE, stderr=PIPE)
+    stdout, stderr = p.communicate()
+
+    # Attempt to decode STDERR output from bytes. If unsuccessful, coerce to
+    # string
+    try:
+        stderr = stderr.decode("utf8")
+        stdout = stdout.decode("utf8")
+    except (UnicodeDecodeError, AttributeError):
+        stderr = str(stderr)
+        stdout = str(stdout)
+
+    logger.info("Finished samtools flagstat subprocess with STDOUT:\\n"
+                "======================================\\n{}".format(stdout))
+    logger.info("Finished samtools flagstat subprocess with STDERR:\\n"
+                "======================================\\n{}".format(stderr))
+    logger.info("Finished samtools flagstat with return code: {}".format(
+        p.returncode))
+
+    mapping_stats = {}
+
+    total_reads = 0
+    total_mapped_reads = 0
+
+    if not p.returncode:
+        stdout = stdout.splitlines()
+        for line in stdout:
+            if len(line) > 0:
+                line = line.split(' ', 3)
+                field = line[3].split('(', 1)
+                if len(field) == 1:
+                    field = field[0].replace(' ', '_')
+                else:
+                    field = field[0].rsplit(' ', 1)[0].replace(' ', '_')
+                mapping_stats[field] = {'qc_passed': int(line[0]), 'qc_failed': int(line[2])}
+
+        for field in sorted(mapping_stats):
+            if field == 'in_total':
+                total_reads = mapping_stats[field]['qc_passed'] + mapping_stats[field]['qc_failed']
+            elif field == 'mapped':
+                total_mapped_reads = mapping_stats[field]['qc_passed'] + mapping_stats[field][
+                    'qc_failed']
+
+    return total_reads, total_mapped_reads
+
+
 @MainWrapper
 def main(sample_id, assembly_file, coverage_file, coverage_bp_file, bam_file,
          opts, gsize):
@@ -582,6 +677,10 @@ def main(sample_id, assembly_file, coverage_file, coverage_bp_file, bam_file,

     min_assembly_coverage, max_contigs = opts

+    logger.info("Getting mapping statistics")
+
+    total_reads, total_mapped_reads = get_mapping_statistics(bam_file)
+
     logger.info("Starting assembly mapping processing")

     # Get coverage info, total size and total coverage from the assembly
@@ -608,7 +707,7 @@ def main(sample_id, assembly_file, coverage_file, coverage_bp_file, bam_file,
     logger.info("Checking filtered assembly")
     if check_filtered_assembly(coverage_info, coverage_bp_data, min_coverage,
                                gsize, contig_size, int(max_contigs),
-                               sample_id):
+                               sample_id, total_reads, total_mapped_reads):
         # Filter assembly contigs based on the minimum coverage.
         logger.info("Filtered assembly passed minimum size threshold")
         logger.info("Writting filtered assembly")
diff --git a/flowcraft/templates/run_mlst.py b/flowcraft/templates/run_mlst.py
new file mode 100755
index 00000000..03db8484
--- /dev/null
+++ b/flowcraft/templates/run_mlst.py
@@ -0,0 +1,463 @@
+#!/usr/bin/env python3
+
+"""
+Purpose
+-------
+
+This module is intended to execute mlst on Fasta files.
+
+Expected input
+--------------
+
+The following variables are expected whether using NextFlow or the
+:py:func:`main` executor.
+
+- ``sample_id`` : Sample Identification string.
+    - e.g.: ``'SampleA'``
+- ``assembly`` : Fasta file path.
+    - e.g.: ``'SampleA.fasta'``
+- ``expected_species`` : Expected species
+
+Generated output
+----------------
+
+
+Code documentation
+------------------
+
+"""
+
+__version__ = "1.0.1"
+__build__ = "09012019"
+__template__ = "mlst-nf"
+
+import os
+import sys
+import subprocess
+import json
+
+from itertools import groupby
+from subprocess import PIPE
+
+from flowcraft_utils.flowcraft_base import get_logger, MainWrapper
+
+logger = get_logger(__file__)
+
+
+if __file__.endswith(".command.sh"):
+    SAMPLE_ID = '$sample_id'
+    ASSEMBLY = '$assembly'
+    EXPECTED_SPECIES = '$expected_species'
+    logger.debug("Running {} with parameters:".format(
+        os.path.basename(__file__)))
+    logger.debug("SAMPLE_ID: {}".format(SAMPLE_ID))
+    logger.debug("ASSEMBLY: {}".format(ASSEMBLY))
+    logger.debug("EXPECTED_SPECIES: {}".format(EXPECTED_SPECIES))
+
+
+def chunkstring(string, length):
+    """
+    Divides the sequences in a multifasta file into chunks of the provided length.
+
+    Parameters
+    ----------
+    string: str
+        Sequence to divide.
+    length: int
+        Maximum chunk length.
+    """
+    return (string[0 + i:length + i] for i in range(0, len(string), length))
+
+
+def get_species_scheme_map_version(mlst_folder):
+    """
+    Since release v2.16.1, the file that maps the scheme genus
+    to the species name changed from "species_scheme_map.tab" to
+    "scheme_species_map.tab". This method determines which version
+    is present in the provided container and returns it. If no file is found,
+    it terminates the code execution.
+
+    Parameters
+    ----------
+    mlst_folder: str
+        Path to the mlst source code
+
+    Returns
+    -------
+    mlst_db_path: str
+        Path to the mlst database containing the species and respective schema
+    species_scheme_map_version: int
+        Version of the mlst scheme database
+    """
+
+    mlst_db_path_version1 = os.path.join(os.path.dirname(os.path.dirname(mlst_folder)), 'db', 'species_scheme_map.tab')
+
+    mlst_db_path_version2 = os.path.join(os.path.dirname(os.path.dirname(mlst_folder)), 'db', 'scheme_species_map.tab')
+
+    if os.path.isfile(mlst_db_path_version1):
+        return mlst_db_path_version1, 1
+
+    elif os.path.isfile(mlst_db_path_version2):
+        return mlst_db_path_version2, 2
+
+    else:
+        logger.error("Species_scheme_map not found.")
+        sys.exit(1)
+
+
+def set_species_scheme_map_variables(list_values, species_scheme_map_version):
+    """
+    Depending on the version of the mlst database containing
+    the species and respective schema, it retrieves the entries for the
+    genus, species and scheme name.
+
+    Parameters
+    ----------
+    list_values: list
+        Line, in list form, of the mlst scheme database
+    species_scheme_map_version: int
+        Version of the mlst database.
+
+    Returns
+    -------
+    val_genus: str
+        Genus in the line
+    val_species: str
+        Species in the line
+    val_scheme: str
+        Scheme name in the line
+    """
+
+    if species_scheme_map_version == 1:
+        val_genus = list_values[0]
+        val_species = list_values[1]
+        val_scheme = list_values[2]
+    elif species_scheme_map_version == 2:
+        val_genus = list_values[1]
+        val_species = list_values[2]
+        val_scheme = list_values[0]
+
+    return val_genus, val_species, val_scheme
+
+
+def parse_species_scheme_map(species_splited, mlst_db_path, species_scheme_map_version):
+    """
+    Parses the mlst scheme database and returns the full scheme for the species
+    and the genus.
+
+    Parameters
+    ----------
+    species_splited: list
+        List with the genus and specific epithet for the expected species
+    mlst_db_path: str
+        Path to the mlst database containing the species and respective schema
+    species_scheme_map_version: int
+        Version of the mlst database
+
+    Returns
+    -------
+    scheme: str
+        mlst scheme name
+    genus_mlst_scheme: str
+        genus name for the mlst scheme
+    """
+    scheme = 'unknown'
+    genus_mlst_scheme = None
+
+    with open(mlst_db_path, 'rtU') as reader:
+        for line in reader:
+            scheme_line = line.splitlines()[0]
+
+            if scheme_line and not scheme_line.startswith('#'):
+                scheme_line = scheme_line.lower().split('\\t')
+                scheme_line_data = [scheme_line[i].split(' ')[0] for i in range(0, len(scheme_line))]
+                val_genus, val_species, val_scheme = set_species_scheme_map_variables(scheme_line_data,
+                                                                                      species_scheme_map_version)
+                # Checking if the genus from the expected species and the scheme match
+                if val_genus == species_splited[0]:
+
+                    # If the scheme is not species specific (works for the genus), the genus is set as the scheme name
+                    if val_species == '':
+                        genus_mlst_scheme = val_scheme
+
+                    elif val_species == species_splited[1]:
+                        scheme = val_scheme
+
+    if scheme == 'unknown' and genus_mlst_scheme is not None:
+        scheme = genus_mlst_scheme
+
+    return scheme, genus_mlst_scheme
+
+
+def clean_novel_alleles(novel_alleles, scheme_mlst, profile):
+    """
+    Clean the fasta file with the novel alleles produced by mlst
+
+    Parameters
+    ----------
+    novel_alleles : str
+        Path for the fasta file containing the novel alleles
+    scheme_mlst : str
+        MLST scheme found by mlst
+    profile : list
+        List of strings with the profile found
+
+    Returns
+    -------
+    """
+    unknown_genes = []
+
+    # Get the novel alleles from the profile
+    for gene_allele in profile:
+        gene = gene_allele.split('(')[0]
+        try:
+            allele = gene_allele.split('(')[1].rstrip(')')
+            if allele.startswith('~'):
+                unknown_genes.append(gene)
+        except IndexError as e:
+            logger.warning("WARNING: Found unexpected formatting in the mlst profile {}".format(e))
+
+    novel_alleles_keep = {}
+
+    if unknown_genes:
+
+        try:
+            reader = open(novel_alleles, mode='rt', newline=None)
+
+            fasta_iter = (g for k, g in groupby(reader, lambda x: x.startswith('>')))
+
+            for header in fasta_iter:
+
+                header = header.__next__()[1:].rstrip('\\r\\n')
+                seq = ''.join(s.rstrip('\\r\\n') for s in fasta_iter.__next__())
+
+                if header.startswith(scheme_mlst):
+                    gene = header.split('.')[1].split('~')[0]
+
+                    if gene in unknown_genes:
+                        novel_alleles_keep[header] = seq
+            reader.close()
+
+            os.remove(novel_alleles)
+
+            if novel_alleles_keep:
+                with open(novel_alleles, 'wt') as writer:
+                    for header, seq in novel_alleles_keep.items():
+                        writer.write('>{}\\n'.format(header))
+                        writer.write('\\n'.join(chunkstring(seq, 80)) + '\\n')
+        except FileNotFoundError as e:
+            logger.info("An unknown ST was found but no novel alleles fasta file was "
+                        "produced by the mlst software: {}".format(e))
+
+
+def getScheme(species):
+    """
+    Get the mlst scheme for the expected species.
+
+    Parameters
+    ----------
+    species: str
+        Expected species
+
+    Returns
+    -------
+    scheme : str
+        mlst scheme name
+    species_genus : str
+        genus of the expected species
+    genus_mlst_scheme : str
+        genus for the mlst scheme
+    """
+    cli = ['which', 'mlst']
+
+    p = subprocess.Popen(cli, stdout=PIPE, stderr=PIPE)
+    stdout, stderr = p.communicate()
+
+    # Attempt to decode STDERR output from bytes.
If unsuccessful, coerce to
+    # string
+    try:
+        stderr = stderr.decode("utf8")
+        stdout = stdout.decode("utf8")
+    except (UnicodeDecodeError, AttributeError):
+        stderr = str(stderr)
+        stdout = str(stdout)
+
+    mlst_folder = os.path.abspath(os.path.realpath(stdout.splitlines()[0]))
+
+    mlst_db_path, species_scheme_map_new = get_species_scheme_map_version(mlst_folder)
+
+    scheme, genus_mlst_scheme = parse_species_scheme_map(species.lower().split(' '), mlst_db_path,
+                                                         species_scheme_map_new)
+
+    logger.info('MLST scheme found for {species}: {scheme}'.format(species=species, scheme=scheme))
+
+    species_genus = species.lower().split(' ')[0]
+
+    return scheme, species_genus, genus_mlst_scheme
+
+
+def parse_stdout(stdout):
+    """
+    Parses the mlst stdout to retrieve the hit's mlst
+    scheme, ST and profile
+
+    Parameters
+    ----------
+    stdout: str
+        mlst stdout
+
+    Returns
+    -------
+    scheme_mlst : str
+        mlst scheme name
+    st : str
+        mlst ST for the sample
+    profile : list
+        List of strings containing the profile
+
+    """
+
+    mlst_data = stdout.splitlines()[0].split("\\t")
+
+    scheme_mlst = mlst_data[1].split("_")[0]
+    st = mlst_data[2]
+    profile = mlst_data[3:]
+
+    return scheme_mlst, st, profile
+
+
+@MainWrapper
+def main(sample_id, assembly, expected_species):
+    """
+    Main executor of the mlst template.
+
+    Parameters
+    ----------
+    sample_id : str
+        Sample Identification string.
+    assembly : str
+        Fasta file.
+    expected_species : str
+        Expected species
+
+    """
+
+    pass_qc = False
+
+    novel_alleles = "{}_mlst_novel_alleles.fasta".format(sample_id)
+
+    cli = [
+        "mlst",
+        "--novel",
+        novel_alleles,
+        assembly
+    ]
+
+    logger.debug("Running mlst subprocess with command: {}".format(cli))
+
+    p = subprocess.Popen(cli, stdout=PIPE, stderr=PIPE)
+    stdout, stderr = p.communicate()
+
+    # Attempt to decode STDERR output from bytes. If unsuccessful, coerce to
+    # string
+    try:
+        stderr = stderr.decode("utf8")
+        stdout = stdout.decode("utf8")
+    except (UnicodeDecodeError, AttributeError):
+        stderr = str(stderr)
+        stdout = str(stdout)
+
+    logger.info("Finished mlst subprocess with STDOUT:\\n"
+                "======================================\\n{}".format(stdout))
+    logger.info("Finished mlst subprocess with STDERR:\\n"
+                "======================================\\n{}".format(stderr))
+    logger.info("Finished mlst with return code: {}".format(
+        p.returncode))
+
+    if p.returncode == 0:
+
+        with open("{}.mlst.txt".format(sample_id), "wt") as writer:
+            writer.write(stdout)
+
+        scheme_mlst, st, profile = parse_stdout(stdout)
+
+        # In case of an unknown ST, clean the novel alleles file.
+        if st == "-":
+            clean_novel_alleles(novel_alleles=novel_alleles, scheme_mlst=scheme_mlst, profile=profile)
+        else:
+            # When the ST is known, remove the novel alleles file if mlst produced one
+            if os.path.isfile(novel_alleles):
+                os.remove(novel_alleles)
+
+        # If expected_species is set to PASS, it bypasses the species verification
+        if expected_species == "PASS":
+            pass_qc = True
+
+        else:
+            # Returns the scheme for the expected species, the genus for that species and the genus for the mlst scheme
+            expected_scheme, expected_species_genus, expected_mlst_scheme_genus = getScheme(expected_species)
+
+            if scheme_mlst.split('_', 1)[0] == expected_scheme.split("_", 1)[0]:
+                pass_qc = True
+
+            # If the scheme found is not the same as the one expected for the species,
+            else:
+                if expected_scheme == "unknown":
+                    pass_qc = True
+                    if scheme_mlst != "-":
+                        logger.warning("Found {} scheme for the expected species".format(scheme_mlst))
+
+                # In the case of Yersinia, it passes QC if the genus matches, as there is a scheme for the genus
+                # and one for Yersinia pseudotuberculosis
+                elif expected_species_genus == 'yersinia' and expected_mlst_scheme_genus == 'yersinia':
+                    pass_qc = True
+                    logger.warning("Found a Yersinia scheme ({}), but it is different from what was"
+                                   " expected ({})".format(scheme_mlst, expected_scheme))
+                else:
+                    if expected_mlst_scheme_genus is not None and \
+                            scheme_mlst == expected_scheme == expected_mlst_scheme_genus:
+                        pass_qc = True
+                    else:
+                        logger.error("MLST scheme found ({}) and provided ({}) are not the same"
+                                     .format(scheme_mlst, expected_scheme))
+
+        # Writing .report.json
+        report_dic = {
+            "expectedSpecies": expected_species,
+            "species": scheme_mlst,
+            "st": st,
+            "tableRow": [
+                {"sample": sample_id,
+                 "data": [
+                     {'header': 'MLST species',
+                      'value': scheme_mlst,
+                      'table': 'typing'
+                      },
+                     {'header': 'MLST ST',
+                      'value': st,
+                      'table': 'typing',
+                      "columnBar": False
+                      }
+                 ]}
+            ]
+        }
+
+        with open(".report.json", "w") as report_fh, \
+                open(".status", "w") as status_fh:
+
+            report_fh.write(json.dumps(report_dic, separators=(",", ":")))
+
+            if pass_qc:
+                status_fh.write("pass")
+            else:
+                status_fh.write("fail")
+    else:
+        logger.error("Sample {} did not run successfully".format(sample_id))
+        with open(".status", "w") as status_fh:
+            status_fh.write("error")
+
+
+if __name__ == '__main__':
+
+    main(SAMPLE_ID, ASSEMBLY, EXPECTED_SPECIES)
diff --git a/flowcraft/templates/spades.py b/flowcraft/templates/spades.py
index 130172de..b35f02ef 100644
--- a/flowcraft/templates/spades.py
+++ b/flowcraft/templates/spades.py
@@ -84,7 +84,6 @@ def __get_version_spades():
     FASTQ_PAIR = '$fastq_pair'.split()
     MAX_LEN = int('$max_len'.strip())
     KMERS = '$kmers'.strip()
-    CLEAR = '$clear'
    DISABLE_RR = '$disable_rr'
    OPTS = [x.strip() for x in '$opts'.strip("[]").split(",")]
    CLEAR = '$clear'
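Review note: a minimal sketch of the 95% mapped-reads quality-control rule added to `check_filtered_assembly` in `process_assembly_mapping.py`. The read counts below are illustrative only; in the pipeline they come from `get_mapping_statistics`, which parses `samtools flagstat` output.

```python
# Threshold mirrors min_mapping in check_filtered_assembly
MIN_MAPPING = 0.95


def mapped_reads_qc(total_reads, total_mapped_reads):
    """Return (passed, message) for the mapped-reads percentage check."""
    if total_reads <= 0 or total_mapped_reads <= 0:
        return False, "No reads were mapped."
    ratio = round(float(total_mapped_reads) / float(total_reads), 2)
    percentage = round(ratio * 100, 2)
    if ratio >= MIN_MAPPING:
        return True, "Mapped reads: {}%".format(percentage)
    return False, "Mapped reads: {}% (lower than {}%)".format(
        percentage, MIN_MAPPING * 100)


if __name__ == "__main__":
    # A sample with 92% of its reads mapped triggers the QC warning
    print(mapped_reads_qc(1000000, 920000))
```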