/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
xray
/
continuous
/
Upload File
HOME
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import logging
import os
import smtplib
import subprocess
from configparser import ConfigParser, SectionProxy
from email.message import EmailMessage
from socket import gethostname
from typing import Optional

from xray import gettext as _
from ..internal.constants import mail_template_location, mail_scripts_location
from ..internal.exceptions import XRayMailerError


class Mailer:
    """
    Class contains X-Ray e-mail send logic.

    A message is built from an ``.ini`` template and delivered through the
    local SMTP server first; if that fails, the ``sendmail`` utility is tried.
    """

    def __init__(self):
        self.logger = logging.getLogger('mailer')
        # Lazily resolved 'From' address, see the ``sender`` property
        self._sender = None

    @property
    def mail_server(self) -> tuple:
        """
        Local mail server address.

        The tuple is unpacked as positional arguments of ``smtplib.SMTP``.
        """
        return ('localhost', )

    @property
    def sender(self) -> str:
        """
        Retrieve 'From' mail address if it is not already set.

        The value is resolved once and cached on the instance.
        """
        if self._sender is None:
            self._sender = self.retrieve_mail_sender()
        return self._sender

    def retrieve_mail_sender(self) -> str:
        """
        'From' address (control panel admin or dummy one).

        :return: the admin e-mail if one could be retrieved, otherwise
                 ``xray.continuous@<hostname>``
        """
        dummy_mail = f"xray.continuous@{gethostname()}"
        admin_mail = self.admin_email()
        # An empty string is not a usable 'From' address either,
        # so fall back to the dummy one for any falsy value
        return admin_mail or dummy_mail

    def admin_email(self) -> Optional[str]:
        """
        Try to retrieve control panel admin e-mail.

        Runs the ``<panel>_email`` script from the mail scripts location and
        takes its stripped stdout as the address.

        :return: admin e-mail, or None if the control panel is unknown,
                 the script fails, or the script prints nothing
        """
        panel = self.get_control_panel()
        if panel is None:
            return None

        get_email_script = f'{mail_scripts_location}/{panel}_email'
        try:
            p = subprocess.run([get_email_script],
                               capture_output=True,
                               text=True, check=True)
        except subprocess.CalledProcessError as e:
            # Script was started, but exited with non-zero code
            self.logger.error('%s script failed with: %s',
                              get_email_script, str(e))
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            # Script could not be started at all
            self.logger.error('Failed to run script %s with: %s',
                              get_email_script, str(e))
        else:
            # Empty output means "no address", report it as None
            return p.stdout.strip() or None
        return None

    def get_control_panel(self) -> Optional[str]:
        """
        Get control panel name.

        :return: stripped stdout of ``cldetect --detect-cp-name``,
                 or None if the utility fails or cannot be run
        """
        try:
            return subprocess.run(
                ['cldetect', '--detect-cp-name'],
                check=True, text=True,
                capture_output=True).stdout.strip()
        except (subprocess.CalledProcessError, AttributeError) as e:
            self.logger.error('cldetect utility failed with %s', str(e))
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            self.logger.error('Failed to run cldetect utility with %s',
                              str(e))
        return None

    @staticmethod
    def read_template(name: str = 'greeting') -> SectionProxy:
        """
        Get preformatted data for e-mail by name of template.

        :param name: template name, resolved to ``<name>.ini`` inside
                     the mail template location
        :return: the ``data`` section of the template
        :raises XRayMailerError: if the template file does not exist
        """
        tmpl = f'{mail_template_location}/{name}.ini'
        if os.path.exists(tmpl):
            # Interpolation is disabled: template values hold their own
            # %-placeholders, which are filled in later by send_mail
            config = ConfigParser(interpolation=None)
            config.read(tmpl)
            return config['data']
        # Translate the message first and substitute values afterwards,
        # otherwise the translation lookup never matches the msgid
        raise XRayMailerError(
            _('Failed to find template {} in {}').format(
                name, mail_template_location))

    def _smtp_send(self, message: EmailMessage) -> None:
        """
        Send preformatted e-mail via localhost SMTP.

        :param message: ready to send e-mail
        :raises XRayMailerError: on any SMTP or connection failure
        """
        self.logger.info('Try to send via smtp')
        try:
            with smtplib.SMTP(*self.mail_server) as server:
                result = server.send_message(message)
                self.logger.info('Send result: %s', result)
        except smtplib.SMTPException as e:
            raise XRayMailerError(f'smtp mailing failed: {str(e)}') from e
        except (ConnectionError, OSError) as e:
            raise XRayMailerError(
                _('smtp connection failed: %s') % str(e)) from e

    def _console_send(self, message: EmailMessage) -> None:
        """
        Send preformatted e-mail via sendmail utility.

        The whole message is passed to the utility through stdin.

        :param message: ready to send e-mail
        :raises XRayMailerError: if the utility cannot be run or fails
        """
        self.logger.info('Try to send via sendmail utility')
        cmd = ["/usr/sbin/sendmail", "-t", "-oi"]
        try:
            subprocess.run(cmd,
                           input=message.as_string(),
                           capture_output=True,
                           text=True, check=True)
        except (OSError, subprocess.CalledProcessError) as e:
            raise XRayMailerError(
                _('sendmail utility failed with %s') % str(e)) from e

    def _send(self, mail: EmailMessage) -> None:
        """
        Try to send mail via localhost smtp server,
        if fails -- try to use sendmail utility.

        Failures are only logged here, nothing is raised to the caller.

        :param mail: ready to send e-mail
        """
        try:
            self._smtp_send(mail)
        except XRayMailerError as smtp_error:
            self.logger.error(str(smtp_error))
            try:
                self._console_send(mail)
            except XRayMailerError as sendmail_error:
                self.logger.error(str(sendmail_error))
                self.logger.critical(
                    'Both smtp and sendmail failed to send message to %s',
                    mail['To'])

    def send_mail(self,
                  recipient: str,
                  template: str = 'greeting',
                  **kwargs) -> None:
        """
        Build an e-mail from the given template and send it.

        The template provides ``subject``, ``text`` and ``html`` values;
        ``text`` and ``html`` are %-formatted with kwargs and become
        the plain text body and the HTML alternative of the message.

        :param recipient: 'To' address
        :param template: name of the template to use
        :param kwargs: values for the placeholders of the template
        :raises XRayMailerError: if the template file does not exist
        """
        data = self.read_template(template)

        msg = EmailMessage()
        msg['Subject'] = data['subject']
        msg['From'] = self.sender
        msg['To'] = recipient
        msg.set_content(data['text'] % kwargs)
        msg.add_alternative(data['html'] % kwargs, subtype='html')

        self.logger.info('Generated mail --> %s', msg.as_string())
        self._send(msg)