Source code for icolos.core.flow_control.iterator

from typing import Dict, List, Union
from pydantic import BaseModel
from icolos.core.composite_agents.workflow import WorkFlow

from icolos.core.flow_control.flow_control import BaseStepConfig, FlowControlBase
from copy import deepcopy
from icolos.core.workflow_steps.step import _LE
from icolos.core.step_dispatch.dispatcher import StepDispatcher
from icolos.utils.enums.composite_agents_enums import WorkflowEnum
from icolos.utils.enums.step_enums import StepBaseEnum
from icolos.core.workflow_steps.step import StepBase
from icolos.utils.enums.step_enums import IteratorEnum
from icolos.core.composite_agents.workflow import WorkflowHeaderParameters
import os

_IE = IteratorEnum
_SBE = StepBaseEnum
_WE = WorkflowEnum()


class IterSettingsParameters(BaseModel):
    # unpacked version of StepSettingsParameters
    flags: List = []
    parameters: Dict = {}
    additional: Dict = {}
    work_dir: Union[List, str] = []
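
# Illustrative sketch (hypothetical parameter name, not part of the original
# module): each field maps onto the matching StepSettingsParameters block and
# holds one entry per iteration, so iteration i receives value [i].
#
#   example_params = IterSettingsParameters(
#       parameters={"-t": [100, 200, 300]},  # three iterations, one value each
#   )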


class IterParallelizer(BaseModel):
    # if True, steps must be totally independent; the iterator step becomes
    # the master job
    parallelize: bool = False
    jobs: int = 1
    dependent_steps: int = None
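
# Illustrative sketch (assumed semantics, not in the original module): run
# the generated workflow copies as four concurrent jobs via the dispatcher;
# parallelize=True requires the wrapped steps to be totally independent.
#
#   example_parallelizer = IterParallelizer(parallelize=True, jobs=4)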


class IterSettings(BaseModel):
    # dictionary of settings to change, keyed by step:
    # settings: {step_id: IterSettingsParameters}
    settings: Dict[str, IterSettingsParameters] = {}
    iter_mode: _IE = _IE.N_ITERS
    n_iters: int = None
    remove_temporary_files: bool = True
    single_directory: bool = False
    parallelizer_settings: IterParallelizer = IterParallelizer()
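
# Illustrative sketch (not part of the original module): request three
# independent copies of the wrapped steps (N_ITERS mode), dispatched as three
# parallel jobs. Note that the per-step settings dict is only consulted by
# _initialize_settings; N_ITERS mode produces identical copies.
#
#   example_iter_settings = IterSettings(
#       iter_mode=_IE.N_ITERS,
#       n_iters=3,
#       remove_temporary_files=True,
#       parallelizer_settings=IterParallelizer(parallelize=True, jobs=3),
#   )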


class StepIterator(FlowControlBase, BaseModel):
    """
    Implements the iterator mechanism: wraps one or multiple steps and
    generates n copies of that set of steps according to iter_settings.
    Becomes the master job when parallelize=True, using Icolos JobControl
    to interface with external resources.
    """

    # holds the dict of iterables for the bits to change
    iter_settings: IterSettings = IterSettings()
    dispatcher: StepDispatcher = None

    def __init__(self, **data):
        super().__init__(**data)
        self.dispatcher = self._initialize_workflows()

    def _modify_settings(
        self, settings: IterSettingsParameters, step_config: BaseStepConfig, i: int
    ) -> BaseStepConfig:
        base_conf = deepcopy(step_config)
        iter_settings = deepcopy(settings)
        if iter_settings.flags:
            # apply the flag scheduled for iteration i
            base_conf.settings.arguments.flags.append(iter_settings.flags[i])
        for key, val in iter_settings.parameters.items():
            base_conf.settings.arguments.parameters[key] = val[i]
        for key, val in iter_settings.additional.items():
            # however many lists of n items
            base_conf.settings.additional[key] = val[i]
        return base_conf

    def _initialize_n_iters(self) -> List[WorkFlow]:
        """
        Initialise n identical copies of the same step config.
        """
        workflows = []
        for i in range(self.iter_settings.n_iters):
            workflow_steps = []
            list_step_conf = deepcopy(self.base_config)
            # hand all steps over to the config updater
            step_confs = self._update_config(list_step_conf, f"run_{i}")
            for step in step_confs:
                workflow_steps.append(step.as_dict())
            wf_config = {
                _WE.HEADER: {
                    _WE.ID: f"workflow_{i}",
                    _WE.ENVIRONMENT: {_WE.ENVIRONMENT_EXPORT: []},
                    _WE.GLOBAL_VARIABLES: {},
                    _WE.GLOBAL_SETTINGS: {
                        "remove_temporary_files": self.iter_settings.remove_temporary_files,
                        "single_directory": self.iter_settings.single_directory,
                    },
                },
                _WE.STEPS: workflow_steps,
            }
            # manually set some things for these workflows
            workflows.append(WorkFlow(**wf_config))
        return workflows

    def _initialize_settings(self) -> List:
        """
        Iterate through all settings step-wise, changing all setting blocks
        simultaneously, and return n initialised sets of steps.
        """
        init_steps = []
        for i in range(self.iter_settings.n_iters):
            # iterate over the steps in the base config and the corresponding
            # settings, if these are to be modified
            step_sublist = []
            for step_config in deepcopy(self.base_config):
                # check whether settings in this step need to be iterated,
                # else just use the base config
                if step_config.step_id in self.iter_settings.settings.keys():
                    settings = self.iter_settings.settings[step_config.step_id]
                    step_sublist.append(self._modify_settings(settings, step_config, i))
                else:
                    step_sublist.append(step_config)
            # update all configs with references to updated step_ids etc.
            formatted_configs = self._update_config(step_sublist, f"run_{i}")
            for step_conf in formatted_configs:
                initialized_step = self._initialize_step_from_dict(step_conf.as_dict())
                init_steps.append(initialized_step)
        return init_steps

    def _initialize_workflows(self) -> StepDispatcher:
        """
        Handle step initialization according to the config.

        Returns a Step-like StepDispatcher that holds a list of separate
        workflows, which will be executed according to the parallelization
        scheme.
        """
        workflows = []
        if self.iter_settings.iter_mode == _IE.N_ITERS:
            # simplest mode: generate n independent workflows with the same set of steps
            workflows += self._initialize_n_iters()
        elif self.iter_settings.iter_mode == _IE.SINGLE:
            raise NotImplementedError
        else:
            raise NotImplementedError
        self._logger.log(
            f"Initialized {len(workflows)} jobs for step {self.base_config[0].step_id}",
            _LE.DEBUG,
        )
        dispatcher = StepDispatcher(
            step_id="step_dispatcher",
            type=_SBE.STEP_DISPATCHER,
            workflows=workflows,
            parallel_execution=self.iter_settings.parallelizer_settings,
        )
        return dispatcher

    def _update_config(
        self, step_conf: List[BaseStepConfig], run_id: str
    ) -> List[BaseStepConfig]:
        """
        Manages modifications to each step in the config:
        * writeout paths are updated to separate the output from each of the runs
        """
        formatted_confs = []
        for conf in step_conf:
            # modify the writeout paths: insert a run_id directory into the writeout path
            for block in conf.writeout:
                if block.destination.resource is not None:
                    resource = block.destination.resource
                    parts = resource.split("/")
                    new_resource = os.path.join(
                        "/".join(parts[:-1]), run_id, parts[-1]
                    )
                    block.destination.resource = new_resource
            formatted_confs.append(conf)
        return formatted_confs