Source code for icolos.core.workflow_steps.prediction.model_building

import json
import os
import numpy as np
import pandas as pd
from collections import OrderedDict
from copy import deepcopy
from typing import Tuple, List

from pydantic import BaseModel

from icolos.core.containers.compound import Conformer
from icolos.core.containers.generic import GenericData
from icolos.utils.enums.program_parameters import ModelBuilderEnum
from icolos.utils.enums.step_enums import StepModelBuilderEnum
from icolos.core.workflow_steps.io.base import StepIOBase
from icolos.core.workflow_steps.step import _LE, StepSettingsParameters
from icolos.utils.enums.write_out_enums import WriteOutEnum
from icolos.utils.execute_external.execute import Executor

_SMBE = StepModelBuilderEnum()
_SME = ModelBuilderEnum()
_WE = WriteOutEnum()


class StepModelBuilder(StepIOBase, BaseModel):
    def __init__(self, **data):
        super().__init__(**data)

        # initialize the executor
        self._initialize_backend(executor=Executor)

    def _generate_temporary_input_output_files(
        self, tmp_dir: str
    ) -> Tuple[str, str, str, str, str]:
        tmp_input_config_json = os.path.join(tmp_dir, _SMBE.TMP_INPUT_CONFIG)
        tmp_input_data_csv = os.path.join(tmp_dir, _SMBE.TMP_INPUT_DATA)
        tmp_output_best_model_pkl = os.path.join(tmp_dir, _SMBE.TMP_OUTPUT_BEST_MODEL)
        tmp_output_best_parameters_json = os.path.join(
            tmp_dir, _SMBE.TMP_OUTPUT_BEST_PARAMETERS
        )
        tmp_output_production_pkl = os.path.join(
            tmp_dir, _SMBE.TMP_OUTPUT_PRODUCTION_MODEL
        )
        return (
            tmp_input_config_json,
            tmp_input_data_csv,
            tmp_output_best_model_pkl,
            tmp_output_best_parameters_json,
            tmp_output_production_pkl,
        )

    def _update_data_block(
        self, conf: dict, tmp_input_data_csv: str, settings: StepSettingsParameters
    ) -> dict:
        # the user can specify additional entries for the "data" block of the
        # configuration in the "additional" field; the input CSV file needs to
        # be overwritten in every case, though
        specified_data_block = settings.additional.get(_SMBE.DATA, {})
        for key in specified_data_block.keys():
            conf[_SMBE.DATA][key] = specified_data_block[key]
        conf[_SMBE.DATA][_SMBE.DATA_TRAININGSET_FILE] = tmp_input_data_csv

        if _SMBE.DATA_TESTSET_FILE in conf[_SMBE.DATA].keys():
            conf[_SMBE.DATA].pop(_SMBE.DATA_TESTSET_FILE, None)
            self._logger.log(
                "Removed test set specification, not supported yet.", _LE.WARNING
            )
        return conf

    def _write_OptunaAZ_configuration(
        self,
        tmp_input_config_json: str,
        tmp_input_data_csv: str,
        settings: StepSettingsParameters,
    ):
        config_path = settings.arguments.parameters[_SME.CONFIG]
        with open(config_path, "r") as file:
            optunaaz_conf = file.read().replace("\r", "").replace("\n", "")
        optunaaz_conf = json.loads(optunaaz_conf)
        optunaaz_conf = self._update_data_block(
            optunaaz_conf, tmp_input_data_csv, settings
        )
        with open(tmp_input_config_json, "w") as file:
            json.dump(optunaaz_conf, fp=file, indent=4)
        self._logger.log(
            f"Wrote updated OptunaAZ configuration file to {tmp_input_config_json}.",
            _LE.DEBUG,
        )

    def _write_input_csv(
        self,
        conformers: List[Conformer],
        tmp_input_data_csv: str,
        settings: StepSettingsParameters,
    ):
        def _get_tag(conformer: Conformer, tag: str) -> str:
            try:
                value = conformer.get_molecule().GetProp(tag).strip()
            except KeyError:
                value = np.nan
            return value

        smiles_column = settings.additional[_SMBE.DATA][_SMBE.DATA_INPUT_COLUMN]
        response_column = settings.additional[_SMBE.DATA][_SMBE.DATA_RESPONSE_COLUMN]

        # initialize the dictionary
        dict_result = OrderedDict()
        dict_result[_WE.RDKIT_NAME] = ["" for _ in range(len(conformers))]
        dict_result[smiles_column] = ["" for _ in range(len(conformers))]
        dict_result[response_column] = ["" for _ in range(len(conformers))]

        # populate the dictionary with the values
        for irow in range(len(conformers)):
            conf = conformers[irow]
            dict_result[_WE.RDKIT_NAME][irow] = conf.get_index_string()
            dict_result[smiles_column][irow] = _get_tag(conf, smiles_column)
            dict_result[response_column][irow] = _get_tag(conf, response_column)

        # do the write-out (after sanitation)
        df_result = pd.DataFrame.from_dict(dict_result)
        df_result.to_csv(
            path_or_buf=tmp_input_data_csv,
            sep=",",
            na_rep="",
            header=True,
            index=False,
            mode="w",
            quoting=None,
        )

    def _get_arguments(
        self,
        tmp_input_config_json: str,
        tmp_output_best_model_pkl: str,
        tmp_output_best_parameters_json: str,
        tmp_output_production_pkl: str,
    ) -> List[str]:
        arguments = [
            _SME.CONFIG,
            tmp_input_config_json,
            _SME.MERGED_MODEL_OUTPATH,
            tmp_output_production_pkl,
            _SME.BEST_MODEL_OUTPATH,
            tmp_output_best_model_pkl,
            _SME.BEST_BUILDCONFIG_OUTPATH,
            tmp_output_best_parameters_json,
        ]
        return arguments

    def _parse_output(
        self,
        tmp_input_config_json: str,
        tmp_input_data_csv: str,
        tmp_output_best_parameters_json: str,
        tmp_output_production_pkl: str,
    ):
        # loading the final model is crucial (and the end-artifact for this step)
        try:
            with open(tmp_output_production_pkl, "rb") as f:
                data = f.read()
            self.data.generic.add_file(
                GenericData(
                    file_name=_SMBE.TMP_OUTPUT_PRODUCTION_MODEL, file_data=data
                )
            )
        except FileNotFoundError as e:
            self._logger.log(
                f"Could not load production model from path {tmp_output_production_pkl}.",
                _LE.ERROR,
            )
            raise e

        # loading the JSON with the best hyper-parameter configuration
        try:
            with open(tmp_output_best_parameters_json, "r") as f:
                data = f.read().replace("\r", "").replace("\n", "")
            data = json.loads(data)
            self.data.generic.add_file(
                GenericData(
                    file_name=_SMBE.TMP_OUTPUT_BEST_PARAMETERS, file_data=data
                )
            )
        except FileNotFoundError:
            self._logger.log(
                f"Could not load best hyper-parameter configuration from path {tmp_output_best_parameters_json}.",
                _LE.WARNING,
            )

        # loading the input JSON for OptunaAZ
        try:
            with open(tmp_input_config_json, "r") as f:
                data = f.read()
            self.data.generic.add_file(
                GenericData(file_name=_SMBE.TMP_INPUT_CONFIG, file_data=data)
            )
        except FileNotFoundError:
            self._logger.log(
                f"Could not load input configuration file from path {tmp_input_config_json}.",
                _LE.WARNING,
            )

        # loading the input CSV
        try:
            with open(tmp_input_data_csv, "r") as f:
                data = f.read()
            self.data.generic.add_file(
                GenericData(file_name=_SMBE.TMP_INPUT_DATA, file_data=data)
            )
        except FileNotFoundError:
            self._logger.log(
                f"Could not load input CSV file from path {tmp_input_data_csv}.",
                _LE.WARNING,
            )
    def execute(self):
        # make a copy of the settings to avoid side-effects with the dictionaries
        settings = deepcopy(self.settings)

        # generate temporary files
        tmp_dir = self._move_to_temp_dir()
        (
            tmp_input_config_json,
            tmp_input_data_csv,
            tmp_output_best_model_pkl,
            tmp_output_best_parameters_json,
            tmp_output_production_pkl,
        ) = self._generate_temporary_input_output_files(tmp_dir)

        # write OptunaAZ configuration to file
        self._write_OptunaAZ_configuration(
            tmp_input_config_json=tmp_input_config_json,
            tmp_input_data_csv=tmp_input_data_csv,
            settings=settings,
        )

        # unroll all conformers
        all_conformers = []
        for compound in self.get_compounds():
            for enumeration in compound:
                all_conformers = all_conformers + enumeration.get_conformers()

        # write input CSV, derived from the conformers
        self._write_input_csv(
            conformers=all_conformers,
            tmp_input_data_csv=tmp_input_data_csv,
            settings=settings,
        )

        # execute OptunaAZ
        self._backend_executor.execute(
            command=_SME.OPTBUILD_ENTRY_POINT,
            arguments=self._get_arguments(
                tmp_input_config_json=tmp_input_config_json,
                tmp_output_best_model_pkl=tmp_output_best_model_pkl,
                tmp_output_best_parameters_json=tmp_output_best_parameters_json,
                tmp_output_production_pkl=tmp_output_production_pkl,
            ),
            check=False,
        )

        # parse the output
        self._parse_output(
            tmp_input_config_json=tmp_input_config_json,
            tmp_input_data_csv=tmp_input_data_csv,
            tmp_output_best_parameters_json=tmp_output_best_parameters_json,
            tmp_output_production_pkl=tmp_output_production_pkl,
        )

        # clean-up
        self._restore_working_dir()
        self._remove_temporary(tmp_dir)
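
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the module): the transformation that
# _update_data_block applies to the "data" block of an OptunaAZ-style
# configuration. The key and column names below are hypothetical stand-ins
# for the StepModelBuilderEnum constants and user settings.
optunaaz_conf = {
    "data": {
        "input_column": "canonical_smiles",  # column holding the SMILES
        "response_column": "pIC50",          # column holding the endpoint
        "test_set_file": "old_test.csv",     # would be removed by the step
    }
}
user_overrides = {"response_column": "activity"}  # from settings.additional["data"]

# user-specified entries are copied over first ...
for key, value in user_overrides.items():
    optunaaz_conf["data"][key] = value
# ... the training-set file is always pointed at the freshly written CSV ...
optunaaz_conf["data"]["training_set_file"] = "/tmp/step_dir/input_data.csv"
# ... and any test-set specification is dropped (not supported yet)
optunaaz_conf["data"].pop("test_set_file", None)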
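
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the module, hypothetical column names):
# the CSV layout produced by _write_input_csv. Each row corresponds to one
# conformer; the name column holds the conformer index string, and missing
# SD tags become empty cells via na_rep="".
import numpy as np
import pandas as pd
from collections import OrderedDict

rows = OrderedDict()
rows["_Name"] = ["0:0:0", "0:0:1"]         # conformer index strings
rows["canonical_smiles"] = ["CCO", "CCN"]  # values of the SMILES tag
rows["pIC50"] = [6.2, np.nan]              # values of the response tag

pd.DataFrame.from_dict(rows).to_csv(
    "input_data.csv", sep=",", na_rep="", header=True, index=False
)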