Skip to content

Commit

Permalink
Add modification caching; Constant terminal mods;
Browse files Browse the repository at this point in the history
**Modification caching**
All ModificationResolver types now use an in-memory cache for
resolved modification definitions, reducing overhead of resolving
the same rule over and over again.

Sub-classes should move their implementation of `resolve` to the
`_resolve_impl` method; otherwise the cache will not be used.

To disable the cache for a resolver instance, call `resolver.enable_caching(False)`.

**Constant terminal modifications**
This implements support for the syntax discussed in HUPO-PSI/ProForma#6
for constant modification rules that apply to a specific sequence
terminus, optionally restricted to specific amino acids.
  • Loading branch information
mobiusklein committed Apr 20, 2024
1 parent 196c179 commit 39a017b
Showing 1 changed file with 175 additions and 27 deletions.
202 changes: 175 additions & 27 deletions pyteomics/proforma.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,29 @@ def __init__(self, name, **kwargs):
self.name = name.lower()
self.symbol = self.name[0]
self._database = None
self._cache = {}

def clear_cache(self):
    """Clear the modification definition cache.

    This is a no-op when caching has been turned off, i.e. when
    ``self._cache`` is :const:`None` (the state left behind by
    ``enable_caching(False)``).
    """
    # A disabled cache is represented by None rather than an empty dict,
    # so there is nothing to clear in that case.
    if self._cache is not None:
        self._cache.clear()

def enable_caching(self, flag=True):
    """
    Enable or disable caching of modification definitions.

    If `flag` is :const:`False`, this will also dispose of any
    existing cached values.

    Parameters
    ----------
    flag : :class:`bool`
        :const:`True` to enable the cache, :const:`False` to disable it
    """
    if not flag:
        # None (not an empty dict) is the marker for "caching disabled".
        self._cache = None
    elif self._cache is None:
        # Only create a fresh store when caching was previously off; an
        # existing store, empty or not, is left untouched.
        self._cache = {}

def load_database(self):
    """Load the database backing this resolver.

    This version always raises :class:`NotImplementedError`.
    NOTE(review): presumably concrete resolvers override this and return
    their loaded database -- confirm against the subclasses.
    """
    raise NotImplementedError()
Expand Down Expand Up @@ -316,9 +339,19 @@ def parse_identifier(self, identifier):
id = None
return name, id

def resolve(self, name=None, id=None, **kwargs):
def _resolve_impl(self, name=None, id=None, **kwargs):
    """Perform the actual, uncached lookup of a modification definition.

    ``resolve`` calls this with the same ``name``, ``id`` and keyword
    arguments it received. This version always raises
    :class:`NotImplementedError`.
    """
    raise NotImplementedError()

def resolve(self, name=None, id=None, **kwargs):
    """Resolve a modification definition, using the cache when possible.

    Parameters
    ----------
    name : optional
        Passed through to ``_resolve_impl`` and used in the cache key
    id : optional
        Passed through to ``_resolve_impl`` and used in the cache key
    **kwargs
        Extra options passed through to ``_resolve_impl``; they are part of
        the cache key as well

    Returns
    -------
    The value produced by ``_resolve_impl``. When the cache is used, a
    ``.copy()`` of the stored value is returned so that callers mutating
    the result do not alter the cached entry. When the cache is bypassed,
    the value is returned as-is.
    """
    if self._cache is None:
        # Caching disabled: go straight to the implementation.
        return self._resolve_impl(name, id, **kwargs)

    try:
        cache_key = (name, id, frozenset(kwargs.items()))
        # Force hashing now so an unhashable name/id is caught here too.
        hash(cache_key)
    except TypeError:
        # Unhashable arguments (e.g. a list-valued option) cannot be used
        # as a key; resolve without caching instead of failing.
        return self._resolve_impl(name, id, **kwargs)

    if cache_key not in self._cache:
        self._cache[cache_key] = self._resolve_impl(name, id, **kwargs)
    return self._cache[cache_key].copy()

def __call__(self, name=None, id=None, **kwargs):
    """Shorthand for calling ``resolve`` with the same arguments."""
    return self.resolve(name, id, **kwargs)

Expand All @@ -343,7 +376,7 @@ def load_database(self):
return obo_cache.resolve("http://www.unimod.org/obo/unimod.obo")
return Unimod()

