Plumbing

These nodes make piping data between varying numbers of other steps a bit easier.

General purpose tasks for data-flow control.

class maize.steps.plumbing.Accumulate(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]

Accumulate multiple independent packets into one large packet.

inp: Input[T]

Packets to accumulate

n_packets: Parameter[int]

Number of packets to receive before sending one large packet

out: Output[list[T]]

Output for accumulated packets
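
The accumulation described above can be illustrated in plain Python. This is only a sketch of the idea, not maize's implementation: the `accumulate` helper is hypothetical, and it assumes an incomplete trailing group is never sent.

```python
from collections.abc import Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")


def accumulate(packets: Iterable[T], n_packets: int) -> Iterator[list[T]]:
    """Group incoming packets into lists of length n_packets (hypothetical helper)."""
    buffer: list[T] = []
    for packet in packets:
        buffer.append(packet)
        if len(buffer) == n_packets:
            yield buffer
            buffer = []
    # Assumption: leftover packets that do not fill a group are dropped.


print(list(accumulate([1, 2, 3, 4, 5], n_packets=2)))  # [[1, 2], [3, 4]]
```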

class maize.steps.plumbing.Barrier(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]

Only sends data onwards if a signal is received

inp: Input[T]

Data input

inp_signal: Input[bool]

Signal data, upon receiving True will send the held data onwards

out: Output[T]

Data output

class maize.steps.plumbing.Batch(parent: Graph | None = None, name: str | None = None, description: str | None = None, fail_ok: bool = False, n_attempts: int = 1, level: int | str | None = None, cleanup_temp: bool = True, resume: bool = False, logfile: Path | None = None, max_cpus: int | None = None, max_gpus: int | None = None, loop: bool | None = None, max_loops: int = -1, initial_status: Status = Status.NOT_READY)[source]

Create batches of data from a single large input

inp: Input[list[T]]

Input data

n_batches: Parameter[int]

Number of chunks

out: Output[list[T]]

Stream of output data chunks
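
As a rough illustration of batching, the following plain-Python sketch splits a list into `n_batches` chunks. The `batch` helper is hypothetical and the exact chunk sizes are an assumption (as even as possible, with earlier chunks taking any remainder); the node itself may divide the data differently.

```python
from typing import TypeVar

T = TypeVar("T")


def batch(data: list[T], n_batches: int) -> list[list[T]]:
    """Split data into n_batches contiguous chunks (hypothetical helper)."""
    size, extra = divmod(len(data), n_batches)
    chunks: list[list[T]] = []
    start = 0
    for i in range(n_batches):
        # Assumption: the first `extra` chunks each take one additional item.
        stop = start + size + (1 if i < extra else 0)
        chunks.append(data[start:stop])
        start = stop
    return chunks


print(batch(list(range(7)), n_batches=3))  # [[0, 1, 2], [3, 4], [5, 6]]
```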

class maize.steps.plumbing.Combine(parent: Graph | None = None, name: str | None = None, description: str | None = None, fail_ok: bool = False, n_attempts: int = 1, level: int | str | None = None, cleanup_temp: bool = True, resume: bool = False, logfile: Path | None = None, max_cpus: int | None = None, max_gpus: int | None = None, loop: bool | None = None, max_loops: int = -1, initial_status: Status = Status.NOT_READY)[source]

Combine multiple batches of data into a single dataset

inp: Input[list[T] | ndarray[Any, dtype[Any]]]

Input data chunks to combine

n_batches: Parameter[int]

Number of chunks

out: Output[list[T]]

Single combined output data
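
Combining is the inverse of batching: the received chunks are concatenated in order of arrival. A minimal plain-Python sketch, using a hypothetical `combine` helper and plain lists only (array inputs are not covered here):

```python
from itertools import chain
from typing import TypeVar

T = TypeVar("T")


def combine(chunks: list[list[T]]) -> list[T]:
    """Concatenate chunks into one flat list, preserving order (hypothetical helper)."""
    return list(chain.from_iterable(chunks))


print(combine([[0, 1, 2], [3, 4], [5, 6]]))  # [0, 1, 2, 3, 4, 5, 6]
```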

class maize.steps.plumbing.Copy(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]

Copy a single input packet to multiple output channels.

inp: Input[T]

Single input to broadcast

out: MultiOutput[T]

Multiple outputs to broadcast over
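
The broadcast can be pictured as producing one copy of the packet per connected output. The sketch below is plain Python with a hypothetical `broadcast` helper; it assumes every output receives an independent copy, which may not match how the node handles data internally.

```python
import copy
from typing import TypeVar

T = TypeVar("T")


def broadcast(packet: T, n_outputs: int) -> list[T]:
    """Return one independent copy of packet per output (hypothetical helper)."""
    # Assumption: deep copies, so a change made downstream of one output
    # does not affect what the other outputs see.
    return [copy.deepcopy(packet) for _ in range(n_outputs)]


copies = broadcast({"values": [1, 2]}, n_outputs=3)
copies[0]["values"].append(3)
print(copies[1])  # {'values': [1, 2]}
```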

class maize.steps.plumbing.Delay(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]

Pass on a packet with a custom delay.

delay: Parameter[float | int]

Delay in seconds

inp: Input[T]

Data input

out: Output[T]

Data output

class maize.steps.plumbing.Merge(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]

Collect inputs from multiple channels and send them to a single output port on a first-in-first-out (FIFO) basis.

inp: MultiInput[T]

Flexible number of input channels to be merged

out: Output[T]

Single output for all merged data
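
FIFO merging means packets leave in the order they arrived, regardless of which channel delivered them. The following plain-Python sketch models this with a single `queue.Queue`; the channel names, the arrival order and the `merge_fifo` helper are all made up for illustration.

```python
import queue


def merge_fifo(arrivals: list[tuple[str, str]]) -> list[str]:
    """Forward packets in arrival order, ignoring their source channel."""
    merged: queue.Queue[str] = queue.Queue()
    for _channel, packet in arrivals:
        merged.put(packet)
    out: list[str] = []
    while not merged.empty():
        out.append(merged.get())
    return out


# Hypothetical arrival order across three upstream channels "a", "b" and "c".
arrivals = [("a", "a0"), ("b", "b0"), ("a", "a1"), ("c", "c0")]
print(merge_fifo(arrivals))  # ['a0', 'b0', 'a1', 'c0']
```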

class maize.steps.plumbing.Multiply(parent: Graph | None = None, name: str | None = None, description: str | None = None, fail_ok: bool = False, n_attempts: int = 1, level: int | str | None = None, cleanup_temp: bool = True, resume: bool = False, logfile: Path | None = None, max_cpus: int | None = None, max_gpus: int | None = None, loop: bool | None = None, max_loops: int = -1, initial_status: Status = Status.NOT_READY)[source]

Creates a list containing multiple copies of the same item

inp: Input[T]

Data input

n_packages: Parameter[int]

Number of times to multiply the data

out: Output[list[T]]

Data output
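
In plain Python, the effect is comparable to repeating an item in a list. This sketch uses a hypothetical `multiply` helper; note that it repeats references to the same object, which is an assumption made for brevity.

```python
from typing import TypeVar

T = TypeVar("T")


def multiply(item: T, n_packages: int) -> list[T]:
    """Return a list holding item n_packages times (hypothetical helper)."""
    return [item for _ in range(n_packages)]


print(multiply("mol", n_packages=3))  # ['mol', 'mol', 'mol']
```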

class maize.steps.plumbing.RoundRobin(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]

Outputs a single input packet to a single output port at a time, cycling through output ports.

inp: Input[T]

Single input whose packets are sent to each output in turn

out: MultiOutput[T]

Multiple outputs to distribute over
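
The cycling behaviour of RoundRobin can be sketched in plain Python with `itertools.cycle`. The `round_robin` helper is hypothetical and simply records which packets each output port would receive.

```python
from collections.abc import Iterable
from itertools import cycle
from typing import TypeVar

T = TypeVar("T")


def round_robin(packets: Iterable[T], n_outputs: int) -> list[list[T]]:
    """Assign each packet to the next output port in turn (hypothetical helper)."""
    outputs: list[list[T]] = [[] for _ in range(n_outputs)]
    # zip stops once the packets run out; cycle supplies port indices forever.
    for packet, port in zip(packets, cycle(range(n_outputs))):
        outputs[port].append(packet)
    return outputs


print(round_robin(range(5), n_outputs=2))  # [[0, 2, 4], [1, 3]]
```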

prepare() → None[source]

Prepares the execution environment for run.

Performs the following:

  • Changing the python environment, if required

  • Setting of environment variables

  • Setting of parameters from the config

  • Loading LMOD modules

  • Importing python packages listed in required_packages

  • Checking if software in required_callables is available

class maize.steps.plumbing.Scatter(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]

Decompose one large packet into its constituent items and send them separately.

inp: Input[list[T]]

Packets of sequences that allow unpacking

out: Output[T]

Unpacked data
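
Scattering is the counterpart of accumulation: each received list is unpacked and its items are sent one by one. A minimal plain-Python sketch with a hypothetical `scatter` generator:

```python
from collections.abc import Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")


def scatter(packets: Iterable[list[T]]) -> Iterator[T]:
    """Yield the items of each incoming list individually (hypothetical helper)."""
    for packet in packets:
        yield from packet


print(list(scatter([[1, 2], [3], [4, 5]])))  # [1, 2, 3, 4, 5]
```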

class maize.steps.plumbing.Yes(max_loops: int = -1, initial_status: Status = Status.NOT_READY, **kwargs: Any)[source]

Sends a single received value multiple times

inp: Input[T]

Data input

out: Output[T]

Data output
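
Conceptually this resembles the Unix `yes` command: one value is received and then emitted repeatedly. The plain-Python sketch below uses a hypothetical `yes` helper built on `itertools.repeat`; when and how the real node stops sending is not modelled here, so the consumer decides how many values to take.

```python
from collections.abc import Iterator
from itertools import islice, repeat
from typing import TypeVar

T = TypeVar("T")


def yes(value: T) -> Iterator[T]:
    """Emit the same value indefinitely (hypothetical helper)."""
    return repeat(value)


# Take only the first three values from the endless stream.
print(list(islice(yes("config"), 3)))  # ['config', 'config', 'config']
```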

ports_active() → bool[source]

Check if all required ports are active.

Can be overridden by the user to allow custom shutdown scenarios, for example in the case of complex inter-port dependencies. By default only checks if any mandatory ports are inactive.

Returns:

True if all required ports are active, False otherwise.

Return type:

bool