Source code for optunaz.descriptors

import abc
import inspect
import logging
import os
import pathlib
import sys
from dataclasses import dataclass, field
from typing import List, Union, Type, Optional, Any, Tuple, Dict, Literal, Annotated
from functools import partial

import apischema
import json
import jsonpickle
import jsonpickle.ext.numpy as jsonpickle_numpy
import numpy as np
import pandas as pd
import sklearn
from apischema import deserializer, serialize, serializer, schema, identity, type_name
from apischema.conversions import Conversion
from apischema.metadata import skip
from rdkit import Chem, DataStructs
from rdkit.Chem.Scaffolds.MurckoScaffold import (
    GetScaffoldForMol,
    MakeScaffoldGeneric,
)
from rdkit.Avalon import pyAvalonTools
from rdkit.Chem import AllChem, Descriptors, MACCSkeys, rdFingerprintGenerator
from rdkit.DataStructs.cDataStructs import (
    ExplicitBitVect,
    SparseBitVect,
    UIntSparseIntVect,
)

from rdkit.ML.Descriptors.MoleculeDescriptors import MolecularDescriptorCalculator
from jazzy.api import molecular_vector_from_smiles
from jazzy.exception import JazzyError
from sklearn import preprocessing
from joblib import Parallel, delayed, effective_n_jobs
from optunaz.config import NameParameterDataclass
from optunaz.utils import load_df_from_file

jsonpickle_numpy.register_handlers()

logger = logging.getLogger(__name__)


RdkitFp = Union[  # Types that RDkit expects as input.
    ExplicitBitVect,  # relatively small (10K bits) or dense bit vectors.
    SparseBitVect,  # large, sparse bit vectors.
    UIntSparseIntVect,  # sparse vectors of integers.
]


