Source code for icolos.core.workflow_steps.pmx.assemble_systems

from typing import Dict, List
from icolos.core.workflow_steps.pmx.base import StepPMXBase
from pydantic import BaseModel
import os
from icolos.utils.enums.program_parameters import PMXAtomMappingEnum, PMXEnum
from icolos.utils.execute_external.pmx import PMXExecutor
from icolos.utils.general.parallelization import SubtaskContainer

_PE = PMXEnum()
_PAE = PMXAtomMappingEnum()


[docs]class StepPMXAssembleSystems(StepPMXBase, BaseModel): """ Executes the assemble_systems.py script, edges are parallelized over available cores """ def __init__(self, **data): super().__init__(**data) self._initialize_backend(executor=PMXExecutor)
[docs] def execute(self): assert self.work_dir is not None and os.path.isdir(self.work_dir) # get edges from the perturbation map attached to the step edges = [e.get_edge_id() for e in self.get_edges()] # enforce one edge per task list (results in multiple batches for large maps) self.execution.parallelization.max_length_sublists = 1 self._subtask_container = SubtaskContainer( max_tries=self.execution.failure_policy.n_tries ) self._subtask_container.load_data(edges) self._execute_pmx_step_parallel( run_func=self._execute_command, step_id="pmx assemble_systems", result_checker=self._check_results, )
def _execute_command(self, jobs: List): args = { "-edges": '"' + " ".join([e for e in jobs]) + '"', "-ligand_path": os.path.join(self.work_dir, _PAE.LIGAND_DIR), "-workPath": self.work_dir, } self._backend_executor.execute( command=_PE.ASSEMBLE_SYSTEMS, arguments=self.get_arguments(defaults=args), check=True, location=self.work_dir, ) def _check_results(self, batch: List[List[str]]) -> List[List[bool]]: output_files = ["bound/init.pdb", "unbound/init.pdb"] results = [] for subjob in batch: subjob_results = [] for job in subjob: subjob_results.append( all( [ os.path.isfile(os.path.join(self.work_dir, job, f)) for f in output_files ] ) ) results.append(subjob_results) return results