From b45ec17ded7434f87744b8feba4e88c9ff3c8a97 Mon Sep 17 00:00:00 2001 From: Jeff Knaggs Date: Wed, 3 Jan 2018 17:02:00 -0800 Subject: [PATCH 1/4] allow constant truth values in boolean conditions --- pyvdrm/asi2.py | 17 ++++++++++++++++- pyvdrm/tests/test_asi2.py | 8 ++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/pyvdrm/asi2.py b/pyvdrm/asi2.py index 348f975..99890cf 100644 --- a/pyvdrm/asi2.py +++ b/pyvdrm/asi2.py @@ -83,6 +83,18 @@ def __call__(self, mutations): return Score(not child_score.score, child_score.residues) +class BoolTrue(AsiExpr): + """Boolean True constant""" + def __call__(self, *args): + return Score(True, []) + + +class BoolFalse(AsiExpr): + """Boolean False constant""" + def __call__(self, *args): + return Score(False, []) + + class AndExpr(AsiExpr): """Fold boolean AND on children""" @@ -279,8 +291,11 @@ def parser(self, rule): selectstatement = select + select_quantifier + from_ + residue_list selectstatement.setParseAction(SelectFrom) + bool_ = Literal('TRUE').suppress().setParseAction(BoolTrue) |\ + Literal('FALSE').suppress().setParseAction(BoolFalse) + booleancondition = Forward() - condition = residue | excludestatement | selectstatement + condition = residue | excludestatement | selectstatement | bool_ booleancondition << infixNotation(condition, [(and_, 2, opAssoc.LEFT, AndExpr), diff --git a/pyvdrm/tests/test_asi2.py b/pyvdrm/tests/test_asi2.py index 7b19741..459c766 100644 --- a/pyvdrm/tests/test_asi2.py +++ b/pyvdrm/tests/test_asi2.py @@ -81,6 +81,14 @@ def test_bool_and(self): self.assertEqual(rule(VariantCalls("7Y 1G 2T")), True) self.assertEqual(rule([]), False) + def test_bool_constants(self): + rule = ASI2("TRUE OR 1G") + self.assertEqual(rule(VariantCalls("2G")), True) + rule = ASI2("FALSE AND 1G") + self.assertEqual(rule(VariantCalls("1G")), False) + rule = ASI2("TRUE OR (FALSE AND TRUE)") + self.assertEqual(rule(VariantCalls("1G")), True) + def test_bool_or(self): rule = ASI2("1G OR (2T OR 7Y)") self.assertTrue(rule(VariantCalls("2T"))) From fa23ccd87345537b72e12c0fcceecf860c7b9740 Mon Sep 17 00:00:00 2001 From: Jeff Knaggs Date: Mon, 8 Jan 2018 10:54:30 -0800 Subject: [PATCH 2/4] break out HCV specific changes to separate module --- pyvdrm/asi2.py | 17 +-- pyvdrm/hcvr.py | 317 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 318 insertions(+), 16 deletions(-) create mode 100644 pyvdrm/hcvr.py diff --git a/pyvdrm/asi2.py b/pyvdrm/asi2.py index 99890cf..348f975 100644 --- a/pyvdrm/asi2.py +++ b/pyvdrm/asi2.py @@ -83,18 +83,6 @@ def __call__(self, mutations): return Score(not child_score.score, child_score.residues) -class BoolTrue(AsiExpr): - """Boolean True constant""" - def __call__(self, *args): - return Score(True, []) - - -class BoolFalse(AsiExpr): - """Boolean False constant""" - def __call__(self, *args): - return Score(False, []) - - class AndExpr(AsiExpr): """Fold boolean AND on children""" @@ -291,11 +279,8 @@ def parser(self, rule): selectstatement = select + select_quantifier + from_ + residue_list selectstatement.setParseAction(SelectFrom) - bool_ = Literal('TRUE').suppress().setParseAction(BoolTrue) |\ - Literal('FALSE').suppress().setParseAction(BoolFalse) - booleancondition = Forward() - condition = residue | excludestatement | selectstatement | bool_ + condition = residue | excludestatement | selectstatement booleancondition << infixNotation(condition, [(and_, 2, opAssoc.LEFT, AndExpr), diff --git a/pyvdrm/hcvr.py b/pyvdrm/hcvr.py new file mode 100644 index 0000000..99890cf --- /dev/null +++ b/pyvdrm/hcvr.py @@ -0,0 +1,317 @@ +""" +ASI2 Parser definition +""" + +from functools import reduce, total_ordering +from pyparsing import (Literal, nums, Word, Forward, Optional, Regex, + infixNotation, delimitedList, opAssoc) +from pyvdrm.drm import AsiExpr, AsiBinaryExpr, AsiUnaryExpr, DRMParser +from pyvdrm.vcf import MutationSet + + +def maybe_foldl(func, noneable): + """Safely fold a function over a potentially empty list of + potentially null values""" + if noneable is None: + return None + clean = [x for x in noneable if x is not None] + if not clean: + return None + return reduce(func, clean) + + +def maybe_map(func, noneable): + if noneable is None: + return None + r_list = [] + for x in noneable: + if x is None: + continue + result = func(x) + if result is None: + continue + r_list.append(result) + if not r_list: + return None + return r_list + + +@total_ordering +class Score(object): + """Encapsulate a score and the residues that support it""" + + residues = set([]) + score = None + + def __init__(self, score, residues): + """ Initialize. + + :param bool|float score: value of the score + :param residues: sequence of Mutations + """ + self.score = score + self.residues = set(residues) + + def __add__(self, other): + return Score(self.score + other.score, self.residues | other.residues) + + def __sub__(self, other): + return Score(self.score - other.score, self.residues | other.residues) + + def __repr__(self): + return "Score({!r}, {!r})".format(self.score, self.residues) + + def __eq__(self, other): + return self.score == other.score + + def __lt__(self, other): + # the total_ordering decorator populates the other 5 comparison + # operations. Implement them explicitly if this causes performance + # issues + return self.score < other.score + + def __bool__(self): + return self.score + + +class Negate(AsiExpr): + """Unary negation of boolean child""" + def __call__(self, mutations): + child_score = self.children[0](mutations) + if child_score is None: + return Score(True, []) # TODO: propagate negative residues + return Score(not child_score.score, child_score.residues) + + +class BoolTrue(AsiExpr): + """Boolean True constant""" + def __call__(self, *args): + return Score(True, []) + + +class BoolFalse(AsiExpr): + """Boolean False constant""" + def __call__(self, *args): + return Score(False, []) + + +class AndExpr(AsiExpr): + """Fold boolean AND on children""" + + def __call__(self, mutations): + scores = map(lambda f: f(mutations), self.children[0]) + scores = [Score(False, []) if s is None else s for s in scores] + if not scores: + raise ValueError + + residues = set([]) + for s in scores: + if not s.score: + return Score(False, []) + residues = residues | s.residues + + return Score(True, residues) + + +class OrExpr(AsiBinaryExpr): + """Boolean OR on children (binary only)""" + + def __call__(self, mutations): + arg1, arg2 = self.children + + score1 = arg1(mutations) + score2 = arg2(mutations) + + if score1 is None: + score1 = Score(False, []) + if score2 is None: + score2 = Score(False, []) + + return Score(score1.score or score2.score, + score1.residues | score2.residues) + + +class EqualityExpr(AsiExpr): + """ASI2 inequality expressions""" + + def __init__(self, label, pos, children): + super().__init__(label, pos, children) + self.operation, limit = children + self.limit = int(limit) + + def __call__(self, x): + if self.operation == 'ATLEAST': + return x >= self.limit + elif self.operation == 'EXACTLY': + return x == self.limit + elif self.operation == 'NOMORETHAN': + return x <= self.limit + + raise NotImplementedError + + +class ScoreExpr(AsiExpr): + """Score expressions propagate DRM scores""" + + def __call__(self, mutations): + if len(self.children) == 3: + operation, minus, score = self.children + if minus != '-': + raise ValueError + score = -1 * int(score) + elif len(self.children) == 2: + operation, score = self.children + score = int(score) + else: + raise ValueError + + # evaluate operation and return score + result = operation(mutations) + if result is None: + return None + + if result.score is False: + return Score(0, []) + return Score(score, result.residues) + + +class ScoreList(AsiExpr): + """Lists of scores are either summed or maxed""" + + def __call__(self, mutations): + operation, *rest = self.children + if operation == 'MAX': + return maybe_foldl(max, [f(mutations) for f in rest]) + + # the default operation is sum + return maybe_foldl(lambda x, y: x+y, [f(mutations) for f in self.children]) + + +class SelectFrom(AsiExpr): + """Return True if some number of mutations match""" + + def typecheck(self, tokens): + # if type(tokens[0]) != EqualityExpr: + # raise TypeError() + pass + + def __call__(self, mutations): + operation, *rest = self.children + # the head of the arg list must be an equality expression + + scored = list(maybe_map(lambda f: f(mutations), rest)) + passing = len(scored) + + if operation(passing): + return Score(True, maybe_foldl( + lambda x, y: x.residues.union(y.residues), scored)) + else: + return None + + +class AsiScoreCond(AsiExpr): + """Score condition""" + + label = "ScoreCond" + + def __call__(self, args): + """Score conditions evaluate a list of expressions and sum scores""" + return maybe_foldl(lambda x, y: x+y, map(lambda x: x(args), self.children)) + + +class AsiMutations(object): + """List of mutations given an ambiguous pattern""" + + def __init__(self, _label=None, _pos=None, args=None): + """Initialize set of mutations from a potentially ambiguous residue + """ + self.mutations = args and MutationSet(''.join(args)) + + def __repr__(self): + if self.mutations is None: + return "AsiMutations()" + return "AsiMutations(args={!r})".format(str(self.mutations)) + + def __call__(self, env): + for mutation_set in env: + intersection = self.mutations.mutations & mutation_set.mutations + if len(intersection) > 0: + return Score(True, intersection) + return None + + +class ASI2(DRMParser): + """ASI2 Syntax definition""" + + def parser(self, rule): + + select = Literal('SELECT').suppress() + except_ = Literal('EXCEPT') + exactly = Literal('EXACTLY') + atleast = Literal('ATLEAST') + + from_ = Literal('FROM').suppress() + + max_ = Literal('MAX') + + and_ = Literal('AND').suppress() + or_ = Literal('OR').suppress() + # min_ = Literal('MIN') + + notmorethan = Literal('NOTMORETHAN') + l_par = Literal('(').suppress() + r_par = Literal(')').suppress() + mapper = Literal('=>').suppress() + integer = Word(nums) + + mutation = Optional(Regex(r'[A-Z]')) + integer + Regex(r'[diA-Z]+') + mutation.setParseAction(AsiMutations) + + not_ = Literal('NOT').suppress() + mutation + not_.setParseAction(Negate) + + residue = mutation | not_ + # integer + l_par + not_ + Regex(r'[A-Z]+') + r_par + # roll this next rule into the mutation object + + # Syntax of ASI expressions + excludestatement = except_ + residue + + quantifier = exactly | atleast | notmorethan + inequality = quantifier + integer + inequality.setParseAction(EqualityExpr) + + select_quantifier = infixNotation(inequality, + [(and_, 2, opAssoc.LEFT, AndExpr), + (or_, 2, opAssoc.LEFT, OrExpr)]) + + residue_list = l_par + delimitedList(residue) + r_par + + # so selectstatement.eval :: [Mutation] -> Maybe Bool + selectstatement = select + select_quantifier + from_ + residue_list + selectstatement.setParseAction(SelectFrom) + + bool_ = Literal('TRUE').suppress().setParseAction(BoolTrue) |\ + Literal('FALSE').suppress().setParseAction(BoolFalse) + + booleancondition = Forward() + condition = residue | excludestatement | selectstatement | bool_ + + booleancondition << infixNotation(condition, + [(and_, 2, opAssoc.LEFT, AndExpr), + (or_, 2, opAssoc.LEFT, OrExpr)]) | condition + + scoreitem = booleancondition + mapper + Optional(Literal('-')) + integer + scoreitem.setParseAction(ScoreExpr) + scorelist = max_ + l_par + delimitedList(scoreitem) + r_par |\ + delimitedList(scoreitem) + scorelist.setParseAction(ScoreList) + + scorecondition = Literal('SCORE FROM').suppress() +\ + l_par + delimitedList(scorelist) + r_par + + scorecondition.setParseAction(AsiScoreCond) + + statement = booleancondition | scorecondition + + return statement.parseString(rule) From d1114745b8fb689c5c4f64862a841f3db4b3ba55 Mon Sep 17 00:00:00 2001 From: Jeff Knaggs Date: Mon, 8 Jan 2018 11:08:37 -0800 Subject: [PATCH 3/4] rename references to ASI2 in HCV test cases --- pyvdrm/hcvr.py | 11 ++- pyvdrm/tests/test_asi2.py | 8 -- pyvdrm/tests/test_hcvr.py | 193 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 199 insertions(+), 13 deletions(-) create mode 100644 pyvdrm/tests/test_hcvr.py diff --git a/pyvdrm/hcvr.py b/pyvdrm/hcvr.py index 99890cf..22acdef 100644 --- a/pyvdrm/hcvr.py +++ b/pyvdrm/hcvr.py @@ -1,5 +1,5 @@ """ -ASI2 Parser definition +HCV Drug Resistance Rule Parser definition """ from functools import reduce, total_ordering @@ -42,6 +42,7 @@ class Score(object): residues = set([]) score = None + flags = [] # allow a score expression to raise a user defined string def __init__(self, score, residues): """ Initialize. @@ -132,7 +133,7 @@ def __call__(self, mutations): class EqualityExpr(AsiExpr): - """ASI2 inequality expressions""" + """ASI2 style inequality expressions""" def __init__(self, label, pos, children): super().__init__(label, pos, children) @@ -240,8 +241,8 @@ def __call__(self, env): return None -class ASI2(DRMParser): - """ASI2 Syntax definition""" +class HCVR(DRMParser): + """HCV Resistance Syntax definition""" def parser(self, rule): @@ -274,7 +275,7 @@ def parser(self, rule): # integer + l_par + not_ + Regex(r'[A-Z]+') + r_par # roll this next rule into the mutation object - # Syntax of ASI expressions + # Syntax of expressions excludestatement = except_ + residue quantifier = exactly | atleast | notmorethan diff --git a/pyvdrm/tests/test_asi2.py b/pyvdrm/tests/test_asi2.py index 459c766..7b19741 100644 --- a/pyvdrm/tests/test_asi2.py +++ b/pyvdrm/tests/test_asi2.py @@ -81,14 +81,6 @@ def test_bool_and(self): self.assertEqual(rule(VariantCalls("7Y 1G 2T")), True) self.assertEqual(rule([]), False) - def test_bool_constants(self): - rule = ASI2("TRUE OR 1G") - self.assertEqual(rule(VariantCalls("2G")), True) - rule = ASI2("FALSE AND 1G") - self.assertEqual(rule(VariantCalls("1G")), False) - rule = ASI2("TRUE OR (FALSE AND TRUE)") - self.assertEqual(rule(VariantCalls("1G")), True) - def test_bool_or(self): rule = ASI2("1G OR (2T OR 7Y)") self.assertTrue(rule(VariantCalls("2T"))) diff --git a/pyvdrm/tests/test_hcvr.py b/pyvdrm/tests/test_hcvr.py new file mode 100644 index 0000000..dbbb445 --- /dev/null +++ b/pyvdrm/tests/test_hcvr.py @@ -0,0 +1,193 @@ +import unittest +from pyvdrm.hcvr import HCVR, AsiMutations, Score +from pyvdrm.vcf import Mutation, MutationSet, VariantCalls + + +# noinspection SqlNoDataSourceInspection,SqlDialectInspection +class TestRuleParser(unittest.TestCase): + + def test_stanford_ex1(self): + HCVR("151M OR 69i") + + def test_stanford_ex2(self): + rule = HCVR("SELECT ATLEAST 2 FROM (41L, 67N, 70R, 210W, 215F, 219Q)") + m1 = MutationSet('41L') + m2 = MutationSet('67N') + m3 = MutationSet('70N') + self.assertTrue(rule([m1, m2])) + self.assertFalse(rule([m1, m3])) + + def test_stanford_ex3(self): + HCVR("SELECT ATLEAST 2 AND NOTMORETHAN 2 FROM (41L, 67N, 70R, 210W, 215FY, 219QE)") + + def test_stanford_ex4(self): + HCVR("215FY AND NOT 184VI") + + def test_stanford_rest(self): + examples = ["SCORE FROM (65R => 20, 74V => 20, 184VI => 20)", + "151M AND EXCLUDE 69i", + # "69(NOT TDN)", + "215F OR 215Y", + "SCORE FROM (101P => 40, 101E => 30, 101HN => 15, 101Q => 5 )", + "SCORE FROM ( MAX (101P => 40, 101E => 30, 101HN => 15, 101Q => 5 ))", + "(184V AND 115F) => 20" + "3N AND 9N", + "2N OR 9N AND 2N", + "3N AND (2N AND (4N OR 2N))"] + + for ex in examples: + x = HCVR(ex) + self.assertEqual(ex, x.rule) + + def test_asi2_compat(self): + q = "SCORE FROM ( 98G => 10, 100I => 40,\ + MAX (101P => 40, 101E => 30, 101HN => 15, 101Q => 5) )" + HCVR(q) + + +# noinspection SqlNoDataSourceInspection,SqlDialectInspection +class TestRuleSemantics(unittest.TestCase): + def test_score_from(self): + rule = HCVR("SCORE FROM ( 100G => 10, 101D => 20 )") + self.assertEqual(rule(VariantCalls("100G 102G")), 10) + + def test_score_negate(self): + rule = HCVR("SCORE FROM ( NOT 100G => 10, NOT 101SD => 20 )") + self.assertEqual(rule(VariantCalls("100G 102G")), 20) + self.assertEqual(rule(VariantCalls("100S 101S")), 10) + + def test_score_residues(self): + rule = HCVR("SCORE FROM ( 100G => 10, 101D => 20 )") + expected_residue = repr({Mutation('S100G')}) + + result = rule.dtree(VariantCalls("S100G R102G")) + + self.assertEqual(expected_residue, repr(result.residues)) + + def test_score_from_max(self): + rule = HCVR("SCORE FROM (MAX (100G => 10, 101D => 20, 102D => 30))") + self.assertEqual(rule(VariantCalls("100G 101D")), 20) + self.assertEqual(rule(VariantCalls("10G 11D")), False) + + def test_score_from_max_neg(self): + rule = HCVR("SCORE FROM (MAX (100G => -10, 101D => -20, 102D => 30))") + self.assertEqual(rule(VariantCalls("100G 101D")), -10) + self.assertEqual(rule(VariantCalls("10G 11D")), False) + + def test_bool_and(self): + rule = HCVR("1G AND (2T AND 7Y)") + self.assertEqual(rule(VariantCalls("2T 7Y 1G")), True) + self.assertEqual(rule(VariantCalls("2T 3Y 1G")), False) + self.assertEqual(rule(VariantCalls("7Y 1G 2T")), True) + self.assertEqual(rule([]), False) + + def test_bool_constants(self): + rule = HCVR("TRUE OR 1G") + self.assertEqual(rule(VariantCalls("2G")), True) + rule = HCVR("FALSE AND 1G") + self.assertEqual(rule(VariantCalls("1G")), False) + rule = HCVR("TRUE OR (FALSE AND TRUE)") + self.assertEqual(rule(VariantCalls("1G")), True) + + def test_bool_or(self): + rule = HCVR("1G OR (2T OR 7Y)") + self.assertTrue(rule(VariantCalls("2T"))) + self.assertFalse(rule(VariantCalls("3T"))) + self.assertTrue(rule(VariantCalls("1G"))) + self.assertFalse(rule([])) + + def test_select_from_atleast(self): + rule = HCVR("SELECT ATLEAST 2 FROM (2T, 7Y, 3G)") + self.assertTrue(rule(VariantCalls("2T 7Y 1G"))) + self.assertFalse(rule(VariantCalls("2T 4Y 5G"))) + self.assertTrue(rule(VariantCalls("3G 9Y 2T"))) + + def test_score_from_exactly(self): + rule = HCVR("SELECT EXACTLY 1 FROM (2T, 7Y)") + score = rule(VariantCalls("2T 7Y 1G")) + self.assertEqual(0, score) + + +class TestActualRules(unittest.TestCase): + def test_hivdb_rules_parse(self): + for line in open("pyvdrm/tests/HIVDB.rules"): + r = HCVR(line) + self.assertEqual(line, r.rule) + + def test_chained_and(self): + rule = HCVR(""" + SCORE FROM(41L => 5, 62V => 5, MAX ( 65E => 10, 65N => + 30, 65R => 45 ), MAX ( 67E => 5, 67G => 5, 67H => 5, 67N => 5, 67S => + 5, 67T => 5, 67d => 30 ), 68d => 15, MAX ( 69G => 10, 69i => 60, 69d => + 15 ), MAX ( 70E => 15, 70G => 15, 70N => 15, 70Q => 15, 70R => 5, 70S + => 15, 70T => 15, 70d => 15 ), MAX ( 74I => 30, 74V => 30 ), 75I => 5, + 77L => 5, 115F => 60, 116Y => 10, MAX ( 151L => 30, 151M => 60 ), MAX( + 184I => 15, 184V => 15 ), 210W => 5, MAX ( 215A => 5, 215C => 5, 215D + => 5, 215E => 5, 215F => 10, 215I => 5, 215L => 5, 215N => 5, 215S => + 5, 215V => 5, 215Y => 10 ), MAX ( 219E => 5, 219N => 5, 219Q => 5, 219R + => 5 ), (40F AND 41L AND 210W AND 215FY) => 5, (41L AND 210W) => 10, + (41L AND 210W AND 215FY) => 5, (41L AND 44AD AND 210W AND 215FY) => 5, + (41L AND 67EGN AND 215FY) => 5, (67EGN AND 215FY AND 219ENQR) => 5, + (67EGN AND 70R AND 184IV AND 219ENQR) => 20, (67EGN AND 70R AND + 219ENQR) => 10, (70R AND 215FY) => 5, (74IV AND 184IV) => 15, (77L AND + 116Y AND 151M) => 10, MAX ((210W AND 215ACDEILNSV) => 5, (210W AND + 215FY) => 10), MAX ((41L AND 215ACDEILNSV) => 5, (41L AND 215FY) => + 15)) + """) + self.assertEqual(rule(VariantCalls("40F 41L 210W 215Y")), 65) + self.assertEqual(rule(VariantCalls("41L 210W 215F")), 60) + self.assertEqual(rule(VariantCalls("40F 210W 215Y")), 25) + self.assertEqual(rule(VariantCalls("40F 67G 215Y")), 15) + + +class TestAsiMutations(unittest.TestCase): + def test_init_args(self): + expected_mutation_set = MutationSet('Q80KR') + m = AsiMutations(args='Q80KR') + + self.assertEqual(expected_mutation_set, m.mutations) + self.assertEqual(expected_mutation_set.wildtype, m.mutations.wildtype) + + def test_init_none(self): + m = AsiMutations() + + self.assertIsNone(m.mutations) + + def test_repr(self): + expected_repr = "AsiMutations(args='Q80KR')" + m = AsiMutations(args='Q80KR') + + r = repr(m) + + self.assertEqual(expected_repr, r) + + def test_repr_none(self): + expected_repr = "AsiMutations()" + m = AsiMutations() + + r = repr(m) + + self.assertEqual(expected_repr, r) + + +class TestScore(unittest.TestCase): + def test_init(self): + expected_value = 10 + expected_mutations = {Mutation('A23R')} + + score = Score(expected_value, expected_mutations) + + self.assertEqual(expected_value, score.score) + self.assertEqual(expected_mutations, score.residues) + + def test_repr(self): + expected_repr = "Score(10, {Mutation('A23R')})" + score = Score(10, {Mutation('A23R')}) + + r = repr(score) + + self.assertEqual(expected_repr, r) + + +if __name__ == '__main__': + unittest.main() From 50021bce52db0c3ab41cbbb36a45b79cc9b36cb3 Mon Sep 17 00:00:00 2001 From: Jeff Knaggs Date: Mon, 8 Jan 2018 12:46:21 -0800 Subject: [PATCH 4/4] prototype of non-integer scores, raising flags for some mutations --- pyvdrm/hcvr.py | 45 +++++++++++++++++++++++++++++++-------- pyvdrm/tests/test_hcvr.py | 6 ++++++ 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/pyvdrm/hcvr.py b/pyvdrm/hcvr.py index 22acdef..50c0f63 100644 --- a/pyvdrm/hcvr.py +++ b/pyvdrm/hcvr.py @@ -4,10 +4,18 @@ from functools import reduce, total_ordering from pyparsing import (Literal, nums, Word, Forward, Optional, Regex, - infixNotation, delimitedList, opAssoc) + infixNotation, delimitedList, opAssoc, alphas) from pyvdrm.drm import AsiExpr, AsiBinaryExpr, AsiUnaryExpr, DRMParser from pyvdrm.vcf import MutationSet +def update_flags(fst, snd): + for k in snd: + if k in fst: + fst[k].append(snd[k]) + else: + fst[k] = snd[k] # this chould be achieved with a defaultdict + return fst + def maybe_foldl(func, noneable): """Safely fold a function over a potentially empty list of @@ -42,22 +50,28 @@ class Score(object): residues = set([]) score = None - flags = [] # allow a score expression to raise a user defined string + flags = {} # allow a score expression to raise a user defined string - def __init__(self, score, residues): + def __init__(self, score, residues, flags={}): """ Initialize. :param bool|float score: value of the score :param residues: sequence of Mutations + :param flags: dictionary of user defined strings and supporting Mutations """ self.score = score self.residues = set(residues) + self.flags = flags def __add__(self, other): - return Score(self.score + other.score, self.residues | other.residues) + flags = update_flags(self.flags, other.flags) + return Score(self.score + other.score, self.residues | other.residues, + flags) def __sub__(self, other): - return Score(self.score - other.score, self.residues | other.residues) + flags = update_flags(self.flags, other.flags) + return Score(self.score - other.score, self.residues | other.residues, + flags) def __repr__(self): return "Score({!r}, {!r})".format(self.score, self.residues) @@ -155,14 +169,23 @@ class ScoreExpr(AsiExpr): """Score expressions propagate DRM scores""" def __call__(self, mutations): - if len(self.children) == 3: + + flags = {} + if len(self.children) == 4: + operation, _, flag, _ = self.children + flags[flag] = [] + score = 0 # should be None + + elif len(self.children) == 3: operation, minus, score = self.children - if minus != '-': + if minus != '-': # this is parsing the expression twice, refactor raise ValueError score = -1 * int(score) + elif len(self.children) == 2: operation, score = self.children score = int(score) + else: raise ValueError @@ -173,7 +196,7 @@ def __call__(self, mutations): if result.score is False: return Score(0, []) - return Score(score, result.residues) + return Score(score, result.residues, flags=flags) class ScoreList(AsiExpr): @@ -262,6 +285,9 @@ def parser(self, rule): notmorethan = Literal('NOTMORETHAN') l_par = Literal('(').suppress() r_par = Literal(')').suppress() + + quote = Literal('"') + mapper = Literal('=>').suppress() integer = Word(nums) @@ -302,7 +328,8 @@ def parser(self, rule): [(and_, 2, opAssoc.LEFT, AndExpr), (or_, 2, opAssoc.LEFT, OrExpr)]) | condition - scoreitem = booleancondition + mapper + Optional(Literal('-')) + integer + score = Optional(Literal('-')) + integer | quote + Word(alphas) + quote + scoreitem = booleancondition + mapper + score scoreitem.setParseAction(ScoreExpr) scorelist = max_ + l_par + delimitedList(scoreitem) + r_par |\ delimitedList(scoreitem) diff --git a/pyvdrm/tests/test_hcvr.py b/pyvdrm/tests/test_hcvr.py index dbbb445..0a4956f 100644 --- a/pyvdrm/tests/test_hcvr.py +++ b/pyvdrm/tests/test_hcvr.py @@ -107,6 +107,12 @@ def test_score_from_exactly(self): score = rule(VariantCalls("2T 7Y 1G")) self.assertEqual(0, score) + def test_score_comment(self): + rule = HCVR("SCORE FROM (100G => 10, 200T => 3, 100S => \"comment\")") + self.assertEqual(rule(VariantCalls("100G")), 10) + result = rule.dtree(VariantCalls("100S 200T")) + self.assertEqual(result.score, 3) + self.assertTrue("comment" in result.flags) class TestActualRules(unittest.TestCase): def test_hivdb_rules_parse(self):