Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add modification caching; Constant terminal mods for ProForma #148

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 175 additions & 27 deletions pyteomics/proforma.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,29 @@ def __init__(self, name, **kwargs):
self.name = name.lower()
self.symbol = self.name[0]
self._database = None
self._cache = {}

def clear_cache(self):
"""Clear the modification definition cache"""
self._cache.clear()

def enable_caching(self, flag=True):
"""
Enable or disable caching of modification definitions.
If `flag` is :const:`False`, this will also dispose of any
existing cached values.
Parameters
----------
flag : :class:`bool`
Whether or not to disable the cache
"""
if flag:
if not self._cache:
self._cache = {}
else:
self._cache = None

def load_database(self):
raise NotImplementedError()
Expand Down Expand Up @@ -316,9 +339,19 @@ def parse_identifier(self, identifier):
id = None
return name, id

def resolve(self, name=None, id=None, **kwargs):
def _resolve_impl(self, name=None, id=None, **kwargs):
raise NotImplementedError()

def resolve(self, name=None, id=None, **kwargs):
if self._cache is None:
return self._resolve_impl(name, id, **kwargs)
cache_key = (name, id, frozenset(kwargs.items()))
if cache_key in self._cache:
return self._cache[cache_key].copy()
value = self._resolve_impl(name, id, **kwargs)
self._cache[cache_key] = value
return value.copy()

def __call__(self, name=None, id=None, **kwargs):
return self.resolve(name, id, **kwargs)

Expand All @@ -343,7 +376,7 @@ def load_database(self):
return obo_cache.resolve("http://www.unimod.org/obo/unimod.obo")
return Unimod()

def resolve(self, name=None, id=None, **kwargs):
def _resolve_impl(self, name=None, id=None, **kwargs):
strict = kwargs.get("strict", self.strict)
exhaustive = kwargs.get("exhaustive", True)
if name is not None:
Expand Down Expand Up @@ -398,7 +431,7 @@ def __init__(self, **kwargs):
def load_database(self):
return load_psimod()

def resolve(self, name=None, id=None, **kwargs):
def _resolve_impl(self, name=None, id=None, **kwargs):
if name is not None:
defn = self.database[name]
elif id is not None:
Expand Down Expand Up @@ -443,7 +476,7 @@ def __init__(self, **kwargs):
def load_database(self):
return load_xlmod()

def resolve(self, name=None, id=None, **kwargs):
def _resolve_impl(self, name=None, id=None, **kwargs):
if name is not None:
defn = self.database[name]
elif id is not None:
Expand Down Expand Up @@ -562,7 +595,7 @@ def get_mass_from_term(self, term, raw_mass):
"Only a rough approximation is available.") % (term, ))
return rough_mass

