/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
lvestats
/
core
/
Upload Filee
HOME
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import importlib.util
import inspect
import logging
import os
import sys

from lvestats.core.plugin import LveStatsPlugin


class PluginLoader(object):
    """
    Collect LveStatsPlugin subclasses from the ``.py`` files of a folder.

    After construction ``self.plugins`` holds the plugin classes (not
    instances) found in the modules that were executed successfully.
    """

    def check_incorrect_symlink(self, path):
        """
        Check whether path is a broken symlink; log a warning if it is.

        :param str path: path to file or symlink
        :return bool: True if path is a symlink whose target does not exist,
            False otherwise
        """
        if os.path.islink(path) and not os.path.exists(os.path.realpath(path)):
            self.log.warning('Incorrect symlink %s', path)
            return True
        # Explicit False so the result matches the documented bool type
        # instead of an implicit None.
        return False

    def __init__(self, plugins_folder):
        """
        Execute every ``*.py`` file in plugins_folder and gather plugin classes.

        A file that is a broken symlink, has no usable import spec, defines no
        LveStatsPlugin subclass, or raises while being executed is logged and
        skipped; the remaining files are still processed. If plugins_folder is
        not a directory, ``self.plugins`` is an empty list.

        :param str plugins_folder: directory to scan for plugin modules
        """
        self.log = logging.getLogger(__name__)
        classes_loaded = []
        if os.path.isdir(plugins_folder):
            file_names = (fn for fn in os.listdir(plugins_folder) if fn.endswith('.py'))
            for file_name in file_names:
                try:
                    module_name, _ = os.path.splitext(file_name)
                    full_path = os.path.join(plugins_folder, file_name)
                    if self.check_incorrect_symlink(full_path):
                        continue
                    spec = importlib.util.spec_from_file_location(module_name, full_path)
                    # A spec without a loader cannot be executed; report it the
                    # same way as a missing spec rather than failing with an
                    # AttributeError on spec.loader below.
                    if spec is None or spec.loader is None:
                        self.log.error('Unable to find spec for %s', file_name)
                        continue
                    module = importlib.util.module_from_spec(spec)
                    spec.loader.exec_module(module)
                    # Load only subclasses of LveStatsPlugin; the base class
                    # itself is excluded because plugin modules import it.
                    # The values are snapshotted with list() before filtering.
                    classes = [
                        x for x in list(module.__dict__.values())
                        if inspect.isclass(x)
                        and issubclass(x, LveStatsPlugin)
                        and x is not LveStatsPlugin
                    ]
                    if not classes:
                        self.log.error('No classes to load found in %s', file_name)
                        continue
                    classes_loaded.extend(classes)
                    # Register the module only once it is known to provide
                    # at least one plugin class.
                    sys.modules[module.__name__] = module
                except Exception as ex:
                    # One faulty plugin file must not prevent the others from
                    # loading, so log the traceback and move on.
                    self.log.exception('Unable to load %s due to exception %s', file_name, ex)
        self.plugins = classes_loaded