Graph#

class maize.core.graph.Graph(parent: Graph | None = None, name: str | None = None, description: str | None = None, fail_ok: bool = False, n_attempts: int = 1, level: int | str | None = None, cleanup_temp: bool = True, resume: bool = False, logfile: Path | None = None, max_cpus: int | None = None, max_gpus: int | None = None, loop: bool | None = None, strict: bool = True, default_channel_size: int = 10)[source]#

Bases: Component

Represents a graph (or subgraph) consisting of individual components.

Typically, you instantiate a Graph, add individual nodes or subgraphs, and connect them together. To construct a custom subgraph, subclass Graph and override the build method, adding nodes and connections there as normal.

Parameters:
  • parent – Parent component, typically the graph in context

  • name – The name of the component

  • description – An optional additional description

  • fail_ok – If True, a failure in the component will not cause the whole network to shut down

  • n_attempts – Number of attempts at executing the run() method

  • level – Logging level; if not given or None, the parent logging level is used

  • cleanup_temp – Whether to remove any temporary directories after completion

  • resume – Whether to resume from a previous checkpoint

  • logfile – File to output all log messages to, defaults to STDOUT

  • max_cpus – Maximum number of CPUs to use, defaults to the number of available cores in the system

  • max_gpus – Maximum number of GPUs to use, defaults to the number of available GPUs in the system

  • loop – Whether to run the run method in a loop, as opposed to a single time

  • strict – If True (default), generic node parameterisation is not allowed and an exception is raised instead. You may want to switch this to False if you’re automating subgraph construction.

  • default_channel_size – The maximum number of items to allow for each channel connecting nodes
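The interplay of n_attempts and fail_ok can be pictured with a small stand-alone sketch. It does not use maize: run_with_attempts and flaky are hypothetical names, and it assumes that a failing run is retried up to n_attempts times and that fail_ok only decides whether the final failure propagates.

```python
from typing import Callable


def run_with_attempts(
    run: Callable[[], None], n_attempts: int = 1, fail_ok: bool = False
) -> bool:
    """Call ``run`` up to ``n_attempts`` times; return True on success.

    Assumption for illustration: with ``fail_ok=True`` the last error is
    swallowed and False is returned, otherwise it is re-raised.
    """
    if n_attempts < 1:
        raise ValueError("n_attempts must be at least 1")
    for attempt in range(1, n_attempts + 1):
        try:
            run()
            return True
        except Exception:
            # Only the final failed attempt may propagate
            if attempt == n_attempts and not fail_ok:
                raise
    return False


calls = {"count": 0}


def flaky() -> None:
    """Hypothetical run method that fails on its first two calls."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")


succeeded = run_with_attempts(flaky, n_attempts=3)
```

With n_attempts=3 the third call succeeds; with fewer attempts and fail_ok=False the RuntimeError would reach the caller.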

nodes#

Dictionary of nodes or subgraphs that are part of the Graph

channels#

Dictionary of channels that are part of the Graph

Raises:

GraphBuildException – If there was an error building the subgraph, e.g. an unconnected port

Examples

Defining a new subgraph wrapping an output-only example node with a delay node:

>>> class SubGraph(Graph):
...     out: Output[int]
...     delay: Parameter[int]
...
...     def build(self) -> None:
...         node = self.add(Example)
...         delay = self.add(Delay, parameters=dict(delay=2))
...         self.connect(node.out, delay.inp)
...         self.out = self.map_port(delay.out)
...         self.map(delay.delay)

It can then be used just like any other node:

>>> subgraph = g.add(SubGraph, name="subgraph", parameters={"delay": 10})
>>> g.connect(subgraph.out, other.inp)
__init__(parent: Graph | None = None, name: str | None = None, description: str | None = None, fail_ok: bool = False, n_attempts: int = 1, level: int | str | None = None, cleanup_temp: bool = True, resume: bool = False, logfile: Path | None = None, max_cpus: int | None = None, max_gpus: int | None = None, loop: bool | None = None, strict: bool = True, default_channel_size: int = 10) None[source]#

Methods

__init__([parent, name, description, ...])

add(component[, name, parameters])

Add a component to the graph.

add_all()

Adds all specified components to the graph.

as_dict()

Provides a non-recursive dictionary view of the component.

auto_connect(sending, receiving[, size])

Connects component nodes together automatically, based on port availability and datatype.

build()

Builds a subgraph.

chain(*nodes[, size])

Connects an arbitrary number of nodes in sequence using auto_connect.

check()

Checks if the graph was built correctly and warns about possible deadlocks.

check_dependencies()

Check all contained node dependencies

combine_parameters(*parameters[, name, default])

Maps multiple low-level parameters to one high-level one.

connect(sending, receiving[, size, mode])

Connects component inputs and outputs together.

connect_all()

Connect multiple pairs of ports together.

get_available_nodes()

Returns all available and registered nodes.

get_inputs()

Returns all inputs available to the node.

get_interfaces([kind])

Returns all interfaces available to the node.

get_node(*names)

Recursively find a node in the graph.

get_node_class(name)

Returns the node class corresponding to the given name.

get_outputs()

Returns all outputs available to the node.

get_parameter(*names)

Recursively find a parameter in the graph.

get_parameters()

Returns all parameters available to the node.

get_port(*names)

Recursively find a port in the graph.

get_summary_line()

Provides a one-line summary of the node.

map(*interfaces)

Map multiple child interfaces (ports or parameters) onto the current graph.

map_port(port[, name])

Maps a port of a component to the graph.

ports_active()

Check if all required ports are active.

send_update()

Send a status update to the main process.

serialized_summary()

Provides a serialized representation of the component type.

setup_directories([parent_path])

Create all work directories for the graph / workflow.

update_parameters(**kwargs)

Update component parameters.

visualize([max_level, coloring, labels])

Visualize the graph using graphviz, if installed.

Attributes

P

P1

P2

P3

P4

P5

active_nodes

Flattened view of all active nodes in the graph.

all_parameters

Returns all settable parameters and unconnected inputs

component_path

Provides the full path to the component as a tuple of names.

datatype

The component datatype if it's generic.

flat_channels

Flattened view of all connections in the graph.

flat_components

Flattened view of all components in the graph.

flat_nodes

Flattened view of all nodes in the graph.

n_inbound

Returns the number of items waiting to be received

n_outbound

Returns the number of items waiting to be sent

node_config

Provides the configuration of the current node

parents

Provides all parent components.

ports

Provides a convenience iterator for all inputs and outputs.

required_callables

List of external commandline programs that are required for running the component.

required_packages

List of required python packages

root

Provides the root workflow or graph instance.

logger

Python logger for both the build and run procedures.

run_timer

Timer for the run duration, without waiting for resources or other nodes.

full_timer

Timer for the full duration, including waiting for resources or other nodes.

work_dir

Working directory for the component.

status

Current status of the component.

property active_nodes: list[Node]#

Flattened view of all active nodes in the graph.

add(component: type[U], name: str | None = None, parameters: dict[str, Any] | None = None, **kwargs: Any) U[source]#

Add a component to the graph.

Parameters:
  • component – Node class or subgraph class

  • name – Unique name of the component

  • parameters – Dictionary of parameter values to set on the component

  • kwargs – Additional arguments passed to the component constructor

Returns:

The initialized component

Return type:

Component

Raises:

GraphBuildException – If a node with the same name already exists

Examples

>>> g = Graph(name="foo")
>>> foo = g.add(Foo, name="foo", parameters=dict(val=42))
>>> bar = g.add(Bar)
add_all(c1: type[_T1], c2: type[_T2], c3: type[_T3], c4: type[_T4], /) tuple[_T1, _T2, _T3, _T4][source]#
add_all(c1: type[_T1], c2: type[_T2], c3: type[_T3], c4: type[_T4], c5: type[_T5], /) tuple[_T1, _T2, _T3, _T4, _T5]
add_all(c1: type[_T1], c2: type[_T2], c3: type[_T3], c4: type[_T4], c5: type[_T5], c6: type[_T6], /) tuple[_T1, _T2, _T3, _T4, _T5, _T6]

Adds all specified components to the graph.

Parameters:

components – All component classes to initialize

Returns:

The initialized component instances

Return type:

tuple[U, …]

Examples

>>> g = Graph(name="foo")
>>> foo, bar = g.add_all(Foo, Bar)
property all_parameters: dict[str, Input[Any] | MultiInput[Any] | Parameter[Any]]#

Returns all settable parameters and unconnected inputs

as_dict() dict[str, Any]#

Provides a non-recursive dictionary view of the component.

auto_connect(sending: Component, receiving: Component, size: int = 10) None[source]#

Connects component nodes together automatically, based on port availability and datatype.

Use this only in unambiguous cases; otherwise the graph may end up only partially connected.

Parameters:
  • sending – Sending node

  • receiving – Receiving node

  • size – Size (in items) of the queue used for communication

Examples

>>> g = Graph(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> g.auto_connect(foo, bar)
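To see why ambiguity can leave ports unconnected, here is a stand-alone sketch of type-based matching. It is not maize's implementation: auto_match and the plain dictionaries mapping port names to datatypes are hypothetical stand-ins, and the rule assumed here is that each output is paired with the first still-free input of the same type.

```python
def auto_match(
    outputs: dict[str, type], inputs: dict[str, type]
) -> list[tuple[str, str]]:
    """Pair each output with the first still-free input of the same datatype."""
    free_inputs = dict(inputs)  # copy, so the caller's mapping is untouched
    pairs: list[tuple[str, str]] = []
    for out_name, out_type in outputs.items():
        for inp_name, inp_type in free_inputs.items():
            if inp_type is out_type:
                pairs.append((out_name, inp_name))
                del free_inputs[inp_name]  # safe: we leave the loop right away
                break
    return pairs


# Unambiguous: one int output, one int input
simple = auto_match({"out": int}, {"inp": int})

# Ambiguous: two int inputs compete for one int output, so one stays unconnected
ambiguous = auto_match({"out": int}, {"inp_a": int, "inp_b": int})
```

In the ambiguous case only inp_a receives a connection, which is the kind of partially connected result that an explicit connect call avoids.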
build() None[source]#

Builds a subgraph.

Override this method to construct a subgraph encapsulating multiple lower-level nodes by using the add and connect methods. Additionally, use the map, map_port, and combine_parameters methods to expose ports and parameters, so that the subgraph can be used just like a node.

Examples

>>> def build(self):
...     foo = self.add(Foo)
...     bar = self.add(Bar)
...     self.map(foo.inp, bar.out, foo.param)
chain(*nodes: Component, size: int = 10) None[source]#

Connects an arbitrary number of nodes in sequence using auto_connect.

Parameters:
  • nodes – Nodes to be connected in sequence

  • size – Size of each channel connecting the nodes

Examples

>>> g = Graph(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> baz = g.add(Baz)
>>> g.chain(foo, bar, baz)
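Conceptually, chaining applies auto_connect to each pair of neighbouring nodes. A stand-alone sketch of that pairing, where adjacent_pairs is a hypothetical helper and strings stand in for nodes:

```python
from typing import Sequence, TypeVar

T = TypeVar("T")


def adjacent_pairs(nodes: Sequence[T]) -> list[tuple[T, T]]:
    """Return (sender, receiver) pairs for every two neighbouring nodes."""
    return list(zip(nodes, nodes[1:]))


pairs = adjacent_pairs(["foo", "bar", "baz"])
# Each pair would then be handed to auto_connect(sender, receiver, size=size)
```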
check() None[source]#

Checks if the graph was built correctly and warns about possible deadlocks.

A correctly built graph has no unconnected ports, and all channel types are matched internally.

Raises:

GraphBuildException – If a port is unconnected

Examples

>>> g = Workflow(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> g.auto_connect(foo, bar)
>>> g.check()
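The unconnected-port condition can be illustrated without maize. In this sketch BuildError is a hypothetical stand-in for GraphBuildException, check_ports is a hypothetical helper, and ports are represented by plain strings; type matching and deadlock warnings are left out.

```python
class BuildError(Exception):
    """Hypothetical stand-in for GraphBuildException."""


def check_ports(ports: set[str], channels: set[tuple[str, str]]) -> None:
    """Raise if any port does not appear as an endpoint of some channel."""
    connected = {end for channel in channels for end in channel}
    unconnected = sorted(ports - connected)
    if unconnected:
        raise BuildError(f"Unconnected ports: {', '.join(unconnected)}")


ports = {"foo.out", "bar.inp", "bar.out"}
channels = {("foo.out", "bar.inp")}

try:
    check_ports(ports, channels)
    error = None
except BuildError as exc:
    error = str(exc)  # bar.out has no channel attached
```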
check_dependencies() None[source]#

Check all contained node dependencies

combine_parameters(*parameters: Parameter[T_co], name: str | None = None, default: T_co | None = None) MultiParameter[T_co][source]#

Maps multiple low-level parameters to one high-level one.

This can be useful when a single parameter needs to be supplied to multiple nodes within a subgraph. This method also handles setting a graph attribute with the given name.

Parameters:
  • parameters – Low-level parameters of component nodes

  • name – Name of the high-level parameter

  • default – The default parameter value

Returns:

The combined parameter object

Return type:

MultiParameter

Examples

>>> def build(self):
...     foo = self.add(Foo)
...     bar = self.add(Bar)
...     self.combine_parameters(
...         foo.param, bar.param, name="param", default=42)
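The idea of one high-level parameter feeding several low-level ones can be sketched independently of maize. Param and MultiParam below are hypothetical classes written for this illustration; they are not maize's Parameter and MultiParameter and only model the fan-out of a value.

```python
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class Param(Generic[T]):
    """Hypothetical low-level parameter holding a single value."""

    def __init__(self, name: str, value: Optional[T] = None) -> None:
        self.name = name
        self.value = value


class MultiParam(Generic[T]):
    """Hypothetical high-level parameter forwarding its value to its children."""

    def __init__(
        self, *children: Param[T], name: str, default: Optional[T] = None
    ) -> None:
        self.name = name
        self.children = children
        if default is not None:
            self.set(default)

    def set(self, value: T) -> None:
        # Setting the combined parameter updates every wrapped parameter
        for child in self.children:
            child.value = value


foo_param: Param[int] = Param("param")
bar_param: Param[int] = Param("param")
combined = MultiParam(foo_param, bar_param, name="param", default=42)
combined.set(7)  # both children now hold 7 instead of the default
```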
property component_path: tuple[str, ...]#

Provides the full path to the component as a tuple of names.

connect(sending: Output[T_co] | MultiOutput[T_co], receiving: Input[T_co] | MultiInput[T_co], size: int | None = None, mode: Literal['copy', 'link', 'move'] | None = None) None[source]#

Connects component inputs and outputs together.

Parameters:
  • sending – Output port for sending items

  • receiving – Input port for receiving items

  • size – Size (in items) of the queue used for communication, only for serializable data

  • mode – Whether to link, copy or move files, overrides value specified for the port

Raises:

GraphBuildException – If the port types don’t match, or the maximum number of channels supported by your OS has been reached

Examples

>>> g = Graph(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> g.connect(foo.out, bar.inp)
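The effect of size can be pictured with a bounded FIFO queue from the standard library. This is only an analogy under the assumption that a channel holds at most size items; it says nothing about how maize implements its channels, and a sending node would normally wait for a free slot rather than raise.

```python
import queue

size = 2  # plays the role of the ``size`` argument
channel = queue.Queue(maxsize=size)

channel.put_nowait(1)
channel.put_nowait(2)

try:
    channel.put_nowait(3)  # the channel is full, so this item is rejected
    overflowed = False
except queue.Full:
    overflowed = True

first = channel.get_nowait()  # receiving an item frees one slot
channel.put_nowait(3)  # now the third item fits
```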
connect_all(p1: tuple[Output[P] | MultiOutput[P], Input[P] | MultiInput[P]], p2: tuple[Output[P1] | MultiOutput[P1], Input[P1] | MultiInput[P1]], /) None[source]#
connect_all(p1: tuple[Output[P] | MultiOutput[P], Input[P] | MultiInput[P]], p2: tuple[Output[P1] | MultiOutput[P1], Input[P1] | MultiInput[P1]], p3: tuple[Output[P2] | MultiOutput[P2], Input[P2] | MultiInput[P2]], /) None
connect_all(p1: tuple[Output[P] | MultiOutput[P], Input[P] | MultiInput[P]], p2: tuple[Output[P1] | MultiOutput[P1], Input[P1] | MultiInput[P1]], p3: tuple[Output[P2] | MultiOutput[P2], Input[P2] | MultiInput[P2]], p4: tuple[Output[P3] | MultiOutput[P3], Input[P3] | MultiInput[P3]], /) None
connect_all(p1: tuple[Output[P] | MultiOutput[P], Input[P] | MultiInput[P]], p2: tuple[Output[P1] | MultiOutput[P1], Input[P1] | MultiInput[P1]], p3: tuple[Output[P2] | MultiOutput[P2], Input[P2] | MultiInput[P2]], p4: tuple[Output[P3] | MultiOutput[P3], Input[P3] | MultiInput[P3]], p5: tuple[Output[P4] | MultiOutput[P4], Input[P4] | MultiInput[P4]], /) None
connect_all(p1: tuple[Output[P] | MultiOutput[P], Input[P] | MultiInput[P]], p2: tuple[Output[P1] | MultiOutput[P1], Input[P1] | MultiInput[P1]], p3: tuple[Output[P2] | MultiOutput[P2], Input[P2] | MultiInput[P2]], p4: tuple[Output[P3] | MultiOutput[P3], Input[P3] | MultiInput[P3]], p5: tuple[Output[P4] | MultiOutput[P4], Input[P4] | MultiInput[P4]], p6: tuple[Output[P5] | MultiOutput[P5], Input[P5] | MultiInput[P5]], /) None

Connect multiple pairs of ports together.

Parameters:

ports – Output - Input pairs to connect

Examples

>>> g = Graph(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> baz = g.add(Baz)
>>> g.connect_all((foo.out, bar.inp), (bar.out, baz.inp))
datatype: Any#

The component datatype if it’s generic.

property flat_channels: set[tuple[tuple[str, ...], tuple[str, ...]]]#

Flattened view of all connections in the graph.

property flat_components: list[Component]#

Flattened view of all components in the graph.

property flat_nodes: list[Node]#

Flattened view of all nodes in the graph.
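A flattened view can be thought of as a recursive walk over nested subgraphs. The sketch below is not maize code: Item is a hypothetical container, and it assumes that anything without children counts as a node.

```python
from dataclasses import dataclass, field


@dataclass
class Item:
    """Hypothetical component: a leaf node if it has no children, else a subgraph."""

    name: str
    children: list["Item"] = field(default_factory=list)


def flat_nodes(item: Item) -> list[Item]:
    """Collect all leaf nodes below ``item``, descending into subgraphs."""
    if not item.children:
        return [item]
    leaves: list[Item] = []
    for child in item.children:
        leaves.extend(flat_nodes(child))
    return leaves


graph = Item("g", [Item("foo"), Item("sub", [Item("bar"), Item("baz")])])
names = [node.name for node in flat_nodes(graph)]
```

Here names contains foo, bar and baz, while the intermediate subgraph sub does not appear.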

full_timer: Timer#

Timer for the full duration, including waiting for resources or other nodes.

static get_available_nodes() set[type[Component]]#

Returns all available and registered nodes.

Returns:

All available node classes

Return type:

set[type[Component]]

classmethod get_inputs() set[str]#

Returns all inputs available to the node.

classmethod get_interfaces(kind: Literal['input', 'output', 'parameter'] | None = None) set[str]#

Returns all interfaces available to the node.

Parameters:

kind – Kind of interface to retrieve

Returns:

Interface names

Return type:

set[str]

get_node(*names: str) Component[source]#

Recursively find a node in the graph.

Parameters:

names – Names of nodes leading up to the potentially nested target node

Returns:

The target component

Return type:

Component

Raises:

KeyError – When the target cannot be found

Examples

>>> g.get_node("subgraph", "subsubgraph", "foo")
Foo(name='foo', parent=SubSubGraph(...))
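The lookup by a sequence of names can be sketched with nested dictionaries. get_nested is a hypothetical helper and the dictionary stands in for a graph containing subgraphs; the only behaviour mirrored here is descending one level per name and raising KeyError when a name is missing.

```python
from typing import Any


def get_nested(tree: dict[str, Any], *names: str) -> Any:
    """Follow ``names`` through nested dictionaries; KeyError if one is missing."""
    current: Any = tree
    for name in names:
        # A leaf cannot be descended into, so treat that as a missing name too
        if not isinstance(current, dict) or name not in current:
            raise KeyError(name)
        current = current[name]
    return current


tree = {"subgraph": {"subsubgraph": {"foo": "Foo node"}}}
found = get_nested(tree, "subgraph", "subsubgraph", "foo")
```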
static get_node_class(name: str) type[Component]#

Returns the node class corresponding to the given name.

Parameters:

name – Name of the component class to retrieve

Returns:

The retrieved component class, can be passed to add

Return type:

type[Component]

classmethod get_outputs() set[str]#

Returns all outputs available to the node.

get_parameter(*names: str) Parameter[Any][source]#

Recursively find a parameter in the graph.

Parameters:

names – Names of components leading up to the target parameter

Returns:

The target parameter

Return type:

Parameter

Raises:

KeyError – When the parameter cannot be found

classmethod get_parameters() set[str]#

Returns all parameters available to the node.

get_port(*names: str) Port[Any][source]#

Recursively find a port in the graph.

Parameters:

names – Names of components leading up to the target port

Returns:

The target port

Return type:

Port

Raises:

KeyError – When the target cannot be found

classmethod get_summary_line() str#

Provides a one-line summary of the node.

logger: logging.Logger#

Python logger for both the build and run procedures.

map(*interfaces: Interface[Any]) None[source]#

Map multiple child interfaces (ports or parameters) onto the current graph. Will also set the graph attributes to the names of the mapped interfaces.

Parameters:

interfaces – Any number of ports and parameters to map

See also

Graph.combine_parameters

If you want to map multiple parameters to a single high-level one

Graph.map_port

If you want more fine-grained control over naming

Examples

>>> def build(self):
...     foo = self.add(Foo)
...     bar = self.add(Bar)
...     self.map(foo.inp, bar.out, foo.param)
map_port(port: _P, name: str | None = None) _P[source]#

Maps a port of a component to the graph.

This is required when creating custom subgraphs: ports of individual component nodes need to be mapped to the subgraph. This method also handles setting a graph attribute with the given name.

Parameters:
  • port – The component port

  • name – Name for the port to be registered as

Returns:

Mapped port

Return type:

_P

Examples

>>> def build(self):
...     node = self.add(Example)
...     self.map_port(node.output, name="output")
property n_inbound: int#

Returns the number of items waiting to be received

property n_outbound: int#

Returns the number of items waiting to be sent

property node_config: NodeConfig#

Provides the configuration of the current node

property parents: tuple[Component, ...] | None#

Provides all parent components.

property ports: dict[str, Port[Any]]#

Provides a convenience iterator for all inputs and outputs.

ports_active() bool#

Check if all required ports are active.

Can be overridden by the user to allow custom shutdown scenarios, for example in the case of complex inter-port dependencies. By default only checks if any mandatory ports are inactive.

Returns:

True if all required ports are active, False otherwise.

Return type:

bool
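The default rule described above, that only mandatory ports matter, can be written down in a few lines. PortState is a hypothetical record invented for this sketch (maize's own port objects carry more state), and the free function ports_active merely mirrors the described check.

```python
from dataclasses import dataclass


@dataclass
class PortState:
    """Hypothetical port record with just the fields this sketch needs."""

    name: str
    active: bool
    optional: bool = False


def ports_active(ports: list[PortState]) -> bool:
    """True unless some mandatory (non-optional) port is inactive."""
    return all(port.active for port in ports if not port.optional)


# An inactive optional port does not count against the component
all_good = ports_active(
    [PortState("inp", True), PortState("aux", False, optional=True)]
)

# An inactive mandatory port does
blocked = ports_active([PortState("inp", False), PortState("out", True)])
```

An override for a custom shutdown scenario would replace the all(...) expression with whatever inter-port rule the component needs.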

required_callables: ClassVar[list[str]] = []#

List of external commandline programs that are required for running the component.

required_packages: ClassVar[list[str]] = []#

List of required python packages

property root: Graph#

Provides the root workflow or graph instance.

run_timer: Timer#

Timer for the run duration, without waiting for resources or other nodes.

send_update() None#

Send a status update to the main process.

classmethod serialized_summary() _SerialType#

Provides a serialized representation of the component type.

Returns:

Nested dictionary of the component type structure, including I/O and parameters.

Return type:

dict[str, Any]

Examples

>>> Merge.serialized_summary()
{"name": "Merge", "inputs": [{"name": "inp", ...}]}
setup_directories(parent_path: Path | None = None) None[source]#

Create all work directories for the graph / workflow.

status#

Current status of the component.

update_parameters(**kwargs: dict[str, Any]) None#

Update component parameters.

Parameters:

**kwargs – Name - value pairs supplied as keyword arguments

visualize(max_level: int = 9223372036854775807, coloring: Literal['nesting', 'status'] = 'nesting', labels: bool = True) Any[source]#

Visualize the graph using graphviz, if installed.

Parameters:
  • max_level – Maximum nesting level to show, shows all levels by default

  • coloring – Whether to color nodes by nesting level or status

  • labels – Whether to show datatype labels

Returns:

Graphviz Dot instance; in a Jupyter notebook this is displayed automatically

Return type:

dot

work_dir: Path#

Working directory for the component.