Merge pull request #81 from compomics/add-parquet
Add Parquet reading and writing for efficient storage of PSM lists
RalfG authored May 1, 2024
2 parents 2973b67 + 6a8b51f commit dfada94
Showing 16 changed files with 426 additions and 132 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,19 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.9.0] - 2024-05-01

### Added

- `io`: Read and write support for PSMs in Apache Parquet format, enabling efficient storage of PSM lists.
- `io.sage`: Support for Sage results in Parquet format (new `SageParquetReader`, renamed `SageReader` to `SageTSVReader`).

### Changed

- Upgrade Pydantic dependency to v2. The PSM `spectrum_id` field is now always coerced to a string (see the sketch below).
- `io.proteoscape`: Use pyarrow to iteratively read from Parquet instead of first reading an entire dataframe with Pandas.
- `io.sage`: Update compatibility to Sage v0.14.

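A minimal sketch of the new coercion behavior, assuming only the documented `PSM` model:

    from psm_utils import PSM

    # Pydantic v2 now coerces integer spectrum identifiers to strings.
    psm = PSM(peptidoform="ACDE", spectrum_id=0)
    assert psm.spectrum_id == "0"
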
## [0.8.3] - 2024-04-16

### Added
4 changes: 3 additions & 1 deletion README.rst
@@ -94,11 +94,13 @@ Supported file formats
`MaxQuant msms.txt <https://www.maxquant.org/>`_ ``msms`` ✅ ❌
`MS Amanda CSV <https://ms.imp.ac.at/?goto=msamanda>`_ ``msamanda`` ✅ ❌
`mzIdentML <https://psidev.info/mzidentml>`_ ``mzid`` ✅ ✅
`Parquet <https://psm-utils.readthedocs.io/en/stable/api/psm_utils.io/#module-psm_utils.io.parquet>`_ ``parquet`` ✅ ✅
`Peptide Record <https://psm-utils.readthedocs.io/en/stable/api/psm_utils.io/#module-psm_utils.io.peptide_record>`_ ``peprec`` ✅ ✅
`pepXML <http://tools.proteomecenter.org/wiki/index.php?title=Formats:pepXML>`_ ``pepxml`` ✅ ❌
`Percolator tab <https://github.com/percolator/percolator/wiki/Interface>`_ ``percolator`` ✅ ✅
Proteome Discoverer MSF ``proteome_discoverer`` ✅ ❌
`Sage <https://github.com/lazear/sage/blob/v0.12.0/DOCS.md#interpreting-sage-output>`_ ``sage`` ✅ ❌
`Sage Parquet <https://github.com/lazear/sage/blob/v0.14.7/DOCS.md#interpreting-sage-output>`_ ``sage_parquet`` ✅ ❌
`Sage TSV <https://github.com/lazear/sage/blob/v0.14.7/DOCS.md#interpreting-sage-output>`_ ``sage_tsv`` ✅ ❌
ProteoScape Parquet ``proteoscape`` ✅ ❌
`TSV <https://psm-utils.readthedocs.io/en/stable/api/psm_utils.io/#module-psm_utils.io.tsv>`_ ``tsv`` ✅ ✅
`X!Tandem XML <https://www.thegpm.org/tandem/>`_ ``xtandem`` ✅ ❌
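For illustration, the format tags above plug into the high-level helpers. A minimal sketch, assuming the documented `psm_utils.io.read_file` and `psm_utils.io.write_file` functions (file paths hypothetical):

    from psm_utils.io import read_file, write_file

    # Read Sage TSV results, then store them in the lossless Parquet format.
    psm_list = read_file("results.sage.tsv", filetype="sage_tsv")
    write_file(psm_list, "psms.parquet", filetype="parquet")
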
2 changes: 1 addition & 1 deletion psm_utils/__init__.py
@@ -1,6 +1,6 @@
"""Common utilities for parsing and handling PSMs, and search engine results."""

__version__ = "0.8.3"
__version__ = "0.9.0"
__all__ = ["Peptidoform", "PSM", "PSMList"]

from functools import lru_cache
36 changes: 26 additions & 10 deletions psm_utils/io/__init__.py
@@ -13,6 +13,7 @@
import psm_utils.io.maxquant as maxquant
import psm_utils.io.msamanda as msamanda
import psm_utils.io.mzid as mzid
import psm_utils.io.parquet as parquet
import psm_utils.io.peptide_record as peptide_record
import psm_utils.io.pepxml as pepxml
import psm_utils.io.percolator as percolator
@@ -75,12 +76,6 @@
"extension": ".parquet",
"filename_pattern": r"^.*\.candidates\.parquet$",
},
"tsv": {
"reader": tsv.TSVReader,
"writer": tsv.TSVWriter,
"extension": ".tsv",
"filename_pattern": r"^.*\.tsv$",
},
"xtandem": {
"reader": xtandem.XTandemReader,
"writer": None,
@@ -93,19 +88,40 @@
"extension": ".csv",
"filename_pattern": r"^.*(?:_|\.)msamanda.csv$",
},
"sage": {
"reader": sage.SageReader,
"sage_tsv": {
"reader": sage.SageTSVReader,
"writer": None,
"extension": ".tsv",
"filename_pattern": r"^.*(?:_|\.).sage.tsv$",
},
"sage_parquet": {
"reader": sage.SageParquetReader,
"writer": None,
"extension": ".parquet",
"filename_pattern": r"^.*(?:_|\.).sage.parquet$",
},
"ionbot": {
"reader": ionbot.IonbotReader,
"writer": None,
"extension": "ionbot.first.csv",
"filename_pattern": r"^ionbot.first.csv$",
},
"parquet": { # List after proteoscape and sage to avoid extension matching conflicts
"reader": parquet.ParquetReader,
"writer": parquet.ParquetWriter,
"extension": ".parquet",
"filename_pattern": r"^.*\.parquet$",
},
"tsv": { # List after sage to avoid extension matching conflicts
"reader": tsv.TSVReader,
"writer": tsv.TSVWriter,
"extension": ".tsv",
"filename_pattern": r"^.*\.tsv$",
},
}

FILETYPES["sage"] = FILETYPES["sage_tsv"] # Alias for backwards compatibility

READERS = {k: v["reader"] for k, v in FILETYPES.items() if v["reader"]}
WRITERS = {k: v["writer"] for k, v in FILETYPES.items() if v["writer"]}

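The ordering comments above matter because filetype inference takes the first pattern that matches. A sketch of that first-match logic (the `_infer_filetype` helper is hypothetical; the actual inference code in `psm_utils.io` may differ):

    import re

    def _infer_filetype(filename: str) -> str:
        # First match wins, so specific patterns such as ".sage.parquet"
        # must be listed before the generic ".parquet" and ".tsv" entries.
        for filetype, spec in FILETYPES.items():
            if re.search(spec["filename_pattern"], str(filename)):
                return filetype
        raise ValueError(f"Could not infer filetype from '{filename}'.")
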
@@ -124,10 +140,10 @@ def _supports_write_psm(writer: WriterBase):
with NamedTemporaryFile(delete=False) as temp_file:
temp_file.close()
Path(temp_file.name).unlink()
example_psm = PSM(peptidoform="ACDE", spectrum_id=0)
example_psm = PSM(peptidoform="ACDE", spectrum_id="0")
try:
with writer(temp_file.name, example_psm=example_psm) as writer_instance:
writer_instance.write_psm(None)
writer_instance.write_psm(example_psm)
except NotImplementedError:
supports_write_psm = False
except AttributeError: # `None` is not valid PSM
1 change: 0 additions & 1 deletion psm_utils/io/_utils.py
@@ -10,7 +10,6 @@ def set_csv_field_size_limit():
This function should be called before reading any CSV files to ensure that the field size
limit is properly set.
"""
max_int = sys.maxsize

130 changes: 130 additions & 0 deletions psm_utils/io/parquet.py
@@ -0,0 +1,130 @@
"""
Reader and writer for a simple, lossless psm_utils Parquet format.

Similar to the :py:mod:`psm_utils.io.tsv` module, this module provides a reader and writer
for :py:class:`~psm_utils.psm_list.PSMList` objects in a lossless manner. However, Parquet provides
better performance and storage efficiency compared to TSV, and is recommended for large datasets.
"""

from __future__ import annotations

from pathlib import Path
from typing import Union

import pyarrow as pa
import pyarrow.parquet as pq
from pydantic import ValidationError

from psm_utils.io._base_classes import ReaderBase, WriterBase
from psm_utils.io.exceptions import PSMUtilsIOException
from psm_utils.psm import PSM
from psm_utils.psm_list import PSMList


class ParquetReader(ReaderBase):
def __init__(self, path: Union[str, Path], *args, **kwargs):
"""
Reader for Parquet files.

Parameters
----------
path : Union[str, Path]
Path to the Parquet file.
"""
self.path = path

def __iter__(self):
with pq.ParquetFile(self.path) as reader:
for batch in reader.iter_batches():
for row in batch.to_pylist():
# Convert map columns (rendered as lists of tuples) to dictionaries
row["metadata"] = dict(row["metadata"] or {})
row["provenance_data"] = dict(row["provenance_data"] or {})
row["rescoring_features"] = dict(row["rescoring_features"] or {})

# Convert to PSM object and yield
try:
yield PSM(**row)
except ValidationError as e:
raise PSMUtilsIOException(f"Error while parsing row {row}:\n{e}")


class ParquetWriter(WriterBase):
def __init__(self, path: Union[str, Path], chunk_size: int = 1e6, *args, **kwargs):
"""
Writer for Parquet files.

Parameters
----------
path : Union[str, Path]
Path to the Parquet file.
chunk_size : int
Number of PSMs to write in a single batch. Default is 1e6.
"""
self.path = path
self.chunk_size = chunk_size

self._writer = None
self._psm_cache = []

def __enter__(self):
self._writer = pq.ParquetWriter(self.path, schema=SCHEMA)
return self

def __exit__(self, *args, **kwargs):
self._flush()
self._writer.close()

def write_psm(self, psm: PSM):
"""Write a single PSM to the Parquet file."""
self._psm_cache.append(self._psm_to_entry(psm))
if len(self._psm_cache) > self.chunk_size:
self._flush()

def write_file(self, psm_list: PSMList):
"""Write a list of PSMs to the Parquet file."""
with self:
for psm in psm_list:
self.write_psm(psm)

@staticmethod
def _psm_to_entry(psm: PSM) -> dict:
"""Convert a PSM object to a dictionary suitable for writing to Parquet."""
psm_dict = dict(psm)
psm_dict["peptidoform"] = str(psm.peptidoform)
return psm_dict

def _flush(self):
"""Write the cached PSMs to the Parquet file."""
if not self._psm_cache:
return
table = pa.Table.from_pylist(self._psm_cache, schema=SCHEMA)
self._writer.write_table(table)
self._psm_cache = []


SCHEMA = pa.schema(
[
("peptidoform", pa.string()),
("spectrum_id", pa.string()),
("run", pa.string()),
("collection", pa.string()),
("spectrum", pa.string()),
("is_decoy", pa.bool_()),
("score", pa.float32()),
("qvalue", pa.float32()),
("pep", pa.float32()),
("precursor_mz", pa.float32()),
("retention_time", pa.float32()),
("ion_mobility", pa.float32()),
("protein_list", pa.list_(pa.string())),
("rank", pa.int32()),
("source", pa.string()),
("provenance_data", pa.map_(pa.string(), pa.string())),
("metadata", pa.map_(pa.string(), pa.string())),
("rescoring_features", pa.map_(pa.string(), pa.float32())),
]
)
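
A usage sketch for the new module (file path hypothetical; only the classes added above are assumed):

    from psm_utils.io.parquet import ParquetReader, ParquetWriter
    from psm_utils.psm import PSM
    from psm_utils.psm_list import PSMList

    psm_list = PSMList(psm_list=[PSM(peptidoform="ACDE", spectrum_id="1", score=12.3)])

    # write_file opens the underlying pyarrow writer, batches PSMs,
    # and flushes any remaining cache on exit.
    ParquetWriter("psms.parquet").write_file(psm_list)

    # Reading streams record batches and yields PSM objects one by one.
    for psm in ParquetReader("psms.parquet"):
        print(psm.spectrum_id, psm.score)
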
93 changes: 49 additions & 44 deletions psm_utils/io/proteoscape.py
@@ -4,13 +4,15 @@
import re
from pathlib import Path
from typing import Union
from collections import namedtuple

import numpy as np
import pandas as pd
import pyarrow.parquet as pq

from psm_utils import PSM
from psm_utils.psm import PSM
from psm_utils.psm_list import PSMList
from psm_utils.io._base_classes import ReaderBase
from psm_utils.io.exceptions import PSMUtilsIOException
from psm_utils.peptidoform import format_number_as_string

logger = logging.getLogger(__name__)
@@ -36,31 +38,31 @@ def __init__(
Path to ProteoScape Parquet file.
"""
if isinstance(filename, pd.DataFrame):
self.data = filename
else:
super().__init__(filename, *args, **kwargs)
self.data = pd.read_parquet(self.filename)

self._Row = namedtuple("Row", self.data.columns)
self.filename = filename

def __len__(self):
"""Return number of PSMs in file."""
return len(self.data)
return pq.read_metadata(self.filename).num_rows

def __iter__(self):
"""Iterate over file and return PSMs one-by-one."""
for entry in self.data.itertuples():
yield _parse_entry(entry)

def __getitem__(self, index):
"""Return PSM at index."""
return _parse_entry(self._Row(*self.data.iloc[index]))
with pq.ParquetFile(self.filename) as reader:
for batch in reader.iter_batches():
for row in batch.to_pylist():
try:
yield _parse_entry(row)
except Exception as e:
raise PSMUtilsIOException(f"Error while parsing row {row}:\n{e}") from e

@classmethod
def from_dataframe(cls, dataframe: pd.DataFrame, *args, **kwargs):
"""Create a ProteoScapeReader from a DataFrame."""
return cls(dataframe, *args, **kwargs)
def from_dataframe(cls, dataframe: pd.DataFrame) -> PSMList:
"""Create a PSMList from a ProteoScape Pandas DataFrame."""
return PSMList(
psm_list=[
cls._get_peptide_spectrum_match(cls(""), entry)
for entry in dataframe.to_dict(orient="records")
]
)


def _parse_peptidoform(
@@ -81,40 +83,43 @@ def _parse_peptidoform(
return f"{n_term}{''.join(peptidoform)}{c_term}/{precursor_charge}"


def _parse_entry(entry) -> PSM:
def _parse_entry(entry: dict) -> PSM:
"""Parse a single entry from ProteoScape Parquet file to PSM object."""
return PSM(
peptidoform=_parse_peptidoform(
entry.stripped_peptide, entry.ptms, entry.ptm_locations, entry.precursor_charge
entry["stripped_peptide"],
entry["ptms"],
entry["ptm_locations"],
entry["precursor_charge"],
),
spectrum_id=entry.ms2_id,
run=getattr(entry, "run", None),
is_decoy=all(DECOY_PATTERN.match(p) for p in entry.locus_name),
score=entry.x_corr_score,
precursor_mz=entry.precursor_mz,
retention_time=entry.rt,
ion_mobility=entry.ook0,
protein_list=list(entry.locus_name),
rank=entry.rank,
spectrum_id=entry["ms2_id"],
run=entry.get("run", None),
is_decoy=all(DECOY_PATTERN.match(p) for p in entry["locus_name"]),
score=entry["x_corr_score"],
precursor_mz=entry["precursor_mz"],
retention_time=entry["rt"],
ion_mobility=entry["ook0"],
protein_list=list(entry["locus_name"]),
rank=entry["rank"],
source="ProteoScape",
provenance_data={
"candidate_id": str(entry.candidate_id),
"ms2_id": str(entry.ms2_id),
"parent_id": str(entry.parent_id),
"candidate_id": str(entry["candidate_id"]),
"ms2_id": str(entry["ms2_id"]),
"parent_id": str(entry["parent_id"]),
},
metadata={
"leading_aa": str(entry.leading_aa),
"trailing_aa": str(entry.trailing_aa),
"corrected_ook0": str(entry.corrected_ook0),
"leading_aa": str(entry["leading_aa"]),
"trailing_aa": str(entry["trailing_aa"]),
"corrected_ook0": str(entry["corrected_ook0"]),
},
rescoring_features={
"tims_score": float(entry.tims_score),
"x_corr_score": float(entry.x_corr_score),
"delta_cn_score": float(entry.delta_cn_score),
"ppm_error": float(entry.ppm_error),
"number_matched_ions": float(entry.number_matched_ions),
"number_expected_ions": float(entry.number_expected_ions),
"ion_proportion": float(entry.ion_proportion),
"spectrum_total_ion_intensity": float(entry.spectrum_total_ion_intensity),
"tims_score": float(entry["tims_score"]),
"x_corr_score": float(entry["x_corr_score"]),
"delta_cn_score": float(entry["delta_cn_score"]),
"ppm_error": float(entry["ppm_error"]),
"number_matched_ions": float(entry["number_matched_ions"]),
"number_expected_ions": float(entry["number_expected_ions"]),
"ion_proportion": float(entry["ion_proportion"]),
"spectrum_total_ion_intensity": float(entry["spectrum_total_ion_intensity"]),
},
)
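
A reading sketch for the updated reader (file name hypothetical):

    from psm_utils.io.proteoscape import ProteoScapeReader

    # PSMs are now streamed batch-by-batch with pyarrow instead of first
    # loading the entire Parquet file into a Pandas DataFrame.
    for psm in ProteoScapeReader("sample.candidates.parquet"):
        print(psm.spectrum_id, psm.score)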