From e96d724718f60190dcb4454daf3e050640525a0c Mon Sep 17 00:00:00 2001
From: nanayves
Date: Mon, 4 Mar 2024 10:13:52 +0100
Subject: [PATCH] refactor: Cleaned Enzeptional

Signed-off-by: nanayves
---
 examples/enzeptional/README.md                 |  97 ++++++++
 src/gt4sd/frameworks/enzeptional/__init__.py   |   3 +-
 src/gt4sd/frameworks/enzeptional/core.py       | 118 ++++------
 .../frameworks/enzeptional/processing.py       | 207 ++++++++++--------
 .../frameworks/enzeptional/tests/__init__.py   |   2 +-
 .../frameworks/enzeptional/tests/test_core.py  |  15 +-
 .../enzeptional/tests/test_processing.py       |   2 +-
 7 files changed, 271 insertions(+), 173 deletions(-)
 create mode 100644 examples/enzeptional/README.md

diff --git a/examples/enzeptional/README.md b/examples/enzeptional/README.md
new file mode 100644
index 000000000..ea561b270
--- /dev/null
+++ b/examples/enzeptional/README.md
@@ -0,0 +1,97 @@
+# Enzyme Optimization Experiment
+
+## Description
+This example runs an optimization experiment for enzyme sequences using different mutation strategies.
+
+## Import modules
+```python
+import logging
+import pandas as pd
+from gt4sd.frameworks.enzeptional.processing import HFandTAPEModelUtility
+from gt4sd.frameworks.enzeptional.core import SequenceMutator, EnzymeOptimizer
+from gt4sd.configuration import sync_algorithm_with_s3
+from gt4sd.configuration import GT4SDConfiguration
+
+configuration = GT4SDConfiguration.get_instance()
+```
+
+## Load datasets and scorers
+```python
+sync_algorithm_with_s3("proteins/enzeptional/scorers", module="properties")
+```
+Set the feasibility scorer path:
+```python
+scorer_path = f"{configuration.gt4sd_local_cache_path}/properties/proteins/enzeptional/scorers/feasibility/model.pkl"
+```
+
+## Set embedding model/tokenizer paths
+```python
+language_model_path = "facebook/esm2_t33_650M_UR50D"
+tokenizer_path = "facebook/esm2_t33_650M_UR50D"
+unmasking_model_path = "facebook/esm2_t33_650M_UR50D"
+chem_model_path = "seyonec/ChemBERTa-zinc-base-v1"
+chem_tokenizer_path = "seyonec/ChemBERTa-zinc-base-v1"
+```
+
+## Load protein embedding model
+```python
+protein_model = HFandTAPEModelUtility(
+    embedding_model_path=language_model_path, tokenizer_path=tokenizer_path
+)
+```
+
+## Create mutation config
+```python
+mutation_config = {
+    "type": "language-modeling",
+    "embedding_model_path": language_model_path,
+    "tokenizer_path": tokenizer_path,
+    "unmasking_model_path": unmasking_model_path,
+}
+```
+
+## Set key parameters
+```python
+intervals = [(5, 10), (20, 25)]
+batch_size = 5
+top_k = 3
+substrate_smiles = "NC1=CC=C(N)C=C1"
+product_smiles = "CNC1=CC=C(NC(=O)C2=CC=C(C=C2)C(C)=O)C=C1"
+
+sample_sequence = "MSKLLMIGTGPVAIDQFLTRYEASCQAYKDMHQDQQLSSQFNTNLFEGDKALVTKFLEINRTLS"
+```
+
+## Load mutator
+```python
+mutator = SequenceMutator(sequence=sample_sequence, mutation_config=mutation_config)
+```
+
+## Set up the optimizer
+```python
+optimizer = EnzymeOptimizer(
+    sequence=sample_sequence,
+    protein_model=protein_model,
+    substrate_smiles=substrate_smiles,
+    product_smiles=product_smiles,
+    chem_model_path=chem_model_path,
+    chem_tokenizer_path=chem_tokenizer_path,
+    scorer_filepath=scorer_path,
+    mutator=mutator,
+    intervals=intervals,
+    batch_size=batch_size,
+    top_k=top_k,
+    selection_ratio=0.25,
+    perform_crossover=True,
+    crossover_type="single_point",
+    concat_order=["substrate", "sequence", "product"],
+)
+```
+
+## Define optimization parameters
+```python
+num_iterations = 3
+num_sequences = 5
+num_mutations = 5
+time_budget = 3600
+```
+
+## Optimize
+```python
+optimized_sequences, iteration_info = optimizer.optimize(
+    num_iterations=num_iterations,
+    num_sequences=num_sequences,
+    num_mutations=num_mutations,
+    time_budget=time_budget,
+)
+```
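+
+## Inspect results
+A minimal sketch of post-processing: each entry of `optimized_sequences` is a
+dictionary with `sequence` and `score` keys (the structure returned by
+`EnzymeOptimizer.score_sequence`), so the results can be collected into a
+pandas DataFrame and saved; the output filename below is arbitrary.
+```python
+# Rank all scored sequences from best to worst.
+results_df = pd.DataFrame(optimized_sequences).sort_values(
+    by="score", ascending=False
+)
+print(results_df.head())
+# Persist the ranked sequences for later analysis.
+results_df.to_csv("optimized_enzyme_sequences.csv", index=False)
+```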
\ No newline at end of file
diff --git a/src/gt4sd/frameworks/enzeptional/__init__.py b/src/gt4sd/frameworks/enzeptional/__init__.py
index 1b8c74a48..4049555d0 100644
--- a/src/gt4sd/frameworks/enzeptional/__init__.py
+++ b/src/gt4sd/frameworks/enzeptional/__init__.py
@@ -1,7 +1,7 @@
 #
 # MIT License
 #
-# Copyright (c) 2023 GT4SD team
+# Copyright (c) 2024 GT4SD team
 #
 # Permission is hereby granted, free of charge, to any person obtaining a copy
 # of this software and associated documentation files (the "Software"), to deal
diff --git a/src/gt4sd/frameworks/enzeptional/core.py b/src/gt4sd/frameworks/enzeptional/core.py
index 58231511c..e37423194 100644
--- a/src/gt4sd/frameworks/enzeptional/core.py
+++ b/src/gt4sd/frameworks/enzeptional/core.py
@@ -1,7 +1,7 @@
 #
 # MIT License
 #
-# Copyright (c) 2023 GT4SD team
+# Copyright (c) 2024 GT4SD team
 #
 # Permission is hereby granted, free of charge, to any person obtaining a copy
 # of this software and associated documentation files (the "Software"), to deal
@@ -89,8 +89,7 @@ class MutationStrategy(ABC):
     def mutate(
         self, sequence: str, num_mutations: int, intervals: List[List[int]]
     ) -> List[str]:
-        """
-        Abstract method for mutating a sequence.
+        """Abstract method for mutating a sequence.
 
         Args:
             sequence (str): The original sequence to be mutated.
@@ -108,8 +107,7 @@ class LanguageModelMutationStrategy(MutationStrategy):
     """
 
     def __init__(self, mutation_model):
-        """
-        Initializes the mutation strategy with a given model.
+        """Initializes the mutation strategy with a given model.
 
         Args:
             mutation_model: The model to be used for mutation.
@@ -118,8 +116,7 @@ def __init__(self, mutation_model):
         self.top_k = 2
 
     def set_top_k(self, top_k: int):
-        """
-        Sets the top k mutations to consider during mutation.
+        """Sets the top k mutations to consider during mutation.
 
         Args:
             top_k (int): The number of top mutations to consider.
@@ -129,8 +126,7 @@ def set_top_k(self, top_k: int):
     def mutate(
         self, sequence: str, num_mutations: int, intervals: List[List[int]]
     ) -> List[str]:
-        """
-        Mutates a sequence within specified intervals using the model.
+        """Mutates a sequence within specified intervals using the model.
 
         Args:
             sequence (str): The original sequence to be mutated.
@@ -170,8 +166,7 @@ class TransitionMatrixMutationStrategy(MutationStrategy):
     """
 
     def __init__(self, transition_matrix: str):
-        """
-        Initializes the mutation strategy with a transition matrix.
+        """Initializes the mutation strategy with a transition matrix.
 
         Args:
             transition_matrix (str): Path to the CSV file containing
@@ -184,8 +179,7 @@ def __init__(self, transition_matrix: str):
         self.top_k = 2
 
     def set_top_k(self, top_k: int):
-        """
-        Sets the top k mutations to consider during mutation.
+        """Sets the top k mutations to consider during mutation.
 
         Args:
             top_k (int): The number of top mutations to consider.
@@ -196,8 +190,7 @@ def set_top_k(self, top_k: int):
     def mutate(
         self, sequence: str, num_mutations: int, intervals: List[List[int]]
     ) -> List[str]:
-        """
-        Mutates a sequence based on the transition matrix within
+        """Mutates a sequence based on the transition matrix within
         specified intervals.
 
         Args:
@@ -247,19 +240,18 @@ class MutationFactory:
 
     @staticmethod
    def get_mutation_strategy(mutation_config: Dict[str, Any]):
-        """
-        Retrieves a mutation strategy based on the provided configuration.
+        """Retrieves a mutation strategy based on the provided configuration.
 
         Args:
             mutation_config (Dict[str, Any]): Configuration specifying
             the type of mutation strategy and its parameters.
 
-        Returns:
-            An instance of the specified mutation strategy.
-
-        Raises:
-            ValueError: If the mutation type is unsupported.
+        Raises:
             KeyError: If required configuration parameters are missing.
+            ValueError: If the mutation type is unsupported.
+
+        Returns:
+            MutationStrategy: An instance of the specified mutation strategy.
         """
         if mutation_config["type"] == "language-modeling":
             mutation_model = MutationModelManager.load_model(
@@ -285,8 +277,7 @@ class SequenceMutator:
     """
 
     def __init__(self, sequence: str, mutation_config: Dict[str, Any]):
-        """
-        Initializes the mutator with a sequence and a mutation strategy.
+        """Initializes the mutator with a sequence and a mutation strategy.
 
         Args:
             sequence (str): The sequence to be mutated.
@@ -298,8 +289,7 @@ def __init__(self, sequence: str, mutation_config: Dict[str, Any]):
         self.top_k = 2
 
     def set_top_k(self, top_k: int):
-        """
-        Sets the number of top mutations to consider in the mutation strategy.
+        """Sets the number of top mutations to consider in the mutation strategy.
 
         Args:
             top_k (int): The number of top mutations to consider.
@@ -319,14 +309,13 @@ def get_mutations(
         current_population: List[str],
         already_evaluated_sequences: List[str],
     ) -> List[str]:
-        """
-        Generates a set of mutated sequences.
+        """Generates a set of mutated sequences.
 
         Args:
             num_sequences (int): Number of mutated sequences to generate.
             number_of_mutations (int): Number of mutations to apply
             to each sequence.
-            intervals (List[List[int]]): Intervals within the sequence
+            intervals (List[Tuple[int, int]]): Intervals within the sequence
             where mutations are allowed.
             already_evaluated_sequences (List[str]): List of sequences
             that have already been evaluated.
@@ -346,14 +335,6 @@ def get_mutations(
             new_mutations = self.mutation_strategy.mutate(
                 temp_sequence, max_mutations, intervals
             )
-            # filtered_mutations = [
-            #     element
-            #     for element in new_mutations
-            #     if element not in already_evaluated_sequences
-            # ]
-            # if not filtered_mutations:
-            #     break
-            # mutated_sequences_set.extend(filtered_mutations)
             mutated_sequences_set.extend(new_mutations)
             if len(mutated_sequences_set) >= num_sequences:
                 break
@@ -387,34 +368,29 @@ def __init__(
         pad_intervals: bool = False,
         concat_order=["sequence", "substrate", "product"],
     ):
-        """
-        Initializes the optimizer with models, sequences, and
+        """Initializes the optimizer with models, sequences, and
         optimization parameters.
+
         Args:
             sequence (str): The initial protein sequence.
-            protein_model (HFandTAPEModelUtility): Model for
-            protein embeddings.
-            substrate_smiles (str): SMILES representation of
-            the substrate.
-            product_smiles (str): SMILES representation of the
-            product.
+            protein_model (HFandTAPEModelUtility): Model for protein embeddings.
+            substrate_smiles (str): SMILES representation of the substrate.
+            product_smiles (str): SMILES representation of the product.
             chem_model_path (str): Path to the chemical model.
             chem_tokenizer_path (str): Path to the chemical tokenizer.
-            scorer_filepath (str): File path to the scoring model.
-            mutator (SequenceMutator): The mutator for generating
-            sequence variants.
-            intervals (List[List[int]]): Intervals for mutation.
-            batch_size (int): The number of sequences to process in one batch.
-            top_k (int): Number of top mutations to consider.
-            selection_ratio (float): Ratio of sequences to select
-            after scoring.
-            perform_crossover (bool): Flag to perform crossover operation.
-            crossover_type (str): Type of crossover operation.
-            minimum_interval_length (int): Minimum length of
-            mutation intervals.
-            pad_intervals (bool): Flag to pad the intervals.
-            concat_order (list): Order of concatenating embeddings.
+            scorer_filepath (str): Path to the scoring model.
+            mutator (SequenceMutator): The mutator for generating sequence variants.
+            intervals (List[Tuple[int, int]]): Intervals for mutation.
+            batch_size (int, optional): The number of sequences to process in one batch. Defaults to 2.
+            seed (int, optional): Random seed. Defaults to 123.
+            top_k (int, optional): Number of top mutations to consider. Defaults to 2.
+            selection_ratio (float, optional): Ratio of sequences to select after scoring. Defaults to 0.5.
+            perform_crossover (bool, optional): Flag to perform crossover operation. Defaults to False.
+            crossover_type (str, optional): Type of crossover operation. Defaults to "uniform".
+            minimum_interval_length (int, optional): Minimum length of mutation intervals. Defaults to 8.
+            pad_intervals (bool, optional): Flag to pad the intervals. Defaults to False.
+            concat_order (list, optional): Order of concatenating embeddings. Defaults to ["sequence", "substrate", "product"].
         """
         self.sequence = sequence
         self.protein_model = protein_model
@@ -428,23 +404,18 @@ def __init__(
         self.concat_order = concat_order
         self.minimum_interval_length = minimum_interval_length
         self.pad_intervals = pad_intervals
-        self.mutator.set_top_k(top_k)  # Set top_k for the mutation model
+        self.mutator.set_top_k(top_k)
         self.concat_order = concat_order
         self.scorer = load(scorer_filepath)
         self.seed = seed
 
-        # Initialize chem_model for SMILES embeddings
         self.chem_model = HFandTAPEModelUtility(chem_model_path, chem_tokenizer_path)
-
-        # Compute embeddings for substrate and product
         self.substrate_embedding = self.chem_model.embed([substrate_smiles])[0]
         self.product_embedding = self.chem_model.embed([product_smiles])[0]
 
-        # Initialize selection and crossover generators
         self.selection_generator = SelectionGenerator()
         self.crossover_generator = CrossoverGenerator()
 
-        # Process intervals
         if intervals is None:
             self.intervals = [(0, len(sequence))]
         else:
@@ -463,8 +434,7 @@ def optimize(
         num_mutations: int,
         time_budget: Optional[int] = 360,
     ):
-        """
-        Runs the optimization process over a specified number
+        """Runs the optimization process over a specified number
         of iterations.
 
         Args:
             num_iterations (int): Number of optimization iterations.
             num_sequences (int): Number of sequences to generate
             per iteration.
             num_mutations (int): Max number of mutations to apply.
-            time_budget (Optional[int]): Time budget for
-            optimizer (in seconds).
+            time_budget (Optional[int]): Time budget for the
+            optimizer (in seconds). Defaults to 360.
 
         Returns:
             A tuple containing the list of all sequences and
@@ -498,9 +468,7 @@ def optimize(
 
         scored_sequences: List[Dict[str, Any]] = [scored_original_sequence]
 
-        # Mutation step
         if iteration == 0:
-            # initialize population
             current_population: List[str] = [self.sequence]
             if len(current_population) < num_sequences:
                 while len(current_population) < num_sequences:
@@ -525,7 +493,6 @@ def optimize(
                 f"Number of sequences in current population: {len(current_population)}"
             )
 
-            # Scoring step
             iteration_scored_sequences = []
             for _ in range(0, len(current_population), self.batch_size):
                 scored_sequences = self.score_sequences(
@@ -537,7 +504,6 @@ def optimize(
                 all_scored_sequences.extend(scored_sequences)
                 iteration_scored_sequences.extend(scored_sequences)
 
-            # Selection step
             if self.selection_ratio < 1.0:
                 samples_with_higher_score = [
@@ -551,7 +517,6 @@ def optimize(
             else:
                 selected_sequences = iteration_scored_sequences
 
-            # Crossover step
             offspring_sequences = []
             if self.perform_crossover and len(selected_sequences) > 1:
                 for i in range(0, len(selected_sequences), 2):
@@ -593,7 +558,6 @@ def optimize(
                 random.shuffle(current_population)
                 current_population = current_population[:num_sequences]
 
-            # Update best sequences and count higher scoring sequences
             higher_scoring_sequences = 0
             for temp_seq in iteration_scored_sequences:
                 if temp_seq["score"] > current_best_score:
@@ -630,14 +594,13 @@ def optimize(
         return all_scored_sequences, iteration_info
 
     def score_sequence(self, sequence: str) -> Dict[str, Any]:
-        """
-        Scores a single protein sequence.
+        """Scores a single protein sequence.
 
         Args:
             sequence (str): The protein sequence to score.
 
         Returns:
-            float: The score of the sequence.
+            Dict[str, Any]: A dictionary with the sequence and its score.
         """
         sequence_embedding = self.protein_model.embed([sequence])[0]
         embeddings = [
@@ -655,8 +618,7 @@ def score_sequence(self, sequence: str) -> Dict[str, Any]:
         return {"sequence": sequence, "score": score}
 
     def score_sequences(self, sequences: List[str]) -> List[Dict[str, float]]:
-        """
-        Scores a list of protein sequences.
+        """Scores a list of protein sequences.
 
         Args:
             sequences (List[str]): The list of protein sequences to score.
diff --git a/src/gt4sd/frameworks/enzeptional/processing.py b/src/gt4sd/frameworks/enzeptional/processing.py
index d646eff70..e742236b1 100644
--- a/src/gt4sd/frameworks/enzeptional/processing.py
+++ b/src/gt4sd/frameworks/enzeptional/processing.py
@@ -1,7 +1,7 @@
 #
 # MIT License
 #
-# Copyright (c) 2023 GT4SD team
+# Copyright (c) 2024 GT4SD team
 #
 # Permission is hereby granted, free of charge, to any person obtaining a copy
 # of this software and associated documentation files (the "Software"), to deal
@@ -38,6 +40,7 @@
 import random
 import logging
 from itertools import product as iter_product
+from gt4sd.frameworks.torch import get_device
 
 logging.basicConfig(level=logging.INFO)
@@ -81,24 +95,7 @@ def add(self, key, model):
         self.cache[key] = model
 
 
-model_cache = ModelCache()
-
-
-def get_device(device: Optional[Union[torch.device, str]] = None) -> torch.device:
-    """
-    Determines the appropriate torch device for computations.
-
-    Args:
-        device (Optional[Union[torch.device, str]]): The desired device
-        ('cpu' or 'cuda:0'). If None,
-        automatically selects the device.
-
-    Returns:
-        torch.device: The determined torch device for computations.
-    """
-    return torch.device(
-        "cuda:0" if torch.cuda.is_available() and device != "cpu" else "cpu"
-    )
+ENZEPTIONAL_MODEL_CACHE = ModelCache()
 
 
 class StringEmbedding(ABC):
@@ -112,22 +109,25 @@ class StringEmbedding(ABC):
     model: Any
 
     def embed(self, samples: List[str]) -> np.ndarray:
-        """
-        Abstract method for embedding a list of string samples.
+        """Abstract method for embedding a list of string samples.
 
         Args:
             samples (List[str]): The list of strings to be embedded.
 
+        Raises:
+            NotImplementedError: If the method is not implemented in the subclass.
+
         Returns:
             np.ndarray: The resulting embeddings as a NumPy array.
-
-        Raises:
-            NotImplementedError: If the method is not implemented in
-            the subclass.
         """
         raise NotImplementedError
 
 
 class HFandTAPEModelUtility(StringEmbedding):
     """
     Utility class for handling both Hugging Face and TAPE models for embedding
     and unmasking tasks.
     """
@@ -140,28 +140,29 @@ def __init__(
         self,
         embedding_model_path: str,
         tokenizer_path: str,
         unmasking_model_path: Optional[str] = None,
         is_tape_model: bool = False,
         device: Optional[Union[torch.device, str]] = None,
         cache_dir: Optional[str] = None,
     ) -> None:
-        """
-        Initializes the utility with specified model and tokenizer paths.
+        """Initializes the utility with specified model and tokenizer paths.
 
         Args:
-            embedding_model_path (str): Path to the embedding
-            model.
+            embedding_model_path (str): Path to the embedding model.
             tokenizer_path (str): Path to the tokenizer.
-            unmasking_model_path (Optional[str]): Path to the
-            unmasking model, if applicable.
-            is_tape_model (bool): Flag to indicate if a TAPE
-            model is being used.
-            device (Optional[Union[torch.device, str]]): The
-            compute device to use ('cpu' or 'cuda:0').
+            unmasking_model_path (Optional[str], optional): Path to the unmasking model, if applicable. Defaults to None.
+            is_tape_model (bool, optional): Flag to indicate if a TAPE model is being used. Defaults to False.
+            device (Optional[Union[torch.device, str]], optional): The compute device to use ('cpu' or 'cuda:0'). Defaults to None.
+            cache_dir (Optional[str], optional): Path to cache directory. Defaults to None.
""" - self.device = get_device(device) + self.device = get_device() self.is_tape_model = is_tape_model embedding_cache_key = f"embedding_{embedding_model_path}" - self.embedding_model = model_cache.get(embedding_cache_key) + self.embedding_model = ENZEPTIONAL_MODEL_CACHE.get(embedding_cache_key) if not self.embedding_model: if is_tape_model: self.embedding_model = registry.get_task_model( @@ -188,11 +189,11 @@ def __init__( .eval() ) - model_cache.add(embedding_cache_key, self.embedding_model) + ENZEPTIONAL_MODEL_CACHE.add(embedding_cache_key, self.embedding_model) if unmasking_model_path is not None: unmasking_cache_key = f"unmasking_{unmasking_model_path}" - self.unmasking_model = model_cache.get(unmasking_cache_key) + self.unmasking_model = ENZEPTIONAL_MODEL_CACHE.get(unmasking_cache_key) if not self.unmasking_model: if cache_dir: self.unmasking_model = ( @@ -211,7 +212,7 @@ def __init__( .to(self.device) .eval() ) - model_cache.add(unmasking_cache_key, self.unmasking_model) + ENZEPTIONAL_MODEL_CACHE.add(unmasking_cache_key, self.unmasking_model) else: logger.error("No Unmasking model loaded. Check you model inputs") @@ -221,34 +222,32 @@ def __init__( self.tokenizer = self._load_tokenizer(tokenizer_path) def _load_tokenizer(self, tokenizer_path: str): - """ - Loads a tokenizer based on the given path, caching it for future use. + """Loads a tokenizer based on the given path, caching it for future use. Args: tokenizer_path (str): Path to the tokenizer. Returns: - The loaded tokenizer. + The loaded tokenizer """ tokenizer_cache_key = f"tokenizer_{tokenizer_path}" - tokenizer = model_cache.get(tokenizer_cache_key) + tokenizer = ENZEPTIONAL_MODEL_CACHE.get(tokenizer_cache_key) if not tokenizer: try: tokenizer = AutoTokenizer.from_pretrained(tokenizer_path) except Exception: tokenizer = T5Tokenizer.from_pretrained(tokenizer_path) - model_cache.add(tokenizer_cache_key, tokenizer) + ENZEPTIONAL_MODEL_CACHE.add(tokenizer_cache_key, tokenizer) return tokenizer def embed(self, samples: List[str]) -> np.ndarray: - """ - Embeds a list of samples using either TAPE or Hugging Face models. + """Embeds a list of samples using either TAPE or Hugging Face models. Args: samples (List[str]): List of strings to be embedded. Returns: - np.ndarray: The resulting embeddings as a NumPy array. + np.ndarray: The resulting embeddings. """ if self.is_tape_model: return self._embed_tape(samples) @@ -256,14 +255,14 @@ def embed(self, samples: List[str]) -> np.ndarray: return self._embed_huggingface(samples) def _embed_tape(self, samples: List[str]) -> np.ndarray: - """ - Embeds samples using a TAPE model. + """mbeds samples using a TAPE model. Args: samples (List[str]): List of strings to be embedded. + samples (List[str]): List of strings to be embedded. Returns: - np.ndarray: The resulting embeddings as a NumPy array. + np.ndarray: The resulting embeddings. """ token_ids: Dict[str, Any] = {"ids": [], "mask": []} for sequence in samples: @@ -293,14 +292,14 @@ def _embed_tape(self, samples: List[str]) -> np.ndarray: ) def _embed_huggingface(self, samples: List[str]) -> np.ndarray: - """ - Embeds samples using a Hugging Face model. + """Embeds samples using a Hugging Face model. Args: samples (List[str]): List of strings to be embedded. + samples (List[str]): List of strings to be embedded. Returns: - np.ndarray: The resulting embeddings as a NumPy array. + np.ndarray: The resulting embeddings. 
""" inputs = self.tokenizer( samples, @@ -316,9 +315,25 @@ def _embed_huggingface(self, samples: List[str]) -> np.ndarray: sequence_lengths = inputs["attention_mask"].sum(1) + inputs = self.tokenizer( + samples, + add_special_tokens=True, + padding=True, + return_tensors="pt", + ) + inputs = {k: v.to(self.device) for k, v in inputs.items()} + + with torch.no_grad(): + outputs = self.embedding_model(**inputs) + sequence_embeddings = outputs[0].cpu().detach().numpy() + + sequence_lengths = inputs["attention_mask"].sum(1) + return np.array( [ sequence_embedding[:sequence_length].mean(0) + for sequence_embedding, sequence_length in zip( + sequence_embeddings, sequence_lengths for sequence_embedding, sequence_length in zip( sequence_embeddings, sequence_lengths ) @@ -326,15 +341,18 @@ def _embed_huggingface(self, samples: List[str]) -> np.ndarray: ) def unmask(self, sequence: str, top_k: int = 2) -> List[str]: - """ - Unmasks a given sequence using the model, retrieving top-k predictions. + """Unmasks a given sequence using the model, retrieving top-k predictions. Args: sequence (str): The sequence with masked tokens. - top_k (int): Number of top predictions to retrieve. + top_k (int, optional): Number of top predictions to retrieve. Defaults to 2. + + Raises: + NotImplementedError: If TAPE model is used. + KeyError: If the model used is not supported. Returns: - List[List[str]]: List of top-k predicted sequences. + List[str]: List of top-k predicted sequences. """ if self.is_tape_model: logger.error("Unmasking is not supported for TAPE models.") @@ -345,17 +363,28 @@ def unmask(self, sequence: str, top_k: int = 2) -> List[str]: except (KeyError, NotImplementedError) as e: logger.warning(f"{e} Standard unmasking failed ") raise KeyError("Check the unmasking model you want to use") + if self.is_tape_model: + logger.error("Unmasking is not supported for TAPE models.") + raise NotImplementedError("Unmasking is not supported for TAPE models.") + + try: + return self._unmask_with_model(sequence, top_k) + except (KeyError, NotImplementedError) as e: + logger.warning(f"{e} Standard unmasking failed ") + raise KeyError("Check the unmasking model you want to use") def _unmask_with_model(self, sequence: str, top_k: int) -> List[str]: - """ - Unmasks a sequence using the model, providing top-k predictions. + """Unmasks a sequence using the model, providing top-k predictions. Args: sequence (str): The sequence with masked tokens. top_k (int): Number of top predictions to retrieve. + Raises: + KeyError: If model used do not support unmasking. + Returns: - List[List[str]]: List of top-k predicted sequences. + List[str]: List of top-k predicted sequences. """ inputs = self.tokenizer( sequence, @@ -406,38 +435,45 @@ def _unmask_with_model(self, sequence: str, top_k: int) -> List[str]: def mutate_sequence_with_variant(sequence: str, variant: str) -> str: - """ - Applies a specified variant mutation to an amino acid sequence. + """Applies a specified variant mutation to an amino acid sequence. Args: sequence (str): The original amino acid sequence. variant (str): The variant to apply, formatted as a string. + sequence (str): The original amino acid sequence. + variant (str): The variant to apply, formatted as a string. Returns: str: The mutated amino acid sequence. + str: The mutated amino acid sequence. 
""" mutated_sequence = list(sequence) for variant_string in variant.split("/"): - index = ( - int(variant_string[1:-1]) - 1 - ) # Assuming 1-based indexing in the variant + index = int(variant_string[1:-1]) - 1 mutated_sequence[index] = variant_string[-1] return "".join(mutated_sequence) def sanitize_intervals(intervals: List[Tuple[int, int]]) -> List[Tuple[int, int]]: - """ - Merges overlapping intervals into a single interval. + """Merges overlapping intervals into a single interval. Args: intervals (List[Tuple[int, int]]): A list of start and end points of intervals. + intervals (List[Tuple[int, int]]): A list of + start and end points of intervals. Returns: List[Tuple[int, int]]: A list of merged intervals. + List[Tuple[int, int]]: A list of merged intervals. """ intervals.sort() merged: List[Tuple[int, int]] = [] + for start, end in intervals: + if not merged or merged[-1][1] < start: + merged.append((start, end)) + intervals.sort() + merged: List[Tuple[int, int]] = [] for start, end in intervals: if not merged or merged[-1][1] < start: merged.append((start, end)) @@ -447,8 +483,7 @@ def sanitize_intervals(intervals: List[Tuple[int, int]]) -> List[Tuple[int, int] def round_up(number: float) -> int: - """ - Rounds up a floating-point number to the nearest integer. + """Rounds up a floating-point number to the nearest integer. Args: number (float): The number to round up. @@ -462,8 +497,7 @@ def round_up(number: float) -> int: def sanitize_intervals_with_padding( intervals: List[Tuple[int, int]], pad_value: int, max_value: int ) -> List[Tuple[int, int]]: - """ - Pads and sanitizes intervals within a given range. + """Pads and sanitizes intervals within a given range. Args: intervals (List[Tuple[int, int]]): A list of intervals. @@ -477,8 +511,7 @@ def sanitize_intervals_with_padding( def pad_interval( interval: Tuple[int, int], pad: int, max_val: int ) -> Tuple[int, int]: - """ - Pads an individual interval within the constraints of a maximum value. + """Pads an individual interval within the constraints of a maximum value. Args: interval (Tuple[int, int]): The interval to pad. @@ -492,11 +525,9 @@ def pad_interval( interval_length = end - start padding_needed = max(0, pad - interval_length) // 2 - # Apply padding padded_start = max(0, start - padding_needed) padded_end = min(max_val, end + padding_needed) - # Adjust if padding goes beyond max_value if padded_end > max_val: padded_start = max(0, padded_start - (padded_end - max_val)) return padded_start, padded_end @@ -512,8 +543,7 @@ def reconstruct_sequence_with_mutation_range( mutated_sequence_range: str, intervals: List[Tuple[int, int]], ) -> str: - """ - Reconstructs a sequence by inserting a mutated sequence + """Reconstructs a sequence by inserting a mutated sequence range at specific intervals. Args: @@ -521,12 +551,19 @@ def reconstruct_sequence_with_mutation_range( mutated_sequence_range (str): The range of the sequence to be mutated. intervals (List[Tuple[int, int]]): The intervals where mutations are applied. + sequence (str): The original sequence. + mutated_sequence_range (str): The range of the sequence to be mutated. + intervals (List[Tuple[int, int]]): The intervals where + mutations are applied. Returns: str: The reconstructed sequence with mutations. + str: The reconstructed sequence with mutations. 
""" mutated_sequence = list(sequence) range_index = 0 + mutated_sequence = list(sequence) + range_index = 0 for start, end in intervals: size_fragment = end - start mutated_sequence[start:end] = list( @@ -546,8 +583,7 @@ def selection( pool_of_sequences: List[Dict[str, Any]], k: float = 0.8, ) -> List[Any]: - """ - Selects a subset of sequences from a pool based on their scores. + """Selects a subset of sequences from a pool based on their scores. Args: pool_of_sequences (List[Dict[str, Any]]): A list of @@ -570,19 +606,17 @@ class CrossoverGenerator: """ def __init__(self, threshold_probability: float = 0.5) -> None: - """ - Initializes the CrossoverGenerator with a specified + """Initializes the CrossoverGenerator with a specified threshold probability. Args: - threshold_probability (float): The probability + threshold_probability (float, optional): The probability threshold used in uniform crossover. Defaults to 0.5. """ self.threshold_probability = threshold_probability def sp_crossover(self, a_sequence: str, another_sequence: str) -> Tuple[str, str]: - """ - Performs a single point crossover between two sequences. + """Performs a single point crossover between two sequences. Args: a_sequence (str): The first sequence for crossover. @@ -601,8 +635,7 @@ def sp_crossover(self, a_sequence: str, another_sequence: str) -> Tuple[str, str def uniform_crossover( self, a_sequence: str, another_sequence: str ) -> Tuple[str, str]: - """ - Performs a uniform crossover between two sequences. + """Performs a uniform crossover between two sequences. Args: a_sequence (str): The first sequence for crossover. diff --git a/src/gt4sd/frameworks/enzeptional/tests/__init__.py b/src/gt4sd/frameworks/enzeptional/tests/__init__.py index 8e9365214..c1113d761 100644 --- a/src/gt4sd/frameworks/enzeptional/tests/__init__.py +++ b/src/gt4sd/frameworks/enzeptional/tests/__init__.py @@ -1,7 +1,7 @@ # # MIT License # -# Copyright (c) 2023 GT4SD team +# Copyright (c) 2024 GT4SD team # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal diff --git a/src/gt4sd/frameworks/enzeptional/tests/test_core.py b/src/gt4sd/frameworks/enzeptional/tests/test_core.py index 35254e2cf..2764b20a3 100644 --- a/src/gt4sd/frameworks/enzeptional/tests/test_core.py +++ b/src/gt4sd/frameworks/enzeptional/tests/test_core.py @@ -1,7 +1,7 @@ # # MIT License # -# Copyright (c) 2023 GT4SD team +# Copyright (c) 2024 GT4SD team # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -22,7 +22,6 @@ # SOFTWARE. 
 #
 import warnings
-import pkg_resources
 from gt4sd.frameworks.enzeptional.core import (
     SequenceMutator,
     EnzymeOptimizer,
 )
 from gt4sd.frameworks.enzeptional.processing import HFandTAPEModelUtility
+from gt4sd.configuration import sync_algorithm_with_s3
+from gt4sd.configuration import GT4SDConfiguration
+
+configuration = GT4SDConfiguration.get_instance()
+
 
 warnings.simplefilter(action="ignore", category=FutureWarning)
 
-scorer_filepath = pkg_resources.resource_filename(
-    "gt4sd", "frameworks/enzeptional/tests/scorer.pkl"
-)
+sync_algorithm_with_s3("proteins/enzeptional/scorers", module="properties")
+
+scorer_filepath = f"{configuration.gt4sd_local_cache_path}/properties/proteins/enzeptional/scorers/feasibility/model.pkl"
 
 
 def test_optimize():
diff --git a/src/gt4sd/frameworks/enzeptional/tests/test_processing.py b/src/gt4sd/frameworks/enzeptional/tests/test_processing.py
index cd4de2a92..efaf1c4df 100644
--- a/src/gt4sd/frameworks/enzeptional/tests/test_processing.py
+++ b/src/gt4sd/frameworks/enzeptional/tests/test_processing.py
@@ -1,7 +1,7 @@
 #
 # MIT License
 #
-# Copyright (c) 2023 GT4SD team
+# Copyright (c) 2024 GT4SD team
 #
 # Permission is hereby granted, free of charge, to any person obtaining a copy
 # of this software and associated documentation files (the "Software"), to deal