/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
xray
/
console_utils
/
Upload Filee
HOME
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT """ This module contains functions for performing necessary validations """ import ipaddress import json import logging import re from argparse import Namespace from urllib.parse import urlparse from schema import Schema, And, Optional, Regex, SchemaError, Use, Or from xray.internal.utils import read_sys_id logger = logging.getLogger('validations') # --- URL validator regexps taken from Django framework --- ul = '\u00a1-\uffff' # unicode letters range (must not be a raw string) # IP patterns ipv4_re = r'(?:25[0-5]|2[0-4]\d|[0-1]?\d?\d)(?:\.(?:25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}' # Host patterns hostname_re = r'[a-z' + ul + r'0-9](?:[a-z' + ul + r'0-9-]{0,61}[a-z' + ul + r'0-9])?' # Max length for domain name labels is 63 characters per RFC 1034 sec. 3.1 domain_re = r'(?:\.(?!-)[a-z' + ul + r'0-9-]{1,63}(?<!-))*' tld_re = ( r'\.' # dot r'(?!-)' # can't start with a dash r'(?:[a-z' + ul + '-]{2,63}' # domain label r'|xn--[a-z0-9]{1,59})' # or punycode label r'(?<!-)' # can't end with a dash r'\.?' # may have a trailing dot ) host_re = '(' + hostname_re + domain_re + tld_re + '|localhost)' regex = re.compile( r'^(?:http)s?://' # scheme r'(?:[^\s:@/]+(?::[^\s:@/]*)?@)?' # user:pass authentication r'(?:' + ipv4_re + '|' + host_re + ')' r'(?::\d{2,5})?' # port r'(?:[/?#][^\s]*)?' # resource path r'\Z', re.IGNORECASE) # --- # --- EMAIL validator regexp taken from https://emailregex.com/ --- email_regex = r"""(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])""" # --- # --- Validation helper functions def is_ip_valid(ipaddr: str) -> bool: """ Checks if given IP address is a valid one :param ipaddr: IP address :return: True if IP address is valid, False otherwise """ if isinstance(ipaddr, int): return False try: ipaddress.ip_address(ipaddr) return True except ValueError: return False def is_ipv4_valid(ipaddr: str) -> bool: """ Checks if given IP address is a valid one for IPv4 '*' is also allowed :param ipaddr: IPv4 address :return: True if IPv4 address is valid, False otherwise """ if isinstance(ipaddr, int): return False if ipaddr == '*': return True try: ipaddress.IPv4Address(ipaddr) return True except ValueError: return False def url_cast(orig_url: str) -> str: """ Add http scheme if original url is missing it """ fragments = urlparse(orig_url) if not fragments.scheme: return f'http://{orig_url}' return orig_url # --- # --- Validation Schemas validation_schema = Schema({ 'command': And(str, lambda c: c in ('start', 'stop', 'continue', 'complete', 'delete', 'autocomplete-tasks', 'enable-continuous', 'disable-continuous', 'start-continuous', 'stop-continuous', 'continuous-tracing-list', 'tasks-list', 'requests-list', 'request-data', 'enable-user-agent', 'disable-user-agent', 'user-agent-status', 'advanced-metrics', 'enable-serverwide-mode', 'disable-serverwide-mode'), error='Invalid command'), Optional('lang'): Or(str, None), 'system_id': And(str, lambda s: s == read_sys_id(), error='system_id is invalid'), Optional('tracing_task_id'): str, Optional('url'): And(Use(url_cast), Regex(regex, error='URL is invalid')), Optional('email'): And(str, Regex(email_regex, error='EMAIL is invalid')), Optional('client_ip'): And(str, is_ipv4_valid, error='IP is invalid'), Optional('time'): And(int, lambda t: 0 < t <= 2880, error='minimum time count is 1 minute, maximum - 48 hours (2880 minutes)'), Optional('request_qty'): And(int, lambda q: 0 < q <= 100, error='minimum request_qty count is 1, maximum - 100'), Optional('request_id'): And(int, lambda q: q > 0, error='minimum request_id is 1'), Optional('enable'): Or(bool, None), Optional('disable'): Or(bool, None), Optional('status'): Or(bool, None), }) validation_user_schema = Schema({ 'command': And(str, lambda c: c in ('start', 'stop', 'continue', 'complete', 'delete', 'tasks-list', 'requests-list', 'request-data', 'user-agent-status'), error='Invalid user command'), Optional('lang'): Or(str, None), Optional('tracing_task_id'): str, Optional('url'): And(Use(url_cast), Regex(regex, error='URL is invalid')), Optional('client_ip'): And(str, is_ipv4_valid, error='IP is invalid'), Optional('time'): And(int, lambda t: 0 < t <= 2880, error='minimum time count is 1 minute, maximum - 48 hours (2880 minutes)'), Optional('request_qty'): And(int, lambda q: 0 < q <= 100, error='minimum request_qty count is 1, maximum - 100'), Optional('request_id'): And(int, lambda q: q > 0, error='minimum request_id is 1') }) validation_adviser_schema = Schema({ Optional('api_version'): Or(str, None), Optional('lang'): Or(str, None), 'command': And(str, lambda c: c in ('list', 'details', 'apply', 'rollback', 'subscription', 'agreement', 'counters', 'status', 'sites-status', 'wordpress-plugin-install', 'wordpress-plugin-uninstall', 'get-options', 'get-limits', 'get-usage', 'update-advices-metadata', 'awp-cdn-get-pullzone', 'awp-cdn-remove-pullzone', 'awp-cdn-purge', 'awp-sync', 'get-cdn-usage', 'report-analytics', 'wp-plugin-data', 'wp-plugin-copy'), error='Invalid command'), Optional('advice_id'): Or(str, None), Optional('ignore_errors'): bool, Optional('async_mode'): bool, Optional('source'): Or(str, None), Optional('reason'): Or(str, None), Optional('extends'): bool, Optional('listen'): bool, Optional('username'): Or(str, None), Optional('account_id'): Or(str, None), Optional('domain'): str, Optional('website'): str, Optional('text'): str, Optional('accept_license_terms'): bool, Optional('analytics_data'): Or(str, None), # for analytics reporting Optional('feature'): Or(str, None), Optional('event'): Or(str, None), Optional('user_hash'): Or(str, None), Optional('journey_id'): Or(str, None), Optional('variant_id'): Or(str, None), # for wp plugin manager Optional('plugin_name'): str, Optional('plugin_version'): str, Optional('tmp_dir'): str, }) # --- def _validate(input_args: dict, _scheme: Schema) -> Namespace: """ Validate given input with schema Input arguments expected in s dict form :param input_args: dict with input data :param _scheme: schema for validation """ try: return Namespace(**(_scheme.validate(input_args))) except SchemaError as e: logger.error('Input validation error', extra={'err': str(e)}) raise SystemExit(json.dumps({ 'result': f'Input validation error: {str(e)}' })) def validate(input_args: dict) -> Namespace: """ Validate given input with schema Input arguments expected in s dict form :param input_args: dict with input data :param _scheme: schema for validation """ return _validate(input_args, validation_schema) def validate_user(input_args: dict) -> Namespace: """ Validate given input with schema Input arguments expected in s dict form :param input_args: dict with input data :param _scheme: schema for validation """ return _validate(input_args, validation_user_schema) def validate_adviser(input_args: dict) -> Namespace: """ Validate given input with schema Input arguments expected in s dict form :param input_args: dict with input data :param _scheme: schema for validation """ return _validate(input_args, validation_adviser_schema)