Source code for icolos.core.step_dispatch.dispatcher

from typing import List, Optional
from pydantic import BaseModel
from icolos.core.workflow_steps.step import StepBase
from icolos.utils.general.parallelization import Parallelizer, SubtaskContainer
from icolos.core.workflow_steps.step import _LE


class IterParallelizer(BaseModel):
    # Config block controlling how the steps are parallelized. If you are
    # executing a 5-step workflow with 10 repeats, set dependent_steps = 5 and
    # jobs = 10: each independent replica is then allocated to a single job
    # queue, retaining step order.
    parallelize: bool = False
    jobs: int = 1
    dependent_steps: Optional[int] = None
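
# Illustrative sketch, not part of the original module: instantiating the
# config block above for the scenario described in its comment, a 5-step
# workflow repeated over 10 independent replicas. The variable name and
# values are assumptions for demonstration only.
_example_parallelizer = IterParallelizer(parallelize=True, jobs=10, dependent_steps=5)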
class StepDispatcher(StepBase, BaseModel):
    """
    Step class containing the job-control functionality required by
    StepIterator; supports SLURM for job scheduling. Allows the Icolos process
    to run as the master job for parallel step execution on a cluster.
    Generates a pool of initialized steps to be executed, based on the
    Step-type class, for dispatching multiple steps in parallel; useful for
    executing multiple batch jobs simultaneously.
    """

    workflows: List = []
    # expect the parallel execution block to be handed over from flow control
    parallel_execution: IterParallelizer = IterParallelizer()

    def __init__(self, **data):
        super().__init__(**data)

    def _prepare_batch(self, batch) -> List[List[StepBase]]:
        # unwrap each subtask back to the underlying step/workflow object
        batch_steps = []
        for sublist in batch:
            sublist_steps = []
            for task in sublist:
                sublist_steps.append(task.data)
            batch_steps.append(sublist_steps)
        return batch_steps
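
    # Illustrative note, not part of the original module: _prepare_batch
    # unwraps the Subtask wrappers produced by SubtaskContainer back into the
    # objects held in their .data attribute, preserving the nested batch
    # layout, e.g.
    #
    #     [[Subtask(data=wf_a), Subtask(data=wf_b)], [Subtask(data=wf_c)]]
    #     -> [[wf_a, wf_b], [wf_c]]
    #
    # so that each inner list can be handed to one parallel worker as a unit.
    # The wf_* names are placeholders for demonstration only.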
    def execute(self):
        """
        Execute multiple steps in parallel.
        """
        # spin up multiple processes
        self.execution.parallelization.jobs = self.parallel_execution.jobs
        # TODO: we could repeat entire workflows here as well, although it is
        # not clear this makes sense
        self._subtask_container = SubtaskContainer(max_tries=1)
        self._subtask_container.load_data(self.workflows)
        parallelizer = Parallelizer(func=self.execute_workflow)
        n = 1
        while not self._subtask_container.done():
            # return n lists of length max_sublist_length
            next_batch = self._get_sublists(
                get_first_n_lists=self.parallel_execution.jobs
            )
            _ = [sub.increment_tries() for element in next_batch for sub in element]
            _ = [sub.set_status_failed() for element in next_batch for sub in element]
            self._logger.log(
                f"Starting {len(next_batch)} parallel jobs under Icolos JobControl, execution batch {n}",
                _LE.INFO,
            )
            jobs = self._prepare_batch(next_batch)
            parallelizer.execute_parallel(jobs=jobs)
            # TODO: successful execution of each step is not explicitly checked;
            # the step is responsible for raising errors if something has gone wrong
            for task in next_batch:
                for subtask in task:
                    subtask.set_status_success()
            n += 1  # keep the logged batch counter accurate
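
    # Illustrative note, not part of the original module: with max_tries=1,
    # every workflow is attempted exactly once. If, for example, self.workflows
    # holds 8 sub-workflows and parallel_execution.jobs = 4, the loop above
    # runs two batches; _get_sublists returns up to 4 sublists per batch and
    # each sublist is executed by one worker via Parallelizer.execute_parallel.
    # The numbers are assumptions for demonstration only.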
    def execute_workflow(self, jobs):
        # submits, then monitors, each sub-workflow in turn
        wf_data = self.get_workflow_object().workflow_data
        for idx, job in enumerate(jobs):
            # copy existing workflow data up to this point into the new workflow object
            job.initialize()
            job.workflow_data = wf_data
            self._logger.log(f"Executing workflow {idx + 1} of {len(jobs)}", _LE.DEBUG)
            job.execute()
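
if __name__ == "__main__":
    # Illustrative usage sketch, not part of the original module. In practice
    # StepDispatcher is constructed by flow control (StepIterator), which hands
    # over the pool of initialized sub-workflows; the step_id/type values and
    # the empty pool below are assumptions for demonstration only.
    dispatcher = StepDispatcher(
        step_id="dispatch_demo",
        type="dispatcher",
        workflows=[],  # would be filled with pre-initialized sub-workflows
        parallel_execution=IterParallelizer(
            parallelize=True, jobs=4, dependent_steps=3
        ),
    )
    # dispatcher.execute()  # would run the pool in batches of 4 parallel jobs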