From 46efb89b91011695eba178fc67bdff13cbce952d Mon Sep 17 00:00:00 2001 From: Yves Gaetan Nana Teukam <56451020+yvesnana@users.noreply.github.com> Date: Thu, 25 Apr 2024 08:55:25 +0200 Subject: [PATCH] fix: enzyme optimization with Kcat fitness function (#240) * fix: fixed enzyme optimization with Kcat fitness function Signed-off-by: yvesnana * fixed random seed initialization Signed-off-by: yvesnana * feat: added xgboost as needed for enzeptional Signed-off-by: yvesnana * feat: added xgboost to requirement file Signed-off-by: yvesnana --------- Signed-off-by: yvesnana --- examples/enzeptional/example_enzeptional.py | 80 ++++++++++++++++++--- requirements.txt | 1 + setup.cfg | 4 ++ src/gt4sd/frameworks/enzeptional/core.py | 52 +++++++++----- 4 files changed, 110 insertions(+), 27 deletions(-) diff --git a/examples/enzeptional/example_enzeptional.py b/examples/enzeptional/example_enzeptional.py index 0033013cf..05efbfc5a 100644 --- a/examples/enzeptional/example_enzeptional.py +++ b/examples/enzeptional/example_enzeptional.py @@ -1,28 +1,58 @@ import logging import pandas as pd +from typing import Tuple, List, Optional from gt4sd.frameworks.enzeptional.processing import HFandTAPEModelUtility from gt4sd.frameworks.enzeptional.core import SequenceMutator, EnzymeOptimizer from gt4sd.configuration import GT4SDConfiguration, sync_algorithm_with_s3 -def initialize_environment(): - """Synchronize with GT4SD S3 storage and set up the environment.""" - # NOTE: For those interested in optimizing kcat values, it is important to adjust the scorer path to reflect this focus, thereby selecting the appropriate model for kcat optimization: f"{configuration.gt4sd_local_cache_path}/properties/proteins/enzeptional/scorers/kcat/model.pkl". The specification of the scaler, located within the same directory as the `scorer.pkl`, is mandatory for accurate model performance. 
+def initialize_environment(model = "feasibility") -> Tuple[str, Optional[str]]: + """Synchronize with GT4SD S3 storage and set up the environment. + + Args: + model (str): Type of optimization ("feasibility" or "kcat"). + + Returns: + Tuple[str, Optional[str]]: The path to the scorer file and scaler file (if existing). + """ configuration = GT4SDConfiguration.get_instance() sync_algorithm_with_s3("proteins/enzeptional/scorers", module="properties") return f"{configuration.gt4sd_local_cache_path}/properties/proteins/enzeptional/scorers/feasibility/model.pkl" -def load_experiment_parameters(): +def load_experiment_parameters() -> Tuple[List, List, List, List]: """Load experiment parameters from a CSV file.""" df = pd.read_csv("data.csv").iloc[1] return df["substrates"], df["products"], df["sequences"], eval(df["intervals"]) def setup_optimizer( - substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path + substrate_smiles: str, + product_smiles: str, + sample_sequence: str, + intervals: List[List[int]], + scorer_path: str, + scaler_path: str, + concat_order: List[str], + use_xgboost_scorer: bool ): - """Set up and return the optimizer with all necessary components configured.""" + """Set up and return the optimizer with all necessary components configured + + Args: + substrate_smiles (str): SMILES representation of + the substrate. + product_smiles (str): SMILES representation of the + product. + sample_sequence (str): The initial protein sequence. + intervals (List[List[int]]): Intervals for mutation. + scorer_path (str): File path to the scoring model. + scaler_path (str): Path to the scaler in case you are using the Kcat model. + concat_order (List[str]): Order of concatenating embeddings. + use_xgboost_scorer (bool): flag to specify if the fitness function is the Kcat. 
+ + Returns: + Initialized optimizer + """ model_tokenizer_paths = "facebook/esm2_t33_650M_UR50D" chem_paths = "seyonec/ChemBERTa-zinc-base-v1" @@ -52,21 +82,32 @@ def setup_optimizer( "selection_ratio": 0.25, "perform_crossover": True, "crossover_type": "single_point", - "concat_order": ["substrate", "sequence", "product"], + "concat_order": concat_order, + "scaler_filepath": scaler_path, + "use_xgboost_scorer": use_xgboost_scorer } return EnzymeOptimizer(**optimizer_config) def optimize_sequences(optimizer): - """Optimize sequences using the configured optimizer.""" + """Optimize sequences using the configured optimizer. + + Args: + optimizer: Initialized optimizer + + Returns: + Optimized sequences + """ return optimizer.optimize( num_iterations=3, num_sequences=5, num_mutations=5, time_budget=3600 ) -def main(): +def main_kcat(): + """Optimization using Kcat model""" logging.basicConfig(level=logging.INFO) - scorer_path = initialize_environment() + scorer_path, scaler_path = initialize_environment(model="kcat") + concat_order, use_xgboost_scorer = ["substrate", "sequence"], True ( substrate_smiles, product_smiles, @@ -74,11 +115,28 @@ def main(): intervals, ) = load_experiment_parameters() optimizer = setup_optimizer( - substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path + substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path, scaler_path, concat_order, use_xgboost_scorer ) optimized_sequences, iteration_info = optimize_sequences(optimizer) logging.info("Optimization completed.") +def main_feasibility(): + """Optimization using Feasibility model""" + logging.basicConfig(level=logging.INFO) + scorer_path, scaler_path = initialize_environment() + concat_order, use_xgboost_scorer = ["substrate", "sequence", "product"], False + ( + substrate_smiles, + product_smiles, + sample_sequence, + intervals, + ) = load_experiment_parameters() + optimizer = setup_optimizer( + substrate_smiles, product_smiles, sample_sequence, 
intervals, scorer_path, scaler_path, concat_order, use_xgboost_scorer + ) + optimized_sequences, iteration_info = optimize_sequences(optimizer) + logging.info("Optimization completed.") + if __name__ == "__main__": main() diff --git a/requirements.txt b/requirements.txt index 0e64fc293..ba1ba67b6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -45,3 +45,4 @@ torchmetrics>=0.7.0,<1.0.0 transformers>=4.22.0,<=4.24.0 typing_extensions>=3.7.4.3 wheel>=0.26 +xgboost>=1.7.6 diff --git a/setup.cfg b/setup.cfg index 2aec22261..20ea4625b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -58,6 +58,7 @@ install_requires = transformers<4.26.0 typing_extensions wheel + xgboost setup_requires = setuptools package_dir = @@ -281,3 +282,6 @@ ignore_missing_imports = True [mypy-ruamel.*] ignore_missing_imports = True + +[mypy-xgboost.*] +ignore_missing_imports = True \ No newline at end of file diff --git a/src/gt4sd/frameworks/enzeptional/core.py b/src/gt4sd/frameworks/enzeptional/core.py index e37423194..f22fd1968 100644 --- a/src/gt4sd/frameworks/enzeptional/core.py +++ b/src/gt4sd/frameworks/enzeptional/core.py @@ -30,6 +30,7 @@ from itertools import product as iter_product import time from joblib import load +import xgboost as xgb from .processing import ( HFandTAPEModelUtility, SelectionGenerator, @@ -367,6 +368,8 @@ def __init__( minimum_interval_length: int = 8, pad_intervals: bool = False, concat_order=["sequence", "substrate", "product"], + scaler_filepath: Optional[str] = None, + use_xgboost_scorer: Optional[bool] = False, ): """Initializes the optimizer with models, sequences, and optimization parameters. @@ -379,18 +382,22 @@ def __init__( product_smiles (str): SMILES representation of the product. chem_model_path (str): Path to the chemical model. chem_tokenizer_path (str): Path to the chemical tokenizer. - scorer_filepath (str): Path to the scoring model. - mutator (SequenceMutator): The mutator for generating sequence variants. 
- intervals (List[Tuple[int, int]]): Intervals for mutation. - batch_size (int, optional): The number of sequences to process in one batch. Defaults to 2. - seed (int, optional): Random seed. Defaults to 123. - top_k (int, optional): Number of top mutations to consider. Defaults to 2. - selection_ratio (float, optional): Ratio of sequences to select after scoring. Defaults to 0.5. - perform_crossover (bool, optional): Flag to perform crossover operation. Defaults to False. - crossover_type (str, optional): Type of crossover operation. Defaults to "uniform". - minimum_interval_length (int, optional): Minimum length of mutation intervals. Defaults to 8. - pad_intervals (bool, optional): Flag to pad the intervals. Defaults to False. - concat_order (list, optional): Order of concatenating embeddings. Defaults to ["sequence", "substrate", "product"]. + scorer_filepath (str): File path to the scoring model. + mutator (SequenceMutator): The mutator for generating + sequence variants. + intervals (List[List[int]]): Intervals for mutation. + batch_size (int): The number of sequences to process in one batch. + top_k (int): Number of top mutations to consider. + selection_ratio (float): Ratio of sequences to select + after scoring. + perform_crossover (bool): Flag to perform crossover operation. + crossover_type (str): Type of crossover operation. + minimum_interval_length (int): Minimum length of + mutation intervals. + pad_intervals (bool): Flag to pad the intervals. + concat_order (list): Order of concatenating embeddings. + scaler_filepath (str): Path to the scaler in case you are using the Kcat model. + use_xgboost_scorer (bool): flag to specify if the fitness function is the Kcat. 
""" self.sequence = sequence self.protein_model = protein_model @@ -407,7 +414,9 @@ def __init__( self.mutator.set_top_k(top_k) self.concat_order = concat_order self.scorer = load(scorer_filepath) - self.seed = seed + if scaler_filepath is not None: + self.scaler = load(scaler_filepath) + self.use_xgboost_scorer = use_xgboost_scorer self.chem_model = HFandTAPEModelUtility(chem_model_path, chem_tokenizer_path) self.substrate_embedding = self.chem_model.embed([substrate_smiles])[0] @@ -424,7 +433,7 @@ def __init__( self.intervals = sanitize_intervals_with_padding( self.intervals, minimum_interval_length, len(sequence) ) - + self.seed = seed random.seed(self.seed) def optimize( @@ -614,7 +623,13 @@ def score_sequence(self, sequence: str) -> Dict[str, Any]: combined_embedding = np.concatenate(ordered_embeddings) combined_embedding = combined_embedding.reshape(1, -1) - score = self.scorer.predict_proba(combined_embedding)[0][1] + if self.use_xgboost_scorer: + if self.scaler is not None: + combined_embedding = self.scaler.transform(combined_embedding) + score = self.scorer.predict(xgb.DMatrix(combined_embedding))[0] + else: + score = self.scorer.predict_proba(combined_embedding)[0][1] + return {"sequence": sequence, "score": score} def score_sequences(self, sequences: List[str]) -> List[Dict[str, float]]: @@ -643,7 +658,12 @@ def score_sequences(self, sequences: List[str]) -> List[Dict[str, float]]: combined_embedding = np.concatenate(ordered_embeddings) combined_embedding = combined_embedding.reshape(1, -1) - score = self.scorer.predict_proba(combined_embedding)[0][1] + if self.use_xgboost_scorer: + if self.scaler is not None: + combined_embedding = self.scaler.transform(combined_embedding) + score = self.scorer.predict(xgb.DMatrix(combined_embedding))[0] + else: + score = self.scorer.predict_proba(combined_embedding)[0][1] output.append({"sequence": sequences[position], "score": score}) return output