Graph#
- class maize.core.graph.Graph(parent: Graph | None = None, name: str | None = None, description: str | None = None, fail_ok: bool = False, n_attempts: int = 1, level: int | str | None = None, cleanup_temp: bool = True, resume: bool = False, logfile: Path | None = None, max_cpus: int | None = None, max_gpus: int | None = None, loop: bool | None = None, strict: bool = True, default_channel_size: int = 10)[source]#
Bases: Component
Represents a graph (or subgraph) consisting of individual components.
Typically, you instantiate a Graph, add individual nodes or subgraphs, and connect them together. To construct a custom subgraph, subclass Graph and override the build method, adding nodes and connections there as usual.
- Parameters:
parent – Parent component, typically the graph in context
name – The name of the component
description – An optional additional description
fail_ok – If True, a failure in the component will not trigger the whole network to shut down
n_attempts – Number of attempts at executing the run() method
level – Logging level, if not given or None will use the parent logging level
cleanup_temp – Whether to remove any temporary directories after completion
resume – Whether to resume from a previous checkpoint
logfile – File to output all log messages to, defaults to STDOUT
max_cpus – Maximum number of CPUs to use, defaults to the number of available cores in the system
max_gpus – Maximum number of GPUs to use, defaults to the number of available GPUs in the system
loop – Whether to run the run method in a loop, as opposed to a single time
strict – If True (default), will not allow generic node parameterisation and will raise an exception instead. You may want to switch this to False if you’re automating subgraph construction.
default_channel_size – The maximum number of items to allow for each channel connecting nodes
- nodes#
Dictionary of nodes or subgraphs part of the Graph
- channels#
Dictionary of channels part of the Graph
- Raises:
GraphBuildException – If there was an error building the subgraph, e.g. an unconnected port
Examples
Defining a new subgraph wrapping an output-only example node with a delay node:
>>> class SubGraph(Graph):
...     out: Output[int]
...     delay: Parameter[int]
...
...     def build(self) -> None:
...         node = self.add(Example)
...         delay = self.add(Delay, parameters=dict(delay=2))
...         self.connect(node.out, delay.inp)
...         self.out = self.map_port(delay.out)
...         # map() returns None and sets the graph attribute itself
...         self.map(delay.delay)
It can then be used just like any other node:
>>> subgraph = g.add(SubGraph, name="subgraph", parameters={"delay": 10})
>>> g.connect(subgraph.out, other.inp)
- __init__(parent: Graph | None = None, name: str | None = None, description: str | None = None, fail_ok: bool = False, n_attempts: int = 1, level: int | str | None = None, cleanup_temp: bool = True, resume: bool = False, logfile: Path | None = None, max_cpus: int | None = None, max_gpus: int | None = None, loop: bool | None = None, strict: bool = True, default_channel_size: int = 10) None [source]#
Methods
__init__([parent, name, description, ...])
add(component[, name, parameters]) – Add a component to the graph.
add_all() – Adds all specified components to the graph.
as_dict() – Provides a non-recursive dictionary view of the component.
auto_connect(sending, receiving[, size]) – Connects component nodes together automatically, based on port availability and datatype.
build() – Builds a subgraph.
chain(*nodes[, size]) – Connects an arbitrary number of nodes in sequence using auto_connect.
check() – Checks if the graph was built correctly and warns about possible deadlocks.
Check all contained node dependencies
combine_parameters(*parameters[, name, default]) – Maps multiple low-level parameters to one high-level one.
connect(sending, receiving[, size, mode]) – Connects component inputs and outputs together.
connect_all() – Connect multiple pairs of ports together.
Returns all available and registered nodes.
Returns all inputs available to the node.
get_interfaces([kind]) – Returns all interfaces available to the node.
get_node(*names) – Recursively find a node in the graph.
get_node_class(name) – Returns the node class corresponding to the given name.
Returns all outputs available to the node.
get_parameter(*names) – Recursively find a parameter in the graph.
Returns all parameters available to the node.
get_port(*names) – Recursively find a port in the graph.
Provides a one-line summary of the node.
map(*interfaces) – Map multiple child interfaces (ports or parameters) onto the current graph.
map_port(port[, name]) – Maps a port of a component to the graph.
ports_active() – Check if all required ports are active.
Send a status update to the main process.
serialized_summary() – Provides a serialized representation of the component type.
setup_directories([parent_path]) – Create all work directories for the graph / workflow.
update_parameters(**kwargs) – Update component parameters.
visualize([max_level, coloring, labels]) – Visualize the graph using graphviz, if installed.
Attributes
P
P1
P2
P3
P4
P5
Flattened view of all active nodes in the graph.
all_parameters – Returns all settable parameters and unconnected inputs
component_path – Provides the full path to the component as a tuple of names.
datatype – The component datatype if it's generic.
flat_channels – Flattened view of all connections in the graph.
Flattened view of all components in the graph.
Flattened view of all nodes in the graph.
Returns the number of items waiting to be received
Returns the number of items waiting to be sent
node_config – Provides the configuration of the current node
Provides all parent components.
Provides a convenience iterator for all inputs and outputs.
required_callables – List of external commandline programs that are required for running the component.
List of required python packages
Provides the root workflow or graph instance.
logger – Python logger for both the build and run procedures.
Timer for the run duration, without waiting for resources or other nodes.
Timer for the full duration, including waiting for resources or other nodes.
work_dir – Working directory for the component.
status – Current status of the component.
- add(component: type[U], name: str | None = None, parameters: dict[str, Any] | None = None, **kwargs: Any) U [source]#
Add a component to the graph.
- Parameters:
name – Unique name of the component
component – Node class or subgraph class
kwargs – Additional arguments passed to the component constructor
- Returns:
The initialized component
- Return type:
U
- Raises:
GraphBuildException – If a node with the same name already exists
Examples
>>> g = Graph(name="foo")
>>> foo = g.add(Foo, name="foo", parameters=dict(val=42))
>>> bar = g.add(Bar)
- add_all(c1: type[_T1], c2: type[_T2], c3: type[_T3], c4: type[_T4], /) tuple[_T1, _T2, _T3, _T4] [source]#
- add_all(c1: type[_T1], c2: type[_T2], c3: type[_T3], c4: type[_T4], c5: type[_T5], /) tuple[_T1, _T2, _T3, _T4, _T5]
- add_all(c1: type[_T1], c2: type[_T2], c3: type[_T3], c4: type[_T4], c5: type[_T5], c6: type[_T6], /) tuple[_T1, _T2, _T3, _T4, _T5, _T6]
Adds all specified components to the graph.
- Parameters:
components – All component classes to initialize
- Returns:
The initialized component instances
- Return type:
tuple[U, …]
Examples
>>> g = Graph(name="foo")
>>> foo, bar = g.add_all(Foo, Bar)
- property all_parameters: dict[str, Input[Any] | MultiInput[Any] | Parameter[Any]]#
Returns all settable parameters and unconnected inputs
- auto_connect(sending: Component, receiving: Component, size: int = 10) None [source]#
Connects component nodes together automatically, based on port availability and datatype.
Use this only in unambiguous cases; otherwise the graph may end up only partially connected.
- Parameters:
sending – Sending node
receiving – Receiving node
size – Size (in items) of the queue used for communication
Examples
>>> g = Graph(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> g.auto_connect(foo, bar)
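The matching rule described above (pairing ports by availability and datatype) can be pictured with a small toy model. This is only a sketch under stated assumptions, not maize's implementation: `Port` and `auto_match` are hypothetical names, and the first-free-match strategy is an assumption made for illustration. It shows why an ambiguous case can leave a port unconnected.

```python
from dataclasses import dataclass


@dataclass
class Port:
    """Hypothetical stand-in for a port: a name, a datatype and a connection flag."""
    name: str
    datatype: type
    connected: bool = False


def auto_match(outputs: list[Port], inputs: list[Port]) -> list[tuple[str, str]]:
    """Pair each free output with the first free input of the same datatype."""
    pairs = []
    for out in outputs:
        if out.connected:
            continue
        for inp in inputs:
            if not inp.connected and inp.datatype is out.datatype:
                out.connected = inp.connected = True
                pairs.append((out.name, inp.name))
                break
    return pairs


# Unambiguous case: one int output and one int input
pairs = auto_match([Port("out", int)], [Port("inp", int)])

# Ambiguous case: two str inputs compete for a single str output,
# so one of them is left without a connection
inputs = [Port("inp_a", str), Port("inp_b", str)]
ambiguous = auto_match([Port("out", str)], inputs)
leftover = [p.name for p in inputs if not p.connected]
```

In the ambiguous case an explicit `connect` call per port pair is the safer choice.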
- build() None [source]#
Builds a subgraph.
Override this method to construct a subgraph encapsulating multiple lower-level nodes, using the add and connect methods. Additionally, use the map, map_port, and combine_parameters methods to create a subgraph that can be used just like a node.
Examples
>>> def build(self):
...     foo = self.add(Foo)
...     bar = self.add(Bar)
...     self.map(foo.inp, bar.out, foo.param)
- chain(*nodes: Component, size: int = 10) None [source]#
Connects an arbitrary number of nodes in sequence using auto_connect.
- Parameters:
nodes – Nodes to be connected in sequence
size – Size of each channel connecting the nodes
Examples
>>> g = Graph(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> baz = g.add(Baz)
>>> g.chain(foo, bar, baz)
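Assuming chain simply applies auto_connect to each consecutive pair of nodes, as its description suggests, the pairing logic can be sketched in plain Python. The functions below are hypothetical stand-ins that only record which pairs would be connected; they do not touch maize.

```python
# Records every (sending, receiving, size) triple that would be connected
connections: list[tuple[str, str, int]] = []


def auto_connect(sending: str, receiving: str, size: int = 10) -> None:
    """Hypothetical stand-in: just record the requested connection."""
    connections.append((sending, receiving, size))


def chain(*nodes: str, size: int = 10) -> None:
    """Connect each node to its successor, passing the same channel size along."""
    for sending, receiving in zip(nodes, nodes[1:]):
        auto_connect(sending, receiving, size=size)


chain("foo", "bar", "baz", size=5)
```

With three nodes this yields two connections, so chaining n nodes creates n - 1 channels.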
- check() None [source]#
Checks if the graph was built correctly and warns about possible deadlocks.
A correctly built graph has no unconnected ports, and all channel types are matched internally.
- Raises:
GraphBuildException – If a port is unconnected
Examples
>>> g = Workflow(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> g.auto_connect(foo, bar)
>>> g.check()
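The "no unconnected ports" condition can be illustrated with a minimal sketch. Everything here is hypothetical: `BuildError` stands in for GraphBuildException, and the dictionary of port names to connection flags is an assumed simplification of a real graph. Type matching and deadlock warnings are not modelled.

```python
class BuildError(Exception):
    """Hypothetical stand-in for GraphBuildException."""


def check_ports(ports: dict[str, bool]) -> None:
    """Raise if any port (name -> connected flag) is left unconnected."""
    unconnected = sorted(name for name, connected in ports.items() if not connected)
    if unconnected:
        raise BuildError(f"Unconnected ports: {', '.join(unconnected)}")


# A fully connected graph passes silently
check_ports({"foo.out": True, "bar.inp": True})

# A dangling output is reported by name
try:
    check_ports({"foo.out": True, "bar.inp": True, "bar.out": False})
    error = None
except BuildError as err:
    error = str(err)
```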
- combine_parameters(*parameters: Parameter[T_co], name: str | None = None, default: T_co | None = None) MultiParameter[T_co] [source]#
Maps multiple low-level parameters to one high-level one.
This can be useful when a single parameter needs to be supplied to multiple nodes within a subgraph. This method also handles setting a graph attribute with the given name.
- Parameters:
parameters – Low-level parameters of component nodes
name – Name of the high-level parameter
default – The default parameter value
- Returns:
The combined parameter object
- Return type:
MultiParameter[T_co]
Examples
>>> def build(self):
...     foo = self.add(Foo)
...     bar = self.add(Bar)
...     self.combine_parameters(
...         foo.param, bar.param, name="param", default=42)
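The idea of one high-level parameter feeding several low-level ones can be sketched as a simple fan-out. This is a toy model, not maize's MultiParameter: the `Param` and `CombinedParam` classes and their `set` method are hypothetical names chosen for illustration.

```python
class Param:
    """Hypothetical low-level parameter holding a single value."""

    def __init__(self, name: str, value=None) -> None:
        self.name = name
        self.value = value


class CombinedParam:
    """Hypothetical high-level parameter that forwards its value to all children."""

    def __init__(self, *children: Param, name: str, default=None) -> None:
        self.name = name
        self.children = children
        if default is not None:
            self.set(default)

    def set(self, value) -> None:
        # Setting the combined parameter updates every wrapped parameter
        for child in self.children:
            child.value = value


foo_param = Param("param")
bar_param = Param("param")
combined = CombinedParam(foo_param, bar_param, name="param", default=42)
defaults = (foo_param.value, bar_param.value)

combined.set(7)
updated = (foo_param.value, bar_param.value)
```

The benefit of this design is that a user of the subgraph sets the value once, and both nodes stay in sync.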
- property component_path: tuple[str, ...]#
Provides the full path to the component as a tuple of names.
- connect(sending: Output[T_co] | MultiOutput[T_co], receiving: Input[T_co] | MultiInput[T_co], size: int | None = None, mode: Literal['copy', 'link', 'move'] | None = None) None [source]#
Connects component inputs and outputs together.
- Parameters:
sending – Output port for sending items
receiving – Input port for receiving items
size – Size (in items) of the queue used for communication, only for serializable data
mode – Whether to link, copy or move files, overrides value specified for the port
- Raises:
GraphBuildException – If the port types don’t match, or the maximum number of channels supported by your OS has been reached
Examples
>>> g = Graph(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> g.connect(foo.out, bar.inp)
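To make the size parameter more concrete, the sketch below uses a bounded standard-library queue as an analogy for a channel with a maximum number of items. This is an assumption for illustration only; it says nothing about how maize implements its channels internally.

```python
import queue

# Analogy only: a bounded queue standing in for a channel with size=2
channel: queue.Queue = queue.Queue(maxsize=2)
channel.put_nowait("a")
channel.put_nowait("b")

# A third item does not fit until the receiver takes something out
try:
    channel.put_nowait("c")
    overflowed = False
except queue.Full:
    overflowed = True

# Items leave in the order they were sent, freeing one slot
received = channel.get_nowait()
```

A small size keeps memory use bounded but makes a fast sender wait for a slow receiver; a larger size trades memory for fewer stalls.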
- connect_all(p1: tuple[Output[P] | MultiOutput[P], Input[P] | MultiInput[P]], p2: tuple[Output[P1] | MultiOutput[P1], Input[P1] | MultiInput[P1]], /) None [source]#
- connect_all(p1: tuple[Output[P] | MultiOutput[P], Input[P] | MultiInput[P]], p2: tuple[Output[P1] | MultiOutput[P1], Input[P1] | MultiInput[P1]], p3: tuple[Output[P2] | MultiOutput[P2], Input[P2] | MultiInput[P2]], /) None
- connect_all(p1: tuple[Output[P] | MultiOutput[P], Input[P] | MultiInput[P]], p2: tuple[Output[P1] | MultiOutput[P1], Input[P1] | MultiInput[P1]], p3: tuple[Output[P2] | MultiOutput[P2], Input[P2] | MultiInput[P2]], p4: tuple[Output[P3] | MultiOutput[P3], Input[P3] | MultiInput[P3]], /) None
- connect_all(p1: tuple[Output[P] | MultiOutput[P], Input[P] | MultiInput[P]], p2: tuple[Output[P1] | MultiOutput[P1], Input[P1] | MultiInput[P1]], p3: tuple[Output[P2] | MultiOutput[P2], Input[P2] | MultiInput[P2]], p4: tuple[Output[P3] | MultiOutput[P3], Input[P3] | MultiInput[P3]], p5: tuple[Output[P4] | MultiOutput[P4], Input[P4] | MultiInput[P4]], /) None
- connect_all(p1: tuple[Output[P] | MultiOutput[P], Input[P] | MultiInput[P]], p2: tuple[Output[P1] | MultiOutput[P1], Input[P1] | MultiInput[P1]], p3: tuple[Output[P2] | MultiOutput[P2], Input[P2] | MultiInput[P2]], p4: tuple[Output[P3] | MultiOutput[P3], Input[P3] | MultiInput[P3]], p5: tuple[Output[P4] | MultiOutput[P4], Input[P4] | MultiInput[P4]], p6: tuple[Output[P5] | MultiOutput[P5], Input[P5] | MultiInput[P5]], /) None
Connect multiple pairs of ports together.
- Parameters:
ports – Output - Input pairs to connect
Examples
>>> g = Graph(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> baz = g.add(Baz)
>>> g.connect_all((foo.out, bar.inp), (bar.out, baz.inp))
- datatype: Any#
The component datatype if it’s generic.
- property flat_channels: set[tuple[tuple[str, ...], tuple[str, ...]]]#
Flattened view of all connections in the graph.
- classmethod get_interfaces(kind: Literal['input', 'output', 'parameter'] | None = None) set[str] #
Returns all interfaces available to the node.
- get_node(*names: str) Component [source]#
Recursively find a node in the graph.
- Parameters:
names – Names of nodes leading up to the potentially nested target node
- Returns:
The target component
- Return type:
Component
- Raises:
KeyError – When the target cannot be found
Examples
>>> g.get_node("subgraph", "subsubgraph", "foo")
Foo(name='foo', parent=SubSubGraph(...))
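The lookup by a sequence of names can be pictured as walking a nested mapping one level per name. The sketch below assumes a plain nested dictionary in place of real graph objects; `find_node` and `tree` are hypothetical names, and the actual method may traverse the graph differently.

```python
# Hypothetical nested structure: subgraphs are dicts, the leaf is a placeholder string
tree = {"subgraph": {"subsubgraph": {"foo": "Foo node"}}}


def find_node(nodes: dict, *names: str):
    """Descend one nesting level per name; a missing name raises KeyError."""
    current = nodes
    for name in names:
        current = current[name]
    return current


found = find_node(tree, "subgraph", "subsubgraph", "foo")

# An unknown name anywhere along the path raises KeyError
try:
    find_node(tree, "subgraph", "missing")
    missing = False
except KeyError:
    missing = True
```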
- static get_node_class(name: str) type[Component] #
Returns the node class corresponding to the given name.
- Parameters:
name – Name of the component class to retrieve
- Returns:
The retrieved component class, can be passed to add
- Return type:
Type[Component]
- logger: logging.Logger#
Python logger for both the build and run procedures.
- map(*interfaces: Interface[Any]) None [source]#
Map multiple child interfaces (ports or parameters) onto the current graph. Will also set the graph attributes to the names of the mapped interfaces.
- Parameters:
interfaces – Any number of ports and parameters to map
See also
Graph.combine_parameters
If you want to map multiple parameters to a single high-level one
Graph.map_port
If you want more fine-grained control over naming
Examples
>>> def build(self):
...     foo = self.add(Foo)
...     bar = self.add(Bar)
...     self.map(foo.inp, bar.out, foo.param)
- map_port(port: _P, name: str | None = None) _P [source]#
Maps a port of a component to the graph.
This is required when creating custom subgraphs: ports of individual component nodes need to be mapped to the subgraph. This method also handles setting a graph attribute with the given name.
- Parameters:
port – The component port
name – Name for the port to be registered as
- Returns:
Mapped port
- Return type:
_P
Examples
>>> def build(self):
...     node = self.add(Example)
...     self.map_port(node.output, name="output")
- property node_config: NodeConfig#
Provides the configuration of the current node
- ports_active() bool #
Check if all required ports are active.
Can be overridden by the user to allow custom shutdown scenarios, for example in the case of complex inter-port dependencies. By default only checks if any mandatory ports are inactive.
- Returns:
True if all required ports are active, False otherwise.
- Return type:
bool
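The default behaviour described above (only mandatory ports matter) can be sketched as follows. `PortState` and its `optional` flag are hypothetical names used for illustration; the real port objects and the exact rule may differ.

```python
from dataclasses import dataclass


@dataclass
class PortState:
    """Hypothetical snapshot of a port: whether it is active and whether it is optional."""
    name: str
    active: bool
    optional: bool = False


def ports_active(ports: list[PortState]) -> bool:
    """True when no mandatory port is inactive; optional ports are ignored."""
    return all(port.active for port in ports if not port.optional)


# An inactive optional port does not count against the node
ok = ports_active([PortState("inp", True), PortState("extra", False, optional=True)])

# An inactive mandatory port does
not_ok = ports_active([PortState("inp", False), PortState("out", True)])
```

A custom override could replace the `all(...)` condition with any rule that suits the node's inter-port dependencies.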
- required_callables: ClassVar[list[str]] = []#
List of external commandline programs that are required for running the component.
- classmethod serialized_summary() _SerialType #
Provides a serialized representation of the component type.
- Returns:
Nested dictionary of the component type structure, including I/O and parameters.
- Return type:
_SerialType
Examples
>>> Merge.serialized_summary()
{"name": "Merge", "inputs": [{"name": "inp", ...}]}
- setup_directories(parent_path: Path | None = None) None [source]#
Create all work directories for the graph / workflow.
- status#
Current status of the component.
- update_parameters(**kwargs: dict[str, Any]) None #
Update component parameters.
- Parameters:
**kwargs – Name - value pairs supplied as keyword arguments
- visualize(max_level: int = 9223372036854775807, coloring: Literal['nesting', 'status'] = 'nesting', labels: bool = True) Any [source]#
Visualize the graph using graphviz, if installed.
- Parameters:
max_level – Maximum nesting level to show, shows all levels by default
coloring – Whether to color nodes by nesting level or status
labels – Whether to show datatype labels
- Returns:
Graphviz Dot instance, in a Jupyter notebook this will be displayed visually automatically
- Return type:
dot
- work_dir: Path#
Working directory for the component.