Part-san vérifie chaque jour l'état de fonctionnement des équipements de communication sur un écran dédié, crée un rapport et l'envoie par e-mail. Je pense qu'il est assez courant pour toute entreprise de faire un travail similaire, mais du point de vue d'un ingénieur, une telle routine est impossible, alors j'ai écrit un petit outil.
Nous savons que toutes les informations sous-jacentes se trouvent dans la base de données.
Par conséquent, vous pouvez implémenter un programme qui joint un fichier PDF généré à partir de la base de données à un e-mail et l'envoie ** côté serveur, et l'enregistre également dans cron
.
J'ai pensé à la sortie dans un fichier Excel avec le module ** OpenPyXL ** de Python, [Créer un CV au format PDF avec python + reportlab] Inspiré par cet article, j'ai décidé de sortir un fichier PDF avec le module ** ReportLab **.
Installation
pip3 install reportlab
Il existe différentes façons de se connecter de Python à MySQL, mais il se trouve que j'ai ** MySQLdb ** sur le serveur, alors je l'ai utilisé. Je le publierai sans trop de modifications, mais je pense que le schéma est raisonnablement polyvalent, alors veuillez vous y référer.
Python
"""
Générer un rapport au format PDF à partir de la base de données et l'envoyer en pièce jointe à un e-mail
"""
__author__ = "MindWood"
__version__ = "1.00"
__date__ = "31 Oct 2019"
import MySQLdb
from reportlab.pdfgen import canvas
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.cidfonts import UnicodeCIDFont
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4, portrait
from reportlab.lib.units import mm
from reportlab.platypus import Table, TableStyle
import smtplib
from email import encoders
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
import os
import datetime
def setup_page():
"""Enregistrez la police et définissez l'en-tête et le pied de page"""
pdfmetrics.registerFont(UnicodeCIDFont(font_name)) #Enregistrement des polices
#Dessin d'en-tête
c.setFont(font_name, 18)
c.drawString(10*mm, height - 15*mm, "Rapport d'état de fonctionnement de l'équipement de communication")
c.setFont(font_name, 9)
c.drawString(width - 58*mm, height - 10*mm, header_date)
#Dessin de pied de page
c.drawString(10*mm, 16*mm, "Les ID de terminal qui ont reçu des données au cours des 8 dernières heures sont affichés en vert.")
global page_count
c.drawString(width - 15*mm, 5*mm, "{}page".format(page_count)) #Dessiner le numéro de page
page_count += 1
def control_break():
"""Contrôle de la pause par nom du client"""
if ctrl_break_key == "": return
c.showPage()
setup_page()
def page_break(n):
"""Traitement des sauts de page"""
n += 1
if n < 28: return n
c.showPage()
setup_page()
return 0
#Réglage de la date et de l'heure de base
dt = datetime.datetime.now()
header_date = "{:%Y année%-m mois%-jour j%-H heure%-M minutes maintenant}".format(dt)
safe_date = "{:%Y/%m/%d %H:%M}".format(dt + datetime.timedelta(hours=-8)) #il y a 8 heures
#Initialisation du fichier PDF
pdf_filename = "report{:%y%m}.pdf".format(dt)
c = canvas.Canvas(pdf_filename, pagesize=portrait(A4)) #Nom du fichier PDF et format de papier
width, height = A4 #Obtenir la taille du papier
c.setAuthor("MindWood")
c.setTitle("IoT gateway Working report")
c.setSubject("")
font_name = "HeiseiKakuGo-W5" #Nom de la police
page_count = customer_no = 1
setup_page()
ctrl_break_key = ""
#Connectez-vous à MySQL
conn = MySQLdb.connect(host="localhost", user="xxxxxx", passwd="yyyyyy", db="zzzzzz", charset="utf8")
cursor = conn.cursor(MySQLdb.cursors.DictCursor)
#Acquisition des informations de base des équipements de communication
cursor.execute('''
SELECT c.Name as CustName, d.Name as DeptName, a.Code, a.Area, hex(a.MacAddress) as MacAddress
FROM table0001 a
LEFT JOIN table0002 c ON a.CustomerID = c.CustomerID
LEFT JOIN table0003 d ON a.CustomerID = d.CustomerID AND a.DeptID = d.DeptID
ORDER BY c.CustomerID, d.DeptID, MacAddress;
''')
gws = cursor.fetchall()
for row_gw in gws:
#Saut de page lorsque le nom du client change
if ctrl_break_key != row_gw["CustName"]:
control_break()
ctrl_break_key = row_gw["CustName"]
c.setFont(font_name, 15)
c.drawString(10*mm, height - 36*mm, "{}. {}".format(customer_no, ctrl_break_key))
customer_no += 1
data = [ [ "Nom du département", "Code de gestion", "Zone d'installation", "Adresse Mac" ] ] #Titre du périphérique de communication
table = Table(data, colWidths=(70*mm, 40*mm, 40*mm, 40*mm), rowHeights=8*mm) #Créer une table
table.setStyle(TableStyle([
("FONT", (0, 0), (-1, -1), font_name, 11), #Police de caractère
("BOX", (0, 0), (-1, -1), 1, colors.black), #Bordure extérieure
("INNERGRID", (0, 0), (-1, -1), 0.25, colors.black), #Bordure intérieure
("VALIGN", (0, 0), (-1, -1), "MIDDLE"), #Aligner les caractères verticalement au centre
("BACKGROUND", (0, 0), (-1, -1), colors.lightgrey), #Remplir de gris
]))
table.wrapOn(c, 10*mm, height - 50*mm) #Position de la table
table.drawOn(c, 10*mm, height - 50*mm) #Position de la table
line_count = 1
styles = [
("FONT", (0, 0), (-1, -1), font_name, 11),
("BOX", (0, 0), (-1, -1), 1, colors.black),
("INNERGRID", (0, 0), (-1, -1), 0.25, colors.black),
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
]
MacAddress = row_gw["MacAddress"]
if os.uname()[1] == "ip-172-35-10-XX": #Pour un serveur spécifique
MacAddress = "XXXXXXXXXXXX"
styles.append(("BACKGROUND", (3, 0), (3, 0), colors.yellow)) #Remplissez de jaune
data = [ [ row_gw["DeptName"], row_gw["Code"], row_gw["Area"], MacAddress ] ] #Données détaillées de l'équipement de communication
table = Table(data, colWidths=(70*mm, 40*mm, 40*mm, 40*mm), rowHeights=8*mm)
table.setStyle(TableStyle(styles))
table.wrapOn(c, 10*mm, height - 50*mm - 8*mm * line_count)
table.drawOn(c, 10*mm, height - 50*mm - 8*mm * line_count)
line_count = page_break(line_count)
#Acquisition de la date et de l'heure de réception des données
cursor.execute('''
SELECT hex(TermID) as TermID, from_unixtime(min(Time),"%Y/%m/%d %H:%i:%S") as from_date, from_unixtime(max(Time),"%Y/%m/%d %H:%i:%S") as to_date FROM table0005
WHERE MacAddress=0x{} GROUP BY TermID ORDER BY to_date;
'''.format(MacAddress))
terms = cursor.fetchall()
for row_term in terms:
data = [ [ "période", row_term["from_date"] + " ~ " + row_term["to_date"], "ID du terminal", row_term["TermID"] ] ] #Données détaillées du terminal
table = Table(data, colWidths=(25*mm, 100*mm, 25*mm, 40*mm), rowHeights=8*mm)
styles = [
("FONT", (0, 0), (-1, -1), font_name, 11),
("BOX", (0, 0), (-1, -1), 1, colors.black),
("INNERGRID", (0, 0), (-1, -1), 0.25, colors.black),
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
("BACKGROUND", (0, 0), (0, 0), colors.lightgrey),
("BACKGROUND", (2, 0), (2, 0), colors.lightgrey),
]
if row_term["to_date"] > safe_date: #Si la date et l'heure de la dernière réception se situent dans les 8 dernières heures
styles.append(("BACKGROUND", (3, 0), (3, 0), colors.palegreen))
table.setStyle(TableStyle(styles))
table.wrapOn(c, 10*mm, height - 50*mm - 8*mm*line_count)
table.drawOn(c, 10*mm, height - 50*mm - 8*mm*line_count)
line_count = page_break(line_count)
#Déconnectez MySQL
cursor.close()
conn.close()
#Enregistrer dans un fichier PDF
c.showPage()
c.save()
#Envoyer le fichier PDF par e-mail
from_addr = "[email protected]"
to_addr = "[email protected]"
bcc_addr = "[email protected]" #Plusieurs spécifications peuvent être spécifiées séparées par des virgules
rcpt = bcc_addr.split(",") + [to_addr]
msg = MIMEMultipart()
msg["Subject"] = "Rapport d'état de fonctionnement de l'équipement de communication{:%Y année%-m mois}".format(dt)
msg["From"] = from_addr
msg["To"] = to_addr
msg.attach(MIMEText("""
Cet e-mail est automatiquement envoyé par le système.
L'état de fonctionnement du système du serveur XXXX est associé.
""".strip()))
attachment = MIMEBase("application", "pdf")
file = open(pdf_filename, "rb+")
attachment.set_payload(file.read())
file.close()
encoders.encode_base64(attachment)
attachment.add_header("Content-Disposition", "attachment", filename=pdf_filename)
msg.attach(attachment)
smtp = smtplib.SMTP("smtp.xxxxxx.com", 587) #Serveur SMTP
smtp.starttls()
smtp.login(from_addr, "PASSWORD") #mot de passe
smtp.sendmail(from_addr, rcpt, msg.as_string())
smtp.close()
Recommended Posts