# Source code for icolos.core.workflow_steps.pmx.atomMapping

from typing import Dict, List
from icolos.core.containers.perturbation_map import Edge
from icolos.core.workflow_steps.pmx.base import StepPMXBase
from pydantic import BaseModel
from icolos.utils.execute_external.pmx import PMXExecutor
import os
from icolos.utils.enums.program_parameters import PMXEnum, PMXAtomMappingEnum
from icolos.utils.general.parallelization import SubtaskContainer

# Module-level enum instances shared by the step below: `_PE` supplies the
# atom-mapping command name and the per-edge output directory name, `_PAE`
# supplies the ligand input directory name.
_PE = PMXEnum()
_PAE = PMXAtomMappingEnum()


class StepPMXatomMapping(StepPMXBase, BaseModel):
    """Ligand alchemy: map atoms for morphing.

    For every edge returned by ``get_edges()`` the atom-mapping command is
    executed on the two ligand PDB files of that edge, and the results are
    written below ``<work_dir>/<edge id>/<_PE.HYBRID_STR_TOP>``.
    """

    def __init__(self, **data):
        super().__init__(**data)
        self._initialize_backend(executor=PMXExecutor)

    def _prepare_arguments(self, args: Dict[str, str], output_dir: str) -> List[str]:
        """Build the flat command-line argument list for one mapping run.

        :param args: flag -> value pairs supplied by the caller; they override
            a default of the same flag, and unknown flags are appended after
            the defaults.
        :param output_dir: directory that all default output paths point into.
        :return: list of the form ``[flag1, value1, flag2, value2, ...]``.
        """
        merged_args = {
            "-o1": f"{output_dir}/pairs1.dat",
            "-o2": f"{output_dir}/pairs2.dat",
            "-opdb1": f"{output_dir}/out_pdb1.pdb",
            "-opdb2": f"{output_dir}/out_pdb2.pdb",
            "-opdbm1": f"{output_dir}/out_pdbm1.pdb",
            "-opdbm2": f"{output_dir}/out_pdbm2.pdb",
            "-score": f"{output_dir}/score.dat",
            "-log": f"{output_dir}/mapping.log",
        }
        # caller-supplied settings take precedence over the defaults
        merged_args.update(args)

        prepared_args = []
        for flag, value in merged_args.items():
            prepared_args.extend((flag, value))
        return prepared_args

    def _execute_command(self, jobs: List[str]):
        """Run the atom mapping for one edge.

        :param jobs: sublist of edge IDs; only the first entry is processed
            (``execute`` limits every sublist to a single edge).
        """
        edge = jobs[0]
        # the edge ID is split on "_" and the first two pieces are taken as
        # the ligand directory names
        # NOTE(review): ligand names that themselves contain "_" would be
        # split incorrectly here - confirm the edge ID format.
        parts = edge.split("_")
        lig1 = parts[0]
        lig2 = parts[1]

        # the ligands are expected as MOL.pdb inside their ligand directories
        arguments = {
            "-i1": os.path.join(self.work_dir, _PAE.LIGAND_DIR, lig1, "MOL.pdb"),
            "-i2": os.path.join(self.work_dir, _PAE.LIGAND_DIR, lig2, "MOL.pdb"),
        }
        output_dir = os.path.join(self.work_dir, edge, _PE.HYBRID_STR_TOP)
        arguments = self._prepare_arguments(args=arguments, output_dir=output_dir)
        self._backend_executor.execute(
            command=_PE.ATOMMAPPING,
            arguments=arguments,
            check=True,
            location=self.work_dir,
        )

    def execute(self):
        """Run the atom mapping for all edges, one edge per job sublist."""
        # check the workflow has been configured correctly to use a shared work_dir
        assert self.work_dir is not None and os.path.isdir(
            self.work_dir
        ), "work_dir must be set to an existing directory"

        edges = [e.get_edge_id() for e in self.get_edges()]

        # enforce single edge per job queue
        self.execution.parallelization.max_length_sublists = 1
        self._subtask_container = SubtaskContainer(
            max_tries=self.execution.failure_policy.n_tries
        )
        self._subtask_container.load_data(edges)
        self._execute_pmx_step_parallel(
            run_func=self._execute_command,
            step_id="atomMapping",
            result_checker=self._check_result,
        )

    def _check_result(self, batch: List[List[str]]) -> List[List[bool]]:
        """
        Look in each edge's output directory and check the expected output
        files exist.

        :param batch: list of sublists of jobs; each job is used as a path
            component below ``work_dir`` (``execute`` loads edge IDs).
        :return: one list of booleans per sublist, ``True`` where every
            expected file is present for that job.
        """
        output_files = ("pairs1.dat", "pairs2.dat", "out_pdb1.pdb", "out_pdb2.pdb")
        results = []
        for subjob in batch:
            subjob_results = []
            for job in subjob:
                # same directory constant as used when writing the files in
                # _execute_command, so both sides always agree
                job_dir = os.path.join(self.work_dir, job, _PE.HYBRID_STR_TOP)
                subjob_results.append(
                    all(
                        os.path.isfile(os.path.join(job_dir, f))
                        for f in output_files
                    )
                )
            results.append(subjob_results)
        return results