/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
xray
/
console_utils
/
Upload Filee
HOME
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT """ This module contains X Ray Smart Advice local utility main run function """ import sys import os from xray.adviser.cli_api import SmartAdviceUtil from xray.adviser.awp_provision_api import AWPProvisionAPI from xray.adviser.wordpress_plugin_manager import get_plugin_data, get_plugin from xray.console_utils.base_app import app from xray.console_utils.cmdline_parser import ( cmd_parser_adviser, parse_cmd_arguments ) from xray.console_utils.validations import validate_adviser from xray.internal.constants import adviser_log from xray.internal.user_plugin_utils import root_execution_only_check from xray.internal.utils import configure_logging from xray.internal.user_plugin_utils import get_xray_exec_user from xray import json_unicode_dump, gettext as _ awp_provision_commands = ['awp-cdn-get-pullzone', 'awp-cdn-remove-pullzone', 'awp-cdn-purge', 'awp-sync', 'get-cdn-usage'] wp_plugin_manager_commands = ['wp-plugin-data', 'wp-plugin-copy'] @app def run(): """ X Ray Smart Advice run function """ root_execution_only_check() configure_logging(adviser_log) raw_args = parse_cmd_arguments(cmd_parser_adviser()) args = validate_adviser(raw_args.__dict__) sa = SmartAdviceUtil() if args.command == 'list': result = sa.advice_list(extended=args.extends) elif args.command == 'sites-status': # because from user plugin UI it works via XRAYEXEC_UID envvar username = args.username or get_xray_exec_user() if not username: print(_('parameter --username must be specified')) sys.exit(1) result = sa.get_site_statuses(username=username) elif args.command == 'details': result = sa.advice_details(args.advice_id) elif args.command == 'apply': result = sa.manage_advice('apply', args.advice_id, ignore_errors=args.ignore_errors, async_mode=args.async_mode, source=args.source, accept_terms=args.accept_license_terms, analytics_data=args.analytics_data) elif args.command == 'rollback': result = sa.manage_advice('rollback', args.advice_id, async_mode=args.async_mode, source=args.source, reason=args.reason, analytics_data=args.analytics_data) elif args.command == 'counters': result = sa.advice_counters() elif args.command == 'status': result = sa.manage_advice_status(args.advice_id) elif args.command == 'subscription': result = sa.create_subscription(args.advice_id) elif args.command == 'wordpress-plugin-install': result = sa.sync_advices_wordpress_plugin(im360_cached=False) elif args.command == 'wordpress-plugin-uninstall': result = sa.uninstall_wordpress_plugins() elif args.command == 'agreement': result = sa.get_agreement_text(feature=args.text) elif args.command == 'update-advices-metadata': result = sa.update_advices_metadata() elif args.command == 'get-options': username = args.username or get_xray_exec_user() if not username: print(_('parameter --username must be specified')) sys.exit(1) result = sa.get_options(username=username) elif args.command == 'get-limits': username = args.username or get_xray_exec_user() if not username: print(_('parameter --username must be specified')) sys.exit(1) result = sa.get_limits(username=username) elif args.command == 'get-usage': username = args.username or get_xray_exec_user() if not username: print(_('parameter --username must be specified')) sys.exit(1) result = sa.get_usage(username=username) elif args.command == 'report-analytics': result = sa.analytics_report(username=args.username, feature=args.feature, source=args.source, event=args.event, advice_id=args.advice_id, journey_id=args.journey_id, user_hash=args.user_hash, variant_id=args.variant_id) elif args.command in awp_provision_commands: awp_cli = AWPProvisionAPI() if not args.account_id and not args.username and not os.environ.get('XRAYEXEC_UID'): print(_('parameter --account_id or --username must be specified')) sys.exit(1) original_url = '' # domain is required parameter for commands which needs original_url if hasattr(args, 'domain'): original_url = args.domain if original_url: from clwpos.constants import PULLZONE_DOMAIN_PROTOCOL if not original_url.startswith(PULLZONE_DOMAIN_PROTOCOL): original_url = f'{PULLZONE_DOMAIN_PROTOCOL}{args.domain}' if args.command == 'awp-cdn-get-pullzone': result = awp_cli.get_pullzone(account_id=args.account_id, domain=original_url, website=args.website, username=args.username) elif args.command == 'awp-cdn-remove-pullzone': result = awp_cli.remove_pullzone(account_id=args.account_id, domain=original_url, website=args.website, username=args.username) elif args.command == 'awp-cdn-purge': result = awp_cli.purge_cdn_cache(account_id=args.account_id, domain=original_url, website=args.website, username=args.username) elif args.command == 'awp-sync': account_id = args.account_id if account_id: account_id = account_id.split(',') result = awp_cli.sync_account(account_id, username=args.username) elif args.command == 'get-cdn-usage': result = awp_cli.get_usage(args.account_id, username=args.username) else: raise SystemExit(_('Unknown action')) elif args.command in wp_plugin_manager_commands: if args.command == 'wp-plugin-data': if not args.plugin_name: print(_('parameter --plugin_name must be specified')) sys.exit(1) result = get_plugin_data(plugin_name=args.plugin_name) elif args.command == "wp-plugin-copy": if not args.tmp_dir or not args.plugin_version or not args.plugin_name: print(_('parameters --plugin_name, --tmp_dir, and --plugin_version must be specified')) sys.exit(1) result = get_plugin(plugin_name=args.plugin_name, plugin_version=args.plugin_version, dest_dir=args.tmp_dir) else: raise SystemExit(_('Unknown action')) else: raise SystemExit(_('Unknown action')) if result: print(json_unicode_dump(result))