diff --git a/CHANGELOG.md b/CHANGELOG.md index 957a9f5..ba2616b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # demultiplex v0.1.2 +## Breaking changes + +* `demultiplex` workflow: renamed `sample_sheet` argument to `run_information` (PR #24) + +## New features + +* Add support for `bases2fastq` demultiplexer (PR #24) + ## Minor updates * Add resource labels to workflows (PR #21). diff --git a/src/dataflow/combine_samples/config.vsh.yaml b/src/dataflow/combine_samples/config.vsh.yaml index d82717e..3e34d99 100644 --- a/src/dataflow/combine_samples/config.vsh.yaml +++ b/src/dataflow/combine_samples/config.vsh.yaml @@ -11,9 +11,11 @@ argument_groups: - name: --forward_input type: file required: true + multiple: true - name: --reverse_input type: file required: false + multiple: true - name: Output arguments arguments: - name: --output_forward diff --git a/src/dataflow/combine_samples/main.nf b/src/dataflow/combine_samples/main.nf index 1a16fa6..a41e6f5 100644 --- a/src/dataflow/combine_samples/main.nf +++ b/src/dataflow/combine_samples/main.nf @@ -11,8 +11,8 @@ workflow run_wf { | groupTuple(by: 0, sort: "hash") | map {run_id, states -> // Gather the following state for all samples - def forward_fastqs = states.collect{it.forward_input} - def reverse_fastqs = states.collect{it.reverse_input}.findAll{it != null} + def forward_fastqs = states.collect{it.forward_input}.flatten() + def reverse_fastqs = states.collect{it.reverse_input}.findAll{it != null}.flatten() def resultState = [ "output_forward": forward_fastqs, diff --git a/src/dataflow/gather_fastqs_and_validate/config.vsh.yaml b/src/dataflow/gather_fastqs_and_validate/config.vsh.yaml index e0b7733..3f8e4b2 100644 --- a/src/dataflow/gather_fastqs_and_validate/config.vsh.yaml +++ b/src/dataflow/gather_fastqs_and_validate/config.vsh.yaml @@ -20,10 +20,12 @@ argument_groups: type: file direction: output required: true + multiple: true - name: "--fastq_reverse" type: file direction: output required: false + multiple: true resources: - type: nextflow_script path: main.nf diff --git a/src/dataflow/gather_fastqs_and_validate/main.nf b/src/dataflow/gather_fastqs_and_validate/main.nf index e6afd77..78fff61 100644 --- a/src/dataflow/gather_fastqs_and_validate/main.nf +++ b/src/dataflow/gather_fastqs_and_validate/main.nf @@ -14,9 +14,11 @@ workflow run_wf { def original_id = id // Parse sample sheet for sample IDs + println "Processing run information file ${sample_sheet}" csv_lines = sample_sheet.splitCsv(header: false, sep: ',') csv_lines.any { csv_items -> if (csv_items.isEmpty()) { + // skip empty line return } def possible_header = csv_items[0] @@ -24,22 +26,40 @@ workflow run_wf { if (header) { if (start_parsing) { // Stop parsing when encountering the next header + println "Encountered next header '[${start_parsing}]', stopping parsing." return true } - if (header == "Data") { + // [Data] for illumina + // [Samples] for Element Biosciences + if (header in ["Data", "Samples"]) { + println "Found header [${header}], start parsing." start_parsing = true + return } } if (start_parsing) { - if ( !sample_id_column_index ) { - sample_id_column_index = csv_items.findIndexValues{it == "Sample_ID"} - assert sample_id_column_index != -1: - "Could not find column 'Sample_ID' in sample sheet!" + if ( sample_id_column_index == null) { + println "Looking for sample name column." + sample_id_column_index = csv_items.findIndexValues{it == "Sample_ID" || it == "SampleName"} + assert (!sample_id_column_index.isEmpty()): + "Could not find column 'Sample_ID' (Illumina) or 'SampleName' " + + "(Element Biosciences) in run information! Found: ${sample_id_column_index}" + assert sample_id_column_index.size() == 1, "Expected run information file to contain " + + "a column 'Sample_ID' or 'SampleName', not both. Found: ${sample_id_column_index}" + sample_id_column_index = sample_id_column_index[0] + println "Found sample names column '${csv_items[sample_id_column_index]}'." return } samples += csv_items[sample_id_column_index] } + // This return is important! (If 'true' is returned, the parsing stops.) + return } + assert start_parsing: + "Sample information file does not contain [Data] or [Samples] header!" + assert samples.size() > 1: + "Sample information file does not seem to contain any information about the samples!" + println "Finished processing run information file, found samples: ${samples}." println "Looking for fastq files in ${state.input}." def allfastqs = state.input.listFiles().findAll{it.isFile() && it.name ==~ /^.+\.fastq.gz$/} println "Found ${allfastqs.size()} fastq files, matching them to the following samples: ${samples}." @@ -48,17 +68,15 @@ workflow run_wf { def reverse_regex = ~/^${sample_id}_S(\d+)_(L(\d+)_)?R2_(\d+)\.fastq\.gz$/ def forward_fastq = state.input.listFiles().findAll{it.isFile() && it.name ==~ forward_regex} def reverse_fastq = state.input.listFiles().findAll{it.isFile() && it.name ==~ reverse_regex} - assert forward_fastq : "No forward fastq files were found for sample ${sample_id}" - assert forward_fastq.size() < 2: - "Found multiple forward fastq files corresponding to sample ${sample_id}: ${forward_fastq}" - assert reverse_fastq.size() < 2: - "Found multiple reverse fastq files corresponding to sample ${sample_id}: ${reverse_fastq}." - assert !forward_fastq.isEmpty(): - "Expected a forward fastq file to have been created correspondig to sample ${sample_id}." - // TODO: if one sample had reverse reads, the others must as well. - reverse_fastq = !reverse_fastq.isEmpty() ? reverse_fastq[0] : null + assert forward_fastq && !forward_fastq.isEmpty(): "No forward fastq files were found for sample ${sample_id}. " + + "All fastq files in directory: ${allfastqs.collect{it.name}}" + assert (reverse_fastq.isEmpty() || (forward_fastq.size() == reverse_fastq.size())): + "Expected equal number of forward and reverse fastq files for sample ${sample_id}. " + + "Found forward: ${forward_fastq} and reverse: ${reverse_fastq}." + println "Found ${forward_fastq.size()} forward and ${reverse_fastq.size()} reverse " + + "fastq files for sample ${sample_id}" def fastqs_state = [ - "fastq_forward": forward_fastq[0], + "fastq_forward": forward_fastq, "fastq_reverse": reverse_fastq, "_meta": [ "join_id": original_id ], ] diff --git a/src/demultiplex/config.vsh.yaml b/src/demultiplex/config.vsh.yaml index 2e13d3a..d275653 100644 --- a/src/demultiplex/config.vsh.yaml +++ b/src/demultiplex/config.vsh.yaml @@ -7,12 +7,24 @@ argument_groups: description: Directory containing raw sequencing data type: file required: true - - name: --sample_sheet + - name: --run_information description: | - Sample sheet as input for BCL Convert. If not specified, - will try to autodetect the sample sheet in the input directory + CSV file containing sample information, which will be used as + input for the demultiplexer. Canonically called 'SampleSheet.csv' (Illumina) + or 'RunManifest.csv' (Element Biosciences). If not specified, + will try to autodetect the sample sheet in the input directory. + Requires --demultiplexer to be set. type: file required: false + - name: "--demultiplexer" + type: string + required: false + choices: ["bases2fastq", "bclconvert"] + description: | + Demultiplexer to use, choice depends on the provider + of the instrument that was used to generate the data. + When not using --sample_sheet, specifying this argument is not + required. - name: Output arguments arguments: - name: --output @@ -40,7 +52,10 @@ resources: test_resources: - type: nextflow_script path: test.nf - entrypoint: test_wf + entrypoint: test_illumina + - type: nextflow_script + path: test.nf + entrypoint: test_bases2fastq dependencies: - name: io/untar @@ -53,6 +68,8 @@ dependencies: repository: local - name: bcl_convert repository: bb + - name: bases2fastq + repository: bb - name: falco repository: bb - name: multiqc @@ -61,7 +78,7 @@ repositories: - name: bb type: vsh repo: biobox - tag: v0.2.0 + tag: v0.3.0 runners: - type: nextflow diff --git a/src/demultiplex/integration_tests.sh b/src/demultiplex/integration_tests.sh index c5f6901..d2e486f 100755 --- a/src/demultiplex/integration_tests.sh +++ b/src/demultiplex/integration_tests.sh @@ -11,7 +11,14 @@ viash ns build --setup cb nextflow run . \ -main-script src/demultiplex/test.nf \ -profile docker,no_publish,local \ - -entry test_wf \ + -entry test_illumina \ -c src/config/labels.config \ --resources_test https://raw.githubusercontent.com/nf-core/test-datasets/demultiplex/testdata/NovaSeq6000/ \ -resume + + nextflow run . \ + -main-script src/demultiplex/test.nf \ + -profile docker,no_publish,local \ + -entry test_bases2fastq \ + -c src/config/labels.config \ + -resume \ No newline at end of file diff --git a/src/demultiplex/main.nf b/src/demultiplex/main.nf index fa3c3ae..c4cb8ab 100644 --- a/src/demultiplex/main.nf +++ b/src/demultiplex/main.nf @@ -23,22 +23,79 @@ workflow run_wf { // Gather input files from folder | map {id, state -> def newState = [:] - if (!state.sample_sheet) { - def sample_sheet = state.input.resolve("SampleSheet.csv") - assert (sample_sheet && sample_sheet.isFile()): "Could not find 'SampleSheet.csv' file in input directory." - newState["sample_sheet"] = sample_sheet + println("Provided run information: ${state.run_information} and demultiplexer: ${state.demultiplexer}") + if (!state.run_information) { + println("Run information was not specified, auto-detecting...") + // The supported_platforms hashmap must be a 1-on-1 mapping + // Also, it's keys must be present in the 'choices' field + // for the 'run_information' argument in the viash config. + def supported_platforms = [ + "bclconvert": "SampleSheet.csv", // Illumina + "bases2fastq": "RunManifest.csv" // Element Biosciences + ] + def found_sample_information = supported_platforms.collectEntries{demultiplexer, filename -> + println("Checking if ${filename} can be found in input folder ${state.input}.") + def resolved_filename = state.input.resolve(filename) + if (!resolved_filename.isFile()) { + resolved_filename = null + } + println("Result after looking for run information for ${demultiplexer}: ${resolved_filename}.") + [demultiplexer, resolved_filename] + } + def demultiplexer = null + def run_information = null + found_sample_information.each{demultiplexer_candidate, file_path -> + if (file_path) { + // At this point, a candicate run information file was found. + assert !run_information: "Autodetection of run information " + + "(SampleSheet, RunManifest) failed: " + + "multiple candidate files found in input folder. " + + "Please specify one using --run_information." + run_information = file_path + demultiplexer = demultiplexer_candidate + } + } + + // When autodetecting, the run information should have been found + assert run_information: "No run information file (SampleSheet, RunManifest) " + + "found in input directory." + + // When autodetecting, the demultiplexer must be set if the run information was found + assert demultiplexer, "State error: the demultiplexer should have been autodetected. " + + "Please report this as a bug." + + // When autodetecting, the found demultiplexer must match + // with the demultiplexer that the user has provided (in case it was provided). + if (state.demultiplexer) { + assert state.demultiplexer == demultiplexer, + "Requested to use demultiplexer ${state.demultiplexer} " + + "but demultiplexer based on the autodetected run information " + "file ${run_information} seems to indicate that the demultiplexer " + "should be ${demultiplexer}. Either avoid specifying the demultiplexer " + "or override the autodetection of the run information by providing " + "the file." + } + println("Using run information ${run_information} and demultiplexer ${demultiplexer}") + // At this point, the autodetected state can override the user provided state. + newState = newState + [ + "run_information": run_information, + "demultiplexer": demultiplexer, + ] } - // Do not add InterOp to state because we generate the summary csv's in the next - // step based on the run dir, not the InterOp dir. - def interop_dir = state.input.resolve("InterOp") - assert interop_dir.isDirectory(): "Expected InterOp directory to be present." + if (newState.demultiplexer in ["bclconvert"]) { + // Do not add InterOp to state because we generate the summary csv's in the next + // step based on the run dir, not the InterOp dir. + def interop_dir = state.input.resolve("InterOp") + assert interop_dir.isDirectory(): "Expected InterOp directory to be present." + } def resultState = state + newState [id, resultState] } | interop_summary_to_csv.run( + runIf: {id, state -> state.demultiplexer in ["bclconvert"]}, directives: [label: ["lowmem", "verylowcpu"]], fromState: [ "input": "input", @@ -50,16 +107,40 @@ workflow run_wf { ) // run bcl_convert | bcl_convert.run( + runIf: {id, state -> state.demultiplexer in ["bclconvert"]}, directives: [label: ["highmem", "midcpu"]], fromState: [ "bcl_input_directory": "input", - "sample_sheet": "sample_sheet", + "sample_sheet": "run_information", "output_directory": "output", ], toState: {id, result, state -> def toAdd = [ - "output_bclconvert" : result.output_directory, - "bclconvert_reports": result.reports, + "output_demultiplexer" : result.output_directory, + "run_id": id, + ] + def newState = state + toAdd + return newState + } + ) + // run bases2fastq + | bases2fastq.run( + runIf: {id, state -> state.demultiplexer in ["bases2fastq"]}, + directives: [label: ["highmem", "midcpu"]], + fromState: [ + "analysis_directory": "input", + "run_manifest": "run_information", + "output_directory": "output", + ], + args: [ + "no_projects": true, // Do not put output files in a subfolder for project + //"split_lanes": true, + "legacy_fastq": true, // Illumina style output names + "group_fastq": true, // No subdir per sample + ], + toState: {id, result, state -> + def toAdd = [ + "output_demultiplexer" : result.output_directory, "run_id": id, ] def newState = state + toAdd @@ -68,8 +149,8 @@ workflow run_wf { ) | gather_fastqs_and_validate.run( fromState: [ - "input": "output_bclconvert", - "sample_sheet": "sample_sheet", + "input": "output_demultiplexer", + "sample_sheet": "run_information", ], toState: [ "fastq_forward": "fastq_forward", @@ -110,15 +191,18 @@ workflow run_wf { | multiqc.run( directives: [label: ["lowcpu", "lowmem"]], fromState: {id, state -> - [ - "input": [ - state.output_falco, - state.interop_run_summary.getParent(), - state.interop_index_summary.getParent() - ], + def new_state = [ + "input": [state.output_falco], "output_report": state.output_multiqc, - "cl_config": 'sp: {fastqc/data: {fn: "*_fastqc_data.txt"}}', + "cl_config": 'sp: {fastqc/data: {fn: "*_fastqc_data.txt"}}' ] + if (state.demultiplexer == "bclconvert") { + new_state["input"] += [ + state.interop_run_summary.getParent(), + state.interop_index_summary.getParent() + ] + } + return new_state }, toState: { id, result, state -> state + [ "output_multiqc" : result.output_report ] @@ -126,7 +210,7 @@ workflow run_wf { ) | setState( [ - "output": "output_bclconvert", + "output": "output_demultiplexer", "output_falco": "output_falco", "output_multiqc": "output_multiqc" ] diff --git a/src/demultiplex/test.nf b/src/demultiplex/test.nf index 922fb81..92d708d 100644 --- a/src/demultiplex/test.nf +++ b/src/demultiplex/test.nf @@ -4,7 +4,7 @@ include { demultiplex } from params.rootDir + "/target/nextflow/demultiplex/main params.resources_test = params.rootDir + "/testData/" -workflow test_wf { +workflow test_illumina { output_ch = Channel.fromList([ [ // sample_sheet: resources_test.resolve("bcl_convert_samplesheet.csv"), @@ -27,5 +27,36 @@ workflow test_wf { assert state.output.isDirectory(): "Expected bclconvert output to be a directory" assert state.output_falco.isDirectory(): "Expected falco output to be a directory" assert state.output_multiqc.isFile(): "Expected multiQC output to be a file" + fastq_files = state.output.listFiles().collect{it.name} + assert ["Undetermined_S0_L001_R1_001.fastq.gz", "Sample23_S3_L001_R1_001.fastq.gz", + "sampletest_S4_L001_R1_001.fastq.gz", "Sample1_S1_L001_R1_001.fastq.gz", + "SampleA_S2_L001_R1_001.fastq.gz"].toSet() == fastq_files.toSet(): \ + "Output directory should contain the expected FASTQ files" + fastq_files.each{ + assert it.length() != 0: "Expected FASTQ file to not be empty" + } + } +} + +workflow test_bases2fastq { + output_ch = Channel.fromList([ + [ + input: "http://element-public-data.s3.amazonaws.com/bases2fastq-share/bases2fastq-v2/20230404-bases2fastq-sim-151-151-9-9.tar.gz", + publish_dir: "output_dir/", + ] + ]) + | map { state -> [ "run", state ] } + | demultiplex.run( + toState: { id, output, state -> + output + [ orig_input: state.input ] } + ) + | view { output -> + assert output.size() == 2 : "outputs should contain two elements; [id, file]" + "Output: $output" + } + | map {id, state -> + assert state.output.isDirectory(): "Expected bases2fastq output to be a directory" + assert state.output_falco.isDirectory(): "Expected falco output to be a directory" + assert state.output_multiqc.isFile(): "Expected multiQC output to be a file" } }