/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
xray
/
internal
/
Upload Filee
HOME
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
"""
Detection of the php configuration that actually serves a website.

A tiny php script is dropped into the document root of the domain,
requested over HTTP(S) and its ``key=value`` output is parsed into
a PhpConfiguration instance.
"""
import dataclasses
import subprocess
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator, Optional

import requests
import urllib3
from clcommon.clpwd import drop_privileges
from clcommon.cpapi import docroot
from requests.exceptions import ChunkedEncodingError
from secureio import disable_quota
from urllib3.exceptions import ReadTimeoutError

from xray import gettext as _
from xray.internal import utils, exceptions

# long timeout is set because our tested
# sites may be really slow
TIMEOUT: int = 10
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.2.13) '
                  'Gecko/20101209 CentOS/3.6-2.el5.centos Firefox/3.6.13'
}


class WebsiteNotResponding(exceptions.XRayManagerError):
    """
    Raised when connection to the website cannot be established.

    :param url: the url that was requested
    :param details: text of the original connection error
    """

    # NOTE(review): the parent __init__ is not called here, so str(exc)
    # does not carry url/details -- confirm that callers read attributes
    def __init__(self, url, details):
        self.url = url
        self.details = details


@utils.retry_on_exceptions(3, [ChunkedEncodingError])
def _fetch_url(url):
    """
    Perform the raw GET request and fail on non-2xx status.

    retry on:
    - ChunkedEncodingError -> sometimes error happens
                              due to network issues/glitch

    No exception translation is done here on purpose: the retry
    decorator must be able to see ChunkedEncodingError, which is
    a subclass of requests.RequestException.
    """
    response = requests.get(url, timeout=TIMEOUT,
                            verify=False, headers=HEADERS)
    response.raise_for_status()
    return response


def _request_url(url):
    """
    Request given url and translate request errors into xray errors.

    :param url: full url (with scheme) to request
    :return: successful requests.Response
    :raises WebsiteNotResponding: connection to the website failed
    :raises exceptions.XRayManagerError: any other request failure,
        including ChunkedEncodingError left after retries
        (presumably the retry decorator re-raises it after
        the final attempt -- confirm in utils)
    """
    try:
        response = _fetch_url(url)
    except (requests.exceptions.ConnectionError, ConnectionError) as e:
        # requests' ConnectionError is NOT derived from the builtin
        # ConnectionError, so both are listed: the builtin alone
        # never matched failures reported by requests

        # really strange behavior of requests that wrap
        # errors inside of ConnectionError
        if e.args and isinstance(e.args[0], ReadTimeoutError):
            raise
        raise WebsiteNotResponding(url, details=str(e)) from e
    except requests.RequestException as e:
        raise exceptions.XRayManagerError(
            _("Unable to detect php version for website "
              "because it is not accessible. "
              "Try again and contact an administrator if the issue persists. "
              "Original error: %s. ") % str(e)) from e

    return response


@contextmanager
def _temporary_phpinfo_file(username: str,
                            document_root: Path) -> Iterator[str]:
    """
    Create php info script in document root for the time of the context.

    The file is written with privileges of the given user and
    removed on exit from the context, whatever the outcome.

    :param username: owner of the website
    :param document_root: directory to place the script into
    :return: yields name of the script relative to document root
    """
    php_file_contents = """
<?php

$php_ini_scan_dir = getenv("PHP_INI_SCAN_DIR");
if(!empty($php_ini_scan_dir)) {
    // get first non-empty path
    $php_ini_scan_dir = array_values(array_filter(explode(":", $php_ini_scan_dir)))[0];
}

echo "phpversion=" . phpversion() . "\n";
echo "ini_scan_dir=" . ($php_ini_scan_dir ? $php_ini_scan_dir: PHP_CONFIG_FILE_SCAN_DIR) . "\n";
echo "php_sapi_name=". php_sapi_name() . "\n";
echo "include_path=" . get_include_path() . "\n";
"""
    php_file_name = 'xray_info.php'
    php_file_path = document_root / php_file_name
    with drop_privileges(username), disable_quota():
        php_file_path.write_text(php_file_contents)

    try:
        yield php_file_name
    finally:
        # the file may be already removed by someone else; cleanup
        # must not hide the result or the error of the request itself
        php_file_path.unlink(missing_ok=True)


