/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clcommon
/
cpapi
/
Upload Filee
HOME
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT """ CloudLinux control panel API API for Hosting control panel """ import os from functools import lru_cache from .const import CACHE_CPNAME, UNKNOWN_CP_NAME from clcommon.features import ( Feature, get_cl_feature_status_map, get_hosting_accounts_limit ) from .pluginlib import ( get_cp_plugin_module, detect_panel_fast, CPANEL_NAME, DIRECTADMIN_NAME, PLESK_NAME, ISPMANAGER_NAME, INTERWORX_NAME, INTEGRATED_NAME, OFFICIAL_PLUGINS ) # NOQA needed for local disable pyflakes: F401 / 'NotSupported' warning during prospector checks from .cpapiexceptions import NotSupported, NoPanelUser # NOQA from clcommon.lib.cledition import ( is_cl_solo_edition, CLEditionDetectionError, is_container ) # If Hoster has own third-party plugins configured we can't bypass # our cache because we need to copy them to cache dir to be able to # import them and use to detect ControlPanel they provides. # So we do fast detection of our "official" plugins because they used in # most (all?) cases, and if we can't detect any CP then just fallback to # logic with cache _CP_NAME, _CP_PLUGIN_NAME = detect_panel_fast() def is_cl_supported_panel(): """ Is current panel has native CL support :return: True/False """ return _CP_NAME in [ CPANEL_NAME, DIRECTADMIN_NAME, PLESK_NAME, ISPMANAGER_NAME, INTERWORX_NAME ] def admin_packages(raise_exc=False): """ Return list of available admin's packages :param raise_exc: hack for lvemanager :rtype: list :return: List of packages. For example ['BusinessPackage', 'Package2'] """ return _panel_class.admin_packages(raise_exc) def resellers_packages(raise_exc=False): """ Return dictionary, contains available resellers packages, grouped by resellers :rtype: dict :return: Dictionary. Example: {'res1': ['BusinessPackage', 'UltraPackage', 'Package'], 'res2': ['SimplePackage', 'Package'] } """ return _panel_class.resellers_packages() def getCPName(): """ Return panel name :rtype: str :return: Panel name """ # if we are root, get panel name from script if os.getegid() == 0: return _panel_class.getCPName() # if we are user: # - for integrated control panel return real # name from scripts (they should work in cagefs) if OFFICIAL_PLUGINS[INTEGRATED_NAME] == _CP_PLUGIN_NAME: return _panel_class.getCPName() # - check _CP_NAME, probably detect_panel_fast was # able to detect control panel elif _CP_NAME is not None: return _CP_NAME # - last chance - read panel name from cache file controlpanelname = UNKNOWN_CP_NAME if os.path.isfile(CACHE_CPNAME): with open(CACHE_CPNAME, encoding='utf-8') as cache_stream: controlpanelname = cache_stream.readline().strip() return controlpanelname def get_cp_description(): """ Retrieve panel name and it's version :rtype: dict :return: dict: { 'name': 'panel_name', 'version': 'panel_version', 'additional_info': 'add_info'} or None if can't get info """ return _panel_class.get_cp_description() def cpusers(): """ Generates a list of cpusers registered in the control panel :return: list of cpusers registered in the control panel :rtype: tuple """ if is_cl_supported_panel(): # For CL supported panels we use already present plugin functions return _panel_class.cpusers() users_info = _panel_class.cpinfo(keyls=('cplogin', )) _list_users = [user_[0] for user_ in users_info] return tuple(_list_users) def resellers(): """ Generates a list of resellers in the control panel :return: list of cpusers registered in the control panel :rtype: tuple """ return _panel_class.resellers() def admins(): """ Generates a set of admins in the control panel :return: set of cpusers registered in the control panel :rtype: set :raise: NotSupported """ return _panel_class.admins() def db_access(): """ Getting root access to mysql database. For example {'login': 'root', 'db': 'mysql', 'host': 'localhost', 'pass': '9pJUv38sAqqW'} :return: root access to mysql database :rtype: dict :raises: NoDBAccessData """ return _panel_class.db_access() def dblogin_cplogin_pairs(cplogin_lst=None, with_system_users=False): """ Returs a list of pairs, the database user login - user login control panel For example: (('nata2_someuse', 'nata2'), ('testsome_dfrtbus', 'testsome')) :param list|tuple|None cplogin_lst: list of control panel users :param bool with_system_users: add system users to dbmapping :return: list of pairs, the database user login - user login control panel :rtype: tuple :raises: NotSupported, NoPackage """ return _panel_class.dblogin_cplogin_pairs(cplogin_lst, with_system_users) def cpinfo(cpuser=None, keyls=('cplogin', 'package', 'mail', 'reseller', 'dns'), search_sys_users=True): """ Retrieves info about panel user(s) :param str|unicode|list|tuple|None cpuser: user login :param keyls: list of data which is necessary to obtain the user, the valuescan be: cplogin - name/login user control panel mail - Email users reseller - name reseller/owner users locale - localization of the user account package - User name of the package dns - domain of the user :param bool search_sys_users: search for cpuser in sys_users or in control panel users (e.g. for Plesk) :return: returns a tuple of tuples of data in the same sequence as specified keys in keylst :rtype: tuple """ return _panel_class.cpinfo(cpuser, keyls, search_sys_users=search_sys_users) def get_admin_email(): """ Gets admin email Note: be careful when modifying this method. It is used in SSA, ask @dkavchuk or someone else from C-Projects team for details :rtype: string :return: string - admin email """ if is_cl_supported_panel(): return _panel_class.get_admin_email() else: try: return _panel_class.get_admin_emails_list()[0] except IndexError: return '' def get_admin_locale(): DEFAULT_LOCALE = 'en' if is_cl_supported_panel(): try: return _panel_class.get_admin_locale() except NotSupported: return DEFAULT_LOCALE return DEFAULT_LOCALE def docroot(domain): """ Return document root for domain :param str|unicode domain: :rtype tuple :return Tuple: (document_root, owner) """ return _panel_class.docroot(domain) def useraliases(cpuser, domain): """ Return domain and document root pairs for control panel user first domain is main domain :param str|unicode cpuser: user login :rtype: list :return list of tuples (domain_name, documen_root) """ return _panel_class.useraliases(cpuser, domain) def userdomains(cpuser): """ Return domain and document root pairs for control panel user first domain is main domain :param str|unicode cpuser: user login :rtype: list :return list of tuples (domain_name, documen_root) """ return _panel_class.userdomains(cpuser) def homedirs(): """ Detects and returns list of folders contained the home dirs of users of the cPanel :rtype: list :return: list of folders, which are parent of home dirs of users of the panel """ return _panel_class.homedirs() def domain_owner(domain): """ Return domain's owner :param domain: Domain/sub-domain/add-domain name :rtype: str :return: user name or None if domain not found """ if is_cl_supported_panel(): return _panel_class.domain_owner(domain) _, _domain_owner = docroot(domain) return _domain_owner def reseller_users(resellername): """ Return reseller users :param resellername: reseller name; return empty list if None :rtype list :return list[str]: user names list """ return _panel_class.reseller_users(resellername) def reseller_domains(resellername=None): """ Return reseller users and their main domains :param resellername: reseller name; autodetect name if None :rtype disct :return dict[str, str]: pairs user <==> domain """ return _panel_class.reseller_domains(resellername) def is_reseller(username): """ Check if user is reseller; :type username: str :rtype: bool """ return _panel_class.is_reseller(username) def get_user_login_url(domain): """ Get login url for current panel; :type domain: str :rtype: str """ return _panel_class.get_user_login_url(domain) def is_admin(username): """ Return True if username is in admin names :param str username: user to check :rtype bool :return: True if user is admin """ return _panel_class.is_admin(username) def is_reseller_limits_supported(): """ Return True if panel supports reseller limits (LVE10+) :return: bool - True - yes, False - no :rtype bool """ panel_features = get_cl_feature_status_map() try: return panel_features[Feature.RESELLER_LIMITS] except KeyError: return False # The method is implemented only for Plesk, # because the resellers on Plesk can be without the system users. # For other panels the logic of method distributed by the code of lve-utils package def get_reseller_id_pairs(): """ Get dict reseller => id Optional method for panels without hard link reseller <=> system user :rtype: dict[str,int] - {'res1': id1} """ return _panel_class.get_reseller_id_pairs() # TODO: PTCCLIB-158 # The method should be private and moved to DA plugin def get_encoding_name(): """ Retrive encoding name, used for package/reseller names, from panel :return: """ return _panel_class.get_encoding_name() def is_panel_feature_supported(feature: Feature) -> bool: """ Get if feature is supported by control panel. :param feature: constant from Feature class :return: boolean """ try: features = get_supported_cl_features() return features is None or features.get(feature, False) except CLEditionDetectionError: return False def get_apache_connections_number(): """ Retrieves Apache's connections number (from mod_status) For CM """ return _panel_class.get_apache_connections_number() def get_apache_ports_list(): """ Retrieves active httpd's ports from httpd's config :return: list of apache's ports """ return _panel_class.get_apache_ports_list() def get_apache_max_request_workers(): """ Get current maximum request apache workers from httpd's config :return: tuple (max_req_num, message) max_req_num - Maximum request apache workers number or 0 if error message - OK/Trace """ return _panel_class.get_apache_max_request_workers() def get_main_username_by_uid(uid: int) -> str: """ Get "main" panel username by uid. :param uid: uid :return Username """ return _panel_class.get_main_username_by_uid(uid) def get_user_emails_list(username, domain): return _panel_class.get_user_emails_list(username, domain) def panel_login_link(username): return _panel_class.panel_login_link(username) def panel_awp_link(username): return _panel_class.panel_awp_link(username) def get_unsupported_features(): """ Returns features that are not supported on current control panel. """ return _panel_class.get_unsupported_cl_features() or {} # Global caching need to make this method work fast, # especially when we call it on vendors control panel. @lru_cache() def get_supported_cl_features(): """ Left here mostly for backwards compatibility. See get_cl_feature_status_map docstring. """ return get_cl_feature_status_map() def is_throttling_supported(): if is_cl_solo_edition(skip_jwt_check=True) or is_container(): return False return True def get_hosting_accounts_count() -> int: """ Get panel users count, admin accounts are not counted :return: Users count. """ return _panel_class.get_hosting_accounts_count() def is_hitting_max_accounts_limit(): """ Check if customer hitting max accounts count per edition """ max_accounts_limit = get_hosting_accounts_limit() if not max_accounts_limit: return False current_accounts_count = _panel_class.get_hosting_accounts_count() return current_accounts_count > max_accounts_limit def get_max_accounts_limit(): # warn: deprecated, use features.get_hosting_accounts_limit directly return get_hosting_accounts_limit() def count_hosting_accounts(): """ Retrieve users count and admins count, substract the last value from the first one and dump the result on disk """ try: users_list = cpusers() except NotSupported as e: raise NotSupported("cpusers() method not supported") from e # admins returns set or list depending on the control panel in use admins_collection = admins() return sum(1 for user in users_list if user not in admins_collection) def is_wp2_environment() -> bool: """ They have symlink /usr/local/cpanel/server.type -> wp2 """ wp2_marker = '/usr/local/cpanel/server.type' if not os.path.islink(wp2_marker): return False try: return os.readlink('/usr/local/cpanel/server.type') == 'wp2' except Exception: return False # Loading api _apiplugin = get_cp_plugin_module(_CP_PLUGIN_NAME) _panel_class = _apiplugin.PanelPlugin() CP_NAME = getCPName() # TODO: make a proxy that will do this stuff automatically get_domains_php_info = _panel_class.get_domains_php_info get_system_php_info = _panel_class.get_system_php_info list_all = _panel_class.list_all list_users = _panel_class.list_users get_reseller_users = _panel_class.get_reseller_users reseller_package_by_uid = _panel_class.reseller_package_by_uid get_uids_list_by_package = _panel_class.get_uids_list_by_package get_installed_php_versions = _panel_class.get_installed_php_versions get_customer_login = _panel_class.get_customer_login get_domain_login = _panel_class.get_domain_login get_server_ip = _panel_class.get_server_ip invalidate_cpapi_cache = _panel_class.invalidate_cpapi_cache suspended_users_list = _panel_class.suspended_users_list