/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
clselect
/
baseclselect
/
Upload File
HOME
# coding: utf-8 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import print_function from __future__ import division from __future__ import absolute_import import fcntl import os import contextlib import psutil import subprocess import simplejson as json # because of unicode handling from abc import ABCMeta, abstractmethod from time import time from . import ( INSTALLING_STATUS, REMOVING_STATUS, AcquireInterpreterLockError, ) from future.utils import with_metaclass from clcommon.utils import is_testing_enabled_repo from clcommon.group_info_reader import GroupInfoReader MAX_CACHE_AGE_SEC = 24 * 3600 class PkgManagerError(Exception): pass class BasePkgManager(with_metaclass(ABCMeta, object)): """ Class responsible for all interactions with Yum, interpreter versions installation/removal and gathering info about already installed versions """ _testing_repo_enabled_cache = None _config_dir = None _versions_info = None _yum_cmd = None _alt_names = None _redirect_log = None _install_cmd = None _remove_cmd = None @classmethod def run_background(cls, command): fnull = open(os.devnull, 'w') return subprocess.Popen( command, stdin=fnull, stdout=fnull, stderr=fnull, shell=True, executable='/bin/bash' ) @property def _testing_enabled(self): if self._testing_repo_enabled_cache is not None: return self._testing_repo_enabled_cache res = is_testing_enabled_repo() self._testing_repo_enabled_cache = res return res @property def _yum_cache_file(self): if self._testing_enabled: return os.path.join(self._config_dir, 'yum_cache.dat.testing_enabled') return os.path.join(self._config_dir, 'yum_cache.dat') def update_yum_cache(self): groups = GroupInfoReader.get_group_info(self._alt_names) groups = list(groups.keys()) with open(self._yum_cache_file, 'w') as f: for group in groups: f.write(f'{group}\n') def _read_yum_cache(self): """Return data 
from file or None if file is absent or outdated""" try: stat = os.stat(self._yum_cache_file) except OSError: return None if (time() - stat.st_mtime) > MAX_CACHE_AGE_SEC: return None return open(self._yum_cache_file).read() @staticmethod def _remove_silent(f): """ Silently remove file ignoring all errors """ try: os.remove(f) except (OSError, IOError): pass @property def installed_versions(self): """ Returns list of installed interpreter versions by scanning alt_node_dir and cache result. Cache also can be pre-filled at init time for testing/debugging purposes """ if self._versions_info is None: self._versions_info = self._scan_interpreter_versions() return list(self._versions_info.keys()) def get_full_version(self, maj): """ Should return full interpreter version for a particular major version or just fallback to given version if info is not available for any reason. This information is taken from the hash map populated during installed_packages scan. :param maj: Major interpreter version :return: Full interpreter version or Major if info is not available """ if self._versions_info is None: self._versions_info = self._scan_interpreter_versions() try: return self._versions_info[maj]['full_version'] except KeyError: return maj @property def _pid_lock_file(self): return os.path.join(self._config_dir, 'yum.pid.lock') @property def _cache_lock_file(self): return os.path.join(self._config_dir, 'yum_cache.pid.lock') def _write_yum_status(self, pid, version=None, status=None): """ :param pid: pid of Yum process :param version: interpreter version or None for "cache update" case :param status: what yum is currently doing(few predefined statuses) :return: None """ if not os.path.exists(self._config_dir): self._create_config_dirs() json.dump({ 'pid': pid, 'version': str(version), 'status': status, 'time': float(time()), }, open(self._pid_lock_file, 'w')) def _check_yum_in_progress(self): ongoing_yum = self._read_yum_status() if ongoing_yum is not None: return "{} of version 
'{}' is in progress. " \ "Please, wait till it's done"\ .format(ongoing_yum['status'], ongoing_yum['version']) def _read_yum_status(self): """ Result "None" - means installing/removing of our packages is not currently in progress. However, it doesn't mean that any other yum instance is not running at the same time, but we ok with this because our yum process will start processing automatically once standard /var/run/yum.pid lock is removed by other process :return: None or dict """ if self._pid_lock_file is None: raise NotImplementedError() try: data = json.load(open(self._pid_lock_file)) except Exception: # No file or it's broken: self._remove_silent(self._pid_lock_file) return None if not psutil.pid_exists(data.get('pid')): #pylint: disable=E1101 self._remove_silent(self._pid_lock_file) return None # TODO check timeout and stop it or just run with bash "timeout ..." try: pid, _ = os.waitpid(data['pid'], os.WNOHANG) except OSError: # Case when we exit before completion and yum process is no # longer our child process return data # still working, wait... if pid == 0: # still working, wait... 
return data self._remove_silent(self._pid_lock_file) return None # It was zombie and has already finished def format_cmd_string_for_installing(self, version): """ Formatting cmd string for installing package :return: formatted cmd string :param version: version of interpreter for installing :rtype: str """ return self._install_cmd.format(version) def format_cmd_string_for_removing(self, version): """ Formatting cmd string for removing package :return: formatted cmd string :param version: version of interpreter for removing :rtype: str """ return self._remove_cmd.format(version) def install_version(self, version): """Return None or Error string""" err = self._verify_action(version) if err: return err if version in self.installed_versions: return 'Version "{}" is already installed'.format(version) available = self.checkout_available() if available is None: return ('Updating available versions cache is currently ' 'in progress. Please, try again in a few minutes') if version not in available: return ('Version "{}" is not available. ' 'Please, make sure you typed it correctly'.format(version)) cmd_string = self.format_cmd_string_for_installing(version) p = self.run_background(cmd_string) self._write_yum_status(p.pid, version, INSTALLING_STATUS) def remove_version(self, version): """Return None or Error string""" err = self._verify_action(version) if err: return err if version not in self.installed_versions: return 'Version "{}" is not installed'.format(version) if self.is_interpreter_locked(version): return "This version is currently in use by another operation. 
" \ "Please, wait until it's complete and try again" if self._is_version_in_use(version): return "It's not possible to uninstall version which is " \ "currently in use by applications" cmd_string = self.format_cmd_string_for_removing(version) p = self.run_background(cmd_string) self._write_yum_status(p.pid, version, REMOVING_STATUS) def in_progress(self): """ Should return version and it's status for versions that is currently installing|removing """ ongoing_yum = self._read_yum_status() if ongoing_yum is not None and \ ongoing_yum['status'] in (INSTALLING_STATUS, REMOVING_STATUS,): return { ongoing_yum['version']: { 'status': ongoing_yum['status'], 'base_dir': '', } } return None @contextlib.contextmanager def acquire_interpreter_lock(self, interpreter_version): lock_name = self._get_lock_file_path(interpreter_version) try: lf = open(lock_name, 'w') except IOError: raise AcquireInterpreterLockError(interpreter_version) try: fcntl.flock(lf, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: # TODO: try to use LOCK_SH here # It's ok if it's already locked because we allow multiple # operations for different applications at the same time # with the same "--new-version" pass try: yield finally: # Protection from exception in "context code" lf.close() @abstractmethod def checkout_available(self): raise NotImplementedError() @abstractmethod def _scan_interpreter_versions(self): raise NotImplementedError() @abstractmethod def _create_config_dirs(self): raise NotImplementedError() def is_interpreter_locked(self, interpreter_version): lock_name = self._get_lock_file_path(interpreter_version) if not os.path.isfile(lock_name): return False lf = open(lock_name, 'w') try: fcntl.flock(lf, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: return True finally: lf.close() return False @abstractmethod def _verify_action(self, version): raise NotImplementedError() def _get_lock_file_path(self, version): raise NotImplementedError() @abstractmethod def _is_version_in_use(self, version): 
raise NotImplementedError()