Examples#

Simple DAG#

A very simple directed acyclic graph workflow:

 1"""A simple hello-world-ish example graph."""
 2
 3from maize.core.interface import Parameter, Output, MultiInput
 4from maize.core.node import Node
 5from maize.core.workflow import Workflow
 6
 7# Define the nodes
 8class Example(Node):
 9    data: Parameter[str] = Parameter(default="Hello")
10    out: Output[str] = Output()
11
12    def run(self) -> None:
13        self.out.send(self.data.value)
14
15
16class ConcatAndPrint(Node):
17    inp: MultiInput[str] = MultiInput()
18
19    def run(self) -> None:
20        result = " ".join(inp.receive() for inp in self.inp)
21        self.logger.info("Received: '%s'", result)
22
23
24# Build the graph
25flow = Workflow(name="hello")
26ex1 = flow.add(Example, name="ex1")
27ex2 = flow.add(Example, name="ex2", parameters=dict(data="maize"))
28concat = flow.add(ConcatAndPrint)
29flow.connect(ex1.out, concat.inp)
30flow.connect(ex2.out, concat.inp)
31
32# Check and run!
33flow.check()
34flow.execute()

Simple DCG#

An example of a workflow containing a cycle:

 1"""Simple directed-cyclic-graph example with a subgraph"""
 2
 3from maize.core.graph import Graph
 4from maize.core.interface import Parameter, Input, Output
 5from maize.core.node import Node
 6from maize.core.workflow import Workflow
 7
 8from maize.steps.plumbing import Delay, Merge
 9from maize.steps.io import Return
10
11
12class A(Node):
13    out: Output[int] = Output()
14    send_val: Parameter[int] = Parameter()
15
16    def run(self) -> None:
17        self.out.send(self.send_val.value)
18
19
20class B(Node):
21    inp: Input[int] = Input()
22    out: Output[int] = Output()
23    final: Output[int] = Output()
24
25    def run(self) -> None:
26        val = self.inp.receive()
27        if val > 48:
28            self.logger.debug("%s stopping", self.name)
29            self.final.send(val)
30            return
31        self.out.send(val + 2)
32
33class SubGraph(Graph):
34    def build(self) -> None:
35        a = self.add(A, parameters=dict(send_val=36))
36        d = self.add(Delay, parameters=dict(delay=1))
37        self.connect(a.out, d.inp)
38        self.out = self.map_port(d.out)
39        self.val = self.combine_parameters(a.send_val, name="val")
40
41
42flow = Workflow(name="test")
43sg = flow.add(SubGraph)
44b = flow.add(B, loop=True)
45merge = flow.add(Merge)
46ret = flow.add(Return)
47flow.connect(sg.out, merge.inp)
48flow.connect(merge.out, b.inp)
49flow.connect(b.out, merge.inp)
50flow.connect(b.final, ret.inp)
51flow.combine_parameters(sg.val, name="val")
52flow.check()
53flow.execute()