import tempfile
from typing import Callable, List, Tuple
from icolos.core.containers.generic import GenericData
from icolos.core.containers.gmx_state import GromacsState
from icolos.utils.enums.execution_enums import ExecutionPlatformEnum
from icolos.utils.enums.step_enums import StepGromacsEnum
from icolos.utils.enums.program_parameters import GromacsEnum
from icolos.core.workflow_steps.gromacs.base import StepGromacsBase
from pydantic import BaseModel
from icolos.core.workflow_steps.step import _LE
import os
from icolos.utils.execute_external.gromacs import GromacsExecutor
from icolos.utils.general.parallelization import Parallelizer, SubtaskContainer
_GE = GromacsEnum()
_SGE = StepGromacsEnum()
_ERE = ExecutionPlatformEnum
class StepGMXMDrun(StepGromacsBase, BaseModel):
    """
    Launch gmx mdrun, either as a single simulation, as a batch of parallel
    independent simulations, or as a multidir (e.g. replica-exchange) run.
    """

    topol: GromacsState = None

    def __init__(self, **data):
        super().__init__(**data)

        self._initialize_backend(executor=GromacsExecutor)
        self._check_backend_availability()
    def _get_log_file(self, tmp_dir):
        """
        Find the single mdrun log file in tmp_dir and return its lines.
        """
        log_file = [f for f in os.listdir(tmp_dir) if f.endswith(".log")]
        assert len(log_file) == 1, f"Expected exactly one .log file in {tmp_dir}, found {len(log_file)}"
        with open(os.path.join(tmp_dir, log_file[0]), "r") as f:
            data = f.readlines()
        return data
    def _tail_log_file(self, tmp_dir):
        """
        Log the last 50 lines of the log file to capture performance metrics
        from the run.
        """
        log_file = self._get_log_file(tmp_dir)
        for line in log_file[-50:]:
            self._logger_blank.log(line, _LE.INFO)
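
    # For reference, the tail of a GROMACS mdrun log normally ends with the
    # performance summary; illustrative shape only, numbers vary per run:
    #
    #                  Core t (s)   Wall t (s)      (%)
    #          Time:     1234.567     123.456      999.9
    #                    (ns/day)    (hour/ns)
    #   Performance:       85.300       0.281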
    def execute_mdrun(self, path: str, index: int):
        """
        Make a single call to mdrun.
        """
        # start a fresh run unless a checkpoint file has been provided,
        # in which case restart from it
        flag_dict = (
            {
                "-s": _SGE.STD_TPR,
                "-c": _SGE.STD_STRUCTURE,
                "-e": _SGE.STD_EDR,
                "-cpo": _SGE.STD_CPT,
                "-x": _SGE.STD_XTC,
            }
            if not self.data.generic.get_files_by_extension("cpt")
            else {"-cpi": os.path.join(path, "state.cpt")}
        )
        arguments = self._parse_arguments(flag_dict)
        self._backend_executor.execute(
            command=_GE.MDRUN, arguments=arguments, location=path, check=True
        )
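
    # Illustrative only: with the fresh-start flag_dict above, the assembled
    # call is roughly equivalent to (actual file names come from the
    # StepGromacsEnum constants):
    #
    #   gmx mdrun -s <STD_TPR> -c <STD_STRUCTURE> -e <STD_EDR> \
    #             -cpo <STD_CPT> -x <STD_XTC>
    #
    # while the checkpoint branch reduces to a restart:
    #
    #   gmx mdrun -cpi <path>/state.cpt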
    def execute_parallel_simulations(self, work_dirs, run_func: Callable):
        # attach the index of each workdir so results can be mapped back
        work_dirs = [(idx, wkdir) for idx, wkdir in enumerate(work_dirs)]
        self._subtask_container = SubtaskContainer(
            max_tries=self.execution.failure_policy.n_tries
        )
        self._subtask_container.load_data(work_dirs)
        parallelizer = Parallelizer(func=run_func)
        n = 1
        while not self._subtask_container.done():
            next_batch = self._get_sublists(get_first_n_lists=self._get_number_cores())
            # increment the try counter and pre-emptively mark the batch as
            # failed, capping retries at max_tries if a run dies mid-batch
            _ = [sub.increment_tries() for element in next_batch for sub in element]
            _ = [sub.set_status_failed() for element in next_batch for sub in element]
            paths, indices = self.prepare_jobs(next_batch)
            parallelizer.execute_parallel(path=paths, index=indices)
            n += 1
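
    # Sketch of the data flow, with hypothetical paths: for
    # work_dirs = ["/tmp/run0", "/tmp/run1"], load_data receives
    # [(0, "/tmp/run0"), (1, "/tmp/run1")], and each pass of the loop fans a
    # batch of those tuples out to run_func via
    # parallelizer.execute_parallel(path=[...], index=[...]).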
    def prepare_jobs(self, batch) -> Tuple[List[str], List[int]]:
        paths, indices = [], []
        for task in batch:
            for element in task:
                # each element's data is a tuple of (idx, dirpath)
                paths.append(element.data[1])
                indices.append(element.data[0])
        return paths, indices
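
    # Example (hypothetical values): a batch of two sublists whose subtasks
    # carry .data == (idx, dirpath) flattens to
    #   paths   = ["/tmp/run0", "/tmp/run1"]
    #   indices = [0, 1]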
    def run_single_tpr(self, tmp_dir: str):
        """
        Normal gmx mdrun call. If multiple structures are loaded into the
        topology, run them in parallel according to the parallelizer settings.
        """
        # if we have multiple structures, run the simulations externally, in parallel
        work_dirs = [tempfile.mkdtemp(dir=tmp_dir) for _ in range(len(self.topol.tprs))]
        # prepare tmpdirs with tpr files
        for path, tpr in zip(work_dirs, self.topol.tprs.values()):
            tpr.write(path)
        # if > 1, instantiate a parallelizer, load the paths in and execute in
        # parallel; the user should be using the SLURM/SGE interface to request
        # external resources
        if len(work_dirs) > 1:
            self.execute_parallel_simulations(work_dirs, run_func=self.execute_mdrun)
        else:
            tmp_dir = work_dirs[0]
            self.execute_mdrun(tmp_dir, index=0)
        # now parse the outputs
        for index, path in enumerate(work_dirs):
            # set a structure other than confout.gro, e.g. if a pdb output has been set
            struct = (
                self.settings.arguments.parameters["-c"]
                if "-c" in self.settings.arguments.parameters.keys()
                else _SGE.STD_STRUCTURE
            )
            self.topol.set_structure(path, file=struct, index=index)
            self.topol.set_trajectory(path, index=index)
            self.topol.set_log(path, index=index)
            self.topol.set_edr(path, index=index)
            try:
                self.topol.set_cpt(path, index=index)
            except FileNotFoundError:
                self._logger.log("No checkpoint file generated", _LE.DEBUG)
    def run_multidir_sim(self, tmp_dir: str):
        """
        Run a multidir simulation, allowing for replica-exchange (replex)
        simulations. This running mode requires that the previous step in the
        workflow was an iterator producing n tpr files, run with single_dir
        mode ON and remove_temporary_files OFF, so that files can be extracted
        from those workflows' tmpdirs.
        """
        if not self.execution.platform == _ERE.SLURM:
            self._logger.log(
                "WARNING: Running HREX simulation without external resources! "
                "Normally this should be run as a separate batch job",
                _LE.WARNING,
            )
        # extract the tprs from the topol object, write to separate tmpdirs
        work_dirs = [tempfile.mkdtemp(dir=tmp_dir) for _ in range(len(self.topol.tprs))]
        self._logger.log(
            f"Initiating gmx multidir run in directories {', '.join(work_dirs)}",
            _LE.DEBUG,
        )
        for path, tpr in zip(work_dirs, self.topol.tprs.values()):
            tpr.write(path)
        # note: the number of MPI tasks must be a multiple of the number of simulations
        tasks = self.execution.resources.tasks
        # map the PP and PME tasks to the GPUs
        command = f"mpirun -np {tasks} gmx_mpi mdrun -multidir {' '.join(work_dirs)}"
        arguments = self._parse_arguments(flag_dict={"-x": _SGE.STD_XTC})
        self._backend_executor.execute(
            command=command, arguments=arguments, location=tmp_dir, check=True
        )
        # update the structures to the new coordinates
        for i, work_dir in enumerate(work_dirs):
            self.topol.set_structure(work_dir, index=i)
            self.topol.set_trajectory(work_dir, index=i)
            self.topol.set_tpr(work_dir, index=i)
            self.topol.set_log(work_dir, index=i)
            self.topol.set_edr(work_dir, index=i)
            try:
                self.topol.set_cpt(work_dir, index=i)
            except FileNotFoundError:
                self._logger.log("No checkpoint file generated", _LE.DEBUG)
    def execute(self):
        tmp_dir = self._make_tmpdir()
        self.topol = self.get_topol()
        if self.data.generic.get_files_by_extension("cpt"):
            # a cpt file has been passed, simply restart from the checkpoint
            self._logger.log(
                f"Restarting from checkpoint files: {self.data.generic.get_file_names_by_extension('cpt')}",
                _LE.DEBUG,
            )
            self._run_checkpoint_files(self.data.generic.get_files_by_extension("cpt"))
            self.execution.parallelization.max_length_sublists = 1
        # pickle the topol to the mdrun dir; if something goes wrong or the job
        # dies, the workflow can be picked up where we left off by unpickling
        # the topology object
        self.pickle_topol(self.topol, tmp_dir)
        multidir = self._get_additional_setting(_SGE.MULTIDIR, default=False)
        if multidir:
            self.run_multidir_sim(tmp_dir)
        else:
            self.run_single_tpr(tmp_dir)
        self._remove_temporary(tmp_dir)
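
# A minimal sketch of a workflow-step block that drives this class; the field
# names follow the usual Icolos step schema, and the concrete values here are
# assumptions for illustration:
#
#   {
#       "step_id": "md_production",
#       "type": "mdrun",
#       "execution": {"platform": "slurm", "resources": {"tasks": 8}},
#       "settings": {
#           "arguments": {"flags": [], "parameters": {"-nsteps": "50000"}},
#           "additional": {"multidir": false}
#       }
#   }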