Skip to content

Commit

Permalink
Add fasta import, primitive peptide - protein sequence alignment, pre…
Browse files Browse the repository at this point in the history
…liminary coverage bar plot
  • Loading branch information
henninggaertner committed Nov 14, 2024
1 parent a2d0bf6 commit 51dff94
Show file tree
Hide file tree
Showing 8 changed files with 407 additions and 83 deletions.
70 changes: 70 additions & 0 deletions protzilla/data_analysis/protein_coverage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from typing import List

import pandas as pd
import plotly.graph_objects as go


def align_peptide_to_protein_sequence(
peptide_sequence: str, protein_sequence: str
) -> List[int]:
"""
Aligns a peptide to a protein sequence and returns the indices of the protein sequence where the peptide.
NAIVE APPROACH
"""
if (
len(peptide_sequence) == 0
or len(protein_sequence) == 0
or len(peptide_sequence) > len(protein_sequence)
):
return []
indices = []
for i in range(len(protein_sequence) - len(peptide_sequence) + 1):
if protein_sequence[i : i + len(peptide_sequence)] == peptide_sequence:
indices.append(i)
return indices


def plot_protein_coverage(
fasta_df: pd.DataFrame, peptide_df: pd.DataFrame, protein_id: str
) -> None:
"""
Plots the coverage of a protein sequence by peptides.
"""
# Retrieve the relevant peptides from the peptide df
relevant_peptides = peptide_df[peptide_df["Protein ID"] == protein_id]
protein_sequence = fasta_df[fasta_df["Protein ID"] == protein_id][
"Protein Sequence"
].values[0]
protein_sequence_length = len(protein_sequence)
coverage = [0] * protein_sequence_length
for peptide_sequence in relevant_peptides["Sequence"].unique():
indices = align_peptide_to_protein_sequence(peptide_sequence, protein_sequence)
for i in indices:
for j in range(len(peptide_sequence)):
coverage[i + j] += 1

# Make sure coverage length matches the sequence length
if len(coverage) != protein_sequence_length:
raise ValueError("Length of coverage must match protein_sequence_length.")

# Generate labels for amino acid positions
amino_acid_positions = list(range(1, protein_sequence_length + 1))
x_labels = [f"{i} ({protein_sequence[i - 1]})" for i in amino_acid_positions]

# Create the bar chart
fig = go.Figure()

# Add bars for coverage values
fig.add_trace(
go.Bar(x=x_labels, y=coverage, name="Coverage", marker=dict(color="skyblue"))
)

