Workflow
- class maize.core.workflow.Workflow(parent: Graph | None = None, name: str | None = None, description: str | None = None, fail_ok: bool = False, n_attempts: int = 1, level: int | str | None = None, cleanup_temp: bool = True, resume: bool = False, logfile: Path | None = None, max_cpus: int | None = None, max_gpus: int | None = None, loop: bool | None = None, strict: bool = True, default_channel_size: int = 10)
Bases: Graph
Represents a workflow graph consisting of individual components.
As a user, one will typically instantiate a Workflow and then add individual nodes or subgraphs and connect them together.
- __init__(parent: Graph | None = None, name: str | None = None, description: str | None = None, fail_ok: bool = False, n_attempts: int = 1, level: int | str | None = None, cleanup_temp: bool = True, resume: bool = False, logfile: Path | None = None, max_cpus: int | None = None, max_gpus: int | None = None, loop: bool | None = None, strict: bool = True, default_channel_size: int = 10) → None
Methods
__init__([parent, name, description, ...])
add(component[, name, parameters]) – Add a component to the graph.
add_all(*components) – Adds all specified components to the graph.
add_arguments(parser) – Adds custom arguments to an existing parser for workflow parameters.
as_dict() – Provides a non-recursive dictionary view of the component.
auto_connect(sending, receiving[, size]) – Connects component nodes together automatically, based on port availability and datatype.
build() – Builds a subgraph.
chain(*nodes[, size]) – Connects an arbitrary number of nodes in sequence using auto_connect.
check() – Checks if the graph was built correctly and warns about possible deadlocks.
Check all contained node dependencies.
cleanup() – Cleans up the graph directory if required.
combine_parameters(*parameters[, name, default]) – Maps multiple low-level parameters to one high-level one.
connect(sending, receiving[, size, mode]) – Connects component inputs and outputs together.
connect_all(*ports) – Connect multiple pairs of ports together.
execute() – Run a given graph.
from_checkpoint(file) – Initialize a graph from a checkpoint file.
from_dict(data, **kwargs) – Read a graph definition from a dictionary parsed from a suitable serialization format.
from_file(file) – Reads in a graph definition in JSON, YAML, or TOML format and creates a runnable workflow graph.
from_name(name) – Create a predefined workflow from a previously registered name.
generate_config_template() – Generates a global configuration template in TOML format.
Returns all available and registered nodes.
get_available_workflows() – Returns all available and registered / exposed workflows.
Returns all inputs available to the node.
get_interfaces([kind]) – Returns all interfaces available to the node.
get_node(*names) – Recursively find a node in the graph.
get_node_class(name) – Returns the node class corresponding to the given name.
Returns all outputs available to the node.
get_parameter(*names) – Recursively find a parameter in the graph.
Returns all parameters available to the node.
get_port(*names) – Recursively find a port in the graph.
Provides a one-line summary of the node.
get_workflow_summary(name) – Provides a one-line summary of the workflow.
load_checkpoint(data) – Load checkpoint data from a dictionary.
map(*interfaces) – Map multiple child interfaces (ports or parameters) onto the current graph.
map_port(port[, name]) – Maps a port of a component to the graph.
ports_active() – Check if all required ports are active.
register(name, factory) – Register a workflow for global access.
Send a status update to the main process.
serialized_summary() – Provides a serialized representation of the component type.
set_global_attribute(_Workflow__name, ...) – Set an attribute for all contained components.
setup_directories([parent_path]) – Create all work directories for the graph / workflow.
to_checkpoint([path, fail_ok]) – Saves the current graph state, including channel data and node liveness to a file.
to_dict() – Create a dictionary from a graph, ready to be saved in a suitable format.
to_file(file) – Save the graph to a file.
update_parameters(**kwargs) – Update component parameters.
update_settings_with_args(args) – Updates the workflow with global settings from the commandline.
update_with_args(extra_options[, parser]) – Update the graph with additional options passed from the commandline.
visualize([max_level, coloring, labels]) – Visualize the graph using graphviz, if installed.
Attributes
AddableArgType
P
P1
P2
P3
P4
P5
Flattened view of all active nodes in the graph.
all_parameters – Returns all settable parameters and unconnected inputs.
component_path – Provides the full path to the component as a tuple of names.
datatype – The component datatype if it's generic.
flat_channels – Flattened view of all connections in the graph.
Flattened view of all components in the graph.
Flattened view of all nodes in the graph.
Returns the number of items waiting to be received.
Returns the number of items waiting to be sent.
node_config – Provides the configuration of the current node.
Provides all parent components.
Provides a convenience iterator for all inputs and outputs.
required_callables – List of external commandline programs that are required for running the component.
List of required python packages.
Provides the root workflow or graph instance.
logger – Python logger for both the build and run procedures.
Timer for the run duration, without waiting for resources or other nodes.
Timer for the full duration, including waiting for resources or other nodes.
work_dir – Working directory for the component.
status – Current status of the component.
- add(component: type[U], name: str | None = None, parameters: dict[str, Any] | None = None, **kwargs: Any) → U
Add a component to the graph.
- Parameters:
name – Unique name of the component
component – Node class or subgraph class
kwargs – Additional arguments passed to the component constructor
- Returns:
The initialized component
- Return type:
U
- Raises:
GraphBuildException – If a node with the same name already exists
Examples
>>> g = Graph(name="foo")
>>> foo = g.add(Foo, name="foo", parameters=dict(val=42))
>>> bar = g.add(Bar)
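The duplicate-name rule for add can be pictured with a small registry. This is a toy model, not maize's implementation: ToyGraph, the local GraphBuildException class, and the fallback to the lowercased class name when name is None are hypothetical choices made only for this sketch, and parameters are simply set as attributes.

```python
from __future__ import annotations

from typing import Any


class GraphBuildException(Exception):
    """Hypothetical stand-in for maize's GraphBuildException."""


class ToyGraph:
    """Toy graph that stores component instances under unique names."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.nodes: dict[str, Any] = {}

    def add(
        self,
        component: type,
        name: str | None = None,
        parameters: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        # Assumption for this sketch: default to the lowercased class name.
        node_name = name if name is not None else component.__name__.lower()
        if node_name in self.nodes:
            raise GraphBuildException(f"Node '{node_name}' already exists")
        instance = component(**kwargs)  # kwargs go to the constructor
        for key, value in (parameters or {}).items():
            setattr(instance, key, value)  # toy stand-in for parameter setting
        self.nodes[node_name] = instance
        return instance


class Foo:
    val: int = 0


class Bar:
    pass


g = ToyGraph(name="foo")
foo = g.add(Foo, name="foo", parameters=dict(val=42))
bar = g.add(Bar)  # stored under the default name "bar"
```

Calling g.add(Foo) a second time would fail here, because the default name "foo" is already taken.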
- add_all(*components: type[Component]) → tuple[Component, ...]
Adds all specified components to the graph.
- Parameters:
components – All component classes to initialize
- Returns:
The initialized component instances
- Return type:
tuple[U, …]
Examples
>>> g = Graph(name="foo")
>>> foo, bar = g.add_all(Foo, Bar)
- add_arguments(parser: _A) → _A
Adds custom arguments to an existing parser for workflow parameters
- Parameters:
parser – Pre-initialized parser or group
- Returns:
A parser instance that can be used to read additional commandline arguments specific to the workflow
- Return type:
argparse.ArgumentParser | argparse._ArgumentGroup
See also
Workflow.update_with_args
Sets up a parser for the workflow, uses add_arguments to update it, and parses all arguments with updated parameters for the workflow
Workflow.update_settings_with_args
Updates the workflow with non-parameter settings
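As an illustration of the pattern behind add_arguments, the sketch below adds one option per workflow parameter to a standard argparse argument group and then parses a command line with the parent parser. The helper add_parameter_arguments and the parameter names val and label are hypothetical; maize may derive option types and help texts differently.

```python
from __future__ import annotations

import argparse
from typing import Any


def add_parameter_arguments(parser: Any, parameters: dict[str, Any]) -> Any:
    """Add one --<name> option per workflow parameter and return the parser.

    `parser` may be an ArgumentParser or an argument group. Assumes simple
    scalar defaults (int, float, str); booleans would need flag handling.
    """
    for name, default in parameters.items():
        parser.add_argument(
            f"--{name}",
            type=type(default),
            default=default,
            help=f"workflow parameter (default: {default})",
        )
    return parser


parser = argparse.ArgumentParser(prog="toy-workflow")
group = parser.add_argument_group("workflow parameters")
add_parameter_arguments(group, {"val": 40, "label": "demo"})
ns = parser.parse_args(["--val", "42"])
print(ns.val, ns.label)  # 42 demo
```

Options added to a group are still parsed by the parent parser; the group only affects how they are listed in the help output.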
- property all_parameters: dict[str, Input[Any] | MultiInput[Any] | Parameter[Any]]
Returns all settable parameters and unconnected inputs
- auto_connect(sending: Component, receiving: Component, size: int = 10) → None
Connects component nodes together automatically, based on port availability and datatype.
Use this only in unambiguous cases; otherwise it will leave the graph only partially connected.
- Parameters:
sending – Sending node
receiving – Receiving node
size – Size (in items) of the queue used for communication
Examples
>>> g = Graph(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> g.auto_connect(foo, bar)
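One way to picture datatype-based matching, and why ambiguous cases can leave ports dangling, is the toy matcher below. toy_auto_connect and its name-to-type dictionaries are hypothetical, and the first-match rule is an assumption for illustration only, not maize's actual algorithm.

```python
from __future__ import annotations


def toy_auto_connect(
    outputs: dict[str, type], inputs: dict[str, type]
) -> list[tuple[str, str]]:
    """Pair each output with the first still-free input of the same datatype.

    Ports that find no partner are simply left unconnected.
    """
    free_inputs = dict(inputs)
    pairs: list[tuple[str, str]] = []
    for out_name, out_type in outputs.items():
        for inp_name, inp_type in list(free_inputs.items()):
            if inp_type is out_type:
                pairs.append((out_name, inp_name))
                del free_inputs[inp_name]  # each input takes one channel
                break
    return pairs


print(toy_auto_connect({"out_a": int, "out_b": str}, {"inp": int}))
# [('out_a', 'inp')]  (out_b stays unconnected)
```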
- build() → None
Builds a subgraph.
Override this method to construct a subgraph encapsulating multiple lower-level nodes, using the add and connect methods. Additionally, use the map, map_port, and combine_parameters methods to create a subgraph that can be used just like a node.
Examples
>>> def build(self):
...     foo = self.add(Foo)
...     bar = self.add(Bar)
...     self.map(foo.inp, bar.out, foo.param)
- chain(*nodes: Component, size: int = 10) → None
Connects an arbitrary number of nodes in sequence using auto_connect.
- Parameters:
nodes – Nodes to be connected in sequence
size – Size of each channel connecting the nodes
Examples
>>> g = Graph(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> baz = g.add(Baz)
>>> g.chain(foo, bar, baz)
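Since chain connects nodes in sequence using auto_connect, chain(foo, bar, baz) amounts to connecting each consecutive pair. The sketch shows that pairing with a plain callback standing in for auto_connect; toy_chain is a hypothetical helper.

```python
from __future__ import annotations

from collections.abc import Callable, Sequence


def toy_chain(nodes: Sequence[str], connect: Callable[[str, str], None]) -> None:
    """Connect consecutive nodes pairwise: (n0, n1), (n1, n2), ..."""
    for sending, receiving in zip(nodes, nodes[1:]):
        connect(sending, receiving)


calls: list[tuple[str, str]] = []
toy_chain(["foo", "bar", "baz"], lambda s, r: calls.append((s, r)))
print(calls)  # [('foo', 'bar'), ('bar', 'baz')]
```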
- check() → None
Checks if the graph was built correctly and warns about possible deadlocks.
A correctly built graph has no unconnected ports, and all channel types are matched internally.
- Raises:
GraphBuildException – If a port is unconnected
Examples
>>> g = Workflow(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> g.auto_connect(foo, bar)
>>> g.check()
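The unconnected-port rule at the heart of check can be modelled as a set lookup. toy_check, the ports mapping, the set of connected (node, port) pairs, and the local GraphBuildException are hypothetical; the real method also verifies channel types and warns about deadlocks, which this sketch does not attempt.

```python
from __future__ import annotations


class GraphBuildException(Exception):
    """Hypothetical stand-in for maize's GraphBuildException."""


def toy_check(ports: dict[str, list[str]], connected: set[tuple[str, str]]) -> None:
    """Raise if any (node, port) pair has no channel attached."""
    for node, port_names in ports.items():
        for port in port_names:
            if (node, port) not in connected:
                raise GraphBuildException(
                    f"Port '{port}' of node '{node}' is unconnected"
                )


ports = {"foo": ["out"], "bar": ["inp"]}
toy_check(ports, {("foo", "out"), ("bar", "inp")})  # fully connected: no error
```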
- combine_parameters(*parameters: Parameter[T_co], name: str | None = None, default: T_co | None = None) → MultiParameter[T_co]
Maps multiple low-level parameters to one high-level one.
This can be useful when a single parameter needs to be supplied to multiple nodes within a subgraph. This method also handles setting a graph attribute with the given name.
- Parameters:
parameters – Low-level parameters of component nodes
name – Name of the high-level parameter
default – The default parameter value
- Returns:
The combined parameter object
- Return type:
MultiParameter[T_co]
Examples
>>> def build(self):
...     foo = self.add(Foo)
...     bar = self.add(Bar)
...     self.combine_parameters(
...         foo.param, bar.param, name="param", default=42)
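Conceptually, a combined parameter fans one high-level value out to every low-level parameter it was built from. The sketch below models that with two hypothetical classes, ToyParameter and ToyMultiParameter; it assumes that setting the combined value simply overwrites each mapped parameter, and it is not maize's MultiParameter.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class ToyParameter:
    """Hypothetical stand-in for a node-level parameter."""

    value: Any = None


class ToyMultiParameter:
    """Forward one high-level value to several low-level parameters."""

    def __init__(self, *parameters: ToyParameter, default: Any = None) -> None:
        self.parameters = parameters
        if default is not None:
            self.set(default)

    def set(self, value: Any) -> None:
        for param in self.parameters:
            param.value = value


foo_param, bar_param = ToyParameter(), ToyParameter()
combined = ToyMultiParameter(foo_param, bar_param, default=42)
print(foo_param.value, bar_param.value)  # 42 42
```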
- property component_path: tuple[str, ...]
Provides the full path to the component as a tuple of names.
- connect(sending: Output[T_co] | MultiOutput[T_co], receiving: Input[T_co] | MultiInput[T_co], size: int | None = None, mode: Literal['copy', 'link', 'move'] | None = None) → None
Connects component inputs and outputs together.
- Parameters:
sending – Output port for sending items
receiving – Input port for receiving items
size – Size (in items) of the queue used for communication, only for serializable data
mode – Whether to link, copy or move files, overrides value specified for the port
- Raises:
GraphBuildException – If the port types don’t match, or the maximum number of channels supported by your OS has been reached
Examples
>>> g = Graph(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> g.connect(foo.out, bar.inp)
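The size argument bounds how many items can wait in a channel before the sender has to wait for the receiver. The sketch uses the standard library's queue.Queue with maxsize purely as an analogy; fill_channel is a hypothetical helper, not maize's channel implementation, and it rejects overflow items instead of blocking so the effect is visible.

```python
from __future__ import annotations

import queue


def fill_channel(size: int, items: list[int]) -> tuple[list[int], list[int]]:
    """Push items into a bounded channel without blocking.

    Returns (accepted, rejected): once `size` items are waiting, a real
    sender would have to pause until the receiver consumes something.
    """
    channel: queue.Queue = queue.Queue(maxsize=size)
    accepted: list[int] = []
    rejected: list[int] = []
    for item in items:
        try:
            channel.put_nowait(item)
            accepted.append(item)
        except queue.Full:
            rejected.append(item)
    return accepted, rejected


print(fill_channel(2, [1, 2, 3]))  # ([1, 2], [3])
```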
- connect_all(*ports: tuple[Output[Any] | MultiOutput[Any], Input[Any] | MultiInput[Any]]) → None
Connect multiple pairs of ports together.
- Parameters:
ports – Output - Input pairs to connect
Examples
>>> g = Graph(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> baz = g.add(Baz)
>>> g.connect_all((foo.out, bar.inp), (bar.out, baz.inp))
- datatype: Any
The component datatype if it’s generic.
- execute() → None
Run a given graph.
This is the top-level entry point for maize execution. It creates a separate logging process and a general message queue, and then starts the execute methods of all nodes. Any node may at some point signal for the full graph to be shut down, for example after a failure. Normal termination of a node, however, is signalled by a runtime.StatusUpdate instance with finished status. Any exceptions raised in a node are passed through the message queue and re-raised as a runtime.NodeException.
- Raises:
NodeException – If there was an exception in any node child process
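The message-queue pattern described for execute can be sketched with threads: each node runs concurrently, reports its outcome on a shared queue, and the main thread re-raises a failure as a node exception. run_nodes and the local NodeException are hypothetical; maize itself uses separate processes, a dedicated logging process, and graph-wide shutdown, none of which this sketch models.

```python
from __future__ import annotations

import queue
import threading
from typing import Callable


class NodeException(Exception):
    """Hypothetical stand-in for maize's runtime.NodeException."""


def run_nodes(nodes: dict[str, Callable[[], None]]) -> list[str]:
    """Run every node concurrently and collect one status message per node."""
    messages: queue.Queue = queue.Queue()

    def wrapper(name: str, func: Callable[[], None]) -> None:
        try:
            func()
            messages.put(("finished", name, None))
        except Exception as err:  # forward the error instead of crashing
            messages.put(("failed", name, err))

    threads = [
        threading.Thread(target=wrapper, args=(name, func))
        for name, func in nodes.items()
    ]
    for thread in threads:
        thread.start()
    outcomes = [messages.get() for _ in threads]  # one message per node
    for thread in threads:
        thread.join()

    finished = []
    for status, name, err in outcomes:
        if status == "failed":
            raise NodeException(f"Node '{name}' failed") from err
        finished.append(name)
    return sorted(finished)


print(run_nodes({"foo": lambda: None, "bar": lambda: None}))  # ['bar', 'foo']
```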
- property flat_channels: set[tuple[tuple[str, ...], tuple[str, ...]]]
Flattened view of all connections in the graph.
- classmethod from_checkpoint(file: Path | str) → Workflow
Initialize a graph from a checkpoint file.
Checkpoints include two additional sections, _data for any data stored in a channel at time of shutdown, and _status for node status information. We need the data for the full graph and thus use a nested implementation.
_data:
  - bar:
      input: <binary>
_status:
  - foo: STOPPED
  - subgraph:
      baz: FAILED
- Parameters:
file – Path to the checkpoint file
- Returns:
The initialized graph, with statuses set and channels loaded
- Return type:
Workflow
- Raises:
ParsingException – If the input file doesn’t conform to the expected format
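The nested _status section shown above maps naturally onto node paths, like those returned by component_path. The toy parser below flattens it into {path: status} pairs, assuming the YAML has already been parsed into Python lists and dictionaries; flatten_status is a hypothetical helper, not maize's checkpoint loader.

```python
from __future__ import annotations

from typing import Any


def flatten_status(
    tree: Any, prefix: tuple[str, ...] = ()
) -> dict[tuple[str, ...], str]:
    """Turn a nested _status section into {node path: status} pairs."""
    flat: dict[tuple[str, ...], str] = {}
    if isinstance(tree, list):
        for entry in tree:  # list entries share the current prefix
            flat.update(flatten_status(entry, prefix))
    elif isinstance(tree, dict):
        for name, value in tree.items():  # each key adds one nesting level
            flat.update(flatten_status(value, prefix + (name,)))
    else:
        flat[prefix] = str(tree)  # a leaf is the status of the node at prefix
    return flat


status_section = [{"foo": "STOPPED"}, {"subgraph": {"baz": "FAILED"}}]
print(flatten_status(status_section))
# {('foo',): 'STOPPED', ('subgraph', 'baz'): 'FAILED'}
```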
- classmethod from_dict(data: dict[str, Any], **kwargs: Any) → Workflow
Read a graph definition from a dictionary parsed from a suitable serialization format.
- Parameters:
data – Tree structure as a dictionary containing graph and node parameters, as well as connectivity. For format details see the read_input method.
kwargs – Additional arguments passed to the Component constructor
- Returns:
The constructed graph, with all nodes and channels
- Return type:
Workflow
- Raises:
ParsingException – If the input dictionary doesn’t conform to the expected format
- classmethod from_file(file: Path | str) → Workflow
Reads in a graph definition in JSON, YAML, or TOML format and creates a runnable workflow graph.
This is an example input:
name: graph
description: An optional description for the workflow
level: INFO
nodes:
- name: foo
  type: ExampleNode
  # Below options are optional
  description: An optional description
  fail_ok: false
  n_attempts: 1
  parameters:
    val: 40

channels:
- sending:
    foo: out
  receiving:
    bar: input

# Optional
parameters:
- name: val
  map:
    foo: val
- Parameters:
file – File in JSON, YAML, or TOML format
- Returns:
The complete graph with all connections
- Return type:
Workflow
- Raises:
ParsingException – If the input file doesn’t conform to the expected format
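Because from_file accepts JSON, YAML, or TOML, the first step is choosing a parser from the file suffix; the resulting dictionary is the kind of input from_dict expects. The sketch below is a hypothetical helper, load_graph_definition, not maize's reader: it uses the standard json module, PyYAML's yaml.safe_load (a third-party package assumed to be installed), and tomllib (standard library from Python 3.11).

```python
from __future__ import annotations

import json
from pathlib import Path
from typing import Any


def load_graph_definition(file: Path | str) -> dict[str, Any]:
    """Parse a graph definition, picking the parser from the file suffix."""
    path = Path(file)
    suffix = path.suffix.lower()
    if suffix not in (".json", ".yaml", ".yml", ".toml"):
        raise ValueError(f"Unsupported graph definition format: {suffix!r}")
    text = path.read_text()
    if suffix == ".json":
        return json.loads(text)
    if suffix in (".yaml", ".yml"):
        import yaml  # third-party PyYAML, assumed to be installed

        return yaml.safe_load(text)
    import tomllib  # standard library since Python 3.11

    return tomllib.loads(text)
```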
- classmethod from_name(name: str) → Workflow
Create a predefined workflow from a previously registered name.
- generate_config_template() → str
Generates a global configuration template in TOML format.
- Returns:
The config template
- Return type:
str
- static get_available_workflows() → set[Callable[[], Workflow]]
Returns all available and registered / exposed workflows.
- Returns:
All available workflow factories
- Return type:
set[Callable[[], Workflow]]
- classmethod get_interfaces(kind: Literal['input', 'output', 'parameter'] | None = None) → set[str]
Returns all interfaces available to the node.
- get_node(*names: str) → Component
Recursively find a node in the graph.
- Parameters:
names – Names of nodes leading up to the potentially nested target node
- Returns:
The target component
- Return type:
Component
- Raises:
KeyError – When the target cannot be found
Examples
>>> g.get_node("subgraph", "subsubgraph", "foo")
Foo(name='foo', parent=SubSubGraph(...))
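The recursive lookup amounts to following one name per nesting level and failing with KeyError as soon as a level is missing. In the sketch, nested dictionaries stand in for subgraphs; toy_get_node and the tree contents are hypothetical.

```python
from __future__ import annotations

from typing import Any


def toy_get_node(nodes: dict[str, Any], *names: str) -> Any:
    """Follow `names` through nested {name: child} dictionaries.

    Raises KeyError when any level of the path does not exist.
    """
    current: Any = nodes
    for name in names:
        if not isinstance(current, dict) or name not in current:
            raise KeyError(name)
        current = current[name]
    return current


tree = {"subgraph": {"subsubgraph": {"foo": "Foo(name='foo')"}}}
print(toy_get_node(tree, "subgraph", "subsubgraph", "foo"))  # Foo(name='foo')
```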
- static get_node_class(name: str) → type[Component]
Returns the node class corresponding to the given name.
- Parameters:
name – Name of the component class to retrieve
- Returns:
The retrieved component class, which can be passed to add
- Return type:
Type[Component]
- load_checkpoint(data: dict[str, Any]) → None
Load checkpoint data from a dictionary.
Uses data as generated by read_input to access the special _data and _status fields.
- Parameters:
data – Dictionary data to read in as a checkpoint. Both _data and _status are optional.
See also
from_checkpoint
Load a Graph from a checkpoint file
to_checkpoint
Save a Graph to a checkpoint file
- logger: logging.Logger
Python logger for both the build and run procedures.
- map(*interfaces: Interface[Any]) → None
Map multiple child interfaces (ports or parameters) onto the current graph. Will also set the graph attributes to the names of the mapped interfaces.
- Parameters:
interfaces – Any number of ports and parameters to map
See also
Graph.combine_parameters
If you want to map multiple parameters to a single high-level one
Graph.map_port
If you want more fine-grained control over naming
Examples
>>> def build(self):
...     foo = self.add(Foo)
...     bar = self.add(Bar)
...     self.map(foo.inp, bar.out, foo.param)
- map_port(port: _P, name: str | None = None) → _P
Maps a port of a component to the graph.
This is required when creating custom subgraphs: ports of individual component nodes need to be mapped to the subgraph. This method also handles setting a graph attribute with the given name.
- Parameters:
port – The component port
name – Name for the port to be registered as
- Returns:
Mapped port
- Return type:
_P
Examples
>>> def build(self):
...     node = self.add(Example)
...     self.map_port(node.output, name="output")
- property node_config: NodeConfig
Provides the configuration of the current node
- ports_active() → bool
Check if all required ports are active.
Can be overridden by the user to allow custom shutdown scenarios, for example in the case of complex inter-port dependencies. By default only checks if any mandatory ports are inactive.
- Returns:
True if all required ports are active, False otherwise.
- Return type:
bool
- classmethod register(name: str, factory: Callable[[], Workflow]) → None
Register a workflow for global access.
- Parameters:
name – Name to register the workflow under
factory – Function returning an initialized and built workflow
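Registering a factory rather than a built workflow means each lookup can produce a fresh instance, and get_available_workflows can return the stored factories. The sketch models this with a module-level dictionary; ToyWorkflow, _REGISTRY, and the assumption that from_name calls the factory on every lookup are hypothetical and not taken from maize's source.

```python
from __future__ import annotations

from typing import Callable


class ToyWorkflow:
    """Hypothetical stand-in for a built Workflow."""

    def __init__(self, name: str) -> None:
        self.name = name


_REGISTRY: dict[str, Callable[[], ToyWorkflow]] = {}


def register(name: str, factory: Callable[[], ToyWorkflow]) -> None:
    """Store the factory, not a workflow instance, under `name`."""
    _REGISTRY[name] = factory


def from_name(name: str) -> ToyWorkflow:
    """Build a fresh workflow by calling the registered factory."""
    return _REGISTRY[name]()


def get_available_workflows() -> set[Callable[[], ToyWorkflow]]:
    return set(_REGISTRY.values())


register("demo", lambda: ToyWorkflow("demo"))
```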
- required_callables: ClassVar[list[str]] = []
List of external commandline programs that are required for running the component.
- classmethod serialized_summary() → _SerialType
Provides a serialized representation of the component type.
- Returns:
Nested dictionary of the component type structure, including I/O and parameters.
- Return type:
_SerialType
Examples
>>> Merge.serialized_summary()
{"name": "Merge", "inputs": [{"name": "inp", ...}]}
- set_global_attribute(_Workflow__name: str, _Workflow__value: Any, /) → None
Set an attribute for all contained components.
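Setting an attribute for all contained components is a recursive walk over the component tree. In the sketch, ToyComponent and its children list are hypothetical; the sketch leaves the root itself untouched, since the description only mentions contained components, and that reading is an assumption.

```python
from __future__ import annotations

from typing import Any


class ToyComponent:
    """Hypothetical node or subgraph with optional child components."""

    def __init__(self, name: str, children: list[ToyComponent] | None = None) -> None:
        self.name = name
        self.children = children or []


def set_global_attribute(root: ToyComponent, name: str, value: Any) -> None:
    """Set `name` to `value` on every component contained in `root`."""
    for child in root.children:
        setattr(child, name, value)
        set_global_attribute(child, name, value)  # descend into subgraphs


leaf = ToyComponent("foo")
sub = ToyComponent("subgraph", [leaf])
wf = ToyComponent("workflow", [sub])
set_global_attribute(wf, "n_attempts", 3)
```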
- setup_directories(parent_path: Path | None = None) → None
Create all work directories for the graph / workflow.
- status
Current status of the component.
- to_checkpoint(path: Path | str | None = None, fail_ok: bool = True) → None
Saves the current graph state, including channel data and node liveness to a file.
- Parameters:
path – Optional filename for the checkpoint
fail_ok – If True, will only log a warning instead of raising an exception when encountering a writing problem.
- Raises:
CheckpointException – Raised for checkpoint writing errors when fail_ok is False
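The fail_ok switch follows a common pattern: a write error is either downgraded to a logged warning or re-raised as a domain-specific exception. write_checkpoint and the local CheckpointException are hypothetical stand-ins; the sketch only writes a text payload and does not reproduce maize's checkpoint format.

```python
from __future__ import annotations

import logging
from pathlib import Path


class CheckpointException(Exception):
    """Hypothetical stand-in for maize's CheckpointException."""


log = logging.getLogger("toy_checkpoint")


def write_checkpoint(path: Path, text: str, fail_ok: bool = True) -> bool:
    """Write `text` to `path` and return True on success.

    With fail_ok=True a write error is only logged as a warning and False
    is returned; otherwise it is re-raised as a CheckpointException.
    """
    try:
        path.write_text(text)
    except OSError as err:
        if fail_ok:
            log.warning("Could not write checkpoint to %s: %s", path, err)
            return False
        raise CheckpointException(f"Could not write checkpoint to {path}") from err
    return True
```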
- to_dict() → dict[str, Any]
Create a dictionary from a graph, ready to be saved in a suitable format.
Examples
>>> g = Workflow(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> g.auto_connect(foo, bar)
>>> data = g.to_dict()
- to_file(file: Path | str) → None
Save the graph to a file. The type is inferred from the suffix and can be one of JSON, YAML, or TOML.
- Parameters:
file – Path to the file to save to
Examples
>>> g = Workflow(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> g.auto_connect(foo, bar)
>>> g.to_file("graph.yml")
- update_parameters(**kwargs: dict[str, Any]) → None
Update component parameters.
- Parameters:
**kwargs – Name - value pairs supplied as keyword arguments
- update_settings_with_args(args: Namespace) → None
Updates the workflow with global settings from the commandline.
- Parameters:
args – Namespace including the args to use. See maize -h for possible options.
- update_with_args(extra_options: list[str], parser: ArgumentParser | None = None) → None
Update the graph with additional options passed from the commandline.
- Parameters:
extra_options – List of option strings, e.g. the unrecognized remainder returned by argparse's parse_known_args
parser – Optional parser to reuse
- Raises:
ParsingException – Raised when encountering unexpected commandline options
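A list of extra option strings typically arises when a top-level parser handles global settings and hands everything it does not recognize to a second, workflow-specific parser; anything still left over is an error. The sketch shows that two-stage flow with the standard argparse.parse_known_args; split_workflow_options, the options --log-level and --val, and the local ParsingException are hypothetical.

```python
from __future__ import annotations

import argparse


class ParsingException(Exception):
    """Hypothetical stand-in for maize's ParsingException."""


def split_workflow_options(
    argv: list[str],
) -> tuple[argparse.Namespace, argparse.Namespace]:
    # Stage 1: the main parser only knows global settings.
    main = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    main.add_argument("--log-level", default="INFO")
    args, extra_options = main.parse_known_args(argv)

    # Stage 2: a workflow parser knows the workflow's own parameters.
    workflow = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    workflow.add_argument("--val", type=int)
    params, unknown = workflow.parse_known_args(extra_options)
    if unknown:
        raise ParsingException(f"Unexpected commandline options: {unknown}")
    return args, params


args, params = split_workflow_options(["--log-level", "DEBUG", "--val", "40"])
print(args.log_level, params.val)  # DEBUG 40
```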
- visualize(max_level: int = 9223372036854775807, coloring: Literal['nesting', 'status'] = 'nesting', labels: bool = True) → Any
Visualize the graph using graphviz, if installed.
- Parameters:
max_level – Maximum nesting level to show, shows all levels by default
coloring – Whether to color nodes by nesting level or status
labels – Whether to show datatype labels
- Returns:
Graphviz Dot instance; in a Jupyter notebook it is displayed automatically
- Return type:
dot
- work_dir: Path
Working directory for the component.