/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
xray
/
internal
/
Upload Filee
HOME
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT """ This module contains classes for X-Ray internal objects """ import hashlib import logging import os import pwd import subprocess from dataclasses import dataclass, field from datetime import datetime, timedelta from fnmatch import fnmatchcase from typing import Iterator, List, Optional, NamedTuple from urllib.parse import urlparse from clcommon.const import Feature from clcommon.cpapi import is_panel_feature_supported from xray import gettext as _ from .constants import request_data_storage, api_server, tasks_base_storage from .exceptions import XRayManagerError, XRayManagerExit from .utils import ( timestamp, skeleton_update, safe_move, dbm_storage_update, cagefsctl_get_prefix, _selectorctl_get_version, _is_selector_phpd_location_set, _is_cagefs_enabled, _cagefsctl_remount, set_privileges ) from ..reconfiguration.global_ini import ( is_global_ini_mode, ) logger = logging.getLogger('types') class URL(NamedTuple): """ Representation of an URL """ domain_name: str uri_path: str class ContinuousTask(NamedTuple): """ Representation of a continuous tracing task """ creation_time: int domain: str original_url: str email: str execution_count: int status: int @dataclass class User: """ Representation of a user entity """ name: str uid: int gid: int path: str = field(init=False) def __post_init__(self): self.path = os.path.join(tasks_base_storage, str(self.uid)) def url_split(url: str) -> URL: """ Split URL into domain_name and uripath including query string :param url: URL of format protocol://domain/path;parameters?query#fragment :return: namedtuple URL(domain_name, uripath) """ fragments = urlparse(url) qs = f'?{fragments.query}' if fragments.query else '' uri = f'{fragments.path}{qs}' if fragments.path else '/' _no_www_netloc = fragments.netloc.replace('www.', '') _no_port_netloc = _no_www_netloc.split(':')[0] logger.info('Parsed %s into %s:%s', url, _no_port_netloc, uri) return URL(_no_port_netloc, uri) class DomainInfo: """ Simple container class for storing information about domain. Allows to add whatever attributes. Implements two properties: - version set in CL selector - direct ini path in CageFS """ def __init__(self, **kwargs): self.name = None self.user = None self.panel_php_version = None self.saved_selector_php_version = 'native' self.panel_fpm = None self.is_selector_applied: bool = False self.php_ini_scan_dir: Optional[str] = None for k, v in kwargs.items(): self.__setattr__(k, v) def __repr__(self): return str(self.__dict__) def __str__(self): return f'Domain {self.name}' def add_cagefs_dirpath(self, basename: str) -> Optional[str]: """ Get path to ini location directory in CageFS """ prefix = cagefsctl_get_prefix(self.user) if prefix is None: raise ValueError( _('CageFS prefix resolved as None, but should be a number')) return f'/var/cagefs/{prefix}/{self.user}/etc/cl.php.d/{basename}' @property def selector_php_version(self) -> Optional[str]: """ Get PHP version set in selector :param domain_user: user of domain :return: PHP version in format of 'alt-phpXY' """ if self.saved_selector_php_version == 'native': result = _selectorctl_get_version(self.user) if result is not None: out, err = result not_err_result = 'ERROR' not in err and 'ERROR' not in out if not_err_result and 'native' not in out: short_version = out.split()[0].strip() self.saved_selector_php_version = f"alt-php{''.join(short_version.split('.'))}" else: # selectorctl success, but returned ERROR or native version self.saved_selector_php_version = None else: # selectorctl command failed or just returned non-zero retcode self.saved_selector_php_version = None return self.saved_selector_php_version @property def selector_ini_path(self) -> Optional[str]: """ Resolve direct ini path in CageFS """ return self.add_cagefs_dirpath(self.selector_php_version) @property def phpd_location_ini_path(self) -> Optional[str]: """ Resolve direct ini path for the case of php.d.location = selector. This, if set, affects only alt-php versions with enabled CageFS. Also, consider that FPM is not compatible with selector behaviour """ alt_php_in_cage = 'alt-php' in self.panel_php_version and _is_cagefs_enabled( self.user) if _is_selector_phpd_location_set() and alt_php_in_cage and not self.panel_fpm: return self.add_cagefs_dirpath(self.panel_php_version) class Task: """ Container class for storing task data """ cron_file = '/etc/cron.d/xray-manager' def __init__(self, *, url: str, client_ip: str, tracing_by: str, tracing_count: int, ini_location: str = 'unknown', task_id: str = 'unknown', status: str = 'unknown', starttime: int = -1, initial_count: int = 0, request_count: int = 0, auto_task: bool = False, user: str = None, domain_owner: str = None): self.url = url self.client_ip = client_ip self.tracing_by = tracing_by self.tracing_count = tracing_count self.ini_location = ini_location self.task_id = task_id self.status = status self.starttime = starttime self.request_count = request_count self.initial_count = tracing_count if initial_count == 0 else initial_count self.auto_task = auto_task self.user = user self.domain_owner = domain_owner self.owner_data = None logger.debug('Instantiate Task %s: %s', self.task_id, self.as_dict()) def update_with_local_data(self, *, next_request_id): self.request_count = next_request_id - 1 if self.tracing_by != 'time': self.tracing_count = self.initial_count - self.request_count def __repr__(self): return str(self.__dict__) def __str__(self): return f'Task {self.task_id}' @property def _owner(self) -> User: """""" if self.owner_data is None: if self.domain_owner is not None: user_data = pwd.getpwnam(self.domain_owner) self.owner_data = User(name=self.domain_owner, uid=user_data.pw_uid, gid=user_data.pw_gid) else: raise XRayManagerError( _('Unable to operate with tracing task, because domain owner is not set')) return self.owner_data @property def fake_id(self) -> str: return hashlib.blake2b(self.task_id.encode(), digest_size=10).hexdigest() @property def shared_link(self) -> str: if self.auto_task: return f'https://{api_server}/static/xray/reports/?tracing_task_id={self.task_id}' raise XRayManagerError( _('No shared link could be obtained for manual task')) @property def is_manual(self) -> bool: """If task is a manual one""" return not self.auto_task @property def is_continuous(self) -> bool: """If task is a continuous one""" return self.auto_task and self.user != '*autotracing*' @property def is_autotracing(self) -> bool: """If task is an autotracing one""" return self.auto_task and self.user == '*autotracing*' def as_dict(self) -> dict: """ Represent task as a regular dictionary without task_id field :return: dict """ return {k: v for k, v in self.__dict__.items() if k not in ('task_id', 'owner_data')} def set_domain_owner(self, username: str) -> None: """ Update domain_owner attribute if it is not already set """ if self.domain_owner is None: self.domain_owner = username def generate(self) -> str: """ Generate task for writing into xray.ini file. :return: domain_name:uripath:client_ip:tracing_task_id """ # encode only " chars within URL # since we wrap the value of xray.tasks into them url_data = ':'.join(url_split(self.url)).replace('"', "%22") task_view = ':'.join((url_data, self.client_ip, self.fake_id)) logger.info('Generated task %s', task_view) return task_view def is_path_available(self, p: str) -> bool: """ Check if given path is available in user environment """ if not is_panel_feature_supported(Feature.CAGEFS): # this check is skipped in case if CageFS is not available # since in such a case cagefs_enter_user is available, but throws # 'error while loading shared libraries: liblve.so.0' return True cmd = ['/sbin/cagefs_enter_user', self._owner.name, '/usr/bin/stat', p] try: subprocess.run(cmd, capture_output=True, check=True) except subprocess.CalledProcessError: logger.error('Mount %s missing', p) return False except (OSError, ValueError, subprocess.SubprocessError) as e: logger.error('External command `%s` failed: %s', cmd, str(e)) return False return True def ini_file(self) -> str: """ Full path to xray.ini file :return: xray.ini full path """ return os.path.join(self.ini_location, 'xray.ini') def check_xray_dir(self): if not os.path.isdir(self._owner.path): with set_privileges(target_uid=0, target_gid=0, mask=0o066): os.makedirs(self._owner.path) _cagefsctl_remount(self._owner.name) def tasks_file(self) -> str: """ Full path to xray.tasks file Create location on the fly :return: xray.tasks full path """ self.check_xray_dir() return os.path.join(self._owner.path, 'xray.tasks') def fake_id_real_id_file(self) -> str: self.check_xray_dir() return os.path.join(self._owner.path, self.fake_id) def set_cronjob(self, system_id: str) -> None: """ Set a cron job for stop task by time :param system_id: a unique system ID """ if self.tracing_by == 'time': stop_at = datetime.fromtimestamp(self.starttime) + timedelta( minutes=self.tracing_count + 1) shceduled = f'{stop_at.minute} {stop_at.hour} {stop_at.day} {stop_at.month} *' cmd = f'root /usr/sbin/cloudlinux-xray-manager stop' params = f'--system_id {system_id} --tracing_task_id {self.task_id}' cron_line = f'{shceduled}\t{cmd} {params}\n' individual_cron_file = self.cron_file + f'_{self.task_id}' logger.info('Generating cron file %s with command %s', individual_cron_file, cron_line) with open(individual_cron_file, 'w') as cron: cron.write(cron_line) def drop_cronjob(self) -> None: """ Drop existing cron job file for current task """ job = self.cron_file + f'_{self.task_id}' logger.info('Removing cron job %s', job) try: os.unlink(job) except FileNotFoundError: logger.info('No such job') @staticmethod def read_file(filepath) -> str: """ Read contents of given file Raise XRayError in case if xray.ini does not exists :return: list of lines contained in xray.ini """ if os.path.exists(filepath): with set_privileges(target_path=filepath): with open(filepath) as existing: return existing.read().strip('\n') else: raise XRayManagerError( 'Failed to find %s' % os.path.basename(filepath)) def read_ini(self) -> List[str]: """ Read contents of xray.ini Raise XRayError in case if xray.ini does not exists :return: list of lines contained in xray.ini """ return self.read_file(self.ini_file()).split('\n') @staticmethod def ini_tasks_count(xray_tasks: str) -> int: """ Retrieve the list of existing tracing tasks from given xray.tasks line """ return int(xray_tasks[12:].strip('"\n')) @staticmethod def generate_tasks_line(*args: str) -> str: """ Create an xray.tasks line with given args like: args0,args1,args2,...argsN\n """ return ','.join(args) def _tasks(self) -> List[str]: """ Retrieve the list of existing tracing tasks from xray.tasks file """ try: return self.read_file(self.tasks_file()).split(',') except XRayManagerError: return list() def tasks_parsed(self) -> Iterator[URL]: """ Iterate over URL of each task in xray.tasks file :return: list of URLs """ for task in self._tasks(): info = task.split(':') yield URL(info[0], info[1]) def is_a_duplicate(self): """ Check if task is duplicated by another tasks already present in current xray.ini :return: True if task is a duplicate, False otherwise """ domain, uri = url_split(self.url) def check(t): """ Match domain name and uri separately, but return joined result :param t: task :return: True if both domain and uri matches task, False otherwise """ direct_match = domain == t.domain_name and uri == t.uri_path wildcard_match = fnmatchcase(domain, t.domain_name) and fnmatchcase(uri, t.uri_path) return direct_match or wildcard_match return any( [check(task) for task in self.tasks_parsed()]) @staticmethod def so_path(php_ver: str = None) -> str: """ Ensure full path for versions, which are not among known. In such a case version is expected as two numbers, e.g. 56 or 74 """ if php_ver is None or len(php_ver) > 2: return 'xray.so' return f'/opt/alt/php{php_ver}/usr/lib64/php/modules/xray.so' def update_ini(self, version: str = None, with_decrement: bool = False) -> None: """ Update xray.ini tasks counter """ new_counter = 0 try: existing_contents = self.read_ini() except XRayManagerError: ini_contents = f"""extension={self.so_path(version)} ;xray.tasks=1\n""" new_counter = 1 else: def update(): """ A generator function aimed to update xray.task field """ nonlocal new_counter for line in existing_contents: if 'xray.tasks' in line: current = self.ini_tasks_count(line) new_counter = current - 1 if with_decrement else current + 1 yield f";xray.tasks={new_counter}\n" else: yield line + '\n' ini_contents = ''.join(list(update())) if new_counter <= 0 and not is_global_ini_mode(): # this indicates that there are no tasks left, # thus the whole xray.ini should be deleted self.unified_erase(self.ini_file()) else: # in case when global ini mode is enabled # or counter still reports some tasks # keep the file and update counter self.unified_write(self.ini_file(), ini_contents, target_path=self.ini_location) def add_task(self) -> None: """ Add a task into xray.tasks """ _file = self.tasks_file() if not self.is_path_available(self._owner.path): # check to see if mount is missing (generally cagefs related check) _cagefsctl_remount(self._owner.name) raise XRayManagerError( _('Creation of tracing task failed due to filesystem problem. Please, try to recreate the task. If the problem persists, contact support for resolution')) tasks_contents = self.generate_tasks_line(*self._tasks(), self.generate()) self.unified_write(_file, tasks_contents, target_uid=0, target_gid=self._owner.gid, mask=0o137) self.generate_fake_real_id_file() def remove_task(self) -> None: """ Remove a task from xray.tasks """ if os.path.exists(self.fake_id_real_id_file()): self.unified_erase(self.fake_id_real_id_file()) tasks_contents = self.generate_tasks_line( *[task for task in self._tasks() if self.generate() != task]) if not tasks_contents.strip(): # this indicates that there are no tasks left, # thus the xray.tasks file should be deleted self.unified_erase(self.tasks_file()) else: self.unified_write(self.tasks_file(), tasks_contents, target_uid=0, target_gid=self._owner.gid, mask=0o137) def generate_fake_real_id_file(self): """ /usr/share/alt-php-xray-tasks/1004/<fake_id> used by x-ray extension to obtain real task id """ self.unified_write(self.fake_id_real_id_file(), self.task_id, target_uid=0, target_gid=self._owner.gid, mask=0o137) @staticmethod def unified_write(filepath: str, contents: str, target_uid: int = None, target_gid: int = None, target_path='.', mask: int = None) -> None: """ Unified writing files method. Includes writing into temporary file and then moving temporary to original path """ working_path = filepath + '.tmp' logger.info('Writing %s with contents %s', working_path, contents) with set_privileges(target_uid, target_gid, target_path, mask): try: with open(working_path, 'w') as _file: _file.write(contents) except OSError as e: logger.error('Failed to generate %s %s', os.path.basename(filepath), str(e), extra={'reason': str(e)}) base_path = os.path.basename(filepath) raise XRayManagerExit( _('Failed to generate {}. {}'.format(base_path, str(e.strerror)))) from e else: safe_move(working_path, filepath) @staticmethod def unified_erase(filepath: str) -> None: """ Unified erase method. Unlinks target file """ logger.info('Remove %s: no tasks left', os.path.basename(filepath)) with set_privileges(target_path=filepath): try: os.unlink(filepath) except OSError as e: logger.warning('Failed to unlink %s file', filepath, extra={'file': filepath, 'err': str(e)}) def create_ini_location(self) -> None: """ Create ini location dir if it does not exist. Due to troubles with additional ini scan dir on DirectAdmin we are to create ini_location path before generating xray.ini """ if not os.path.exists(self.ini_location): os.mkdir(self.ini_location) @skeleton_update @dbm_storage_update def add(self, php_version: str = None) -> None: """ Add task with primary check for duplicates Adding task includes modifying two files: - xray.ini file with xray extension and task count - xray.tasks file per each user with list of tasks """ if self.is_a_duplicate(): raise XRayManagerError(_('Task is duplicated by URL'), flag='warning') self.create_ini_location() self.add_task() self.update_ini(version=php_version) self.starttime = timestamp() logger.info('Starttime set %s', self.starttime) @skeleton_update @dbm_storage_update def remove(self) -> None: """ If task is present in xray.tasks file, remove task from xray.tasks file and decrement tasks counter in ini. """ if self.generate() not in self._tasks(): return self.remove_task() self.update_ini(with_decrement=True) def recalculate_counts(self) -> int: """ Recalculates remaining number of time|request_qty counts :return: remaining number of counts """ if self.tracing_by == 'time': current = timestamp() if current < self.starttime: logger.error( 'Failed to recalculate time count: current timestamp appears to be in the past', extra={'currenttime': current, 'starttime': self.starttime}) raise XRayManagerError( _('Failed to recalculate time count: current timestamp appears to be in the past relatively to task start time')) remaining = (current - self.starttime) // 60 recalculated = self.tracing_count - remaining logger.info('Remaining time recalculated: %s left', recalculated) return recalculated elif self.tracing_by == 'request_qty': return self.initial_count - self.request_count raise XRayManagerError(_('Unknown tracing marker: %s') % self.tracing_by) def erase_request_id_storage(self) -> None: """ Unlink request ID file (by fake ID of tracing_task) """ req_id_file = os.path.join(request_data_storage, self.fake_id) logger.info('Erasing storage %s', req_id_file) try: os.unlink(req_id_file) except OSError as e: logger.warning('Failed to unlink request_id file', extra={'file': req_id_file, 'err': str(e)})