Source code for icolos.core.flow_control.flow_control

from typing import List
from pydantic import BaseModel, PrivateAttr
from icolos.core.workflow_steps.step import StepSettingsParameters
from icolos.core.workflow_steps.step import StepBase
from icolos.loggers.steplogger import StepLogger
from icolos.core.workflow_steps.step import (
    StepData,
    StepInputParameters,
    StepWriteoutParameters,
    StepExecutionParameters,
)


[docs]class BaseStepConfig(BaseModel): """ Minimal template class for the base config, without the unnecessary stuff that StepBase requires """ step_id: str = None work_dir: str = None type: str = None data: StepData = StepData() input: StepInputParameters = StepInputParameters() writeout: List[StepWriteoutParameters] = [] execution: StepExecutionParameters = StepExecutionParameters() settings: StepSettingsParameters = StepSettingsParameters()
[docs] def as_dict(self): return { "step_id": self.step_id, "type": self.type, "execution": self.execution, "settings": self.settings, "work_dir": self.work_dir, "data": self.data, "input": self.input, "writeout": self.writeout, }
[docs]class FlowControlBase(BaseModel): # List of steps to be iterated over, each set needs their inputs chained together base_config: List[BaseStepConfig] = None initialized_steps: List[StepBase] = None _logger = PrivateAttr() def __init__(self, **data) -> None: super().__init__(**data) self._logger = StepLogger()