Source code for icolos.core.workflow_steps.schrodinger.fep_plus_setup

from copy import deepcopy
from typing import List
from icolos.core.containers.generic import GenericData
from icolos.core.step_utils.structconvert import StructConvert
from icolos.core.workflow_steps.schrodinger.base import StepSchrodingerBase
from icolos.utils.enums.program_parameters import (
    FepPlusEnum,
    SchrodingerExecutablesEnum,
)
from icolos.utils.enums.step_enums import StepBaseEnum, StepFepPlusEnum, StepGlideEnum
from icolos.utils.execute_external.fep_plus import FepPlusExecutor
from rdkit.Chem import SDMolSupplier
from icolos.utils.execute_external.schrodinger import SchrodingerExecutor
from icolos.core.workflow_steps.step import _LE
import os
from pydantic import BaseModel
from rdkit.Chem import SDWriter

_SFE = StepFepPlusEnum()
_FE = FepPlusEnum()
_SEE = SchrodingerExecutablesEnum()
_SBE = StepBaseEnum
_SGE = StepGlideEnum()


[docs]class StepFepPlusSetup(StepSchrodingerBase, BaseModel): """ Construct and analyse perturbation map for set of congeneric ligands Supports extracting structures from poseviewer or pdb files """ _schrodinger_executor: SchrodingerExecutor = None _converter: StructConvert = None def __init__(self, **data): super().__init__(**data) self._initialize_backend(executor=FepPlusExecutor) self._check_backend_availability() self._schrodinger_executor = SchrodingerExecutor( prefix_execution=self.execution.prefix_execution, binary_location=self.execution.binary_location, ) self._converter = StructConvert( prefix_execution=self.execution.prefix_execution, binary_location=self.execution.binary_location, ) def _extract_receptor_from_pv(self, tmp_dir, input_file: str = _SFE.RECEPTOR_MAEGZ): # run split_structure.py to obtain the receptor_structure self._logger.log(f"Extracting receptor from structure.", _LE.INFO) self._schrodinger_executor.execute( command=_SEE.STRUCT_SPLIT, arguments=[ "-m", "pdb", "-many_files", os.path.join(tmp_dir, input_file), f"{_SFE.STRUCT_SPLIT_BASE}.mae", ], check=True, location=tmp_dir, ) # get rid of the original receptor structure now we have the new one os.remove(os.path.join(tmp_dir, _SFE.RECEPTOR_MAEGZ)) def _write_receptor_from_pv(self, tmp_dir): # Handles writing the receptor structure to tmpdir, either from a poseviewer file, or a provided receptor # take the first poseviewer file it can find and split the stricure, take only the receptor for compound in self.get_compounds(): for enumeration in compound.get_enumerations(): for conformer in enumeration.get_conformers(): if ( _SGE.GLIDE_POSEVIEWER_FILE_KEY in conformer.get_extra_data().keys() ): with open( os.path.join(tmp_dir, _SFE.RECEPTOR_MAEGZ), "wb" ) as f: f.write( conformer.get_extra_data()[ _SGE.GLIDE_POSEVIEWER_FILE_KEY ] ) break if _SFE.RECEPTOR_MAEGZ in os.listdir(tmp_dir): self._logger.log( f"Writing poseviewer file to temporary directory.", _LE.INFO ) self._extract_receptor_from_pv(tmp_dir) elif self.data.generic.get_files_by_extension("pdb"): # a pdb file was loaded to generic data, use this as the receptor structure self.data.generic.get_argument_by_extension( "pdb", rtn_file_object=True ).write(os.path.join(tmp_dir, "receptor.pdb"), join=False) self._logger.log( "Converting provided pdb receptor structure to mae", _LE.DEBUG ) self._converter.convert( os.path.join(tmp_dir, "receptor.pdb"), os.path.join(tmp_dir, f"{_SFE.STRUCT_SPLIT_BASE}_receptor1.mae"), ) os.remove(os.path.join(tmp_dir, "receptor.pdb")) else: self._logger.log( "No poseviewer file was found attached to any of the conformers, and no PDB receptor file was specified - this must be set in the docking step", _LE.ERROR, ) raise FileNotFoundError def _check_xray_structure(self, compound_number): # check to see if an xray structure has been provided for that compound if _SFE.XRAY_STRUCTURES in self.settings.additional.keys(): if isinstance(self.settings.additional[_SFE.XRAY_STRUCTURES], dict): if ( compound_number in self.settings.additional[_SFE.XRAY_STRUCTURES].keys() ): return True, _FE.DICT elif os.path.isdir(self.settings.additional[_SFE.XRAY_STRUCTURES]): if os.path.isfile( os.path.join( self.settings.additional[_SFE.XRAY_STRUCTURES], f"{compound_number}.pdb", ) ): return True, _FE.PATH return False, None def _rename_sdf(self, path, comp_num): with open(path, "r") as f: lines = f.readlines()[1:] new_lines = [f"{comp_num}:0:0\n"] for line in lines: new_lines.append(line) self._remove_temporary(path) with open(path, "w") as f: f.writelines(new_lines) def _extract_ligand_from_pdb(self, tmp_dir: str, comp_num: int, type: str): # if ligand poses have been provided from xray structures, extract just the ligand self._logger.log( f"Extracting ligand from provided Xray structure for compound {comp_num}", _LE.DEBUG, ) if type == _FE.DICT: file_path = self.settings.additional[_SFE.XRAY_STRUCTURES[comp_num]] else: file_path = os.path.join( self.settings.additional[_SFE.XRAY_STRUCTURES], f"{comp_num}.pdb" ) if not os.path.isfile(file_path): raise FileNotFoundError( "The provided path to the xray structure does not exist or is not accessible" ) self._schrodinger_executor.execute( command=_SEE.STRUCT_SPLIT, arguments=["-m", "pdb", "-many_files", file_path, f"{_SFE.XRAY_SPLIT}.sdf"], check=True, location=tmp_dir, ) # remove everything apart from the ligand sdf which is concatenated later lig_found = False for file in os.listdir(tmp_dir): idx = file.split("/")[-1] if idx.startswith(_SFE.XRAY_SPLIT): if "ligand" in idx: # need to modify the name from the standard that Schrodinger provides self._rename_sdf(os.path.join(tmp_dir, file), comp_num) mols = SDMolSupplier(os.path.join(tmp_dir, file)) data = mols[0] lig_found = True self._remove_temporary(os.path.join(tmp_dir, file)) else: self._remove_temporary(os.path.join(tmp_dir, file)) if lig_found: return data def _write_input_files(self, tmp_dir): # write receptor structure to tmpdir, either from poseviewer or provided pdb file self._write_receptor_from_pv(tmp_dir) # write out all conformers present in self.data.compounds to a single sdf file. writer = SDWriter(os.path.join(tmp_dir, "concatenated.sdf")) for compound in self.get_compounds(): # If an xray pose is provided, use this flag, type = self._check_xray_structure(compound.get_compound_number()) if flag is True: self._logger.log( "Found Xray structure for the ligand - using this in preference to a docking pose", _LE.DEBUG, ) mol = self._extract_ligand_from_pdb( tmp_dir, compound.get_compound_number(), type ) writer.write(mol) else: # use the docked conformer for enumeration in compound.get_enumerations(): for conformer in enumeration.get_conformers(): mol = deepcopy(conformer.get_molecule()) mol.SetProp("_Name", conformer.get_compound_name()) writer.write(mol) def _parse_arguments(self, io_dict: dict) -> List[str]: arguments = [] for key in self.settings.arguments.parameters.keys(): arguments.append(key) arguments.append(str(self.settings.arguments.parameters[key])) for flag in self.settings.arguments.flags: arguments.append(str(flag)) for key, value in io_dict.items(): arguments.append(key) arguments.append(value) return arguments def _get_structcat_args( self, tmp_dir: str, out_file_type: str, outfile: str ) -> List[str]: arguments = [ f"{_SEE.STRUCTCAT_I}mae", os.path.join(tmp_dir, f"{_SFE.STRUCT_SPLIT_BASE}_receptor1.mae"), f"{_SEE.STRUCTCAT_I}sd", ] for file in os.listdir(tmp_dir): if file.endswith("sdf"): arguments.append(os.path.join(tmp_dir, file)) arguments.append(f"{_SEE.STRUCTCAT_O}{out_file_type}") arguments.append(os.path.join(tmp_dir, outfile)) return arguments def _concatenate_pv_files(self, tmp_dir: str): # create a poseviewer-formatted file with receptor structure, then docked ligand poses arguments = self._get_structcat_args( tmp_dir=tmp_dir, out_file_type="mae", outfile=_SFE.STRUCTCAT_MAEGZ_OUTFILE ) self._schrodinger_executor.execute( command=_SEE.STRUCTCAT, arguments=arguments, check=True ) def _analyse_map(self, tmp_dir): """run fmp_stats program to analyse map - generate node similarities etc""" result = self._schrodinger_executor.execute( command=_SEE.FMP_STATS, arguments=["out.fmp", "-f"], check=True, location=tmp_dir, ) log_lines = [] for line in str(result.stdout).split("\n"): self._logger_blank.log(line, _LE.INFO) log_lines.append(line + "\n") self.data.generic.add_file( GenericData(file_name="fep_mapper.log", file_data=log_lines) ) def _parse_output(self, tmp_dir: str): # needs to retrieve the edge and fmp files produced by the mapper step and attach to the generic dict files = [ os.path.join(tmp_dir, f) for f in os.listdir(tmp_dir) if f.endswith(("fmp", "edge", "log")) ] for file in files: try: with open(file, "r") as f: data = f.read() except UnicodeDecodeError: with open(file, "rb") as f: data = f.read() self._add_data_to_generic(file, data)
[docs] def execute(self): # run the job in a temporary directory tmp_dir = self._make_tmpdir() self._write_input_files(tmp_dir) self._concatenate_pv_files(tmp_dir) io_dict = { "": os.path.join(tmp_dir, _SFE.STRUCTCAT_MAEGZ_OUTFILE), "-o": _SFE.FEP_MAPPER_OUTPUT, } arguments = self._parse_arguments(io_dict=io_dict) self._apply_token_guard() # need to implement for reliability self._logger.log("Optimising perturbation map", _LE.DEBUG) self._backend_executor.execute( command=_FE.FEP_MAPPER, arguments=arguments, check=True, location=tmp_dir ) assert os.path.isfile(os.path.join(tmp_dir, "out.fmp")) self._logger.log( f"Successfully executed fep_mapper in directory {tmp_dir}.", _LE.DEBUG ) self._logger.log("Analysing the perturbation map.", _LE.DEBUG) self._analyse_map(tmp_dir) self._parse_output(tmp_dir) self._remove_temporary(tmp_dir)