Source code for icolos.core.workflow_steps.schrodinger.prime

import os

from pydantic import BaseModel, PrivateAttr
from rdkit import Chem
from copy import deepcopy

from icolos.core.workflow_steps.schrodinger.base import StepSchrodingerBase
from icolos.utils.execute_external.prime import PrimeExecutor
from icolos.utils.execute_external.schrodinger import SchrodingerExecutor

from icolos.core.containers.compound import Conformer

from icolos.utils.enums.program_parameters import PrimeEnum, SchrodingerExecutablesEnum
from icolos.utils.enums.step_enums import StepPrimeEnum, StepGlideEnum
from icolos.core.workflow_steps.step import _LE
from icolos.core.step_utils.sdconvert_util import SDConvertUtil
from icolos.core.step_utils.structcat_util import StructcatUtil
from icolos.utils.general.files_paths import gen_tmp_file
from icolos.utils.general.parallelization import SubtaskContainer, Parallelizer
from tempfile import mkdtemp

_SPE = StepPrimeEnum()
_PE = PrimeEnum()
_SEE = SchrodingerExecutablesEnum()
_SGE = StepGlideEnum()


[docs]class StepPrime(StepSchrodingerBase, BaseModel): """ Interface to Schrodinger's Prime mmgbsa implementation """ _schrodinger_executor: SchrodingerExecutor = None
[docs] class Config: underscore_attrs_are_private = True
_sdconvert_util = PrivateAttr() _structcat_util = PrivateAttr() def __init__(self, **data): super().__init__(**data) # initialize the executor and test availability self._initialize_backend(executor=PrimeExecutor) self._check_backend_availability() self._schrodinger_executor = SchrodingerExecutor( prefix_execution=self.execution.prefix_execution ) # prepare sdconvert utility self._sdconvert_util = SDConvertUtil( prefix_execution=self.execution.prefix_execution, binary_location=self.execution.binary_location, ) # prepare structcat utility self._structcat_util = StructcatUtil( prefix_execution=self.execution.prefix_execution, binary_location=self.execution.binary_location, ) def _execute_prime(self): # note, that as the output file name cannot be set (an "-out.maegz" will be attached), this does # not need to be heeded here and is encoded in the fixed file name strings prime_parallelizer = Parallelizer(func=self._run_subjob) n = 1 while self._subtask_container.done() is False: next_batch = self._get_sublists(get_first_n_lists=self._get_number_cores()) # generate lists for the next batch tmp_dirs, complex_paths, output_sdf_paths = self._prepare_batch(next_batch) self._apply_token_guard() _ = [sub.increment_tries() for element in next_batch for sub in element] _ = [sub.set_status_failed() for element in next_batch for sub in element] self._logger.log(f"Executing prime for batch {n}", _LE.DEBUG) prime_parallelizer.execute_parallel( complex_path=complex_paths, sdf_output=output_sdf_paths, tmp_output_dir=tmp_dirs, ) self._parse_prime_output( complex_paths, tmp_dirs, output_sdf_paths, next_batch ) n += 1 def _parse_prime_output(self, complex_paths, tmp_dirs, output_sdf_paths, batch): # go through the batch, get the info from the output file and scores = [] for i in range(len(output_sdf_paths)): sdf_path = output_sdf_paths[i] curr_enum = None curr_conformer = None mol_supplier = Chem.SDMolSupplier(sdf_path, removeHs=False) for mol in mol_supplier: # check whether the name corresponds to an enum or conformer identifier = str(mol.GetProp("_Name")) is_enum = True if len(identifier.split(":")) == 2 else False if ( not is_enum ): # if we are dealing with a conformer, drop the conformer index to get the enum id enum_index = ":".join(list(mol.GetProp("_Name").split(":"))[:-1]) else: enum_index = identifier # extract the enumeration object, regardless of whether we're dealing with a conformer or an enumeration prime_score = mol.GetProp(_SPE.MMGBSA_SCORE) for compound in self.get_compounds(): for enumeration in compound.get_enumerations(): if enumeration.get_index_string() == enum_index: curr_enum = enumeration # if we have a conformer, find the conformer with the right ID and append the score to the existing object if not is_enum: assert curr_enum is not None for conformer in curr_enum.get_conformers(): if conformer.get_index_string() == identifier: curr_conformer = conformer else: # if scoring an enumeration, create and attach a conformer out of it curr_conformer = Conformer(conformer=mol) curr_enum.add_conformer(curr_conformer) assert curr_conformer is not None # now we have the conformer from the originals, set the prime score curr_conformer.get_molecule().SetProp(_SPE.MMGBSA_SCORE, prime_score) self._logger.log( f"Calculated dG Bind of {prime_score} for conformer {curr_conformer.get_index_string()}", _LE.INFO, ) scores.append(prime_score) # after parsing, remove the directories self._remove_temporary(tmp_dirs) # set success status for sublist in batch: for task in sublist: task.set_status_success() def _prepare_batch(self, batch): # generate input files for the batch and return tmpdirs tmp_dirs = [] complex_paths = [] output_sdf_paths = [] for next_subtask_list in batch: tmp_dir = mkdtemp() _, tmp_input_sdf_file = gen_tmp_file(suffix=".sdf", dir=tmp_dir) _, tmp_input_mae_file = gen_tmp_file(suffix=".maegz", dir=tmp_dir) _, tmp_output_sdf_file = gen_tmp_file(suffix=".sdf", dir=tmp_dir) writer = Chem.SDWriter(tmp_input_sdf_file) for subtask in next_subtask_list: mol = deepcopy(subtask.data.get_molecule()) conf_id = subtask.data.get_index_string() mol.SetProp("_Name", conf_id) writer.write(mol) writer.close() # now we have an sdf file with all the conformers from that batch. Attach the structcat_args = [ "-imae", self.settings.additional[_SPE.RECEPTOR], "-isd", tmp_input_sdf_file, "-omae", tmp_input_mae_file, ] self._schrodinger_executor.execute( command=_SEE.STRUCTCAT, arguments=structcat_args, location=tmp_dir, check=True, ) tmp_dirs.append(tmp_dir) complex_paths.append(tmp_input_mae_file) output_sdf_paths.append(tmp_output_sdf_file) return tmp_dirs, complex_paths, output_sdf_paths def _run_subjob(self, complex_path, sdf_output, tmp_output_dir): work_dir = os.getcwd() os.chdir(tmp_output_dir) arguments = [complex_path, _PE.PRIME_OUTTYPE, _PE.PRIME_OUTTYPE_LIGAND] for key in self.settings.arguments.parameters.keys(): if key not in [_PE.PRIME_OUTTYPE, _PE.PRIME_NJOBS]: arguments.append(key) arguments.append(str(self.settings.arguments.parameters[key])) for flag in self.settings.arguments.flags: arguments.append(str(flag)) if _PE.PRIME_WAIT not in arguments: arguments.append(_PE.PRIME_WAIT) result = self._backend_executor.execute( command=_PE.PRIME_MMGBSA, arguments=arguments, check=True, location=tmp_output_dir, ) print(result) output_file = complex_path.split(".")[0] + "-out.maegz" assert os.path.isfile(output_file) # Convert the mae ligand output back to sdf self._sdconvert_util.mae2sdf(output_file, sdf_output) os.chdir(work_dir) return result
[docs] def execute(self): # need to unwrap the conformers to efficiently run in parallel, create lists of subtasks, each with their own files, tmpdirs, then execute them in parallel all_conformers = [] for compound in self.get_compounds(): for enumeration in compound.get_enumerations(): if enumeration.get_conformers(): # default running mode is to score incoming conformers without changing their configurations for conformer in enumeration.get_conformers(): all_conformers.append(conformer) else: all_conformers.append(enumeration) self._subtask_container = SubtaskContainer( max_tries=self.execution.failure_policy.n_tries ) self._subtask_container.load_data(all_conformers) self._execute_prime() self._logger.log( f"Executed Prime for {len(all_conformers)} confomers", _LE.DEBUG )