Examples
Simple DAG
A very simple directed acyclic graph workflow:
"""A simple hello-world-ish example graph."""

from maize.core.interface import Parameter, Output, MultiInput
from maize.core.node import Node
from maize.core.workflow import Workflow


# Define the nodes
class Example(Node):
    """Node that sends the value of its ``data`` parameter on ``out``."""

    # String to send; "Hello" unless overridden when the node is added
    data: Parameter[str] = Parameter(default="Hello")
    # Port the string is sent on
    out: Output[str] = Output()

    def run(self) -> None:
        """Send the current value of ``data`` on ``out``."""
        self.out.send(self.data.value)


class ConcatAndPrint(Node):
    """Node that joins one string from each connected input and logs it."""

    # Accepts several upstream connections; iterated over in ``run``
    inp: MultiInput[str] = MultiInput()

    def run(self) -> None:
        """Receive one item per input, join them with spaces and log the result."""
        result = " ".join(inp.receive() for inp in self.inp)
        self.logger.info("Received: '%s'", result)


# Build the graph
flow = Workflow(name="hello")
ex1 = flow.add(Example, name="ex1")
# Second instance overrides the default "Hello" with "maize"
ex2 = flow.add(Example, name="ex2", parameters=dict(data="maize"))
concat = flow.add(ConcatAndPrint)
# Both producers feed the same multi-input port
flow.connect(ex1.out, concat.inp)
flow.connect(ex2.out, concat.inp)

# Check and run!
flow.check()
flow.execute()
Simple DCG
An example of a workflow containing a cycle:
"""Simple directed-cyclic-graph example with a subgraph"""

from maize.core.graph import Graph
from maize.core.interface import Parameter, Input, Output
from maize.core.node import Node
from maize.core.workflow import Workflow

from maize.steps.plumbing import Delay, Merge
from maize.steps.io import Return


class A(Node):
    """Node that sends the value of its ``send_val`` parameter on ``out``."""

    # Port the integer is sent on
    out: Output[int] = Output()
    # Integer to send; no default here, so it must be supplied as a parameter
    send_val: Parameter[int] = Parameter()

    def run(self) -> None:
        """Send the current value of ``send_val`` on ``out``."""
        self.out.send(self.send_val.value)


class B(Node):
    """Node that increments incoming integers by 2 until one exceeds 48."""

    # Incoming integers
    inp: Input[int] = Input()
    # Incremented value, sent while the threshold has not been exceeded
    out: Output[int] = Output()
    # Receives the first value greater than 48
    final: Output[int] = Output()

    def run(self) -> None:
        """Receive one integer and either finish or pass on ``val + 2``.

        A value greater than 48 is sent unchanged on ``final``; any other
        value is incremented by 2 and sent on ``out``.
        """
        val = self.inp.receive()
        if val > 48:
            self.logger.debug("%s stopping", self.name)
            self.final.send(val)
            return
        self.out.send(val + 2)


class SubGraph(Graph):
    """Subgraph wrapping an ``A`` node followed by a ``Delay`` node."""

    def build(self) -> None:
        """Add and connect the inner nodes, then expose one port and one parameter."""
        a = self.add(A, parameters=dict(send_val=36))
        d = self.add(Delay, parameters=dict(delay=1))
        self.connect(a.out, d.inp)
        # Expose the delay node's output as this subgraph's ``out``
        self.out = self.map_port(d.out)
        # Expose ``a.send_val`` on the subgraph under the name "val"
        self.val = self.combine_parameters(a.send_val, name="val")


flow = Workflow(name="test")
sg = flow.add(SubGraph)
# NOTE(review): ``loop=True`` presumably makes ``B.run`` execute repeatedly,
# which the cycle below relies on - confirm against the maize docs.
b = flow.add(B, loop=True)
merge = flow.add(Merge)
ret = flow.add(Return)
# The merge node has two sources: the subgraph and B's own output (the cycle)
flow.connect(sg.out, merge.inp)
flow.connect(merge.out, b.inp)
flow.connect(b.out, merge.inp)
# B's ``final`` output leaves the cycle and goes to the Return node
flow.connect(b.final, ret.inp)
flow.combine_parameters(sg.val, name="val")
flow.check()
flow.execute()