# Source code for icolos.core.workflow_steps.pmx.setup_workpath

import os
from typing import List
from pydantic import BaseModel
from icolos.core.containers.perturbation_map import Node
from icolos.core.workflow_steps.pmx.base import StepPMXBase
from icolos.utils.enums.step_enums import StepPMXSetupEnum
from icolos.utils.execute_external.execute import Executor
from icolos.utils.execute_external.gromacs import GromacsExecutor
from icolos.utils.general.parallelization import SubtaskContainer
from icolos.core.workflow_steps.step import _LE

# Shorthand handle on this step's settings-key enum, used throughout the module.
_SPSE = StepPMXSetupEnum()
# These classes are based on the work of Vytautas Gapsys et al: https://github.com/deGrootLab/pmx/
class StepPMXSetup(StepPMXBase, BaseModel):
    """
    Create the directory tree structure. Requires the pmx workflow to be
    executed using the single_dir running mode.

    Operates on the perturbation map object, runs acpype on the written
    structures to produce the amber-compatible itp files.

    Additional settings:

    :param int replicas: number of replicas to run for each edge, default=3
    :param str charge_method: partial charge type, must be recognised by antechamber
    :param str boxshape: specify the boxshape to use in calculation setup, default = dodecahedron
    :param float boxd: specify solvent box buffer dimension, default = 1.5
    :param str water: specify water model, default = tip3p
    :param float conc: specify salt concentration, default = 0.15
    :param str forcefield: specify the forcefield for protein parametrisation. Must be findable in $GMXLIB
    :param str topology: specify perturbation map topology, default = "normal"
    """

    # GROMACS wrapper used for the pdb2gmx-based protein parametrisation.
    _gromacs_executor: GromacsExecutor = None

    def __init__(self, **data):
        super().__init__(**data)
        self._gromacs_executor = GromacsExecutor(
            prefix_execution=self.execution.prefix_execution
        )
        self._initialize_backend(executor=Executor)

    def execute(self):
        """
        Build the full pmx working-directory tree: write and parametrise the
        protein and ligands into ``input/``, then create the
        edge/branch/state/replica/sim-type folder hierarchy for every edge.
        """
        # sets the number of replicas to be used throughout the pmx run
        replicas = self._get_additional_setting(_SPSE.REPLICAS, default=3)
        if self.work_dir is None:
            self.work_dir = self._make_tmpdir()
            self._logger.log(f"Set workflow directory to {self.work_dir}", _LE.DEBUG)
        self._construct_perturbation_map(self.work_dir, replicas)

        # create the directory structure for subsequent calculations
        edges = self.get_edges()
        nodes = self.get_nodes()

        # create the input directory to sit at the top level of the workdir,
        # contains ligands, mdp and protein topology files
        os.makedirs(os.path.join(self.work_dir, "input"), exist_ok=True)
        for folder in ["ligands", "mdp", "protein"]:
            os.makedirs(os.path.join(self.work_dir, "input", folder), exist_ok=True)

        # handle protein parametrization with pdb2gmx
        protein = (
            self.get_workflow_object().workflow_data.perturbation_map.get_protein()
        )
        protein.write(os.path.join(self.work_dir, "input/protein"))
        self._parametrise_protein(protein=protein.get_file_name(), path="input/protein")

        # remove the backup files (GROMACS backups end with "#")
        old_protein = [
            f
            for f in os.listdir(os.path.join(self.work_dir, "input/protein"))
            if f.endswith("#")
        ]
        # only want the parametrised processed pdb file in there, so drop the
        # original input structure as well
        old_protein.append(protein.get_file_name())
        for f in old_protein:
            os.remove(os.path.join(self.work_dir, "input/protein", f))
        self._clean_protein()

        mdp_dir = self.data.generic.get_argument_by_extension(
            ext="mdp", rtn_file_object=True
        )
        mdp_dir.write(os.path.join(self.work_dir, "input/mdp"))

        # parallelize the antechamber call across the pool of nodes
        self.execution.parallelization.max_length_sublists = 1
        self._subtask_container = SubtaskContainer(
            max_tries=self.execution.failure_policy.n_tries
        )
        self._subtask_container.load_data(nodes)
        self._execute_pmx_step_parallel(
            run_func=self._parametrise_nodes,
            step_id="pmx_setup",
            result_checker=self._check_results,
        )

        # create the output folder structure:
        # <edge>/<branch>/<state>/run<r>/<sim_type>
        for edge in edges:
            edgepath = os.path.join(
                self.work_dir,
                f"{edge.node_from.get_node_hash()}_{edge.node_to.get_node_hash()}",
            )
            hybrid_top_folder = f"{edgepath}/hybridStrTop"
            os.makedirs(hybrid_top_folder, exist_ok=True)
            # water/protein
            for wp in self.therm_cycle_branches:
                wppath = f"{edgepath}/{wp}"
                os.makedirs(wppath, exist_ok=True)
                # stateA/stateB
                for state in self.states:
                    statepath = f"{wppath}/{state}"
                    os.makedirs(statepath, exist_ok=True)
                    # run1/run2/run3
                    for r in range(1, replicas + 1):
                        runpath = f"{statepath}/run{r}"
                        os.makedirs(runpath, exist_ok=True)
                        # em/eq_posre/eq/transitions
                        for sim in self.sim_types:
                            # fixed: original chained a no-op .format() onto
                            # this already-interpolated f-string
                            simpath = f"{runpath}/{sim}"
                            os.makedirs(simpath, exist_ok=True)

    def _check_results(self, batch: List[List[Node]]) -> List[List[bool]]:
        """
        Per subjob/node, return True iff all acpype output files
        (ffMOL.itp, MOL.itp, MOL.pdb) exist in that node's ligand folder.
        """
        output_files = ["ffMOL.itp", "MOL.itp", "MOL.pdb"]
        results = []
        for subjob in batch:
            subjob_results = []
            for job in subjob:
                subjob_results.append(
                    all(
                        os.path.isfile(
                            os.path.join(
                                self.work_dir,
                                "input/ligands",
                                job.get_node_hash(),
                                f,
                            )
                        )
                        for f in output_files
                    )
                )
            results.append(subjob_results)
        return results