/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
xray
/
console_utils
/
run_user
/
Upload Filee
HOME
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
This module contains end-user runners for X Ray
"""
import logging
from argparse import Namespace, ArgumentParser
from dataclasses import dataclass, field
from typing import Callable, Optional

from xray import gettext as _
from xray.internal.exceptions import XRayError
from ..cmdline_parser import cmd_parser_user_manager, cmd_parser_adviser
from ..validations import validate_user, validate_adviser

logger = logging.getLogger('runners')

# Suffix removed from the 'async_mode' option name by the adviser runner.
_MODE_SUFFIX = '_mode'


def _underscores_to_dashes(option: str) -> str:
    """Return *option* with every underscore replaced by a dash."""
    return option.replace('_', '-')


def _drop_mode_suffix(option: str) -> str:
    """Return *option* without a trailing '_mode', if it has one.

    A real suffix check is used instead of ``str.rstrip('_mode')``:
    ``rstrip`` treats its argument as a set of characters and would keep
    stripping any trailing '_', 'm', 'o', 'd' or 'e', mangling names that
    merely end with those letters.
    """
    if option.endswith(_MODE_SUFFIX):
        return option[:-len(_MODE_SUFFIX)]
    return option


@dataclass
class Runner:
    """Runner dataclass"""
    # Short runner identifier; matches the key used in `runners`.
    name: str
    # String naming the target of the runner (e.g. 'cl-smart-advice');
    # presumably the utility that gets invoked -- confirm against the caller.
    target: str
    # Zero-argument factory returning the ArgumentParser for this runner.
    args_parser: Callable[[], ArgumentParser]
    # Callable that takes a dict and returns a Namespace.
    validator: Callable[[dict], Namespace]
    # Per-option string transformations keyed by option name; empty by
    # default. How they are applied is decided by the consumer of Runner.
    option_cast: Optional[dict] = field(default_factory=dict)


manager_runner = Runner(
    'manager',
    'cloudlinux-xray-manager',
    cmd_parser_user_manager,
    validate_user,
)

adviser_runner = Runner(
    'smart_advice',
    'cl-smart-advice',
    cmd_parser_adviser,
    validate_adviser,
    option_cast={
        'api_version': _underscores_to_dashes,
        'ignore_errors': _underscores_to_dashes,
        'async_mode': _drop_mode_suffix,
    },
)

# Registry of the known runners, keyed by runner name.
runners = {
    'manager': manager_runner,
    'smart_advice': adviser_runner,
}


def get_runner(name: str) -> Runner:
    """Get appropriate runner by name

    :param name: key of the runner in the `runners` registry
    :return: the matching Runner instance
    :raises XRayError: if no runner is registered under `name`
    """
    runner = runners.get(name)
    # Logged before the check so that a failed lookup is traceable as well.
    logger.debug('Runner is %s', runner)
    if runner is None:
        raise XRayError(_('Unknown runner'))
    return runner