import json
import os
import re
from copy import deepcopy
from tempfile import mkdtemp
from typing import List, Tuple
from pydantic import BaseModel

from icolos.core.containers.compound import Conformer
from icolos.core.workflow_steps.calculation.base import StepCalculationBase
from icolos.utils.enums.program_parameters import JazzyEnum
from icolos.utils.enums.step_enums import StepJazzyEnum
from icolos.core.workflow_steps.step import _LE
from icolos.utils.execute_external.jazzy import JazzyExecutor
from icolos.utils.general.files_paths import gen_tmp_file
from icolos.utils.general.parallelization import SubtaskContainer, Parallelizer

# Shared enum instances: _SJE provides the step-level result markers used in this module
# (_SJE.SUCCESS / _SJE.FAILURE), _JE the Jazzy program constants (_JE.JAZZY, _JE.VEC, _JE.VIS).
_SJE = StepJazzyEnum()
_JE = JazzyEnum()

# Jazzy sub-commands that may be selected via the step's "additional" settings block;
# StepJazzy.__init__ raises a ValueError for any command that is not in this list.
_all_jazzy_commands = [_JE.VEC, _JE.VIS]

_all_jazzy_properties = [

class JazzyAdditional(BaseModel):
    """Settings read from the "additional" block of a Jazzy step configuration."""

    # Jazzy sub-command to run; StepJazzy only accepts values listed in `_all_jazzy_commands`.
    command: str = _JE.VEC
class StepJazzy(StepCalculationBase, BaseModel):
    """Workflow step that runs the Jazzy command-line tool on the SMILES of each conformer.

    Every conformer is treated as its own subtask: Jazzy is called with the conformer's SMILES
    string, its stdout is stored in a temporary file, the content is parsed as JSON and every
    key listed in `_all_jazzy_properties` is attached to the conformer's molecule as a property.
    """

    jazzy_additional: JazzyAdditional = None

    def __init__(self, **data):
        """Initialize the backend executor and validate the configured Jazzy command.

        :param data: keyword arguments forwarded unchanged to the parent initializer
        :raises ValueError: if the command in the additional settings is not one of
            `_all_jazzy_commands`
        """
        super().__init__(**data)

        self._initialize_backend(executor=JazzyExecutor)
        self._check_backend_availability()

        # initialize the additional settings
        self.jazzy_additional = JazzyAdditional(**self.settings.additional)
        if self.jazzy_additional.command not in _all_jazzy_commands:
            raise ValueError(
                f"Jazzy command {self.jazzy_additional.command} unknown - abort."
            )

    def _prepare_batch(self, batch) -> Tuple:
        """Set up temporary directories, output files and SMILES for one batch of subtasks.

        :param batch: iterable of subtask sublists
        :return: tuple `(tmp_dirs, input_smiles, output_files, conformers)`; `tmp_dirs` holds
            one directory per sublist, the other three lists hold one entry per subtask
        """
        tmp_dirs = []
        input_smiles = []
        output_files = []
        conformers = []
        for next_subtask_list in batch:
            tmp_dir = mkdtemp()
            tmp_dirs.append(tmp_dir)
            for subtask in next_subtask_list:
                _, tmp_out_path = gen_tmp_file(suffix=".out", dir=tmp_dir)
                output_files.append(tmp_out_path)

                # NOTE(review): the conformer is assumed to be the subtask's payload (`.data`),
                # i.e. the object handed to `SubtaskContainer.load_data()` in `execute()` -
                # confirm the attribute name against the Subtask implementation.
                conformer = subtask.data
                conformers.append(conformer)

                # TODO: not really efficient for SMILES, but keep it (?) on conformer level in
                #       anticipation of structural input that differs per conformer
                input_smile = conformer.get_enumeration_object().get_smile()
                input_smiles.append(input_smile)
        return tmp_dirs, input_smiles, output_files, conformers

    def _prepare_arguments(self, settings: List) -> List:
        """Append the configured flags and parameters to a command-line argument list.

        :param settings: argument list to extend; it is modified in place
        :return: the same list, with flags followed by flattened key / value parameter pairs
        """
        # add flags
        settings.extend(self.settings.arguments.flags)

        # add parameters; flatten the dictionary into a list for command-line execution
        parameters = deepcopy(self.settings.arguments.parameters)
        for key, value in parameters.items():
            if key in _all_jazzy_commands:
                # the command itself is taken from the additional block, never from parameters
                self._logger.log(
                    f"Use the additional block to specify Jazzy commands, parameter {key} ignored.",
                    _LE.WARNING,
                )
                continue
            settings.append(key)
            settings.append(value)
        return settings

    def _run_subjob(self, tmp_dir: str, input_smile: str, output_file: str) -> None:
        """Run Jazzy for a single SMILES string and store its stdout in `output_file`.

        :param tmp_dir: directory that is used as working directory for the call
        :param input_smile: SMILES string handed to Jazzy (wrapped in double quotes)
        :param output_file: path of the file the captured stdout is written to
        """
        work_dir = os.getcwd()
        os.chdir(tmp_dir)
        try:
            # construct the specified command-line call; only one command can be used at a time
            # e.g. jazzy vec [OPTIONS] SMILES
            arguments = [self.jazzy_additional.command, '"' + input_smile + '"']
            arguments = self._prepare_arguments(arguments)
            result = self._backend_executor.execute(
                command=_JE.JAZZY, arguments=arguments, check=False
            )

            # Jazzy prints the result to stdout -> store it in a temporary file
            with open(output_file, "w") as f:
                f.writelines(result.stdout)
        finally:
            # always restore the previous working directory, even if the call above raised
            os.chdir(work_dir)

    def _parse_jazzy_result(
        self, output_files: List[str], conformers: List[Conformer]
    ) -> List:
        """Parse the stored Jazzy outputs and attach the results to the conformers as tags.

        :param output_files: paths of the files holding the captured stdout, one per conformer
        :param conformers: conformers in the same order as `output_files`
        :return: list with one `_SJE.SUCCESS` or `_SJE.FAILURE` entry per conformer
        """
        # Jazzy does not output valid JSONs (' instead of "), so we need to replace those
        # except escaped ones; the pattern matches a single quote not preceded by a backslash
        unescaped_quote = re.compile("(?<!\\\\)'")

        results = []
        for output_file, conformer in zip(output_files, conformers):
            # load the JSON string that was captured from stdout and written to the output file
            try:
                with open(output_file) as file:
                    result = file.read().replace("\r", "").replace("\n", "")
                result = unescaped_quote.sub('"', result)
                result = json.loads(result)
            except FileNotFoundError:
                self._logger.log(
                    f"Jazzy result for conformer {conformer.get_index_string()} stored in file {output_file} not found - proceeding.",
                    _LE.WARNING,
                )
                results.append(_SJE.FAILURE)
                continue
            except json.JSONDecodeError:
                # Jazzy is executed with check=False, so a failed call can leave empty or
                # non-JSON output behind; record a failure instead of aborting the whole step
                self._logger.log(
                    f"Jazzy result for conformer {conformer.get_index_string()} stored in file {output_file} could not be parsed - proceeding.",
                    _LE.WARNING,
                )
                results.append(_SJE.FAILURE)
                continue

            # attach the properties obtained as tags
            for key in result.keys():
                if key in _all_jazzy_properties:
                    conformer.get_molecule().SetProp(key, str(result[key]))
            results.append(_SJE.SUCCESS)
        return results

    def _execute_jazzy(self):
        """Process all subtasks in batches until the subtask container reports completion."""
        jazzy_parallelizer = Parallelizer(func=self._run_subjob)
        n = 1
        while self._subtask_container.done() is False:
            next_batch = self._get_sublists(get_first_n_lists=self._get_number_cores())
            tmp_dirs, input_smiles, output_files, conformers = self._prepare_batch(
                next_batch
            )
            try:
                # count the attempt and mark every subtask as failed up front; the status is
                # only switched to success once a result could be parsed
                for element in next_batch:
                    for sub in element:
                        sub.increment_tries()
                        sub.set_status_failed()

                self._logger.log(f"Executing Jazzy for batch {n}.", _LE.DEBUG)
                jazzy_parallelizer.execute_parallel(
                    tmp_dir=tmp_dirs, input_smile=input_smiles, output_file=output_files
                )

                results = self._parse_jazzy_result(output_files, conformers)
                for sublist, result in zip(next_batch, results):
                    # results are per conformer, so pairing them with sublists requires
                    # exactly one subtask per sublist (see `execute()`)
                    assert len(sublist) == 1
                    for task in sublist:
                        if result == _SJE.SUCCESS:
                            task.set_status_success()
                        else:
                            task.set_status_failed()
            finally:
                # remove the temporary directories of this batch even if it failed
                self._remove_temporary(tmp_dirs)
            n += 1

    def execute(self):
        """Run Jazzy for every conformer of every enumeration of the step's compounds."""
        all_conformers = []
        for compound in self.get_compounds():
            for enumeration in compound.get_enumerations():
                conformers = enumeration.get_conformers()
                if conformers:
                    all_conformers.extend(conformers)

        # one conformer per sublist: `_execute_jazzy()` pairs each sublist with a single result
        self.execution.parallelization.max_length_sublists = 1
        self._subtask_container = SubtaskContainer(
            max_tries=self.execution.failure_policy.n_tries
        )
        self._subtask_container.load_data(all_conformers)
        self._execute_jazzy()
        self._logger.log(
            f"Completed execution of Jazzy for {len(all_conformers)} conformers (using their SMILES strings).",
            _LE.INFO,
        )