Source code for optunaz.objective

import json
import logging
import re
from dataclasses import dataclass
from typing import List, Optional, Union
import numpy as np
import warnings
from apischema import serialize, deserialize
from joblib import Memory, effective_n_jobs
from functools import partial

import sklearn.model_selection
from sklearn.metrics import make_scorer
from sklearn.exceptions import UndefinedMetricWarning

from optunaz.config.build_from_opt import (
    suggest_alg_params,
    suggest_aux_params,
    check_invalid_descriptor_param,
)
from optunaz.config.optconfig import OptimizationConfig, ModelMode
import optunaz.config.buildconfig as build
from optunaz.descriptors import (
    descriptor_from_config,
    AnyDescriptor,
    ScalingFittingError,
)
from optunaz.utils import remove_failed_idx
from optunaz.utils.enums import TrialParams
from optuna import TrialPruned
from optuna.trial import TrialState
from optunaz.metircs import auc_pr_cal, bedroc_score, concordance_index

warnings.filterwarnings("ignore", category=UndefinedMetricWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
logging.getLogger("chemprop").disabled = True
logging.getLogger("train").disabled = True

logger = logging.getLogger(__name__)

sklearn.metrics._scorer._SCORERS.update(
    {
        "auc_pr_cal": make_scorer(auc_pr_cal, response_method="predict_proba"),
        "bedroc": make_scorer(bedroc_score, response_method="predict_proba"),
        "concordance_index": make_scorer(
            concordance_index, response_method="predict_proba"
        ),
    }
)

classification_scores = {
    "accuracy": "accuracy",
    "average_precision": "average_precision",
    "balanced_accuracy": "balanced_accuracy",
    "f1": "f1",
    "f1_macro": "f1_macro",
    "f1_micro": "f1_micro",
    "f1_weighted": "f1_weighted",
    "jaccard": "jaccard",
    "jaccard_macro": "jaccard_macro",
    "jaccard_micro": "jaccard_micro",
    "jaccard_weighted": "jaccard_weighted",
    "neg_brier_score": "neg_brier_score",
    "precision": "precision",
    "precision_macro": "precision_macro",
    "precision_micro": "precision_micro",
    "precision_weighted": "precision_weighted",
    "recall": "recall",
    "recall_macro": "recall_macro",
    "recall_micro": "recall_micro",
    "recall_weighted": "recall_weighted",
    "roc_auc": "roc_auc",
    "auc_pr_cal": make_scorer(auc_pr_cal, response_method="predict_proba"),
    "bedroc": make_scorer(bedroc_score, response_method="predict_proba"),
    "concordance_index": make_scorer(
        concordance_index, response_method="predict_proba"
    ),
}


regression_scores = (
    "explained_variance",
    "max_error",
    "neg_mean_absolute_error",
    "neg_mean_squared_error",
    "neg_median_absolute_error",
    "r2",
)


[docs]class NoValidDescriptors(Exception): """Raised when none of the supplied descriptors are compatible with any of the supplied algorithms""" pass
[docs]def null_scores(scoring): null_scoring = {k: [float("nan")] for k in scoring} return null_scoring
[docs]@dataclass class Objective: optconfig: OptimizationConfig train_smiles: List[str] train_y: np.ndarray train_aux: np.ndarray = None cache: Optional[Memory] = None def __call__(self, trial): # Set up the mode for reg or cls mode = self.optconfig.settings.mode score_for_objective = f"test_{self.optconfig.settings.scoring}" minimise_std_dev_objective = self.optconfig.settings.minimise_std_dev if mode == ModelMode.REGRESSION: scoring = regression_scores elif mode == ModelMode.CLASSIFICATION: scoring = classification_scores else: raise ValueError(f"Unrecognized mode: {mode}.") # Ensure train/test scores are set (NaN), since mlflow.py always expects them trial.set_user_attr(key="train_scores", value=null_scores(scoring)) trial.set_user_attr(key="test_scores", value=null_scores(scoring)) trial.set_user_attr(key="trial_ran", value=False) # Get algo & descriptor from Optuna, get valid descriptor combo self._validate_algos() build_alg = self._get_estimator(trial) try: estimator = build_alg.estimator() except (ValueError, FileNotFoundError) as e: raise TrialPruned(f"Estimator initiation failed for algorithm: {e}") try: descriptor, valid_descriptors, aux_weight_pc = self._get_descriptor( trial, build_alg ) if self.cache is not None: _descriptor_from_config = partial( descriptor_from_config, cache=self.cache ) cache_desc_from_conf = self.cache.cache(_descriptor_from_config) X, failed_idx = cache_desc_from_conf(self.train_smiles, descriptor) else: X, failed_idx = descriptor_from_config(self.train_smiles, descriptor) if len(X) == 0: raise ValueError except (ScalingFittingError, ValueError) as e: raise TrialPruned(f"Descriptor generation failed for descriptor: {e}") train_y, train_smiles, train_aux = remove_failed_idx( failed_idx, self.train_y, self.train_smiles, self.train_aux ) if train_aux is not None: X = np.hstack((X, train_aux)) if len(failed_idx) > 0: logger.warning( f"Descriptor [{descriptor}] for trial [{trial}] has {len(failed_idx)} \ erroneous smiles at indices {failed_idx}" ) if len(X) < self.optconfig.settings.cross_validation: raise TrialPruned( f"Issue with structures or descriptor config. Insufficient descriptors ({len(X)} generated for: " f"{descriptor.name}" ) # Check trial duplication, prune if this is detected. for t in trial.study.trials: if t.state == TrialState.COMPLETE and t.params == trial.params: # Set the pruned trial test/train scores to the duplicated trial trial.set_user_attr( key="train_scores", value=t.user_attrs["train_scores"] ) trial.set_user_attr( key="test_scores", value=t.user_attrs["test_scores"] ) if hasattr(t, "values"): print(f"Duplicated trial: {trial.params}, return {t.values}") else: print(f"Duplicated trial: {trial.params}, return {t.value}") # Raising `TrialPruned` instead of just 'return t.value' means that the # sampler is more likely to avoid evaluating identical parameters again. # See stackoverflow.com/questions/58820574 discussion wrt this issue raise TrialPruned("Duplicate parameter set") # CV is only attempted when the descriptor is compatible with the algo if type(descriptor) in valid_descriptors: # Auxiliary weight is applied here, if used, and if algorithm supports this if aux_weight_pc is not None: if hasattr(estimator, "aux_weight_pc"): estimator.aux_weight_pc = aux_weight_pc elif hasattr(estimator, "base_estimator") and hasattr( estimator.base_estimator, "aux_weight_pc" ): estimator.base_estimator.aux_weight_pc = aux_weight_pc cv = self.optconfig.settings.cv_split_strategy.get_sklearn_splitter( n_splits=self.optconfig.settings.cross_validation ) n_jobs = effective_n_jobs(self.optconfig.settings.n_jobs) # ensure ChemProp uses parallelisation within trial, not cross_validate if ( hasattr(estimator, "num_workers") or hasattr(estimator, "estimator") and hasattr(estimator.estimator, "num_workers") ): n_jobs = 1 try: scores = sklearn.model_selection.cross_validate( estimator=estimator, X=X, y=train_y, n_jobs=n_jobs, cv=cv, scoring=scoring, return_train_score=True, ) except (TypeError, ValueError) as e: raise TypeError( f"CV failed for alg {build_alg}, estimator {estimator}: {e}" ) # Add attributes to the trial to be accessed later. train_scores = {k: scores["train_" + k].tolist() for k in scoring} test_scores = {k: scores["test_" + k].tolist() for k in scoring} trial.set_user_attr(key="train_scores", value=train_scores) trial.set_user_attr(key="test_scores", value=test_scores) trial.set_user_attr(key="trial_ran", value=True) # Take mean test score for all CV folds and return it as objective. if minimise_std_dev_objective: return ( scores[score_for_objective].mean(), scores[score_for_objective].std(), ) else: return scores[score_for_objective].mean() # Otherwise, the descriptor is not compatible, and is handled here else: # Return the _worst_ possible score, since Optuna does not allow pruning 1st trials. # FYI: Returning NaN would result in 'ValueError: No trials are completed yet' due to # calling Optuna attribute 'study.best_trial' in build_from_opt.py. if len(trial.study.trials) == 1: if minimise_std_dev_objective: if trial.study.directions[0].name == "MAXIMIZE": return -np.inf, np.inf else: return np.inf, np.inf else: if trial.study.direction.name == "MAXIMIZE": return -np.inf else: return np.inf # Otherwise, this trial is not the 1st trial & Optuna allows pruning this trial. Pruning guides # the optimiser away from invalid subspaces (incompatible algo/descriptor pairs). # See stackoverflow.com/questions/70681612 for a discussion implementing this solution. else: raise TrialPruned("Incompatible subspace") def _validate_algos(self): """Ensures algorithms are compatible with the input data before starting objective""" # additional validation for prf possible_algs = [alg.name for alg in self.optconfig.algorithms] cp_regex = re.compile("ChemProp.*?Regressor") if ( "PRFClassifier" in possible_algs and not np.logical_and(self.train_y >= 0, self.train_y <= 1).all() ): raise ValueError( "PRFClassifier supplied but response column outside [0.0-1.0] acceptable range. " f"Response max: {self.train_y.max()}, response min: {self.train_y.min()} " ) elif any([re.match(cp_regex, alg) for alg in possible_algs]) and set( self.train_y ) == {0, 1}: raise ValueError( "ChemProp regressor supplied but response column appears classification." ) return def _get_estimator(self, trial) -> build.AnyAlgorithm: """Calculates an estimator (algorithm) for the trial.""" alg_choices = [alg.name for alg in self.optconfig.algorithms] alg_name = trial.suggest_categorical( TrialParams.ALGORITHM_NAME.value, alg_choices ) # Get alg from list by alg's hash. hash_choices = [ alg.hash for alg in self.optconfig.algorithms if alg.name == alg_name ] alg_hash = trial.suggest_categorical( f"{alg_name}_{TrialParams.ALGORITHM_HASH.value}", hash_choices ) trial.set_user_attr("alg_hash", alg_hash) alg = next(alg for alg in self.optconfig.algorithms if alg.hash == alg_hash) build_alg = suggest_alg_params(trial, alg) return build_alg def _get_descriptor(self, trial, algo) -> [AnyDescriptor, tuple, int | None]: """Calculates a descriptor (fingerprint) for the trial.""" valid_descriptors = check_invalid_descriptor_param(algo) # Check that there are possible choices first possible_choices = [ d for d in self.optconfig.descriptors if isinstance(d, Union[valid_descriptors]) ] # Raise value error so the user must provide some possible algo/descriptor combinations if len(possible_choices) == 0: raise NoValidDescriptors( "None of the supplied descriptors: " f"{[desc.name for desc in self.optconfig.descriptors]} " f"are compatible with the supplied algo: {algo.parameters}." ) # Convert descriptor config to `str` to store name+params in `trial`. descriptor_choices = [ json.dumps(serialize(d)) for d in self.optconfig.descriptors ] # Ideally we could suggest_categorical from possible_choices here, and negate the workarounds # above, but currently CategoricalDistribution suggestor has no 'dynamic value space' support. # See https://github.com/optuna/optuna/issues/2328 for discussion on the issue descriptor_str = trial.suggest_categorical( TrialParams.DESCRIPTOR.value, descriptor_choices ) # Get back the object. descriptor_dict = json.loads(descriptor_str) descriptor = deserialize( AnyDescriptor, descriptor_dict, additional_properties=True ) # Suggest aux params if supported by descriptor aux_params = suggest_aux_params(trial, descriptor) return descriptor, valid_descriptors, aux_params