# Customize layout
fig.update_layout(
title="Protein Coverage",
xaxis_title="Amino Acid",
yaxis_title="Coverage Value",
xaxis=dict(tickmode="linear"),
bargap=0.1, # Adjust space between bars if needed
)
return dict(plots=[fig])
36 changes: 36 additions & 0 deletions protzilla/importing/fasta_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""
This module contains the code to parse a fasta file containing protein sequences and their ids.
"""
import logging

import pandas as pd
from Bio import SeqIO


def parse_fasta_id(fasta_id: str) -> str:
"""
Parse the fasta id to get the protein name from the fasta id string
"""
metadata = fasta_id.split("|")[1]
if len(metadata) < 2:
logging.warning(f"Metadata too short: {metadata}")
return ""
return metadata


def fasta_import(file_path: str) -> pd.DataFrame:
"""
Import a fasta file and return a DataFrame with the protein sequences and their protein ids
"""
fasta_iterator = SeqIO.parse(open(file_path), "fasta")
protein_ids = []
protein_sequences = []
for fasta_sequence in fasta_iterator:
id, sequence = parse_fasta_id(fasta_sequence.id), str(fasta_sequence.seq)
protein_ids.append(id)
protein_sequences.append(sequence)

fasta_sequences = pd.DataFrame(
{"Protein ID": protein_ids, "Protein Sequence": protein_sequences}
)
return {"fasta_df": fasta_sequences}
128 changes: 92 additions & 36 deletions protzilla/methods/data_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,30 @@
k_means,
)
from protzilla.data_analysis.differential_expression_anova import anova
from protzilla.data_analysis.differential_expression_kruskal_wallis import kruskal_wallis_test_on_ptm_data, \
kruskal_wallis_test_on_intensity_data
from protzilla.data_analysis.differential_expression_kruskal_wallis import (
kruskal_wallis_test_on_intensity_data,
kruskal_wallis_test_on_ptm_data,
)
from protzilla.data_analysis.differential_expression_linear_model import linear_model
from protzilla.data_analysis.differential_expression_mann_whitney import (
mann_whitney_test_on_intensity_data, mann_whitney_test_on_ptm_data)
mann_whitney_test_on_intensity_data,
mann_whitney_test_on_ptm_data,
)
from protzilla.data_analysis.differential_expression_t_test import t_test
from protzilla.data_analysis.dimension_reduction import t_sne, umap
from protzilla.data_analysis.ptm_analysis import ptms_per_sample, \
ptms_per_protein_and_sample, select_peptides_of_protein
from protzilla.data_analysis.model_evaluation import evaluate_classification_model
from protzilla.data_analysis.plots import (
clustergram_plot,
create_volcano_plot,
prot_quant_plot,
scatter_plot,
)
from protzilla.data_analysis.protein_coverage import plot_protein_coverage
from protzilla.data_analysis.protein_graphs import peptides_to_isoform, variation_graph
from protzilla.data_analysis.ptm_analysis import (
select_peptides_of_protein,
ptms_per_protein_and_sample,
ptms_per_sample,
select_peptides_of_protein,
)
from protzilla.data_analysis.ptm_quantification import flexiquant_lf
from protzilla.methods.data_preprocessing import TransformationLog
Expand Down Expand Up @@ -164,8 +167,10 @@ def plot(self, inputs):
class DifferentialExpressionMannWhitneyOnIntensity(DataAnalysisStep):
display_name = "Mann-Whitney Test"
operation = "differential_expression"
method_description = ("A function to conduct a Mann-Whitney U test between groups defined in the clinical data."
"The p-values are corrected for multiple testing.")
method_description = (
"A function to conduct a Mann-Whitney U test between groups defined in the clinical data."
"The p-values are corrected for multiple testing."
)

input_keys = [
"protein_df",
Expand All @@ -191,7 +196,9 @@ def method(self, inputs: dict) -> dict:

def insert_dataframes(self, steps: StepManager, inputs) -> dict:
if steps.get_step_output(Step, "protein_df", inputs["protein_df"]) is not None:
inputs["protein_df"] = steps.get_step_output(Step, "protein_df", inputs["protein_df"])
inputs["protein_df"] = steps.get_step_output(
Step, "protein_df", inputs["protein_df"]
)
inputs["metadata_df"] = steps.metadata_df
inputs["log_base"] = steps.get_step_input(TransformationLog, "log_base")
return inputs
Expand All @@ -200,8 +207,10 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
class DifferentialExpressionMannWhitneyOnPTM(DataAnalysisStep):
display_name = "Mann-Whitney Test"
operation = "Peptide analysis"
method_description = ("A function to conduct a Mann-Whitney U test between groups defined in the clinical data."
"The p-values are corrected for multiple testing.")
method_description = (
"A function to conduct a Mann-Whitney U test between groups defined in the clinical data."
"The p-values are corrected for multiple testing."
)

input_keys = [
"ptm_df",
Expand Down Expand Up @@ -234,8 +243,10 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
class DifferentialExpressionKruskalWallisOnIntensity(DataAnalysisStep):
display_name = "Kruskal-Wallis Test"
operation = "differential_expression"
method_description = ("A function to conduct a Kruskal-Wallis test between groups defined in the clinical data."
"The p-values are corrected for multiple testing.")
method_description = (
"A function to conduct a Kruskal-Wallis test between groups defined in the clinical data."
"The p-values are corrected for multiple testing."
)

input_keys = [
"protein_df",
Expand Down Expand Up @@ -265,8 +276,10 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
class DifferentialExpressionKruskalWallisOnIntensity(DataAnalysisStep):
display_name = "Kruskal-Wallis Test"
operation = "differential_expression"
method_description = ("A function to conduct a Kruskal-Wallis test between groups defined in the clinical data."
"The p-values are corrected for multiple testing.")
method_description = (
"A function to conduct a Kruskal-Wallis test between groups defined in the clinical data."
"The p-values are corrected for multiple testing."
)

input_keys = [
"protein_df",
Expand All @@ -288,7 +301,9 @@ def method(self, inputs: dict) -> dict:
return kruskal_wallis_test_on_intensity_data(**inputs)

def insert_dataframes(self, steps: StepManager, inputs) -> dict:
inputs["protein_df"] = steps.get_step_output(Step, "protein_df", inputs["protein_df"])
inputs["protein_df"] = steps.get_step_output(
Step, "protein_df", inputs["protein_df"]
)
inputs["metadata_df"] = steps.metadata_df
inputs["log_base"] = steps.get_step_input(TransformationLog, "log_base")
return inputs
Expand All @@ -297,8 +312,10 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
class DifferentialExpressionKruskalWallisOnPTM(DataAnalysisStep):
display_name = "Kruskal-Wallis Test"
operation = "Peptide analysis"
method_description = ("A function to conduct a Kruskal-Wallis test between groups defined in the clinical data."
"The p-values are corrected for multiple testing.")
method_description = (
"A function to conduct a Kruskal-Wallis test between groups defined in the clinical data."
"The p-values are corrected for multiple testing."
)

input_keys = [
"ptm_df",
Expand Down Expand Up @@ -327,9 +344,11 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
class PlotVolcano(PlotStep):
display_name = "Volcano Plot"
operation = "plot"
method_description = ("Plots the results of a differential expression analysis in a volcano plot. The x-axis shows "
"the log2 fold change and the y-axis shows the -log10 of the corrected p-values. The user "
"can define a fold change threshold and an alpha level to highlight significant items.")
method_description = (
"Plots the results of a differential expression analysis in a volcano plot. The x-axis shows "
"the log2 fold change and the y-axis shows the -log10 of the corrected p-values. The user "
"can define a fold change threshold and an alpha level to highlight significant items."
)
input_keys = [
"p_values",
"fc_threshold",
Expand Down Expand Up @@ -762,6 +781,33 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
return inputs


class PlotProteinCoverage(PlotStep):
display_name = "Protein Coverage Plot"
operation = "plot"
method_description = (
"Create a protein coverage plot from a protein graph and peptide data"
)

input_keys = [
"protein_id",
"fasta_df",
"peptide_df",
]
output_keys = []

def method(self, inputs: dict) -> dict:
return plot_protein_coverage(**inputs)

def insert_dataframes(self, steps: StepManager, inputs) -> dict:
inputs["fasta_df"] = steps.get_step_output(
Step, "fasta_df", inputs["fasta_df_instance"]
)
inputs["peptide_df"] = steps.get_step_output(
Step, "peptide_df", inputs["peptide_df_instance"]
)
return


class ProteinGraphVariationGraph(DataAnalysisStep):
display_name = "Protein Variation Graph"
operation = "protein_graph"
Expand Down Expand Up @@ -841,26 +887,34 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
inputs["metadata_df"] = steps.metadata_df

if inputs["auto_select"]:
significant_proteins = (
steps.get_step_output(DataAnalysisStep, "significant_proteins_df", inputs["protein_list"]))
index_of_most_significant_protein = significant_proteins['corrected_p_value'].idxmin()
most_significant_protein = significant_proteins.loc[index_of_most_significant_protein]
significant_proteins = steps.get_step_output(
DataAnalysisStep, "significant_proteins_df", inputs["protein_list"]
)
index_of_most_significant_protein = significant_proteins[
"corrected_p_value"
].idxmin()
most_significant_protein = significant_proteins.loc[
index_of_most_significant_protein
]
inputs["protein_id"] = [most_significant_protein["Protein ID"]]
self.messages.append({
"level": logging.INFO,
"msg":
f"Selected the most significant Protein: {most_significant_protein['Protein ID']}, "
f"from {inputs['protein_list']}"
})
self.messages.append(
{
"level": logging.INFO,
"msg": f"Selected the most significant Protein: {most_significant_protein['Protein ID']}, "
f"from {inputs['protein_list']}",
}
)

return inputs


class PTMsPerSample(DataAnalysisStep):
display_name = "PTMs per Sample"
operation = "Peptide analysis"
method_description = ("Analyze the post-translational modifications (PTMs) of a single protein of interest. "
"This function requires a peptide dataframe with PTM information.")
method_description = (
"Analyze the post-translational modifications (PTMs) of a single protein of interest. "
"This function requires a peptide dataframe with PTM information."
)

input_keys = [
"peptide_df",
Expand All @@ -882,8 +936,10 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
class PTMsProteinAndPerSample(DataAnalysisStep):
display_name = "PTMs per Sample and Protein"
operation = "Peptide analysis"
method_description = ("Analyze the post-translational modifications (PTMs) of all Proteins. "
"This function requires a peptide dataframe with PTM information.")
method_description = (
"Analyze the post-translational modifications (PTMs) of all Proteins. "
"This function requires a peptide dataframe with PTM information."
)

input_keys = [
"peptide_df",
Expand All @@ -899,4 +955,4 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
inputs["peptide_df"] = steps.get_step_output(
Step, "peptide_df", inputs["peptide_df"]
)
return inputs
return inputs
21 changes: 18 additions & 3 deletions protzilla/methods/importing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from protzilla.importing.fasta_import import fasta_import
from protzilla.importing.metadata_import import (
metadata_column_assignment,
metadata_import_method,
Expand All @@ -10,7 +11,7 @@
max_quant_import,
ms_fragger_import,
)
from protzilla.importing.peptide_import import peptide_import, evidence_import
from protzilla.importing.peptide_import import evidence_import, peptide_import
from protzilla.steps import Step, StepManager


Expand Down Expand Up @@ -51,7 +52,9 @@ def method(self, inputs):
class MsFraggerImport(ImportingStep):
display_name = "MS Fragger Combined Protein Import"
operation = "Protein Data Import"
method_description = "Import the combined_protein.tsv file form output of MS Fragger"
method_description = (
"Import the combined_protein.tsv file form output of MS Fragger"
)

input_keys = ["file_path", "intensity_name", "map_to_uniprot", "aggregation_method"]
output_keys = ["protein_df"]
Expand Down Expand Up @@ -139,4 +142,16 @@ class EvidenceImport(ImportingStep):
output_keys = ["peptide_df"]

def method(self, inputs):
return evidence_import(**inputs)
return evidence_import(**inputs)


class FastaImport(ImportingStep):
display_name = "Fasta Protein Sequence Import"
operation = "fasta_import"
method_description = "Import a fasta file containing protein sequences."

input_keys = ["file_path"]
output_keys = ["fasta_df"]

def method(self, inputs):
return fasta_import(**inputs)
Loading

0 comments on commit 51dff94

Please sign in to comment.