/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
lvestats
/
lib
/
Upload File
HOME
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
"""
Conversions between numeric ids and user/reseller names.

Lookups are served from local sources (the system password database, the
NameMap from lveapi, the control panel API) and from the statistics database
reached through a SQLAlchemy engine.
"""

import pwd
from typing import Dict, Optional  # NOQA

from sqlalchemy.engine import Engine  # NOQA
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import NoResultFound

from clcommon import cpapi
from clcommon.clproc import LIMIT_LVP_ID
from lveapi import NameMap
from lvestats.lib.commons.func import deserialize_lve_id, serialize_lve_id
from lvestats.orm import user

__author__ = 'shaman'

__all__ = ('uid_to_username', 'username_to_uid')

# Module-level caches; both are filled lazily on first use and then reused.
_name_map_cache = None  # type: Optional[NameMap]
_id_reseller_map_cache = None  # type: Optional[Dict[int, str]]


def _get_name_map():
    # type: () -> NameMap
    """
    Return the shared NameMap instance, creating it on first call.

    The instance is created once, ``link_xml_node()`` is called on it, and it
    is then cached in a module-level variable for later calls.
    """
    global _name_map_cache
    if _name_map_cache is None:
        name_map = NameMap()
        name_map.link_xml_node()
        _name_map_cache = name_map
    return _name_map_cache


def uid_to_username_local(uid):
    # type: (int) -> Optional[str]
    """
    Resolve a uid to a user name through the system password database.

    :param uid: numeric user id
    :return: the user name, or None when the uid is unknown to ``pwd``
    """
    try:
        return pwd.getpwuid(uid).pw_name
    except KeyError:
        return None


def _uid_to_reseller_local(uid):
    # type: (int) -> Optional[str]
    """
    Resolve a reseller id to a reseller name using local sources.

    Sources are tried in order: the NameMap, the system password database,
    and finally the control panel's reseller list.

    :param uid: numeric reseller id
    :return: the reseller name, or None when no source knows the id
    """
    reseller = _get_name_map().get_name(uid)
    if reseller is None:
        try:
            reseller = pwd.getpwuid(uid).pw_name
        except KeyError:
            reseller = _get_reseller_name_from_panel(uid)
    return reseller


def _get_reseller_name_from_panel(uid):
    # type: (int) -> Optional[str]
    """
    Resolve a reseller id to a name using the control panel API.

    The mapping returned by ``cpapi.get_reseller_id_pairs()`` is inverted
    once and cached in a module-level variable. When the panel raises
    ``cpapi.NotSupported`` an empty mapping is cached instead.

    :param uid: numeric reseller id
    :return: the reseller name, or None when the id is not in the mapping
    """
    global _id_reseller_map_cache
    if _id_reseller_map_cache is None:
        try:
            _id_reseller_map_cache = {
                v: k for k, v in cpapi.get_reseller_id_pairs().items()
            }
        except cpapi.NotSupported:
            _id_reseller_map_cache = {}
    return _id_reseller_map_cache.get(uid)


def _reseller_to_uid_local(name):
    # type: (str) -> Optional[int]
    """Return the id that the NameMap holds for the reseller *name*."""
    return _get_name_map().get_id(name)


def reseller_to_uid(name):
    # type: (str) -> Optional[int]
    """
    Convert a reseller name to a serialized id.

    :param name: reseller name
    :return: ``serialize_lve_id(LIMIT_LVP_ID, <id>)`` for the id found in the
        NameMap, or None when the NameMap has no id for *name*
    """
    result = _reseller_to_uid_local(name)
    if result is not None:
        return serialize_lve_id(LIMIT_LVP_ID, result)
    return None


def username_to_uid_local(username):
    # type: (str) -> Optional[int]
    """
    Resolve a user name to a uid through the system password database.

    :param username: user name
    :return: the numeric uid, or None when the name is unknown to ``pwd``
    """
    try:
        return pwd.getpwnam(username).pw_uid
    except KeyError:
        return None


def _uid_to_username_db(uid, server_id, db_engine):
    # type: (int, str, Engine) -> Optional[str]
    """
    Look up the user name stored in the database for (uid, server_id).

    :param uid: numeric user id
    :param server_id: id of the server the user belongs to
    :param db_engine: SQLAlchemy engine used to open a short-lived session
    :return: the stored user name, or None when no row matches
    """
    session = sessionmaker(bind=db_engine)()
    try:
        user_inst = session.query(user).filter(
            user.uid == uid, user.server_id == server_id).one()
    except NoResultFound:
        return None
    finally:
        # The session is always closed, whether or not a row was found.
        session.close()
    return user_inst.user_name


