diff --git a/.gitignore b/.gitignore
index 9e379c7..78db699 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,5 @@ testData
 
 # Nextflow related files
 .nextflow
+.nextflow.log*
 work
\ No newline at end of file
diff --git a/src/config/tests.config b/src/config/tests.config
index a49d658..3473a3d 100644
--- a/src/config/tests.config
+++ b/src/config/tests.config
@@ -1,5 +1,6 @@
-profiles {
+process.container = 'nextflow/bash:latest'
+profiles {
 
   // detect tempdir
   tempDir = java.nio.file.Paths.get(
     System.getenv('NXF_TEMP') ?:
@@ -26,6 +27,7 @@ profiles {
   }
 
   docker {
+    docker.fixOwnership = true
     docker.enabled = true
     // docker.userEmulation = true
     singularity.enabled = false
diff --git a/src/dataflow/combine_samples/config.vsh.yaml b/src/dataflow/combine_samples/config.vsh.yaml
new file mode 100644
index 0000000..d82717e
--- /dev/null
+++ b/src/dataflow/combine_samples/config.vsh.yaml
@@ -0,0 +1,38 @@
+name: combine_samples
+namespace: dataflow
+description: Combine fastq files from across samples into one event with a list of fastq files per orientation.
+argument_groups:
+  - name: Input arguments
+    arguments:
+      - name: "--id"
+        description: "ID of the new event"
+        type: string
+        required: true
+      - name: --forward_input
+        type: file
+        required: true
+      - name: --reverse_input
+        type: file
+        required: false
+  - name: Output arguments
+    arguments:
+      - name: --output_forward
+        type: file
+        direction: output
+        multiple: true
+        required: true
+      - name: --output_reverse
+        type: file
+        direction: output
+        multiple: true
+        required: false
+resources:
+  - type: nextflow_script
+    path: main.nf
+    entrypoint: run_wf
+
+runners:
+  - type: nextflow
+
+engines:
+  - type: native
diff --git a/src/dataflow/combine_samples/main.nf b/src/dataflow/combine_samples/main.nf
new file mode 100644
index 0000000..1a16fa6
--- /dev/null
+++ b/src/dataflow/combine_samples/main.nf
@@ -0,0 +1,28 @@
+workflow run_wf {
+  take:
+    input_ch
+
+  main:
+    output_ch = input_ch
+      | map { id, state ->
+        def newEvent = [state.id, state + ["_meta": ["join_id": id]]]
+        newEvent
+      }
+      | groupTuple(by: 0, sort: "hash")
+      | map { run_id, states ->
+        // Gather the following state for all samples
+        def forward_fastqs = states.collect{it.forward_input}
+        def reverse_fastqs = states.collect{it.reverse_input}.findAll{it != null}
+
+        def resultState = [
+          "output_forward": forward_fastqs,
+          "output_reverse": reverse_fastqs,
+          // The join ID is the same across all samples from the same run
+          "_meta": ["join_id": states[0]._meta.join_id]
+        ]
+        return [run_id, resultState]
+      }
+
+  emit:
+    output_ch
+}
\ No newline at end of file
diff --git a/src/dataflow/gather_fastqs_and_validate/config.vsh.yaml b/src/dataflow/gather_fastqs_and_validate/config.vsh.yaml
new file mode 100644
index 0000000..e0b7733
--- /dev/null
+++ b/src/dataflow/gather_fastqs_and_validate/config.vsh.yaml
@@ -0,0 +1,36 @@
+name: gather_fastqs_and_validate
+namespace: dataflow
+description: |
+  From a directory containing fastq files, gather the files per sample
+  and validate according to the contents of the sample sheet.
+argument_groups:
+  - name: Input arguments
+    arguments:
+      - name: --input
+        description: Directory containing .fastq files
+        type: file
+        required: true
+      - name: --sample_sheet
+        description: Sample sheet
+        type: file
+        required: true
+  - name: Output arguments
+    arguments:
+      - name: --fastq_forward
+        type: file
+        direction: output
+        required: true
+      - name: "--fastq_reverse"
+        type: file
+        direction: output
+        required: false
+resources:
+  - type: nextflow_script
+    path: main.nf
+    entrypoint: run_wf
+
+runners:
+  - type: nextflow
+
+engines:
+  - type: native
diff --git a/src/dataflow/gather_fastqs_and_validate/main.nf b/src/dataflow/gather_fastqs_and_validate/main.nf
new file mode 100644
index 0000000..e6afd77
--- /dev/null
+++ b/src/dataflow/gather_fastqs_and_validate/main.nf
@@ -0,0 +1,73 @@
+workflow run_wf {
+  take:
+    input_ch
+
+  main:
+    output_ch = input_ch
+      // Gather input files from BCL convert output folder
+      | flatMap { id, state ->
+        println "Processing sample sheet: $state.sample_sheet"
+        def sample_sheet = state.sample_sheet
+        def start_parsing = false
+        def sample_id_column_index = null
+        def samples = ["Undetermined"]
+        def original_id = id
+
+        // Parse sample sheet for sample IDs
+        def csv_lines = sample_sheet.splitCsv(header: false, sep: ',')
+        csv_lines.any { csv_items ->
+          if (csv_items.isEmpty()) {
+            return
+          }
+          def possible_header = csv_items[0]
+          def header = possible_header.find(/\[(.*)\]/){ fullmatch, header_name -> header_name }
+          if (header) {
+            if (start_parsing) {
+              // Stop parsing when encountering the next header
+              return true
+            }
+            if (header == "Data") {
+              start_parsing = true
+            }
+          }
+          if (start_parsing) {
+            if ( !sample_id_column_index ) {
+              sample_id_column_index = csv_items.findIndexValues{it == "Sample_ID"}
+              assert !sample_id_column_index.isEmpty():
+                "Could not find column 'Sample_ID' in sample sheet!"
+              return
+            }
+            samples += csv_items[sample_id_column_index]
+          }
+        }
+        println "Looking for fastq files in ${state.input}."
+        def allfastqs = state.input.listFiles().findAll{it.isFile() && it.name ==~ /^.+\.fastq\.gz$/}
+        println "Found ${allfastqs.size()} fastq files, matching them to the following samples: ${samples}."
+        def processed_samples = samples.collect { sample_id ->
+          def forward_regex = ~/^${sample_id}_S(\d+)_(L(\d+)_)?R1_(\d+)\.fastq\.gz$/
+          def reverse_regex = ~/^${sample_id}_S(\d+)_(L(\d+)_)?R2_(\d+)\.fastq\.gz$/
+          def forward_fastq = state.input.listFiles().findAll{it.isFile() && it.name ==~ forward_regex}
+          def reverse_fastq = state.input.listFiles().findAll{it.isFile() && it.name ==~ reverse_regex}
+          assert forward_fastq : "No forward fastq files were found for sample ${sample_id}"
+          assert forward_fastq.size() < 2:
+            "Found multiple forward fastq files corresponding to sample ${sample_id}: ${forward_fastq}"
+          assert reverse_fastq.size() < 2:
+            "Found multiple reverse fastq files corresponding to sample ${sample_id}: ${reverse_fastq}."
+          assert !forward_fastq.isEmpty():
+            "Expected a forward fastq file to have been created corresponding to sample ${sample_id}."
+          // TODO: if one sample had reverse reads, the others must as well.
+          reverse_fastq = !reverse_fastq.isEmpty() ? reverse_fastq[0] : null
+          def fastqs_state = [
+            "fastq_forward": forward_fastq[0],
+            "fastq_reverse": reverse_fastq,
+            "_meta": [ "join_id": original_id ],
+          ]
+          [sample_id, fastqs_state]
+        }
+        println "Finished processing sample sheet."
+        return processed_samples
+      }
+
+  emit:
+    output_ch
+}
\ No newline at end of file
diff --git a/src/demultiplex/config.vsh.yaml b/src/demultiplex/config.vsh.yaml
index f840a76..1f005e1 100644
--- a/src/demultiplex/config.vsh.yaml
+++ b/src/demultiplex/config.vsh.yaml
@@ -8,9 +8,11 @@ argument_groups:
         type: file
         required: true
       - name: --sample_sheet
-        description: Sample sheet
+        description: |
+          Sample sheet as input for BCL Convert. If not specified,
+          will try to autodetect the sample sheet in the input directory.
         type: file
-        required: true
+        required: false
   - name: Output arguments
     arguments:
       - name: --output
@@ -18,6 +20,18 @@ argument_groups:
         type: file
         direction: output
         required: true
+      - name: "--output_falco"
+        description: Directory to write falco output to
+        type: file
+        direction: output
+        required: false
+        default: "$id/falco"
+      - name: "--output_multiqc"
+        description: Path to write the MultiQC report to
+        type: file
+        direction: output
+        required: false
+        default: "$id/multiqc_report.html"
 resources:
   - type: nextflow_script
     path: main.nf
@@ -31,14 +45,22 @@ test_resources:
 dependencies:
   - name: io/untar
     repository: local
+  - name: dataflow/gather_fastqs_and_validate
+    repository: local
+  - name: io/interop_summary_to_csv
+    repository: local
+  - name: dataflow/combine_samples
+    repository: local
   - name: bcl_convert
     repository: bb
-
+  - name: falco
+    repository: bb
+  - name: multiqc
+    repository: bb
 repositories:
   - name: bb
     type: vsh
     repo: viash-hub/biobase
-    tag: main
 
 runners:
   - type: nextflow
diff --git a/src/demultiplex/integration_tests.sh b/src/demultiplex/integration_tests.sh
index d327e36..1042753 100755
--- a/src/demultiplex/integration_tests.sh
+++ b/src/demultiplex/integration_tests.sh
@@ -6,10 +6,11 @@ REPO_ROOT=$(git rev-parse --show-toplevel)
 # ensure that the command below is run from the root of the repository
 cd "$REPO_ROOT"
 
-viash ns build -q 'untar|demultiplex' --setup cb
+viash ns build --setup cb
 
 nextflow run . \
   -main-script src/demultiplex/test.nf \
   -profile docker,no_publish \
   -entry test_wf \
-  -c src/config/tests.config
+  -c src/config/tests.config \
+  -resume
diff --git a/src/demultiplex/main.nf b/src/demultiplex/main.nf
index 2cae23b..4c401da 100644
--- a/src/demultiplex/main.nf
+++ b/src/demultiplex/main.nf
@@ -3,30 +3,129 @@ workflow run_wf {
     input_ch
 
   main:
-  output_ch = input_ch
+  samples_ch = input_ch
     // untar input if needed
     | untar.run(
-      runIf: {id, state -> 
+      runIf: {id, state ->
        def inputStr = state.input.toString()
-        inputStr.endsWith(".tar.gz") || inputStr.endsWith(".tar") || inputStr.endsWith(".tgz") ? true : false
+        inputStr.endsWith(".tar.gz") || \
+        inputStr.endsWith(".tar") || \
+        inputStr.endsWith(".tgz") ? true : false
      },
      fromState: [
        "input": "input",
      ],
      toState: { id, result, state ->
-        state + [ input: result.output ]
+        state + ["input": result.output]
      },
    )
+    // Autodetect the sample sheet in the input directory if it was not provided
+    | map {id, state ->
+      def newState = [:]
+      if (!state.sample_sheet) {
+        def sample_sheet = state.input.resolve("SampleSheet.csv")
+        assert (sample_sheet && sample_sheet.isFile()): "Could not find 'SampleSheet.csv' file in input directory."
+        newState["sample_sheet"] = sample_sheet
+      }
+
+      // Do not add InterOp to state because we generate the summary CSVs in the next
+      // step based on the run dir, not the InterOp dir.
+      def interop_dir = state.input.resolve("InterOp")
+      assert interop_dir.isDirectory(): "Expected InterOp directory to be present."
+
+      def resultState = state + newState
+      [id, resultState]
+    }
+
+    | interop_summary_to_csv.run(
+      fromState: [
+        "input": "input",
+      ],
+      toState: [
+        "interop_run_summary": "output_run_summary",
+        "interop_index_summary": "output_index_summary",
+      ]
+    )
     // run bcl_convert
     | bcl_convert.run(
-      fromState: { id, state ->
-        [
-          bcl_input_directory: state.input,
-          sample_sheet: state.sample_sheet,
-        ]
-      },
-      toState: { id, result, state -> [ output: result.output_directory ] }
-    )
+      fromState: [
+        "bcl_input_directory": "input",
+        "sample_sheet": "sample_sheet",
+        "output_directory": "output",
+      ],
+      toState: {id, result, state ->
+        def toAdd = [
+          "output_bclconvert" : result.output_directory,
+          "bclconvert_reports": result.reports,
+          "run_id": id,
+        ]
+        def newState = state + toAdd
+        return newState
+      }
+    )
+    | gather_fastqs_and_validate.run(
+      fromState: [
+        "input": "output_bclconvert",
+        "sample_sheet": "sample_sheet",
+      ],
+      toState: [
+        "fastq_forward": "fastq_forward",
+        "fastq_reverse": "fastq_reverse",
+      ],
+    )
+
+  output_ch = samples_ch
+    | combine_samples.run(
+      fromState: { id, state ->
+        [
+          "id": state.run_id,
+          "forward_input": state.fastq_forward,
+          "reverse_input": state.fastq_reverse,
+        ]
+      },
+      toState: [
+        "forward_fastqs": "output_forward",
+        "reverse_fastqs": "output_reverse",
+      ]
+    )
+    | falco.run(
+      fromState: {id, state ->
+        def reverse_fastqs_list = state.reverse_fastqs ? state.reverse_fastqs : []
+        [
+          "input": state.forward_fastqs + reverse_fastqs_list,
+          "outdir": "${state.output_falco}",
+          "summary_filename": null,
+          "report_filename": null,
+          "data_filename": null,
+        ]
+      },
+      toState: { id, result, state ->
+        state + [ "output_falco" : result.outdir ]
+      },
+    )
+    | multiqc.run(
+      fromState: {id, state ->
+        [
+          "input": [
+            state.output_falco,
+            state.interop_run_summary.getParent(),
+            state.interop_index_summary.getParent()
+          ],
+          "output_report": state.output_multiqc,
+          "cl_config": 'sp: {fastqc/data: {fn: "*_fastqc_data.txt"}}',
+        ]
+      },
+      toState: { id, result, state ->
+        state + [ "output_multiqc" : result.output_report ]
+      },
+    )
+    | setState(
+      [
+        "output": "output_bclconvert",
+        "output_falco": "output_falco",
+        "output_multiqc": "output_multiqc"
+      ]
+    )
 
   emit:
     output_ch
diff --git a/src/demultiplex/test.nf b/src/demultiplex/test.nf
index 51a5e03..c46a438 100644
--- a/src/demultiplex/test.nf
+++ b/src/demultiplex/test.nf
@@ -7,7 +7,7 @@ workflow test_wf {
       [
         // sample_sheet: resources_test.resolve("bcl_convert_samplesheet.csv"),
         // input: resources_test.resolve("iseq-DI/"),
-        sample_sheet: "https://raw.githubusercontent.com/nf-core/test-datasets/demultiplex/testdata/NovaSeq6000/SampleSheet.csv",
+        //sample_sheet: "https://raw.githubusercontent.com/nf-core/test-datasets/demultiplex/testdata/NovaSeq6000/SampleSheet.csv",
         input: "https://raw.githubusercontent.com/nf-core/test-datasets/demultiplex/testdata/NovaSeq6000/200624_A00834_0183_BHMTFYDRXX.tar.gz",
         publish_dir: "output_dir/",
       ]
@@ -21,4 +21,9 @@ workflow test_wf {
     assert output.size() == 2 : "outputs should contain two elements; [id, file]"
     "Output: $output"
   }
+  | map {id, state ->
+    assert state.output.isDirectory(): "Expected bclconvert output to be a directory"
+    assert state.output_falco.isDirectory(): "Expected falco output to be a directory"
+    assert state.output_multiqc.isFile(): "Expected multiQC output to be a file"
+  }
 }
diff --git a/src/io/interop_summary_to_csv/config.vsh.yaml b/src/io/interop_summary_to_csv/config.vsh.yaml
new file mode 100644
index 0000000..2d74808
--- /dev/null
+++ b/src/io/interop_summary_to_csv/config.vsh.yaml
@@ -0,0 +1,41 @@
+name: interop_summary_to_csv
+namespace: io
+argument_groups:
+  - name: Input arguments
+    arguments:
+      - name: --input
+        description: Sequencing run folder (*not* the InterOp folder).
+        type: file
+        required: true
+  - name: Output arguments
+    arguments:
+      - name: --output_run_summary
+        type: file
+        direction: output
+        required: true
+      - name: --output_index_summary
+        type: file
+        direction: output
+        required: true
+requirements:
+  - commands: ["summary", "index-summary"]
+resources:
+  - type: bash_script
+    path: script.sh
+engines:
+  - type: docker
+    image: debian:stable-slim
+    setup:
+      - type: apt
+        packages:
+          - procps
+          - wget
+      - type: docker
+        run: |
+          wget https://github.com/Illumina/interop/releases/download/v1.3.1/interop-1.3.1-Linux-GNU.tar.gz -O /tmp/interop.tar.gz && \
+          tar -C /tmp/ --no-same-owner --no-same-permissions -xvf /tmp/interop.tar.gz && \
+          mv /tmp/interop-1.3.1-Linux-GNU/bin/index-summary /tmp/interop-1.3.1-Linux-GNU/bin/summary /usr/local/bin/
+
+runners:
+  - type: executable
+  - type: nextflow
\ No newline at end of file
diff --git a/src/io/interop_summary_to_csv/script.sh b/src/io/interop_summary_to_csv/script.sh
new file mode 100644
index 0000000..e4a37fb
--- /dev/null
+++ b/src/io/interop_summary_to_csv/script.sh
@@ -0,0 +1,10 @@
+#!/usr/bin/env bash
+
+set -eo pipefail
+
+if [ ! -d "$par_input" ]; then
+  echo "Input directory does not exist or is not a directory" >&2
+  exit 1
+fi
+summary --csv=1 "$par_input" 1> "$par_output_run_summary"
+index-summary --csv=1 "$par_input" 1> "$par_output_index_summary"