icolos.core.composite_agents package#

Submodules#

icolos.core.composite_agents.base_agent module#

class icolos.core.composite_agents.base_agent.AgentEnvironmentParameters(*, export: List[WorkflowExportParameters] = [])[source]#

Bases: BaseModel

class WorkflowExportParameters(*, key: str, value: str)[source]#

Bases: BaseModel

key: str#
value: str#
export: List[WorkflowExportParameters]#
class icolos.core.composite_agents.base_agent.AgentHeaderParameters(*, id: str = None, description: str = None, logging: AgentLoggingParameters = AgentLoggingParameters(logfile=None), environment: AgentEnvironmentParameters = None, global_variables: Dict = None, global_settings: AgentHeaderParametersSettings = AgentHeaderParametersSettings(remove_temporary_files=True, single_directory=False))[source]#

Bases: BaseModel

class AgentLoggingParameters(*, logfile: str = None)[source]#

Bases: BaseModel

logfile: str#
description: str#
environment: AgentEnvironmentParameters#
global_settings: AgentHeaderParametersSettings#
global_variables: Dict#
id: str#
logging: AgentLoggingParameters#
class icolos.core.composite_agents.base_agent.AgentHeaderParametersSettings(*, remove_temporary_files: bool = True, single_directory: bool = False)[source]#

Bases: BaseModel

remove_temporary_files: bool#
single_directory: bool#
class icolos.core.composite_agents.base_agent.BaseAgent(*, header: AgentHeaderParameters = AgentHeaderParameters(id=None, description=None, logging=AgentLoggingParameters(logfile=None), environment=None, global_variables=None, global_settings=AgentHeaderParametersSettings(remove_temporary_files=True, single_directory=False)))[source]#

Bases: BaseModel

class Config[source]#

Bases: object

underscore_attrs_are_private = True#
abstract execute()[source]#
get_description() -> str[source]#
get_id() -> str[source]#
header: AgentHeaderParameters#
initialize()[source]#
is_valid() -> bool[source]#
set_description(description: str)[source]#
set_id(id: str)[source]#

icolos.core.composite_agents.scheduler module#

class icolos.core.composite_agents.scheduler.Scheduler(*, header: SchedulerHeaderParameters = SchedulerHeaderParameters(id=None, description=None, logging=AgentLoggingParameters(logfile=None), environment=None, global_variables=None, global_settings=AgentHeaderParametersSettings(remove_temporary_files=True, single_directory=False)))[source]#

Bases: BaseAgent, BaseModel

Class that holds the complete logic for scheduling sub-jobs.

class Config[source]#

Bases: object

underscore_attrs_are_private = True#
execute()[source]#
header: SchedulerHeaderParameters#
initialize()[source]#
is_valid() -> bool[source]#
class icolos.core.composite_agents.scheduler.SchedulerHeaderParameters(*, id: str = None, description: str = None, logging: AgentLoggingParameters = AgentLoggingParameters(logfile=None), environment: AgentEnvironmentParameters = None, global_variables: Dict = None, global_settings: AgentHeaderParametersSettings = AgentHeaderParametersSettings(remove_temporary_files=True, single_directory=False))[source]#

Bases: AgentHeaderParameters, BaseModel

description: str#
environment: AgentEnvironmentParameters#
global_settings: AgentHeaderParametersSettings#
global_variables: Dict#
id: str#
logging: AgentLoggingParameters#

icolos.core.composite_agents.workflow module#

class icolos.core.composite_agents.workflow.WorkFlow(*, header: WorkflowHeaderParameters = WorkflowHeaderParameters(id=None, description=None, logging=AgentLoggingParameters(logfile=None), environment=None, global_variables=None, global_settings=AgentHeaderParametersSettings(remove_temporary_files=True, single_directory=False)), steps: List[Dict] = [], workflow_data: WorkflowData = WorkflowData(work_dir=None, perturbation_map=None))[source]#

Bases: BaseAgent, BaseModel

Class that holds the complete logic for a workflow.

class Config[source]#

Bases: object

underscore_attrs_are_private = True#
add_step(step: StepBase)[source]#
execute()[source]#
find_step_by_step_id(step_id: str)[source]#
get_perturbation_map() -> PerturbationMap[source]#
get_steps() -> list[source]#
header: WorkflowHeaderParameters#
initialize()[source]#
is_valid() -> bool[source]#
set_perturbation_map(p_map: PerturbationMap) -> None[source]#
steps: List[Dict]#
workflow_data: WorkflowData#
class icolos.core.composite_agents.workflow.WorkflowData(*, work_dir: str = None, perturbation_map: PerturbationMap = None)[source]#

Bases: BaseModel

perturbation_map: PerturbationMap#
work_dir: str#
class icolos.core.composite_agents.workflow.WorkflowHeaderParameters(*, id: str = None, description: str = None, logging: AgentLoggingParameters = AgentLoggingParameters(logfile=None), environment: AgentEnvironmentParameters = None, global_variables: Dict = None, global_settings: AgentHeaderParametersSettings = AgentHeaderParametersSettings(remove_temporary_files=True, single_directory=False))[source]#

Bases: AgentHeaderParameters, BaseModel

description: str#
environment: AgentEnvironmentParameters#
global_settings: AgentHeaderParametersSettings#
global_variables: Dict#
id: str#
logging: AgentLoggingParameters#

Module contents#