def _uid_to_reseller_db(uid, server_id, db_engine):
    # type: (int, str, Engine) -> Optional[str]
    """
    Look up a reseller name in the database.

    Currently delegates to the plain user lookup (see FIXME below).
    """
    # FIXME: store reseller's names in db (centralized database on Plesk/DA)
    return _uid_to_username_db(uid, server_id, db_engine)


def _username_to_uid_db(username, server_id, db_engine):
    # type: (str, str, Engine) -> Optional[int]
    """
    Look up the uid stored in the database for (username, server_id).

    :param username: user name
    :param server_id: id of the server the user belongs to
    :param db_engine: SQLAlchemy engine used to open a short-lived session
    :return: the stored uid, or None when no row matches
    """
    session = sessionmaker(bind=db_engine)()
    try:
        user_inst = session.query(user).filter(
            user.user_name == username, user.server_id == server_id).one()
    except NoResultFound:
        return None
    finally:
        # The session is always closed, whether or not a row was found.
        session.close()
    return user_inst.uid


def uid_to_username(uid, local_server_id, server_id, db_engine):
    # type: (int, str, str, Engine) -> Optional[str]
    """
    Convert an id to a user or reseller name.

    The id is first split with ``deserialize_lve_id`` into the plain id and a
    reseller flag, which selects the reseller or the user lookup functions.
    When *server_id* equals *local_server_id* the local lookup is tried first;
    the database is used when the local lookup yields nothing or when the
    server is not the local one.

    :param uid: id to convert
    :param local_server_id: id of the server this code runs on
    :param server_id: id of the server the id belongs to
    :param db_engine: SQLAlchemy engine for the database lookup
    :return: the name, or None when it cannot be resolved
    :raises ValueError: if any argument is None
    """
    if None in (uid, local_server_id, server_id, db_engine):
        raise ValueError("All parameters should be specified and not None for uid_to_username()")

    uid, is_reseller = deserialize_lve_id(uid)
    uid_to_name_local_func = _uid_to_reseller_local if is_reseller else uid_to_username_local
    uid_to_name_database_func = _uid_to_reseller_db if is_reseller else _uid_to_username_db

    if server_id == local_server_id:
        result = uid_to_name_local_func(uid)
        if result:
            return result

    return uid_to_name_database_func(uid, server_id, db_engine)


def username_to_uid(username, local_server_id, server_id, db_engine):
    # type: (str, str, str, Engine) -> Optional[int]
    """
    Look up a uid in the local user database (/etc/passwd) and in the global
    one (table lve_stats2_user).

    If local_server_id equals server_id, the local user database has
    priority.

    :param username: user name to convert
    :param local_server_id: id of the server this code runs on
    :param server_id: id of the server the user belongs to
    :param db_engine: SQLAlchemy engine for the database lookup
    :return: the uid, or None when it cannot be resolved
    :raises ValueError: if any argument is None
    """
    if None in (username, local_server_id, server_id, db_engine):
        raise ValueError("All parameters should be specified and not None for username_to_uid()")

    if server_id == local_server_id:
        result = username_to_uid_local(username)
        # Compare against None explicitly: uid 0 is a valid local result and
        # must not be treated as "not found".
        if result is not None:
            return result

    return _username_to_uid_db(username, server_id, db_engine)


def convert_id_to_lvp_id(any_id):
    # type: (int) -> int
    """
    Return *any_id* in serialized reseller form.

    An id that ``deserialize_lve_id`` already flags as a reseller id is
    returned unchanged; any other id is passed through
    ``serialize_lve_id(LIMIT_LVP_ID, any_id)``.
    """
    _, is_reseller = deserialize_lve_id(any_id)
    if is_reseller:
        return any_id
    return serialize_lve_id(LIMIT_LVP_ID, any_id)


def convert_name_to_lvp_id(name):
    # type: (str) -> int
    """
    Convert a reseller name to its serialized id.

    :param name: reseller name
    :return: the value of ``reseller_to_uid(name)`` when it is truthy,
        otherwise -1
    """
    lvp_id = reseller_to_uid(name)
    if lvp_id:
        return lvp_id
    return -1