Source code for icolos.core.workflow_steps.ccdc.docking

import os
import glob
import tempfile
import time
from typing import List, Tuple

from pydantic import BaseModel, Field
from rdkit import Chem
from copy import deepcopy

from icolos.utils.enums.step_enums import StepBaseEnum, StepGoldEnum
from icolos.utils.execute_external.gold import GoldExecutor
from icolos.utils.general.files_paths import gen_tmp_file
from icolos.core.containers.compound import Conformer
from icolos.utils.enums.program_parameters import GoldOutputEnum, GoldEnum
from icolos.core.workflow_steps.step import _LE, StepBase
from icolos.utils.general.parallelization import Subtask, SubtaskContainer, Parallelizer
from icolos.core.step_utils.gold_config import (
    ConfigAutomaticSettings,
    ConfigPopulation,
    ConfigGeneticOperators,
    ConfigFloodFill,
    ConfigDataFiles,
    ConfigFlags,
    ConfigProteinData,
    ConfigFitnessFunctionSettings,
    ConfigSaveOptions,
    ConfigCovalentBonding,
    ConfigConstraints,
    ConfigTermination,
)

_SBE = StepBaseEnum
_CGE = GoldEnum()
_SGE = StepGoldEnum()
_GOE = GoldOutputEnum()


