From edf439795dcdedc3186c245b52bcef06c75391f4 Mon Sep 17 00:00:00 2001 From: DriesSchaumont <5946712+DriesSchaumont@users.noreply.github.com> Date: Fri, 3 May 2024 09:17:13 +0200 Subject: [PATCH 01/11] FEAT: Add falco. --- src/demultiplex/config.vsh.yaml | 5 +- src/demultiplex/main.nf | 100 +++++++++++++++++++++++++++++--- 2 files changed, 96 insertions(+), 9 deletions(-) diff --git a/src/demultiplex/config.vsh.yaml b/src/demultiplex/config.vsh.yaml index f840a76..8c39865 100644 --- a/src/demultiplex/config.vsh.yaml +++ b/src/demultiplex/config.vsh.yaml @@ -33,12 +33,13 @@ dependencies: repository: local - name: bcl_convert repository: bb - + - name: falco + repository: bb repositories: - name: bb type: vsh repo: viash-hub/biobase - tag: main + tag: falco runners: - type: nextflow diff --git a/src/demultiplex/main.nf b/src/demultiplex/main.nf index 2cae23b..288f2f0 100644 --- a/src/demultiplex/main.nf +++ b/src/demultiplex/main.nf @@ -3,10 +3,10 @@ workflow run_wf { input_ch main: - output_ch = input_ch + samples_ch = input_ch // untar input if needed | untar.run( - runIf: {id, state -> + runIf: {id, state -> def inputStr = state.input.toString() inputStr.endsWith(".tar.gz") || inputStr.endsWith(".tar") || inputStr.endsWith(".tgz") ? true : false }, @@ -14,19 +14,105 @@ workflow run_wf { "input": "input", ], toState: { id, result, state -> - state + [ input: result.output ] + state + [ "input": result.output ] }, ) // run bcl_convert | bcl_convert.run( fromState: { id, state -> [ - bcl_input_directory: state.input, - sample_sheet: state.sample_sheet, + "bcl_input_directory": state.input, + "sample_sheet": state.sample_sheet, + "output_directory": "${state.output}/bclconvert", ] }, - toState: { id, result, state -> [ output: result.output_directory ] } - ) + toState: { id, result, state -> state + [ "bclconvert_output" : result.output_directory ] } + ) + // Gather input files from BCL convert output folder + | flatMap { id, state -> + println "Processing sample sheet: $state.sample_sheet" + def sample_sheet = file(state.sample_sheet.toAbsolutePath()) + def start_parsing = false + def sample_id_column_index = null + def samples = ["Undetermined"] + def original_id = id + + // Parse sample sheet for sample IDs + csv_lines = sample_sheet.splitCsv(header: false, sep: ',') + csv_lines.any { csv_items -> + if (csv_items.isEmpty()) { + return + } + def possible_header = csv_items[0] + def header = possible_header.find(/\[(.*)\]/){fullmatch, header_name -> header_name} + if (header) { + if (start_parsing) { + // Stop parsing when encountering the next header + return true + } + if (header == "Data") { + start_parsing = true + } + } + if (start_parsing) { + if ( !sample_id_column_index ) { + sample_id_column_index = csv_items.findIndexValues{it == "Sample_ID"} + assert sample_id_column_index != -1: "Could not find column 'Sample_ID' in sample sheet!" + return + } + samples += csv_items[sample_id_column_index] + } + } + println "Looking for fastq files in ${state.bclconvert_output}." + def allfastqs = files("${state.bclconvert_output}/*.fastq.gz") + println "Found ${allfastqs.size()} fastq files." + processed_samples = samples.collect { sample_id -> + def forward_fastq = files("${state.bclconvert_output}/${sample_id}_S[0-9]*_R1_00?.fastq.gz", type: 'file') + def reverse_fastq = files("${state.bclconvert_output}/${sample_id}_S[0-9]*_R2_00?.fastq.gz", type: 'file') + assert forward_fastq.size() < 2: "Found multiple forward fastq files corresponding to sample ${sample_id}." 
+        assert reverse_fastq.size() < 2: "Found multiple reverse fastq files corresponding to sample ${sample_id}."
+        assert !forward_fastq.isEmpty(): "Expected a forward fastq file to have been created corresponding to sample ${sample_id}."
+        // TODO: if one sample had reverse reads, the others must as well.
+        reverse_fastq = !reverse_fastq.isEmpty() ? reverse_fastq[0] : null
+        [sample_id, ["fastq_forward": forward_fastq[0], "fastq_reverse": reverse_fastq, "run_id": original_id] + state]
+      }
+      return processed_samples
+    }
+
+  output_ch = samples_ch
+    // Going back to run-level, set the run ID back to the first element so we can use groupTuple
+    // Using toSortedList will not work when multiple runs are being processed at the same time.
+    | map { id, state ->
+      def newEvent = [state.run_id, state + ["sample_id": id]]
+      newEvent
+    }
+    | groupTuple(by: 0, sort: "hash")
+    | map {run_id, states ->
+      // Gather the following state for all samples
+      def forward_fastqs = states.collect{it.fastq_forward}
+      def reverse_fastqs = states.collect{it.fastq_reverse}.findAll{it != null}
+      def sample_ids = states.collect{it.sample_id}
+      // Other arguments should be the same for all samples, just pick the first
+      // TODO: verify this
+      def old_state = states[0]
+      old_state.remove("sample_id")
+      return [run_id, old_state + ["forward_fastqs": forward_fastqs, "reverse_fastqs": reverse_fastqs, "sample_ids": sample_ids]]
+
+    }
+    | falco.run(
+      fromState: {id, state ->
+        reverse_fastqs_list = state.reverse_fastqs ? state.reverse_fastqs : []
+        [
+          "input": state.forward_fastqs + reverse_fastqs_list,
+          "outdir": "${state.output}/falco",
+          "summary_filename": null,
+          "report_filename": null,
+          "data_filename": null,
+        ]
+      },
+      toState: { id, result, state -> state + [ "falco_output" : result.outdir ] },
+    )
+    | setState(["output"])

   emit:
     output_ch

From 807e7e9132f93c57cc248a1f192f5842b2a06e4e Mon Sep 17 00:00:00 2001
From: DriesSchaumont <5946712+DriesSchaumont@users.noreply.github.com>
Date: Fri, 3 May 2024 10:40:14 +0200
Subject: [PATCH 02/11] Fix output and adjust formatting

---
 src/demultiplex/config.vsh.yaml | 10 +++++--
 src/demultiplex/main.nf         | 51 +++++++++++++++++++++++----------
 src/demultiplex/test.nf         |  4 +++
 3 files changed, 48 insertions(+), 17 deletions(-)

diff --git a/src/demultiplex/config.vsh.yaml b/src/demultiplex/config.vsh.yaml
index 8c39865..863ccbe 100644
--- a/src/demultiplex/config.vsh.yaml
+++ b/src/demultiplex/config.vsh.yaml
@@ -17,7 +17,14 @@ argument_groups:
       description: Directory to write fastq data to
       type: file
       direction: output
-      required: true
+      required: false
+      default: "$id/fastq"
+    - name: "--output_falco"
+      description: Directory to write falco output to
+      type: file
+      direction: output
+      required: false
+      default: "$id/falco"
 resources:
   - type: nextflow_script
     path: main.nf
@@ -39,7 +46,6 @@ repositories:
   - name: bb
     type: vsh
     repo: viash-hub/biobase
-    tag: falco

runners:
  - type: nextflow

diff --git a/src/demultiplex/main.nf b/src/demultiplex/main.nf
index 288f2f0..97b2d8c 100644
--- a/src/demultiplex/main.nf
+++ b/src/demultiplex/main.nf
@@ -8,7 +8,9 @@ workflow run_wf {
     | untar.run(
       runIf: {id, state ->
         def inputStr = state.input.toString()
-        inputStr.endsWith(".tar.gz") || inputStr.endsWith(".tar") || inputStr.endsWith(".tgz") ? true : false
+        inputStr.endsWith(".tar.gz") || \
+        inputStr.endsWith(".tar") || \
+        inputStr.endsWith(".tgz") ? 
true : false
       },
       fromState: [
         "input": "input",
@@ -23,10 +25,12 @@ workflow run_wf {
         [
           "bcl_input_directory": state.input,
           "sample_sheet": state.sample_sheet,
-          "output_directory": "${state.output}/bclconvert",
+          "output_directory": "${state.output}",
         ]
       },
-      toState: { id, result, state -> state + [ "bclconvert_output" : result.output_directory ] }
+      toState: { id, result, state ->
+        state + [ "output_bclconvert" : result.output_directory ]
+      }
     )
     // Gather input files from BCL convert output folder
     | flatMap { id, state ->
@@ -57,24 +61,35 @@ workflow run_wf {
         if (start_parsing) {
           if ( !sample_id_column_index ) {
             sample_id_column_index = csv_items.findIndexValues{it == "Sample_ID"}
-            assert sample_id_column_index != -1: "Could not find column 'Sample_ID' in sample sheet!"
+            assert sample_id_column_index != -1:
+              "Could not find column 'Sample_ID' in sample sheet!"
             return
           }
           samples += csv_items[sample_id_column_index]
         }
       }
-      println "Looking for fastq files in ${state.bclconvert_output}."
-      def allfastqs = files("${state.bclconvert_output}/*.fastq.gz")
+      println "Looking for fastq files in ${state.output_bclconvert}."
+      def allfastqs = files("${state.output_bclconvert}/*.fastq.gz")
       println "Found ${allfastqs.size()} fastq files."
      processed_samples = samples.collect { sample_id ->
-        def forward_fastq = files("${state.bclconvert_output}/${sample_id}_S[0-9]*_R1_00?.fastq.gz", type: 'file')
-        def reverse_fastq = files("${state.bclconvert_output}/${sample_id}_S[0-9]*_R2_00?.fastq.gz", type: 'file')
-        assert forward_fastq.size() < 2: "Found multiple forward fastq files corresponding to sample ${sample_id}."
-        assert reverse_fastq.size() < 2: "Found multiple reverse fastq files corresponding to sample ${sample_id}."
-        assert !forward_fastq.isEmpty(): "Expected a forward fastq file to have been created corresponding to sample ${sample_id}."
+        def forward_glob = "${sample_id}_S[0-9]*_R1_00?.fastq.gz"
+        def reverse_glob = "${sample_id}_S[0-9]*_R2_00?.fastq.gz"
+        def forward_fastq = files("${state.output_bclconvert}/${forward_glob}", type: 'file')
+        def reverse_fastq = files("${state.output_bclconvert}/${reverse_glob}", type: 'file')
+        assert forward_fastq.size() < 2:
+          "Found multiple forward fastq files corresponding to sample ${sample_id}."
+        assert reverse_fastq.size() < 2:
+          "Found multiple reverse fastq files corresponding to sample ${sample_id}."
+        assert !forward_fastq.isEmpty():
+          "Expected a forward fastq file to have been created corresponding to sample ${sample_id}."
         // TODO: if one sample had reverse reads, the others must as well.
         reverse_fastq = !reverse_fastq.isEmpty() ? 
reverse_fastq[0] : null
-        [sample_id, ["fastq_forward": forward_fastq[0], "fastq_reverse": reverse_fastq, "run_id": original_id] + state]
+        def new_state = [
+          "fastq_forward": forward_fastq[0],
+          "fastq_reverse": reverse_fastq,
+          "run_id": original_id
+        ]
+        [sample_id, new_state + state]
       }
       return processed_samples
     }
@@ -96,7 +111,13 @@ workflow run_wf {
       // TODO: verify this
       def old_state = states[0]
       old_state.remove("sample_id")
-      return [run_id, old_state + ["forward_fastqs": forward_fastqs, "reverse_fastqs": reverse_fastqs, "sample_ids": sample_ids]]
+
+      def keys_to_overwrite = [
+        "forward_fastqs": forward_fastqs,
+        "reverse_fastqs": reverse_fastqs,
+        "sample_ids": sample_ids,
+      ]
+      return [run_id, old_state + keys_to_overwrite]

     }
     | falco.run(
@@ -110,9 +131,9 @@ workflow run_wf {
           "data_filename": null,
         ]
       },
-      toState: { id, result, state -> state + [ "falco_output" : result.outdir ] },
+      toState: { id, result, state -> state + [ "output_falco" : result.outdir ] },
     )
-    | setState(["output"])
+    | setState(["output": "output_bclconvert", "output_falco": "output_falco"])

   emit:
     output_ch
diff --git a/src/demultiplex/test.nf b/src/demultiplex/test.nf
index 51a5e03..b5bf5b5 100644
--- a/src/demultiplex/test.nf
+++ b/src/demultiplex/test.nf
@@ -21,4 +21,8 @@ workflow test_wf {
       assert output.size() == 2 : "outputs should contain two elements; [id, file]"
       "Output: $output"
     }
+    | map {id, state ->
+      assert state.output.isDirectory(): "Expected bclconvert output to be a directory"
+      assert state.output_falco.isDirectory(): "Expected falco output to be a directory"
+    }
 }

From adcb1512ee7296c5ea27d8238f8258af7f71b943 Mon Sep 17 00:00:00 2001
From: DriesSchaumont <5946712+DriesSchaumont@users.noreply.github.com>
Date: Mon, 6 May 2024 09:06:56 +0200
Subject: [PATCH 03/11] Add test for permissions

---
 src/config/tests.config              |  4 +-
 src/demultiplex/config.vsh.yaml      | 11 +++-
 src/demultiplex/integration_tests.sh |  5 +-
 src/demultiplex/main.nf              | 94 ++++++++++++++++++----------
 src/io/untar/config.vsh.yaml         |  4 ++
 5 files changed, 79 insertions(+), 39 deletions(-)

diff --git a/src/config/tests.config b/src/config/tests.config
index a49d658..5b8d0be 100644
--- a/src/config/tests.config
+++ b/src/config/tests.config
@@ -1,5 +1,6 @@
-profiles {
+process.container = 'nextflow/bash:latest'

+profiles {
    // detect tempdir
    tempDir = java.nio.file.Paths.get(
        System.getenv('NXF_TEMP') ?:
@@ -26,6 +27,7 @@ profiles {
    }

    docker {
+        docker.fixOwnership = true
        docker.enabled = true
        // docker.userEmulation = true
        singularity.enabled = false
diff --git a/src/demultiplex/config.vsh.yaml b/src/demultiplex/config.vsh.yaml
index 863ccbe..9c1dde7 100644
--- a/src/demultiplex/config.vsh.yaml
+++ b/src/demultiplex/config.vsh.yaml
@@ -17,14 +17,19 @@ argument_groups:
       description: Directory to write fastq data to
       type: file
       direction: output
-      required: false
-      default: "$id/fastq"
+      required: true
     - name: "--output_falco"
       description: Directory to write falco output to
       type: file
       direction: output
       required: false
       default: "$id/falco"
+    - name: "--output_multiqc"
+      description: Directory to write multiqc output to
+      type: file
+      direction: output
+      required: false
+      default: "$id/multiqc"
 resources:
   - type: nextflow_script
     path: main.nf
@@ -42,6 +47,8 @@ dependencies:
     repository: bb
   - name: falco
     repository: bb
+  - name: multiqc
+    repository: bb
 repositories:
   - name: bb
     type: vsh
diff --git a/src/demultiplex/integration_tests.sh b/src/demultiplex/integration_tests.sh
index d327e36..c887416 100755
--- 
a/src/demultiplex/integration_tests.sh
+++ b/src/demultiplex/integration_tests.sh
@@ -6,10 +6,11 @@ REPO_ROOT=$(git rev-parse --show-toplevel)
 # ensure that the command below is run from the root of the repository
 cd "$REPO_ROOT"

-viash ns build -q 'untar|demultiplex' --setup cb
+# viash ns build -q 'untar|demultiplex' --setup cb

 nextflow run . \
   -main-script src/demultiplex/test.nf \
-  -profile docker,no_publish \
+  -profile no_publish \
+  -with-docker \
   -entry test_wf \
   -c src/config/tests.config
diff --git a/src/demultiplex/main.nf b/src/demultiplex/main.nf
index 97b2d8c..1c426dd 100644
--- a/src/demultiplex/main.nf
+++ b/src/demultiplex/main.nf
@@ -16,9 +16,21 @@ workflow run_wf {
         "input": "input",
       ],
       toState: { id, result, state ->
-        state + [ "input": result.output ]
+        def newState = state + ["input": result.output]
+        newState
       },
     )
+    // Gather input files from folder
+    | map {id, state ->
+      // Get InterOp folder
+      // TODO: check if InterOp folder is empty
+      def interop_files = files("${state.input}/InterOp/*.bin")
+      // assert interop_files.size() > 0: "Could not find any InterOp files."
+      println "Found InterOp files: ${interop_files}"
+      def newState = state + ["interop_files": interop_files]
+      [id, newState]
+    }
+
     // run bcl_convert
     | bcl_convert.run(
       fromState: { id, state ->
@@ -40,6 +52,12 @@ workflow run_wf {
       def sample_id_column_index = null
       def samples = ["Undetermined"]
       def original_id = id
+
+      // Gather reports file
+      def report_dir = files("${state.output_bclconvert}/Reports", type: "dir")
+      assert report_dir.size() == 1:
+        "Could not find the Reports directory in the output from BCL convert."
+      report_dir = report_dir[0]

       // Parse sample sheet for sample IDs
       csv_lines = sample_sheet.splitCsv(header: false, sep: ',')
       csv_lines.any { csv_items ->
@@ -73,7 +91,7 @@ workflow run_wf {
       println "Looking for fastq files in ${state.output_bclconvert}."
       def allfastqs = files("${state.output_bclconvert}/*.fastq.gz")
       println "Found ${allfastqs.size()} fastq files."
processed_samples = samples.collect { sample_id ->
         def forward_glob = "${sample_id}_S[0-9]*_R1_00?.fastq.gz"
-        def reverse_glob = "${sample_id}_S[0-9]*_R2_00?.fastq.gz" 
+        def reverse_glob = "${sample_id}_S[0-9]*_R2_00?.fastq.gz"
         def forward_fastq = files("${state.output_bclconvert}/${forward_glob}", type: 'file')
         def reverse_fastq = files("${state.output_bclconvert}/${reverse_glob}", type: 'file')
         assert forward_fastq.size() < 2:
@@ -87,7 +105,8 @@ workflow run_wf {
           "Expected a forward fastq file to have been created corresponding to sample ${sample_id}."
         // TODO: if one sample had reverse reads, the others must as well.
         reverse_fastq = !reverse_fastq.isEmpty() ? reverse_fastq[0] : null
         def new_state = [
           "fastq_forward": forward_fastq[0],
           "fastq_reverse": reverse_fastq,
-          "run_id": original_id
+          "run_id": original_id,
+          "bclconvert_reports": report_dir,
         ]
         [sample_id, new_state + state]
       }
       return processed_samples
     }
@@ -102,38 +121,45 @@ workflow run_wf {
       newEvent
     }
     | groupTuple(by: 0, sort: "hash")
-    | map {run_id, states ->
-      // Gather the following state for all samples
-      def forward_fastqs = states.collect{it.fastq_forward}
-      def reverse_fastqs = states.collect{it.fastq_reverse}.findAll{it != null}
-      def sample_ids = states.collect{it.sample_id}
-      // Other arguments should be the same for all samples, just pick the first
-      // TODO: verify this
-      def old_state = states[0]
-      old_state.remove("sample_id")
-
-      def keys_to_overwrite = [
-        "forward_fastqs": forward_fastqs,
-        "reverse_fastqs": reverse_fastqs,
-        "sample_ids": sample_ids,
-      ]
-      return [run_id, old_state + keys_to_overwrite]
-    }
-    | falco.run(
-      fromState: {id, state ->
-        reverse_fastqs_list = state.reverse_fastqs ? state.reverse_fastqs : []
-        [
-          "input": state.forward_fastqs + reverse_fastqs_list,
-          "outdir": "${state.output}/falco",
-          "summary_filename": null,
-          "report_filename": null,
-          "data_filename": null,
-        ]
-      },
-      toState: { id, result, state -> state + [ "output_falco" : result.outdir ] },
-    )
-    | setState(["output": "output_bclconvert", "output_falco": "output_falco"])
+    // | map {run_id, states ->
+    //   // Gather the following state for all samples
+    //   def forward_fastqs = states.collect{it.fastq_forward}
+    //   def reverse_fastqs = states.collect{it.fastq_reverse}.findAll{it != null}
+    //   def sample_ids = states.collect{it.sample_id}
+    //   // Other arguments should be the same for all samples, just pick the first
+    //   // TODO: verify this
+    //   def old_state = states[0]
+    //   old_state.remove("sample_id")

+    //   def keys_to_overwrite = [
+    //     "forward_fastqs": forward_fastqs,
+    //     "reverse_fastqs": reverse_fastqs,
+    //     "sample_ids": sample_ids,
+    //   ]
+    //   return [run_id, old_state + keys_to_overwrite]
+    // }
+    // | falco.run(
+    //   fromState: {id, state ->
+    //     reverse_fastqs_list = state.reverse_fastqs ? 
state.reverse_fastqs : [] + // [ + // "input": state.forward_fastqs + reverse_fastqs_list, + // "outdir": "${state.output}/falco", + // "summary_filename": null, + // "report_filename": null, + // "data_filename": null, + // ] + // }, + // toState: { id, result, state -> state + [ "output_falco" : result.outdir ] }, + // ) + // | multiqc.run( + // fromState: {id, state -> + // // Gather text files from Falco output directory + // falco_txt_output = files("$state.output_falco/*.txt", type: "file") + // ["input": falco_txt_output + state.interop_files] + // }, + // toState: { id, result, state -> state + [ "output_multiqc" : result.outdir ] }, + // ) + // | setState(["output": "output_bclconvert", "output_falco": "output_falco", "output_multiqc": "output_multiqc"]) emit: output_ch diff --git a/src/io/untar/config.vsh.yaml b/src/io/untar/config.vsh.yaml index a8ac4ac..857c5b1 100644 --- a/src/io/untar/config.vsh.yaml +++ b/src/io/untar/config.vsh.yaml @@ -34,6 +34,10 @@ test_resources: engines: - type: docker image: debian:stable-slim + setup: + - type: apt + packages: + - procps runners: - type: executable From 35ba2f13b02801e5109b446ecc5be0d3c7d50514 Mon Sep 17 00:00:00 2001 From: DriesSchaumont <5946712+DriesSchaumont@users.noreply.github.com> Date: Tue, 7 May 2024 09:17:20 +0200 Subject: [PATCH 04/11] FEAT: Add multiqc. --- .gitignore | 1 + src/config/tests.config | 2 +- src/demultiplex/config.vsh.yaml | 2 +- src/demultiplex/integration_tests.sh | 5 +- src/demultiplex/main.nf | 116 +++++++++++++++------------ src/io/untar/script.sh | 4 +- 6 files changed, 70 insertions(+), 60 deletions(-) diff --git a/.gitignore b/.gitignore index 9e379c7..78db699 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ testData # Nextflow related files .nextflow +.nextflow.log* work \ No newline at end of file diff --git a/src/config/tests.config b/src/config/tests.config index 5b8d0be..3473a3d 100644 --- a/src/config/tests.config +++ b/src/config/tests.config @@ -27,7 +27,7 @@ profiles { } docker { - docker.fixOwnership = true + docker.fixOwnership = true docker.enabled = true // docker.userEmulation = true singularity.enabled = false diff --git a/src/demultiplex/config.vsh.yaml b/src/demultiplex/config.vsh.yaml index 9c1dde7..b6b8ee7 100644 --- a/src/demultiplex/config.vsh.yaml +++ b/src/demultiplex/config.vsh.yaml @@ -29,7 +29,7 @@ argument_groups: type: file direction: output required: false - default: "$id/multiqc" + default: "$id/multiqc_report.html" resources: - type: nextflow_script path: main.nf diff --git a/src/demultiplex/integration_tests.sh b/src/demultiplex/integration_tests.sh index c887416..d327e36 100755 --- a/src/demultiplex/integration_tests.sh +++ b/src/demultiplex/integration_tests.sh @@ -6,11 +6,10 @@ REPO_ROOT=$(git rev-parse --show-toplevel) # ensure that the command below is run from the root of the repository cd "$REPO_ROOT" -# viash ns build -q 'untar|demultiplex' --setup cb +viash ns build -q 'untar|demultiplex' --setup cb nextflow run . 
\
   -main-script src/demultiplex/test.nf \
-  -profile no_publish \
-  -with-docker \
+  -profile docker,no_publish \
   -entry test_wf \
   -c src/config/tests.config
diff --git a/src/demultiplex/main.nf b/src/demultiplex/main.nf
index 1c426dd..887f919 100644
--- a/src/demultiplex/main.nf
+++ b/src/demultiplex/main.nf
@@ -24,10 +24,8 @@ workflow run_wf {
     | map {id, state ->
       // Get InterOp folder
       // TODO: check if InterOp folder is empty
-      def interop_files = files("${state.input}/InterOp/*.bin")
-      // assert interop_files.size() > 0: "Could not find any InterOp files."
-      println "Found InterOp files: ${interop_files}"
-      def newState = state + ["interop_files": interop_files]
+      def interop_dir = files("${state.input}/InterOp/", type: 'dir')
+      def newState = state + ["interop_dir": interop_dir]
       [id, newState]
     }

@@ -40,8 +38,8 @@ workflow run_wf {
         "output_directory": "${state.output}",
       ]
     },
-    toState: { id, result, state -> 
-      state + [ "output_bclconvert" : result.output_directory ] 
+    toState: { id, result, state ->
+      state + [ "output_bclconvert" : result.output_directory ]
     }
   )
   // Gather input files from BCL convert output folder
@@ -52,7 +50,7 @@ workflow run_wf {
       def sample_id_column_index = null
       def samples = ["Undetermined"]
       def original_id = id
-      
+
       // Gather reports file
       def report_dir = files("${state.output_bclconvert}/Reports", type: "dir")
       assert report_dir.size() == 1:
@@ -79,7 +77,7 @@ workflow run_wf {
       if (start_parsing) {
         if ( !sample_id_column_index ) {
           sample_id_column_index = csv_items.findIndexValues{it == "Sample_ID"}
-          assert sample_id_column_index != -1: 
+          assert sample_id_column_index != -1:
             "Could not find column 'Sample_ID' in sample sheet!"
           return
         }
@@ -88,28 +86,30 @@ workflow run_wf {
       println "Looking for fastq files in ${state.output_bclconvert}."
       def allfastqs = files("${state.output_bclconvert}/*.fastq.gz")
-      println "Found ${allfastqs.size()} fastq files."
+      println "Found ${allfastqs.size()} fastq files for the following samples: ${samples}."
       processed_samples = samples.collect { sample_id ->
         def forward_glob = "${sample_id}_S[0-9]*_R1_00?.fastq.gz"
         def reverse_glob = "${sample_id}_S[0-9]*_R2_00?.fastq.gz"
         def forward_fastq = files("${state.output_bclconvert}/${forward_glob}", type: 'file')
         def reverse_fastq = files("${state.output_bclconvert}/${reverse_glob}", type: 'file')
-        assert forward_fastq.size() < 2: 
+        assert forward_fastq.size() < 2:
           "Found multiple forward fastq files corresponding to sample ${sample_id}."
-        assert reverse_fastq.size() < 2: 
+        assert reverse_fastq.size() < 2:
           "Found multiple reverse fastq files corresponding to sample ${sample_id}."
-        assert !forward_fastq.isEmpty(): 
+        assert !forward_fastq.isEmpty():
           "Expected a forward fastq file to have been created corresponding to sample ${sample_id}."
         // TODO: if one sample had reverse reads, the others must as well.
         reverse_fastq = !reverse_fastq.isEmpty() ? reverse_fastq[0] : null
         def bcl_convert_output_state = [
           "fastq_forward": forward_fastq[0],
           "fastq_reverse": reverse_fastq,
           "run_id": original_id,
           "bclconvert_reports": report_dir,
         ]
-        def newState = bcl_convert_output_state + state
-        [sample_id, newState]
+        def newState = bcl_convert_output_state + state
+        [sample_id, newState]
       }
+      println "Finished processing sample sheet."
return processed_samples } @@ -121,45 +121,55 @@ workflow run_wf { newEvent } | groupTuple(by: 0, sort: "hash") - // | map {run_id, states -> - // // Gather the following state for all samples - // def forward_fastqs = states.collect{it.fastq_forward} - // def reverse_fastqs = states.collect{it.fastq_reverse}.findAll{it != null} - // def sample_ids = states.collect{it.sample_id} - // // Other arguments should be the same for all samples, just pick the first - // // TODO: verify this - // def old_state = states[0] - // old_state.remove("sample_id") - - // def keys_to_overwrite = [ - // "forward_fastqs": forward_fastqs, - // "reverse_fastqs": reverse_fastqs, - // "sample_ids": sample_ids, - // ] - // return [run_id, old_state + keys_to_overwrite] - // } - // | falco.run( - // fromState: {id, state -> - // reverse_fastqs_list = state.reverse_fastqs ? state.reverse_fastqs : [] - // [ - // "input": state.forward_fastqs + reverse_fastqs_list, - // "outdir": "${state.output}/falco", - // "summary_filename": null, - // "report_filename": null, - // "data_filename": null, - // ] - // }, - // toState: { id, result, state -> state + [ "output_falco" : result.outdir ] }, - // ) - // | multiqc.run( - // fromState: {id, state -> - // // Gather text files from Falco output directory - // falco_txt_output = files("$state.output_falco/*.txt", type: "file") - // ["input": falco_txt_output + state.interop_files] - // }, - // toState: { id, result, state -> state + [ "output_multiqc" : result.outdir ] }, - // ) - // | setState(["output": "output_bclconvert", "output_falco": "output_falco", "output_multiqc": "output_multiqc"]) + | map {run_id, states -> + // Gather the following state for all samples + def forward_fastqs = states.collect{it.fastq_forward} + def reverse_fastqs = states.collect{it.fastq_reverse}.findAll{it != null} + def sample_ids = states.collect{it.sample_id} + // Other arguments should be the same for all samples, just pick the first + // TODO: verify this + def old_state = states[0] + old_state.remove("sample_id") + + def keys_to_overwrite = [ + "forward_fastqs": forward_fastqs, + "reverse_fastqs": reverse_fastqs, + "sample_ids": sample_ids, + ] + return [run_id, old_state + keys_to_overwrite] + } + | falco.run( + fromState: {id, state -> + reverse_fastqs_list = state.reverse_fastqs ? state.reverse_fastqs : [] + [ + "input": state.forward_fastqs + reverse_fastqs_list, + "outdir": "${state.output_falco}", + "summary_filename": null, + "report_filename": null, + "data_filename": null, + ] + }, + toState: { id, result, state -> + def newState = state + [ "output_falco" : result.outdir ] + return newState + }, + ) + | multiqc.run( + fromState: {id, state -> + [ + "input": [state.output_falco, state.interop_dir], + "output_report": state.output_multiqc, + "cl_config": 'sp: {fastqc/data: {fn: "*_fastqc_data.txt"}}', + "strict": true + ] + }, + toState: { id, result, state -> + def newState = state + [ "output_multiqc" : result.output_report ] + return newState + }, + ) + | view {"After multiqc: $it"} + | setState(["output": "output_bclconvert", "output_falco": "output_falco", "output_multiqc": "output_multiqc"]) emit: output_ch diff --git a/src/io/untar/script.sh b/src/io/untar/script.sh index 42da3d1..253ff47 100644 --- a/src/io/untar/script.sh +++ b/src/io/untar/script.sh @@ -36,6 +36,6 @@ fi echo "Starting extraction of tarball '$par_input' to output directory '$par_output'." 
mkdir -p "$par_output" -echo "executing 'tar --directory=$par_output ${extra_args[@]} -xavf $par_input'" -tar --directory="$par_output" ${extra_args[@]} -xavf "$par_input" +echo "executing 'tar --no-same-owner --no-same-permissions --directory=$par_output ${extra_args[@]} -xavf $par_input'" +tar --no-same-owner --no-same-permissions --directory="$par_output" ${extra_args[@]} -xavf "$par_input" From 0576fcf9841bd650be64dbe928ed9e8d7019d930 Mon Sep 17 00:00:00 2001 From: DriesSchaumont <5946712+DriesSchaumont@users.noreply.github.com> Date: Tue, 7 May 2024 09:47:59 +0200 Subject: [PATCH 05/11] Cleanup code. --- src/demultiplex/main.nf | 126 ++++++++++++++++++++++++---------------- 1 file changed, 77 insertions(+), 49 deletions(-) diff --git a/src/demultiplex/main.nf b/src/demultiplex/main.nf index 887f919..f576a3b 100644 --- a/src/demultiplex/main.nf +++ b/src/demultiplex/main.nf @@ -1,47 +1,9 @@ -workflow run_wf { +workflow split_samples_and_validate { take: input_ch main: - samples_ch = input_ch - // untar input if needed - | untar.run( - runIf: {id, state -> - def inputStr = state.input.toString() - inputStr.endsWith(".tar.gz") || \ - inputStr.endsWith(".tar") || \ - inputStr.endsWith(".tgz") ? true : false - }, - fromState: [ - "input": "input", - ], - toState: { id, result, state -> - def newState = state + ["input": result.output] - newState - }, - ) - // Gather input files from folder - | map {id, state -> - // Get InterOp folder - // TODO: check if InterOp folder is empty - def interop_dir = files("${state.input}/InterOp/", type: 'dir') - def newState = state + ["interop_dir": interop_dir] - [id, newState] - } - - // run bcl_convert - | bcl_convert.run( - fromState: { id, state -> - [ - "bcl_input_directory": state.input, - "sample_sheet": state.sample_sheet, - "output_directory": "${state.output}", - ] - }, - toState: { id, result, state -> - state + [ "output_bclconvert" : result.output_directory ] - } - ) + output_ch = input_ch // Gather input files from BCL convert output folder | flatMap { id, state -> println "Processing sample sheet: $state.sample_sheet" @@ -113,8 +75,18 @@ workflow run_wf { return processed_samples } - output_ch = samples_ch - // Going back to run-level, set the run ID back to the first element so we can use groupTuple + emit: + output_ch +} + + +workflow combine_samples { + take: + input_ch + + main: + output_ch = input_ch + // Going back to run-level, set the run ID back to the first element so we can use groupTuple // Using toSortedList will not work when multiple runs are being processed at the same time. | map { id, state -> def newEvent = [state.run_id, state + ["sample_id": id]] @@ -138,6 +110,60 @@ workflow run_wf { ] return [run_id, old_state + keys_to_overwrite] } + + emit: + output_ch + +} + +workflow run_wf { + take: + input_ch + + main: + samples_ch = input_ch + // untar input if needed + | untar.run( + runIf: {id, state -> + def inputStr = state.input.toString() + inputStr.endsWith(".tar.gz") || \ + inputStr.endsWith(".tar") || \ + inputStr.endsWith(".tgz") ? 
true : false
+      },
+      fromState: [
+        "input": "input",
+      ],
+      toState: { id, result, state ->
+        state + ["input": result.output]
+      },
+    )
+    // Gather input files from folder
+    | map {id, state ->
+      // Get InterOp folder
+      // TODO: check if InterOp folder is empty
+      def interop_dir = files("${state.input}/InterOp/", type: 'dir')
+      def newState = state + ["interop_dir": interop_dir]
+      [id, newState]
+    }
+
+    // run bcl_convert
+    | bcl_convert.run(
+      fromState: { id, state ->
+        [
+          "bcl_input_directory": state.input,
+          "sample_sheet": state.sample_sheet,
+          "output_directory": "${state.output}",
+        ]
+      },
+      toState: { id, result, state ->
+        state + [ "output_bclconvert" : result.output_directory ]
+      }
+    )
+    | split_samples_and_validate
+
+
+  output_ch = samples_ch
+    | combine_samples
     | falco.run(
       fromState: {id, state ->
         reverse_fastqs_list = state.reverse_fastqs ? state.reverse_fastqs : []
         [
           "input": state.forward_fastqs + reverse_fastqs_list,
           "outdir": "${state.output_falco}",
           "summary_filename": null,
           "report_filename": null,
           "data_filename": null,
         ]
       },
       toState: { id, result, state ->
-        def newState = state + [ "output_falco" : result.outdir ]
-        return newState
+        state + [ "output_falco" : result.outdir ]
       },
     )
     | multiqc.run(
       fromState: {id, state ->
         [
           "input": [state.output_falco, state.interop_dir],
           "output_report": state.output_multiqc,
           "cl_config": 'sp: {fastqc/data: {fn: "*_fastqc_data.txt"}}',
-          "strict": true
         ]
       },
       toState: { id, result, state ->
-        def newState = state + [ "output_multiqc" : result.output_report ]
-        return newState
+        state + [ "output_multiqc" : result.output_report ]
       },
     )
-    | view {"After multiqc: $it"}
-    | setState(["output": "output_bclconvert", "output_falco": "output_falco", "output_multiqc": "output_multiqc"])
+    | setState(
+      [
+        "output": "output_bclconvert",
+        "output_falco": "output_falco",
+        "output_multiqc": "output_multiqc"
+      ]
+    )

   emit:
     output_ch

From 9696a92c2d715d024441962d2f0b59bd2dbcc426 Mon Sep 17 00:00:00 2001
From: DriesSchaumont <5946712+DriesSchaumont@users.noreply.github.com>
Date: Tue, 7 May 2024 13:05:37 +0200
Subject: [PATCH 06/11] Update file parsing

---
 src/demultiplex/main.nf | 32 +++++++++++++++-----------------
 1 file changed, 15 insertions(+), 17 deletions(-)

diff --git a/src/demultiplex/main.nf b/src/demultiplex/main.nf
index f576a3b..d55d30e 100644
--- a/src/demultiplex/main.nf
+++ b/src/demultiplex/main.nf
@@ -7,18 +7,12 @@ workflow split_samples_and_validate {
     // Gather input files from BCL convert output folder
     | flatMap { id, state ->
       println "Processing sample sheet: $state.sample_sheet"
-      def sample_sheet = file(state.sample_sheet.toAbsolutePath())
+      def sample_sheet = state.sample_sheet
       def start_parsing = false
       def sample_id_column_index = null
       def samples = ["Undetermined"]
       def original_id = id

-      // Gather reports file
-      def report_dir = files("${state.output_bclconvert}/Reports", type: "dir")
-      assert report_dir.size() == 1:
-        "Could not find the Reports directory in the output from BCL convert."
-      report_dir = report_dir[0]
-
       // Parse sample sheet for sample IDs
       csv_lines = sample_sheet.splitCsv(header: false, sep: ',')
       csv_lines.any { csv_items ->
@@ -47,17 +41,18 @@ workflow split_samples_and_validate {
         }
       }
       println "Looking for fastq files in ${state.output_bclconvert}."
-      def allfastqs = files("${state.output_bclconvert}/*.fastq.gz")
-      println "Found ${allfastqs.size()} fastq files for the following samples: ${samples}."
+      def allfastqs = state.output_bclconvert.listFiles().findAll{it.isFile() && it.name ==~ /^.+\.fastq.gz$/}
+      println "Found ${allfastqs.size()} fastq files, matching them to the following samples: ${samples}."
processed_samples = samples.collect { sample_id ->
-        def forward_glob = "${sample_id}_S[0-9]*_R1_00?.fastq.gz"
-        def reverse_glob = "${sample_id}_S[0-9]*_R2_00?.fastq.gz"
-        def forward_fastq = files("${state.output_bclconvert}/${forward_glob}", type: 'file')
-        def reverse_fastq = files("${state.output_bclconvert}/${reverse_glob}", type: 'file')
+        def forward_regex = ~/^${sample_id}_S(\d+)_(L(\d+)_)?R1_(\d+)\.fastq\.gz$/
+        def reverse_regex = ~/^${sample_id}_S(\d+)_(L(\d+)_)?R2_(\d+)\.fastq\.gz$/
+        def forward_fastq = state.output_bclconvert.listFiles().findAll{it.isFile() && it.name ==~ forward_regex}
+        def reverse_fastq = state.output_bclconvert.listFiles().findAll{it.isFile() && it.name ==~ reverse_regex}
+        assert forward_fastq : "No forward fastq files were found for sample ${sample_id}"
         assert forward_fastq.size() < 2:
-          "Found multiple forward fastq files corresponding to sample ${sample_id}."
+          "Found multiple forward fastq files corresponding to sample ${sample_id}: ${forward_fastq}"
         assert reverse_fastq.size() < 2:
-          "Found multiple reverse fastq files corresponding to sample ${sample_id}."
+          "Found multiple reverse fastq files corresponding to sample ${sample_id}: ${reverse_fastq}."
         assert !forward_fastq.isEmpty():
           "Expected a forward fastq file to have been created corresponding to sample ${sample_id}."
         // TODO: if one sample had reverse reads, the others must as well.
@@ -66,7 +61,6 @@ workflow split_samples_and_validate {
           "fastq_forward": forward_fastq[0],
           "fastq_reverse": reverse_fastq,
           "run_id": original_id,
-          "bclconvert_reports": report_dir,
         ]
         def newState = bcl_convert_output_state + state
         [sample_id, newState]
@@ -156,7 +150,11 @@ workflow run_wf {
         ]
       },
       toState: { id, result, state ->
-        state + [ "output_bclconvert" : result.output_directory ]
+        def newState = [
+          "output_bclconvert" : result.output_directory,
+          "bclconvert_reports": result.reports,
+        ]
+        state + newState
       }
     )
     | split_samples_and_validate

From 852532eefdb0a5d14c415f42953be6fa4a93c9f2 Mon Sep 17 00:00:00 2001
From: DriesSchaumont <5946712+DriesSchaumont@users.noreply.github.com>
Date: Tue, 7 May 2024 16:00:36 +0200
Subject: [PATCH 07/11] Refactor into different workflows

---
 src/dataflow/combine_samples/config.vsh.yaml | 38 ++++
 src/dataflow/combine_samples/main.nf         | 28 +++
 .../config.vsh.yaml                          | 36 ++++
 .../gather_fastqs_and_validate/main.nf       | 73 ++++++++
 src/demultiplex/config.vsh.yaml              |  4 +
 src/demultiplex/integration_tests.sh         |  5 +-
 src/demultiplex/main.nf                      | 174 +++++------------
 7 files changed, 223 insertions(+), 135 deletions(-)
 create mode 100644 src/dataflow/combine_samples/config.vsh.yaml
 create mode 100644 src/dataflow/combine_samples/main.nf
 create mode 100644 src/dataflow/gather_fastqs_and_validate/config.vsh.yaml
 create mode 100644 src/dataflow/gather_fastqs_and_validate/main.nf

diff --git a/src/dataflow/combine_samples/config.vsh.yaml b/src/dataflow/combine_samples/config.vsh.yaml
new file mode 100644
index 0000000..d82717e
--- /dev/null
+++ b/src/dataflow/combine_samples/config.vsh.yaml
@@ -0,0 +1,38 @@
+name: combine_samples
+namespace: dataflow
+description: Combine fastq files from multiple samples into one event with a list of fastq files per orientation.
+argument_groups: + - name: Input arguments + arguments: + - name: "--id" + description: "ID of the new event" + type: string + required: true + - name: --forward_input + type: file + required: true + - name: --reverse_input + type: file + required: false + - name: Output arguments + arguments: + - name: --output_forward + type: file + direction: output + multiple: true + required: true + - name: --output_reverse + type: file + direction: output + multiple: true + required: false +resources: + - type: nextflow_script + path: main.nf + entrypoint: run_wf + +runners: + - type: nextflow + +engines: + - type: native diff --git a/src/dataflow/combine_samples/main.nf b/src/dataflow/combine_samples/main.nf new file mode 100644 index 0000000..1a16fa6 --- /dev/null +++ b/src/dataflow/combine_samples/main.nf @@ -0,0 +1,28 @@ +workflow run_wf { + take: + input_ch + + main: + output_ch = input_ch + | map { id, state -> + def newEvent = [state.id, state + ["_meta": ["join_id": id]]] + newEvent + } + | groupTuple(by: 0, sort: "hash") + | map {run_id, states -> + // Gather the following state for all samples + def forward_fastqs = states.collect{it.forward_input} + def reverse_fastqs = states.collect{it.reverse_input}.findAll{it != null} + + def resultState = [ + "output_forward": forward_fastqs, + "output_reverse": reverse_fastqs, + // The join ID is the same across all samples from the same run + "_meta": ["join_id": states[0]._meta.join_id] + ] + return [run_id, resultState] + } + + emit: + output_ch +} \ No newline at end of file diff --git a/src/dataflow/gather_fastqs_and_validate/config.vsh.yaml b/src/dataflow/gather_fastqs_and_validate/config.vsh.yaml new file mode 100644 index 0000000..e0b7733 --- /dev/null +++ b/src/dataflow/gather_fastqs_and_validate/config.vsh.yaml @@ -0,0 +1,36 @@ +name: gather_fastqs_and_validate +namespace: dataflow +description: | + From a directory containing fastq files, gather the files per sample + and validate according to the contents of the sample sheet. 
+argument_groups:
+  - name: Input arguments
+    arguments:
+      - name: --input
+        description: Directory containing .fastq files
+        type: file
+        required: true
+      - name: --sample_sheet
+        description: Sample sheet
+        type: file
+        required: true
+  - name: Output arguments
+    arguments:
+      - name: --fastq_forward
+        type: file
+        direction: output
+        required: true
+      - name: "--fastq_reverse"
+        type: file
+        direction: output
+        required: false
+resources:
+  - type: nextflow_script
+    path: main.nf
+    entrypoint: run_wf
+
+runners:
+  - type: nextflow
+
+engines:
+  - type: native
diff --git a/src/dataflow/gather_fastqs_and_validate/main.nf b/src/dataflow/gather_fastqs_and_validate/main.nf
new file mode 100644
index 0000000..e6afd77
--- /dev/null
+++ b/src/dataflow/gather_fastqs_and_validate/main.nf
@@ -0,0 +1,73 @@
+workflow run_wf {
+  take:
+    input_ch
+
+  main:
+    output_ch = input_ch
+      // Gather input files from BCL convert output folder
+      | flatMap { id, state ->
+        println "Processing sample sheet: $state.sample_sheet"
+        def sample_sheet = state.sample_sheet
+        def start_parsing = false
+        def sample_id_column_index = null
+        def samples = ["Undetermined"]
+        def original_id = id
+
+        // Parse sample sheet for sample IDs
+        csv_lines = sample_sheet.splitCsv(header: false, sep: ',')
+        csv_lines.any { csv_items ->
+          if (csv_items.isEmpty()) {
+            return
+          }
+          def possible_header = csv_items[0]
+          def header = possible_header.find(/\[(.*)\]/){fullmatch, header_name -> header_name}
+          if (header) {
+            if (start_parsing) {
+              // Stop parsing when encountering the next header
+              return true
+            }
+            if (header == "Data") {
+              start_parsing = true
+            }
+          }
+          if (start_parsing) {
+            if ( !sample_id_column_index ) {
+              sample_id_column_index = csv_items.findIndexValues{it == "Sample_ID"}
+              assert sample_id_column_index != -1:
+                "Could not find column 'Sample_ID' in sample sheet!"
+              return
+            }
+            samples += csv_items[sample_id_column_index]
+          }
+        }
+        println "Looking for fastq files in ${state.input}."
+        def allfastqs = state.input.listFiles().findAll{it.isFile() && it.name ==~ /^.+\.fastq.gz$/}
+        println "Found ${allfastqs.size()} fastq files, matching them to the following samples: ${samples}."
+        processed_samples = samples.collect { sample_id ->
+          def forward_regex = ~/^${sample_id}_S(\d+)_(L(\d+)_)?R1_(\d+)\.fastq\.gz$/
+          def reverse_regex = ~/^${sample_id}_S(\d+)_(L(\d+)_)?R2_(\d+)\.fastq\.gz$/
+          def forward_fastq = state.input.listFiles().findAll{it.isFile() && it.name ==~ forward_regex}
+          def reverse_fastq = state.input.listFiles().findAll{it.isFile() && it.name ==~ reverse_regex}
+          assert forward_fastq : "No forward fastq files were found for sample ${sample_id}"
+          assert forward_fastq.size() < 2:
+            "Found multiple forward fastq files corresponding to sample ${sample_id}: ${forward_fastq}"
+          assert reverse_fastq.size() < 2:
+            "Found multiple reverse fastq files corresponding to sample ${sample_id}: ${reverse_fastq}."
+          assert !forward_fastq.isEmpty():
+            "Expected a forward fastq file to have been created corresponding to sample ${sample_id}."
+          // TODO: if one sample had reverse reads, the others must as well.
+          reverse_fastq = !reverse_fastq.isEmpty() ? reverse_fastq[0] : null
+          def fastqs_state = [
+            "fastq_forward": forward_fastq[0],
+            "fastq_reverse": reverse_fastq,
+            "_meta": [ "join_id": original_id ],
+          ]
+          [sample_id, fastqs_state]
+        }
+        println "Finished processing sample sheet."
+ return processed_samples + } + + emit: + output_ch +} \ No newline at end of file diff --git a/src/demultiplex/config.vsh.yaml b/src/demultiplex/config.vsh.yaml index b6b8ee7..ef69f48 100644 --- a/src/demultiplex/config.vsh.yaml +++ b/src/demultiplex/config.vsh.yaml @@ -43,6 +43,10 @@ test_resources: dependencies: - name: io/untar repository: local + - name: dataflow/gather_fastqs_and_validate + repository: local + - name: dataflow/combine_samples + repository: local - name: bcl_convert repository: bb - name: falco diff --git a/src/demultiplex/integration_tests.sh b/src/demultiplex/integration_tests.sh index d327e36..a69b85f 100755 --- a/src/demultiplex/integration_tests.sh +++ b/src/demultiplex/integration_tests.sh @@ -6,10 +6,11 @@ REPO_ROOT=$(git rev-parse --show-toplevel) # ensure that the command below is run from the root of the repository cd "$REPO_ROOT" -viash ns build -q 'untar|demultiplex' --setup cb +viash ns build -q 'untar|demultiplex|gather_fastqs_and_validate|combine_samples' --setup cb nextflow run . \ -main-script src/demultiplex/test.nf \ -profile docker,no_publish \ -entry test_wf \ - -c src/config/tests.config + -c src/config/tests.config \ + -resume diff --git a/src/demultiplex/main.nf b/src/demultiplex/main.nf index d55d30e..4532d7d 100644 --- a/src/demultiplex/main.nf +++ b/src/demultiplex/main.nf @@ -1,115 +1,3 @@ -workflow split_samples_and_validate { - take: - input_ch - - main: - output_ch = input_ch - // Gather input files from BCL convert output folder - | flatMap { id, state -> - println "Processing sample sheet: $state.sample_sheet" - def sample_sheet = state.sample_sheet - def start_parsing = false - def sample_id_column_index = null - def samples = ["Undetermined"] - def original_id = id - - // Parse sample sheet for sample IDs - csv_lines = sample_sheet.splitCsv(header: false, sep: ',') - csv_lines.any { csv_items -> - if (csv_items.isEmpty()) { - return - } - def possible_header = csv_items[0] - def header = possible_header.find(/\[(.*)\]/){fullmatch, header_name -> header_name} - if (header) { - if (start_parsing) { - // Stop parsing when encountering the next header - return true - } - if (header == "Data") { - start_parsing = true - } - } - if (start_parsing) { - if ( !sample_id_column_index ) { - sample_id_column_index = csv_items.findIndexValues{it == "Sample_ID"} - assert sample_id_column_index != -1: - "Could not find column 'Sample_ID' in sample sheet!" - return - } - samples += csv_items[sample_id_column_index] - } - } - println "Looking for fastq files in ${state.output_bclconvert}." - def allfastqs = state.output_bclconvert.listFiles().findAll{it.isFile() && it.name ==~ /^.+\.fastq.gz$/} - println "Found ${allfastqs.size()} fastq files, matching them to the following samples: ${samples}." - processed_samples = samples.collect { sample_id -> - def forward_regex = ~/^${sample_id}_S(\d+)_(L(\d+)_)?R1_(\d+)\.fastq\.gz$/ - def reverse_regex = ~/^${sample_id}_S(\d+)_(L(\d+)_)?R2_(\d+)\.fastq\.gz$/ - def forward_fastq = state.output_bclconvert.listFiles().findAll{it.isFile() && it.name ==~ forward_regex} - def reverse_fastq = state.output_bclconvert.listFiles().findAll{it.isFile() && it.name ==~ reverse_regex} - assert forward_fastq : "No forward fastq files were found for sample ${sample_id}" - assert forward_fastq.size() < 2: - "Found multiple forward fastq files corresponding to sample ${sample_id}: ${forward_fastq}" - assert reverse_fastq.size() < 2: - "Found multiple reverse fastq files corresponding to sample ${sample_id}: ${reverse_fastq}." 
-          assert !forward_fastq.isEmpty():
-            "Expected a forward fastq file to have been created corresponding to sample ${sample_id}."
-          // TODO: if one sample had reverse reads, the others must as well.
-          reverse_fastq = !reverse_fastq.isEmpty() ? reverse_fastq[0] : null
-          def bcl_convert_output_state = [
-            "fastq_forward": forward_fastq[0],
-            "fastq_reverse": reverse_fastq,
-            "run_id": original_id,
-          ]
-          def newState = bcl_convert_output_state + state
-          [sample_id, newState]
-        }
-        println "Finished processing sample sheet."
-        return processed_samples
-      }
-
-  emit:
-    output_ch
-}
-
-
-workflow combine_samples {
-  take:
-    input_ch
-
-  main:
-    output_ch = input_ch
-    // Going back to run-level, set the run ID back to the first element so we can use groupTuple
-    // Using toSortedList will not work when multiple runs are being processed at the same time.
-    | map { id, state ->
-      def newEvent = [state.run_id, state + ["sample_id": id]]
-      newEvent
-    }
-    | groupTuple(by: 0, sort: "hash")
-    | map {run_id, states ->
-      // Gather the following state for all samples
-      def forward_fastqs = states.collect{it.fastq_forward}
-      def reverse_fastqs = states.collect{it.fastq_reverse}.findAll{it != null}
-      def sample_ids = states.collect{it.sample_id}
-      // Other arguments should be the same for all samples, just pick the first
-      // TODO: verify this
-      def old_state = states[0]
-      old_state.remove("sample_id")
-
-      def keys_to_overwrite = [
-        "forward_fastqs": forward_fastqs,
-        "reverse_fastqs": reverse_fastqs,
-        "sample_ids": sample_ids,
-      ]
-      return [run_id, old_state + keys_to_overwrite]
-    }
-
-  emit:
-    output_ch
-
-}
-
 workflow run_wf {
   take:
     input_ch

   main:
     samples_ch = input_ch
       // untar input if needed
       | untar.run(
         runIf: {id, state ->
           def inputStr = state.input.toString()
           inputStr.endsWith(".tar.gz") || \
           inputStr.endsWith(".tar") || \
           inputStr.endsWith(".tgz") ? true : false
         },
         fromState: [
           "input": "input",
         ],
         toState: { id, result, state ->
           state + ["input": result.output]
         },
       )
       // Gather input files from folder
       | map {id, state ->
-      // Get InterOp folder
-      // TODO: check if InterOp folder is empty
-      def interop_dir = files("${state.input}/InterOp/", type: 'dir')
+      def interop_dir = state.input.resolve("InterOp")
+      assert interop_dir.isDirectory(): "Expected InterOp directory to be present."
       def newState = state + ["interop_dir": interop_dir]
       [id, newState]
     }

     // run bcl_convert
     | bcl_convert.run(
-      fromState: { id, state ->
-        [
-          "bcl_input_directory": state.input,
-          "sample_sheet": state.sample_sheet,
-          "output_directory": "${state.output}",
-        ]
-      },
-      toState: { id, result, state ->
-        def newState = [
-          "output_bclconvert" : result.output_directory,
-          "bclconvert_reports": result.reports,
-        ]
-        state + newState
-      }
+      fromState: [
+        "bcl_input_directory": "input",
+        "sample_sheet": "sample_sheet",
+        "output_directory": "output",
+      ],
+      toState: {id, result, state ->
+        def toAdd = [
+          "output_bclconvert" : result.output_directory,
+          "bclconvert_reports": result.reports,
+          "run_id": id,
+        ]
+        def newState = state + toAdd
+        return newState
+      }
     )
-    | split_samples_and_validate
+    | gather_fastqs_and_validate.run(
+      fromState: [
+        "input": "output_bclconvert",
+        "sample_sheet": "sample_sheet",
+      ],
+      toState: [
+        "fastq_forward": "fastq_forward",
+        "fastq_reverse": "fastq_reverse",
+      ],
+    )

   output_ch = samples_ch
-    | combine_samples
+    | view {"Before combine_samples: $it"}
+    | combine_samples.run(
+      fromState: { id, state ->
+        [
+          "id": state.run_id,
+          "forward_input": state.fastq_forward,
+          "reverse_input": state.fastq_reverse,
+        ]
+      },
+      toState: [
+        "forward_fastqs": "output_forward",
+        "reverse_fastqs": "output_reverse",
+      ]
+    )
     | falco.run(
       fromState: {id, state ->
         reverse_fastqs_list = state.reverse_fastqs ? 
state.reverse_fastqs : []
         [
           "input": state.forward_fastqs + reverse_fastqs_list,
           "outdir": "${state.output_falco}",
           "summary_filename": null,
           "report_filename": null,
           "data_filename": null,
         ]
       },
       toState: { id, result, state ->
         state + [ "output_falco" : result.outdir ]
       },
     )
     | multiqc.run(
       fromState: {id, state ->
         [
           "input": [state.output_falco, state.interop_dir],
           "output_report": state.output_multiqc,
           "cl_config": 'sp: {fastqc/data: {fn: "*_fastqc_data.txt"}}',
         ]
       },
       toState: { id, result, state ->
         state + [ "output_multiqc" : result.output_report ]
       },
     )
     | setState(
       [
         "output": "output_bclconvert",
         "output_falco": "output_falco",
         "output_multiqc": "output_multiqc"
       ]
     )

   emit:
     output_ch

From 6554abbbad89cfd6ed63715982d3d009423abd0a Mon Sep 17 00:00:00 2001
From: DriesSchaumont <5946712+DriesSchaumont@users.noreply.github.com>
Date: Tue, 7 May 2024 16:02:25 +0200
Subject: [PATCH 08/11] Remove debugging view()

---
 src/demultiplex/main.nf | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/demultiplex/main.nf b/src/demultiplex/main.nf
index 4532d7d..7548f1d 100644
--- a/src/demultiplex/main.nf
+++ b/src/demultiplex/main.nf
@@ -56,7 +56,6 @@ workflow run_wf {
     )

   output_ch = samples_ch
-    | view {"Before combine_samples: $it"}
     | combine_samples.run(
       fromState: { id, state ->
         [
           "id": state.run_id,

From c48b92d5d5c3196927964bda231e1fe59ee4003f Mon Sep 17 00:00:00 2001
From: DriesSchaumont <5946712+DriesSchaumont@users.noreply.github.com>
Date: Tue, 7 May 2024 16:03:05 +0200
Subject: [PATCH 09/11] Add test for multiqc

---
 src/demultiplex/test.nf | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/demultiplex/test.nf b/src/demultiplex/test.nf
index b5bf5b5..4ece866 100644
--- a/src/demultiplex/test.nf
+++ b/src/demultiplex/test.nf
@@ -24,5 +24,6 @@ workflow test_wf {
     | map {id, state ->
       assert state.output.isDirectory(): "Expected bclconvert output to be a directory"
       assert state.output_falco.isDirectory(): "Expected falco output to be a directory"
+      assert state.output_multiqc.isFile(): "Expected MultiQC output to be a file"
     }
 }

From 73eb3bc8abcd1a2b8980081ba3fb7b975723d9bb Mon Sep 17 00:00:00 2001
From: DriesSchaumont <5946712+DriesSchaumont@users.noreply.github.com>
Date: Wed, 8 May 2024 13:50:36 +0200
Subject: [PATCH 10/11] Autodetect SampleSheet from input directory

---
 src/demultiplex/config.vsh.yaml |  6 ++++--
 src/demultiplex/main.nf         | 12 ++++++++++--
 src/demultiplex/test.nf         |  2 +-
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/src/demultiplex/config.vsh.yaml b/src/demultiplex/config.vsh.yaml
index ef69f48..893c3c9 100644
--- a/src/demultiplex/config.vsh.yaml
+++ b/src/demultiplex/config.vsh.yaml
@@ -8,9 +8,11 @@ argument_groups:
         type: file
         required: true
       - name: --sample_sheet
-        description: Sample sheet
+        description: |
+          Sample sheet as input for BCL Convert. If not specified,
+          will try to autodetect the sample sheet in the input directory
         type: file
-        required: true
+        required: false
   - name: Output arguments
     arguments:
       - name: --output
diff --git a/src/demultiplex/main.nf b/src/demultiplex/main.nf
index 7548f1d..c532769 100644
--- a/src/demultiplex/main.nf
+++ b/src/demultiplex/main.nf
@@ -21,10 +21,18 @@ workflow run_wf {
     )
     // Gather input files from folder
     | map {id, state ->
+      def newState = [:]
+      if (!state.sample_sheet) {
+        def sample_sheet = state.input.resolve("SampleSheet.csv")
+        assert (sample_sheet && sample_sheet.isFile()): "Could not find 'SampleSheet.csv' file in input directory."
+        newState["sample_sheet"] = sample_sheet
+      }
+
       def interop_dir = state.input.resolve("InterOp")
       assert interop_dir.isDirectory(): "Expected InterOp directory to be present."
-      def newState = state + ["interop_dir": interop_dir]
-      [id, newState]
+      newState["interop_dir"] = interop_dir
+      def resultState = state + newState
+      [id, resultState]
     }

     // run bcl_convert
diff --git a/src/demultiplex/test.nf b/src/demultiplex/test.nf
index 4ece866..c46a438 100644
--- a/src/demultiplex/test.nf
+++ b/src/demultiplex/test.nf
@@ -7,7 +7,7 @@ workflow test_wf {
       [
         // sample_sheet: resources_test.resolve("bcl_convert_samplesheet.csv"),
         // input: resources_test.resolve("iseq-DI/"),
-        sample_sheet: "https://raw.githubusercontent.com/nf-core/test-datasets/demultiplex/testdata/NovaSeq6000/SampleSheet.csv",
+        //sample_sheet: "https://raw.githubusercontent.com/nf-core/test-datasets/demultiplex/testdata/NovaSeq6000/SampleSheet.csv",
         input: "https://raw.githubusercontent.com/nf-core/test-datasets/demultiplex/testdata/NovaSeq6000/200624_A00834_0183_BHMTFYDRXX.tar.gz",
         publish_dir: "output_dir/",
       ]

From 82c32b367f702103fff3594b9ab5a2552c903991 Mon Sep 17 00:00:00 2001
From: DriesSchaumont <5946712+DriesSchaumont@users.noreply.github.com>
Date: Wed, 8 May 2024 16:11:52 +0200
Subject: [PATCH 11/11] Fix InterOp statistics.

---
 src/demultiplex/config.vsh.yaml               |  2 +
 src/demultiplex/integration_tests.sh          |  2 +-
 src/demultiplex/main.nf                       | 19 ++++++++-
 src/io/interop_summary_to_csv/config.vsh.yaml | 41 +++++++++++++++++++
 src/io/interop_summary_to_csv/script.sh       | 10 +++++
 5 files changed, 71 insertions(+), 3 deletions(-)
 create mode 100644 src/io/interop_summary_to_csv/config.vsh.yaml
 create mode 100644 src/io/interop_summary_to_csv/script.sh

diff --git a/src/demultiplex/config.vsh.yaml b/src/demultiplex/config.vsh.yaml
index 893c3c9..1f005e1 100644
--- a/src/demultiplex/config.vsh.yaml
+++ b/src/demultiplex/config.vsh.yaml
@@ -47,6 +47,8 @@ dependencies:
     repository: local
   - name: dataflow/gather_fastqs_and_validate
     repository: local
+  - name: io/interop_summary_to_csv
+    repository: local
   - name: dataflow/combine_samples
     repository: local
   - name: bcl_convert
diff --git a/src/demultiplex/integration_tests.sh b/src/demultiplex/integration_tests.sh
index a69b85f..1042753 100755
--- a/src/demultiplex/integration_tests.sh
+++ b/src/demultiplex/integration_tests.sh
@@ -6,7 +6,7 @@ REPO_ROOT=$(git rev-parse --show-toplevel)
 # ensure that the command below is run from the root of the repository
 cd "$REPO_ROOT"

-viash ns build -q 'untar|demultiplex|gather_fastqs_and_validate|combine_samples' --setup cb
+viash ns build --setup cb

 nextflow run . \
   -main-script src/demultiplex/test.nf \
diff --git a/src/demultiplex/main.nf b/src/demultiplex/main.nf
index c532769..4c401da 100644
--- a/src/demultiplex/main.nf
+++ b/src/demultiplex/main.nf
@@ -28,13 +28,24 @@ workflow run_wf {
         newState["sample_sheet"] = sample_sheet
       }

+      // Do not add InterOp to state because we generate the summary CSVs in the next
+      // step based on the run dir, not the InterOp dir.
       def interop_dir = state.input.resolve("InterOp")
       assert interop_dir.isDirectory(): "Expected InterOp directory to be present."
- newState["interop_dir"] = interop_dir + def resultState = state + newState [id, resultState] } + | interop_summary_to_csv.run( + fromState: [ + "input": "input", + ], + toState: [ + "interop_run_summary": "output_run_summary", + "interop_index_summary": "output_index_summary", + ] + ) // run bcl_convert | bcl_convert.run( fromState: [ @@ -95,7 +106,11 @@ workflow run_wf { | multiqc.run( fromState: {id, state -> [ - "input": [state.output_falco, state.interop_dir], + "input": [ + state.output_falco, + state.interop_run_summary.getParent(), + state.interop_index_summary.getParent() + ], "output_report": state.output_multiqc, "cl_config": 'sp: {fastqc/data: {fn: "*_fastqc_data.txt"}}', ] diff --git a/src/io/interop_summary_to_csv/config.vsh.yaml b/src/io/interop_summary_to_csv/config.vsh.yaml new file mode 100644 index 0000000..2d74808 --- /dev/null +++ b/src/io/interop_summary_to_csv/config.vsh.yaml @@ -0,0 +1,41 @@ +name: interop_summary_to_csv +namespace: io +argument_groups: + - name: Input arguments + arguments: + - name: --input + description: Sequencing run folder (*not* InterOp folder). + type: file + required: true + - name: Output arguments + arguments: + - name: --output_run_summary + type: file + direction: output + required: true + - name: --output_index_summary + type: file + direction: output + required: true +requirements: + - commands: ["summary", "index-summary"] +resources: + - type: bash_script + path: script.sh +engines: + - type: docker + image: debian:stable-slim + setup: + - type: apt + packages: + - procps + - wget + - type: docker + run: | + wget https://github.com/Illumina/interop/releases/download/v1.3.1/interop-1.3.1-Linux-GNU.tar.gz -O /tmp/interop.tar.gz && \ + tar -C /tmp/ --no-same-owner --no-same-permissions -xvf /tmp/interop.tar.gz && \ + mv /tmp/interop-1.3.1-Linux-GNU/bin/index-summary /tmp/interop-1.3.1-Linux-GNU/bin/summary /usr/local/bin/ + +runners: + - type: executable + - type: nextflow \ No newline at end of file diff --git a/src/io/interop_summary_to_csv/script.sh b/src/io/interop_summary_to_csv/script.sh new file mode 100644 index 0000000..e4a37fb --- /dev/null +++ b/src/io/interop_summary_to_csv/script.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash + +set -eo pipefail + +if [ ! -d "$par_input" ]; then + echo "Input directory does not exist or is not a directory" + exit 1 +fi +$(which summary) --csv=1 "$par_input" 1> "$par_output_run_summary" +$(which index-summary) --csv=1 "$par_input" 1> "$par_output_index_summary"
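
A note on testing [PATCH 11/11]: Viash exposes bash component arguments as plain `par_*` environment variables, so the new `interop_summary_to_csv` script can also be smoke-tested outside of Viash/Nextflow. A minimal sketch — the extracted run-folder path below is a placeholder, and it assumes the Illumina InterOp `summary` and `index-summary` binaries installed by the Docker engine setup above are on `PATH`:

    #!/usr/bin/env bash
    # Hypothetical smoke test for src/io/interop_summary_to_csv/script.sh.
    # The run folder path is a placeholder; substitute a real extracted run directory.
    set -eo pipefail

    export par_input="testData/200624_A00834_0183_BHMTFYDRXX"
    export par_output_run_summary="run_summary.csv"
    export par_output_index_summary="index_summary.csv"

    bash src/io/interop_summary_to_csv/script.sh

    # Both outputs should be non-empty CSV files.
    head -n 3 "$par_output_run_summary" "$par_output_index_summary"

Note that the script is pointed at the run folder itself (as the `--input` description says, *not* the InterOp subfolder), because the InterOp tools locate `InterOp/*.bin` relative to the run directory.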