/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
lvestats
/
lib
/
bursting
/
Upload Filee
HOME
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT import datetime import sqlalchemy as sa from lvestats.orm import bursting_events_table class TableDoesNotExistError(Exception): def __init__(self, table_name): self.message = f'Table "{table_name}" does not exist in the database' super().__init__(self.message) class HistoryShowBursting: def __init__(self, dbengine: sa.engine.base.Engine, period_from: datetime.datetime, period_to: datetime.datetime, uid: int | None = None, server_id: str = 'localhost') -> None: self.dbengine = dbengine self.period_from = period_from self.period_to = period_to self.uid = uid self.server_id = server_id def get(self) -> list[sa.engine.RowProxy]: """ Get history from the 'bursting_events' table. Retrieving records within the required time frame, along with one record preceding this time frame to detect the bursting status at the start of the time frame. """ # Since bursting limits functionality isn't enabled by default, # we need to check if the table exists first inspector = sa.inspect(self.dbengine) if bursting_events_table.name not in inspector.get_table_names(): raise TableDoesNotExistError(bursting_events_table.name) ts_from = self.period_from.timestamp() ts_to = self.period_to.timestamp() # Get the rows with timestamp between ts_from and ts_to stmt1 = sa.select([ bursting_events_table.c.lve_id, bursting_events_table.c.timestamp, bursting_events_table.c.event_type, ]).where( sa.and_( bursting_events_table.c.server_id == self.server_id, bursting_events_table.c.timestamp >= ts_from, bursting_events_table.c.timestamp <= ts_to, # Add lve_id condition if it's specified (bursting_events_table.c.lve_id == self.uid) if self.uid is not None else True, ) ) # Subquery to get the maximum timestamp for each lve_id where timestamp < ts_from subquery = sa.select([ bursting_events_table.c.lve_id, sa.func.max(bursting_events_table.c.timestamp).label('max_timestamp'), ]).where( sa.and_( bursting_events_table.c.server_id == self.server_id, bursting_events_table.c.timestamp < ts_from, # Add lve_id condition if it's specified (bursting_events_table.c.lve_id == self.uid) if self.uid is not None else True, ) ).group_by( bursting_events_table.c.lve_id, ).alias('subquery') # Get the row with the maximum timestamp less than ts_from stmt2 = sa.select([ bursting_events_table.c.lve_id, bursting_events_table.c.timestamp, bursting_events_table.c.event_type, ]).select_from( bursting_events_table.join( subquery, sa.and_( bursting_events_table.c.lve_id == subquery.c.lve_id, bursting_events_table.c.timestamp == subquery.c.max_timestamp, ) ) ) stmt = sa.union( stmt1, stmt2, ) stmt = stmt.order_by( stmt.c.lve_id, stmt.c.timestamp, ) with self.dbengine.connect() as connection: result = connection.execute(stmt).fetchall() return result