@dataclasses.dataclass
class PhpConfiguration:
    """
    Php settings reported by the temporary php info script.
    """
    # 'user'
    username: str
    # '8.3.30'
    phpversion: str
    # '/etc/php.d/'
    ini_scan_dir: str
    # 'cgi-fcgi'
    php_sapi_name: str
    # '.:/opt/alt/php80/usr/share/pear'
    include_path: str

    @property
    def short_php_version(self) -> str:
        """Major and minor version glued together: '8.3.30' -> '83'."""
        return ''.join(self.phpversion.split('.')[:2])

    def get_full_php_version(self, default_prefix: str) -> str:
        """
        Build version identifier like 'alt-php83'.

        'alt-php' prefix is used when include path points to /opt/alt,
        given default_prefix is used otherwise.
        """
        if '/opt/alt' in self.include_path:
            return f"alt-php{self.short_php_version}"
        return f"{default_prefix}{self.short_php_version}"

    @property
    def absolute_ini_scan_dir(self) -> Optional[str]:
        """
        Ini scan dir as a path outside of cagefs.

        May be None when the path could not be resolved inside cagefs.
        """
        # the only directory that we expect to be changed in cagefs
        # is our conf link which is managed by selectorctl
        if 'link/conf' in self.ini_scan_dir:
            return _resolve_ini_path_in_cagefs(self.username,
                                               self.ini_scan_dir)
        return self.ini_scan_dir

    @property
    def is_php_fpm(self) -> bool:
        """True when website is served by php-fpm."""
        return self.php_sapi_name == 'fpm-fcgi'


def _parse_configuration(username: str, response: str) -> PhpConfiguration:
    """
    Parse ``key=value`` lines printed by the php info script.

    :param username: owner of the website
    :param response: body of the response of the php info script
    :return: PhpConfiguration built from parsed pairs
    :raises ValueError: non-empty line without '=' separator
    :raises TypeError: unexpected or missing keys in the response
    """
    config = {}
    for line in response.split('\n'):
        if not line.strip():
            continue

        # split on the first '=' only: values (e.g. include_path)
        # are allowed to contain the separator themselves
        key, value = line.split('=', 1)
        config[key] = value.strip()
    return PhpConfiguration(username=username, **config)


def _resolve_ini_path_in_cagefs(username: str, path: str) -> Optional[str]:
    """
    ini path inside cagefs can be a symlink
    and as cagefs has different namespace for each user,
    the only way to know that for sure is to dive into cage
    and resolve path there

    :param username: user whose cage is entered
    :param path: path as it is seen inside of the cage
    :return: resolved path, None if realpath failed inside the cage
    :raises ValueError: cagefs prefix of the user is unknown
    """
    cmd = ['/sbin/cagefs_enter_user', username, '/usr/bin/realpath', path]
    try:
        resolved_path = subprocess.check_output(
            cmd, text=True, stderr=subprocess.DEVNULL).strip()
    except subprocess.CalledProcessError:
        return None

    # paths under /etc/cl.php.d/ are mapped to the per-user directory
    # under /var/cagefs when looked at from outside of the cage
    if resolved_path.startswith('/etc/cl.php.d/'):
        prefix = utils.cagefsctl_get_prefix(username)
        if prefix is None:
            raise ValueError(
                _('CageFS prefix resolved as None, but should be a number'))
        return f'/var/cagefs/{prefix}/{username}{resolved_path}'
    return resolved_path


def get_php_configuration(username: str, domain: str) -> PhpConfiguration:
    """
    Writes temporary phpinfo-like file to document root
    and executes request to website to retrieve the current
    php version and configuration

    :param username: owner of the website
    :param domain: domain name of the website, without scheme
    :return: PhpConfiguration reported by the website
    :raises WebsiteNotResponding: neither HTTP nor HTTPS connection
        could be established
    :raises exceptions.XRayManagerError: website answered with an error
    """
    # if certificate is bad, but the site itself works,
    # we consider it ok
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    with _temporary_phpinfo_file(
            username, Path(docroot(domain)[0])) as php_info_file:
        domain_phpinfo_file_path = domain + '/' + php_info_file
        try:
            http_url = 'http://' + domain_phpinfo_file_path
            response = _request_url(http_url)
        except WebsiteNotResponding:
            # Some websites did not enable HTTP to HTTPS redirection.
            # Try connecting with HTTPS protocol.
            https_url = 'https://' + domain_phpinfo_file_path
            response = _request_url(https_url)

    # you may think that we can use json, but we can't because it's
    # optional php module on older php versions
    configuration = _parse_configuration(username, response.text)
    return configuration