Plumbing#
These nodes make piping data between varying numbers of other steps a bit easier.
General purpose tasks for data-flow control.
- class maize.steps.plumbing.Accumulate(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]
Accumulate multiple independent packets into one large packet.
- inp: Input[T]
Packets to accumulate
- class maize.steps.plumbing.Barrier(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]
Only sends data onwards if a signal is received
- inp: Input[T]
Data input
- out: Output[T]
Data output
- class maize.steps.plumbing.Batch(parent: Graph | None = None, name: str | None = None, description: str | None = None, fail_ok: bool = False, n_attempts: int = 1, level: int | str | None = None, cleanup_temp: bool = True, resume: bool = False, logfile: Path | None = None, max_cpus: int | None = None, max_gpus: int | None = None, loop: bool | None = None, max_loops: int = -1, initial_status: Status = Status.NOT_READY)[source]
Create batches of data from a single large input
- class maize.steps.plumbing.Combine(parent: Graph | None = None, name: str | None = None, description: str | None = None, fail_ok: bool = False, n_attempts: int = 1, level: int | str | None = None, cleanup_temp: bool = True, resume: bool = False, logfile: Path | None = None, max_cpus: int | None = None, max_gpus: int | None = None, loop: bool | None = None, max_loops: int = -1, initial_status: Status = Status.NOT_READY)[source]
Combine multiple batches of data into a single dataset
- class maize.steps.plumbing.Copy(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]
Copy a single input packet to multiple output channels.
- inp: Input[T]
Single input to broadcast
- out: MultiOutput[T]
Multiple outputs to broadcast over
- class maize.steps.plumbing.Delay(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]
Pass on a packet with a custom delay.
- inp: Input[T]
Data input
- out: Output[T]
Data output
- class maize.steps.plumbing.Merge(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]
Collect inputs from multiple channels and send them to a single output port on a first-in-first-out (FIFO) basis.
- inp: MultiInput[T]
Flexible number of input channels to be merged
- out: Output[T]
Single output for all merged data
- class maize.steps.plumbing.Multiply(parent: Graph | None = None, name: str | None = None, description: str | None = None, fail_ok: bool = False, n_attempts: int = 1, level: int | str | None = None, cleanup_temp: bool = True, resume: bool = False, logfile: Path | None = None, max_cpus: int | None = None, max_gpus: int | None = None, loop: bool | None = None, max_loops: int = -1, initial_status: Status = Status.NOT_READY)[source]
Creates a list of multiple of the same item
- inp: Input[T]
Data input
- class maize.steps.plumbing.RoundRobin(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]
Outputs a single input packet to a single output port at a time, cycling through output ports.
- inp: Input[T]
Single input to alternatingly send on
- out: MultiOutput[T]
Multiple outputs to distribute over
- prepare() None [source]
Prepares the execution environment for run.
Performs the following:
Changing the python environment, if required
Setting of environment variables
Setting of parameters from the config
Loading LMOD modules
Importing python packages listed in required_packages
Checking if software in required_callables is available
- class maize.steps.plumbing.Scatter(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]
Decompose one large packet into it’s constituent items and send them separately.
- out: Output[T]
Unpacked data
- class maize.steps.plumbing.Yes(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]
Sends a single received value multiple times
- inp: Input[T]
Data input
- out: Output[T]
Data output
- ports_active() bool [source]
Check if all required ports are active.
Can be overridden by the user to allow custom shutdown scenarios, for example in the case of complex inter-port dependencies. By default only checks if any mandatory ports are inactive.
- Returns:
True
if all required ports are active,False
otherwise.- Return type: