/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
cllimits_validator
/
Upload Filee
HOME
# coding: utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from collections import defaultdict from itertools import chain from typing import Any, Dict, List, Optional, Tuple, Union # NOQA from cllimits import LveCtl from cllimitslib_v2 import DEFAULTS, LimitsDataStorage from clveconfig.ve_config_reader import DEFAULT_PROVIDER from .base import ENTITY_DEFAULTS, ENTITY_PACKAGE, ENTITY_RESELLER, ENTITY_USER, BaseValidator # NOQA from .ep_nproc_validator import EpNprocValidator class LimitsValidator: # This dict contains keys which are old names of entities # and contais values which are new names of entities. # Old names will be replaced by new names for improve readability of error message dict_of_replaced_names_of_entities = { 'user': 'User(s)', 'package': 'Package(s)', 'reseller': 'Reseller(s)', 'defaults': 'Defaults of resellers' } # This dict contains keys which are old ids of entities # and contais values which are new ids of entities. # Old ids will be replaced by ids names for improve readability of error message dict_of_replaced_ids_of_entities = { 'defaults': 'Hoster defaults limits' } # This list contains ids entities which will be deleted # for improve readability of error message list_of_deleted_ids_of_entities = [ DEFAULTS ] # This dict contais keys which are old specific messages and # contains values which are new specific message # Old specific msgs will be replaced by new for improve readability of error message dict_of_replaced_specific_messages = { 'inherited from Global': 'inherited from Hoster' } def __init__(self, _limits_data_storage=None, validators=None): # type: (Optional[LimitsDataStorage], Optional[List[BaseValidator]]) -> None self.limits_provider = _limits_data_storage or LimitsDataStorage() self._list_validators = validators or [ EpNprocValidator(self.limits_provider), ] # type: List[BaseValidator] self.message_dict = { 'common_msg': None, 'specific_msg': None, 'affected_entity': None, 'entity_id': None, } @staticmethod def _format_message_string_from_single_msg_dict(msg_dict): # type: (Dict[str, str, str, str]) -> str """ Format message string from single message dict. That function should be used for formatting message while validation input limits """ common_msg = msg_dict['common_msg'] specific_msg = msg_dict['specific_msg'] affected_entity = msg_dict['affected_entity'] entity_id = msg_dict['entity_id'] if affected_entity is not None and entity_id is not None: if entity_id == DEFAULTS: msg_about_affect = 'Hoster defaults limits is affected' elif affected_entity == ENTITY_DEFAULTS: msg_about_affect = f'Defaults limits of Reseller "{entity_id}" are affected.' elif affected_entity == ENTITY_USER: msg_about_affect = f'Limits of user with LVE ID "{entity_id}" are affected.' elif affected_entity == ENTITY_RESELLER: msg_about_affect = f'Limits of Reseller "{entity_id}" are affected.' else: msg_about_affect = f'Limits of package "{entity_id}" are affected.' else: msg_about_affect = '' if specific_msg is None: specific_msg = '' result_msg = f'{common_msg} {msg_about_affect} {specific_msg}' return result_msg def _call_validators_and_process_result(self, only_input_limits, *args, **kwargs): # type: (bool, *Any, **Any) -> Tuple[bool, List] """ Call each validator from validators list and processing results args and kwargs are argumets for calling of validator We validate limits which already are recorded in ve.cfg if only_input_limits is False. We validate limits which we want to set if only_input_limits is True We return aggregated result as bool value and list of messages which explain which limits are wrong and who holds those """ msg_list = [] total_result = True for validator in self._list_validators: if only_input_limits: result, msg_dict = validator.validate_input_limits(*args, **kwargs) else: result, msg_dict = validator.validate_existing_limits(*args, **kwargs) total_result &= result msg_list.append(msg_dict) return total_result, msg_list def validate_input_limits(self, entity_type, entity_id, input_limits, reseller=DEFAULT_PROVIDER): # type: (str, Union[str, int], Dict, str) -> Tuple[bool, List] check_reseller_defaults = False if entity_type == ENTITY_USER: validated_entity = self.limits_provider.get_user_by_uid(int(entity_id)) elif entity_type == ENTITY_PACKAGE: validated_entity = self.limits_provider.get_package_by_name_and_reseller(str(entity_id), str(reseller)) elif entity_type == ENTITY_RESELLER: validated_entity = self.limits_provider.get_reseller_by_name(str(entity_id)) elif entity_type == ENTITY_DEFAULTS: if entity_id != DEFAULTS: validated_entity = self.limits_provider.get_reseller_by_name(str(entity_id)) else: validated_entity = entity_id check_reseller_defaults = True else: validated_entity = None total_result, msg_list = self._call_validators_and_process_result( True, validated_entity, input_limits, check_reseller_defaults, ) msg_list = [self._format_message_string_from_single_msg_dict(msg_dict) for msg_dict in msg_list] return total_result, msg_list def _validate_users(self): # type: () -> List[Tuple[bool, List[Dict]]] users = self.limits_provider.users.values() result = [self._call_validators_and_process_result(False, user) for user in users] return result def _validate_packages(self): # type: () -> List[Tuple[bool, List[Dict]]] packages = self.limits_provider.packages.values() result = [self._call_validators_and_process_result(False, package) for package in packages] return result def _validate_resellers(self): # type: () -> List[Tuple[bool, List[Dict]]] resellers = self.limits_provider.resellers.values() # We should validate only activated resellers, # because not activated resellers don't have any limits. result = [self._call_validators_and_process_result(False, reseller) for reseller in resellers if reseller.defaults is not None] return result def _validate_defaults(self): # type: () -> List[Tuple[bool, List[Dict]]] result = [] for default_name in self.limits_provider.defaults.keys(): default_entity = self.limits_provider.resellers.get(default_name, DEFAULTS) # We should validate only defaults which belong to activated resellers, # because not activated resellers don't have any limits. if default_entity != DEFAULTS and default_entity.defaults is None: continue result.append(self._call_validators_and_process_result(False, default_entity)) return result def _replace_specific_msgs(self, specific_msg): # type: (str) -> str """ Replacing some specific messages for improve readability of error message """ for old_spec_msg, new_spec_msg in self.dict_of_replaced_specific_messages.items(): if specific_msg is None: break specific_msg = specific_msg.replace(old_spec_msg, new_spec_msg) return specific_msg def _replace_ids_of_entities(self, entity_id): # type: (str) -> str """ Replacing some ids of entities for improve readability of error message """ if entity_id in self.dict_of_replaced_ids_of_entities: return self.dict_of_replaced_ids_of_entities[entity_id] return entity_id def _replace_names_of_entities(self, entity_name): # type: (str) -> str """ Replacing some names of entities for improve readability of error message """ if entity_name in self.dict_of_replaced_names_of_entities: return self.dict_of_replaced_names_of_entities[entity_name] return entity_name def _format_message_string_for_existing_limits(self, total_result_list): # type: (List[Dict[str, str, str, str]]) -> str """ Format message string from result of execution of validation the existing limits """ total_result_dict = defaultdict(lambda: defaultdict(list)) total_msg = None # We are groupping affected entities (entity_id) by name of check (common_msg) # and type of affected entity (affected_entity). for msg_dict in total_result_list: name_of_check = msg_dict['common_msg'] type_of_affected_entity = self._replace_names_of_entities(msg_dict['affected_entity']) entity_id = self._replace_ids_of_entities(msg_dict['entity_id']) # Deleting some ids of entities for improve readability of error message if entity_id in self.list_of_deleted_ids_of_entities: continue total_result_dict[name_of_check][type_of_affected_entity].append({ 'entity_id': entity_id, 'specific_msg': self._replace_specific_msgs(msg_dict['specific_msg']) }) for common_msg, entities_dict in total_result_dict.items(): total_msg = f'{"" if total_msg is None else total_msg}{common_msg}\n' for entity_type, list_entity_dicts in entities_dict.items(): affected_entities = [] # make string as `entity_1 (specific_msg_1), entity_2, entity_3, entity_4 (specific_msg) for entity_dict in list_entity_dicts: if entity_dict['specific_msg'] is None: affected_entity = entity_dict['entity_id'] else: affected_entity = f'{entity_dict["entity_id"]} ({entity_dict["specific_msg"]})' affected_entities.append(affected_entity) affected_entities = ', '.join(str(item) for item in set(affected_entities)) total_msg = f'{total_msg}{entity_type.capitalize()}: {affected_entities}\n' return total_msg def validate_existing_limits(self): # type: () -> Optional[str] total_result_list = [ msg_dict for result, msg_dict_list in chain( self._validate_users(), self._validate_packages(), self._validate_resellers(), self._validate_defaults(), ) for msg_dict in msg_dict_list if not result ] total_msg = self._format_message_string_for_existing_limits(total_result_list) return total_msg @staticmethod def is_low_pmem_limit_present() -> bool: """ Check is Low PMEM limit present :return True/False """ _lvectl = LveCtl() _panel_uids_list = _lvectl.get_panel_users_uid_list() _lvectl._load_info(False) for uid in _panel_uids_list: if uid == 0: # skip defaults continue limits = _lvectl.get_limits_by_user_id(uid) # limits example: # {'cpu': {'all': '100'}, 'io': {'all': '1024'}, 'vmem': '0', 'ep': '20', 'pmem': '*268435456', # 'nproc': '100', 'iops': '*4096'} try: # Extract PMEM limit pmem_limit_bytes = int(limits['pmem'].replace('*', '')) if pmem_limit_bytes != 0 and pmem_limit_bytes < 512 * 1024 * 1024: # PMEM limit not unlimited and < 512 MB - error return True except ValueError: pass return False