/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
xray
/
console_utils
/
run_user
/
Upload Filee
HOME
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT """ This package contains classes and functions to run X Ray utilities as end-user """ from socket import socket, AF_UNIX, SOCK_STREAM from xray import gettext as _ from xray.console_utils.cmdline_parser import ( parse_cmd_arguments ) from xray.internal.constants import user_agent_sock from xray.internal.exceptions import XRayError from xray.internal.user_plugin_utils import ( pack_request, unpack_response, check_for_root, error_response ) from .runners import get_runner from clwpos.utils import get_locale_from_envars def executed_as_root_check() -> None: """ Check if User Manager is executed as root and throw error in case if yes """ if check_for_root(): raise SystemExit( error_response(_('X-Ray End-User utilities cannot be executed as root'))) def run(run_for: str = 'manager') -> None: """ Main run function """ executed_as_root_check() try: runner = get_runner(run_for) except XRayError as e: raise SystemExit(str(e)) args = parse_cmd_arguments(runner.args_parser()) validated_args = runner.validator(args.__dict__) validated_args.runner = runner.name args_dict = validated_args.__dict__ lang_env = get_locale_from_envars() if not args_dict.get('lang') and lang_env: args_dict['lang'] = lang_env to_send = pack_request(args_dict) with socket(AF_UNIX, SOCK_STREAM) as s: try: s.connect(user_agent_sock) except (ConnectionError, OSError): raise SystemExit(error_response( _('X-Ray User Plugin is not enabled. Please, contact your server administrator'))) s.sendall(to_send.encode()) data = unpack_response(s) print(data.decode())