/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
clwpos
/
Upload Filee
HOME
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENCE.TXT # import logging import os import signal import subprocess import time import json import socket import select import pwd import struct from typing import Optional, Dict, Tuple, List from contextlib import contextmanager from collections import namedtuple from threading import Lock, Thread from clwpos import gettext as _ from clcommon.cpapi import ( cpusers, get_domains_php_info, docroot, get_installed_php_versions, get_main_username_by_uid, cpinfo ) from clcommon.utils import ( get_process_pid, remove_pid_file, write_pid_file, is_litespeed_running ) from clwpos.daemon_redis_lib import ( is_user_redis_alive, kill_all_users_redises, reload_redis_for_user_thread, parse_redises, ) import clwpos.socket_utils as socket_utils from clwpos.daemon_base import WposDaemonBase from clwpos.constants import WPOS_DAEMON_SOCKET_FILE from clwpos.cl_wpos_exceptions import WposDaemonLockError from clwpos.daemon_subscription_handler import PendingSubscriptionWatcher User_data = namedtuple('User_data', ['redis_pid', 'lock', 'last_reload_time']) logger = logging.getLogger(__name__) @contextmanager def _umask_0(): """ Context manager for dropping umask """ prev = os.umask(0) yield os.umask(prev) def _create_socket() -> socket.socket: """ Create world-writable socket in given sock_location or reuse existing one :return: socket object """ with _umask_0(): sockobj = socket.socket(socket.AF_UNIX) sockobj.bind(WPOS_DAEMON_SOCKET_FILE) sockobj.listen() return sockobj @contextmanager def non_blocking_lock(lock: Lock): """ Non-blocking lock implementation for with statement """ if not lock.acquire(blocking=False): raise WposDaemonLockError(message=_("Can't acquire lock. May be it already acquired.")) try: yield lock finally: lock.release() def whmapi1(function: str, input_parameters: Optional[Dict[str, str]] = None): input_parameters_as_list = [f"{key}={value}" for key, value in input_parameters.items()] if input_parameters else [] result = subprocess.run(["/usr/sbin/whmapi1", "--output=json", function, *input_parameters_as_list], capture_output=True) return json.loads(result.stdout.decode())["data"] class WposDaemon(WposDaemonBase): """ AccelerateWP daemon main class. Socket operations, redis process control, etc """ _WPOS_DAEMON_RELOAD_FILE = "/var/lve/wpos_reload" _DAEMON_RELOAD_COMMAND = 'reload' _DAEMON_GET_REDIS_STATUS_COMMAND = 'get-redis-status' _DAEMON_GET_LITESPEED_STATUS_COMMAND = 'get-litespeed-status' DAEMON_PHP_GET_VHOST_VERSIONS_COMMAND = "php_get_vhost_versions" DAEMON_PHP_GET_INSTALLED_VERSIONS_COMMAND = "php_get_installed_versions" DAEMON_GET_UPGRADE_LINK_COMMAND = "get_upgrade_link" DAEMON_GET_UNIQUE_ID_COMMAND = "get_unique_id" DAEMON_SUITE_ALLOWED_CALLBACK = "suite_allowed_callback" DAEMON_REGISTER_UPGRADE_ATTEMPT = "daemon_register_upgrade" DAEMON_GET_UPGRADE_ATTEMPT_STATUS = "daemon_get_upgrade_status" DAEMON_GET_SUPPORTED_SUITES_BY_LICENSE = "daemon_get_supported_suites_by_license" _DAEMON_VALID_COMMANDS = [ _DAEMON_RELOAD_COMMAND, _DAEMON_GET_REDIS_STATUS_COMMAND, _DAEMON_GET_LITESPEED_STATUS_COMMAND, DAEMON_PHP_GET_VHOST_VERSIONS_COMMAND, DAEMON_PHP_GET_INSTALLED_VERSIONS_COMMAND, DAEMON_GET_UPGRADE_LINK_COMMAND, DAEMON_GET_UNIQUE_ID_COMMAND, DAEMON_REGISTER_UPGRADE_ATTEMPT, DAEMON_GET_UPGRADE_ATTEMPT_STATUS, DAEMON_SUITE_ALLOWED_CALLBACK, DAEMON_GET_SUPPORTED_SUITES_BY_LICENSE, ] _DAEMON_VALID_COMMANDS_FOR_ROOT = [ _DAEMON_RELOAD_COMMAND, DAEMON_GET_UPGRADE_LINK_COMMAND, DAEMON_GET_UNIQUE_ID_COMMAND, DAEMON_GET_UPGRADE_ATTEMPT_STATUS, DAEMON_SUITE_ALLOWED_CALLBACK, DAEMON_GET_SUPPORTED_SUITES_BY_LICENSE, ] # Minimum allowed time between "reload" commands from same user, sec _MIN_ALLOWED_RELOAD_PERIOD = 15 # Waiting redis socket presence timeout, sec _REDIS_SOCKET_WAIT_TIMEOUT = 20 def __init__(self): super().__init__() # Dictionary with user's redices PIDs for monitoring # Example: # {1001: User_data(redis_pid=136181, lock= < unlocked _thread.lock object at 0x7fbece922840 >, # last_reload_time = 0)} self._monitoring_users_dict: Dict[int, User_data] = {} self._username_list_to_process = [] self._socket: Optional[socket.socket] = None # User uid from socket to reload, 0 - no user to reload self._reload_uid = 0 self._socket_thread: Optional[Thread] = None self._socket_thread_work = True self._suite_watcher = PendingSubscriptionWatcher() def _reload_redis_for_user(self, user_uid: int, is_store_last_reload_time: bool, force_reload: str = 'no') -> Dict: """ Starts/stops redis for user and updates dict for monitoring (self._monitoring_users_dict) :param user_uid: uid to reload redis :param is_store_last_reload_time: True - write last reload time to user data, False - write 0 :return: dict {"result": "success"} / {"result": "error", "context": {}} """ # self._monitoring_users_dict example: # {1001: User_data(redis_pid=136181, lock= < unlocked _thread.lock object at 0x7fbece922840 >, # last_reload_time = 0)} try: userdata = self._monitoring_users_dict.get(user_uid) if userdata is None: old_redis_pid = None else: old_redis_pid = userdata.redis_pid username = pwd.getpwuid(user_uid).pw_name logger.info('Reloading redis for user: %s, force reload: %s', username, force_reload) new_redis_pid, result_dict = reload_redis_for_user_thread( username, old_redis_pid, force_reload) # check error if result_dict["result"] != "success": return result_dict if new_redis_pid: # Redis was started for user # last_reload_time == 0 disables reload interval check if not self._config.enable_reload_rate_limit or not is_store_last_reload_time: last_reload_time = 0 else: last_reload_time = time.time() self._monitoring_users_dict[user_uid] = User_data(redis_pid=new_redis_pid, lock=Lock(), last_reload_time=last_reload_time) else: # Redis was stopped for user if user_uid in self._monitoring_users_dict: del self._monitoring_users_dict[user_uid] except Exception as e: logger.exception(e) return {"result": str(e)} return {"result": "success"} def _user_monitoring(self, user_uid: int) -> bool: """ One user monitoring actions :param user_uid: uid to reload redis :return: User remove flag: True - user has stopped his redis (or user absent in system), remove it from monitoring list False - redis still active, should be monitored again """ user_data: User_data = self._monitoring_users_dict[user_uid] username = '' try: with non_blocking_lock(user_data.lock): is_redis_alive, is_user_present, _ = is_user_redis_alive(user_uid) if not is_user_present: return True if is_redis_alive: return False # Redis process is died, restart it username = pwd.getpwuid(user_uid).pw_name redis_pid, _ = reload_redis_for_user_thread( username, self._monitoring_users_dict[user_uid].redis_pid) if redis_pid is None: # User has stopped his redis return True # User's redis was successfully restarted self._monitoring_users_dict[user_uid] = User_data(redis_pid=redis_pid, lock=user_data.lock, last_reload_time=0) except WposDaemonLockError as e: # non_blocking_lock exception: User is working with his redis, skip monitoring actions logger.debug("Lock error: %s. User %s (uid=%d) is working with his redis, skip monitoring actions", e.message, username, user_uid) except Exception as e: logger.exception(e) return False def _redises_monitoring(self): """ Working redises monitoring """ user_ids_list_to_remove = [] # this list may change during iteration in another thread for user_id in list(self._monitoring_users_dict.keys()): is_remove = self._user_monitoring(user_id) if is_remove: user_ids_list_to_remove.append(user_id) # Remove absent users from monitoring list for user_id in user_ids_list_to_remove: if user_id in self._monitoring_users_dict: del self._monitoring_users_dict[user_id] def _reload_all_users(self): """ Process all requests - start/stop redis for users """ if self._reload_config_need: # if SIGHUP was happened - reload all users config (full reload) self._reload_config_need = False # Read all user .clwpos/clwpos_config.json data for username in cpusers(): try: user_id = pwd.getpwnam(username).pw_uid except KeyError: continue if user_id not in self._monitoring_users_dict: self._reload_redis_for_user(user_id, False) def _main_cycle(self): # Main daemon cycle i = 0 existing_redises = parse_redises() # type: List[Tuple[int, int]] logger.info('Found existing redises: %s', existing_redises) for item in existing_redises: self._monitoring_users_dict.update({item[0]: User_data(redis_pid=item[1], lock=Lock(), last_reload_time=0)}) while not self._is_terminate: try: # Self-reload if lvectl destroy was called self._force_reload() self._reload_all_users() if self._is_terminate: break time.sleep(1) i += 1 if i > self._config.monitoring_interval: # monitoring of existing redises self._redises_monitoring() i = 0 except Exception: logger.exception("Cloudlinux AccelerateWP daemon general error", exc_info=True) def _create_and_start_socket_thread(self): """ Create and start socket processing thread """ self._socket_thread = Thread(target=self._process_socket_connections) self._socket_thread.start() def run(self): """ Main work daemon function """ if get_process_pid(self._PID_FILENAME) is not None: # Error - daemon already works logger.warning("PID file %s existing. Cloudlinux AccelerateWP daemon already works?", self._PID_FILENAME) return if os.path.exists(WPOS_DAEMON_SOCKET_FILE): # Error - daemon's socket already exist, remove it self._remove_socket() # Create daemon's socket try: self._socket = _create_socket() except (OSError, IOError) as e: message = "Can't create AccelerateWP daemon socket. Error is: %s" # We used .warning instead .error because .error will send message to sentry logger.warning(message, str(e)) return logger.info("Cloudlinux AccelerateWP daemon started") self._setup_signals() write_pid_file(self._PID_FILENAME) # Force self-reload at start self.reload() self._create_and_start_socket_thread() # Main daemon_work self._main_cycle() # Stop socket thread self._socket_thread_work = False self._socket_thread.join() self._remove_socket() remove_pid_file(self._PID_FILENAME) logger.info("Cloudlinux AccelerateWP daemon terminated") def _force_reload(self): """ Force reload daemon """ # Force reload if flag file exists if os.path.isfile(self._WPOS_DAEMON_RELOAD_FILE): # self reload self.reload() try: os.remove(self._WPOS_DAEMON_RELOAD_FILE) except (OSError, IOError): pass def _remove_socket(self): """ Remove daemon's socket file """ try: logger.info("Removing socket %s", WPOS_DAEMON_SOCKET_FILE) if self._socket is not None: self._socket.close() os.remove(WPOS_DAEMON_SOCKET_FILE) logger.info("Socket %s removed", WPOS_DAEMON_SOCKET_FILE) except (OSError, IOError) as e: logger.warning("Can't remove socket %s. Error: %s", WPOS_DAEMON_SOCKET_FILE, str(e)) def stop(self, graceful: bool = False, timeout=15, interval=1): """ Stops a working daemon process """ pid = get_process_pid(self._PID_FILENAME) if pid: # Try killing the daemon process try: logger.info('Killing process with PID %s', pid) os.kill(pid, signal.SIGTERM) except OSError as e: logger.info('Process with pid %s is not possible to be killed: %s', pid, e) # give process some time to die i = 0 while i < timeout: try: os.kill(pid, 0) except OSError: logger.info('Process with pid %s is finally dead', pid) break i += interval time.sleep(interval) else: # still alive somehow? force kill logger.info('Process with pid %s did not exit in timeout, sigkilling it', pid) try: os.kill(pid, signal.SIGKILL) except OSError as e: logger.info('Process with pid %s is not possible to be killed: %s', pid, e) if graceful: # Terminate all user's redices kill_all_users_redises(logger) # Remove PID file remove_pid_file(self._PID_FILENAME) logger.info("Cloudlinux AccelerateWP daemon stopped") #################################### # Socket functions @staticmethod def _get_litespeed_status() -> dict: """ Get litespeed webserver status: running or not. :return: Dict to send to clwpos-user via socket """ return {'result': 'success', 'timestamp': time.time(), 'status': is_litespeed_running()} def _validate_socket_connection(self, client_socket_obj: socket.socket) -> Tuple[bool, int, Optional[str], Optional[dict], bool]: """ Validate socket connection. Check: - root connections - connection user presense - command validity :return: tuple(is_connection_valid, uid, username, user_request_dict, is_root_query) is_connection_valid: True - Socket connection valid, should be processed. uid, username and user_request_dict filled False - invalid connection, should be skipped. uid == -1, username and user_request_dict = None is_root_query - True - root query, do not check reload interval """ _uid = socket_utils.get_uid_from_socket(client_socket_obj) try: username = get_main_username_by_uid(_uid) except KeyError: socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, {'result': 'No user uid=%(uid)d', 'context': {'uid': _uid}, 'timestamp': time.time()}) return False, -1, None, None, False try: user_request: Optional[dict] = socket_utils.read_unpack_response_from_socket_daemon(client_socket_obj) except (socket.error, socket.timeout, json.JSONDecodeError, struct.error, AttributeError, UnicodeDecodeError): user_request = None if user_request is None: # Undecodable request (invalid message format, bad json, etc) from user to socket socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, {"result": "User %(username)s sent invalid query to " "CL AccelerateWP daemon", "context": {"username": username}}) return False, -1, None, None, False if 'command' not in user_request: # User sent invalid query socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, {'result': 'Daemon received malformed query (' 'command absent)', 'timestamp': time.time()}) return False, -1, None, None, False if user_request['command'] not in self._DAEMON_VALID_COMMANDS: # User sent invalid query socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, {'result': 'Daemon received invalid command', 'timestamp': time.time()}) return False, -1, None, None, False if _uid == 0: # Query from admin via clwpos-admin set-module --users=xxxx --disallowed --modules=object_cache if 'uid' not in user_request: socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, {'result': 'Root request to daemon should ' 'contain uid', 'timestamp': time.time()}) return False, -1, None, None, False _uid = user_request['uid'] try: username = get_main_username_by_uid(_uid) except KeyError: socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, {'result': 'No user uid=%(uid)d', 'context': {'uid': _uid}, 'timestamp': time.time()}) return False, -1, None, None, False if user_request['command'] not in self._DAEMON_VALID_COMMANDS_FOR_ROOT: # User sent invalid query socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, {'result': 'Daemon received invalid command', 'timestamp': time.time()}) return False, -1, None, None, False # Valid query from root return True, _uid, username, user_request, True # Valid query from user return True, _uid, username, user_request, False def _socket_user_get_redis_status(self, client_socket_obj: socket.socket, username: str, uid: int): """ Get redis status for user (socket query) :param client_socket_obj: Client socket connection :param username: User name :param uid: User uid """ # Get redis status for user user_data: User_data = self._monitoring_users_dict.get(uid, None) if user_data is None: # User has no redis yet socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, {"result": "User %(username)s has no redis yet", "context": {"username": username}}) else: with user_data.lock: is_redis_alive, _, result_dict = is_user_redis_alive(uid) # check error if result_dict["result"] == "success": result_dict['status'] = is_redis_alive socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, result_dict) def _is_reload_available(self, last_reload_time: int) -> bool: """ Checks is reload available by checking time :return: True - available, False - not available """ return last_reload_time == 0 or time.time() - last_reload_time > self._MIN_ALLOWED_RELOAD_PERIOD def _socket_user_redis_reload(self, client_socket_obj: socket.socket, username: str, uid: int, is_root_query: bool, force_reload: str = 'no', skip_last_time_check: bool = False): """ Reload redis for user (socket query) :param client_socket_obj: Client socket connection :param username: User name :param uid: User uid :param is_root_query: True - reload redis by root, do not check reload interval False - user reload, check reload interval """ user_data: User_data = self._monitoring_users_dict.get(uid) if user_data is None: # User has no redis yet, reload. Lock object not available here. result_dict = self._reload_redis_for_user(uid, True, force_reload=force_reload) result_dict['timestamp'] = time.time() socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, result_dict) logger.debug("[Redis user data not found] " "Cloudlinux AccelerateWP daemon reloaded for user %s (uid=%d)", username, uid) return # User already has redis last_reload_time = user_data.last_reload_time if not skip_last_time_check and not is_root_query and not self._is_reload_available(last_reload_time): # Last reload was less then self._MIN_ALLOWED_RELOAD_PERIOD seconds # ago -- skip (possibly DDoS attack to socket?) result_dict = {'timestamp': time.time(), "result": "Can't reload redis for user %(user)s. Last reload was less than %(sec)s " "seconds ago", "context": {"user": username, "sec": self._MIN_ALLOWED_RELOAD_PERIOD}} socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, result_dict) return # No DDoS, reload with user_data.lock: result_dict = self._reload_redis_for_user(uid, True) result_dict['timestamp'] = time.time() socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, result_dict) logger.info("[Redis user data found] " "Cloudlinux AccelerateWP daemon reloaded for user %s (uid=%d)", username, uid) @staticmethod def _php_get_vhost_versions(account: str = None, logger=None, php_vhost_data=None) -> list: def _cast(handler_name: str, version_id: str) -> str: if handler_name == 'fpm': return 'php-fpm' elif 'x-httpd-lsphp' in version_id: return 'lsapi' # for DirectAdmin elif handler_name and 'lsphp' in handler_name: return 'lsapi' return handler_name vhosts_data = php_vhost_data or get_domains_php_info() if logger: logger.info("[PHP vhost info]: %s", str(vhosts_data)) if account is not None: vhosts_data = {key: value for key, value in vhosts_data.items() if value['username'] == account} result = [] for key, value in vhosts_data.items(): try: result.append( {'vhost': key, 'account': value['username'], 'version': value['display_version'], 'handler': _cast(value['handler_type'], value['php_version_id']), 'documentroot': docroot(key)[0]}) except Exception: # there are places where we use it as staticmethod if logger: logger.exception('Error on getting php version for %s, skip', key) continue return result def _socket_user_litespeed_status(self, client_socket_obj: socket.socket): """ Get litespeed status for user """ # Get litespeed status for user result: dict = self._get_litespeed_status() socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, result) def _socket_user_php_get_vhost_versions(self, client_socket_obj: socket.socket, account: str): try: vhost_data = self._php_get_vhost_versions(account, logger=logger) except Exception as e: response: dict = { "result": "Daemon cannot get vhosts data: %(reason)s", "context": str(e) } else: response: dict = { "result": "success", # it's necessary key-value combination "data": vhost_data } socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, response) def _socket_user_php_get_installed_versions(self, client_socket_obj: socket.socket): response: dict = { "result": "success", # it's necessary key-value combination "data": get_installed_php_versions() } socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, response) def _get_upgrade_link(self, client_socket_obj: socket.socket, username, feature): # another circular import # fix it with previous one # https://gerrit.cloudlinux.com/c/accelerate-wp/+/116499 from clwpos.utils import get_server_wide_options server_options = get_server_wide_options() response: dict = { "result": "success", "upgrade_url": server_options.get_upgrade_url_for_user( username=username, domain=cpinfo(username, keyls=('dns',))[0][0], feature=feature ) } socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, response) def _get_unique_id(self, client_socket_obj: socket.socket, username): """ Get unique identifier which we use as user's auth token """ from clwpos.billing import get_or_create_unique_identifier response: dict = { "result": "success", "unique_id": get_or_create_unique_identifier(username) } socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, response) def _get_supported_suites_by_license(self, client_socket_obj: socket.socket): from clwpos.utils import get_suites_status_from_license awp_premium_status, awp_cdn_status = get_suites_status_from_license() response: dict = { "result": "success", "accelerate_wp_premium": awp_premium_status, "accelerate_wp_cdn": awp_cdn_status } socket_utils.send_dict_to_socket_connection_and_close(client_socket_obj, response) def _handle_client_socket_connection(self, client_socket_obj: socket.socket) -> None: """ Process client's socket connection (Works in thread) :param client_socket_obj: Client socket connection """ try: is_connection_valid, uid, username, user_request, is_root_query = \ self._validate_socket_connection(client_socket_obj) if not is_connection_valid: return logger.info('Income user request=%s', str(user_request)) # User command processing # TODO replace this on decorator for method which handles command if user_request['command'] == self._DAEMON_GET_REDIS_STATUS_COMMAND: # Get user redis status self._socket_user_get_redis_status(client_socket_obj, username, uid) elif user_request['command'] == self._DAEMON_RELOAD_COMMAND: # Reload user's redis force_reload = user_request.get('force_reload', 'no') skip_last_time_check = user_request.get('skip_last_reload_time', 'no') == 'yes' self._socket_user_redis_reload(client_socket_obj, username, uid, is_root_query, force_reload, skip_last_time_check) elif user_request['command'] == self._DAEMON_GET_LITESPEED_STATUS_COMMAND: self._socket_user_litespeed_status(client_socket_obj) elif user_request["command"] == self.DAEMON_PHP_GET_VHOST_VERSIONS_COMMAND: self._socket_user_php_get_vhost_versions(client_socket_obj, username) elif user_request["command"] == self.DAEMON_PHP_GET_INSTALLED_VERSIONS_COMMAND: self._socket_user_php_get_installed_versions(client_socket_obj) elif user_request["command"] == self.DAEMON_GET_UPGRADE_LINK_COMMAND: self._get_upgrade_link(client_socket_obj, username, user_request.get('feature', 'object_cache')) elif user_request["command"] == self.DAEMON_GET_UNIQUE_ID_COMMAND: self._get_unique_id(client_socket_obj, username) elif user_request["command"] == self.DAEMON_SUITE_ALLOWED_CALLBACK: self._suite_watcher.suite_allowed_callback(client_socket_obj) elif user_request["command"] == self.DAEMON_REGISTER_UPGRADE_ATTEMPT: self._suite_watcher.add_pending_upgrade_task(client_socket_obj, user_request, uid) elif user_request["command"] == self.DAEMON_GET_UPGRADE_ATTEMPT_STATUS: self._suite_watcher.get_upgrade_task_status(client_socket_obj, uid, user_request.get('feature', 'object_cache')) elif user_request["command"] == self.DAEMON_GET_SUPPORTED_SUITES_BY_LICENSE: self._get_supported_suites_by_license(client_socket_obj) except Exception: logger.exception("Socket connection processing error", exc_info=True) def _process_socket_connections(self): """ Process socket connections (works in thread) """ logger.info("Cloudlinux AccelerateWP daemon socket thread started") while self._socket_thread_work: readable, _, _ = select.select([self._socket], [], [], 1) for _sock_object in readable: try: client_socket_obj, _ = _sock_object.accept() t = Thread(target=self._handle_client_socket_connection, args=(client_socket_obj,)) t.start() except socket.error: logger.exception("Socket connection error", exc_info=True) logger.info("Cloudlinux AccelerateWP daemon socket thread stopped")