icolos.core.composite_agents package#
Submodules#
icolos.core.composite_agents.base_agent module#
- class icolos.core.composite_agents.base_agent.AgentEnvironmentParameters(*, export: List[WorkflowExportParameters] = [])[source]#
Bases:
BaseModel
- class WorkflowExportParameters(*, key: str, value: str)[source]#
Bases:
BaseModel
- key: str#
- value: str#
- export: List[WorkflowExportParameters]#
- class icolos.core.composite_agents.base_agent.AgentHeaderParameters(*, id: str = None, description: str = None, logging: AgentLoggingParameters = AgentLoggingParameters(logfile=None), environment: AgentEnvironmentParameters = None, global_variables: Dict = None, global_settings: AgentHeaderParametersSettings = AgentHeaderParametersSettings(remove_temporary_files=True, single_directory=False))[source]#
Bases:
BaseModel
- description: str#
- environment: AgentEnvironmentParameters#
- global_settings: AgentHeaderParametersSettings#
- global_variables: Dict#
- id: str#
- logging: AgentLoggingParameters#
- class icolos.core.composite_agents.base_agent.AgentHeaderParametersSettings(*, remove_temporary_files: bool = True, single_directory: bool = False)[source]#
Bases:
BaseModel
- remove_temporary_files: bool#
- single_directory: bool#
- class icolos.core.composite_agents.base_agent.BaseAgent(*, header: AgentHeaderParameters = AgentHeaderParameters(id=None, description=None, logging=AgentLoggingParameters(logfile=None), environment=None, global_variables=None, global_settings=AgentHeaderParametersSettings(remove_temporary_files=True, single_directory=False)))[source]#
Bases:
BaseModel
- header: AgentHeaderParameters#
icolos.core.composite_agents.scheduler module#
- class icolos.core.composite_agents.scheduler.Scheduler(*, header: SchedulerHeaderParameters = SchedulerHeaderParameters(id=None, description=None, logging=AgentLoggingParameters(logfile=None), environment=None, global_variables=None, global_settings=AgentHeaderParametersSettings(remove_temporary_files=True, single_directory=False)))[source]#
Bases:
BaseAgent
,BaseModel
Class to hold the whole logic for scheduling sub-jobs.
- header: SchedulerHeaderParameters#
- class icolos.core.composite_agents.scheduler.SchedulerHeaderParameters(*, id: str = None, description: str = None, logging: AgentLoggingParameters = AgentLoggingParameters(logfile=None), environment: AgentEnvironmentParameters = None, global_variables: Dict = None, global_settings: AgentHeaderParametersSettings = AgentHeaderParametersSettings(remove_temporary_files=True, single_directory=False))[source]#
Bases:
AgentHeaderParameters
,BaseModel
- description: str#
- environment: AgentEnvironmentParameters#
- global_settings: AgentHeaderParametersSettings#
- global_variables: Dict#
- id: str#
- logging: AgentLoggingParameters#
icolos.core.composite_agents.workflow module#
- class icolos.core.composite_agents.workflow.WorkFlow(*, header: WorkflowHeaderParameters = WorkflowHeaderParameters(id=None, description=None, logging=AgentLoggingParameters(logfile=None), environment=None, global_variables=None, global_settings=AgentHeaderParametersSettings(remove_temporary_files=True, single_directory=False)), steps: List[Dict] = [], workflow_data: WorkflowData = WorkflowData(work_dir=None, perturbation_map=None))[source]#
Bases:
BaseAgent
,BaseModel
Class to hold the whole logic for a workflow.
- get_perturbation_map() PerturbationMap [source]#
- header: WorkflowHeaderParameters#
- set_perturbation_map(p_map: PerturbationMap) None [source]#
- steps: List[Dict]#
- workflow_data: WorkflowData#
- class icolos.core.composite_agents.workflow.WorkflowData(*, work_dir: str = None, perturbation_map: PerturbationMap = None)[source]#
Bases:
BaseModel
- perturbation_map: PerturbationMap#
- work_dir: str#
- class icolos.core.composite_agents.workflow.WorkflowHeaderParameters(*, id: str = None, description: str = None, logging: AgentLoggingParameters = AgentLoggingParameters(logfile=None), environment: AgentEnvironmentParameters = None, global_variables: Dict = None, global_settings: AgentHeaderParametersSettings = AgentHeaderParametersSettings(remove_temporary_files=True, single_directory=False))[source]#
Bases:
AgentHeaderParameters
,BaseModel
- description: str#
- environment: AgentEnvironmentParameters#
- global_settings: AgentHeaderParametersSettings#
- global_variables: Dict#
- id: str#
- logging: AgentLoggingParameters#