[docs]class ScalingFittingError(Exception): """Raised when insufficient molecules for UnfittedSklearnSclaer to fit""" def __init__(self, descriptor_str=None): self.descriptor_str = descriptor_str pass
[docs]class NoValidSmiles(Exception): """Raised when no valid SMILES are available""" pass
[docs]def mol_from_smi(smi: str) -> Optional[Chem.Mol]: try: mol = Chem.MolFromSmiles(smi) except TypeError: return None if mol is None: logger.warning(f"Failed to parse SMILES {smi}.") return None err_code = Chem.SanitizeMol(mol, catchErrors=True) if err_code != 0: logger.warning(f"Failed to sanitize SMILES {smi}.") return None return mol
[docs]def numpy_from_rdkit(fp: RdkitFp, dtype: Type) -> np.ndarray: """Returns Numpy representation of a given RDKit Fingerprint.""" fp_numpy = np.zeros((0,), dtype=dtype) DataStructs.ConvertToNumpyArray(fp, fp_numpy) return fp_numpy
[docs]class MolDescriptor(NameParameterDataclass, abc.ABC): """Molecular Descriptors. Descriptors can be fingerprints, but can also be custom user-specified descriptors. Descriptors calculate a feature vector that will be used as input for predictive models (e.g. scikit-learn models). """ _union: Any = None # You can use __init_subclass__ to register new subclass automatically def __init_subclass__(cls, **kwargs): if inspect.isabstract(cls): return # Do not register abstract classes, like RDKitDescriptor. # Deserializers stack directly as a Union deserializer(Conversion(identity, source=cls, target=MolDescriptor)) # Only Base serializer must be registered (and updated for each subclass) as # a Union, and not be inherited MolDescriptor._union = ( cls if MolDescriptor._union is None else Union[MolDescriptor._union, cls] ) serializer( Conversion( identity, source=MolDescriptor, target=MolDescriptor._union, inherited=False, ) )
[docs] @abc.abstractmethod def calculate_from_smi(self, smi: str) -> np.ndarray: """Returns a descriptor (e.g. a fingerprint) for a given SMILES string. The descriptor is returned as a 1-d Numpy ndarray. """ pass
[docs] def parallel_compute_descriptor( self, smiles: List[str], n_cores=None, cache=None ) -> Tuple[Optional[np.ndarray], Optional[list]]: """Use python Parallel to compute descriptor (e.g. a fingerprint) for a given SMILES string. Can be used to generate descriptors in parallel and/or with a cache""" def try_cache(mol, cache=cache): try: return cache.cache(self.calculate_from_smi)(mol) except (FileNotFoundError, OSError): # handle when cleared cache not found return self.calculate_from_smi(mol) if cache is not None: # Scaled and Composite descriptors accept cache in order to sub-cache nested calculate_from_smi if "cache" in self.calculate_from_smi.__code__.co_varnames: _calculate_from_smi = partial(self.calculate_from_smi, cache=cache) # All other descriptors cache calculate_from_smi directly else: _calculate_from_smi = partial(try_cache, cache=cache) if n_cores is None: if hasattr(cache, "n_cores"): n_cores = effective_n_jobs(cache.n_cores) else: n_cores = effective_n_jobs(-1) else: _calculate_from_smi = self.calculate_from_smi if n_cores is not None: n_cores = effective_n_jobs(n_cores) else: n_cores = effective_n_jobs(-1) return Parallel(n_jobs=n_cores)( delayed(_calculate_from_smi)(smi) for smi in smiles )
[docs]class RdkitDescriptor(MolDescriptor, abc.ABC): """Abstract class for RDKit molecular descriptors (fingerprints)."""
[docs] @abc.abstractmethod def calculate_from_mol(self, mol: Chem.Mol) -> np.ndarray: """Returns a descriptor (fingerprint) for a given RDKit Mol as a 1-d Numpy array.""" pass
[docs] def calculate_from_smi(self, smi: str) -> Optional[np.ndarray]: """Returns a descriptor (fingerprint) for a given SMILES string. The descriptor is returned as a 1-d Numpy ndarray. Returns None if input SMILES string is not valid according to RDKit. """ mol = mol_from_smi(smi) if mol is None: return None else: return self.calculate_from_mol(mol)
[docs]@dataclass class AmorProtDescriptors(MolDescriptor): """AmorProtDescriptors These descriptors are intended to be used with Peptide SMILES """ try: from amorprot import AmorProt class _AmorProt(AmorProt): def __init__( self, maccs=True, ecfp4=True, ecfp6=True, rdkit=True, W=10, A=10, R=0.85, smi=None, ): self.AA_dict = {smi: smi} self.maccs = maccs self.ecfp4 = ecfp4 self.ecfp6 = ecfp6 self.rdkit = rdkit self.W = W self.A = A self.R = R if smi is None: return if self.maccs: self.maccs_dict = {} for aa in self.AA_dict.keys(): mol = Chem.MolFromSmiles(self.AA_dict[aa]) self.maccs_dict[aa] = np.array( MACCSkeys.GenMACCSKeys(mol) ).tolist() if self.ecfp4: self.ecfp4_dict = {} for aa in self.AA_dict.keys(): mol = Chem.MolFromSmiles(self.AA_dict[aa]) self.ecfp4_dict[aa] = np.array( AllChem.GetMorganFingerprintAsBitVect( mol, radius=2, nBits=1024 ) ).tolist() if self.ecfp6: self.ecfp6_dict = {} for aa in self.AA_dict.keys(): mol = Chem.MolFromSmiles(self.AA_dict[aa]) self.ecfp6_dict[aa] = np.array( AllChem.GetMorganFingerprintAsBitVect( mol, radius=3, nBits=1024 ) ).tolist() if self.rdkit: self.rdkit_dict = {} for aa in self.AA_dict.keys(): mol = Chem.MolFromSmiles(self.AA_dict[aa]) self.rdkit_dict[aa] = np.array( AllChem.RDKFingerprint(mol) ).tolist() @abc.abstractmethod def calculate_from_smi(self, smi: str) -> np.ndarray: self.__init__(smi=smi) return self.fingerprint([smi]) except ImportError: pass
[docs] @apischema.type_name("AmorProtDescParams") @dataclass class Parameters: pass
name: Literal["AmorProtDescriptors"] parameters: Parameters
[docs] def calculate_from_smi(self, smi: str): try: return self._AmorProt().calculate_from_smi(smi) except AttributeError: logging.critical( "The amorprot package must be installed to use AmorProtDescriptors" ) sys.exit(1)
[docs]@dataclass class Avalon(RdkitDescriptor): """Avalon Descriptor Avalon (see Gedeck P, et al. QSAR-how good is it in practice?) uses a fingerprint generator in a similar to way to Daylight fingerprints, but enumerates with custom feature classes of the molecular graph ( see ref. paper for the 16 feature classes used). Hash codes for the path-style features are computed implicitly during enumeration. Avalon generated the largest number of good models in the reference study, which is likely since the fingerprint generator was tuned toward the features contained in the data set. """
[docs] @apischema.type_name("AvalonParams") @dataclass class Parameters: nBits: Annotated[ int, schema( min=1, title="nBits", description="Number of bits in the fingerprint, sometimes also called size.", ), ] = field( default=2048, )
name: Literal["Avalon"] parameters: Parameters
[docs] def calculate_from_mol(self, mol: Chem.Mol): fp = pyAvalonTools.GetAvalonFP(mol, nBits=self.parameters.nBits) return numpy_from_rdkit(fp, dtype=bool)
[docs]@dataclass class ECFP(RdkitDescriptor): """ECFP Binary Extended Connectivity Fingerprint (ECFP).\n ECFP (see Rogers et al. "Extended-Connectivity Fingerprints.") [also known as Circular Fingerprints or Morgan Fingerprints], are built by applying the Morgan algorithm to a set of user-supplied atom invariants. This approach (implemented here using GetMorganFingerprintAsBitVect from RDKit) systematically records the neighborhood of each non-H atom into multiple circular layers up to a given radius (provided at runtime). The substructural features are mapped to integers using a hashing procedure (length of the hash provided at runtime). It is the set of the resulting identifiers that defines ECFPs. The diameter of the atom environments is appended to the name (e.g. ECFP4 corresponds to radius=2). """
[docs] @apischema.type_name("EcfpParams") @dataclass class Parameters: radius: Annotated[ int, schema( min=1, title="radius", description="Radius of the atom environments considered." " Note that the 4 in ECFP4" " corresponds to the diameter of the atom environments considered," " while here we use radius." " For example, radius=2 would correspond to ECFP4.", ), ] = field( default=3, ) nBits: Annotated[ int, schema( min=1, title="nBits", description="Number of bits in the fingerprint, sometimes also called size.", ), ] = field( default=2048, ) returnRdkit: bool = False
name: Literal["ECFP"] parameters: Parameters
[docs] def calculate_from_mol(self, mol: Chem.Mol): fp = AllChem.GetMorganFingerprintAsBitVect( mol, self.parameters.radius, self.parameters.nBits ) if fp is None: return None if self.parameters.returnRdkit: return fp else: return numpy_from_rdkit(fp, dtype=bool)
[docs]@dataclass class ECFP_counts(RdkitDescriptor): """ECFP With Counts Binary Extended Connectivity Fingerprint (ECFP) With Counts.\n ECFP (see Rogers et al. "Extended-Connectivity Fingerprints.") [also known as Circular Fingerprints or Morgan Fingerprints] With Counts are built similar to ECFP fingerprints, however this approach (implemented using GetHashedMorganFingerprint from RDKit) systematically records the count vectors rather than bit vectors. Bit vectors track whether features appear in a molecule while count vectors track the number of times each feature appears. The diameter of the atom environments is appended to the name (e.g. ECFP4 corresponds to radius=2). """
[docs] @apischema.type_name("EcfpCountsParams") @dataclass class Parameters: radius: Annotated[ int, schema( min=1, title="radius", description="Radius of the atom environments considered. For ECFP4 (diameter=4) set radius=2", ), ] = field(default=3) useFeatures: Annotated[ bool, schema( title="useFeatures", description="Use feature fingerprints (FCFP)," " instead of normal ones (ECFP)." " RDKit feature definitions are adapted from the definitions in" " Gobbi & Poppinger, Biotechnology and Bioengineering 61, 47-54 (1998)." " FCFP and ECFP will likely lead to different fingerprints/similarity scores.", ), ] = True nBits: Annotated[ int, schema( min=1, title="nBits", description="Number of bits in the fingerprint, sometimes also called size.", ), ] = field(default=2048)
name: Literal["ECFP_counts"] parameters: Parameters
[docs] def calculate_from_mol(self, mol: Chem.Mol): fp = AllChem.GetHashedMorganFingerprint( mol, radius=self.parameters.radius, nBits=self.parameters.nBits, useFeatures=self.parameters.useFeatures, ) return numpy_from_rdkit(fp, dtype=np.int8)
[docs]@dataclass class PathFP(RdkitDescriptor): """PathFP Path fingerprint based on RDKit FP Generator.\n This is a Path fingerprint. """
[docs] @apischema.type_name("PathFPParams") @dataclass class Parameters: maxPath: Annotated[ int, schema( min=1, title="maxPath", description="Maximum path for the fingerprint", ), ] = field(default=3) fpSize: Annotated[ int, schema( min=1, title="fpSize", description="Number size of the fingerprint, sometimes also called bit size.", ), ] = field(default=2048)
name: Literal["PathFP"] parameters: Parameters
[docs] def calculate_from_mol(self, mol: Chem.Mol): rdkit_gen = rdFingerprintGenerator.GetRDKitFPGenerator( maxPath=self.parameters.maxPath, fpSize=self.parameters.fpSize ) fp = rdkit_gen.GetFingerprint(mol) return numpy_from_rdkit(fp, dtype=np.int8)
[docs]@dataclass class MACCS_keys(RdkitDescriptor): """MACCS Molecular Access System (MACCS) fingerprint.\n MACCS fingerprints (often referred to as MDL keys after the developing company ) are calculated using keysets originally constructed and optimized for substructure searching (see Durant et al. Reoptimization of MDL keys for use in drug discovery) are 166-bit 2D structure fingerprints.\n Essentially, they are a binary fingerprint (zeros and ones) that answer 166 fragment related questions. If the explicitly defined fragment exists in the structure, the bit in that position is set to 1, and if not, it is set to 0. In that sense, the position of the bit matters because it is addressed to a specific question or a fragment. An atom can belong to multiple MACCS keys, and since each bit is binary, MACCS 166 keys can represent more than 9.3×1049 distinct fingerprint vectors. """
[docs] @apischema.type_name("MaccsParams") @dataclass class Parameters: pass
name: Literal["MACCS_keys"] parameters: Parameters = Parameters()
[docs] def calculate_from_mol(self, mol: Chem.Mol): fp = MACCSkeys.GenMACCSKeys(mol) return numpy_from_rdkit(fp, dtype=bool)
[docs]@dataclass class UnfittedSklearnScaler:
[docs] @dataclass class MolData: file_path: pathlib.Path = None smiles_column: str = None
mol_data: MolData = MolData() name: Literal["UnfittedSklearnScaler"] = "UnfittedSklearnScaler"
[docs] def get_fitted_scaler_for_fp(self, fp, cache=None): scaler = ( sklearn.preprocessing.StandardScaler() ) # Add choice of different scalers. df = load_df_from_file(self.mol_data.file_path, self.mol_data.smiles_column) try: smis = df["canonical"].str.replace(">>", ".") except KeyError: smis = df[self.mol_data.smiles_column].str.replace(">>", ".") fps, failed_idx = descriptor_from_config(smis, fp, cache=cache) if len(failed_idx) > 0: logger.warning( f"Could not compute descriptors for {len(failed_idx)} SMILES," " ignoring them in Scaler." ) if len(fps) == 0: msg = f"{len(fps)} fingerprints too few to train scaler." logger.warning(msg) raise ScalingFittingError(msg) scaler.fit(fps) jsonpickle_numpy.register_handlers() # An extra time. saved_params = jsonpickle.dumps(scaler) return FittedSklearnScaler(saved_params=saved_params)
[docs]@dataclass class FittedSklearnScaler: saved_params: str name: Literal["FittedSklearnScaler"] = "FittedSklearnScaler"
[docs] def get_fitted_scaler(self): return jsonpickle.loads(self.saved_params)
[docs]@dataclass class UnscaledMAPC(RdkitDescriptor): """Unscaled MAPC descriptors These MAPC descriptors are unscaled and should be used with caution. MinHashed Atom-Pair Fingerprint Chiral (see Orsi et al. One chiral fingerprint to find them all) is the original version of the MinHashed Atom-Pair fingerprint of radius 2 (MAP4) which combined circular substructure fingerprints and atom-pair fingerprints into a unified framework. This combination allowed for improved substructure perception and performance in small molecule benchmarks while retaining information about bond distances for molecular size and shape perception. These fingerprints expand the functionality of MAP4 to include encoding of stereochemistry into the fingerprint. CIP descriptors of chiral atoms are encoded into the fingerprint at the highest radius. This allows MAPC to modulate the impact of stereochemistry on fingerprints, making it scale with increasing molecular size without disproportionally affecting structural fingerprints/similarity. """
[docs] @apischema.type_name("UnscaledMAPCParams") @dataclass class Parameters: maxRadius: Annotated[ int, schema( min=1, title="maxRadius", description="Maximum radius of the fingerprint.", ), ] = field( default=2, ) nPermutations: Annotated[ int, schema( min=1, title="nPermutations", description="Number of permutations to perform.", ), ] = field( default=2048, )
name: Literal["UnscaledMAPC"] parameters: Parameters
[docs] def calculate_from_mol(self, mol: Chem.Mol): try: from mapchiral.mapchiral import get_fingerprint except ImportError: logging.critical("mapchiral must be installed to use MAPC fingerprints") sys.exit(1) fp = get_fingerprint( mol, max_radius=self.parameters.maxRadius, n_permutations=self.parameters.nPermutations, ) return fp
[docs]@dataclass class UnscaledPhyschemDescriptors(RdkitDescriptor): """Base (unscaled) PhyschemDescriptors (RDKit) for PhyschemDescriptors These physchem descriptors are unscaled and should be used with caution. They are a set of 208 physchem/molecular properties that are calculated in RDKit and used as descriptor vectors for input molecules. Features include ClogP, MW, # of atoms, rings, rotatable bonds, fraction sp3 C, graph invariants (Kier indices etc), TPSA, Slogp descriptors, counts of some functional groups, VSA MOE-type descriptors, estimates of atomic charges etc. (See https://www.rdkit.org/docs/GettingStartedInPython.html#list-of-available-descriptors). Vectors whose components are molecular descriptors have been used as high-level feature representations for molecular machine learning. One advantage of molecular descriptor vectors is their interpretability, since the meaning of a physicochemical descriptor can be intuitively understood"""
[docs] @apischema.type_name("UnscaledPhyschemDescriptorsParams") @dataclass class Parameters: rdkit_names: Optional[List[str]] = None
name: Literal["UnscaledPhyschemDescriptors"] = "UnscaledPhyschemDescriptors" parameters: Parameters = Parameters() def _rdkit_descriptors(self): return MolecularDescriptorCalculator(self.parameters.rdkit_names) def __post_init__(self): # Get RDKit descriptor names. if self.parameters.rdkit_names is None: self.parameters.rdkit_names = [d[0] for d in Descriptors._descList]
[docs] def calculate_from_mol(self, mol: Chem.Mol): d = self._rdkit_descriptors().CalcDescriptors(mol) if np.isnan(d).any(): return None else: return np.array(d)
[docs]@dataclass class UnscaledJazzyDescriptors(MolDescriptor): """Base (unscaled) Jazzy descriptors These Jazzy descriptors are unscaled and should be used with caution. They offer a molecular vector describing the hydration free energies and hydrogen-bond acceptor and donor strengths. A publication describing the implementation, fitting, and validation of Jazzy can be found at doi.org/10.1038/s41598-023-30089-x. These descriptors use the "MMFF94" minimisation method. NB: this descriptor employs a threshold of <50 Hydrogen acceptors/donors and a Mw of <1000Da for compound inputs. """
[docs] @apischema.type_name("UnscaledJazzyDescriptorsParams") @dataclass class Parameters: jazzy_names: Optional[List[str]] = None jazzy_filters: Optional[Dict[str, Any]] = None
name: Literal["UnscaledJazzyDescriptors"] = "UnscaledJazzyDescriptors" parameters: Parameters = Parameters() def _exceeds_descriptor_threshold(self, smi): mol = Chem.MolFromSmiles(smi) if mol is None: return True return any( getattr(Descriptors, descriptor)(mol) > threshold for descriptor, threshold in self.parameters.jazzy_filters.items() ) def _jazzy_descriptors(self, smi): """Returns a Jazzy MMFF94 vector (fingerprint) for a given SMILES string.""" from jazzy.exception import JazzyError if self._exceeds_descriptor_threshold(smi): """Raise JazzyError if descriptor input is expected to be slow""" raise JazzyError else: """Return the MMFF94 vector""" return molecular_vector_from_smiles( smi, minimisation_method="MMFF94", embedding_max_iterations=int(os.environ.get("QPTUNA_JAZZY_MAX_ITERATIONS", 0))) def __post_init__(self): # Get Jazzy descriptor names. if self.parameters.jazzy_names is None: self.parameters.jazzy_names = sorted( list(molecular_vector_from_smiles("C", minimisation_method="MMFF94")) ) if self.parameters.jazzy_filters is None: self.parameters.jazzy_filters = { "NumHAcceptors": 25, "NumHDonors": 25, "MolWt": 1000, }
[docs] def calculate_from_smi(self, smi: str) -> np.ndarray | None: from jazzy.exception import JazzyError try: d = self._jazzy_descriptors(smi) d = [d[jazzy_name] for jazzy_name in self.parameters.jazzy_names] except (JazzyError, TypeError): return None if np.isnan(d).any(): return None else: return np.array(d)
[docs]@dataclass class UnscaledZScalesDescriptors(MolDescriptor): """Unscaled Z-Scales. Compute the Z-scales of a peptide SMILES. These Z-Scales descriptors are unscaled and should be used with caution. """
[docs] @apischema.type_name("UnscaledZScalesDescParams") @dataclass class Parameters: pass
name: Literal["UnscaledZScalesDescriptors"] = "UnscaledZScalesDescriptors" parameters: Parameters = Parameters()
[docs] def calculate_from_smi(self, smi: str) -> list[str | list[None]] | None: try: from chemistry_adapters import AminoAcidAdapter from peptides import Peptide except ImportError: logging.critical( "chemistry_adapters and peptides packages must be installed for Z-Scales descriptors" ) try: sequence = AminoAcidAdapter().convert_smiles_to_amino_acid_sequence(smi) fp = list(Peptide(sequence).z_scales()) except KeyError: return None # SMILES to aa sequence failed return fp
[docs]@dataclass class PrecomputedDescriptorFromFile(MolDescriptor): """Precomputed descriptors. Users can supply a CSV file of feature vectors to use as descriptors, with headers on the first line. Each row corresponds to a compound in the training set, followed by a column that may have comma-separated vectors describing that molecule. """
[docs] @apischema.type_name("PrecomputedDescriptorFromFileParams") @dataclass class Parameters: file: Annotated[ str, schema( title="file", description="Name of the CSV containing precomputed descriptors", ), ] = field(default=None) input_column: Annotated[ str, schema( title="Input Column", description="Name of input column with SMILES strings", ), ] = field(default=None) response_column: Annotated[ str, schema( title="Response column", description="Name of response column with the comma-separated vectors that the model will use as " "pre-computed descriptors", ), ] = field(default=None)
name: Literal["PrecomputedDescriptorFromFile"] parameters: Parameters def __post_init__(self): df = pd.read_csv(self.parameters.file, skipinitialspace=True) out_cols = [self.parameters.input_column, self.parameters.response_column] # Add canonicalised SMILES to end of file dataframe can_smiles = descriptor_from_config( df[self.parameters.input_column], CanonicalSmiles.new(), return_failed_idx=False, ) can_df = pd.DataFrame({self.parameters.input_column: can_smiles}) can_df[self.parameters.response_column] = df[self.parameters.response_column] df = pd.concat((df, can_df)).reset_index(drop=True) df.dropna(subset=out_cols, inplace=True) df.drop_duplicates(subset=out_cols, inplace=True) # Allow reaction SMILES compatability df.loc[:, self.parameters.input_column] = df[ self.parameters.input_column ].str.replace(">>", ".") self.df = df[[self.parameters.input_column, self.parameters.response_column]]
[docs] def calculate_from_smi(self, smi: str) -> np.ndarray: rows = self.df[self.df[self.parameters.input_column] == smi] if len(rows) < 1: logger.warning( f"Could not find descriptor for {smi} in file {self.parameters.file}." ) return None if len(rows) > 1: logger.warning( f"Multiple (conflicting) descriptors found for {smi}, taking the first one." ) descriptor_iloc = rows[self.parameters.response_column].iloc[0] try: return np.array([descriptor_iloc.astype(float)]) except (ValueError, AttributeError): fp = np.fromstring(descriptor_iloc, sep=",") if len(fp) == 0: return None else: return fp
[docs] def inference_parameters(self, file: str, input_column: str, response_column: str): """This function allows precomputed descriptors to be used for inference for a new file""" self.parameters.file = file self.parameters.input_column = input_column self.parameters.response_column = response_column self.__post_init__()
[docs]@dataclass class SmilesFromFile(MolDescriptor): """Smiles as descriptors (for ChemProp). ChemProp optimisation runs require either this or SmilesAndSideInfoFromFile descriptor to be selected. This setting allows the SMILES to pass through to the ChemProp package. """
[docs] @apischema.type_name("SmilesFromFileParams") @dataclass class Parameters: pass
name: Literal["SmilesFromFile"] parameters: Parameters = Parameters()
[docs] def calculate_from_smi(self, smi: str) -> list[str | list[None]] | None: # Handle RDkit errors here to avoid poor handling by ChemProp mol = mol_from_smi(smi) if mol is None: return None # None is used as placeholder to indicate no side information is used return [smi, [None]]
[docs]@dataclass class SmilesAndSideInfoFromFile(MolDescriptor): """SMILES & side information descriptors (for ChemProp). ChemProp optimisation requires either these or SmilesFromFile descriptors. This descriptor allows SMILES to pass through to ChemProp, _and_ for side information to be supplied as auxiliary tasks.\n Side information can take the form of any vector (continuous or binary) which describe input compounds. All tasks are learnt in a multi-task manner to improve main-task (task of intent) predictions. Side information can boost performance since their contribution to network loss can lead to improved learnt molecular representations.\n Optimal side information weighting (how much auxiliary tasks contribute to network loss) is also an (optional) learned parameter during optimisation.\n Similar to PrecomputedDescriptorFromFile, CSV inputs for this descriptor should contain a SMILES column of input molecules. All vectors in the remaining columns are used as user-derived side-information (i.e: be cautious to only upload a CSV with side information tasks in columns since _all_ are used)\n (see https://ruder.io/multi-task/index.html#auxiliarytasks for details). """
[docs] @apischema.type_name("SmilesAndSideInfoFromFileParams") @dataclass class Parameters:
[docs] @type_name("SmilesAndSideInfoFromFileAux_Weight_Pc") @dataclass class Aux_Weight_Pc: low: int = field(default=100, metadata=schema(min=0, max=100)) high: int = field(default=100, metadata=schema(min=0, max=100)) q: int = field(default=20, metadata=schema(min=1))
file: Annotated[ str, schema( title="file", description="Name of the CSV containing precomputed side-info descriptors", ), ] = field(default=None) input_column: Annotated[ str, schema( title="Input Column", description="Name of input column with SMILES strings", ), ] = field(default=None) aux_weight_pc: Annotated[ Aux_Weight_Pc, schema( title="Auxiliary weight percentage", description="How much (%) auxiliary tasks (side information) contribute (%)" "to the loss function optimised during training. The larger the number, " "the larger the weight of side information.", ), ] = Aux_Weight_Pc()
name: Literal["SmilesAndSideInfoFromFile"] parameters: Parameters def __post_init__(self): df = pd.read_csv(self.parameters.file, skipinitialspace=True) # Add canonicalised SMILES to end of file dataframe can_smiles = descriptor_from_config( df[self.parameters.input_column], CanonicalSmiles.new(), return_failed_idx=False, ) can_df = df.copy() can_df[self.parameters.input_column] = can_smiles df = pd.concat((df, can_df)).reset_index(drop=True) df.dropna(subset=self.parameters.input_column, inplace=True) df.drop_duplicates(inplace=True) # Allow reaction SMILES compatability df.loc[:, self.parameters.input_column] = df[ self.parameters.input_column ].str.replace(">>", ".") self.df = df
[docs] def calculate_from_smi(self, smi: str) -> list[str, Any] | None: # Handle RDkit errors here to avoid poor handling by ChemProp mol = mol_from_smi(smi) if mol is None: return None rows = self.df[self.df[self.parameters.input_column] == smi] if len(rows) < 1: logger.warning( f"Could not find descriptor for {smi} in file {self.parameters.file}." ) return None if len(rows) > 1: logger.warning( f"Multiple (conflicting) descriptors found for {smi}, taking the first one." ) descriptor = rows.drop(self.parameters.input_column, axis=1).values[:1] return [smi, descriptor]
SmilesBasedDescriptor = Union[SmilesFromFile, SmilesAndSideInfoFromFile] AnyUnscaledDescriptor = Union[ Avalon, ECFP, ECFP_counts, PathFP, AmorProtDescriptors, MACCS_keys, PrecomputedDescriptorFromFile, UnscaledMAPC, UnscaledPhyschemDescriptors, UnscaledJazzyDescriptors, UnscaledZScalesDescriptors, ]
[docs]@dataclass class ScaledDescriptor(MolDescriptor): """Scaled Descriptor. This descriptor is not a complete descriptor, but instead it wraps and scales another descriptor.\n Some algorithms require input to be within certain range, e.g. [-1..1]. Some descriptors have different ranges for different columns/features. This descriptor wraps another descriptor and provides scaled values. """
[docs] @dataclass class ScaledDescriptorParameters: descriptor: AnyUnscaledDescriptor scaler: Union[ FittedSklearnScaler, UnfittedSklearnScaler, ] = UnfittedSklearnScaler()
parameters: ScaledDescriptorParameters name: Literal["ScaledDescriptor"] = "ScaledDescriptor" _scaler: Any = field( default=None, init=False, metadata=skip ) # This is (scikit-learn) object with method transform().
[docs] def set_unfitted_scaler_data( self, file_path: str, smiles_column: str, cache=None ) -> None: if isinstance(self.parameters.scaler, UnfittedSklearnScaler): self.parameters.scaler.mol_data.file_path = file_path self.parameters.scaler.mol_data.smiles_column = smiles_column else: raise TypeError( f"Called 'set_unfitted_scaler_data'" f" for scaler of type {type(self.parameters.scaler)}." ) if os.path.exists(self.parameters.scaler.mol_data.file_path): self._ensure_scaler_is_fitted(cache=cache) else: logger.warning( f"Scaler data file is missing:" f" {self.parameters.scaler.mol_data.file_path}" )
def __post_init__(self): # We should avoid heavy computations in init/post_init, # but descriptors are serialized+deserialized # by optuna objective # already before the first use, # so we compute the scaler here # to make sure we always serialize fitted scaler. # Except when path is None, # and is provided during `OptimizationConfig.__post_init_()`. if ( isinstance(self.parameters.scaler, UnfittedSklearnScaler) and self.parameters.scaler.mol_data.file_path is not None ): self._ensure_scaler_is_fitted() def _ensure_scaler_is_fitted(self, cache=None): if isinstance(self.parameters.scaler, UnfittedSklearnScaler): self.parameters.scaler = self.parameters.scaler.get_fitted_scaler_for_fp( self.parameters.descriptor, cache=cache ) # Cache unpickled sklearn scaler. if self._scaler is None: self._scaler = self.parameters.scaler.get_fitted_scaler()
[docs] def calculate_from_smi(self, smi: str, cache=None) -> Optional[np.ndarray]: self._ensure_scaler_is_fitted() if cache is not None: cache_desc = cache.cache(self.parameters.descriptor.calculate_from_smi) desc = cache_desc(smi) else: desc = self.parameters.descriptor.calculate_from_smi(smi) # 1d array. if desc is None: return None # Scikit-learn scaler takes 2d array, one sample per row. Reshape. desc_2d = np.array(desc).reshape(1, -1) # single sample = single row. scaled = self._scaler.transform(desc_2d) scaled_1d = scaled[0, :] # return as 1d array. return scaled_1d
[docs]@dataclass class PhyschemDescriptors(ScaledDescriptor): """PhyschemDescriptors (scaled) calculated in RDKit A set of 208 physchem/molecular properties that are calculated in RDKit and used as descriptor vectors for input molecules. Features include ClogP, MW, # of atoms, rings, rotatable bonds, fraction sp3 C, graph invariants (Kier indices etc), TPSA, Slogp descriptors, counts of some functional groups, VSA MOE-type descriptors, estimates of atomic charges etc. (See https://www.rdkit.org/docs/GettingStartedInPython.html#list-of-available-descriptors). Vectors whose components are molecular descriptors have been used as high-level feature representations for molecular machine learning. One advantage of molecular descriptor vectors is their interpretability, since the meaning of a physicochemical descriptor can be intuitively understood """
[docs] @apischema.type_name("PhyschemDescParams") @dataclass class Parameters: rdkit_names: Optional[List[str]] = None scaler: Union[ FittedSklearnScaler, UnfittedSklearnScaler, ] = UnfittedSklearnScaler() descriptor: AnyUnscaledDescriptor = UnscaledPhyschemDescriptors
parameters: Parameters = Parameters() name: Literal["PhyschemDescriptors"] = "PhyschemDescriptors" def __post_init__(self): # Get RDKit descriptor names. if self.parameters.rdkit_names is None: self.parameters.rdkit_names = [d[0] for d in Descriptors._descList] self.parameters.descriptor = UnscaledPhyschemDescriptors.new( rdkit_names=self.parameters.rdkit_names ) if ( isinstance(self.parameters.scaler, UnfittedSklearnScaler) and self.parameters.scaler.mol_data.file_path is not None ): try: self._ensure_scaler_is_fitted() except ScalingFittingError: logger.warning("PhyschemDescriptors scaling failed")
[docs]@dataclass class JazzyDescriptors(ScaledDescriptor): """Scaled Jazzy descriptors Jazzy descriptors offer a molecular vector describing the hydration free energies and hydrogen-bond acceptor and donor strengths. A publication describing the implementation, fitting, and validation of Jazzy can be found at doi.org/10.1038/s41598-023-30089-x. These descriptors use the "MMFF94" minimisation method. NB: Jazzy employs a threshold of <50 Hydrogen acceptors/donors and Mw of <1000Da for input compounds. """
[docs] @apischema.type_name("JazzyDescParams") @dataclass class Parameters: jazzy_names: Optional[List[str]] = None jazzy_filters: Optional[Dict[str, Any]] = None scaler: Union[ FittedSklearnScaler, UnfittedSklearnScaler, ] = UnfittedSklearnScaler() descriptor: AnyUnscaledDescriptor = UnscaledJazzyDescriptors
name: Literal["JazzyDescriptors"] = "JazzyDescriptors" parameters: Parameters = Parameters() def __post_init__(self): # Get Jazzy descriptor names. if self.parameters.jazzy_names is None: self.parameters.jazzy_names = sorted( list(molecular_vector_from_smiles("C", minimisation_method="MMFF94")) ) if self.parameters.jazzy_filters is None: self.parameters.jazzy_filters = { "NumHAcceptors": 25, "NumHDonors": 25, "MolWt": 1200, } self.parameters.descriptor = UnscaledJazzyDescriptors.new( jazzy_names=self.parameters.jazzy_names, jazzy_filters=self.parameters.jazzy_filters, ) if ( isinstance(self.parameters.scaler, UnfittedSklearnScaler) and self.parameters.scaler.mol_data.file_path is not None ): try: self._ensure_scaler_is_fitted() except ScalingFittingError: logger.warning("JazzyDescriptors scaling failed")
[docs]@dataclass class MAPC(ScaledDescriptor): """Scaled MAPC descriptors MAPC (MinHashed Atom-Pair Fingerprint Chiral) (see Orsi et al. One chiral fingerprint to find them all) is the original version of the MinHashed Atom-Pair fingerprint of radius 2 (MAP4) which combined circular substructure fingerprints and atom-pair fingerprints into a unified framework. This combination allowed for improved substructure perception and performance in small molecule benchmarks while retaining information about bond distances for molecular size and shape perception. These fingerprints expand the functionality of MAP4 to include encoding of stereochemistry into the fingerprint. CIP descriptors of chiral atoms are encoded into the fingerprint at the highest radius. This allows MAPC to modulate the impact of stereochemistry on fingerprints, making it scale with increasing molecular size without disproportionally affecting structural fingerprints/similarity. """
[docs] @apischema.type_name("MAP4CParams") @dataclass class Parameters: maxRadius: int = 2 nPermutations: int = 2048 scaler: Union[ FittedSklearnScaler, UnfittedSklearnScaler, ] = UnfittedSklearnScaler() descriptor: AnyUnscaledDescriptor = UnscaledMAPC
name: Literal["MAPC"] = "MAPC" parameters: Parameters = Parameters() def __post_init__(self): self.parameters.descriptor = UnscaledMAPC.new( maxRadius=self.parameters.maxRadius, nPermutations=self.parameters.nPermutations, ) if ( isinstance(self.parameters.scaler, UnfittedSklearnScaler) and self.parameters.scaler.mol_data.file_path is not None ): try: self._ensure_scaler_is_fitted() except ScalingFittingError: logger.warning("MAPC scaling failed")
[docs]@dataclass class ZScalesDescriptors(ScaledDescriptor): """Scaled Z-Scales descriptors. Z-scales were proposed in Sandberg et al (1998) based on physicochemical properties of proteogenic and non-proteogenic amino acids, including NMR data and thin-layer chromatography (TLC) data. Refer to doi:10.1021/jm9700575 for the original publication. These descriptors capture 1. lipophilicity, 2. steric properties (steric bulk and polarizability), 3. electronic properties (polarity and charge), 4. electronegativity (heat of formation, electrophilicity and hardness) and 5. another electronegativity. This fingerprint is the computed average of Z-scales of all the amino acids in the peptide. """
[docs] @apischema.type_name("ZScalesDescParams") @dataclass class Parameters: scaler: Union[ FittedSklearnScaler, UnfittedSklearnScaler, ] = UnfittedSklearnScaler() descriptor: AnyUnscaledDescriptor = UnscaledZScalesDescriptors
name: Literal["ZScalesDescriptors"] = "ZScalesDescriptors" parameters: Parameters = Parameters() def __post_init__(self): self.parameters.descriptor = UnscaledZScalesDescriptors.new() if ( isinstance(self.parameters.scaler, UnfittedSklearnScaler) and self.parameters.scaler.mol_data.file_path is not None ): try: self._ensure_scaler_is_fitted() except ScalingFittingError: logger.warning("ZScales scaling failed")
CompositeCompatibleDescriptor = Union[ AnyUnscaledDescriptor, ScaledDescriptor, MAPC, PhyschemDescriptors, JazzyDescriptors, ZScalesDescriptors, ]
[docs]@dataclass class CompositeDescriptor(MolDescriptor): """Composite descriptor Concatenates multiple descriptors into one. Select multiple algorithms from the button below. Please note the ChemProp SMILES descriptors are not compatible with this function. """
[docs] @apischema.type_name("CompositeDescParams") @dataclass class Parameters: descriptors: List[CompositeCompatibleDescriptor]
parameters: Parameters name: Literal["CompositeDescriptor"] = "CompositeDescriptor"
[docs] def calculate_from_smi(self, smi: str, cache=None) -> Optional[np.ndarray]: if cache is not None: ds = [] for d in self.parameters.descriptors: try: # Composite descriptors comprising scaled descriptors pass through cache ds.append(d.calculate_from_smi(smi, cache=cache)) except TypeError: # All other descriptors use the cache directly and do not expect cache as parameter sub_calculate_from_smi = cache.cache(d.calculate_from_smi) ds.append(sub_calculate_from_smi(smi)) else: ds = [d.calculate_from_smi(smi) for d in self.parameters.descriptors] if any(d is None for d in ds): # If any of the descriptors is None, # we declare the whole composite descriptor to be None. return None else: concatenated = np.concatenate(ds) return concatenated
[docs] def fp_info(self): return { json.dumps(serialize(d)): len(d.calculate_from_smi("C")) for d in self.parameters.descriptors }
AnyChemPropIncompatible = Union[CompositeCompatibleDescriptor, CompositeDescriptor] AnyDescriptor = Union[AnyChemPropIncompatible, SmilesBasedDescriptor]
[docs]@dataclass class CanonicalSmiles(MolDescriptor): """Canonical Smiles for use in utility functions (not for user selection)."""
[docs] @apischema.type_name("CanonicalSmilesParams") @dataclass class Parameters: pass
name: Literal["CanonicalSmiles"] parameters: Parameters = Parameters()
[docs] def calculate_from_smi(self, smi: str) -> Any | None: mol = mol_from_smi(smi) if mol is None: return None else: return Chem.MolToSmiles(mol, isomericSmiles=False, canonical=True)
[docs]@dataclass class Scaffold(MolDescriptor): """Scaffold Smiles for use in utility functions (not for user selection)."""
[docs] @apischema.type_name("ScaffoldParams") @dataclass class Parameters: pass
name: Literal["Scaffold"] parameters: Parameters = Parameters()
[docs] def calculate_from_smi(self, smi: str) -> Any | None: mol = mol_from_smi(smi) if mol is None: return None else: return Chem.MolToSmiles(GetScaffoldForMol((mol)))
[docs]@dataclass class GenericScaffold(MolDescriptor): """Generic Scaffold Smiles for use in utility functions (not for user selection)."""
[docs] @apischema.type_name("GenericScaffoldParams") @dataclass class Parameters: pass
name: Literal["GenericScaffold"] parameters: Parameters = Parameters()
[docs] def calculate_from_smi(self, smi: str) -> Any | None: mol = mol_from_smi(smi) if mol is None: return None else: return Chem.MolToSmiles(MakeScaffoldGeneric(GetScaffoldForMol((mol))))
[docs]@dataclass class ValidDescriptor(MolDescriptor): """Validates Smiles for use in utility functions (not for user selection)."""
[docs] @apischema.type_name("ValidDescriptorParams") @dataclass class Parameters: pass
name: Literal["ValidDescriptor"] parameters: Parameters = Parameters()
[docs] def calculate_from_smi(self, smi: str) -> bool: """Returns a descriptor (fingerprint) for a given SMILES string. The descriptor is returned as a 1-d Numpy ndarray. Returns None if input SMILES string is not valid according to RDKit. """ mol = mol_from_smi(smi) if mol is None: return False else: return True
[docs]def descriptor_from_config( smiles: List[str], descriptor: AnyDescriptor, cache=None, return_failed_idx=True ) -> Tuple[Optional[np.ndarray], Optional[list]]: """Returns molecular descriptors (fingerprints) for a given set of SMILES and configuration. When return_failed_idx is True, this returns a 2d numpy array and valid indices for that descriptor When return_failed_idx is False, this returns the raw descriptor output (e.g. for canonical smiles etc) """ if smiles is None or len(smiles) < 1: raise NoValidSmiles( f"Descriptor {descriptor} cannot generate empty smiles: {smiles}" ) descriptors = descriptor.parallel_compute_descriptor(smiles, cache=cache) if return_failed_idx: list_of_arrays = [] failed_idx = [] for d_idx, d in enumerate(descriptors): if d is None: failed_idx.append(d_idx) elif isinstance(descriptor, SmilesBasedDescriptor): list_of_arrays.append(d) elif not pd.isnull(np.array(d)).any(): list_of_arrays.append(d) else: failed_idx.append(d_idx) if len(list_of_arrays) > 0: list_of_arrays = np.stack(list_of_arrays, axis=0) return list_of_arrays, failed_idx else: return descriptors