Source code for icolos.utils.general.parallelization

import math
import multiprocessing
from typing import List, Callable, Dict, Any, Optional
from pydantic import BaseModel
from icolos.utils.enums.parallelization import ParallelizationEnum

_PE = ParallelizationEnum


class Subtask(BaseModel):
    status: _PE = _PE.STATUS_READY
    times_tried: int = 0
    data: Any
    job_id: Optional[str] = None

    def increment_tries(self):
        self.times_tried += 1

    def set_status(self, status: str):
        self.status = status

    def set_job_id(self, job_id: str):
        self.job_id = job_id

    def set_status_failed(self):
        self.set_status(_PE.STATUS_FAILED)

    def set_status_success(self):
        self.set_status(_PE.STATUS_SUCCESS)
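
# Illustrative lifecycle sketch (not part of the original module; the payload
# is made up):
#
#   task = Subtask(data={"smiles": "CCO"})
#   task.set_status(_PE.STATUS_RUNNING)
#   task.increment_tries()
#   task.set_status_failed()   # shorthand for set_status(_PE.STATUS_FAILED)
#   assert task.times_tried == 1 and task.status == _PE.STATUS_FAILED
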
class SubtaskContainer(BaseModel):
    max_tries: int
    subtasks: List[Subtask] = []

    def __init__(self, **data):
        super().__init__(**data)

    def clear(self):
        self.subtasks = []

    def load_data(self, data: List[Any]):
        self.clear()
        self.add_data(data=data)

    def add_data(self, data: List[Any]):
        for data_element in data:
            self.subtasks.append(
                Subtask(status=_PE.STATUS_READY, times_tried=0, data=data_element)
            )

    def get_todo_tasks(self) -> List[Subtask]:
        todo_subtasks = []
        for subtask in self.subtasks:
            if (
                subtask.status == _PE.STATUS_READY
                or subtask.status == _PE.STATUS_FAILED
            ) and subtask.times_tried < self.max_tries:
                todo_subtasks.append(subtask)
        return todo_subtasks

    def get_done_tasks(self) -> List[Subtask]:
        done_subtasks = []
        for subtask in self.subtasks:
            if subtask.status == _PE.STATUS_SUCCESS or (
                subtask.times_tried >= self.max_tries
                and subtask.status not in (_PE.STATUS_RUNNING, _PE.STATUS_READY)
            ):
                done_subtasks.append(subtask)
        return done_subtasks

    def get_running_tasks(self) -> List[Subtask]:
        running_subtasks = []
        for subtask in self.subtasks:
            if subtask.status == _PE.STATUS_RUNNING:
                running_subtasks.append(subtask)
        return running_subtasks

    def get_sublists(
        self, partitions=None, slice_size=None, get_first_n_lists=None
    ) -> List[List[Subtask]]:
        if partitions is None and slice_size is None:
            raise ValueError("Either specify partitions or slice size.")

        # only get tasks that are not yet completed and have some tries left
        subtasks = self.get_todo_tasks()

        # decide on the chunk size, either by the number of partitions or by
        # specifying the slice size directly; guard against a chunk size of
        # zero when no to-do tasks remain
        if partitions is not None:
            chunk_size = max(1, int(math.ceil(len(subtasks) / partitions)))
        else:
            chunk_size = slice_size

        # wrap the tasks in lists of (at most) chunk_size elements each
        sublists = [
            subtasks[i : i + chunk_size] for i in range(0, len(subtasks), chunk_size)
        ]
        if get_first_n_lists is not None and len(sublists) > get_first_n_lists:
            return sublists[:get_first_n_lists]
        return sublists
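
    # Chunking sketch (illustrative, not in the original source): with 5 to-do
    # subtasks and partitions=2, chunk_size = ceil(5 / 2) = 3, so the tasks are
    # split into sublists of length 3 and 2; with slice_size=2 instead, the
    # same 5 tasks split into sublists of length 2, 2 and 1.
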
    def done(self) -> bool:
        for subtask in self.subtasks:
            if subtask.status == _PE.STATUS_SUCCESS:
                continue
            if subtask.status == _PE.STATUS_READY or (
                subtask.status == _PE.STATUS_FAILED
                and subtask.times_tried < self.max_tries
            ):
                return False
        return True

    def any_failed(self) -> bool:
        return any(
            subtask.status == _PE.STATUS_FAILED for subtask in self.subtasks
        )

    def set_max_tries(self, max_tries: int):
        self.max_tries = max_tries

    def __len__(self) -> int:
        return len(self.subtasks)
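
# Usage sketch for SubtaskContainer (illustrative; the payload strings are
# made up):
#
#   container = SubtaskContainer(max_tries=3)
#   container.load_data(data=["mol_1", "mol_2", "mol_3", "mol_4", "mol_5"])
#   for batch in container.get_sublists(slice_size=2):   # lengths 2, 2 and 1
#       for subtask in batch:
#           subtask.increment_tries()
#           subtask.set_status_success()
#   assert container.done() and not container.any_failed()
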
class Parallelizer(BaseModel):
    func: Callable

    def __init__(self, **data):
        super().__init__(**data)

    def rearrange_input(self, inp_dict: Dict[str, List]) -> List[Dict]:
        return [dict(zip(inp_dict, ele)) for ele in zip(*inp_dict.values())]

    def execute_parallel(self, **kwargs):
        # translate the dictionary of argument lists into a list of individual
        # dictionaries, e.g.
        # {'one': [1, 2, 3], 'two': ['aaaa', 'bbb', 'cc'], 'three': [0.2, 0.2, 0.1]}
        # --->
        # [{'one': 1, 'two': 'aaaa', 'three': 0.2},
        #  {'one': 2, 'two': 'bbb', 'three': 0.2},
        #  {'one': 3, 'two': 'cc', 'three': 0.1}]
        list_exec = self.rearrange_input(kwargs)

        # run in parallel; wait for all subjobs to finish before proceeding
        processes = []
        for subprocess_args in list_exec:
            p = multiprocessing.Process(target=self.func, kwargs=subprocess_args)
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
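
# Usage sketch for Parallelizer (illustrative): all keyword lists must have the
# same length, and one process is spawned per index position. Note that under
# the "spawn" start method (the default on Windows and macOS), func must be
# picklable, i.e. defined at module level.
#
#   def _print_args(one, two, three):
#       print(one, two, three)
#
#   parallelizer = Parallelizer(func=_print_args)
#   parallelizer.execute_parallel(
#       one=[1, 2, 3], two=["aaaa", "bbb", "cc"], three=[0.2, 0.2, 0.1]
#   )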