Source code for icolos.core.workflow_steps.pmx.run_analysis

from typing import Dict, List
from icolos.core.containers.compound import Compound, Enumeration
from icolos.core.containers.perturbation_map import Edge
from icolos.core.workflow_steps.pmx.base import StepPMXBase
from pydantic import BaseModel
from icolos.core.workflow_steps.step import _LE
from icolos.utils.execute_external.pmx import PMXExecutor
from icolos.utils.general.parallelization import SubtaskContainer
import numpy as np
import glob
import pandas as pd
import os

[docs]class StepPMXRunAnalysis(StepPMXBase, BaseModel): """ Analyses map, returns both a summary and a full results dataframe, written to top level of work_dir, and attaches properties to the compound """ results_summary: pd.DataFrame = None results_all: pd.DataFrame = None
[docs] class Config: arbitrary_types_allowed = True
def __init__(self, **data): super().__init__(**data) self._initialize_backend(executor=PMXExecutor) self.results_summary = pd.DataFrame() self.results_all = pd.DataFrame()
[docs] def execute(self): edges = [e.get_edge_id() for e in self.get_edges()] self.execution.parallelization.max_length_sublists = 1 self._subtask_container = SubtaskContainer( max_tries=self.execution.failure_policy.n_tries ) self._subtask_container.load_data(edges) self._execute_pmx_step_parallel( run_func=self.run_analysis, step_id="pmx_run_analysis", result_checker=self._check_result, ) self.analysis_summary(self.get_edges()) # reattach compounds from perturbation map to step for writeout # REINVENT expects the same number of compounds back, if they failed to dock, they need to report a 0.00 score # discard the hub compound = self.get_perturbation_map().compounds[1:]
# output_conf = edge.node_to.conformer # enum: Enumeration = output_conf.get_enumeration_object() # comp: Compound = enum.get_compound_object() # print(enum, comp) #[comp.get_compound_number()].get_enumerations()[ # enum.get_enumeration_id() # ].get_conformers()[0].set_molecule(output_conf.get_molecule()) # # print(comp, enum, conf) # # match the output conformer to the compounds attached to the step from docking # #[int(comp)].get_enumerations[int(enum)].get_conformers[ # # int(conf) # # ] = output_conf # self._logger.log( # f"attached conf to step data for {output_conf.get_index_string()}", # _LE.DEBUG, # ) # Edges that failed will have 0.00 attached, compounds that failed to dock and were never part of the map will get caught by the writeout method and set to 0.00 # # the hub compound will not have data attached, this will be pruned here # all_confs = comp.unroll_conformers() # attached_prop = False # # check all conformers attached to compound, if ddG tag was attached, append to step compounds # if any(["ddG" in conf.get_molecule().GetPropNames() for conf in all_confs]): # # # self._logger.log(f"Failed to attach compound {comp.get_index_string()}, error was {e}", _LE.WARNING) # continue def _run_analysis_script(self, analysispath, stateApath, stateBpath): fA = " ".join(glob.glob("{0}/*xvg".format(stateApath))) fB = " ".join(glob.glob("{0}/*xvg".format(stateBpath))) oA = "integ0.dat" oB = "integ1.dat" wplot = "wplot.png" o = "results.txt" # TODO: at the moment we ignore flags from the command line # args = " ".join(self.settings.arguments.flags) cmd = "$PMX analyse" args = [ "--quiet", "-fA", fA, "-fB", fB, "-o", o, "-oA", oA, "-oB", oB, "-w", wplot, "-t", 298, "-b", 100, ] self._backend_executor.execute( command=cmd, arguments=args, location=analysispath, check=False ) def _read_neq_results(self, fname): try: with open(fname, "r") as fp: lines = fp.readlines() except FileNotFoundError: return out = [] for l in lines: l = l.rstrip() foo = l.split() if "BAR: dG" in l: out.append(float(foo[-2])) elif "BAR: Std Err (bootstrap)" in l: out.append(float(foo[-2])) elif "BAR: Std Err (analytical)" in l: out.append(float(foo[-2])) elif "0->1" in l: out.append(int(foo[-1])) elif "1->0" in l: out.append(int(foo[-1])) return out def _fill_resultsAll(self, res, edge, wp, r): try: rowName = "{0}_{1}_{2}".format(edge, wp, r) self.results_all.loc[rowName, "val"] = res[2] self.results_all.loc[rowName, "err_analyt"] = res[3] self.results_all.loc[rowName, "err_boot"] = res[4] self.results_all.loc[rowName, "framesA"] = res[0] self.results_all.loc[rowName, "framesB"] = res[1] except IndexError: self._logger.log( f"Index Error encountered whilst parsing results to summary file for job {edge}/{wp}/{r}", _LE.WARNING, ) def _summarize_results(self, edges: List[Edge]): fail_score = self._get_additional_setting("fail_score", default=100.0) bootnum = 1000 for edge in edges: try: for wp in self.therm_cycle_branches: dg = [] erra = [] errb = [] distra = [] distrb = [] for r in range(1, self.get_perturbation_map().replicas + 1): rowName = "{0}_{1}_{2}".format(edge.get_edge_id(), wp, r) dg.append(self.results_all.loc[rowName, "val"]) erra.append(self.results_all.loc[rowName, "err_analyt"]) errb.append(self.results_all.loc[rowName, "err_boot"]) distra.append( np.random.normal( self.results_all.loc[rowName, "val"], self.results_all.loc[rowName, "err_analyt"], size=bootnum, ) ) distrb.append( np.random.normal( self.results_all.loc[rowName, "val"], self.results_all.loc[rowName, "err_boot"], size=bootnum, ) ) rowName = "{0}_{1}".format(edge.get_edge_id(), wp) distra = np.array(distra).flatten() distrb = np.array(distrb).flatten() if self.get_perturbation_map().replicas == 1: self.results_all.loc[rowName, "val"] = dg[0] self.results_all.loc[rowName, "err_analyt"] = erra[0] self.results_all.loc[rowName, "err_boot"] = errb[0] else: self.results_all.loc[rowName, "val"] = np.mean(dg) self.results_all.loc[rowName, "err_analyt"] = np.sqrt( np.var(distra) / float(self.get_perturbation_map().replicas) ) self.results_all.loc[rowName, "err_boot"] = np.sqrt( np.var(distrb) / float(self.get_perturbation_map().replicas) ) # also collect self.results_summary rowNameWater = "{0}_{1}".format(edge.get_edge_id(), "unbound") rowNameProtein = "{0}_{1}".format(edge.get_edge_id(), "bound") dg = ( self.results_all.loc[rowNameProtein, "val"] - self.results_all.loc[rowNameWater, "val"] ) edge.ddG = dg try: edge.node_to.get_conformer().get_molecule().SetProp("ddG", str(dg)) self._logger.log( f"Attached score {dg} to conformer {edge.node_to.get_conformer().get_index_string()}", _LE.DEBUG, ) except AttributeError as e: self._logger.log( f"Could not attach score to mol for edge {edge.get_edge_id()}, defaulting to {fail_score}", _LE.WARNING, ) edge.node_to.get_conformer().get_molecule().SetProp( "ddG", str(fail_score) ) erra = np.sqrt( np.power(self.results_all.loc[rowNameProtein, "err_analyt"], 2.0) + np.power(self.results_all.loc[rowNameWater, "err_analyt"], 2.0) ) edge.ddG_err = erra errb = np.sqrt( np.power(self.results_all.loc[rowNameProtein, "err_boot"], 2.0) + np.power(self.results_all.loc[rowNameWater, "err_boot"], 2.0) ) rowName = edge.get_edge_id() self.results_summary.loc[rowName, "lig1"] = edge.get_edge_id().split( "_" )[0] self.results_summary.loc[rowName, "lig2"] = edge.get_edge_id().split( "_" )[1] self.results_summary.loc[rowName, "val"] = dg self.results_summary.loc[rowName, "err_analyt"] = erra self.results_summary.loc[rowName, "err_boot"] = errb except KeyError as e: print(f"Error in generating summary, error was {e}")
[docs] def analysis_summary(self, edges: List[Edge]): edge_ids = [e.get_edge_id() for e in edges] for edge in edge_ids: for r in range(1, self.get_perturbation_map().replicas + 1): for wp in self.therm_cycle_branches: analysispath = "{0}/analyse{1}".format( self._get_specific_path( workPath=self.work_dir, edge=edge, wp=wp ), r, ) resultsfile = "{0}/results.txt".format(analysispath) res = self._read_neq_results(resultsfile) if res is not None: self._fill_resultsAll(res, edge, wp, r) # the values have been collected now # let's calculate ddGs self._summarize_results(edges) # compare with experimental results automatically if provided try: if "exp_results" in self.settings.additional.keys() and os.path.isfile( self.settings.additional["exp_results"] ): exp_data = pd.read_csv( self.settings.additional["exp_results"], converters={"Ligand": lambda x: str(x).split(".")[0]}, ) # compute the experimental ddG and append to resultsSummary node_data = self.get_perturbation_map().node_df self.results_summary["exp_ddG"] = self.results_summary.apply( lambda x: np.array( self.compute_exp_ddG( x["lig1"], x["lig2"], node_data=node_data, exp_data=exp_data ) ), axis=1, ) except Exception as e: self._logger.log( f"Failed to compute experimental results, error was: {e}", _LE.WARNING ) # final write to disk self.results_summary.to_csv(os.path.join(self.work_dir, "resultsSummary.csv")) self.results_all.to_csv(os.path.join(self.work_dir, "resultsAll.csv"))
[docs] def compute_exp_ddG( self, lig1: str, lig2: str, node_data: pd.DataFrame, exp_data: pd.DataFrame ) -> float: """ Compute the ddG between two ligands from experimental data """ lig1_id = ( node_data.loc[node_data["hash_id"] == lig1]["node_id"] .to_list()[0] .replace(" ", "") ) lig2_id = ( node_data.loc[node_data["hash_id"] == lig2]["node_id"] .to_list()[0] .replace(" ", "") ) lig1_dG = float( exp_data.loc[exp_data["Ligand"] == lig1_id]["Exp. ΔG"].tolist()[0] ) lig2_dG = float( exp_data.loc[exp_data["Ligand"] == lig2_id]["Exp. ΔG"].tolist()[0] ) return lig2_dG - lig1_dG
[docs] def run_analysis(self, jobs: List[str]): for idx, edge in enumerate(jobs): for r in range(1, self.get_perturbation_map().replicas + 1): # ligand wp = "unbound" analysispath = "{0}/analyse{1}".format( self._get_specific_path(workPath=self.work_dir, edge=edge, wp=wp), r, ) os.makedirs(analysispath, exist_ok=True) stateApath = self._get_specific_path( workPath=self.work_dir, edge=edge, wp=wp, state="stateA", r=r, sim="transitions", ) stateBpath = self._get_specific_path( workPath=self.work_dir, edge=edge, wp=wp, state="stateB", r=r, sim="transitions", ) self._run_analysis_script(analysispath, stateApath, stateBpath) # protein wp = "bound" analysispath = "{0}/analyse{1}".format( self._get_specific_path(workPath=self.work_dir, edge=edge, wp=wp), r, ) os.makedirs(analysispath, exist_ok=True) stateApath = self._get_specific_path( workPath=self.work_dir, edge=edge, wp=wp, state="stateA", r=r, sim="transitions", ) stateBpath = self._get_specific_path( workPath=self.work_dir, edge=edge, wp=wp, state="stateB", r=r, sim="transitions", ) self._run_analysis_script(analysispath, stateApath, stateBpath)
def _check_result(self, batch: List[List[str]]) -> List[List[bool]]: """ Look in each hybridStrTop dir and check the output pdb files exist for the edges """ output_files = ["integ0.dat", "integ1.dat", "results.txt", "wplot.png"] analyse_folders = [ f"analyse{i}" for i in range(1, self.get_perturbation_map().replicas + 1) ] results = [] for subjob in batch: subjob_results = [] for job in subjob: subjob_results.append( all( [ os.path.isfile( os.path.join(self.work_dir, job, branch, folder, f) ) for f in output_files for branch in self.therm_cycle_branches for folder in analyse_folders ] ) ) results.append(subjob_results) return results