Workflow

class maize.core.workflow.Workflow(parent: Graph | None = None, name: str | None = None, description: str | None = None, fail_ok: bool = False, n_attempts: int = 1, level: int | str | None = None, cleanup_temp: bool = True, resume: bool = False, logfile: Path | None = None, max_cpus: int | None = None, max_gpus: int | None = None, loop: bool | None = None, strict: bool = True, default_channel_size: int = 10)

Bases: Graph

Represents a workflow graph consisting of individual components.

As a user, you will typically instantiate a Workflow, add individual nodes or subgraphs to it, and then connect them together.

__init__(parent: Graph | None = None, name: str | None = None, description: str | None = None, fail_ok: bool = False, n_attempts: int = 1, level: int | str | None = None, cleanup_temp: bool = True, resume: bool = False, logfile: Path | None = None, max_cpus: int | None = None, max_gpus: int | None = None, loop: bool | None = None, strict: bool = True, default_channel_size: int = 10) → None

Methods

__init__([parent, name, description, ...])

add(component[, name, parameters])

Add a component to the graph.

add_all(*components)

Adds all specified components to the graph.

add_arguments(parser)

Adds custom arguments to an existing parser for workflow parameters.

as_dict()

Provides a non-recursive dictionary view of the component.

auto_connect(sending, receiving[, size])

Connects component nodes together automatically, based on port availability and datatype.

build()

Builds a subgraph.

chain(*nodes[, size])

Connects an arbitrary number of nodes in sequence using auto_connect.

check()

Checks if the graph was built correctly and warns about possible deadlocks.

check_dependencies()

Check all contained node dependencies.

cleanup()

Cleans up the graph directory if required.

combine_parameters(*parameters[, name, default])

Maps multiple low-level parameters to one high-level one.

connect(sending, receiving[, size, mode])

Connects component inputs and outputs together.

connect_all(*ports)

Connect multiple pairs of ports together.

execute()

Run a given graph.

from_checkpoint(file)

Initialize a graph from a checkpoint file.

from_dict(data, **kwargs)

Read a graph definition from a dictionary parsed from a suitable serialization format.

from_file(file)

Reads in a graph definition in JSON, YAML, or TOML format and creates a runnable workflow graph.

from_name(name)

Create a predefined workflow from a previously registered name.

generate_config_template()

Generates a global configuration template in TOML format.

get_available_nodes()

Returns all available and registered nodes.

get_available_workflows()

Returns all available and registered / exposed workflows.

get_inputs()

Returns all inputs available to the node.

get_interfaces([kind])

Returns all interfaces available to the node.

get_node(*names)

Recursively find a node in the graph.

get_node_class(name)

Returns the node class corresponding to the given name.

get_outputs()

Returns all outputs available to the node.

get_parameter(*names)

Recursively find a parameter in the graph.

get_parameters()

Returns all parameters available to the node.

get_port(*names)

Recursively find a port in the graph.

get_summary_line()

Provides a one-line summary of the node.

get_workflow_summary(name)

Provides a one-line summary of the workflow.

load_checkpoint(data)

Load checkpoint data from a dictionary.

map(*interfaces)

Map multiple child interfaces (ports or parameters) onto the current graph.

map_port(port[, name])

Maps a port of a component to the graph.

ports_active()

Check if all required ports are active.

register(name, factory)

Register a workflow for global access.

send_update()

Send a status update to the main process.

serialized_summary()

Provides a serialized representation of the component type.

set_global_attribute(_Workflow__name, ...)

Set an attribute for all contained components.

setup_directories([parent_path])

Create all work directories for the graph / workflow.

to_checkpoint([path, fail_ok])

Saves the current graph state, including channel data and node liveness to a file.

to_dict()

Create a dictionary from a graph, ready to be saved in a suitable format.

to_file(file)

Save the graph to a file.

update_parameters(**kwargs)

Update component parameters.

update_settings_with_args(args)

Updates the workflow with global settings from the commandline.

update_with_args(extra_options[, parser])

Update the graph with additional options passed from the commandline.

visualize([max_level, coloring, labels])

Visualize the graph using graphviz, if installed.

Attributes

AddableArgType

P

P1

P2

P3

P4

P5

active_nodes

Flattened view of all active nodes in the graph.

all_parameters

Returns all settable parameters and unconnected inputs.

component_path

Provides the full path to the component as a tuple of names.

datatype

The component datatype if it's generic.

flat_channels

Flattened view of all connections in the graph.

flat_components

Flattened view of all components in the graph.

flat_nodes

Flattened view of all nodes in the graph.

n_inbound

Returns the number of items waiting to be received.

n_outbound

Returns the number of items waiting to be sent.

node_config

Provides the configuration of the current node.

parents

Provides all parent components.

ports

Provides a convenience iterator for all inputs and outputs.

required_callables

List of external commandline programs that are required for running the component.

required_packages

List of required Python packages.

root

Provides the root workflow or graph instance.

logger

Python logger for both the build and run procedures.

run_timer

Timer for the run duration, without waiting for resources or other nodes.

full_timer

Timer for the full duration, including waiting for resources or other nodes.

work_dir

Working directory for the component.

status

Current status of the component.

property active_nodes: list[Node]

Flattened view of all active nodes in the graph.

add(component: type[U], name: str | None = None, parameters: dict[str, Any] | None = None, **kwargs: Any) → U

Add a component to the graph.

Parameters:
  • component – Node class or subgraph class

  • name – Unique name of the component

  • parameters – Optional dictionary of parameter values to set on the component

  • kwargs – Additional arguments passed to the component constructor

Returns:

The initialized component

Return type:

Component

Raises:

GraphBuildException – If a node with the same name already exists

Examples

>>> g = Graph(name="foo")
>>> foo = g.add(Foo, name="foo", parameters=dict(val=42))
>>> bar = g.add(Bar)
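
The unique-name rule can be illustrated with a small, self-contained toy model of add. This is not maize code: ToyGraph, ToyNode, and the local GraphBuildException are hypothetical stand-ins, and falling back to the lowercased class name when no name is given is an assumption made for illustration.

```python
from typing import Any, Optional


class GraphBuildException(Exception):
    """Hypothetical stand-in for maize's GraphBuildException."""


class ToyNode:
    def __init__(self, name: str, **kwargs: Any) -> None:
        self.name = name
        self.parameters: dict[str, Any] = {}
        self.options = kwargs  # extra constructor arguments, kept for inspection


class ToyGraph:
    def __init__(self, name: str) -> None:
        self.name = name
        self.nodes: dict[str, ToyNode] = {}

    def add(self, component: type[ToyNode], name: Optional[str] = None,
            parameters: Optional[dict[str, Any]] = None, **kwargs: Any) -> ToyNode:
        # Assumption: fall back to the lowercased class name when no name is given
        if name is None:
            name = component.__name__.lower()
        if name in self.nodes:
            raise GraphBuildException(f"Node with name '{name}' already exists")
        node = component(name=name, **kwargs)
        # Apply the optional parameter values once the component exists
        node.parameters.update(parameters or {})
        self.nodes[name] = node
        return node
```

With this model, adding a second component named foo raises, while a component added without a name receives the fallback name.
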

add_all(*components: type[Component]) → tuple[Component, ...]

Adds all specified components to the graph.

Parameters:

components – All component classes to initialize

Returns:

The initialized component instances

Return type:

tuple[U, …]

Examples

>>> g = Graph(name="foo")
>>> foo, bar = g.add_all(Foo, Bar)

add_arguments(parser: _A) → _A

Adds custom arguments to an existing parser for workflow parameters.

Parameters:

parser – Pre-initialized parser or group

Returns:

A parser instance that can be used to read additional commandline arguments specific to the workflow

Return type:

argparse.ArgumentParser | argparse._ArgumentGroup

See also

Workflow.update_with_args

Sets up a parser for the workflow, uses add_arguments to update it, and parses all arguments with updated parameters for the workflow

Workflow.update_settings_with_args

Updates the workflow with non-parameter settings

property all_parameters: dict[str, Input[Any] | MultiInput[Any] | Parameter[Any]]

Returns all settable parameters and unconnected inputs.

as_dict() → dict[str, Any]

Provides a non-recursive dictionary view of the component.

auto_connect(sending: Component, receiving: Component, size: int = 10) → None

Connects component nodes together automatically, based on port availability and datatype.

Only use this in unambiguous cases; otherwise the result may be an only partially connected graph.

Parameters:
  • sending – Sending node

  • receiving – Receiving node

  • size – Size (in items) of the queue used for communication

Examples

>>> g = Graph(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> g.auto_connect(foo, bar)
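
The idea behind auto_connect, and why ambiguity matters, can be modelled with plain Python objects. In this sketch each node lists its still-unconnected ports with a datatype, and a connection is made only when exactly one output/input pair is compatible. ToyNode, toy_auto_connect, and the "exactly one match" rule are assumptions chosen for illustration; maize's actual matching logic may differ.

```python
from dataclasses import dataclass, field


@dataclass
class ToyNode:
    name: str
    # Port name -> datatype, for ports that are not yet connected
    outputs: dict[str, type] = field(default_factory=dict)
    inputs: dict[str, type] = field(default_factory=dict)


def toy_auto_connect(sending: ToyNode, receiving: ToyNode,
                     channels: list[tuple[str, str]]) -> bool:
    """Connect the single datatype-compatible output/input pair, if there is one."""
    candidates = [
        (out_name, in_name)
        for out_name, out_type in sending.outputs.items()
        for in_name, in_type in receiving.inputs.items()
        if issubclass(out_type, in_type)
    ]
    if len(candidates) != 1:
        # Zero or several candidates: leave the ports alone rather than guess
        return False
    out_name, in_name = candidates[0]
    channels.append((f"{sending.name}.{out_name}", f"{receiving.name}.{in_name}"))
    # Mark both ports as used so they are not matched again
    del sending.outputs[out_name]
    del receiving.inputs[in_name]
    return True
```

When a sender offers two outputs of the same type, the function refuses to pick one, which is exactly the situation where relying on automatic connection would leave the graph partially connected.
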

build() → None

Builds a subgraph.

Override this method to construct a subgraph encapsulating multiple lower-level nodes, using the add and connect methods. Additionally, use the map, map_port, and combine_parameters methods to expose ports and parameters, so that the subgraph can be used just like a node.

Examples

>>> def build(self):
...     foo = self.add(Foo)
...     bar = self.add(Bar)
...     self.map(foo.inp, bar.out, foo.param)

chain(*nodes: Component, size: int = 10) → None

Connects an arbitrary number of nodes in sequence using auto_connect.

Parameters:
  • nodes – Nodes to be connected in sequence

  • size – Size of each channel connecting the nodes

Examples

>>> g = Graph(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> baz = g.add(Baz)
>>> g.chain(foo, bar, baz)
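
Since chain applies auto_connect to each consecutive pair of nodes, n nodes produce n - 1 connections. A minimal sketch of that pairing, where toy_chain and its connect_pair callback are hypothetical stand-ins for chain and auto_connect:

```python
from typing import Callable, Sequence, TypeVar

N = TypeVar("N")


def toy_chain(nodes: Sequence[N], connect_pair: Callable[[N, N, int], None],
              size: int = 10) -> None:
    """Connect nodes[0] -> nodes[1] -> ... -> nodes[-1], one pair at a time."""
    # zip stops at the shorter sequence, so n nodes yield n - 1 calls
    for sending, receiving in zip(nodes, nodes[1:]):
        connect_pair(sending, receiving, size)
```

Passing a single node therefore makes no connections at all.
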

check() → None

Checks if the graph was built correctly and warns about possible deadlocks.

A correctly built graph has no unconnected ports, and all channel types are matched internally.

Raises:

GraphBuildException – If a port is unconnected

Examples

>>> g = Workflow(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> g.auto_connect(foo, bar)
>>> g.check()
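
The unconnected-port rule can be modelled as a scan over a flattened list of ports that raises if any of them lacks a channel. ToyPort, toy_check, and the local GraphBuildException are hypothetical stand-ins. The real method also validates channel types and warns about possible deadlocks, and this sketch does not attempt either.

```python
from dataclasses import dataclass


class GraphBuildException(Exception):
    """Hypothetical stand-in for maize's GraphBuildException."""


@dataclass
class ToyPort:
    path: tuple[str, ...]  # e.g. ("foo", "out")
    connected: bool = False


def toy_check(ports: list[ToyPort]) -> None:
    """Raise if any port in the flattened graph has no channel attached."""
    dangling = [port for port in ports if not port.connected]
    if dangling:
        names = ", ".join(".".join(port.path) for port in dangling)
        raise GraphBuildException(f"Unconnected ports: {names}")
```
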

check_dependencies() → None

Check all contained node dependencies.

cleanup() → None

Cleans up the graph directory if required.

combine_parameters(*parameters: Parameter[T_co], name: str | None = None, default: T_co | None = None) → MultiParameter[T_co]

Maps multiple low-level parameters to one high-level one.

This can be useful when a single parameter needs to be supplied to multiple nodes within a subgraph. This method also handles setting a graph attribute with the given name.

Parameters:
  • parameters – Low-level parameters of component nodes

  • name – Name of the high-level parameter

  • default – The default parameter value

Returns:

The combined parameter object

Return type:

MultiParameter

Examples

>>> def build(self):
...     foo = self.add(Foo)
...     bar = self.add(Bar)
...     self.combine_parameters(
...         foo.param, bar.param, name="param", default=42)
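
Conceptually, the combined parameter fans a single value out to every low-level parameter it wraps. A minimal stdlib sketch of that behaviour, where ToyParameter and ToyMultiParameter are hypothetical stand-ins for maize's Parameter and MultiParameter (naming and the graph attribute are not modelled):

```python
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class ToyParameter(Generic[T]):
    def __init__(self, value: Optional[T] = None) -> None:
        self.value = value


class ToyMultiParameter(Generic[T]):
    """Single high-level parameter that forwards its value to several parameters."""

    def __init__(self, *parameters: ToyParameter[T], default: Optional[T] = None) -> None:
        self._parameters = parameters
        if default is not None:
            self.set(default)

    def set(self, value: T) -> None:
        # Every wrapped low-level parameter receives the same value
        for param in self._parameters:
            param.value = value

    @property
    def value(self) -> Optional[T]:
        # All wrapped parameters hold the same value, so report the first one
        return self._parameters[0].value if self._parameters else None
```

Setting the combined parameter once is equivalent to setting foo.param and bar.param individually.
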

property component_path: tuple[str, ...]

Provides the full path to the component as a tuple of names.

connect(sending: Output[T_co] | MultiOutput[T_co], receiving: Input[T_co] | MultiInput[T_co], size: int | None = None, mode: Literal['copy', 'link', 'move'] | None = None) → None

Connects component inputs and outputs together.

Parameters:
  • sending – Output port for sending items

  • receiving – Input port for receiving items

  • size – Size (in items) of the queue used for communication, only for serializable data

  • mode – Whether to link, copy or move files, overrides value specified for the port

Raises:

GraphBuildException – If the port types don’t match, or the maximum number of channels supported by your OS has been reached

Examples

>>> g = Graph(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> g.connect(foo.out, bar.inp)
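
The size argument bounds how many items a channel can buffer. The sketch below models a channel as a standard-library queue.Queue shared by both ends and performs a simple datatype check before connecting. ToyOutput, ToyInput, toy_connect, and the local GraphBuildException are hypothetical; maize's real channels, which also handle file transfer through the mode argument, are more involved.

```python
import queue
from dataclasses import dataclass
from typing import Optional


class GraphBuildException(Exception):
    """Hypothetical stand-in for maize's GraphBuildException."""


@dataclass
class ToyOutput:
    datatype: type
    channel: Optional[queue.Queue] = None


@dataclass
class ToyInput:
    datatype: type
    channel: Optional[queue.Queue] = None


def toy_connect(sending: ToyOutput, receiving: ToyInput, size: int = 10) -> None:
    # Refuse to connect ports whose datatypes are incompatible
    if not issubclass(sending.datatype, receiving.datatype):
        raise GraphBuildException(
            f"Port types don't match: {sending.datatype} -> {receiving.datatype}"
        )
    # Both ends share one bounded FIFO; put() blocks once `size` items are queued
    channel: queue.Queue = queue.Queue(maxsize=size)
    sending.channel = channel
    receiving.channel = channel
```

A small size applies backpressure: a fast sender blocks until the receiver catches up.
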

connect_all(*ports: tuple[Output[Any] | MultiOutput[Any], Input[Any] | MultiInput[Any]]) → None

Connect multiple pairs of ports together.

Parameters:

ports – Output - Input pairs to connect

Examples

>>> g = Graph(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> baz = g.add(Baz)
>>> g.connect_all((foo.out, bar.inp), (bar.out, baz.inp))

datatype: Any

The component datatype if it’s generic.

execute() → None

Run a given graph.

This is the top-level entry point for maize execution. It creates a separate logging process and a general message queue, and then starts the execute methods of all nodes. Any node may signal for the full graph to be shut down, for example after a failure. Normal termination of a node, however, is signalled by a runtime.StatusUpdate instance with a finished status. Any exception raised in a node is passed through the message queue and re-raised as a runtime.NodeException.

Raises:

NodeException – If there was an exception in any node child process
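
The control flow described above, where workers report a finished status through a shared message queue and exceptions are forwarded and re-raised in the main process, can be sketched with the standard library. This toy version uses threads rather than the separate processes maize launches, and StatusUpdate, NodeException, and toy_execute are hypothetical stand-ins rather than maize's runtime classes.

```python
import queue
import threading
from dataclasses import dataclass
from typing import Callable, Optional


class NodeException(Exception):
    """Hypothetical stand-in for runtime.NodeException."""


@dataclass
class StatusUpdate:
    node: str
    finished: bool = False
    exception: Optional[BaseException] = None


def _run_node(name: str, func: Callable[[], None],
              messages: "queue.Queue[StatusUpdate]") -> None:
    try:
        func()
    except Exception as err:  # forward the failure instead of dying silently
        messages.put(StatusUpdate(name, exception=err))
    else:
        messages.put(StatusUpdate(name, finished=True))


def toy_execute(nodes: dict[str, Callable[[], None]]) -> None:
    messages: "queue.Queue[StatusUpdate]" = queue.Queue()
    for name, func in nodes.items():
        threading.Thread(target=_run_node, args=(name, func, messages), daemon=True).start()
    remaining = set(nodes)
    while remaining:
        update = messages.get()
        if update.exception is not None:
            # Re-raise the first failure in the main thread; a real implementation
            # would also signal the remaining nodes to stop
            raise NodeException(f"Node {update.node} failed") from update.exception
        remaining.discard(update.node)
```

The original exception stays available as the __cause__ of the re-raised NodeException.
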

property flat_channels: set[tuple[tuple[str, ...], tuple[str, ...]]]

Flattened view of all connections in the graph.

property flat_components: list[Component]

Flattened view of all components in the graph.

property flat_nodes: list[Node]

Flattened view of all nodes in the graph.

classmethod from_checkpoint(file: Path | str) → Workflow

Initialize a graph from a checkpoint file.

Checkpoints include two additional sections, _data for any data stored in a channel at time of shutdown, and _status for node status information. We need the data for the full graph and thus use a nested implementation.

_data:
  - bar:
      input: <binary>
_status:
  - foo: STOPPED
  - subgraph:
      baz: FAILED

Parameters:

file – Path to the checkpoint file

Returns:

The initialized graph, with statuses set and channels loaded

Return type:

Workflow

Raises:

ParsingException – If the input file doesn’t conform to the expected format
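
Because _status is nested to mirror subgraphs, reading it back means walking the tree down to its leaves. The sketch below flattens such a section into node paths, in the same tuple-of-names form used by component_path. It assumes the parsed section is a list of single-key mappings as in the example above, and flatten_status is a hypothetical helper, not part of maize.

```python
from typing import Any, Union

Tree = Union[dict[str, Any], list[dict[str, Any]]]


def flatten_status(tree: Tree, prefix: tuple[str, ...] = ()) -> dict[tuple[str, ...], str]:
    """Map each node path, e.g. ("subgraph", "baz"), to its status string."""
    # Sections may be written as a list of single-key mappings
    items = tree if isinstance(tree, list) else [tree]
    flat: dict[tuple[str, ...], str] = {}
    for mapping in items:
        for name, value in mapping.items():
            path = prefix + (name,)
            if isinstance(value, (dict, list)):
                # A nested mapping represents a subgraph: recurse into it
                flat.update(flatten_status(value, path))
            else:
                flat[path] = value
    return flat
```
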

classmethod from_dict(data: dict[str, Any], **kwargs: Any) → Workflow

Read a graph definition from a dictionary parsed from a suitable serialization format.

Parameters:
  • data – Tree structure as a dictionary containing graph and node parameters, as well as connectivity. For format details see the read_input method.

  • kwargs – Additional arguments passed to the Component constructor

Returns:

The constructed graph, with all nodes and channels

Return type:

Workflow

Raises:

ParsingException – If the input dictionary doesn’t conform to the expected format

classmethod from_file(file: Path | str) → Workflow

Reads in a graph definition in JSON, YAML, or TOML format and creates a runnable workflow graph.

This is an example input:

name: graph
description: An optional description for the workflow
level: INFO
nodes:
  - name: foo
    type: ExampleNode

    # Below options are optional
    description: An optional description
    fail_ok: false
    n_attempts: 1
    parameters:
      val: 40

channels:
  - sending:
      foo: out
    receiving:
      bar: input

# Optional
parameters:
  - name: val
    map:
      foo: val

Parameters:

file – File in JSON, YAML, or TOML format

Returns:

The complete graph with all connections

Return type:

Workflow

Raises:

ParsingException – If the input file doesn’t conform to the expected format
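
Since JSON is one of the accepted formats, an equivalent definition can be parsed with the standard library alone. The sketch below loads such a definition and performs a few basic structural checks. The specific checks, the toy_read_graph function, and the local ParsingException are illustrative assumptions, not the validation maize performs. A bar node with the hypothetical type ExampleSink is included so that the channel has both ends.

```python
import json
from typing import Any


class ParsingException(Exception):
    """Hypothetical stand-in for maize's ParsingException."""


EXAMPLE = """
{
  "name": "graph",
  "level": "INFO",
  "nodes": [
    {"name": "foo", "type": "ExampleNode", "parameters": {"val": 40}},
    {"name": "bar", "type": "ExampleSink"}
  ],
  "channels": [{"sending": {"foo": "out"}, "receiving": {"bar": "input"}}],
  "parameters": [{"name": "val", "map": {"foo": "val"}}]
}
"""


def toy_read_graph(text: str) -> dict[str, Any]:
    data = json.loads(text)
    if "name" not in data or "nodes" not in data:
        raise ParsingException("Graph definition needs 'name' and 'nodes'")
    node_names = {node["name"] for node in data["nodes"]}
    # Every channel endpoint must refer to a declared node
    for channel in data.get("channels", []):
        for end in ("sending", "receiving"):
            for node in channel[end]:  # keys of e.g. {"foo": "out"} are node names
                if node not in node_names:
                    raise ParsingException(f"Channel refers to unknown node '{node}'")
    return data
```
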

classmethod from_name(name: str) → Workflow

Create a predefined workflow from a previously registered name.

Parameters:

name – Name the workflow is registered under

Returns:

The constructed workflow, with all nodes and channels

Return type:

Workflow

Raises:

KeyError – If a workflow under that name cannot be found

full_timer: Timer

Timer for the full duration, including waiting for resources or other nodes.

generate_config_template() → str

Generates a global configuration template in TOML format.

Returns:

The config template

Return type:

str

static get_available_nodes() → set[type[Component]]

Returns all available and registered nodes.

Returns:

All available node classes

Return type:

set[type[Component]]

static get_available_workflows() → set[Callable[[], Workflow]]

Returns all available and registered / exposed workflows.

Returns:

All available workflow factories

Return type:

set[Callable[[], "Workflow"]]

classmethod get_inputs() → set[str]

Returns all inputs available to the node.

classmethod get_interfaces(kind: Literal['input', 'output', 'parameter'] | None = None) → set[str]

Returns all interfaces available to the node.

Parameters:

kind – Kind of interface to retrieve

Returns:

Interface names

Return type:

set[str]

get_node(*names: str) → Component

Recursively find a node in the graph.

Parameters:

names – Names of nodes leading up to the potentially nested target node

Returns:

The target component

Return type:

Component

Raises:

KeyError – When the target cannot be found

Examples

>>> g.get_node("subgraph", "subsubgraph", "foo")
Foo(name='foo', parent=SubSubGraph(...))
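
Each name selects one level of nesting, so the lookup can be modelled as a walk through nested dictionaries. A minimal sketch, assuming a hypothetical representation in which a subgraph is a dict of its children and any other value is a leaf node; toy_get_node is not part of maize.

```python
from typing import Any


def toy_get_node(graph: dict[str, Any], *names: str) -> Any:
    """Follow names down through nested subgraphs, raising KeyError if one is missing."""
    current: Any = graph
    for depth, name in enumerate(names):
        if not isinstance(current, dict) or name not in current:
            path = ", ".join(names[: depth + 1])
            raise KeyError(f"Can't find node at ({path})")
        current = current[name]
    return current
```
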

static get_node_class(name: str) → type[Component]

Returns the node class corresponding to the given name.

Parameters:

name – Name of the component class to retrieve

Returns:

The retrieved component class, which can be passed to add

Return type:

Type[Component]

classmethod get_outputs() → set[str]

Returns all outputs available to the node.

get_parameter(*names: str) → Parameter[Any]

Recursively find a parameter in the graph.

Parameters:

names – Names of components leading up to the target parameter

Returns:

The target parameter

Return type:

Parameter

Raises:

KeyError – When the parameter cannot be found

classmethod get_parameters() → set[str]

Returns all parameters available to the node.

get_port(*names: str) → Port[Any]

Recursively find a port in the graph.

Parameters:

names – Names of components leading up to the target port

Returns:

The target port

Return type:

Port

Raises:

KeyError – When the target cannot be found

classmethod get_summary_line() → str

Provides a one-line summary of the node.

static get_workflow_summary(name: str) → str

Provides a one-line summary of the workflow.

load_checkpoint(data: dict[str, Any]) → None

Load checkpoint data from a dictionary.

Uses data as generated by read_input to access the special _data and _status fields.

Parameters:

data – Dictionary data to read in as a checkpoint. Both _data and _status are optional.

See also

from_checkpoint

Load a Graph from a checkpoint file

to_checkpoint

Save a Graph to a checkpoint file

logger: logging.Logger

Python logger for both the build and run procedures.

map(*interfaces: Interface[Any]) → None

Map multiple child interfaces (ports or parameters) onto the current graph. Will also set the graph attributes to the names of the mapped interfaces.

Parameters:

interfaces – Any number of ports and parameters to map

See also

Graph.combine_parameters

If you want to map multiple parameters to a single high-level one

Graph.map_port

If you want more fine-grained control over naming

Examples

>>> def build(self):
...     foo = self.add(Foo)
...     bar = self.add(Bar)
...     self.map(foo.inp, bar.out, foo.param)

map_port(port: _P, name: str | None = None) → _P

Maps a port of a component to the graph.

This is required when creating custom subgraphs: ports of individual component nodes need to be mapped to the subgraph. This method also handles setting a graph attribute with the given name.

Parameters:
  • port – The component port

  • name – Name for the port to be registered as

Returns:

Mapped port

Return type:

_P

Examples

>>> def build(self):
...     node = self.add(Example)
...     self.map_port(node.output, name="output")
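
The attribute-setting behaviour can be pictured as the subgraph registering the child port under a new name and exposing it with setattr. In this sketch ToySubgraph and ToyPort are hypothetical, and falling back to the child port's own name when name is None is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyPort:
    name: str


class ToySubgraph:
    def __init__(self) -> None:
        self.ports: dict[str, ToyPort] = {}

    def map_port(self, port: ToyPort, name: Optional[str] = None) -> ToyPort:
        # Assumption: default to the child's port name when no name is given
        if name is None:
            name = port.name
        self.ports[name] = port
        # Expose the same port object as an attribute, e.g. subgraph.output
        setattr(self, name, port)
        return port
```

Because the mapped port is the same object as the child port, connecting to subgraph.output is the same as connecting to the inner node's port.
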

property n_inbound: int

Returns the number of items waiting to be received.

property n_outbound: int

Returns the number of items waiting to be sent.

property node_config: NodeConfig

Provides the configuration of the current node.

property parents: tuple[Component, ...] | None

Provides all parent components.

property ports: dict[str, Port[Any]]

Provides a convenience iterator for all inputs and outputs.

ports_active() → bool

Check if all required ports are active.

Can be overridden by the user to allow custom shutdown scenarios, for example in the case of complex inter-port dependencies. By default only checks if any mandatory ports are inactive.

Returns:

True if all required ports are active, False otherwise.

Return type:

bool

classmethod register(name: str, factory: Callable[[], Workflow]) → None

Register a workflow for global access.

Parameters:
  • name – Name to register the workflow under

  • factory – Function returning an initialized and built workflow
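
Together, register and from_name form a name-to-factory registry. A minimal sketch using a plain dictionary; ToyWorkflowRegistry and its dict-based storage are assumptions, and the test uses a factory that returns a placeholder dict instead of a built Workflow.

```python
from typing import Callable, Generic, TypeVar

W = TypeVar("W")


class ToyWorkflowRegistry(Generic[W]):
    """Maps names to zero-argument factories that build workflows on demand."""

    def __init__(self) -> None:
        self._factories: dict[str, Callable[[], W]] = {}

    def register(self, name: str, factory: Callable[[], W]) -> None:
        self._factories[name] = factory

    def from_name(self, name: str) -> W:
        if name not in self._factories:
            raise KeyError(f"Workflow with name '{name}' not found")
        # Call the factory so that each lookup builds the workflow anew
        return self._factories[name]()

    def available(self) -> set[Callable[[], W]]:
        return set(self._factories.values())
```

Storing factories rather than instances means every caller gets its own workflow object, which avoids shared state between runs.
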

required_callables: ClassVar[list[str]] = []

List of external commandline programs that are required for running the component.

required_packages: ClassVar[list[str]] = []

List of required Python packages.

property root: Graph

Provides the root workflow or graph instance.

run_timer: Timer

Timer for the run duration, without waiting for resources or other nodes.

send_update() → None

Send a status update to the main process.

classmethod serialized_summary() → _SerialType

Provides a serialized representation of the component type.

Returns:

Nested dictionary of the component type structure, including I/O and parameters.

Return type:

dict[str, Any]

Examples

>>> Merge.serialized_summary()
{"name": "Merge", "inputs": [{"name": "inp", ...}]}

set_global_attribute(_Workflow__name: str, _Workflow__value: Any, /) → None

Set an attribute for all contained components.

setup_directories(parent_path: Path | None = None) → None

Create all work directories for the graph / workflow.

status

Current status of the component.

to_checkpoint(path: Path | str | None = None, fail_ok: bool = True) → None

Saves the current graph state, including channel data and node liveness to a file.

Parameters:
  • path – Optional filename for the checkpoint

  • fail_ok – If True, will only log a warning instead of raising an exception when encountering a writing problem.

Raises:

CheckpointException – Raised for checkpoint writing errors when fail_ok is False
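
The fail_ok switch follows a common warn-or-raise pattern. The sketch below shows it for a JSON checkpoint written with the standard library. The JSON format, the hypothetical default filename, the toy_to_checkpoint function, and the local CheckpointException are assumptions for illustration, not maize's checkpoint format.

```python
import json
import logging
from pathlib import Path
from typing import Any, Optional, Union

log = logging.getLogger("toy_checkpoint")


class CheckpointException(Exception):
    """Hypothetical stand-in for maize's CheckpointException."""


def toy_to_checkpoint(state: dict[str, Any], path: Optional[Union[Path, str]] = None,
                      fail_ok: bool = True) -> bool:
    """Write state to path and return True on success."""
    target = Path(path) if path is not None else Path("ckp.json")  # hypothetical default
    try:
        target.write_text(json.dumps(state))
    except OSError as err:
        if not fail_ok:
            raise CheckpointException(f"Could not write checkpoint to {target}") from err
        # With fail_ok, a failed write is only reported, not fatal
        log.warning("Could not write checkpoint to %s: %s", target, err)
        return False
    return True
```

The lenient default suits periodic checkpoints, where a single failed write should not abort a long-running workflow.
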

to_dict() → dict[str, Any]

Create a dictionary from a graph, ready to be saved in a suitable format.

Returns:

Nested dictionary equivalent to the input format

Return type:

dict[str, Any]

Examples

>>> g = Workflow(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> g.auto_connect(foo, bar)
>>> data = g.to_dict()

to_file(file: Path | str) → None

Save the graph to a file. The type is inferred from the suffix and can be one of JSON, YAML, or TOML.
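
Inferring the format from the suffix amounts to a lookup from file extensions to serializers. The sketch below only implements JSON output, since YAML and TOML writers are not part of the Python standard library. The suffix table (including whether .yaml is accepted alongside .yml) and the infer_format and toy_to_file names are assumptions.

```python
import json
from pathlib import Path
from typing import Any, Union

# Hypothetical mapping from file suffix to format name
SUFFIX_FORMATS = {".json": "json", ".yml": "yaml", ".yaml": "yaml", ".toml": "toml"}


def infer_format(file: Union[Path, str]) -> str:
    suffix = Path(file).suffix.lower()
    if suffix not in SUFFIX_FORMATS:
        raise ValueError(f"Unknown file suffix '{suffix}'")
    return SUFFIX_FORMATS[suffix]


def toy_to_file(data: dict[str, Any], file: Union[Path, str]) -> None:
    fmt = infer_format(file)
    if fmt != "json":
        # YAML and TOML writers need third-party packages, omitted in this sketch
        raise NotImplementedError(f"{fmt} output is not implemented in this sketch")
    Path(file).write_text(json.dumps(data, indent=2))
```
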

Parameters:

file – Path to the file to save to

Examples

>>> g = Workflow(name="foo")
>>> foo = g.add(Foo)
>>> bar = g.add(Bar)
>>> g.auto_connect(foo, bar)
>>> g.to_file("graph.yml")

update_parameters(**kwargs: dict[str, Any]) → None

Update component parameters.

Parameters:

**kwargs – Name - value pairs supplied as keyword arguments

update_settings_with_args(args: Namespace) → None

Updates the workflow with global settings from the commandline.

Parameters:

args – Namespace including the args to use. See maize -h for possible options.

update_with_args(extra_options: list[str], parser: ArgumentParser | None = None) → None

Update the graph with additional options passed from the commandline.

Parameters:
  • extra_options – List of additional option strings, such as the leftover arguments returned by ArgumentParser.parse_known_args

  • parser – Optional parser to reuse

Raises:

ParsingException – Raised when encountering unexpected commandline options
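
A common way to obtain extra_options is ArgumentParser.parse_known_args, which returns the recognised arguments as a Namespace together with a list of leftover strings. The sketch below feeds that list into a second parser built from a hypothetical parameter dictionary and rejects anything unexpected. The `--name value` option style, toy_update_with_args, and the local ParsingException are assumptions, not maize's exact commandline interface.

```python
import argparse
from typing import Any


class ParsingException(Exception):
    """Hypothetical stand-in for maize's ParsingException."""


def toy_update_with_args(parameters: dict[str, Any], extra_options: list[str]) -> None:
    """Parse leftover commandline strings and update matching parameter values in place."""
    parser = argparse.ArgumentParser(add_help=False)
    for name, value in parameters.items():
        # Use each parameter's current value to pick the type conversion
        parser.add_argument(f"--{name}", type=type(value), default=value)
    args, unknown = parser.parse_known_args(extra_options)
    if unknown:
        raise ParsingException(f"Unexpected commandline options: {unknown}")
    parameters.update(vars(args))
```

Splitting parsing into two stages lets the global parser handle settings it knows about, while workflow-specific parameters are only resolved once the workflow has been built.
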

visualize(max_level: int = 9223372036854775807, coloring: Literal['nesting', 'status'] = 'nesting', labels: bool = True) → Any

Visualize the graph using graphviz, if installed.

Parameters:
  • max_level – Maximum nesting level to show, shows all levels by default

  • coloring – Whether to color nodes by nesting level or status

  • labels – Whether to show datatype labels

Returns:

Graphviz Dot instance; in a Jupyter notebook, it is displayed automatically

Return type:

dot

work_dir: Path

Working directory for the component.