def resolve(self, name=None, id=None, **kwargs):
def _resolve_impl(self, name=None, id=None, **kwargs):
strict = kwargs.get("strict", self.strict)
exhaustive = kwargs.get("exhaustive", True)
if name is not None:
Expand Down Expand Up @@ -398,7 +431,7 @@ def __init__(self, **kwargs):
def load_database(self):
    """Return the database produced by ``load_psimod()``."""
    return load_psimod()

def resolve(self, name=None, id=None, **kwargs):
def _resolve_impl(self, name=None, id=None, **kwargs):
if name is not None:
defn = self.database[name]
elif id is not None:
Expand Down Expand Up @@ -443,7 +476,7 @@ def __init__(self, **kwargs):
def load_database(self):
    """Return the database produced by ``load_xlmod()``."""
    return load_xlmod()

def resolve(self, name=None, id=None, **kwargs):
def _resolve_impl(self, name=None, id=None, **kwargs):
if name is not None:
defn = self.database[name]
elif id is not None:
Expand Down Expand Up @@ -562,7 +595,7 @@ def get_mass_from_term(self, term, raw_mass):
"Only a rough approximation is available.") % (term, ))
return rough_mass

def resolve(self, name=None, id=None, **kwargs):
def _resolve_impl(self, name=None, id=None, **kwargs):
if name is not None:
term = self.database[name]
elif id is not None:
Expand Down Expand Up @@ -613,7 +646,7 @@ def parse_identifier(self, identifier):
"""
return identifier, None

def resolve(self, name=None, id=None, **kwargs):
def _resolve_impl(self, name=None, id=None, **kwargs):
defn = None
for resolver in self.resolvers:
try:
Expand Down Expand Up @@ -1251,6 +1284,56 @@ def process_tag_tokens(tokens):
return main_tag


class ModificationTarget(object):
    """Describe where a modification rule is allowed to apply.

    A target combines an optional amino acid with optional terminal
    restrictions. Its string form is the colon-joined list of the parts
    that are set, in the order ``N-term``, ``C-term``, amino acid, e.g.
    ``"K"``, ``"N-term"`` or ``"C-term:K"``.

    Parameters
    ----------
    aa : str or None
        The amino acid the target is restricted to, or :const:`None` for
        any amino acid
    n_term : bool
        Whether the target is restricted to the N-terminal position
    c_term : bool
        Whether the target is restricted to the C-terminal position
    """

    def __init__(self, aa, n_term=False, c_term=False):
        self.aa = aa
        self.n_term = n_term
        self.c_term = c_term

    def __eq__(self, other):
        """Compare against a string form or another target-like object."""
        if isinstance(other, str):
            return str(self) == other
        try:
            return (
                self.aa == other.aa
                and self.n_term == other.n_term
                and self.c_term == other.c_term
            )
        except AttributeError:
            # Not target-like (e.g. None or an int): let Python fall back
            # to its default comparison instead of raising.
            return NotImplemented

    def __ne__(self, other):
        """Inverse of :meth:`__eq__`, propagating ``NotImplemented``."""
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        # Hash like the string form so a target and its string spelling
        # land in the same bucket, matching the string branch of __eq__.
        return hash(str(self))

    def __str__(self):
        buffer = []
        if self.n_term:
            buffer.append('N-term')
        if self.c_term:
            buffer.append('C-term')
        if self.aa:
            buffer.append(self.aa)
        return ':'.join(buffer)

    def __repr__(self):
        return str(self)

    def is_valid(self, aa, n_term, c_term):
        """Test whether this target matches a sequence position.

        Parameters
        ----------
        aa : str
            The amino acid at the position
        n_term : bool
            Whether the position is the N-terminal position
        c_term : bool
            Whether the position is the C-terminal position

        Returns
        -------
        bool
        """
        if self.n_term or self.c_term:
            # A terminal-restricted target must never match a position that
            # is not at one of the termini it names; without this check an
            # "N-term" target would match every residue in the sequence.
            at_required_terminal = (n_term and self.n_term) or (c_term and self.c_term)
            if not at_required_terminal:
                return False
            return bool((self.aa and aa == self.aa) or self.aa is None)
        return self.aa == aa or self.aa is None


class ModificationRule(object):
'''Define a fixed modification rule which dictates a modification tag is
always applied at one or more amino acid residues.
Expand All @@ -1267,6 +1350,53 @@ class ModificationRule(object):
def __init__(self, modification_tag, targets=None):
    """Store the modification tag and its targets, then validate the targets.

    Parameters
    ----------
    modification_tag
        The tag this rule applies
    targets : optional
        The target specification(s); normalized by ``_validate_targets``
    """
    self.modification_tag = modification_tag
    self.targets = targets
    # Rewrites self.targets in place with the validated target objects.
    self._validate_targets()

def is_valid(self, aa, n_term, c_term):
    """Report whether any target of this rule accepts the given position.

    Each entry of ``self.targets`` is asked in turn via its own
    ``is_valid(aa, n_term, c_term)``; the first accepting target wins.
    """
    for candidate in self.targets:
        if candidate.is_valid(aa, n_term, c_term):
            return True
    return False

def _validate_targets(self):
    """Normalize ``self.targets`` into a list of :class:`ModificationTarget`.

    ``None`` becomes an empty list. A list, tuple, set or frozenset is
    treated as a collection of targets; any other value is treated as a
    single target. Each target must be one of:

    * an existing :class:`ModificationTarget`, which is kept as-is
    * a single amino acid found in ``VALID_AA``
    * ``"N-term"`` or ``"C-term"``
    * ``"N-term:X"`` or ``"C-term:X"`` where ``X`` is in ``VALID_AA``

    Raises
    ------
    PyteomicsError
        If a target does not match any of the accepted forms
    """
    raw_targets = self.targets
    if raw_targets is None:
        raw_targets = []
    elif isinstance(raw_targets, (list, tuple, set, frozenset)):
        raw_targets = list(raw_targets)
    else:
        raw_targets = [raw_targets]
    # The error messages below format `self`, and __str__ iterates over
    # self.targets, so it must already be a list if validation fails.
    self.targets = raw_targets

    validated_targets = []
    for target in raw_targets:
        if isinstance(target, ModificationTarget):
            validated_targets.append(target)
            continue
        if target in VALID_AA:
            validated_targets.append(ModificationTarget(target, False, False))
            continue
        if target in ("N-term", "C-term"):
            n_term = target == "N-term"
            c_term = target == "C-term"
            validated_targets.append(ModificationTarget(None, n_term, c_term))
            continue
        # Non-string targets have no `startswith`; report them as invalid
        # targets rather than letting an AttributeError escape.
        is_terminal_spec = (
            hasattr(target, "startswith")
            and target.startswith(("N-term:", "C-term:"))
        )
        if not is_terminal_spec:
            raise PyteomicsError(
                "Modification rule {0} has an invalid target {1}".format(
                    self, target
                )
            )
        tokens = target.split(":")
        if len(tokens) != 2:
            raise PyteomicsError(
                "Modification rule {0} has an empty amino acid specific terminal target {1}".format(
                    self, target
                )
            )
        if tokens[1] not in VALID_AA:
            raise PyteomicsError(
                "Modification rule {0} has an invalid amino acid specific terminal target {2} in {1}".format(
                    self,
                    target,
                    tokens[1]
                )
            )
        n_term = tokens[0] == "N-term"
        c_term = tokens[0] == "C-term"
        validated_targets.append(ModificationTarget(tokens[1], n_term, c_term))

    self.targets = validated_targets

def __eq__(self, other):
if other is None:
Expand All @@ -1277,7 +1407,7 @@ def __ne__(self, other):
return not self == other

def __str__(self):
targets = ','.join(self.targets)
targets = ','.join(map(str, self.targets))
return "<[{self.modification_tag}]@{targets}>".format(self=self, targets=targets)

def __repr__(self):
Expand Down Expand Up @@ -1609,6 +1739,7 @@ class ParserStateEnum(Enum):
inter_chain_cross_link_start = 20
chimeric_start = 21
interval_initial = 22
post_global_terminal = 23
done = 999


Expand All @@ -1628,6 +1759,7 @@ class ParserStateEnum(Enum):
UNLOCALIZED_COUNT = ParserStateEnum.unlocalized_count
POST_GLOBAL = ParserStateEnum.post_global
POST_GLOBAL_AA = ParserStateEnum.post_global_aa
POST_GLOBAL_TERM = ParserStateEnum.post_global_terminal
POST_INTERVAL_TAG = ParserStateEnum.post_interval_tag
CHARGE_START = ParserStateEnum.charge_state_start
CHARGE_NUMBER = ParserStateEnum.charge_state_number
Expand All @@ -1636,6 +1768,7 @@ class ParserStateEnum(Enum):
DONE = ParserStateEnum.done

# Single-letter codes accepted as amino acid residues.
VALID_AA = set("QWERTYIPASDFGHKLCVNMXUOJZB")
# Extra characters accepted while reading the targets of a fixed
# modification rule: the letters of "N-term" and "C-term" plus 'n', 'c',
# 'T', ':' and the space character.
TERMINAL_SPEC_CHARS = set('N-term') | set('C-term') | set("ncT: ")

def parse(sequence):
'''Tokenize a ProForma sequence into a sequence of amino acid+tag positions, and a
Expand Down Expand Up @@ -1678,7 +1811,7 @@ def parse(sequence):
current_tag = TagParser()
current_interval = None
current_unlocalized_count = NumberParser()
current_aa_targets = TokenBuffer()
current_aa_targets = StringParser()

charge_buffer = None
adduct_buffer = None
Expand Down Expand Up @@ -1868,13 +2001,13 @@ def parse(sequence):
depth = 1
tag = current_tag()[0]
multiplicity = current_unlocalized_count()
for i in range(multiplicity):
for _ in range(multiplicity):
unlocalized_modifications.append(tag)
elif c == '?':
state = BEFORE
tag = current_tag()[0]
multiplicity = current_unlocalized_count()
for i in range(multiplicity):
for _ in range(multiplicity):
unlocalized_modifications.append(tag)
else:
raise ProFormaError(
Expand All @@ -1887,14 +2020,25 @@ def parse(sequence):
("Error In State {state}, fixed modification detected without "
"target amino acids found at index {i}").format(**locals()), i, state)
elif state == POST_GLOBAL_AA:
if c in VALID_AA:
if c in VALID_AA or c in TERMINAL_SPEC_CHARS:
current_aa_targets.append(c)
elif c == ',':
# the next character should be another amino acid
pass
current_aa_targets.bound()
elif c == '>':
fixed_modifications.append(
ModificationRule(current_tag()[0], current_aa_targets()))
try:
v = current_aa_targets()
fixed_modifications.append(
ModificationRule(current_tag()[0], v))
except PyteomicsError as err:
raise ProFormaError(
(
"Error In State {state}, fixed modification detected invalid "
"target found at index {i}: {err}"
).format(state=state, i=i, err=err),
i,
state,
)
state = BEFORE
else:
raise ProFormaError(
Expand Down Expand Up @@ -2164,19 +2308,20 @@ def mass(self):
mass = 0.0

fixed_modifications = self.properties['fixed_modifications']
fixed_rules = {}
for rule in fixed_modifications:
for aa in rule.targets:
fixed_rules[aa] = rule.modification_tag.mass

for position in self.sequence:
n_term_v = 0
c_term_v = len(self) - 1
for i, position in enumerate(self.sequence):
aa = position[0]
try:
mass += std_aa_mass[aa]
except KeyError:
warnings.warn("%r does not have an exact mass" % (aa, ))
if aa in fixed_rules:
mass += fixed_rules[aa]
n_term = i == n_term_v
c_term = i == c_term_v
for rule in fixed_modifications:
if rule.is_valid(aa, n_term, c_term):
mass += rule.modification_tag.mass
tags = position[1]
if tags:
for tag in tags:
Expand Down Expand Up @@ -2261,10 +2406,6 @@ def fragments(self, ion_shift, charge=1, reverse=None, include_labile=True, incl
mass += ion_shift

fixed_modifications = self.properties['fixed_modifications']
fixed_rules = {}
for rule in fixed_modifications:
for aa in rule.targets:
fixed_rules[aa] = rule.modification_tag.mass

intervals = self.intervals
if intervals:
Expand Down Expand Up @@ -2298,8 +2439,12 @@ def fragments(self, ion_shift, charge=1, reverse=None, include_labile=True, incl

if not reverse:
iterator = (iter(range(0, n - 1)))
n_term_v = 0
c_term_v = n - 1
else:
iterator = (reversed(range(1, n)))
n_term_v = n - 1
c_term_v = 0

for i in iterator:
position = self.sequence[i]
Expand All @@ -2310,8 +2455,11 @@ def fragments(self, ion_shift, charge=1, reverse=None, include_labile=True, incl
except KeyError:
warnings.warn("%r does not have an exact mass" % (aa, ))

if aa in fixed_rules:
mass += fixed_rules[aa]
n_term = i == n_term_v
c_term = i == c_term_v
for rule in fixed_modifications:
if rule.is_valid(aa, n_term, c_term):
mass += rule.modification_tag.mass

tags = position[1]
if tags:
Expand Down

0 comments on commit 39a017b

Please sign in to comment.