def resolve(self, name=None, id=None, **kwargs):
def _resolve_impl(self, name=None, id=None, **kwargs):
if name is not None:
term = self.database[name]
elif id is not None:
Expand Down Expand Up @@ -613,7 +646,7 @@ def parse_identifier(self, identifier):
"""
return identifier, None

def resolve(self, name=None, id=None, **kwargs):
def _resolve_impl(self, name=None, id=None, **kwargs):
defn = None
for resolver in self.resolvers:
try:
Expand Down Expand Up @@ -1251,6 +1284,56 @@ def process_tag_tokens(tokens):
return main_tag


class ModificationTarget(object):
def __init__(self, aa, n_term=False, c_term=False):
self.aa = aa
self.n_term = n_term
self.c_term = c_term

def __eq__(self, other):
if isinstance(other, str):
return str(self) == other
else:
return (
self.aa == other.aa
and self.n_term == other.n_term
and self.c_term == other.c_term
)

def __ne__(self, other):
if isinstance(other, str):
return str(self) != other
else:
return (
self.aa != other.aa
or self.n_term != other.n_term
or self.c_term != other.c_term
)

def __hash__(self):
return hash(str(self))

def __str__(self):
buffer = []
if self.n_term:
buffer.append('N-term')
if self.c_term:
buffer.append('C-term')
if self.aa:
buffer.append(self.aa)
return ':'.join(buffer)

def __repr__(self):
return str(self)

def is_valid(self, aa, n_term, c_term):
if (n_term and self.n_term) or (c_term and self.c_term):
if (self.aa and aa == self.aa) or self.aa is None:
return True
return False
return self.aa == aa or self.aa is None


class ModificationRule(object):
'''Define a fixed modification rule which dictates a modification tag is
always applied at one or more amino acid residues.
Expand All @@ -1267,6 +1350,53 @@ class ModificationRule(object):
def __init__(self, modification_tag, targets=None):
self.modification_tag = modification_tag
self.targets = targets
self._validate_targets()

def is_valid(self, aa, n_term, c_term):
return any(target.is_valid(aa, n_term, c_term) for target in self.targets)

def _validate_targets(self):
validated_targets = []
if self.targets is None:
self.targets = []
elif not isinstance(self.targets, list):
self.targets = [self.targets]
for target in self.targets:
if target in VALID_AA:
validated_targets.append(ModificationTarget(target, False, False))
elif target in ("N-term", "C-term"):
n_term = target == "N-term"
c_term = target == "C-term"
validated_targets.append(ModificationTarget(None, n_term, c_term))
elif target.startswith(("N-term:", "C-term:")):
tokens = target.split(":")
if len(tokens) == 2:
if tokens[1] in VALID_AA:
n_term = tokens[0] == "N-term"
c_term = tokens[0] == "C-term"
validated_targets.append(ModificationTarget(tokens[1], n_term, c_term))
else:
raise PyteomicsError(
"Modification rule {0} has an invalid amino acid specific terminal target {2} in {1}".format(
self,
target,
tokens[1]
)
)
else:
raise PyteomicsError(
"Modification rule {0} has an empty amino acid specific terminal target {1}".format(
self, target
)
)
else:
raise PyteomicsError(
"Modification rule {0} has an invalid target {1}".format(
self, target
)
)

self.targets = validated_targets

def __eq__(self, other):
if other is None:
Expand All @@ -1277,7 +1407,7 @@ def __ne__(self, other):
return not self == other

def __str__(self):
targets = ','.join(self.targets)
targets = ','.join(map(str, self.targets))
return "<[{self.modification_tag}]@{targets}>".format(self=self, targets=targets)

def __repr__(self):
Expand Down Expand Up @@ -1609,6 +1739,7 @@ class ParserStateEnum(Enum):
inter_chain_cross_link_start = 20
chimeric_start = 21
interval_initial = 22
post_global_terminal = 23
done = 999


Expand All @@ -1628,6 +1759,7 @@ class ParserStateEnum(Enum):
UNLOCALIZED_COUNT = ParserStateEnum.unlocalized_count
POST_GLOBAL = ParserStateEnum.post_global
POST_GLOBAL_AA = ParserStateEnum.post_global_aa
POST_GLOBAL_TERM = ParserStateEnum.post_global_terminal
POST_INTERVAL_TAG = ParserStateEnum.post_interval_tag
CHARGE_START = ParserStateEnum.charge_state_start
CHARGE_NUMBER = ParserStateEnum.charge_state_number
Expand All @@ -1636,6 +1768,7 @@ class ParserStateEnum(Enum):
DONE = ParserStateEnum.done

VALID_AA = set("QWERTYIPASDFGHKLCVNMXUOJZB")
TERMINAL_SPEC_CHARS = set('N-term') | set('C-term') | set("ncT: ")

def parse(sequence):
'''Tokenize a ProForma sequence into a sequence of amino acid+tag positions, and a
Expand Down Expand Up @@ -1678,7 +1811,7 @@ def parse(sequence):
current_tag = TagParser()
current_interval = None
current_unlocalized_count = NumberParser()
current_aa_targets = TokenBuffer()
current_aa_targets = StringParser()

charge_buffer = None
adduct_buffer = None
Expand Down Expand Up @@ -1868,13 +2001,13 @@ def parse(sequence):
depth = 1
tag = current_tag()[0]
multiplicity = current_unlocalized_count()
for i in range(multiplicity):
for _ in range(multiplicity):
unlocalized_modifications.append(tag)
elif c == '?':
state = BEFORE
tag = current_tag()[0]
multiplicity = current_unlocalized_count()
for i in range(multiplicity):
for _ in range(multiplicity):
unlocalized_modifications.append(tag)
else:
raise ProFormaError(
Expand All @@ -1887,14 +2020,25 @@ def parse(sequence):
("Error In State {state}, fixed modification detected without "
"target amino acids found at index {i}").format(**locals()), i, state)
elif state == POST_GLOBAL_AA:
if c in VALID_AA:
if c in VALID_AA or c in TERMINAL_SPEC_CHARS:
current_aa_targets.append(c)
elif c == ',':
# the next character should be another amino acid
pass
current_aa_targets.bound()
elif c == '>':
fixed_modifications.append(
ModificationRule(current_tag()[0], current_aa_targets()))
try:
v = current_aa_targets()
fixed_modifications.append(
ModificationRule(current_tag()[0], v))
except PyteomicsError as err:
raise ProFormaError(
(
"Error In State {state}, fixed modification detected invalid "
"target found at index {i}: {err}"
).format(state=state, i=i, err=err),
i,
state,
)
state = BEFORE
else:
raise ProFormaError(
Expand Down Expand Up @@ -2164,19 +2308,20 @@ def mass(self):
mass = 0.0

fixed_modifications = self.properties['fixed_modifications']
fixed_rules = {}
for rule in fixed_modifications:
for aa in rule.targets:
fixed_rules[aa] = rule.modification_tag.mass

for position in self.sequence:
n_term_v = 0
c_term_v = len(self) - 1
for i, position in enumerate(self.sequence):
aa = position[0]
try:
mass += std_aa_mass[aa]
except KeyError:
warnings.warn("%r does not have an exact mass" % (aa, ))
if aa in fixed_rules:
mass += fixed_rules[aa]
n_term = i == n_term_v
c_term = i == c_term_v
for rule in fixed_modifications:
if rule.is_valid(aa, n_term, c_term):
mass += rule.modification_tag.mass
tags = position[1]
if tags:
for tag in tags:
Expand Down Expand Up @@ -2261,10 +2406,6 @@ def fragments(self, ion_shift, charge=1, reverse=None, include_labile=True, incl
mass += ion_shift

fixed_modifications = self.properties['fixed_modifications']
fixed_rules = {}
for rule in fixed_modifications:
for aa in rule.targets:
fixed_rules[aa] = rule.modification_tag.mass

intervals = self.intervals
if intervals:
Expand Down Expand Up @@ -2298,8 +2439,12 @@ def fragments(self, ion_shift, charge=1, reverse=None, include_labile=True, incl

if not reverse:
iterator = (iter(range(0, n - 1)))
n_term_v = 0
c_term_v = n - 1
else:
iterator = (reversed(range(1, n)))
n_term_v = n - 1
c_term_v = 0

for i in iterator:
position = self.sequence[i]
Expand All @@ -2310,8 +2455,11 @@ def fragments(self, ion_shift, charge=1, reverse=None, include_labile=True, incl
except KeyError:
warnings.warn("%r does not have an exact mass" % (aa, ))

if aa in fixed_rules:
mass += fixed_rules[aa]
n_term = i == n_term_v
c_term = i == c_term_v
for rule in fixed_modifications:
if rule.is_valid(aa, n_term, c_term):
mass += rule.modification_tag.mass

tags = position[1]
if tags:
Expand Down