# Source code for icolos.core.workflow_steps.pmx.assemble_systems
from typing import Dict, List
from icolos.core.workflow_steps.pmx.base import StepPMXBase
from pydantic import BaseModel
import os
from icolos.utils.enums.program_parameters import PMXAtomMappingEnum, PMXEnum
from icolos.utils.execute_external.pmx import PMXExecutor
from icolos.utils.general.parallelization import SubtaskContainer
# Shared enum instances used by this step: _PE provides the ASSEMBLE_SYSTEMS
# command passed to the executor, _PAE provides the LIGAND_DIR path component
# that is joined onto the work directory.
_PE = PMXEnum()
_PAE = PMXAtomMappingEnum()
class StepPMXAssembleSystems(StepPMXBase, BaseModel):
    """
    Executes the assemble_systems.py script, edges are parallelized over available cores

    Every edge of the perturbation map is submitted as its own subtask. A
    subtask is reported as successful once both ``bound/init.pdb`` and
    ``unbound/init.pdb`` exist in the edge's directory below ``work_dir``.
    """

    def __init__(self, **data):
        super().__init__(**data)
        # register the PMX executor as the backend used by _execute_command
        self._initialize_backend(executor=PMXExecutor)

    def execute(self) -> None:
        """Run the assemble_systems command once per edge of the perturbation map.

        Collects the edge ids from ``self.get_edges()``, loads them into a
        fresh ``SubtaskContainer`` and hands ``_execute_command`` and
        ``_check_results`` to ``_execute_pmx_step_parallel``.

        Raises:
            AssertionError: if ``work_dir`` is unset or not an existing directory.
        """
        assert self.work_dir is not None and os.path.isdir(
            self.work_dir
        ), f"work_dir is not an existing directory: {self.work_dir!r}"

        # get edges from the perturbation map attached to the step
        edges = [e.get_edge_id() for e in self.get_edges()]

        # enforce one edge per task list (results in multiple batches for large maps)
        self.execution.parallelization.max_length_sublists = 1
        self._subtask_container = SubtaskContainer(
            max_tries=self.execution.failure_policy.n_tries
        )
        self._subtask_container.load_data(edges)
        self._execute_pmx_step_parallel(
            run_func=self._execute_command,
            step_id="pmx assemble_systems",
            result_checker=self._check_results,
        )

    def _execute_command(self, jobs: List[str]) -> None:
        """Invoke the assemble_systems command for the given edge ids.

        Args:
            jobs: edge ids handled by this call; they are joined with single
                spaces and wrapped in literal double quotes to form the
                ``-edges`` argument.
        """
        # Default arguments for the call; they are passed through
        # self.get_arguments(), which presumably merges them with the
        # user-supplied settings of the step (verify against StepPMXBase).
        args = {
            "-edges": '"' + " ".join(jobs) + '"',
            "-ligand_path": os.path.join(self.work_dir, _PAE.LIGAND_DIR),
            "-workPath": self.work_dir,
        }
        self._backend_executor.execute(
            command=_PE.ASSEMBLE_SYSTEMS,
            arguments=self.get_arguments(defaults=args),
            check=True,
            location=self.work_dir,
        )

    def _check_results(self, batch: List[List[str]]) -> List[List[bool]]:
        """Report, per job, whether all expected output files were written.

        Args:
            batch: list of sublists of job (edge) ids.

        Returns:
            A nested list with the same layout as ``batch``; an entry is
            ``True`` only if both ``bound/init.pdb`` and ``unbound/init.pdb``
            exist as files under ``<work_dir>/<job>``.
        """
        output_files = ("bound/init.pdb", "unbound/init.pdb")
        return [
            [
                all(
                    os.path.isfile(os.path.join(self.work_dir, job, f))
                    for f in output_files
                )
                for job in subjob
            ]
            for subjob in batch
        ]