Source code for icolos.core.workflow_steps.confgen.xtb

import os
from tempfile import mkdtemp

from pydantic import BaseModel
from rdkit import Chem
from copy import deepcopy
from typing import List, Tuple
from icolos.utils.execute_external.xtb import XTBExecutor

from icolos.utils.general.molecules import get_charge_for_molecule

from icolos.core.containers.compound import Conformer

from icolos.utils.enums.program_parameters import XTBEnum, XTBOutputEnum
from icolos.core.workflow_steps.step import _LE, _CTE
from icolos.core.workflow_steps.confgen.base import StepConfgenBase
from icolos.utils.general.parallelization import SubtaskContainer, Parallelizer

_EE = XTBEnum()
_COE = XTBOutputEnum()


class StepXTB(StepConfgenBase, BaseModel):
    """Workflow step that runs XTB on every conformer of the input compounds.

    Each conformer is written to its own temporary directory, XTB is executed
    on it (one conformer per call), and the structures read back from the
    XTB output SDF are attached to the conformer's enumeration.
    """

    def __init__(self, **data):
        super().__init__(**data)

        # initialize the executor and test availability
        self._initialize_backend(executor=XTBExecutor)
        self._check_backend_availability()

    def _parse_XTB_result(self, tmp_dirs: List, conformers: List[Conformer]) -> List:
        """Read the XTB output of each temporary directory back into the enumerations.

        :param tmp_dirs: directories XTB was executed in, aligned with ``conformers``
        :param conformers: input conformers, one per directory
        :return: exactly one ``_COE.SUCCESS`` / ``_COE.FAILURE`` entry per
                 (directory, conformer) pair, in input order, so callers can
                 safely ``zip`` the result with their batch
        """
        # XTB will output a variety of files to "dir_path"
        results = []
        for dir_path, conformer in zip(tmp_dirs, conformers):
            optimized_conformer_sdf = os.path.join(dir_path, _COE.XTBOPT_SDF)
            enum = conformer.get_enumeration_object()

            try:
                # the supplier is created inside the "try" so that a missing or
                # unreadable output file marks this conformer as failed instead
                # of aborting the whole step
                mol_supplier = Chem.SDMolSupplier(
                    optimized_conformer_sdf, removeHs=False
                )

                # XTB adds the energy as a tag, but we will use ours
                # note, that XTB is called to operate on one conformer at a time
                optimized_mols = []
                for mol in mol_supplier:
                    if mol is None:
                        raise ValueError(
                            f"Unreadable record in {optimized_conformer_sdf}."
                        )
                    mol.SetProp(
                        _CTE.CONFORMER_ENERGY_TAG, mol.GetProp(_COE.TOTAL_ENERGY_TAG)
                    )
                    mol.ClearProp(_COE.TOTAL_ENERGY_TAG)
                    mol.SetProp(
                        _CTE.FORMAL_CHARGE_TAG, str(get_charge_for_molecule(mol))
                    )
                    optimized_mols.append(mol)

                if not optimized_mols:
                    raise ValueError(
                        f"No structure found in {optimized_conformer_sdf}."
                    )

                # only attach results once the whole file was parsed successfully
                for mol in optimized_mols:
                    enum.add_conformer(Conformer(conformer=mol), auto_update=True)
            except Exception as error:
                # deliberate best-effort: a failed conformer must not stop the batch
                self._logger.log(
                    f"Failed to parse XTB results for conformer {conformer.get_index_string()}.",
                    _LE.WARNING,
                )
                self._logger.log(f"Parsing error was: {error!r}", _LE.DEBUG)
                results.append(_COE.FAILURE)
            else:
                results.append(_COE.SUCCESS)
        return results

    def _prepare_batch(self, batch) -> Tuple:
        """Create one temporary directory and input file per sublist of ``batch``.

        :param batch: list of subtask lists; each subtask's ``data`` is a conformer
        :return: tuple ``(tmp_dirs, input_files, charges, conformers)``
        """
        # the internal input at this stage is a list of molecules
        # -> write each to a temporary input file and remember the path
        tmp_dirs = []
        input_files = []
        charges = []
        conformers = []
        for next_subtask_list in batch:
            tmp_dir = mkdtemp()
            tmp_dirs.append(tmp_dir)
            # enforced as one since xtb can't handle multiple files in one call
            for subtask in next_subtask_list:
                conformer = subtask.data
                molecule = conformer.get_molecule()
                conformers.append(conformer)
                input_files.append(self._prepare_temp_input(tmp_dir, molecule))
                charges.append(get_charge_for_molecule(molecule))
        return tmp_dirs, input_files, charges, conformers

    def _prepare_arguments(self, settings: List) -> List:
        """Append the configured flags and parameters to ``settings``.

        :param settings: argument list to extend (modified in place)
        :return: the same list, with flags followed by flattened key/value pairs
        """
        # add flags
        settings.extend(self.settings.arguments.flags)

        # flatten the parameter dictionary into a list for command-line execution
        for key, value in self.settings.arguments.parameters.items():
            settings.append(key)
            settings.append(value)
        return settings

    def _run_subjob(self, tmp_dir: str, input_file: str, charge: int) -> None:
        """Execute XTB for a single input file from within ``tmp_dir``.

        :param tmp_dir: directory to run in (XTB writes its output there)
        :param input_file: path to the input structure file
        :param charge: charge passed to XTB via ``_EE.XTB_P``
        """
        work_dir = os.getcwd()
        os.chdir(tmp_dir)
        try:
            # add additional parameters from config
            arguments = self._prepare_arguments([input_file, _EE.XTB_P, charge])
            self._backend_executor.execute(
                command=_EE.XTB, arguments=arguments, check=False
            )
        finally:
            # always restore the working directory, even if the call raised
            os.chdir(work_dir)

    def _execute_xtb(self):
        """Process all pending subtasks batch-wise until the container is done."""
        xtb_parallelizer = Parallelizer(func=self._run_subjob)
        n = 1
        while self._subtask_container.done() is False:
            next_batch = self._get_sublists(get_first_n_lists=self._get_number_cores())
            tmp_dirs, input_files, charges, conformers = self._prepare_batch(next_batch)

            try:
                # count the attempt and mark as failed up-front; the status is
                # only switched to success after the output has been parsed
                for sublist in next_batch:
                    for subtask in sublist:
                        subtask.increment_tries()
                for sublist in next_batch:
                    for subtask in sublist:
                        subtask.set_status_failed()

                self._logger.log(f"Executing xtb for batch {n}.", _LE.DEBUG)
                xtb_parallelizer.execute_parallel(
                    tmp_dir=tmp_dirs,
                    input_file=input_files,
                    charge=charges,
                )

                results = self._parse_XTB_result(tmp_dirs, conformers)

                for sublist, result in zip(next_batch, results):
                    # TODO: this only works if max length sublist == 1, fine for
                    #       now as xtb handles one conformer per call
                    assert len(sublist) == 1
                    for task in sublist:
                        if result == _COE.SUCCESS:
                            task.set_status_success()
                        else:
                            task.set_status_failed()
            finally:
                # clean up the temporary directories of every batch, not only
                # those of the last one
                self._remove_temporary(tmp_dirs)
            n += 1

    def execute(self):
        """Collect all conformers, clear them from their enumerations and run XTB.

        Structures parsed from the XTB output are added back to the
        enumerations during result parsing.
        """
        all_conformers = []
        for compound in self.get_compounds():
            for enumeration in compound.get_enumerations():
                if enumeration.get_conformers():
                    all_conformers.extend(enumeration.get_conformers())
                    enumeration.clear_conformers()

        # xtb operates on a single conformer per call
        self.execution.parallelization.max_length_sublists = 1
        self._subtask_container = SubtaskContainer(
            max_tries=self.execution.failure_policy.n_tries
        )
        self._subtask_container.load_data(all_conformers)
        self._execute_xtb()
        self._logger.log(
            f"Completed execution of XTB for {len(all_conformers)} conformers.",
            _LE.INFO,
        )