Source code for icolos.core.workflow_steps.calculation.cosmo

import os
import tempfile
from typing import Tuple, List
from copy import deepcopy

from pydantic import BaseModel

from icolos.utils.execute_external.turbomole import TurbomoleExecutor

from icolos.core.containers.compound import Conformer, Enumeration

from icolos.utils.enums.program_parameters import TurbomoleEnum
from icolos.utils.enums.program_parameters import CosmoOutputEnum
from icolos.utils.enums.compound_enums import ConformerContainerEnum
from icolos.core.workflow_steps.calculation.base import StepCalculationBase
from icolos.core.workflow_steps.step import _LE
from icolos.loggers.logger_utils import log_multiline_string
from icolos.utils.general.files_paths import attach_root_path

_EE = TurbomoleEnum()
_CTE = ConformerContainerEnum()
_COE = CosmoOutputEnum()


[docs]class StepCosmo(StepCalculationBase, BaseModel): """Step that executes Cosmo. Note, that the execution (especially in conjunction with a preceding turbomole step) is relatively complex. (1) Take the coord file from the additional data attached to the conformers, (2) run Cosmo, (3) extract the final XYZ snapshot with x2t, (4) translate it to a SDF file with obabel and (5) combined the new coordinates with the tags.""" def __init__(self, **data): super().__init__(**data) # initialize the executor and test availability self._initialize_backend(executor=TurbomoleExecutor) self._check_backend_availability() # initialize the executor and test availability # as they are linked, use a "TurbomoleExecutor" here self._initialize_backend(executor=TurbomoleExecutor) self._check_backend_availability() def _prepare_tmp_input_directories( self, enumeration: Enumeration ) -> Tuple[List[str], List[str], List[str], List[str]]: tmp_dirs = [] paths_input_cosmofile = [] paths_config_cosmotherm = [] paths_output_cosmotherm = [] for conformer in enumeration: # 1) generate all temporary paths tmp_dir = tempfile.mkdtemp() path_input_cosmofile = os.path.join(tmp_dir, _EE.TM_OUTPUT_COSMOFILE) path_config_cosmofile = os.path.join(tmp_dir, _EE.CT_COSMOTHERM_CONFIG_FILE) path_output_cosmotherm = os.path.join( tmp_dir, _EE.CT_COSMOTHERM_OUTPUT_FILE ) # 2) write-out the COSMO file # Note, that the generation of the COSMO files is part of the Turbomole execution. The reason is, that # the generation is complicated and uses a lot of input form the TM step, thus "cosmoprep" is # executed there. if _CTE.EXTRA_DATA_COSMOFILE not in conformer.get_extra_data().keys(): self._logger.log( f"In order to write out COSMO files, the content needs to be annotated as extra data in the conformers. Have you executed Turbomole before?", _LE.ERROR, ) raise ValueError("Could not find COSMO data to write out - abort.") with open(path_input_cosmofile, "w") as f: f.writelines(conformer.get_extra_data()[_CTE.EXTRA_DATA_COSMOFILE]) # 3) add paths tmp_dirs.append(tmp_dir) paths_input_cosmofile.append(path_input_cosmofile) paths_config_cosmotherm.append(path_config_cosmofile) paths_output_cosmotherm.append(path_output_cosmotherm) return ( tmp_dirs, paths_input_cosmofile, paths_config_cosmotherm, paths_output_cosmotherm, ) def _execute_run(self, config_path: str): result = self._backend_executor.execute( command=_EE.CT_COSMOTHERM, arguments=[config_path], check=True ) if _EE.CT_COSMOTHERM_FAIL_STRING in result.stderr: self._logger.log( f"Execution of {_EE.CT_COSMOTHERM} failed. Error message:", _LE.ERROR ) log_multiline_string( logger=self._logger_blank, level=_LE.ERROR, multi_line_string=result.stdout, ) def _write_config_file(self, config_path: str): # by default use the internal configuration, but if one has been specified, use this one # note, that the default name of the COSMO file is "mol.cosmo", so this should be used in any config file if _EE.CT_CONFIG not in self.settings.arguments.parameters.keys(): with open(attach_root_path(_EE.CT_CONFIG_DEFAULTPATH), "r") as f: config = f.readlines() self._logger.log( f"Loaded {_EE.CT_COSMOTHERM} configuration from default file {_EE.CT_CONFIG_DEFAULTPATH}.", _LE.DEBUG, ) else: config = self.settings.arguments.parameters[_EE.CT_CONFIG] with open(config_path, "w") as f: f.writelines([line.rstrip("\n") + "\n" for line in config]) def _get_line_by_pattern(self, lines: List[str], pattern: str) -> str: for line in lines: if pattern in line: return line def _get_values_from_line(self, line: str) -> List[str]: try: value_part = line.split(":")[1] return value_part.split() except Exception: return [] def _annotate_from_output_block( self, conformer: Conformer, block: List[str], annotation: dict ): for key in annotation.keys(): # get the line with the values line = self._get_line_by_pattern( lines=block, pattern=annotation[key][_COE.PATTERN] ) if line is None: continue # get the values and select the one that is to be added try: values = self._get_values_from_line(line=line) value = values[annotation[key][_COE.ELEMENT]] except IndexError: continue # add it as a tag to the conformer; we can replace part of the tag name with e.g. the solvent # names if we need to conformer.get_molecule().SetProp(key, value) def _get_solvents_from_header(self, header: List[str]): line_solvents = self._get_line_by_pattern( header, pattern=_COE.SOLVENT_BLOCK_HEADER_COMPOUNDS_PATTERN ) return self._get_values_from_line(line_solvents) def _get_current_solvent_from_header(self, header: List[str]): line_mol_fraction = self._get_line_by_pattern( header, pattern=_COE.SOLVENT_BLOCK_HEADER_MOLFRACTION_PATTERN ) solvent_index = self._get_values_from_line(line_mol_fraction).index( _COE.SOLVENT_BLOCK_CURRENT_FRACTION_VALUE ) return self._get_solvents_from_header(header)[solvent_index] def _parse_general_block(self, lines: List[str], conformer: Conformer): general_block = [] for index in range(len(lines)): if _COE.GENERAL_BLOCK_PATTERN_STRING in lines[index]: # skip the first lines after the header index += 2 while not lines[index] == "": general_block.append(lines[index]) index += 1 break self._annotate_from_output_block( conformer=conformer, block=general_block, annotation=_COE.GENERAL_BLOCK_ANNOTATIONS, ) def _load_solvent_blocks(self, lines: List[str]) -> List[dict]: solvent_blocks = [] index = 0 while index < len(lines): if _COE.SOLVENT_BLOCK_PATTERN_STRING in lines[index]: # we need to extract both the header (which solvent?) and the body (actual values) new_block = {"header": [], "body": []} # go back to start of block while ( index >= 0 and _COE.SOLVENT_BLOCK_START_PATTERN not in lines[index] ): index -= 1 # extract the header while _COE.SOLVENT_BLOCK_BODY_START_PATTERN not in lines[index]: new_block["header"].append(lines[index]) index += 1 # extract the body while index < len(lines) and not ( lines[index] == "" and lines[index + 1] == "" ): new_block["body"].append(lines[index]) index += 1 solvent_blocks.append(new_block) index += 1 return solvent_blocks def _annotate_solvent_blocks( self, solvent_blocks: List[dict], conformer: Conformer ): for block_dict in solvent_blocks: # get solvent and translate according to internal solvent abbreviation table try: current_solvent = self._get_current_solvent_from_header( block_dict["header"] ) if current_solvent in _COE.SOLVENT_TRANSLATE_SOLVENT.keys(): current_solvent = _COE.SOLVENT_TRANSLATE_SOLVENT[current_solvent] except ValueError: continue # overwrite the solvent name placeholder in and annotate template_annotations = deepcopy(_COE.SOLVENT_BLOCK_BODY_ANNOTATIONS) annotations = {} for key in template_annotations.keys(): new_key = key.replace(_COE.SOLVENT_REPLACEHOLDER, current_solvent) annotations[new_key] = template_annotations[key] # annotate self._annotate_from_output_block( conformer=conformer, block=block_dict["body"], annotation=annotations ) def _parse_output(self, path_output: str, conformer: Conformer): # there are two sets of blocks we need to parse: the "general" block, that is always present and, if specified, # free energies from solvents ("mixtures") # 1) load the file with open(path_output, "r") as f: lines = f.readlines() lines = [line.rstrip("\n") for line in lines] # 2) extract the general block: from the match of the pattern line until the second empty line occurs # e.g. "--- Compound 1 (mol) ---\n\nAtomic weights : 111111\n <etc> ...\n\n" self._parse_general_block(lines, conformer) # 3) extract the solvent blocks (if available) # search for the first occurrence of a Gibb's free energy and expand top until the pattern line is found and # to bottom until more than one empty line is hit; proceed until all blocks are processed solvent_blocks = self._load_solvent_blocks(lines) if len(solvent_blocks) > 0: self._annotate_solvent_blocks(solvent_blocks, conformer)
[docs] def execute(self): for compound in self.get_compounds(): for enumeration in compound.get_enumerations(): if len(enumeration.get_conformers()) == 0: continue # generate copies of the conformers, as to not accidentally manipulate them inp_enum = deepcopy(enumeration) # prepare the temporary files and retrieve paths (TM config is charge-state dependent!) ( tmp_dirs, paths_input_cosmofile, paths_config_cosmotherm, paths_output_cosmotherm, ) = self._prepare_tmp_input_directories(enumeration=inp_enum) # execute individual conformers for ( tmp_dir, path_config_cosmotherm, conformer, path_output_cosmotherm, ) in zip( tmp_dirs, paths_config_cosmotherm, enumeration.get_conformers(), paths_output_cosmotherm, ): self._move_to_dir(tmp_dir) # set a necessary environment variable to avoid clashes os.environ[_EE.TM_TURBOTMPDIR] = tmp_dir # write configuration file self._write_config_file(config_path=path_config_cosmotherm) # all ready; start the execution self._execute_run(config_path=path_config_cosmotherm) # parse the results self._parse_output( path_output=path_output_cosmotherm, conformer=conformer ) # restore working directory and remove temporary files self._restore_working_dir() for tmp_dir in tmp_dirs: if os.path.isdir(tmp_dir): self._remove_temporary(tmp_dir) self._logger.log( f"Executed COSMO for {len(enumeration.get_conformers())} conformers for enumeration {enumeration.get_index_string()}.", _LE.INFO, )