[docs]class GoldConfiguration(BaseModel): automatic_settings: ConfigAutomaticSettings = Field( alias="AUTOMATIC SETTINGS", default=ConfigAutomaticSettings() ) population: ConfigPopulation = Field(alias="POPULATION", default=ConfigPopulation()) genetic_operators: ConfigGeneticOperators = Field( alias="GENETIC OPERATORS", default=ConfigGeneticOperators() ) flood_fill: ConfigFloodFill = Field(alias="FLOOD FILL", default=None) data_files: ConfigDataFiles = Field(alias="DATA FILES", default=ConfigDataFiles()) flags: ConfigFlags = Field(alias="FLAGS", default=ConfigFlags()) termination: ConfigTermination = Field( alias="TERMINATION", default=ConfigTermination() ) constraints: ConfigConstraints = Field( alias="CONSTRAINTS", default=ConfigConstraints() ) covalent_bonding: ConfigCovalentBonding = Field( alias="COVALENT BONDING", default=ConfigCovalentBonding() ) save_options: ConfigSaveOptions = Field( alias="SAVE OPTIONS", default=ConfigSaveOptions() ) fitness_function_settings: ConfigFitnessFunctionSettings = Field( alias="FITNESS FUNCTION SETTINGS", default=ConfigFitnessFunctionSettings() ) protein_data: ConfigProteinData = Field(alias="PROTEIN DATA", default=None)
[docs]class GoldAdditional(BaseModel): configuration: GoldConfiguration = GoldConfiguration() gold_config_file: str = None docking_score_tag: str = "Gold.Goldscore.Fitness" grid_ids: List[str] = ["grid0"]
[docs]class StepGold(StepBase, BaseModel): gold_additional: GoldAdditional = None _gold_executor: GoldExecutor = None def __init__(self, **data): super().__init__(**data) # initialize the executor and test availability self._initialize_backend(executor=GoldExecutor) self._check_backend_availability() # set Gold specific settings self.gold_additional = GoldAdditional(**self.settings.additional) def _set_docking_score(self, conformer: Chem.Mol) -> bool: tag = self.gold_additional.docking_score_tag try: docking_score = conformer.GetProp(tag) except KeyError: self._logger.log( f'Could not find tag "{tag}" in conformer, use "docking_score_tag" to adjust.', _LE.DEBUG, ) return False conformer.SetProp(_SBE.ANNOTATION_TAG_DOCKING_SCORE, str(docking_score)) return True def _generate_temporary_input_output_files( self, batch: List[List[Subtask]] ) -> Tuple[List[str], List[List[str]], List[List[str]]]: tmp_output_dirs = [] tmp_input_sdf_paths = [] tmp_output_sdf_paths = [] for next_subtask_list in batch: # generate temporary input files and output directory cur_tmp_output_dir = tempfile.mkdtemp() # write-out the temporary input files: one molecule per file each one_written = False list_input_files = [] list_output_files = [] for subtask in next_subtask_list: enumeration = subtask.data mol = deepcopy(enumeration.get_molecule()) if mol is not None: _, cur_tmp_sdf = gen_tmp_file(suffix=".sdf", dir=cur_tmp_output_dir) writer = Chem.SDWriter(cur_tmp_sdf) mol.SetProp("_Name", enumeration.get_index_string()) writer.write(mol) writer.close() list_input_files.append(cur_tmp_sdf) # output files for gold look something like: ranked_tmp1cwtavoh_m1_2.sdf # where: (1) "ranked_" is a constant, (2) followed by the input file name, # (3) followed by the molecule # (here always "m1" as only 1 compound/file at the moment), # (4) followed by the pose # and finally (5) the ".sdf" ending # use '*' for getting all files basename_filepart = os.path.basename(cur_tmp_sdf).rstrip(".sdf") list_output_files.append( os.path.join( cur_tmp_output_dir, "".join(["ranked_", basename_filepart, "_m1_*.sdf"]), ) ) one_written = True if one_written is False: # no compound left from this batch: remove the temporary folder and skip this one self._remove_temporary(cur_tmp_output_dir) continue # since all went well, add input and output paths here tmp_input_sdf_paths.append(list_input_files) tmp_output_sdf_paths.append(list_output_files) tmp_output_dirs.append(cur_tmp_output_dir) return (tmp_output_dirs, tmp_input_sdf_paths, tmp_output_sdf_paths) def _execute_gold(self): # get number of sublists in batch and initialize Parallelizer gold_parallelizer = Parallelizer(func=self._run_subjob) # continue until everything is successfully done or number of retries have been exceeded while self._subtask_container.done() is False: next_batch = self._get_sublists(get_first_n_lists=self._get_number_cores()) # generate paths and initialize molecules (so that if they fail, this can be covered) ( tmp_output_dirs, tmp_input_sdf_paths, tmp_output_sdf_paths, ) = self._generate_temporary_input_output_files(next_batch) # execute the current batch in parallel; hand over lists of parameters (will be handled by Parallelizer) # also increment the tries and set the status to "failed" (don't do that inside subprocess, as data is # copied, not shared!) _ = [sub.increment_tries() for element in next_batch for sub in element] _ = [sub.set_status_failed() for element in next_batch for sub in element] gold_parallelizer.execute_parallel( output_dir=tmp_output_dirs, input_paths=tmp_input_sdf_paths ) # parse the output of that particular batch and remove temporary files self._parse_gold_output( tmp_output_paths=tmp_output_sdf_paths, batch=next_batch ) # clean-up self._remove_temporary(tmp_output_dirs) # print the progress for this execution self._log_execution_progress() def _parse_gold_output( self, tmp_output_paths: List[List[str]], batch: List[List[Subtask]] ): def _update_subtask(sublist: List[Subtask], enum_identifier: str): for task in sublist: if task.data.get_index_string() == enum_identifier: task.set_status_success() grid_id = self.gold_additional.grid_ids[0] grid_path = self.gold_additional.configuration.protein_data.protein_datafile for batch_id in range(len(batch)): cur_sublist = batch[batch_id] list_comp_output = tmp_output_paths[batch_id] for compound_number in range(len(list_comp_output)): # expand the output paths to cover all pose files for this compound # one input sdf matches to x output sdfs (for x poses) comp_sdf_output_paths = glob.glob(list_comp_output[compound_number]) # loop over all output files for comp_output in comp_sdf_output_paths: # this is a protection against the case where empty (file size == 0 bytes) files # are generated due to a failure during docking if ( not os.path.isfile(comp_output) or os.path.getsize(comp_output) == 0 ): continue mol_supplier = Chem.SDMolSupplier(comp_output, removeHs=False) for mol in mol_supplier: if mol is None: continue # The name is something like: 10:0|tmp8x3rtg4d|sdf|1|dock7 cur_enumeration_name = str(mol.GetProp("_Name")).split("|")[0] # add the information on the actual grid used mol.SetProp(_SBE.ANNOTATION_GRID_ID, str(grid_id)) mol.SetProp(_SBE.ANNOTATION_GRID_PATH, str(grid_path)) mol.SetProp( _SBE.ANNOTATION_GRID_FILENAME, os.path.basename(grid_path) ) # if no docking score is attached (i.e. the molecule is a receptor or so, skip it) if self._set_docking_score(mol) is not True: continue # add molecule to the appropriate ligand for compound in self.get_compounds(): for enumeration in compound: if ( enumeration.get_index_string() == cur_enumeration_name ): new_conformer = Conformer( conformer=mol, conformer_id=None, enumeration_object=enumeration, ) enumeration.add_conformer( new_conformer, auto_update=True ) _update_subtask( cur_sublist, enum_identifier=cur_enumeration_name, ) break def _run_subjob(self, output_dir: str, input_paths: List[str]): # generate the appropriate config; note that input and output paths are referring to one compound each config_path = os.path.join(output_dir, "gold.config") self.generate_config_file(path=config_path, ligand_files=input_paths) # set up arguments list and execute; change path to temporary sub-folder to avoid cluttering with files arguments = [config_path] old_dir = os.getcwd() os.chdir(output_dir) self._backend_executor.execute( command=_CGE.GOLD_CALL, arguments=arguments, check=True ) os.chdir(old_dir) time.sleep(3) def _config_from_file(self, ligand_files: List[str]) -> List[str]: config = [] with open(self.gold_additional.gold_config_file, "r") as f: buffer = f.readlines() idx = 0 while idx < len(buffer): line = buffer[idx].rstrip("\n") config.append(line) if _SGE.DATA_FILES in line: # skip over all defined ligand files while _SGE.LIGAND_DATA_FILE in buffer[idx + 1]: self._logger.log( 'Skipping pre-defined ligand in configuration file (remove lines starting with "ligand_data_file" to suppress this information).', _LE.DEBUG, ) idx = idx + 1 # add ligand files as required for ligand_file in ligand_files: config.append( " ".join([_SGE.LIGAND_DATA_FILE, ligand_file, "10"]) ) idx = idx + 1 return config def _config_from_default(self, ligand_files: List[str]) -> List[str]: def block_indent(block_name: str) -> str: return "".join([_SGE.BLOCK_INDENT, block_name]) def empty_line() -> str: return "" def add_block(list_lines: List[str], block: dict): for key, value in block.items(): if key == _SGE.LIGAND_DATA_FILE: # this needs to be overwritten by the actual compound collection self._logger.log( f"Do not set ligand data files explicitly, use the compound handover - skipping.", _LE.WARNING, ) continue list_lines.append(" ".join([key, "=", str(value)])) list_lines.append(empty_line()) config = [block_indent(_SGE.CONFIGURATION_START), empty_line()] # automatic settings config.append(block_indent(_SGE.AUTOMATIC_SETTINGS)) add_block(config, self.gold_additional.configuration.automatic_settings.dict()) # population config.append(block_indent(_SGE.POPULATION)) add_block(config, self.gold_additional.configuration.population.dict()) # genetic operators config.append(block_indent(_SGE.GENETIC_OPERATORS)) add_block(config, self.gold_additional.configuration.genetic_operators.dict()) # flood fill config.append(block_indent(_SGE.FLOOD_FILL)) add_block(config, self.gold_additional.configuration.flood_fill.dict()) # data files config.append(block_indent(_SGE.DATA_FILES)) for ligand_file in ligand_files: config.append(" ".join([_SGE.LIGAND_DATA_FILE, ligand_file, "10"])) add_block(config, self.gold_additional.configuration.data_files.dict()) # flags config.append(block_indent(_SGE.FLAGS)) add_block(config, self.gold_additional.configuration.flags.dict()) # termination config.append(block_indent(_SGE.TERMINATION)) add_block(config, self.gold_additional.configuration.termination.dict()) # constraints config.append(block_indent(_SGE.CONSTRAINTS)) add_block(config, self.gold_additional.configuration.constraints.dict()) # covalent bonding config.append(block_indent(_SGE.COVALENT_BONDING)) add_block(config, self.gold_additional.configuration.covalent_bonding.dict()) # save options config.append(block_indent(_SGE.SAVE_OPTIONS)) add_block(config, self.gold_additional.configuration.save_options.dict()) # fitness function settings config.append(block_indent(_SGE.FITNESS_FUNCTION_SETTINGS)) add_block( config, self.gold_additional.configuration.fitness_function_settings.dict() ) # protein data config.append(block_indent(_SGE.PROTEIN_DATA)) add_block(config, self.gold_additional.configuration.protein_data.dict()) return config
[docs] def generate_config_file(self, path: str, ligand_files: List[str]): # very complicated custom format for GOLD; see file in "external_documentation/gold.conf" for details if self.gold_additional.gold_config_file is None: # generate config file step by step from default and overwrite set elements # TODO: constraints are not yet supported (and probably a few other fine-grained settings) config = self._config_from_default(ligand_files=ligand_files) else: # load path to config file and replace "ligand_files" as required; as configuration element is ignored in # this mode, check that it is not set if _SGE.CONFIGURATION in self.gold_additional.dict().keys(): self._logger.log( 'The "configuration" elements are ignored when a config file path is specified.', _LE.WARNING, ) config = self._config_from_file(ligand_files=ligand_files) # write out with open(path, "w") as f: for line in config: f.write(line + "\n")
[docs] def execute(self): # Note: This step only supports one grid at a time, ensemble docking is taken care of at the workflow level! # in order to be able to efficiently execute Gold on the enumeration level, all of them have to be unrolled # Note: As they retain their respective Compound object, the attribution later on is simple all_enumerations = [] for compound in self.get_compounds(): all_enumerations = all_enumerations + compound.get_enumerations() for enumeration in compound: enumeration.clear_conformers() # split into sublists, according to the settings self._subtask_container = SubtaskContainer( max_tries=self.execution.failure_policy.n_tries ) self._subtask_container.load_data(all_enumerations) # execute Gold docking self._execute_gold()