Source code for icolos.core.workflow_steps.fpocket.mdpocket

from icolos.utils.general.parallelization import Parallelizer, SubtaskContainer
from pydantic import BaseModel
from icolos.core.workflow_steps.fpocket.base import StepFpocketBase
from icolos.utils.enums.step_enums import StepCavExploreEnum
from icolos.utils.execute_external.execute import Executor
from icolos.core.workflow_steps.step import _LE
from sklearn.cluster import DBSCAN
from collections import Counter
import numpy as np
import re
import os

_SFP = StepCavExploreEnum()


[docs]class StepMDpocket(StepFpocketBase, BaseModel): def __init__(self, **data): super().__init__(**data) # self._initialize_backend(executor=MPIExecutor) self._initialize_backend(executor=Executor) # set max_length_sublists to 1 self.execution.parallelization.max_length_sublists = 1 def _create_density_grid_file(self, tmp_dir: str, iso_value: float): """creates a density grid from the .dx-file into a .pdb-file, heavily influenced by extractISOPdb.py provided by fpocket""" pass def _cluster_pockets(self, tmp_dir, eps, min_samples, threshold, iso_value): """ Clusters points from the initial MDpocket density grid, at a certain iso value """ pass def _save_pocket_files(self, tmp_dir, data, labels): """saves the individual pockets as individual pdbs to be used with mdpocket""" pass def _run_mdpocket_selected_pocket(self, tmp_dir): """runs the second mdpocket command for fpocket3""" pass def _prepare_batch_inputs(self, batch, tmp_dir): tmp_dirs = [] args = [] for next_subtask_list in batch: tmp_dirs.append(tmp_dir) for ( subtask ) in ( next_subtask_list ): # enforced only one task per subtask, otherwise it makes no sense args.append(subtask.data) # append the arguments list return tmp_dirs, args def _execute_mdpocket(self, tmp_dir, arguments): self._backend_executor.execute( command=_SFP.MDPOCKET_COMMAND, arguments=arguments, location=tmp_dir, check=True, )
[docs] def execute(self): pass