Source code for icolos.core.workflow_steps.calculation.kallisto

import os
from copy import deepcopy
from tempfile import mkdtemp
from typing import List, Tuple
from pydantic import BaseModel
from rdkit import Chem

from icolos.core.containers.compound import Conformer
from icolos.core.workflow_steps.calculation.base import StepCalculationBase
from icolos.utils.enums.program_parameters import KallistoEnum, OpenBabelEnum
from icolos.utils.enums.step_enums import StepKallistoEnum
from icolos.core.workflow_steps.step import _LE
from icolos.utils.execute_external.kallisto import KallistoExecutor
from icolos.utils.execute_external.openbabel import OpenBabelExecutor
from icolos.utils.general.files_paths import gen_tmp_file
from icolos.utils.general.icolos_exceptions import StepFailed
from icolos.utils.general.parallelization import SubtaskContainer, Parallelizer

_SKE = StepKallistoEnum()
_KE = KallistoEnum()
_OBE = OpenBabelEnum()

_all_kallisto_commands = [
    _KE.ALP,
    _KE.BONDS,
    _KE.CNS,
    _KE.EEQ,
    _KE.EXS,
    _KE.LIG,
    _KE.PROX,
    _KE.RMS,
    _KE.SORT,
    _KE.STM,
    _KE.VDW,
]


[docs]class KallistoAdditional(BaseModel): features: List[str] = [_KE.ALP, _KE.BONDS] # list of features to be obtained
[docs]class StepKallisto(StepCalculationBase, BaseModel): kallisto_additional: KallistoAdditional = None _openbabel_executor: OpenBabelExecutor = None def __init__(self, **data): super().__init__(**data) self._initialize_backend(executor=KallistoExecutor) self._check_backend_availability() # initialize the executor for all "OpenBabel" self._openbabel_executor = OpenBabelExecutor() if not self._openbabel_executor.is_available(): raise StepFailed( "Kallisto requires OpenBabel execution, initialization failed." ) # initialize the additional settings self.kallisto_additional = KallistoAdditional(**self.settings.additional) def _prepare_temp_input(self, tmp_dir: str, molecule: Chem.Mol) -> str: tmp_sdf_path = super()._prepare_temp_input(tmp_dir, molecule) # Kallisto expects the input either in a mol2 file or an XYZ file _, tmp_xyz_path = gen_tmp_file(suffix=".xyz", dir=tmp_dir) # translate the arguments = [ tmp_sdf_path, _OBE.OBABEL_INPUTFORMAT_SDF, _OBE.OBABEL_OUTPUT_FORMAT_XYZ, "".join([_OBE.OBABEL_O, tmp_xyz_path]), ] self._openbabel_executor.execute( command=_OBE.OBABEL, arguments=arguments, check=False ) self._logger.log( f"Translated input molecule to file {tmp_xyz_path}.", _LE.DEBUG ) return tmp_xyz_path def _prepare_batch(self, batch) -> Tuple: tmp_dirs = [] input_files = [] output_files = [] conformers = [] for next_subtask_list in batch: tmp_dir = mkdtemp() tmp_dirs.append(tmp_dir) for subtask in next_subtask_list: _, tmp_out_path = gen_tmp_file(suffix=".out", dir=tmp_dir) output_files.append(tmp_out_path) conformer = subtask.data conformers.append(conformer) input_xyz_file = self._prepare_temp_input( tmp_dir, conformer.get_molecule() ) input_files.append(input_xyz_file) return tmp_dirs, input_files, output_files, conformers def _prepare_arguments(self, settings: List) -> List: # add flags for flag in self.settings.arguments.flags: settings.append(flag) # add parameters parameters = deepcopy(self.settings.arguments.parameters) # flatten the dictionary into a list for command-line execution for key in parameters.keys(): if key in _all_kallisto_commands: self._logger.log( f"Use the additional block to specify Kallisto commands, parameter {key} ignored.", _LE.WARNING, ) continue settings.append(key) settings.append(parameters[key]) return settings def _run_subjob(self, tmp_dir: str, input_file: str, output_file: str) -> None: work_dir = os.getcwd() os.chdir(tmp_dir) # construct the specified command-line call arguments = [] for feature in self.kallisto_additional.features: if feature not in _all_kallisto_commands: self._logger.log( f"Kallisto feature {feature} not supported, will be ignored.", _LE.WARNING, ) continue arguments.append(feature) arguments.append(input_file) arguments = self._prepare_arguments(arguments) result = self._backend_executor.execute( command=_KE.KALLISTO, arguments=arguments, check=False ) # Kallisto prints the result to stdout -> store it in a temporary file with open(output_file, "w") as f: f.writelines(result.stdout) os.chdir(work_dir) def _parse_kallisto_result( self, output_files: List[str], conformers: List[Conformer] ) -> List: def _split_list(inp: List, chunk_size: int) -> List[List[str]]: assert len(inp) % chunk_size == 0 return [ inp[idx : idx + chunk_size] for idx in range(0, len(inp), chunk_size) ] results = [] number_features = len(self.kallisto_additional.features) for output_file, conformer in zip(output_files, conformers): number_atoms = conformer.get_molecule().GetNumAtoms() # load features from output file with open(output_file, "r") as f: features_lines = f.readlines() features_lines = [line.lstrip().rstrip() for line in features_lines] # check, that all features could be calculated for all atoms expected_lines = number_features * number_atoms if expected_lines != len(features_lines): self._logger.log( f"Kallisto result for conformer {conformer.get_index_string()} incomplete ({len(features_lines)} lines instead of the expected {expected_lines}), check {output_file} - proceeding.", _LE.WARNING, ) results.append(_SKE.FAILURE) continue # group the features and add them to the conformers sublists = _split_list(features_lines, number_atoms) for feature, sublist in zip(self.kallisto_additional.features, sublists): conformer.get_molecule().SetProp(feature, "|".join(sublist)) results.append(_SKE.SUCCESS) return results def _execute_kallisto(self): kallisto_parallelizer = Parallelizer(func=self._run_subjob) n = 1 while self._subtask_container.done() is False: next_batch = self._get_sublists(get_first_n_lists=self._get_number_cores()) tmp_dirs, input_files, output_files, conformers = self._prepare_batch( next_batch ) _ = [sub.increment_tries() for element in next_batch for sub in element] _ = [sub.set_status_failed() for element in next_batch for sub in element] self._logger.log(f"Executing Kallisto for batch {n}.", _LE.DEBUG) kallisto_parallelizer.execute_parallel( tmp_dir=tmp_dirs, input_file=input_files, output_file=output_files ) results = self._parse_kallisto_result(output_files, conformers) for sublist, result in zip(next_batch, results): assert len(sublist) == 1 for task in sublist: if result == _SKE.SUCCESS: task.set_status_success() else: task.set_status_failed() n += 1 self._remove_temporary(tmp_dirs)
[docs] def execute(self): assert len(self.kallisto_additional.features) > 0 all_conformers = [] for compound in self.get_compounds(): for enumeration in compound.get_enumerations(): if enumeration.get_conformers(): for conformer in enumeration.get_conformers(): all_conformers.append(conformer) self.execution.parallelization.max_length_sublists = 1 self._subtask_container = SubtaskContainer( max_tries=self.execution.failure_policy.n_tries ) self._subtask_container.load_data(all_conformers) self._execute_kallisto() self._logger.log( f"Completed execution of Kallisto for {len(all_conformers)} conformers, attached the following features: [{', '.join(self.kallisto_additional.features)}].", _LE.INFO, )