Skip to content

Commit

Permalink
FEAT: add csv2fasta (#1)
Browse files Browse the repository at this point in the history
* FEAT: add csv2fasta

* Update PR number
  • Loading branch information
DriesSchaumont authored Jun 24, 2024
1 parent 9c426cc commit 85a999c
Show file tree
Hide file tree
Showing 4 changed files with 574 additions and 0 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
# craftbox x.x.x

## NEW FEATURES

* `csv2fasta`: Convert two columns from a CSV file to FASTA entries (PR #1).
102 changes: 102 additions & 0 deletions src/csv2fasta/config.vsh.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
name: csv2fasta
description: |
Convert two columns from a CSV file to FASTA entries. The CSV file can
contain an optional header and each row (other than the header) becomes
a single FASTA record. One of the two columns will be used as the names
for the FASTA entries, while the other become the sequences. The sequences
column must only contain characters that are valid IUPAC notation for
nucleotides or a group thereof (wildcard characters).
argument_groups:
- name: Inputs
arguments:
- name: --input
type: file
direction: input
example: barcodes.csv
description: CSV file to be processed.
required: true
- name: --header
type: boolean_true
description: |
Parse the first line of the CSV file as a header.
- name: "CSV dialect options"
description: |
Options that can be used to override the automatically detected
dialect of the CSV file.
arguments:
- name: --delimiter
type: string
description: |
Overwrite the column delimiter character.
- name: --quote_character
type: string
description: |
Overwrite the character used to denote the start and end of a quoted item.
- name: "CSV column arguments"
description: |
Parameters for the selection of columns from the CSV file.
Only required when your CSV file contains more than 2 columns,
otherwise the first column will be used for the FASTA header
and the second for the FASTA nucleotide sequences. This default
can still be overwritten by using the options below.
arguments:
- name: --sequence_column
type: string
description: |
Name of the column containing the sequences. Implies 'header'.
Cannot be used together with 'sequence_column_index'.
required: false
- name: "--name_column"
type: string
description: |
Name of the column describing the FASTA headers. Implies 'header'.
Cannot be used together with 'name_column_index'.
required: false
- name: "--sequence_column_index"
type: integer
min: 0
description: |
Index of the column to use as the FASTA sequences, counter from the left and
starting from 0. Cannot be used in combination with the 'sequence_column' argument.
required: false
- name: "--name_column_index"
type: integer
min: 0
description: |
Index of the column to use as the FASTA headers, counter from the left and
starting from 0. Cannot be used in combination with 'name_column'.
required: false
- name: Outputs
arguments:
- name: "--output"
type: file
example: barcodes.fasta
direction: output
description: Output fasta file.

resources:
- type: python_script
path: script.py
test_resources:
- type: python_script
path: test_csv2fasta.py

engines:
- type: docker
image: python:slim
setup:
- type: apt
packages:
- procps
- type: python
packages:
- dnaio
test_setup:
- type: python
packages:
- pytest
- viashpy

runners:
- type: executable
- type: nextflow
102 changes: 102 additions & 0 deletions src/csv2fasta/script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from pathlib import Path
import dnaio
import csv

## VIASH START
par = {

}
## VIASH END

iupac = frozenset("ABCDGHKMNRSTUVWXY")

def resolve_header_name_to_index(header_entries, column_name):
try:
return header_entries.index(column_name)
except ValueError as e:
raise ValueError(f"Column name '{column_name}' could not "
"be found in the header of the CSV file.") from e


def csv_records(csv_file, delimiter, quote_character,
header, sequence_column, name_column,
sequence_column_index, name_column_index):
with open(csv_file, newline='') as csvfile:
# Deduce CSV dialect based on first 5 lines.
hint = "\n".join([csvfile.readline() for _ in range(5)])
csvfile.seek(0)
dialect = csv.Sniffer().sniff(hint)
reader_args = {"dialect": dialect}
delimiter_arg = {"delimiter": delimiter} if delimiter else {}
quotechar_arg = {"quotechar": quote_character} if delimiter else {}
all_args = reader_args | delimiter_arg | quotechar_arg
csv_reader = csv.reader(csvfile, **all_args)
for linenum, line in enumerate(csv_reader):
if not linenum: # First row
num_columns = len(line)
if header:
if sequence_column:
sequence_column_index = resolve_header_name_to_index(line, sequence_column)
if name_column:
name_column_index = resolve_header_name_to_index(line, name_column)
continue
if not (linenum - header): # First 'data' line
if (not sequence_column_index and not name_column_index and len(line) == 2):
name_column_index, sequence_column_index = 0, 1
if sequence_column_index == name_column_index:
raise ValueError("The same columns were selected for both the FASTQ sequences and "
"headers.")
if sequence_column_index is None:
raise ValueError("Either 'sequence_column_index' or 'sequence_column' needs "
"to be specified.")
if name_column_index is None:
raise ValueError("Either 'name_column' or 'name_column_index' needs to "
"be specified.")
if name_column_index >= num_columns:
raise ValueError(f"Requested to use column number {name_column_index} "
f"(0 based) for the FASTA headers, but only {num_columns} "
"were found on the first line.")
if sequence_column_index >= num_columns:
raise ValueError(f"Requested to use column number {sequence_column_index} "
f"(0 based) for the FASTA sequences, but only {num_columns} "
"were found on the first line.")
if len(line) != num_columns:
raise ValueError(f"Number of columns ({len(line)}) found on line {linenum+1} "
"is different compared to number of columns found "
f"previously ({num_columns}).")
sequence_name, sequence = line[name_column_index], line[sequence_column_index]
invalid_characters = set(sequence.upper()) - iupac
if set(sequence.upper()) - iupac:
raise ValueError(f"The sequence ('{sequence}') found on line {linenum+1} "
f"contains characters ({','.join(invalid_characters)}) "
"which are not valid IUPAC identifiers for nucleotides.")
yield sequence_name, sequence


def main(par):
par['input'], par['output'] = Path(par['input']), Path(par['output'])
sequence_column, name_column = par['sequence_column'], par['name_column']
sequence_column_index, name_column_index = par['sequence_column_index'], par['name_column_index']
if (sequence_column or name_column) and not par['header']:
par["header"] = True
if sequence_column_index and sequence_column:
raise ValueError("Cannot specify both 'sequence_column_index' and 'sequence_column'")
if name_column and name_column_index:
raise ValueError("Cannot specify both 'name_column_index' and 'name_column'")
if (sequence_column_index or name_column_index) and \
(sequence_column_index == name_column_index):
raise ValueError("The value specified for 'sequence_column_index' cannot be the same as "
"the value for 'name_column_index'.")
with dnaio.open(par['output'], mode='w', fileformat="fasta") as writer:
for header, sequence in csv_records(par['input'],
par['delimiter'],
par['quote_character'],
par['header'],
sequence_column,
name_column,
sequence_column_index,
name_column_index):
writer.write(dnaio.SequenceRecord(header, sequence))

if __name__ == "__main__":
main(par)
Loading

0 comments on commit 85a999c

Please sign in to comment.