Skip to content

Commit

Permalink
Add bases2fastq demultiplexer (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
DriesSchaumont authored Dec 5, 2024
1 parent 5cb1323 commit 4376973
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 45 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# demultiplex v0.1.2

## Breaking changes

* `demultiplex` workflow: renamed `sample_sheet` argument to `run_information` (PR #24)

## New features

* Add support for `bases2fastq` demultiplexer (PR #24)

## Minor updates

* Add resource labels to workflows (PR #21).
Expand Down
2 changes: 2 additions & 0 deletions src/dataflow/combine_samples/config.vsh.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ argument_groups:
- name: --forward_input
type: file
required: true
multiple: true
- name: --reverse_input
type: file
required: false
multiple: true
- name: Output arguments
arguments:
- name: --output_forward
Expand Down
4 changes: 2 additions & 2 deletions src/dataflow/combine_samples/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ workflow run_wf {
| groupTuple(by: 0, sort: "hash")
| map {run_id, states ->
// Gather the following state for all samples
def forward_fastqs = states.collect{it.forward_input}
def reverse_fastqs = states.collect{it.reverse_input}.findAll{it != null}
def forward_fastqs = states.collect{it.forward_input}.flatten()
def reverse_fastqs = states.collect{it.reverse_input}.findAll{it != null}.flatten()

def resultState = [
"output_forward": forward_fastqs,
Expand Down
2 changes: 2 additions & 0 deletions src/dataflow/gather_fastqs_and_validate/config.vsh.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ argument_groups:
type: file
direction: output
required: true
multiple: true
- name: "--fastq_reverse"
type: file
direction: output
required: false
multiple: true
resources:
- type: nextflow_script
path: main.nf
Expand Down
48 changes: 33 additions & 15 deletions src/dataflow/gather_fastqs_and_validate/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,52 @@ workflow run_wf {
def original_id = id

// Parse sample sheet for sample IDs
println "Processing run information file ${sample_sheet}"
csv_lines = sample_sheet.splitCsv(header: false, sep: ',')
csv_lines.any { csv_items ->
if (csv_items.isEmpty()) {
// skip empty line
return
}
def possible_header = csv_items[0]
def header = possible_header.find(/\[(.*)\]/){fullmatch, header_name -> header_name}
if (header) {
if (start_parsing) {
// Stop parsing when encountering the next header
println "Encountered next header '[${start_parsing}]', stopping parsing."
return true
}
if (header == "Data") {
// [Data] for illumina
// [Samples] for Element Biosciences
if (header in ["Data", "Samples"]) {
println "Found header [${header}], start parsing."
start_parsing = true
return
}
}
if (start_parsing) {
if ( !sample_id_column_index ) {
sample_id_column_index = csv_items.findIndexValues{it == "Sample_ID"}
assert sample_id_column_index != -1:
"Could not find column 'Sample_ID' in sample sheet!"
if ( sample_id_column_index == null) {
println "Looking for sample name column."
sample_id_column_index = csv_items.findIndexValues{it == "Sample_ID" || it == "SampleName"}
assert (!sample_id_column_index.isEmpty()):
"Could not find column 'Sample_ID' (Illumina) or 'SampleName' " +
"(Element Biosciences) in run information! Found: ${sample_id_column_index}"
assert sample_id_column_index.size() == 1, "Expected run information file to contain " +
"a column 'Sample_ID' or 'SampleName', not both. Found: ${sample_id_column_index}"
sample_id_column_index = sample_id_column_index[0]
println "Found sample names column '${csv_items[sample_id_column_index]}'."
return
}
samples += csv_items[sample_id_column_index]
}
// This return is important! (If 'true' is returned, the parsing stops.)
return
}
assert start_parsing:
"Sample information file does not contain [Data] or [Samples] header!"
assert samples.size() > 1:
"Sample information file does not seem to contain any information about the samples!"
println "Finished processing run information file, found samples: ${samples}."
println "Looking for fastq files in ${state.input}."
def allfastqs = state.input.listFiles().findAll{it.isFile() && it.name ==~ /^.+\.fastq.gz$/}
println "Found ${allfastqs.size()} fastq files, matching them to the following samples: ${samples}."
Expand All @@ -48,17 +68,15 @@ workflow run_wf {
def reverse_regex = ~/^${sample_id}_S(\d+)_(L(\d+)_)?R2_(\d+)\.fastq\.gz$/
def forward_fastq = state.input.listFiles().findAll{it.isFile() && it.name ==~ forward_regex}
def reverse_fastq = state.input.listFiles().findAll{it.isFile() && it.name ==~ reverse_regex}
assert forward_fastq : "No forward fastq files were found for sample ${sample_id}"
assert forward_fastq.size() < 2:
"Found multiple forward fastq files corresponding to sample ${sample_id}: ${forward_fastq}"
assert reverse_fastq.size() < 2:
"Found multiple reverse fastq files corresponding to sample ${sample_id}: ${reverse_fastq}."
assert !forward_fastq.isEmpty():
"Expected a forward fastq file to have been created correspondig to sample ${sample_id}."
// TODO: if one sample had reverse reads, the others must as well.
reverse_fastq = !reverse_fastq.isEmpty() ? reverse_fastq[0] : null
assert forward_fastq && !forward_fastq.isEmpty(): "No forward fastq files were found for sample ${sample_id}. " +
"All fastq files in directory: ${allfastqs.collect{it.name}}"
assert (reverse_fastq.isEmpty() || (forward_fastq.size() == reverse_fastq.size())):
"Expected equal number of forward and reverse fastq files for sample ${sample_id}. " +
"Found forward: ${forward_fastq} and reverse: ${reverse_fastq}."
println "Found ${forward_fastq.size()} forward and ${reverse_fastq.size()} reverse " +
"fastq files for sample ${sample_id}"
def fastqs_state = [
"fastq_forward": forward_fastq[0],
"fastq_forward": forward_fastq,
"fastq_reverse": reverse_fastq,
"_meta": [ "join_id": original_id ],
]
Expand Down
27 changes: 22 additions & 5 deletions src/demultiplex/config.vsh.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,24 @@ argument_groups:
description: Directory containing raw sequencing data
type: file
required: true
- name: --sample_sheet
- name: --run_information
description: |
Sample sheet as input for BCL Convert. If not specified,
will try to autodetect the sample sheet in the input directory
CSV file containing sample information, which will be used as
input for the demultiplexer. Canonically called 'SampleSheet.csv' (Illumina)
or 'RunManifest.csv' (Element Biosciences). If not specified,
will try to autodetect the sample sheet in the input directory.
Requires --demultiplexer to be set.
type: file
required: false
- name: "--demultiplexer"
type: string
required: false
choices: ["bases2fastq", "bclconvert"]
description: |
  Demultiplexer to use, choice depends on the provider
  of the instrument that was used to generate the data.
  When not using --run_information, specifying this argument is not
  required.
- name: Output arguments
arguments:
- name: --output
Expand Down Expand Up @@ -40,7 +52,10 @@ resources:
test_resources:
- type: nextflow_script
path: test.nf
entrypoint: test_wf
entrypoint: test_illumina
- type: nextflow_script
path: test.nf
entrypoint: test_bases2fastq

dependencies:
- name: io/untar
Expand All @@ -53,6 +68,8 @@ dependencies:
repository: local
- name: bcl_convert
repository: bb
- name: bases2fastq
repository: bb
- name: falco
repository: bb
- name: multiqc
Expand All @@ -61,7 +78,7 @@ repositories:
- name: bb
type: vsh
repo: biobox
tag: v0.2.0
tag: v0.3.0

runners:
- type: nextflow
Expand Down
9 changes: 8 additions & 1 deletion src/demultiplex/integration_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,14 @@ viash ns build --setup cb
nextflow run . \
-main-script src/demultiplex/test.nf \
-profile docker,no_publish,local \
-entry test_wf \
-entry test_illumina \
-c src/config/labels.config \
--resources_test https://raw.githubusercontent.com/nf-core/test-datasets/demultiplex/testdata/NovaSeq6000/ \
-resume

nextflow run . \
-main-script src/demultiplex/test.nf \
-profile docker,no_publish,local \
-entry test_bases2fastq \
-c src/config/labels.config \
-resume
126 changes: 105 additions & 21 deletions src/demultiplex/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,79 @@ workflow run_wf {
// Gather input files from folder
| map {id, state ->
def newState = [:]
if (!state.sample_sheet) {
def sample_sheet = state.input.resolve("SampleSheet.csv")
assert (sample_sheet && sample_sheet.isFile()): "Could not find 'SampleSheet.csv' file in input directory."
newState["sample_sheet"] = sample_sheet
println("Provided run information: ${state.run_information} and demultiplexer: ${state.demultiplexer}")
if (!state.run_information) {
println("Run information was not specified, auto-detecting...")
// The supported_platforms hashmap must be a 1-on-1 mapping
// Also, its keys must be present in the 'choices' field
// for the 'demultiplexer' argument in the viash config.
def supported_platforms = [
"bclconvert": "SampleSheet.csv", // Illumina
"bases2fastq": "RunManifest.csv" // Element Biosciences
]
def found_sample_information = supported_platforms.collectEntries{demultiplexer, filename ->
println("Checking if ${filename} can be found in input folder ${state.input}.")
def resolved_filename = state.input.resolve(filename)
if (!resolved_filename.isFile()) {
resolved_filename = null
}
println("Result after looking for run information for ${demultiplexer}: ${resolved_filename}.")
[demultiplexer, resolved_filename]
}
def demultiplexer = null
def run_information = null
found_sample_information.each{demultiplexer_candidate, file_path ->
if (file_path) {
// At this point, a candidate run information file was found.
assert !run_information: "Autodetection of run information " +
"(SampleSheet, RunManifest) failed: " +
"multiple candidate files found in input folder. " +
"Please specify one using --run_information."
run_information = file_path
demultiplexer = demultiplexer_candidate
}
}

// When autodetecting, the run information should have been found
assert run_information: "No run information file (SampleSheet, RunManifest) " +
"found in input directory."

// When autodetecting, the demultiplexer must be set if the run information was found
assert demultiplexer, "State error: the demultiplexer should have been autodetected. " +
"Please report this as a bug."

// When autodetecting, the found demultiplexer must match
// with the demultiplexer that the user has provided (in case it was provided).
if (state.demultiplexer) {
assert state.demultiplexer == demultiplexer,
"Requested to use demultiplexer ${state.demultiplexer} " +
"but demultiplexer based on the autodetected run information "
"file ${run_information} seems to indicate that the demultiplexer "
"should be ${demultiplexer}. Either avoid specifying the demultiplexer "
"or override the autodetection of the run information by providing "
"the file."
}
println("Using run information ${run_information} and demultiplexer ${demultiplexer}")
// At this point, the autodetected state can override the user provided state.
newState = newState + [
"run_information": run_information,
"demultiplexer": demultiplexer,
]
}

// Do not add InterOp to state because we generate the summary csv's in the next
// step based on the run dir, not the InterOp dir.
def interop_dir = state.input.resolve("InterOp")
assert interop_dir.isDirectory(): "Expected InterOp directory to be present."
if (newState.demultiplexer in ["bclconvert"]) {
// Do not add InterOp to state because we generate the summary csv's in the next
// step based on the run dir, not the InterOp dir.
def interop_dir = state.input.resolve("InterOp")
assert interop_dir.isDirectory(): "Expected InterOp directory to be present."
}

def resultState = state + newState
[id, resultState]
}

| interop_summary_to_csv.run(
runIf: {id, state -> state.demultiplexer in ["bclconvert"]},
directives: [label: ["lowmem", "verylowcpu"]],
fromState: [
"input": "input",
Expand All @@ -50,16 +107,40 @@ workflow run_wf {
)
// run bcl_convert
| bcl_convert.run(
runIf: {id, state -> state.demultiplexer in ["bclconvert"]},
directives: [label: ["highmem", "midcpu"]],
fromState: [
"bcl_input_directory": "input",
"sample_sheet": "sample_sheet",
"sample_sheet": "run_information",
"output_directory": "output",
],
toState: {id, result, state ->
def toAdd = [
"output_bclconvert" : result.output_directory,
"bclconvert_reports": result.reports,
"output_demultiplexer" : result.output_directory,
"run_id": id,
]
def newState = state + toAdd
return newState
}
)
// run bases2fastq
| bases2fastq.run(
runIf: {id, state -> state.demultiplexer in ["bases2fastq"]},
directives: [label: ["highmem", "midcpu"]],
fromState: [
"analysis_directory": "input",
"run_manifest": "run_information",
"output_directory": "output",
],
args: [
"no_projects": true, // Do not put output files in a subfolder for project
//"split_lanes": true,
"legacy_fastq": true, // Illumina style output names
"group_fastq": true, // No subdir per sample
],
toState: {id, result, state ->
def toAdd = [
"output_demultiplexer" : result.output_directory,
"run_id": id,
]
def newState = state + toAdd
Expand All @@ -68,8 +149,8 @@ workflow run_wf {
)
| gather_fastqs_and_validate.run(
fromState: [
"input": "output_bclconvert",
"sample_sheet": "sample_sheet",
"input": "output_demultiplexer",
"sample_sheet": "run_information",
],
toState: [
"fastq_forward": "fastq_forward",
Expand Down Expand Up @@ -110,23 +191,26 @@ workflow run_wf {
| multiqc.run(
directives: [label: ["lowcpu", "lowmem"]],
fromState: {id, state ->
[
"input": [
state.output_falco,
state.interop_run_summary.getParent(),
state.interop_index_summary.getParent()
],
def new_state = [
"input": [state.output_falco],
"output_report": state.output_multiqc,
"cl_config": 'sp: {fastqc/data: {fn: "*_fastqc_data.txt"}}',
"cl_config": 'sp: {fastqc/data: {fn: "*_fastqc_data.txt"}}'
]
if (state.demultiplexer == "bclconvert") {
new_state["input"] += [
state.interop_run_summary.getParent(),
state.interop_index_summary.getParent()
]
}
return new_state
},
toState: { id, result, state ->
state + [ "output_multiqc" : result.output_report ]
},
)
| setState(
[
"output": "output_bclconvert",
"output": "output_demultiplexer",
"output_falco": "output_falco",
"output_multiqc": "output_multiqc"
]
Expand Down
Loading

0 comments on commit 4376973

Please sign in to comment.