Source code for icolos.core.steps_utils
from icolos.core.workflow_steps.step import StepBase
from icolos.utils.enums.step_enums import StepBaseEnum
from icolos.utils.general.convenience_functions import nested_get
from icolos.utils.enums.step_initialization_enum import StepInitializationEnum
from icolos.utils.enums.flow_control_enums import FlowControlInitializationEnum
_IE = StepInitializationEnum()
_FCE = FlowControlInitializationEnum()
[docs]def initialize_step_from_dict(step_conf: dict) -> StepBase:
"""Map the configuration from JSON to an initialized step object
:param dict step_conf: dictionary containing configuration for the step
:raises ValueError: raised if provided step_type does not correspond to known step
:return StepBase: Initialized step object with the provided config
"""
_STE = StepBaseEnum
step_type = nested_get(step_conf, _STE.STEP_TYPE, default=None)
step_type = None if step_type is None else step_type.upper()
if step_type in _IE.STEP_INIT_DICT.keys():
return _IE.STEP_INIT_DICT[step_type](**step_conf)
elif step_type in _FCE.FLOW_CONTROL_INIT_DICT.keys():
return _FCE.FLOW_CONTROL_INIT_DICT[step_type](**step_conf)
else:
raise ValueError(
f"Backend for step {nested_get(step_conf, _STE.STEPID, '')} unknown."
)