/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
lvestats
/
lib
/
Upload Filee
HOME
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT # pylint: disable=too-many-lines import copy import datetime import logging import pwd import time import warnings from sqlalchemy import Float, Integer, text from sqlalchemy import exc as sa_exc from sqlalchemy.sql import and_, asc, desc, label, not_, or_, select from sqlalchemy.sql.expression import ColumnElement, alias, case, cast, func from lvestats.core.plugin import LveStatsPluginTerminated from lvestats.lib.commons.dateutil import ( gm_datetime_to_unixtimestamp, gm_to_local, local_to_gm, round_1m, str_to_timedelta, unixtimestamp_to_gm_datetime, ) from lvestats.lib.commons.func import ( get_current_max_lve_id, skip_user_by_maxuid, ) from lvestats.lib.commons.sizeutil import convert_bytes, convert_powers_of_1000, mempages_to_bytes from lvestats.lib.config import ( HIDE_MAX_UID_LVE_PARAMETER, read_config, str_to_bool, ) from lvestats.orm import LVE_STATS_2_TABLENAME_PREFIX, history, history_x60, servers __all__ = ('HistoryShow', 'HistoryShowUnion', 'OutputFormatter', 'get_lve_version') def get_lve_version(dbengine, server_id): sql_query_lve_version = select([servers.lve_version]).where(servers.server_id == server_id) with dbengine.begin() as transaction_: cursor_ = transaction_.execute(sql_query_lve_version) if cursor_.returns_rows: res = cursor_.fetchone() if res is None: return 6 else: return int(res['lve_version']) else: return 6 def servers_info(dbengine): sql_query_lve_version = select([servers.server_id, servers.lve_version]) return dbengine.execute(sql_query_lve_version) def convert_key_to_label(key_): return str.lower(key_) def add_labes_to_column(func_dict): """ Add label (on SQL add ".. as ..") to aggregate_func_dict key of dict convert to lowercase and add as label :param dict func_dict: :return dict: """ func_dict_labeled = {} for key_, sqlalchemy_func in list(func_dict.items()): if issubclass(sqlalchemy_func.__class__, ColumnElement): func_labeled = label(convert_key_to_label(key_), sqlalchemy_func) else: func_labeled = sqlalchemy_func func_dict_labeled[key_] = func_labeled return func_dict_labeled def convert_to_list(arg): if isinstance(arg, (str, int)): return [arg] return arg class OutputFormatter(object): def __init__(self, fields, rows=None, orders=None): self.rows = rows or [] self.fields = list(fields) self._fields_lowered = [_.lower() for _ in fields] # self.fields use for output self._orders = [] self._hidden_fields = set() # self._fields indexes list fields to hide if orders: for field, order in orders: self.add_order(field, order) def get_fields(self): if self._hidden_fields: return [f_ for f_ in self.fields if f_ not in self._hidden_fields] else: return self.fields def set_rows(self, rows): """ Use this method if you used hide_fields number items in row must bee same as in fields after hide """ self.rows = rows def hide_fields(self, h_fields): """ :param tuple|list h_fields: :return: """ self._hidden_fields = self._hidden_fields.union(set(h_fields)) def add_order(self, fields, order): """ :param list|tuples fields: :param order: field to use to order result :return: """ if isinstance(order, str): try: order = getattr(self, order) except AttributeError as e: raise ValueError(f'Non such {order} order') from e if not hasattr(order, '__call__'): raise ValueError(f'input object {order} must be callable') self._orders.append(({_.lower() for _ in fields}, order)) def _convert_line(self, row_): """ :param iterable row_: :return: """ row_ = list(row_) row_out = [] for field_, r_ in zip(self._fields_lowered, row_): if field_ in self._hidden_fields: # continue if field must be hide continue # use many orders to one cell for order_fields, order in self._orders: if field_ in order_fields: try: r_ = order(r_) except (TypeError, ValueError, KeyError, IndexError): pass row_out.append(r_) return row_out def __iter__(self): for row_ in self.rows: yield self._convert_line(row_) def __getitem__(self, index): if isinstance(index, slice): return list(map(self._convert_line, self.rows[index.start: index.stop])) return self._convert_line(self.rows[index]) def __len__(self): return len(self.rows) def get_corrected_list(self): return list(self) # build-in orders @staticmethod def strftime(value, format_='%m-%d %H:%M'): return value.strftime(format_) @staticmethod def percentage(value): if value is None: return '-' try: float(value) except ValueError: return value return str(value * 100) + '%' @staticmethod def bytes(value): if value is None: return '-' return convert_bytes(value) @staticmethod def powers_of_1000(value): if value is None: return '-' return convert_powers_of_1000(value) @staticmethod def username(value): try: return pwd.getpwuid(int(value)).pw_name except KeyError: return value @staticmethod def datetime(value): """ Convert unix timestamp to datetime (local timezone) """ return datetime.datetime.fromtimestamp(value) def enumerate_duplicate_columns(columns): """ Enumerate if columns name or functions are duplicated Use for force add duplicate columns to select operator :param list|tuple columns: :return list|tuple: enumerated columns """ output = [] output_str = [] # for controlling columns count for c_ in columns: c_str = str(c_) dubl_count = output_str.count(c_str) if dubl_count >= 1: # check whether there is a duplicate; "c_ in columns" for sqlalchemy classes not work # check if column is string or function and get column name if isinstance(c_, str): c_name = c_ else: c_name = c_.name # numbering duplicate columns c_name += '_' + str(dubl_count + 1) c_ = label(c_name, c_) # rename column output.append(c_) output_str.append(c_str) return output usage_to_limit_dict = { 'aCPU'.lower(): 'lCPU'.lower(), 'mCPU'.lower(): 'lCPU'.lower(), 'aVMem'.lower(): 'lVMem'.lower(), 'mVMem'.lower(): 'lVMem'.lower(), 'aEP'.lower(): 'lEP'.lower(), 'mEP'.lower(): 'lEP'.lower(), 'aPMem'.lower(): 'lPMem'.lower(), 'mPMem'.lower(): 'lPMem'.lower(), 'aNproc'.lower(): 'lNproc'.lower(), 'mNproc'.lower(): 'lNproc'.lower(), 'aIO'.lower(): 'lIO'.lower(), 'mIO'.lower(): 'lIO'.lower(), 'aIOPS'.lower(): 'lIOPS'.lower(), 'mIOPS'.lower(): 'lIOPS'.lower(), } FIELD_TO_TABLE_COLUMN = { 'ID': 'id', 'aCPU': 'cpu', 'aVMem': 'mem', 'aEP': 'mep', 'aPMem': 'memphy', 'aIO': 'io', 'aNproc': 'nproc', 'aIOPS': 'iops', 'lCPU': 'cpu_limit', 'lEP': 'mep_limit', 'lVMem': 'mem_limit', 'lPMem': 'lmemphy', 'lIO': 'io_limit', 'lNproc': 'lnproc', 'lIOPS': 'liops', 'VMemF': 'mem_fault', 'PMemF': 'memphy_fault', 'EPf': 'mep_fault', 'NprocF': 'nproc_fault', 'CPUf': 'cpu_fault', 'IOf': 'io_fault', 'IOPSf': 'iops_fault', 'uCPU': 'cpu', 'uEP': 'mep', 'uVMem': 'mem', 'uPMem': 'memphy', 'uIO': 'io', 'uNproc': 'nproc', 'uIOPS': 'iops', 'mCPU': 'cpu', 'mEP': 'mep', 'mVMem': 'mem', 'mPMem': 'memphy', 'mNproc': 'nproc', 'mIO': 'io', 'mIOPS': 'iops', } FIELD_AVERAGE = ['aCPU', 'aVMem', 'aPMem', 'aEP', 'aNproc', 'aIO', 'aIOPS'] FIELD_LIMIT = ['lCPU', 'lVMem', 'lPMem', 'lEP', 'lNproc', 'lIO', 'lIOPS'] FIELD_FAULT = ['CPUf', 'VMemF', 'PMemF', 'EPf', 'NprocF', 'IOf', 'IOPSf'] FIELD_USAGE = ['uCPU', 'uVMem', 'uPMem', 'uEP', 'uNproc', 'uIO', 'uIOPS'] FIELD_MAX = ['mCPU', 'mVMem', 'mPMem', 'mEP', 'mNproc', 'mIO', 'mIOPS'] KEYS_NORMALIZATION_LOOKUP_TABLE = {'FROM': 'From', 'TO': 'To', 'ANYF': 'anyF'} for key in list(FIELD_TO_TABLE_COLUMN.keys()): KEYS_NORMALIZATION_LOOKUP_TABLE[key.upper()] = key def normalize_optional_column_names(names): if names is None: return None return normalize_column_names(names) def normalize_column_names(names): result = [] for name in names: result.append(normalize_column_name(name)) return result def normalize_column_name(name): if name: return KEYS_NORMALIZATION_LOOKUP_TABLE[name.upper()] return None time_unit_orders = [ ('10m', lambda dt: dt - datetime.timedelta(minutes=dt.minute % 10 + 10)), ('1h', lambda dt: dt.replace(minute=0) if dt.minute else dt - datetime.timedelta(hours=1)), # round to hour ('1d', lambda dt: dt.replace(hour=0, minute=0)), # round to day ] def dyn_time_unit_groups(period_from, period_to): period_groups = [round_1m(period_from), round_1m(period_to + datetime.timedelta(minutes=1))] time_unit_list = ['1m'] _from_order_fun = round_1m for time_unit, order_fun in time_unit_orders: from_to_point = order_fun(period_groups[1]) if from_to_point - period_groups[0] >= str_to_timedelta(time_unit): _from_order_fun = order_fun period_groups.insert(1, from_to_point) time_unit_list.append(time_unit) period_groups[0] = _from_order_fun(period_groups[0]) # prepare to output as list of tuples # (<From|datetime>, <To|datetime>, <time-unit|int>) from_to_groups = [] for index, time_unit in enumerate(time_unit_list): from_to_groups.append( (period_groups[-index - 2], period_groups[-index - 1], int(str_to_timedelta(time_unit).total_seconds())) ) return from_to_groups class HistoryShow(object): def __init__( self, dbengine, period_from, period_to, uid=None, show_columns=None, server_id='localhost', time_unit=None, order_by=None, by_usage=None, by_usage_percentage=0.9, by_fault=None, threshold=1, limit=0, table=None, log=None, time_count=None, show_idle=False, ): """ Show different statistics from history table :param sqlalchemy.engine.base.Engine dbengine: database engine to use :param datetime.datetime|float|int period_from: start time retrieve data :param datetime.datetime|float|int period_to: end time retrieve data :param int|None|list|tuple uid: filter the output information to the user uid :param tuple|list show_columns: display columns in the order specified. If not, show all supported valid column names: 'aCPU', 'lPMem', 'uIO', 'uEP', 'lEP', 'aVMem', 'PMemF', 'lVMem', 'NprocF', 'anyF', 'aNproc', 'VMemF', 'ID', 'lCPU', 'aIOPS', 'aEP', 'aPMem', 'uPMem', 'lIO', 'lIOPS', 'uCPU', 'lNproc', 'aIO', 'uIOPS', 'EPf', 'uVMem', 'uNproc' :param str server_id: filtering the output for "server id" :param int time_unit: grouping output over an interval of time (in seconds) :param str|None order_by: sorting output by column name (supported by columns) :param str|tuple|list by_usage: filtering are grouped data for the percentage of the use of resources :param float by_usage_percentage: percent for the parameter setting 'by_usage' :param tuple|list|None by_fault: filtering data are grouped for quantity faults (None if it is not filtered) valid names: 'aCPU', 'lPMem', 'uIO', 'uEP', 'lEP', 'aVMem', 'PMemF', 'lVMem', 'NprocF', 'anyF', 'aNproc', 'VMemF', 'ID', 'lCPU', 'aIOPS', 'aEP', 'aPMem', 'uPMem', 'lIO', 'lIOPS', 'uCPU', 'lNproc', 'aIO', 'uIOPS', 'EPf', 'uVMem', 'uNproc' :param threshold: number faults for filtering the data are grouped (used together with by_fault) :param int|None limit: limit on the number of output data (if 0 or None, then the limit is not set) :return generator: returns a list/generator of data with the order set out in the 'show_columns' """ self.dbengine = dbengine self.uid = uid self._is_multi_uids = not isinstance(uid, int) if show_columns is None: show_columns = get_supported_columns(lve_version=get_lve_version(dbengine=dbengine, server_id=server_id)) show_columns.insert(0, 'ID') self.show_columns = normalize_column_names(show_columns) self.server_id = server_id self.time_unit = time_unit self.by_fault = normalize_optional_column_names(by_fault) if order_by: self.order_by = normalize_column_name(order_by) elif self.by_fault and normalize_column_name('anyF') in self.by_fault: self.order_by = normalize_column_name('CPUf') else: self.order_by = self.by_fault and self.by_fault[0] self.by_usage = normalize_optional_column_names(by_usage) self.by_usage_percentage = by_usage_percentage self.threshold = threshold self.limit = limit self.log = log or logging.getLogger('SQL') self.table = table if table is not None else history.__table__ # "or" not supported in this self._table_alive = alias(self.table, 'alive') # alias of main table to using in self join self.period_from = ( period_from if isinstance(period_from, (int, float)) else gm_datetime_to_unixtimestamp(period_from) ) self.period_to = period_to if isinstance(period_to, (int, float)) else gm_datetime_to_unixtimestamp(period_to) self.time_count = time_count or self.get_time_count() # correct cpu/100 # we still have to round it, as it seems <[27.333/100] = [0.27332999999999996] - who knows why :( self.result_corrector = OutputFormatter( fields=self.show_columns, orders=[ [FIELD_MAX + FIELD_AVERAGE + FIELD_LIMIT + FIELD_USAGE, lambda x: round(x, 3)], [['aCPU', 'lCPU', 'mCPU'], lambda item: round(float(item) / 100.0, 5)], ], ) self.hide_maxuid_lve = str_to_bool(read_config().get(HIDE_MAX_UID_LVE_PARAMETER, 'true')) def set_normalised_output(self): # correct data obtained from database # round EP IOPS Nproc output self.result_corrector.add_order( fields=['aEP', 'mEP', 'lEP', 'aNproc', 'mNproc', 'lNproc', 'aIOPS', 'mIOPS', 'lIOPS'], order=lambda x: int(round(x)), ) if self.dbengine.url.drivername != "sqlite": self.result_corrector.add_order( fields=[ 'aVMem', 'mVMem', 'lVMem', 'aPMem', 'mPMem', 'lPMem', 'aIO', 'mIO', 'lIO', 'uCPU', 'uEP', 'uVMem', 'uPMem', 'uIO', 'uNproc', 'uIOPS', ], order=float, ) self.result_corrector.add_order( fields=['EPf', 'VMemF', 'CPUf', 'PMemF', 'NprocF', 'IOf', 'IOPSf'], order=int ) # convert Mem to bytes self.result_corrector.add_order( fields=['aVMem', 'mVMem', 'lVMem', 'aPMem', 'mPMem', 'lPMem'], order=mempages_to_bytes ) def _where_time_period(self, table=None): """ Generate WHERE created BETWEEN xxxxxxxxx AND yyyyyyyy :return: """ # filtering condition by time period if table is None: table = self.table return table.c.created.between(self.period_from, self.period_to) def _where_server_id(self): """ Generate WHERE server_id = 'server_name' :return: """ return self.table.c.server_id == self.server_id def _where_uid(self, uid=-1, table=None): """Generate WHERE id = 'user_uid'""" if table is None: table = self.table if uid == -1: uid = self.uid if uid is None: if self.hide_maxuid_lve: # skip ids in range(MAX_UID, MAX_LVE_ID), because ids > MAX_LVE_ID may contain info # about reseller`s limits return and_( table.c.id > 0, or_(not_(skip_user_by_maxuid(table.c.id)), table.c.id > get_current_max_lve_id()) ) return table.c.id > 0 elif isinstance(uid, (list, tuple)): if self.dbengine.url.drivername == 'sqlite': # little workaround for sqlite's limit of 999 variables # let's compile query manually return text(table.c.id.in_(list(uid)).expression.compile(compile_kwargs={"literal_binds": True}).string) else: # mysql or postgresql do not have such limit return table.c.id.in_(list(uid)) else: return table.c.id == uid def get_time_count(self): """ SELECT count(*) FROM lve_stats2_history WHERE id = 0 AND created BETWEN xxxx AND yyyy server_id = 'localhost' """ where = and_( history.created.between(self.period_from, self.period_to), history.id == 0, history.server_id == self.server_id, ) query = select([text('count(*)')]).where(where) time_start = time.time() q = str(query.compile(compile_kwargs={"literal_binds": True})).replace('\n', ' ') self.log.debug(q) data = self.dbengine.execute(query) self.log.debug('query time: %s', time.time() - time_start) return data.fetchall()[0][0] def _fun_avg(self, item): """ Generate aggregate function for calculate average for example sum(lve_stats2_history.cpu) / 60 :param item: :return: """ return cast(func.sum(item) / cast(self._fun_time_count(), Float), Float) def _fun_limit(self, item): """ Generate aggregate function for calculate limit if column_limit is zero its mean no limit for example CASE WHEN (min(lve_stats2_history.cpu_limit) > 0) THEN max(lve_stats2_history.cpu_limit) ELSE 0 END :param item: :return: """ return case([(func.min(item) > 0, func.max(item))], else_=0) def _fun_fault(self, item): """ Generate aggregate function for calculate fault for example sum(lve_stats2_history.cpu_fault) :param item: :return: """ return func.sum(item) def _fun_usage(self, item, item_limit): """ Generate aggregate function for calculate resource usage equivalent average/limit for example CASE WHEN ( CASE WHEN min(lve_stats2_history.cpu_limit) > 0 THEN max(lve_stats2_history.cpu_limit) ELSE 0 END IS NULL ) THEN NULL WHEN ( CASE WHEN min(lve_stats2_history.cpu_limit) > 0 THEN max(lve_stats2_history.cpu_limit) ELSE 0 END > 0 ) THEN ( sum(lve_stats2_history.cpu) / 1422 ) / CASE WHEN min(lve_stats2_history.cpu_limit) > 0 THEN max(lve_stats2_history.cpu_limit) ELSE 0 END END :param item: :return: """ # noinspection PyComparisonWithNone return case( [ # Don't use "is None" here. (self._fun_limit(item_limit) == None, None), # NOQA (self._fun_limit(item_limit) > 0, self._fun_avg(item) / self._fun_limit(item_limit)), ], else_=None, ) def _fun_max(self, item, item_limit, item_fault): """ Generate aggregate function for calculate maximum resource usage; for backward capability with lve-stats 0.x for example: CASE WHEN (sum(lve_stats2_history.cpu_fault) > 0) THEN max(lve_stats2_history.cpu_limit) ELSE max(lve_stats2_history.cpu) END :param item: :param item_limit: :param item_fault: :return: """ return case([(func.sum(item_fault) > 0, func.max(item_limit))], else_=func.max(item)) def _fun_time_count(self): if self._check_need_join(): return text('count(*)') else: return self.time_count def _fun_time_from(self): if self._is_multi_uids: return self.period_from else: if self._check_need_join(): _table = self._table_alive else: _table = self.table if self.dbengine.url.drivername == 'sqlite': # cast(..., Integer) using for compatibility with lve-stats-2.1-8 database; 'created' saved as float return ( cast((_table.c.created - self.period_from) / self.time_unit, Integer) * self.time_unit + self.period_from ) else: return ( func.floor((_table.c.created - self.period_from) / self.time_unit) * self.time_unit + self.period_from ) def _fun_time_to(self): # in case of changes here don't forget to check _group_by_query if self._is_multi_uids: return self.period_to else: return self._fun_time_from() + self.time_unit def _fun_user_id(self): if self._is_multi_uids: return label('ID', self.table.c.id) else: return label('ID', text(str(self.uid))) def _aggregate_fun_case(self, item): """ Function for obtain aggregate function (or column name) by column name :param item: 'aCPU', 'aVMem', 'aPMem', 'aEP', 'aNproc', 'aIO', 'aIOPS', 'lCPU', 'lVMem', 'lPMem', 'lEP', 'lNproc', 'lIO', 'lIOPS', 'CPUf', 'VMemF', 'PMemF', 'EPf', 'NprocF', 'IOf', 'IOPSf', 'uCPU', 'uVMem', 'uPMem', 'uEP', 'uNproc', 'uIO', 'uIOPS', 'mCPU', 'mVMem', 'mPMem', 'mEP', 'mNproc', 'mIO', 'mIOPS', 'anyF', 'ID', 'From', 'To' :type item: str :return: """ if item == 'anyF': fun_ = func.sum( self.table.c.mem_fault + self.table.c.memphy_fault + self.table.c.mep_fault + self.table.c.nproc_fault + self.table.c.cpu_fault + self.table.c.io_fault + self.table.c.iops_fault ) elif item == 'ID': fun_ = self._fun_user_id() elif item == 'From': fun_ = self._fun_time_from() elif item == 'To': fun_ = self._fun_time_to() else: column_name = FIELD_TO_TABLE_COLUMN[item] table_column = getattr(self.table.c, column_name) if item in FIELD_AVERAGE: fun_ = self._fun_avg(table_column) elif item in FIELD_LIMIT: fun_ = self._fun_limit(table_column) elif item in FIELD_FAULT: fun_ = self._fun_fault(table_column) elif item in FIELD_USAGE: column_name_limit = FIELD_TO_TABLE_COLUMN[FIELD_LIMIT[FIELD_USAGE.index(item)]] table_column_limit = getattr(self.table.c, column_name_limit) fun_ = self._fun_usage(table_column, table_column_limit) elif item in FIELD_MAX: column_name_limit = FIELD_TO_TABLE_COLUMN[FIELD_LIMIT[FIELD_MAX.index(item)]] column_name_fault = FIELD_TO_TABLE_COLUMN[FIELD_FAULT[FIELD_MAX.index(item)]] table_column_limit = getattr(self.table.c, column_name_limit) table_column_fault = getattr(self.table.c, column_name_fault) fun_ = self._fun_max(table_column, table_column_limit, table_column_fault) return fun_ def _group_by_query(self, select_query): by_usage_list = convert_to_list(self.by_usage) by_fault_list = convert_to_list(self.by_fault) if self._is_multi_uids: query = select_query.group_by(self.table.c.id) else: # it is correct because in query result all records having the same _fun_time_from() # also have same _fun_time_to() # so .group_by(_fun_time_to()) doesn't create new groups after .group_by(_fun_time_from()) # but we need both this group conditions because mysql throws an error otherwise query = select_query.group_by(self._fun_time_from()).group_by(self._fun_time_to()) if self.order_by: order_by_aggregate_func = self._aggregate_fun_case(self.order_by) query.append_order_by(desc(order_by_aggregate_func)) else: if self._is_multi_uids: query.append_order_by(asc(self.table.c.id)) else: query.append_order_by(self._fun_time_from()) # add filter for having grouped data if by_usage_list: # add filtering by user id filter_fun_list = [] for item in by_usage_list: if item in FIELD_AVERAGE: index = FIELD_AVERAGE.index(item) filter_fun = self.by_usage_percentage <= self._aggregate_fun_case(FIELD_USAGE[index]) else: index = FIELD_MAX.index(item) filter_fun = self.by_usage_percentage * self._aggregate_fun_case( FIELD_LIMIT[index] ) <= self._aggregate_fun_case(item) filter_fun_list.append(filter_fun) query.append_having(or_(*filter_fun_list)) if by_fault_list: by_fault_filter = [self.threshold <= self._aggregate_fun_case(funk_key_) for funk_key_ in by_fault_list] query.append_having(or_(*by_fault_filter)) if self.limit != 0 and self.limit is not None: query = query.limit(self.limit) return query def _columns_query(self): """ Generate output columns for SELECT <_columns_query(self)> FROM ... :return: """ columns_agregate_func = [] for column_key in self.show_columns: column_fun = self._aggregate_fun_case(column_key) if isinstance(column_fun, list): columns_agregate_func.extend(column_fun) else: if column_key not in ('From', 'To'): # digest not support label column_fun = label(column_key, column_fun) columns_agregate_func.append(column_fun) # add label return columns_agregate_func def _check_need_time_count(self): columns = { 'aCPU', 'uCPU', 'aEP', 'uEP', 'aVMem', 'uVMem', 'aPMem', 'uPMem', 'aNproc', 'uNproc', 'aIO', 'uIO', 'aIOPS', 'uIOPS', } return bool(columns & (set(self.show_columns) | {self.order_by} | set(self.by_usage or set()))) def _check_need_join(self): return self._check_need_time_count() and not self._is_multi_uids def select_query(self, columns_=None, need_join=False): """ :type need_join: bool """ if columns_ is None: columns_ = self._columns_query() if need_join: where_query = and_( self._where_time_period(table=self._table_alive), self._where_uid(uid=0, table=self._table_alive) ) else: where_query = and_(self._where_time_period(), self._where_uid()) if self.server_id: # add filtering by server id where_query = and_(where_query, self._where_server_id()) query = select(columns_).where(where_query) if need_join: _table_joined = self._table_alive.outerjoin( self.table, and_(self._table_alive.c.created == self.table.c.created, self._where_uid(uid=self.uid)) ) query = query.select_from(_table_joined) return query def main_query(self): columns_ = self._columns_query() query = self.select_query(columns_=columns_, need_join=self._check_need_join()) query = self._group_by_query(query) return query def _min_max_created(self): """ SELECT MIN(created) AS MinCreated, MAX(created) AS MaxCreated FROM lve_stats2_history WHERE id = <ID> AND created BETWEEN 'xxxx' AND 'yyyy' AND server_id = 'localhost'; """ where_query = and_(self._where_time_period(), self._where_uid(), self._where_server_id()) query = select([func.min(self.table.c.created), func.max(self.table.c.created)]).where(where_query) time_start = time.time() q = str(query.compile(compile_kwargs={"literal_binds": True})).replace('\n', ' ') self.log.debug(q) data = self.dbengine.execute(query) self.log.debug('query time: %s', time.time() - time_start) return data.fetchall()[0] def proceed_dyn_time_unit(self): min_created, max_created = self._min_max_created() if max_created is None: # no data return self.result_corrector # we need manipulate with datetime data in local timezone period_from = gm_to_local(unixtimestamp_to_gm_datetime(min_created)) period_to = gm_to_local(unixtimestamp_to_gm_datetime(max_created)) time_unit_groups = dyn_time_unit_groups(period_from, period_to) rows = [] for _from, _to, _time_unit in reversed(time_unit_groups): # create instance copy for modify some attributes self_copy = copy.copy(self) self_copy.period_from = gm_datetime_to_unixtimestamp(local_to_gm(_from)) self_copy.period_to = gm_datetime_to_unixtimestamp(local_to_gm(_to)) - 1 self_copy.time_unit = _time_unit self_copy.limit = 0 rows.extend(self_copy.proceed()) return rows def proceed(self): # check and return some data without run sql query if self.uid == tuple() or self.uid == []: return [] if self.uid is not None and not isinstance(self.uid, (list, tuple)) and self.uid <= 0: return [] query = self.main_query() time_start = time.time() q = str(query.compile(compile_kwargs={"literal_binds": True})) self.log.debug(q.replace('\n', ' ')) conn = self.dbengine.connect() try: cursor = conn.execute(query) self.log.debug('query time: %s', time.time() - time_start) self.result_corrector.rows = cursor.fetchall() except LveStatsPluginTerminated as e: conn.close() raise LveStatsPluginTerminated() from e else: conn.close() return self.result_corrector def proceed_dict(self): return [dict(zip(self.show_columns, items_val)) for items_val in self.proceed()] class _HistoryShowX1(HistoryShow): def __init__(self, *args, **kwargs): HistoryShow.__init__(self, *args, **kwargs) if 'ID' not in self.show_columns: self.show_columns = ['ID'] + self.show_columns self._labels = [] # variable for control duplicated labels def _aggregate_fun_case(self, item): """ :type item: str """ if item == 'anyF': fun_ = [ self.table.c.mem_fault, self.table.c.memphy_fault, self.table.c.mep_fault, self.table.c.nproc_fault, self.table.c.cpu_fault, self.table.c.io_fault, self.table.c.iops_fault, ] elif item == 'ID': fun_ = label('id', self.table.c.id) elif item == 'From': fun_ = self.period_from elif item == 'To': fun_ = self.period_to else: column_name = FIELD_TO_TABLE_COLUMN[item] table_column = getattr(self.table.c, column_name) if item in (FIELD_AVERAGE + FIELD_LIMIT + FIELD_FAULT): fun_ = label(column_name, table_column) elif item in FIELD_USAGE: column_name_limit = FIELD_TO_TABLE_COLUMN[FIELD_LIMIT[FIELD_USAGE.index(item)]] table_column_limit = label(column_name_limit, getattr(self.table.c, column_name_limit)) fun_ = [table_column, table_column_limit] elif item in FIELD_MAX: column_name_limit = FIELD_TO_TABLE_COLUMN[FIELD_LIMIT[FIELD_MAX.index(item)]] column_name_fault = FIELD_TO_TABLE_COLUMN[FIELD_FAULT[FIELD_MAX.index(item)]] table_column_fault = label(column_name_fault, getattr(self.table.c, column_name_fault)) table_column_limit = label(column_name_limit, getattr(self.table.c, column_name_limit)) table_column_max = label(column_name + '_max', table_column) fun_ = [table_column, table_column_limit, table_column_fault, table_column_max] return fun_ def _columns_query(self): columns_agregate_func = [] show_columns = self.show_columns + (self.by_fault or []) if self.by_usage: for item in convert_to_list(self.by_usage): if item in FIELD_AVERAGE: index = FIELD_AVERAGE.index(item) show_columns.append(FIELD_USAGE[index]) else: index = FIELD_MAX.index(item) show_columns.extend([FIELD_FAULT[index], item]) if self.order_by: show_columns.append(self.order_by) for column_key in show_columns: column_fun = self._aggregate_fun_case(column_key) if isinstance(column_fun, list): for fun_ in column_fun: if hasattr(fun_, 'name') and fun_.name not in self._labels: # prevent alias duplication columns_agregate_func.append(fun_) self._labels.append(fun_.name) else: if hasattr(column_fun, 'name') and column_fun.name not in self._labels: # prevent alias duplication columns_agregate_func.append(column_fun) self._labels.append(column_fun.name) return columns_agregate_func class _HistoryShowX60(_HistoryShowX1): AGGREGATE_PERIOD = 60 * 60 def __init__(self, *args, **kwargs): _HistoryShowX1.__init__(self, *args, table=history_x60.__table__, **kwargs) # correct and rewrite time count and period self.period_from, self.period_to = self.get_history_x60_from_to() self.time_count = kwargs.get('time_count') or self.get_time_count() def get_time_count(self): if (self.period_from, self.period_to) == (None, None): return 0 return _HistoryShowX1.get_time_count(self) def get_history_x60_from_to(self): """ calculate present in aggregate table from and to time """ if self.period_to - self.period_from <= self.AGGREGATE_PERIOD: return None, None between_query = self.table.c.created.between(self.period_from + self.AGGREGATE_PERIOD, self.period_to) query = select([func.min(self.table.c.created), func.max(self.table.c.created)]).where( and_(between_query, self._where_server_id()) ) time_start = time.time() self.log.debug(str(query.compile(compile_kwargs={"literal_binds": True})).replace('\n', ' ')) result = self.dbengine.execute(query).fetchall()[0] self.log.debug('query time: %s', time.time() - time_start) create_min, create_max = result if create_max is not None: return create_min - self.AGGREGATE_PERIOD + 1, create_max # "+1" for exclude from timestamp else: return result # rewrite average function generating def _aggregate_fun_case(self, item): """ :type item: str """ if item in FIELD_AVERAGE: column_name = FIELD_TO_TABLE_COLUMN[item] table_column = getattr(self.table.c, column_name) return label(column_name, table_column * self.table.c.time) else: return _HistoryShowX1._aggregate_fun_case(self, item) class HistoryShowUnion(HistoryShow): """ Class for retrieve statistics data using two tables """ def __init__(self, *args, **kwargs): HistoryShow.__init__(self, *args, **kwargs) self._alias = LVE_STATS_2_TABLENAME_PREFIX + 'union' kwargs.update({"time_count": self.time_count}) self.x60 = _HistoryShowX60(*args, **kwargs) self.x1 = _HistoryShowX1(*args, **kwargs) self._need_union = self.x60.period_to is not None and self._is_multi_uids # detect need union tables if self._need_union: self.table = self._select_union_query() # rewrite '_aggregate_fun_case' for correct calculate maximum def _aggregate_fun_case(self, item): """ :type item: str """ if self._need_union and item in FIELD_MAX: column_name = FIELD_TO_TABLE_COLUMN[item] column_name_limit = FIELD_TO_TABLE_COLUMN[FIELD_LIMIT[FIELD_MAX.index(item)]] column_name_fault = FIELD_TO_TABLE_COLUMN[FIELD_FAULT[FIELD_MAX.index(item)]] column_limit = getattr(self.table.c, column_name_limit) column_fault = getattr(self.table.c, column_name_fault) column_max = getattr(self.table.c, column_name + '_max') fun_ = self._fun_max(column_max, column_limit, column_fault) return fun_ else: return HistoryShow._aggregate_fun_case(self, item) def _select_union_query(self): """ union two tables """ with warnings.catch_warnings(): warnings.simplefilter("ignore", category=sa_exc.SAWarning) union_query = self.x1.select_query().where( not_(self.x1.table.c.created.between(self.x60.period_from, self.x60.period_to)) ) union_query = union_query.union_all(self.x60.select_query()) union_query = alias(union_query, self._alias) return union_query def select_query(self, columns_=None, need_join=None): if self._need_union: return select(columns_) else: return HistoryShow.select_query(self, columns_=columns_, need_join=need_join) def get_supported_columns(lve_version=None, mode=None): """ preparation list columns depending of the lve version :type mode: Union[None, str] :type lve_version: Union[None, int] """ columns = [] if mode == 'v1': columns = [ 'aCPU', 'mCPU', 'lCPU', 'aEP', 'mEP', 'lEP', 'aVMem', 'mVMem', 'lVMem', 'VMemF', 'EPf', 'aPMem', 'mPMem', 'lPMem', 'aNproc', 'mNproc', 'lNproc', 'PMemF', 'NprocF', 'aIO', 'mIO', 'lIO', ] if lve_version is None or lve_version > 6: columns.extend(['aIOPS', 'mIOPS', 'lIOPS']) elif mode == 'v2': columns = [ 'aCPU', 'lCPU', 'CPUf', 'aEP', 'lEP', 'EPf', 'aVMem', 'lVMem', 'VMemF', 'aPMem', 'lPMem', 'PMemF', 'aNproc', 'lNproc', 'NprocF', 'aIO', 'lIO', 'IOf', ] if lve_version is None or lve_version > 6: columns.extend(['aIOPS', 'lIOPS', 'IOPSf']) elif mode is None: # show all columns, v1 and v2 columns = [ 'aCPU', 'uCPU', 'mCPU', 'lCPU', 'CPUf', 'aEP', 'uEP', 'mEP', 'lEP', 'EPf', 'aVMem', 'uVMem', 'mVMem', 'lVMem', 'VMemF', 'aPMem', 'uPMem', 'mPMem', 'lPMem', 'PMemF', 'aNproc', 'uNproc', 'mNproc', 'lNproc', 'NprocF', 'aIO', 'uIO', 'mIO', 'lIO', 'IOf', ] if lve_version is None or lve_version > 6: columns.extend(['aIOPS', 'mIOPS', 'uIOPS', 'lIOPS', 'IOPSf']) return columns