LoopedNode#
- class maize.core.node.LoopedNode(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]#
Bases:
Node
Node variant that loops its run method by default
Methods
__init__
([max_loops, initial_status])as_dict
()Provides a non-recursive dictionary view of the component.
build
()Builds the node by instantiating all interfaces from descriptions.
check
()Checks if the node was built correctly.
Check if all node dependencies are met by running the prepare method
Check if all required node parameters are set
cleanup
()Method run on component shutdown in the main process.
execute
()This is the main entrypoint for node execution.
Returns all available and registered nodes.
Returns all inputs available to the node.
get_interfaces
([kind])Returns all interfaces available to the node.
get_node_class
(name)Returns the node class corresponding to the given name.
Returns all outputs available to the node.
Returns all parameters available to the node.
Provides a one-line summary of the node.
Check if all required ports are active.
prepare
()Prepares the execution environment for run.
run
()This is the main high-level node execution point.
run_command
(command[, validators, verbose, ...])Runs an external command.
run_multi
(commands[, working_dirs, ...])Runs multiple commands in parallel.
Send a status update to the main process.
Provides a serialized representation of the component type.
setup_directories
([parent_path])Sets up the required directories.
shutdown
()Shuts down the component gracefully.
update_parameters
(**kwargs)Update component parameters.
Attributes
Returns all settable parameters and unconnected inputs
If given, will run commands on the batch system instead of locally
Custom paths to any commands
Provides the full path to the component as a tuple of names.
The component datatype if it's generic.
Modules to load in addition to ones defined in the configuration
Returns the number of items waiting to be received
Returns the number of items waiting to be sent
Provides the configuration of the current node
Provides all parent components.
Provides a convenience iterator for all inputs and outputs.
The path to the python executable to use for this node, allows custom environments
List of external commandline programs that are required for running the component.
List of required python packages
Provides the root workflow or graph instance.
Additional script specifications require to run.
Python logger for both the build and run procedures.
Timer for the run duration, without waiting for resources or other nodes.
Timer for the full duration, including waiting for resources or other nodes.
Working directory for the component.
Current status of the component.
name
signal
n_signals
- property all_parameters: dict[str, Input[Any] | MultiInput[Any] | Parameter[Any]]#
Returns all settable parameters and unconnected inputs
- batch_options: Parameter[JobResourceConfig | None]#
If given, will run commands on the batch system instead of locally
- build() None #
Builds the node by instantiating all interfaces from descriptions.
Examples
>>> class Foo(Node): ... def build(self): ... self.inp = self.add_input( ... "inp", datatype="pdb", description="Example input") ... self.param = self.add_parameter("param", default=42)
- check() None #
Checks if the node was built correctly.
- Raises:
NodeBuildException – If the node didn’t declare at least one port
- check_dependencies() None #
Check if all node dependencies are met by running the prepare method
- Raises:
NodeBuildException – If required callables were not found
ImportError – If required python packages were not found
- check_parameters() None #
Check if all required node parameters are set
- Raises:
NodeBuildException – If parameters were not set
- property component_path: tuple[str, ...]#
Provides the full path to the component as a tuple of names.
- datatype: Any#
The component datatype if it’s generic.
- execute() None #
This is the main entrypoint for node execution.
- Raises:
KeyboardInterrupt – If the underlying process gets interrupted or receives
SIGINT
- classmethod get_interfaces(kind: Literal['input', 'output', 'parameter'] | None = None) set[str] #
Returns all interfaces available to the node.
- static get_node_class(name: str) type[Component] #
Returns the node class corresponding to the given name.
- Parameters:
name – Name of the component class to retrieve
- Returns:
The retrieved component class, can be passed to add_node
- Return type:
Type[Component]
- logger: logging.Logger#
Python logger for both the build and run procedures.
- property node_config: NodeConfig#
Provides the configuration of the current node
- ports_active() bool #
Check if all required ports are active.
Can be overridden by the user to allow custom shutdown scenarios, for example in the case of complex inter-port dependencies. By default only checks if any mandatory ports are inactive.
- Returns:
True
if all required ports are active,False
otherwise.- Return type:
- prepare() None #
Prepares the execution environment for run.
Performs the following:
Changing the python environment, if required
Setting of environment variables
Setting of parameters from the config
Loading LMOD modules
Importing python packages listed in required_packages
Checking if software in required_callables is available
- python: FileParameter[Path]#
The path to the python executable to use for this node, allows custom environments
- required_callables: ClassVar[list[str]] = []#
List of external commandline programs that are required for running the component.
- abstract run() None #
This is the main high-level node execution point.
It should be overridden by the user to provide custom node functionality, and should return normally at completion. Exception handling, log message passing, and channel management are handled by the wrapping execute method.
Examples
>>> class Foo(Node): ... def run(self): ... val = self.inp.receive() ... new = val * self.param.value ... self.out.send(new)
- run_command(command: str | list[str], validators: Sequence[Validator] | None = None, verbose: bool = False, raise_on_failure: bool = True, command_input: str | None = None, pre_execution: str | list[str] | None = None, batch_options: JobResourceConfig | None = None, timeout: float | None = None) CompletedProcess[bytes] #
Runs an external command.
- Parameters:
command – Command to run as a single string, or a list of strings
validators – One or more Validator instances that will be called on the result of the command.
verbose – If
True
will also log any STDOUT or STDERR outputraise_on_failure – Whether to raise an exception when encountering a failure
command_input – Text string used as input for command
pre_execution – Command to run directly before the main one
batch_options – Job options for the batch system, if given, will attempt run on the batch system
timeout – Maximum runtime for the command in seconds, or unlimited if
None
- Returns:
Result of the execution, including STDOUT and STDERR
- Return type:
- Raises:
ProcessError – If any of the validators failed or the returncode was not zero
Examples
To run a single command:
>>> self.run_command("echo foo", validators=[SuccessValidator("foo")])
To run on a batch system, if configured:
>>> self.run_command("echo foo", batch_options=JobResourceConfig(nodes=1))
- run_multi(commands: Sequence[str | list[str]], working_dirs: Sequence[Path] | None = None, command_inputs: Sequence[str | None] | None = None, validators: Sequence[Validator] | None = None, verbose: bool = False, raise_on_failure: bool = True, n_jobs: int = 1, pre_execution: str | list[str] | None = None, batch_options: JobResourceConfig | None = None, timeout: float | None = None) list[CompletedProcess[bytes]] #
Runs multiple commands in parallel.
- Parameters:
commands – Commands to run as a list of strings, or a nested list of strings
working_dirs – Working directories for each command
command_inputs – Text string used as input for each command
validators – One or more Validator instances that will be called on the result of the command.
verbose – If
True
will also log any STDOUT or STDERR outputraise_on_failure – Whether to raise an exception when encountering a failure
n_jobs – Number of processes to spawn at once, should generally be compatible with the number of available CPUs
pre_execution – Command to run directly before the main one
batch_options – Job options for the batch system, if given, will attempt run on the batch system
timeout – Maximum runtime for the command in seconds, or unlimited if
None
- Returns:
Result of the execution, including STDOUT and STDERR
- Return type:
- Raises:
ProcessError – If any of the validators failed or a returncode was not zero
Examples
To run multiple commands, but only two at a time:
>>> self.run_multi(["echo foo", "echo bar", "echo baz"], n_jobs=2)
To run on a batch system, if configured (note that batch settings are per-command):
>>> self.run_command(["echo foo", "echo bar"], batch_options=JobResourceConfig(nodes=1))
- scripts: Parameter[ScriptSpecType]#
Additional script specifications require to run.
Examples
>>> node.scripts.set({"interpreter": /path/to/python, "script": /path/to/script})
- classmethod serialized_summary() _SerialType #
Provides a serialized representation of the component type.
- Returns:
Nested dictionary of the component type structure, including I/O and parameters.
- Return type:
Examples
>>> Merge.serialized_summary() {"name": "Merge", "inputs": [{"name": "inp", ...}]}
- shutdown() None #
Shuts down the component gracefully.
This should not be called by the user directly, as it is called at node shutdown by execute().
- status#
Current status of the component.
- update_parameters(**kwargs: dict[str, Any]) None #
Update component parameters.
- Parameters:
**kwargs – Name - value pairs supplied as keyword arguments
- work_dir: Path#
Working directory for the component.