Source code for icolos.utils.execute_external.license_token_guard

import time
from typing import Dict

from pydantic import BaseModel, PrivateAttr

from icolos.utils.execute_external.execute import Executor

from icolos.loggers.steplogger import StepLogger

from icolos.utils.enums.logging_enums import LoggingConfigEnum
from icolos.utils.enums.program_parameters import SchrodingerExecutablesEnum

_EE = SchrodingerExecutablesEnum()
_LE = LoggingConfigEnum()


[docs]class TokenGuardParameters(BaseModel): prefix_execution: str = None binary_location: str = None token_pools: Dict wait_interval_seconds: int = 30 wait_limit_seconds: int = 0
[docs]class SchrodingerLicenseTokenGuard(BaseModel): """Class that checks, whether enough tokens to execute Schrodinger binaries are available.""" token_guard: TokenGuardParameters
[docs] class Config: underscore_attrs_are_private = True
_logger = PrivateAttr() _executor = PrivateAttr() def __init__(self, **data): super().__init__(**data) self._logger = StepLogger() # initialize the executor for all "Schrodinger" related calls and also check if it is available self._executor = Executor( prefix_execution=self.token_guard.prefix_execution, binary_location=self.token_guard.binary_location, ) def _get_token_pool_info(self, licadmin_output: list, token_pool: str) -> dict: result = {"found": False} for line in licadmin_output: if token_pool in line: parts = line.split(" ") if len(parts) == 16: result["total"] = int(parts[6]) result["available"] = int(parts[6]) - int(parts[12]) result["found"] = True break return result def _check_licstat_output(self, licadmin_output: list) -> bool: all_pools_available = True for pool_key, pool_token_numbers in self.token_guard.token_pools.items(): pool_status = self._get_token_pool_info(licadmin_output, pool_key) if pool_status["found"]: if pool_status["available"] >= pool_token_numbers: self._logger.log( f"Enough tokens available ({pool_status['available']}) to satisfy requirement ({pool_token_numbers} free tokens) for pool {pool_key}.", _LE.DEBUG, ) else: self._logger.log( f"Not enough tokens available ({pool_status['available']}) to satisfy requirement ({pool_token_numbers} free tokens) for pool {pool_key}.", _LE.DEBUG, ) all_pools_available = False else: all_pools_available = False self._logger.log( f"Could not find information on token pool {pool_key}.", _LE.WARNING ) return all_pools_available def _get_licstat_output(self): result = self._executor.execute( command=_EE.LICADMIN, arguments=[_EE.LICADMIN_STAT], check=True ) if result.returncode != 0: self._logger.log( f"Could not execute the Schrodinger license token guard - do you need to export the licadmin path?", _LE.WARNING, ) return result.stdout.split("\n")
[docs] def guard(self) -> bool: # loop over the token pools until they are all satisfied or the time limit has run out counter = 0 success = False while True: if ( self.token_guard.wait_limit_seconds != 0 and (counter * self.token_guard.wait_interval_seconds) >= self.token_guard.wait_limit_seconds ): self._logger.log( f"Wait period ({self.token_guard.wait_limit_seconds} seconds) set for Schrodinger token guard has been exceeded.", _LE.ERROR, ) break # reload the output from "licadmin" # at this stage, the output from licadmin is a list of strings licadmin_output = self._get_licstat_output() all_pools_available = self._check_licstat_output( licadmin_output=licadmin_output ) if all_pools_available: self._logger.log( "All token pool requirements for Schrodinger have been met - proceeding.", _LE.DEBUG, ) success = True break else: time.sleep(self.token_guard.wait_interval_seconds) counter = counter + 1 return success