Source code for icolos.core.workflow_steps.schrodinger.glide

import gzip
import os
import shutil
import tempfile
from copy import deepcopy
from typing import List, Tuple

from pydantic import BaseModel
from rdkit import Chem

from icolos.core.workflow_steps.schrodinger.base import StepSchrodingerBase
from icolos.utils.execute_external.glide import GlideExecutor
from icolos.utils.execute_external.schrodinger import SchrodingerExecutor
from icolos.utils.general.files_paths import any_in_file, gen_tmp_file

from icolos.core.containers.compound import Conformer

from icolos.utils.enums.program_parameters import SchrodingerExecutablesEnum, GlideEnum
from icolos.utils.enums.step_enums import StepGlideEnum, StepBaseEnum
from icolos.core.workflow_steps.step import _LE
from icolos.utils.general.parallelization import SubtaskContainer, Parallelizer, Subtask
from icolos.utils.general.strings import stringify


class GlideSupportEnum:
    # list of block keys whose entries are to be comma-separated
    GLIDE_INPUTBLOCK_COMMASEPARATED = ["CONSTRAINT_GROUP"]
    # list of block keys whose values are to be put into double quotation marks
    GLIDE_INPUTBLOCK_VALUEQUOTED = ["FEATURE"]
    GLIDE_TG_WAIT_INTERVAL = "wait_interval_seconds"
    GLIDE_TG_WAIT_LIMIT = "wait_limit_seconds"

    # try to find the internal value and return
    def __getattr__(self, name):
        if name in self:
            return name
        raise AttributeError

    # prohibit any attempt to set any values
    def __setattr__(self, key, value):
        raise ValueError("No changes allowed.")
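
# Illustration only (not part of the module): constants on the support enum read as plain
# attributes, while any attempt to rebind them is blocked by __setattr__, e.g.
#
#   _GSE = GlideSupportEnum()
#   _GSE.GLIDE_TG_WAIT_INTERVAL        # -> "wait_interval_seconds"
#   _GSE.GLIDE_TG_WAIT_INTERVAL = "x"  # raises ValueError("No changes allowed.")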

_SBE = StepBaseEnum
_EE = GlideEnum()
_SGE = StepGlideEnum()
_SEE = SchrodingerExecutablesEnum()
_GSE = GlideSupportEnum()


class StepGlide(StepSchrodingerBase, BaseModel):
    """Interface to Schrodinger's Glide docking engine.

    Additional settings:

    :dict configuration: key-value pairs defining the contents of the Maestro configuration file
    :int time_limit_per_task: timeout (seconds) for each job before reporting failure (default=240)
    :str maestro_in_file: alternative to configuration; directly specify a path to a Maestro-formatted config file
    :bool fill_dummy_confs: whether to add a dummy conformer to an enumeration if no poses were produced; useful when REINVENT expects an output
    """

    _schrodinger_executor: SchrodingerExecutor = None

    class Config:
        underscore_attrs_are_private = True
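
    # A hypothetical sketch (not part of the module) of the "additional" settings block this
    # step consumes, mirroring the docstring above; the keyword values inside "configuration"
    # are made-up examples:
    #
    #   "additional": {
    #       "configuration": {
    #           "GRIDFILE": "/path/to/receptor_grid.zip",
    #           "PRECISION": "SP",
    #           "POSE_OUTTYPE": "ligandlib_sd"
    #       },
    #       "time_limit_per_task": 240,
    #       "fill_dummy_confs": true
    #   }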

    def __init__(self, **data):
        super().__init__(**data)

        # initialize the executors and test availability
        self._initialize_backend(executor=GlideExecutor)
        self._check_backend_availability()
        self._schrodinger_executor = SchrodingerExecutor(
            prefix_execution=self.execution.prefix_execution,
            binary_location=self.execution.binary_location,
        )

    def _get_scores_from_conformer(self, conformer: Chem.Mol) -> Tuple[float, float]:
        return (
            float(conformer.GetProp(_SGE.GLIDE_DOCKING_SCORE)),
            float(conformer.GetProp(_SGE.GLIDE_GSCORE)),
        )

    def _set_docking_score(self, conformer: Chem.Mol) -> bool:
        try:
            docking_score, g_score = self._get_scores_from_conformer(conformer)
        except KeyError:
            return False
        conformer.SetProp(_SBE.ANNOTATION_TAG_DOCKING_SCORE, str(docking_score))
        conformer.SetProp(_SBE.ANNOTATION_TAG_G_SCORE, str(g_score))
        return True

    def _generate_temporary_input_output_files(
        self, batch: List[List[Subtask]]
    ) -> Tuple[List[str], List[str], List[str], List[str]]:
        tmp_output_dirs = []
        tmp_input_mae_paths = []
        tmp_output_sdf_paths = []
        tmp_output_maegz_paths = []
        for next_subtask_list in batch:
            # generate temporary input files and output directory
            cur_tmp_output_dir = tempfile.mkdtemp()
            _, cur_tmp_sdf = gen_tmp_file(suffix=".sdf", dir=cur_tmp_output_dir)
            _, cur_tmp_mae = gen_tmp_file(suffix=".mae", dir=cur_tmp_output_dir)

            # write out the temporary input file
            writer = Chem.SDWriter(cur_tmp_sdf)
            one_written = False
            for subtask in next_subtask_list:
                enumeration = subtask.data
                mol = deepcopy(enumeration.get_molecule())
                if mol is not None:
                    mol.SetProp("_Name", enumeration.get_index_string())
                    one_written = True
                    writer.write(mol)
            writer.close()
            if one_written is False:
                self._remove_temporary(cur_tmp_output_dir)
                continue

            # translate the SDF into a MAE file
            self._translate_SDF_to_MAE(
                sdf_path=cur_tmp_sdf,
                mae_path=cur_tmp_mae,
                executor=self._schrodinger_executor,
            )

            # add the paths to which "_run_subjob()" will write the results
            _, output_sdf_path = gen_tmp_file(
                suffix="_result.sdf", dir=cur_tmp_output_dir
            )
            _, output_maegz_path = gen_tmp_file(
                suffix="_result.maegz", dir=cur_tmp_output_dir, text=False
            )
            tmp_output_sdf_paths.append(output_sdf_path)
            tmp_output_maegz_paths.append(output_maegz_path)
            tmp_input_mae_paths.append(cur_tmp_mae)
            tmp_output_dirs.append(cur_tmp_output_dir)
        return (
            tmp_output_dirs,
            tmp_input_mae_paths,
            tmp_output_sdf_paths,
            tmp_output_maegz_paths,
        )

    def _all_keywords(self) -> dict:
        """Returns the joined keywords from JSON and from the .in file (if specified)."""
        keywords = {}

        # keywords from the Maestro file; they can be overwritten by explicitly set values
        # from the "configuration" block
        maestro_in_file = deepcopy(
            self.settings.additional.get(_SGE.MAESTRO_IN_FILE, None)
        )
        if maestro_in_file is not None:
            with open(maestro_in_file[_SGE.MAESTRO_IN_FILE_PATH], "rt") as f:
                keywords_from_file = self._parse_maestro_in_file(f.readlines())
            keywords.update(keywords_from_file)

        # Add keywords from advanced_glide_keywords (they are keywords with file paths),
        # skipping keywords that are None. Also skip the Maestro file - that's not a keyword.
        # TODO: This is legacy code from DockStream's implementation, which was necessary
        #       to accommodate the GUI. Remove?
        # if self.parameters.advanced_glide_keywords is not None:
        #     adv_kw = stringify({
        #         k: v
        #         for k, v in self.parameters.advanced_glide_keywords.dict().items()
        #         if v is not None and k not in {'maestro_file'}
        #     })
        #     keywords.update(adv_kw)

        # add "ordinary" keywords, overwriting existing ones
        json_keywords = stringify(
            deepcopy(self.settings.additional.get(_SGE.CONFIGURATION, {}))
        )
        keywords.update(json_keywords)  # overwrites any keywords that are already present
        return keywords

    def _configuration_Maestro_reformat(self, configuration: dict):
        # rewrite the keyword input file in Maestro format
        maestro_indent = " "
        maestro_spacing = " "
        element_lines = []
        block_lines = []
        for key in configuration.keys():
            if isinstance(configuration[key], str):
                # keyword holds a single value (string) only
                element_lines.append(
                    maestro_spacing.join([key, configuration[key] + "\n"])
                )
            elif isinstance(configuration[key], dict):
                # keyword holds a composite block (a dictionary), e.g. constraints; note that
                # these must always be at the end of the file
                block_lines.append("\n" + key + "\n")
                block = configuration[key]
                for key_idx, block_key in enumerate(block.keys()):
                    block_value = block[block_key]

                    # if this is a value in certain blocks, put it into double quotation marks
                    # as spaces are present
                    if any([x in key for x in _GSE.GLIDE_INPUTBLOCK_VALUEQUOTED]):
                        block_value = '"' + block_value + '"'
                    line = maestro_indent + maestro_spacing.join(
                        [block_key, block_value]
                    )

                    # add a comma to the block definition if there are more lines to come and
                    # the block requires it; note that not all blocks in GLIDE require this and
                    # in some cases the comma is already part of the line (then skip it!)
                    if any([x in key for x in _GSE.GLIDE_INPUTBLOCK_COMMASEPARATED]):
                        if (key_idx + 1) < len(block) and line[-1] != ",":
                            line = line + ","
                    block_lines.append(line + "\n")
            else:
                raise Exception(
                    f"Cannot handle type {type(configuration[key])} in configuration file specification, only use strings and blocks."
                )
        return element_lines, block_lines

    def _write_configuration_to_file(self, configuration: dict, path: str):
        """Generates a keyword input file in Maestro format.

        :param dict configuration: configuration as key-value pairs
        :param str path: write-out path for the config file in Maestro format
        """
        # call a function that returns the input keywords in Maestro format
        element_lines, block_lines = self._configuration_Maestro_reformat(
            configuration=configuration
        )

        # arrange the elements and blocks
        if path is None:
            _, path = gen_tmp_file(suffix=".in")
        with open(path, mode="w") as f:
            self._logger.log(f"Writing GLIDE input file {path}:\n", _LE.DEBUG)
            for line in element_lines:
                f.write(line)
                self._logger_blank.log(line.rstrip("\n"), _LE.DEBUG)
            for line in block_lines:
                f.write(line)
                self._logger_blank.log(line.rstrip("\n"), _LE.DEBUG)
            self._logger_blank.log("", _LE.DEBUG)
            self._logger.log("--- End file", _LE.DEBUG)

    def _get_time_limit_per_task(self):
        # for the "SP" method, one can expect that at most about 90 s / ligand is required;
        # use a bit extra
        return int(self.settings.additional.get(_SGE.TIME_LIMIT_PER_TASK, 240))

    def _get_path_tmp_results(
        self, glide_pose_outtype: str, base_path: str
    ) -> Tuple[str, str]:
        if glide_pose_outtype == _EE.GLIDE_POSE_OUTTYPE_LIGANDLIB:
            path_tmp_results = os.path.join(
                os.path.dirname(base_path),
                "".join(
                    [
                        os.path.splitext(os.path.basename(base_path))[0],
                        _SGE.GLIDE_SDF_DEFAULT_EXTENSION,
                    ]
                ),
            )
        elif glide_pose_outtype == _EE.GLIDE_POSE_OUTTYPE_POSEVIEWER:
            path_tmp_results = os.path.join(
                os.path.dirname(base_path),
                "".join(
                    [
                        os.path.splitext(os.path.basename(base_path))[0],
                        _SGE.GLIDE_MAEGZ_DEFAULT_EXTENSION,
                    ]
                ),
            )
        else:
            raise NotImplementedError(
                f"Specified out-type {glide_pose_outtype} for Glide not supported."
            )
        path_tmp_log = os.path.join(
            os.path.dirname(base_path),
            "".join([os.path.splitext(os.path.basename(base_path))[0], _SGE.GLIDE_LOG]),
        )
        return path_tmp_results, path_tmp_log

    def _move_result_files(
        self,
        glide_pose_outtype: str,
        path_tmp_results: str,
        path_sdf_results: str,
        path_maegz_results: str,
    ):
        if glide_pose_outtype == _EE.GLIDE_POSE_OUTTYPE_LIGANDLIB:
            if os.path.isfile(path_tmp_results):
                with gzip.open(path_tmp_results, "rb") as fin:
                    with open(path_sdf_results, "wb") as fout:
                        shutil.copyfileobj(fin, fout)
        elif glide_pose_outtype == _EE.GLIDE_POSE_OUTTYPE_POSEVIEWER:
            # as the output is in MAEGZ format, we need to translate it into an SDF (and move
            # the original file to the expected path)
            self._translate_MAE_to_SDF(
                mae_path=path_tmp_results,
                sdf_path=path_sdf_results,
                executor=self._schrodinger_executor,
            )
            os.rename(path_tmp_results, path_maegz_results)
        else:
            raise NotImplementedError(
                f"Specified out-type {glide_pose_outtype} for Glide not supported."
            )

    def _run_subjob(
        self,
        mae_ligand_path,
        path_sdf_results,
        path_maegz_results,
        tmp_output_dir,
        grid_path,
        sublist,
    ):
        # 1) increase the sublist "tries" and set the status to "failed"
        _ = [task.increment_tries() for task in sublist]
        _ = [task.set_status_failed() for task in sublist]

        # 2) change to the temporary directory to be able to use relative paths
        #    (to compensate for a Schrodinger bug with AWS)
        working_dir = os.getcwd()
        os.chdir(tmp_output_dir)

        # 3) get the "keywords" dictionary and overwrite necessary values;
        #    add the "LIGANDFILE" keyword: the full path to the "mae"-formatted ligands
        configuration = self._all_keywords()
        if configuration is None:
            raise ValueError(
                "You need to specify at least the gridfile path in the configuration for Glide."
            )
        configuration[_EE.GLIDE_LIGANDFILE] = mae_ligand_path

        # set the path to the grid file for this run
        configuration[_EE.GLIDE_GRIDFILE] = grid_path

        # if not set, set the ligand pose outtype to "LIGANDLIB" (SDF output without receptor)
        glide_pose_outtype = configuration.get(
            _EE.GLIDE_POSE_OUTTYPE, _EE.GLIDE_POSE_OUTTYPE_LIGANDLIB
        )
        configuration[_EE.GLIDE_POSE_OUTTYPE] = glide_pose_outtype

        # 4) write the keyword-input file for the "Glide" backend to a temporary file
        _, glide_configuration_path = gen_tmp_file(suffix=".in", dir=tmp_output_dir)
        self._write_configuration_to_file(
            configuration=configuration,
            path=glide_configuration_path,
        )

        # 5) determine the temporary result and log paths; later, wait / sleep until the job
        #    is completed. Note that while Glide has an option "-WAIT", this does not seem to
        #    work when getting back data from AWS (probably it returns before copying back the
        #    data properly); stay with this solution for now.
        path_tmp_results, path_tmp_log = self._get_path_tmp_results(
            glide_pose_outtype=glide_pose_outtype, base_path=glide_configuration_path
        )

        # 6) execute the "Glide" backend
        arguments = self._prepare_glide_arguments(glide_configuration_path)
        execution_result = self._backend_executor.execute(
            command=_EE.GLIDE,
            arguments=arguments,
            check=True,
            location=os.path.dirname(glide_configuration_path),
        )

        # 7) check the return code (anything but '0' is bad) and add "stdout" to the log file
        time_exceeded = False
        if execution_result.returncode != 0:
            msg = (
                f"Could not dock with Glide, error message: {execution_result.stdout}."
            )
            self._logger.log(msg, _LE.ERROR)
            self._print_log_file(path_tmp_log)
            raise RuntimeError()
        else:
            if (
                self._wait_until_file_generation(
                    path=path_tmp_results,
                    path_log=path_tmp_log,
                    interval_sec=10,
                    maximum_sec=max(
                        self._get_time_limit_per_task() * len(sublist), 300
                    ),
                    success_strings=_EE.GLIDE_LOG_FINISHED_STRINGS,
                    fail_strings=_EE.GLIDE_LOG_FAIL_STRINGS,
                )
                is False
            ):
                time_exceeded = True
                self._logger.log(
                    f"Sublist docking for output file {path_tmp_results} exceeded the time limit or failed; "
                    f"all these ligands are ignored in the final write-out. This could mean that none of "
                    f"them could be docked, or that a runtime error occurred in Glide.",
                    _LE.DEBUG,
                )

        # 8) load the log file (if generated) and check whether all went well
        if (
            any_in_file(path_tmp_log, _EE.GLIDE_LOG_SUCCESS_STRING)
            and time_exceeded is False
        ):
            self._logger.log(
                f"Finished sublist (input: {mae_ligand_path}, output: {path_sdf_results}).",
                _LE.DEBUG,
            )
        else:
            self._print_log_file(path_tmp_log)

        # 9) collect the results; Glide writes the output to a given, semi-hard-coded path,
        #    so extract the SDF file from there
        self._move_result_files(
            glide_pose_outtype=glide_pose_outtype,
            path_tmp_results=path_tmp_results,
            path_sdf_results=path_sdf_results,
            path_maegz_results=path_maegz_results,
        )

        # 10) revert back to the original working directory
        os.chdir(working_dir)

    def _prepare_glide_arguments(self, glide_configuration_path: str) -> List[str]:
        # Note that the first argument is the path to the configuration input file. Do not
        # use "NJOBS" to parallelize internally even if the number of cores has been set:
        # each subjob requires a license, so instead start each job with "NJOBS" = 1.
        arguments = [glide_configuration_path]

        # copy the parameters and overwrite as necessary
        parameters = deepcopy(self.settings.arguments.parameters)
        parameters[_EE.GLIDE_NJOBS] = 1
        if len(self.settings.arguments.flags) > 0:
            for flag in self.settings.arguments.flags:
                # "-WAIT" leads to issues at times: the process may not return properly (e.g.
                # because of writing problems) and then gets stuck; the workaround is to wait
                # for file completion instead, so remove the flag if set
                if flag not in [_EE.GLIDE_WAIT]:
                    arguments.append(str(flag))
        if parameters:
            for key in parameters.keys():
                # remove "-WAIT" if set as a parameter, as this leads to instability issues,
                # and ignore empty keys
                if key == _EE.GLIDE_WAIT or key == "":
                    continue
                arguments.append(key)
                if parameters[key] is not None and parameters[key] != "":
                    arguments.append(str(parameters[key]))
        return arguments

    def _execute_glide(self, grid_id: str, grid_path: str):
        # TODO: add individual resubmission for failed subtasks
        # get the number of sublists in the batch and initialize the Parallelizer
        glide_parallelizer = Parallelizer(func=self._run_subjob)

        # continue until everything is successfully done or the number of retries is exceeded
        while self._subtask_container.done() is False:
            next_batch = self._get_sublists(get_first_n_lists=self._get_number_cores())

            # generate paths and initialize molecules (so that if they fail, this can be covered)
            (
                tmp_output_dirs,
                tmp_input_mae_paths,
                tmp_output_sdf_paths,
                tmp_output_maegz_paths,
            ) = self._generate_temporary_input_output_files(next_batch)

            # call the "token guard" method (only executed if the block is specified in the
            # configuration), which will delay execution if not enough tokens are available
            # at the moment
            self._apply_token_guard()

            # execute the current batch in parallel; hand over lists of parameters (these will
            # be handled by the Parallelizer); also increment the tries and set the status to
            # "failed" (don't do that inside the subprocess, as data is copied, not shared!)
            _ = [sub.increment_tries() for element in next_batch for sub in element]
            _ = [sub.set_status_failed() for element in next_batch for sub in element]
            list_grid_path = [grid_path for _ in tmp_input_mae_paths]
            glide_parallelizer.execute_parallel(
                mae_ligand_path=tmp_input_mae_paths,
                path_sdf_results=tmp_output_sdf_paths,
                path_maegz_results=tmp_output_maegz_paths,
                tmp_output_dir=tmp_output_dirs,
                grid_path=list_grid_path,
                sublist=next_batch,
            )

            # parse the output of that particular batch and remove temporary files
            self._parse_glide_output(
                tmp_output_sdf_paths,
                tmp_output_maegz_paths,
                next_batch,
                grid_id,
                grid_path,
            )

            # clean-up
            self._remove_temporary(tmp_output_dirs)

            # print the progress for this execution
            self._log_execution_progress()

    def _log_execution(self, grid_id: str, number_grids: int):
        number_enumerations = 0
        number_conformers = 0
        for compound in self.get_compounds():
            number_enumerations += len(compound)
            for enumeration in compound:
                number_conformers += len(enumeration)
                if len(enumeration) == 0:
                    self._logger.log(
                        f"Enumeration {enumeration.get_index_string()} has no docked poses attached.",
                        _LE.DEBUG,
                    )
        self._logger.log(
            f"Executed Schrodinger/Glide backend for grid {grid_id} (of {number_grids}), now storing a total of {number_conformers} conformers for {number_enumerations} enumerations in {len(self.get_compounds())} compounds.",
            _LE.INFO,
        )

    def _parse_glide_output(
        self,
        tmp_output_sdf_paths: List[str],
        tmp_output_maegz_paths: List[str],
        batch: List[List[Subtask]],
        grid_id: str,
        grid_path: str,
    ):
        # TODO: refactor this (recombine with the ligprep parsing?)
        def _update_subtask(sublist: List[Subtask], enum_identifier: str):
            for task in sublist:
                if task.data.get_index_string() == enum_identifier:
                    task.set_status_success()

        def _add_poseviewer_file(conformer: Conformer, maegz_path: str):
            if os.path.isfile(maegz_path) and os.path.getsize(maegz_path) > 0:
                with open(maegz_path, "rb") as f:
                    conformer.add_extra_data(
                        key=_SGE.GLIDE_POSEVIEWER_FILE_KEY, data=f.read()
                    )

        for i in range(len(tmp_output_sdf_paths)):
            # get the input and output paths and check that the files are there
            path_sdf_results = tmp_output_sdf_paths[i]
            path_maegz_results = tmp_output_maegz_paths[i]
            cur_sublist = batch[i]

            # this is a protection against the case where empty (file size == 0 bytes) files
            # are generated due to a failure during docking
            if (
                not os.path.isfile(path_sdf_results)
                or os.path.getsize(path_sdf_results) == 0
            ):
                continue

            mol_supplier = Chem.SDMolSupplier(path_sdf_results, removeHs=False)
            for mol in mol_supplier:
                if mol is None:
                    continue
                cur_enumeration_name = str(mol.GetProp("_Name"))

                # add the information on the actual grid used
                mol.SetProp(_SBE.ANNOTATION_GRID_ID, str(grid_id))
                mol.SetProp(_SBE.ANNOTATION_GRID_PATH, str(grid_path))
                mol.SetProp(_SBE.ANNOTATION_GRID_FILENAME, os.path.basename(grid_path))

                # if no docking score is attached (i.e. the molecule is the receptor or
                # similar), skip it
                if self._set_docking_score(mol) is not True:
                    continue

                # add the molecule to the appropriate ligand
                for compound in self.get_compounds():
                    for enumeration in compound:
                        if enumeration.get_index_string() == cur_enumeration_name:
                            new_conformer = Conformer(
                                conformer=mol,
                                conformer_id=None,
                                enumeration_object=enumeration,
                            )
                            _add_poseviewer_file(
                                conformer=new_conformer, maegz_path=path_maegz_results
                            )
                            enumeration.add_conformer(new_conformer, auto_update=True)
                            _update_subtask(
                                cur_sublist, enum_identifier=cur_enumeration_name
                            )
                            break

    def _fill_dummy_conformers(self):
        # if any enumerations have no poses attached by this point, docking failed for them;
        # attach a dummy conformer with the same mol object as the enumeration and a score of 0
        num_added = 0
        for compound in self.get_compounds():
            for enum in compound.get_enumerations():
                if not enum.get_conformers():
                    num_added += 1
                    self._logger.log(
                        f"Added dummy conformer for enumeration {enum.get_enumeration_id()}!",
                        _LE.DEBUG,
                    )
                    dummy_conf = Conformer(
                        conformer=enum.get_molecule(), conformer_id=0
                    )
                    dummy_conf.get_molecule().SetProp(
                        _SBE.ANNOTATION_TAG_DOCKING_SCORE, str(0.0)
                    )
                    dummy_conf.get_molecule().SetProp(
                        _SGE.GLIDE_DOCKING_SCORE, str(0.0)
                    )
                    enum.add_conformer(dummy_conf)
        self._logger.log(f"Added {num_added} dummy conformers", _LE.DEBUG)

    def _sort_conformers(self):
        # sort the conformers (best to worst) and update their names to contain the conformer
        # id: <compound>:<enumeration>:<conformer_number>
        for compound in self.get_compounds():
            for enumeration in compound:
                enumeration.sort_conformers(
                    by_tag=_SGE.GLIDE_DOCKING_SCORE, reverse=False
                )
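
    # Illustration (not part of the module): Glide docking scores are negative, with lower
    # values indicating better poses, so the ascending sort above (reverse=False) puts the
    # best pose first, e.g. scores [-7.2, -9.1, -5.0] end up ordered as [-9.1, -7.2, -5.0].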

    def execute(self):
        """Executes Glide docking using the provided configuration on all conformers."""
        # In order to execute Glide efficiently on the enumeration level, all enumerations
        # have to be unrolled. Note: as they retain their respective Compound object, the
        # attribution later on is simple.
        all_enumerations = []
        for compound in self.get_compounds():
            all_enumerations = all_enumerations + compound.get_enumerations()
            for enumeration in compound:
                enumeration.clear_conformers()

        # to allow ensemble docking, loop over all provided grid files and annotate the
        # origin of the conformers
        gridfiles = deepcopy(self.settings.additional.get(_SGE.CONFIGURATION, None))[
            _EE.GLIDE_GRIDFILE
        ]
        if not isinstance(gridfiles, list):
            gridfiles = [gridfiles]

        # set grid ids (generate indices if not specified)
        grid_ids = self.settings.additional.get(_SBE.GRID_IDS, [])
        if len(grid_ids) != len(gridfiles):
            self._logger.log(
                f"There were {len(grid_ids)} grid_ids specified for {len(gridfiles)} grid files, using indices instead.",
                _LE.DEBUG,
            )
            grid_ids = [str(idx) for idx in range(len(gridfiles))]

        for grid_id, grid_path in zip(grid_ids, gridfiles):
            # split into sublists, according to the settings
            self._subtask_container = SubtaskContainer(
                max_tries=self.execution.failure_policy.n_tries
            )
            self._subtask_container.load_data(all_enumerations)

            # execute Glide
            self._execute_glide(grid_id=grid_id, grid_path=grid_path)

            # do the logging
            self._log_execution(grid_id=grid_id, number_grids=len(gridfiles))

        if self._get_additional_setting(_SGE.FILL_DUMMY_CONFS, default=False):
            self._fill_dummy_conformers()

        # sort the conformers loaded to the enumerations
        self._sort_conformers()
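
# A hypothetical sketch (not part of the module) of what _write_configuration_to_file would
# render; the grid path and constraint values are made up. Given
#
#   configuration = {
#       "GRIDFILE": "/path/to/receptor_grid.zip",
#       "PRECISION": "SP",
#       "[CONSTRAINT_GROUP:1]": {
#           "USE_CONS": "A:999:O(hbond),",
#           "NREQUIRED_CONS": "ALL",
#       },
#   }
#
# the resulting Maestro ".in" file would look like:
#
#   GRIDFILE /path/to/receptor_grid.zip
#   PRECISION SP
#
#   [CONSTRAINT_GROUP:1]
#    USE_CONS A:999:O(hbond),
#    NREQUIRED_CONS ALL
#
# String-valued keys become "KEY VALUE" element lines; dict-valued keys become trailing
# blocks, with commas appended between CONSTRAINT_GROUP entries (unless already present)
# and values double-quoted inside FEATURE blocks.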