Node

class maize.core.node.Node(parent: Graph | None = None, name: str | None = None, description: str | None = None, fail_ok: bool = False, n_attempts: int = 1, level: int | str | None = None, cleanup_temp: bool = True, resume: bool = False, logfile: Path | None = None, max_cpus: int | None = None, max_gpus: int | None = None, loop: bool | None = None, max_loops: int = -1, initial_status: Status = Status.NOT_READY)

Bases: Component, Runnable

Base class for all atomic (non-subgraph) nodes of a graph. Create a subclass to implement your own custom tasks.

Parameters:
  • parent – Parent component, typically the graph in context

  • name – The name of the component

  • description – An optional additional description

  • fail_ok – If True, a failure in this component will not cause the whole network to shut down

  • n_attempts – Number of attempts at executing the run() method

  • level – Logging level, if not given or None will use the parent logging level

  • cleanup_temp – Whether to remove any temporary directories after completion

  • resume – Whether to resume from a previous checkpoint

  • logfile – File to output all log messages to, defaults to STDOUT

  • max_cpus – Maximum number of CPUs to use, defaults to the number of available cores in the system

  • max_gpus – Maximum number of GPUs to use, defaults to the number of available GPUs in the system

  • loop – Whether to call the run() method repeatedly in a loop, rather than only once

  • max_loops – Maximum number of times to run the loop when loop is enabled

  • initial_status – The initial status of the node, NOT_READY by default. Set it to another value to indicate that the node should not be run, which is useful when starting from a partially completed graph.

cpus

Resource semaphore allowing the reservation of multiple CPUs

gpus

Resource semaphore allowing the reservation of multiple GPUs
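A plain threading.Semaphore cannot reserve several units at once, because each acquire() call hands out only one unit. The sketch below is a minimal stand-alone illustration of a multi-unit semaphore built on threading.Condition. The class name MultiSemaphore and its reserve() helper are hypothetical, and this is not maize's implementation.

```python
import threading
from contextlib import contextmanager
from typing import Iterator, Optional


class MultiSemaphore:
    """Counting semaphore that can reserve several units in a single call.

    Hypothetical stand-in for the cpus/gpus semaphores, not maize's class.
    """

    def __init__(self, total: int) -> None:
        if total < 0:
            raise ValueError("total must be non-negative")
        self.total = total
        self._available = total
        self._cond = threading.Condition()

    @property
    def available(self) -> int:
        with self._cond:
            return self._available

    def acquire(self, n: int = 1, timeout: Optional[float] = None) -> bool:
        """Wait until n units are free, then take them all at once (all-or-nothing)."""
        if n > self.total:
            raise ValueError(f"cannot reserve {n} of only {self.total} units")
        with self._cond:
            # wait_for re-evaluates the predicate after every notify_all()
            if not self._cond.wait_for(lambda: self._available >= n, timeout):
                return False
            self._available -= n
            return True

    def release(self, n: int = 1) -> None:
        with self._cond:
            if self._available + n > self.total:
                raise ValueError("released more units than were reserved")
            self._available += n
            self._cond.notify_all()

    @contextmanager
    def reserve(self, n: int = 1) -> Iterator[None]:
        """Hold n units for the duration of a with-block."""
        self.acquire(n)
        try:
            yield
        finally:
            self.release(n)
```

Taking all units in one step matters. If two jobs each grabbed part of what they need one unit at a time, both could end up waiting forever for the rest.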

Examples

Subclassing can be done the following way:

>>> from maize.core.node import Node
>>> from maize.core.interface import Output
>>> class Foo(Node):
...     out: Output[int] = Output()
...
...     def run(self):
...         self.out.send(42)

__init__(parent: Graph | None = None, name: str | None = None, description: str | None = None, fail_ok: bool = False, n_attempts: int = 1, level: int | str | None = None, cleanup_temp: bool = True, resume: bool = False, logfile: Path | None = None, max_cpus: int | None = None, max_gpus: int | None = None, loop: bool | None = None, max_loops: int = -1, initial_status: Status = Status.NOT_READY) None

Methods

__init__([parent, name, description, ...])

as_dict()

Provides a non-recursive dictionary view of the component.

build()

Builds the node by instantiating all interfaces from descriptions.

check()

Checks if the node was built correctly.

check_dependencies()

Check if all node dependencies are met by running the prepare method

check_parameters()

Check if all required node parameters are set

cleanup()

Method run on component shutdown in the main process.

execute()

This is the main entrypoint for node execution.

get_available_nodes()

Returns all available and registered nodes.

get_inputs()

Returns all inputs available to the node.

get_interfaces([kind])

Returns all interfaces available to the node.

get_node_class(name)

Returns the node class corresponding to the given name.

get_outputs()

Returns all outputs available to the node.

get_parameters()

Returns all parameters available to the node.

get_summary_line()

Provides a one-line summary of the node.

ports_active()

Check if all required ports are active.

prepare()

Prepares the execution environment for run.

run()

This is the main high-level node execution point.

run_command(command[, validators, verbose, ...])

Runs an external command.

run_multi(commands[, working_dirs, ...])

Runs multiple commands in parallel.

send_update()

Send a status update to the main process.

serialized_summary()

Provides a serialized representation of the component type.

setup_directories([parent_path])

Sets up the required directories.

shutdown()

Shuts down the component gracefully.

update_parameters(**kwargs)

Update component parameters.

Attributes

all_parameters

Returns all settable parameters and unconnected inputs

batch_options

If given, will run commands on the batch system instead of locally

commands

Custom paths to any commands

component_path

Provides the full path to the component as a tuple of names.

datatype

The component datatype if it's generic.

modules

Modules to load in addition to ones defined in the configuration

n_inbound

Returns the number of items waiting to be received

n_outbound

Returns the number of items waiting to be sent

node_config

Provides the configuration of the current node

parents

Provides all parent components.

ports

Provides a convenience mapping of all inputs and outputs.

python

Path to the Python executable to use for this node, allowing custom environments

required_callables

List of external commandline programs that are required for running the component.

required_packages

List of required python packages

root

Provides the root workflow or graph instance.

scripts

Additional script specifications required to run.

status

Current status of the component.

logger

Python logger for both the build and run procedures.

run_timer

Timer for the run duration, without waiting for resources or other nodes.

full_timer

Timer for the full duration, including waiting for resources or other nodes.

work_dir

Working directory for the component.

name

signal

n_signals

property all_parameters: dict[str, Input[Any] | MultiInput[Any] | Parameter[Any]]

Returns all settable parameters and unconnected inputs

as_dict() dict[str, Any]

Provides a non-recursive dictionary view of the component.

batch_options: Parameter[JobResourceConfig | None]

If given, will run commands on the batch system instead of locally

build() None

Builds the node by instantiating all interfaces from descriptions.

Examples

>>> class Foo(Node):
...     def build(self):
...         self.inp = self.add_input(
...             "inp", datatype="pdb", description="Example input")
...         self.param = self.add_parameter("param", default=42)

check() None

Checks if the node was built correctly.

Raises:

NodeBuildException – If the node didn’t declare at least one port

check_dependencies() None

Check if all node dependencies are met by running the prepare method

Raises:

check_parameters() None

Check if all required node parameters are set

Raises:

NodeBuildException – If parameters were not set

cleanup() None

Method run on component shutdown in the main process.

commands: Parameter[dict[str, Path]]

Custom paths to any commands

property component_path: tuple[str, ...]

Provides the full path to the component as a tuple of names.

datatype: Any

The component datatype if it’s generic.

execute() None

This is the main entrypoint for node execution.

Raises:

KeyboardInterrupt – If the underlying process gets interrupted or receives SIGINT

full_timer: Timer

Timer for the full duration, including waiting for resources or other nodes.

static get_available_nodes() set[type[Component]]

Returns all available and registered nodes.

Returns:

All available node classes

Return type:

set[type[Component]]

classmethod get_inputs() set[str]

Returns all inputs available to the node.

classmethod get_interfaces(kind: Literal['input', 'output', 'parameter'] | None = None) set[str]

Returns all interfaces available to the node.

Parameters:

kind – Kind of interface to retrieve

Returns:

Interface names

Return type:

set[str]

static get_node_class(name: str) type[Component]

Returns the node class corresponding to the given name.

Parameters:

name – Name of the component class to retrieve

Returns:

The retrieved component class, can be passed to add_node

Return type:

Type[Component]
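Name-based lookup of this kind is commonly built on a class registry. The sketch below is hedged and stand-alone. It assumes a registry keyed by class name that is filled automatically through __init_subclass__; maize's actual mechanism may differ. The Component, Merge and Foo classes here are illustrative stand-ins.

```python
class Component:
    """Hypothetical base class that registers every subclass by its class name."""

    _registry: dict[str, type["Component"]] = {}

    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        # Runs once per subclass definition, so no explicit registration call is needed
        Component._registry[cls.__name__] = cls

    @staticmethod
    def get_available_nodes() -> set[type["Component"]]:
        return set(Component._registry.values())

    @staticmethod
    def get_node_class(name: str) -> type["Component"]:
        try:
            return Component._registry[name]
        except KeyError:
            raise KeyError(f"no registered node class named {name!r}") from None


class Merge(Component):
    pass


class Foo(Merge):  # indirect subclasses are registered too
    pass
```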

classmethod get_outputs() set[str]

Returns all outputs available to the node.

classmethod get_parameters() set[str]

Returns all parameters available to the node.

classmethod get_summary_line() str

Provides a one-line summary of the node.

logger: logging.Logger

Python logger for both the build and run procedures.

modules: Parameter[list[str]]

Modules to load in addition to ones defined in the configuration

property n_inbound: int

Returns the number of items waiting to be received

property n_outbound: int

Returns the number of items waiting to be sent

property node_config: NodeConfig

Provides the configuration of the current node

property parents: tuple[Component, ...] | None

Provides all parent components.

property ports: dict[str, Port[Any]]

Provides a convenience mapping of all inputs and outputs.

ports_active() bool

Check if all required ports are active.

Can be overridden by the user to allow custom shutdown scenarios, for example in the case of complex inter-port dependencies. By default only checks if any mandatory ports are inactive.

Returns:

True if all required ports are active, False otherwise.

Return type:

bool
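The stand-alone sketch below illustrates a custom shutdown rule. It contrasts the documented default, where every mandatory port must be active, with a rule where a node keeps running while either of two alternative inputs is still connected. PortState, the port names inp_a, inp_b and out, and the either-input rule are all hypothetical. In a real node, the equivalent logic would live in an overridden ports_active() method.

```python
from dataclasses import dataclass


@dataclass
class PortState:
    """Hypothetical stand-in for a port: is it still connected, and is it required?"""

    active: bool
    optional: bool = False


def default_ports_active(ports: dict[str, PortState]) -> bool:
    # Mirrors the documented default: False as soon as any mandatory port is inactive
    return all(p.active for p in ports.values() if not p.optional)


def either_input_active(ports: dict[str, PortState]) -> bool:
    # Custom rule: keep going while at least one of two alternative inputs
    # is active and the output is still connected
    return (ports["inp_a"].active or ports["inp_b"].active) and ports["out"].active
```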

prepare() None

Prepares the execution environment for run.

Performs the following:

  • Changing the python environment, if required

  • Setting of environment variables

  • Setting of parameters from the config

  • Loading LMOD modules

  • Importing python packages listed in required_packages

  • Checking if software in required_callables is available
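The last two steps amount to an import test and a PATH lookup. The minimal sketch below implements both checks with importlib.import_module and shutil.which. The function name check_requirements is hypothetical. It collects messages instead of raising, which is a choice made here and not necessarily what prepare() does.

```python
import importlib
import shutil


def check_requirements(packages: list[str], callables: list[str]) -> list[str]:
    """Return a list of problems, so every missing dependency is reported at once."""
    problems = []
    for name in packages:
        try:
            importlib.import_module(name)
        except ImportError:  # also covers ModuleNotFoundError
            problems.append(f"Python package {name!r} cannot be imported")
    for exe in callables:
        # shutil.which searches PATH the way a shell would
        if shutil.which(exe) is None:
            problems.append(f"command {exe!r} not found on PATH")
    return problems
```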

python: FileParameter[Path]

Path to the Python executable to use for this node, allowing custom environments

required_callables: ClassVar[list[str]] = []

List of external commandline programs that are required for running the component.

required_packages: ClassVar[list[str]] = []

List of required python packages

property root: Graph

Provides the root workflow or graph instance.

abstract run() None

This is the main high-level node execution point.

It should be overridden by the user to provide custom node functionality, and should return normally at completion. Exception handling, log message passing, and channel management are handled by the wrapping execute method.
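The division of labour between run() and execute() can be sketched without maize. In this hypothetical reduction, execute() does four things:

  • calls run() up to n_attempts times
  • logs each failure
  • re-raises KeyboardInterrupt
  • raises an error only when fail_ok is False

SketchNode, Flaky, the RuntimeError, and the string statuses are assumptions standing in for the real classes and the Status enum.

```python
import logging
from abc import ABC, abstractmethod


class SketchNode(ABC):
    """Hypothetical reduction of the run()/execute() split; not maize's code."""

    def __init__(self, n_attempts: int = 1, fail_ok: bool = False) -> None:
        self.n_attempts = n_attempts
        self.fail_ok = fail_ok
        self.logger = logging.getLogger(type(self).__name__)
        self.status = "NOT_READY"  # plain strings stand in for the Status enum

    @abstractmethod
    def run(self) -> None:
        """User code: do the work and return normally when finished."""

    def execute(self) -> None:
        """Call run() up to n_attempts times, handling errors on the user's behalf."""
        for attempt in range(1, self.n_attempts + 1):
            try:
                self.run()
            except KeyboardInterrupt:
                # Never swallow an interrupt: record it and let it propagate
                self.status = "STOPPED"
                raise
            except Exception:
                self.logger.warning("Attempt %d/%d failed", attempt, self.n_attempts, exc_info=True)
            else:
                self.status = "COMPLETED"
                return
        self.status = "FAILED"
        if not self.fail_ok:
            raise RuntimeError(f"{type(self).__name__} failed after {self.n_attempts} attempt(s)")


class Flaky(SketchNode):
    """Example subclass whose run() fails a fixed number of times before succeeding."""

    def __init__(self, failures: int, **kwargs) -> None:
        super().__init__(**kwargs)
        self.failures = failures
        self.calls = 0

    def run(self) -> None:
        self.calls += 1
        if self.calls <= self.failures:
            raise ValueError("transient failure")
```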

Examples

>>> class Foo(Node):
...     def run(self):
...         val = self.inp.receive()
...         new = val * self.param.value
...         self.out.send(new)

run_command(command: str | list[str], validators: Sequence[Validator] | None = None, verbose: bool = False, raise_on_failure: bool = True, command_input: str | None = None, pre_execution: str | list[str] | None = None, batch_options: JobResourceConfig | None = None, timeout: float | None = None) CompletedProcess[bytes]

Runs an external command.

Parameters:
  • command – Command to run as a single string, or a list of strings

  • validators – One or more Validator instances that will be called on the result of the command.

  • verbose – If True will also log any STDOUT or STDERR output

  • raise_on_failure – Whether to raise an exception when encountering a failure

  • command_input – Text string used as input for command

  • pre_execution – Command to run directly before the main one

  • batch_options – Job options for the batch system; if given, will attempt to run on the batch system

  • timeout – Maximum runtime for the command in seconds, or unlimited if None

Returns:

Result of the execution, including STDOUT and STDERR

Return type:

subprocess.CompletedProcess[bytes]

Raises:

ProcessError – If any of the validators failed or the returncode was not zero
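The stand-alone sketch below, built on subprocess.run, shows what such a wrapper involves. It covers command, validators, raise_on_failure, command_input and timeout only. The names run_command_sketch, Validator and stdout_contains are hypothetical, and a RuntimeError stands in for ProcessError. Batch submission, pre_execution and logging are left out.

```python
import shlex
import subprocess
from typing import Callable, Optional, Sequence, Union

# Hypothetical validator type: inspects the finished process, returns True on success
Validator = Callable[[subprocess.CompletedProcess], bool]


def stdout_contains(text: str) -> Validator:
    """Build a validator that checks the command's STDOUT for a substring."""
    return lambda result: text in result.stdout.decode(errors="replace")


def run_command_sketch(
    command: Union[str, list[str]],
    validators: Optional[Sequence[Validator]] = None,
    raise_on_failure: bool = True,
    command_input: Optional[str] = None,
    timeout: Optional[float] = None,
) -> subprocess.CompletedProcess:
    # A single string is split shell-style; a list is used as the argument vector as-is
    args = shlex.split(command) if isinstance(command, str) else list(command)
    result = subprocess.run(
        args,
        input=None if command_input is None else command_input.encode(),
        capture_output=True,  # keep STDOUT and STDERR as bytes on the result
        timeout=timeout,  # raises subprocess.TimeoutExpired if exceeded
    )
    ok = result.returncode == 0 and all(check(result) for check in validators or ())
    if not ok and raise_on_failure:
        stderr = result.stderr.decode(errors="replace").strip()
        raise RuntimeError(f"command {args!r} failed (code {result.returncode}): {stderr}")
    return result
```

No shell is involved, so pipes and redirections in a string command are passed through as literal arguments rather than interpreted.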

Examples

To run a single command:

>>> self.run_command("echo foo", validators=[SuccessValidator("foo")])

To run on a batch system, if configured:

>>> self.run_command("echo foo", batch_options=JobResourceConfig(nodes=1))

run_multi(commands: Sequence[str | list[str]], working_dirs: Sequence[Path] | None = None, command_inputs: Sequence[str | None] | None = None, validators: Sequence[Validator] | None = None, verbose: bool = False, raise_on_failure: bool = True, n_jobs: int = 1, pre_execution: str | list[str] | None = None, batch_options: JobResourceConfig | None = None, timeout: float | None = None) list[CompletedProcess[bytes]]

Runs multiple commands in parallel.

Parameters:
  • commands – Commands to run as a list of strings, or a nested list of strings

  • working_dirs – Working directories for each command

  • command_inputs – Text string used as input for each command

  • validators – One or more Validator instances that will be called on the result of the command.

  • verbose – If True will also log any STDOUT or STDERR output

  • raise_on_failure – Whether to raise an exception when encountering a failure

  • n_jobs – Number of processes to spawn at once, should generally be compatible with the number of available CPUs

  • pre_execution – Command to run directly before the main one

  • batch_options – Job options for the batch system; if given, will attempt to run on the batch system

  • timeout – Maximum runtime for the command in seconds, or unlimited if None

Returns:

Result of the execution, including STDOUT and STDERR

Return type:

list[subprocess.CompletedProcess[bytes]]

Raises:

ProcessError – If any of the validators failed or a returncode was not zero
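The parallel part can be sketched with concurrent.futures.ThreadPoolExecutor, where max_workers plays the role of n_jobs. This hypothetical stand-alone sketch (run_multi_sketch is not part of maize) handles commands given as argument lists, per-command working directories and a timeout. It raises a RuntimeError in place of ProcessError.

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Optional, Sequence


def run_multi_sketch(
    commands: Sequence[list[str]],
    working_dirs: Optional[Sequence[Path]] = None,
    n_jobs: int = 1,
    timeout: Optional[float] = None,
) -> list[subprocess.CompletedProcess]:
    dirs = list(working_dirs) if working_dirs is not None else [None] * len(commands)
    if len(dirs) != len(commands):
        raise ValueError("expected exactly one working directory per command")

    def run_one(args: list[str], cwd: Optional[Path]) -> subprocess.CompletedProcess:
        return subprocess.run(args, cwd=cwd, capture_output=True, timeout=timeout)

    # Threads are enough here: each one only waits on its own child process,
    # and max_workers caps how many children run at the same time
    with ThreadPoolExecutor(max_workers=n_jobs) as pool:
        # map() returns results in input order, whatever order they finish in
        results = list(pool.map(run_one, commands, dirs))

    failed = [r.args for r in results if r.returncode != 0]
    if failed:
        raise RuntimeError(f"{len(failed)} command(s) failed: {failed}")
    return results
```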

Examples

To run multiple commands, but only two at a time:

>>> self.run_multi(["echo foo", "echo bar", "echo baz"], n_jobs=2)

To run on a batch system, if configured (note that batch settings are per-command):

>>> self.run_multi(["echo foo", "echo bar"], batch_options=JobResourceConfig(nodes=1))

run_timer: Timer

Timer for the run duration, without waiting for resources or other nodes.
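The difference between run_timer and full_timer comes down to where each stopwatch is started. The stand-alone sketch below uses a hypothetical accumulating Timer class built on time.perf_counter(). Its time.sleep() calls stand in for resource waiting and for run(). The real Timer type may expose a different interface.

```python
import time
from typing import Optional


class Timer:
    """Accumulating stopwatch; a hypothetical stand-in for the Timer type."""

    def __init__(self) -> None:
        self._elapsed = 0.0
        self._started: Optional[float] = None

    @property
    def running(self) -> bool:
        return self._started is not None

    def start(self) -> None:
        if self._started is None:
            self._started = time.perf_counter()

    def stop(self) -> None:
        if self._started is not None:
            self._elapsed += time.perf_counter() - self._started
            self._started = None

    @property
    def elapsed(self) -> float:
        """Total seconds accumulated so far, including any interval still running."""
        current = 0.0 if self._started is None else time.perf_counter() - self._started
        return self._elapsed + current


full_timer, run_timer = Timer(), Timer()
full_timer.start()
time.sleep(0.05)  # stands in for waiting on resources or upstream nodes
run_timer.start()
time.sleep(0.05)  # stands in for the body of run()
run_timer.stop()
full_timer.stop()
```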

scripts: Parameter[dict[str, dict[Literal['interpreter', 'location'], Path]]]

Additional script specifications required to run.

Examples

>>> from pathlib import Path
>>> node.scripts.set({"my_script": {"interpreter": Path("/path/to/python"), "location": Path("/path/to/script")}})  # "my_script" is a placeholder name

send_update() None

Send a status update to the main process.

classmethod serialized_summary() _SerialType

Provides a serialized representation of the component type.

Returns:

Nested dictionary of the component type structure, including I/O and parameters.

Return type:

dict[str, Any]

Examples

>>> Merge.serialized_summary()
{"name": "Merge", "inputs": [{"name": "inp", ...}]}

setup_directories(parent_path: Path | None = None) None

Sets up the required directories.

shutdown() None

Shuts down the component gracefully.

This should not be called by the user directly, as it is called at node shutdown by execute().

status

Current status of the component.

update_parameters(**kwargs: dict[str, Any]) None

Update component parameters.

Parameters:

**kwargs – Name - value pairs supplied as keyword arguments
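The minimal sketch below shows keyword-based parameter updates using a hypothetical ParameterStore class rather than a real node. It checks every name before applying any change, so a misspelled keyword leaves all values untouched. In Python, an annotation on **kwargs describes each individual value, so **kwargs: Any accepts values of any type.

```python
from typing import Any


class ParameterStore:
    """Hypothetical holder of named parameter values with keyword-based updates."""

    def __init__(self, **defaults: Any) -> None:
        self._values: dict[str, Any] = dict(defaults)

    def update_parameters(self, **kwargs: Any) -> None:
        # Reject unknown names before changing anything, so a typo cannot
        # leave the store half-updated
        unknown = set(kwargs) - set(self._values)
        if unknown:
            raise KeyError(f"unknown parameter(s): {sorted(unknown)}")
        self._values.update(kwargs)

    def __getitem__(self, name: str) -> Any:
        return self._values[name]
```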

work_dir: Path

Working directory for the component.