/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clwpos
/
Upload Filee
HOME
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT # helper functions for clwpos utility import datetime import logging import os import shutil import json import pwd import subprocess from clcommon.cpapi import cpusers from clwpos import gettext as _ from clwpos.stats import _get_wpos_statistics_total_count from clwpos.cl_wpos_exceptions import WposError from clwpos.constants import ( USER_WPOS_DIR, PUBLIC_OPTIONS, AWP_BACKUP_DIR, CLWPOS_UIDS_PATH, CLWPOS_ADMIN_DIR, USERS_CONFIGS_TO_BACKUP, PUBLIC_OPTIONS_FILE_NAME, CLWPOS_UIDS_DIR_NAME, ALLOWED_SUITES_JSON, ) from clwpos.feature_suites import ALL_SUITES, AWPSuite def collect_user_configs(user=None) -> dict: """ Collects user configs inside HOME/.clwpos """ target_users = [user] if user else list(cpusers()) user_folders = {} for user in target_users: try: pw = pwd.getpwnam(user) except KeyError: logging.warning('Cannot collect configs folder for user: %s', user) continue full_user_dir = os.path.join(pw.pw_dir, USER_WPOS_DIR) user_folders[user] = [os.path.join(full_user_dir, config) for config in USERS_CONFIGS_TO_BACKUP] return user_folders def backup_single_user_conf(username, configs, backups_dir): """ Copies configs for single user """ for path in configs: if not os.path.exists(path): continue user_backup_dir = os.path.join(backups_dir, username, os.path.basename(os.path.dirname(path))) backup_path = os.path.join(user_backup_dir, os.path.basename(path)) if not os.path.exists(user_backup_dir): os.makedirs(user_backup_dir, mode=0o700, exist_ok=True) shutil.copy2(path, backup_path) def backup_accelerate_wp(): """ Backups main AccelerateWP configs to /var/clwpos/DATE-TIME/.backup/%Y_%m_%d_%H_%M_%S. Directory is only root-editable, so no user permissions drop is needed. """ files_to_backup = [ PUBLIC_OPTIONS ] folders_to_backup = [ CLWPOS_UIDS_PATH, CLWPOS_ADMIN_DIR ] user_configs = collect_user_configs() current_backup_folder = os.path.join( AWP_BACKUP_DIR, datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S') ) if not os.path.exists(AWP_BACKUP_DIR): os.mkdir(AWP_BACKUP_DIR, mode=0o700) os.mkdir(current_backup_folder, mode=0o700) for path in files_to_backup: if not os.path.exists(path): continue shutil.copy2(path, current_backup_folder) for path in folders_to_backup: if not os.path.exists(path): continue shutil.copytree(path, os.path.join(current_backup_folder, os.path.basename(path))) user_backups = os.path.join(current_backup_folder, 'users') os.mkdir(user_backups, mode=0o700) for username, paths in user_configs.items(): try: backup_single_user_conf(username, paths, user_backups) except Exception: logging.exception('Cannot backup configs for user %s', username) continue def restore_accelerate_wp_public_options_backup(): """ Restore AccelerateWP suite states from latest backup (server-wide) """ visible_suites, allowed_suites, upgrade_url = _get_backup_of_public_options() disallowed_suites = [] for suite_name, suite in ALL_SUITES.items(): if suite_name != AWPSuite.name and suite_name not in visible_suites and suite_name not in allowed_suites: suite_used = 0 suite_used += _get_wpos_statistics_total_count(list(suite.features), 'allowed', allowed_by_modules=True) suite_used += _get_wpos_statistics_total_count(list(suite.features), 'visible') if suite_used == 0: disallowed_suites.append(suite_name) if disallowed_suites: subprocess.Popen([ '/usr/bin/cloudlinux-awp-admin', 'set-suite', '--disallowed-for-all', '--suites', ','.join(disallowed_suites)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if visible_suites: subprocess.Popen([ '/usr/bin/cloudlinux-awp-admin', 'set-suite', '--visible-for-all', '--suites', ','.join(visible_suites)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if allowed_suites: subprocess.Popen([ '/usr/bin/cloudlinux-awp-admin', 'set-suite', '--allowed-for-all', '--suites', ','.join(allowed_suites)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if upgrade_url: subprocess.Popen([ '/usr/bin/cloudlinux-awp-admin', 'set-options', '--upgrade-url', upgrade_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) def _get_backup_of_public_options(): allowed_suites = [] visible_suites = [] upgrade_url = '' first_backup = _get_first_backup_folder() if first_backup: public_options_backup_file = os.path.join(first_backup, PUBLIC_OPTIONS_FILE_NAME) if os.path.exists(public_options_backup_file): with open(public_options_backup_file) as f: try: public_options = json.load(f) allowed_suites = public_options.get('allowed_suites', []) visible_suites = public_options.get('visible_suites', []) upgrade_url = public_options.get('upgrade_url', '') except json.decoder.JSONDecodeError as err: raise WposError( message=_("Backup file is corrupted: %(config_file)s" " or fix the line provided in details"), details=str(err), context={'config_file': public_options_backup_file}) return visible_suites, allowed_suites, upgrade_url def get_backup_folders(): if os.path.exists(AWP_BACKUP_DIR): return [entry for entry in os.scandir(AWP_BACKUP_DIR) if entry.is_dir() and not entry.name.startswith('.')] def _get_first_backup_folder(): folders = get_backup_folders() backups = None if folders: backups = sorted(folders, key=lambda entry: entry.name, reverse=False) if backups: return backups[0] return None def make_accelerate_wp_backups_deprecated(): for folder in os.listdir(AWP_BACKUP_DIR): if folder.startswith('.'): continue folder_path = os.path.join(AWP_BACKUP_DIR, folder) folder_new = os.path.join(AWP_BACKUP_DIR, f'.{folder}') if os.path.isdir(folder_path): try: os.rename(folder_path, folder_new) except OSError as e: logging.exception('Unable to deprecate backup dir %s. Error: %s', folder_path, e) def _get_backup_of_users_suites(): suites = {} grouped_by_suite_usernames = {} first_backup = _get_first_backup_folder() if not first_backup: logging.exception("Can't restore WHMCS backup. There is no backup data.") return {} users_uids_dir = os.path.join(first_backup, CLWPOS_UIDS_DIR_NAME) with os.scandir(users_uids_dir) as entries: for uid_folder in entries: uid_config_path = os.path.join(uid_folder, ALLOWED_SUITES_JSON) try: username = pwd.getpwuid(int(uid_folder.name))[0] except (KeyError, TypeError): continue if not os.path.exists(uid_config_path): continue with open(uid_config_path) as f: try: uid_config = json.load(f) suites[username] = uid_config.get('suites', {}) except json.decoder.JSONDecodeError as e: logging.exception("Backup file is corrupted: %s" " or fix the line provided in details" 'Stdout is %s. Stderr is %s', uid_config_path, e.stdout, e.stderr) grouped_by_suite_usernames = _group_users_by_suites(suites) return grouped_by_suite_usernames def _group_users_by_suites(uids_suites: dict): """ Groups uids by suite and status to change suite status for multiple uids by one run. Converts per uid dict from uids_suites = { "user1": { "accelerate_wp": "allowed", "accelerate_wp_premium": "visible", "accelerate_wp_cdn": "default", "accelerate_wp_cdn_pro": "allowed" } } to { "accelerate_wp": { "allowed": [user1,user2,user3], "visible": [user1,user2,user3], "default": [], } } """ grouped_by_suite_uids = {} for uid, suites in uids_suites.items(): for suite, status in suites.items(): if suite not in grouped_by_suite_uids: grouped_by_suite_uids[suite] = {} if status not in grouped_by_suite_uids[suite]: grouped_by_suite_uids[suite] = {status: []} grouped_by_suite_uids[suite][status].append(uid) return grouped_by_suite_uids