Bonsoir, c'est mon premier post depuis longtemps. J'ai un peu de temps Avec un service commémoratif pour l'ouverture de l'application qui a coulé il y a environ un an J'essaierai de le refaire.
Cette fois, nous allons créer une application résidente qui surveille les dossiers. Si le fichier est simplement placé quelque part dans un dossier spécifié C'est aussi simple que de copier dans un dossier spécifié. Cela a pris du temps. .. ..
Windows 10 python-3.8.2 Bibliothèque utilisée: watchdog
Pour le moment, créez la classe WatchFileHandler suivante. Il s'agit de la fin de la surveillance. Tu l'as fait! <détails> <résumé> Premier </ résumé>
folder_watch.py
#Surveillance de la classe d'acquisition d'événements
class WacthFileHandler(FileSystemEventHandler):
def __init__(self, watch_path, copy_to_path, backup_path):
super(WacthFileHandler, self).__init__()
self.watch_path = watch_path
self.copy_to_path = copy_to_path
self.backup_path = backup_path
def on_moved(self, event):
"""
Détection de mouvement de fichier
:param event:
:return:
"""
file_path = event.src_path
file_name = os.path.basename(file_path)
def on_created(self, event):
"""
Détection de création de fichier
:param event:
:return:
"""
src_path = event.src_path
src_name = os.path.basename(src_path)
def on_modified(self, event):
"""
Détection de changement de fichier
:param event:
:return:
"""
src_path = event.src_path
src_name = os.path.basename(src_path)
def on_deleted(self, event):
"""
Détection de suppression de fichier
:param event:
:return:
"""
src_path = event.src_path
src_name = os.path.basename(src_path)
def watch_start(watch_path, copy_to_path, backup_path):
"""
Le processus de surveillance des dossiers a commencé
:param from_watch_path :Suivi du chemin du dossier
:param to_copy_path :Chemin du dossier de destination
:param backup_path :Enregistrer le chemin du dossier de destination
:return
"""
event_handler = WacthFileHandler(watch_path, copy_to_path, backup_path)
observer = Observer()
observer.schedule(event_handler, watch_path, recursive=True)
observer.start()
try:
while True:
time.sleep(5)
except KeyboardInterrupt:
observer.stop()
except Exception as e:
observer.stop()
raise e
finally:
# finaly =Dernier traitement indépendamment de l'occurrence d'une exception
observer.join()
Ensuite, nous surveillerons et examinerons le comportement lorsque le fichier arrivera.
Cette fois après avoir déplacé (copié) le fichier
Assurez-vous que le fichier d'origine correspond et supprimez le fichier d'origine.
Si vous ne pouvez pas le copier correctement, isolez-le.
C'est devenu une parade de méthode statique. .. ..
<détails>
folder_watch.py
def on_created(self, event):
"""
Détection de création de fichier
:param event:
:return:
"""
#Obtenir le nom du fichier
src_name = os.path.basename(event.src_path)
#Générer le chemin du dossier de la source de surveillance
src_path = pathlib.Path(self.watch_path) / pathlib.Path(f'{src_name}')
#copie(Bouge toi)Générer le chemin du dossier de destination
copy_path = pathlib.Path(self.copy_to_path) / pathlib.Path(f'{src_name}')
#Générer un chemin de dossier de destination de sauvegarde
backup_link = pathlib.Path(self.backup_path)
try:
#Exécuter le traitement
self._run(src_path, copy_path, backup_link)
except TimeoutError as e:
#C'est trop grand!
pass
except Exception as e:
pass
def _run(self, src: Path, copy: Path, bk: Path):
"""
Copier, vérifier, supprimer lorsque le fichier est détecté/Bouge toi
:param src:
:param copy:
:return:
"""
#Attendez la fin du placement(Pour une certaine période de temps[600s]Attendre)
if not self._wait_for_file_created_finished_windows(file_path=src, time_out=600):
raise TimeoutError
#Copiez le fichier placé
if not self._copy_to_file(src, copy):
return
#Obtenez le hachage de deux fichiers
src_hash = self._get_md5_hash(src)
copy_hash = self._get_md5_hash(copy)
if self._check_hash(src_hash, copy_hash):
#Matchs de hachage
#Supprimer le fichier d'origine
self._del_original_file(src)
else:
#Non-concordance de hachage
#Déplacer pour enregistrer la destination
self._move_original_file(bk)
def _copy_to_file(self, src, copy):
"""
Copiez le fichier placé dans le dossier spécifié
:param src:
:param copy_to:
:return:
"""
#S'il n'y a pas de fichier placé, aucun autre traitement ne sera effectué.
if not src.exists():
return False
#Métadonnées du fichier(Heure de création, heure de changement, etc.)Copie y compris
copy_link = shutil.copy2(src, copy, follow_symlinks=True)
#Vérifiez que le chemin que vous tentiez de copier correspond au chemin que vous avez copié
if copy != copy_link:
return False
if not copy.exists():
return False
return True
@staticmethod
def _wait_for_file_created_finished_linux(file_path, time_out):
"""
Non confirmé pour fonctionner sous Linux
Méthode de jugement de fin de création du fichier placé
URL de référence:https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes
:param file_path:
:param time_out:
:return:
"""
size_now = 0
size_past = -1
start = time.time()
while True:
size_now = os.path.getsize(file_path)
time.sleep(1)
elapsed_time = time.time() - start
if size_now == size_past and os.access(file_path, os.R_OK):
return True
else:
size_past = os.path.getsize(file_path)
if elapsed_time >= time_out:
return False
@staticmethod
def _wait_for_file_created_finished_windows(file_path: Path, time_out):
"""
Création du fichier placé(copie)Méthode de jugement d'achèvement
URL de référence:https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying
:param file_path:
:param time_out:
:return:
"""
start = time.time()
while True:
try:
elapsed_time = time.time() - start
new_path = str(file_path) + "_"
os.rename(file_path, new_path)
os.rename(new_path, file_path)
time.sleep(1)
return True
except OSError:
time.sleep(1)
if elapsed_time >= time_out:
return False
@staticmethod
def _get_md5_hash(file_path):
"""
Valeur de hachage du fichier md5(Format hexagonal)Avoir
:param file_path:
:return:
"""
with open(file_path, 'rb') as file:
binary_data = file.read()
#Obtenir la valeur de hachage au format hexadécimal
md5 = hashlib.md5(binary_data).hexdigest()
return md5
@staticmethod
def _check_hash(src_hash, target_hash):
"""
Comparez deux hachages
:param src_hash:
:param target_hash:
:return:
"""
return src_hash == target_hash
@staticmethod
def _del_original_file( src):
"""
Supprimer le fichier source de la copie
:param src:
:return:
"""
os.remove(src)
@staticmethod
def _move_original_file(src_path, move_path):
"""
Déplacer le fichier source de la copie(Évacuation)
:return:
"""
shutil.move(src_path, move_path)
<détails>
folder_watch.py
def interpret_args():
"""
Méthode d'interprétation des arguments au moment de l'exécution
:return:Arguments d'exécution
"""
#Création d'objets
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
#Réglage de l'argument
#Suivi du chemin du dossier(Obligatoire)
parser.add_argument("-w", "--watch_path", help=textwrap.dedent(
'''\
please set me.
this is essential argument.
this is watch folder path'''), type=str)
#Copier le chemin du dossier de destination(Obligatoire)
parser.add_argument("-cp", "--copy_to_path", help=textwrap.dedent(
'''\
please set me.
this is essential argument.
this is copy to folder path'''), type=str)
#Enregistrer le chemin du dossier de destination(Obligatoire)
parser.add_argument("-bk", "--backup_path", help=textwrap.dedent(
'''\
please set me.
this is essential argument.
this is backup to folder path'''), type=str)
#Renvoyer le résultat
return parser.parse_args()
def check_args(args):
"""
Méthode de jugement de l'argument d'exécution
:param args:
:return: True or False
"""
#Erreur si le chemin du dossier surveillé n'est pas spécifié
if not hasattr(args, 'watch_path') and args.watch_path is None:
raise argparse.ArgumentError('Je ne spécifie pas de dossier de surveillance!')
#Erreur si le chemin du dossier de destination n'est pas spécifié
if not hasattr(args, 'copy_to_path') and args.copy_to_path is None:
raise argparse.ArgumentError('Je ne spécifie pas le dossier de destination!')
#Erreur si le chemin du dossier de destination d'enregistrement n'est pas spécifié
if not hasattr(args, 'backup_path') and args.backup_path is None:
raise argparse.ArgumentError('Je ne spécifie pas le dossier de destination de sauvegarde!')
#Génération d'objets pour chaque chemin
watch_path = pathlib.Path(args.watch_path)
copy_to_path = pathlib.Path(args.copy_to_path)
backup_path = pathlib.Path(args.backup_path)
#Vérifiez si le chemin du dossier de surveillance existe
if not watch_path.exists():
raise FileNotFoundError('Il n'y a pas de dossier de surveillance!')
#Vérifiez si le dossier de surveillance est un répertoire
if not watch_path.is_dir():
raise TypeError('Le dossier de surveillance spécifié n'est pas un dossier!')
#Vérifiez si le chemin du dossier de destination existe
if not copy_to_path.exists():
raise FileNotFoundError('Il n'y a pas de dossier de destination!')
#Vérifiez si le dossier de destination est un répertoire
if not copy_to_path.is_dir():
raise TypeError('Le dossier de destination spécifié n'est pas un dossier!')
#Vérifiez si le chemin du dossier de destination existe
if not backup_path.exists():
raise FileNotFoundError('Il n'y a pas de dossier de destination de sauvegarde!')
#Vérifiez si le dossier de destination est un répertoire
if not backup_path.is_dir():
raise TypeError('Le dossier de destination d'enregistrement spécifié n'est pas un dossier!')
#Traitement d'exécution
if __name__ == '__main__':
try:
#Interprétation des arguments,Jugement
args = interpret_args()
#Vérification des arguments
check_args(args)
#Suivi de l'exécution
watch_start(args.watch_path, args.copy_to_path)
except argparse.ArgumentError as e:
pass
except FileNotFoundError as e:
pass
except TypeError as e:
pass
except Exception as e:
pass
À ce rythme, je ne sais pas ce que j'ai déménagé ou ce qui est arrivé, alors je vais préparer un journal.
Pour le moment, la source est presque complète.
<détails>
folder_watch.py
# -*- coding: utf-8 -*-
import os
import time
import sys
import logging
import hashlib
import argparse
import textwrap
import pathlib
from pathlib import Path
import shutil
from datetime import datetime
from watchdog.observers import Observer
from logging.handlers import TimedRotatingFileHandler
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
try:
import codecs
except ImportError:
codecs = None
class MyTimedRotatingFileHandler(logging.handlers.TimedRotatingFileHandler):
"""
Gestionnaire de fichiers pour la classe de sortie du journal de date
"""
def __init__(self, dir_log):
self.dir_log = dir_log
filename = self.dir_log + time.strftime("%Y%m%d") + ".log" # dir_log here MUST be with os.sep on the end
logging.handlers.TimedRotatingFileHandler.__init__(self, filename, when='midnight', interval=1, backupCount=0,
encoding=None)
def doRollover(self):
"""
TimedRotatingFileHandler remix - rotates logs on daily basis, and filename of current logfile is time.strftime("%m%d%Y")+".txt" always
"""
self.stream.close()
# get the time that this sequence started at and make it a TimeTuple
t = self.rolloverAt - self.interval
timeTuple = time.localtime(t)
self.baseFilename = self.dir_log + time.strftime("%Y%m%d") + ".log"
if self.encoding:
self.stream = codecs.open(self.baseFilename, 'w', self.encoding)
else:
self.stream = open(self.baseFilename, 'w')
self.rolloverAt = self.rolloverAt + self.interval
#Surveillance de la classe d'acquisition d'événements
class WacthFileHandler(FileSystemEventHandler):
def __init__(self, watch_path, copy_to_path, backup_path):
super(WacthFileHandler, self).__init__()
self.watch_path = watch_path
self.copy_to_path = copy_to_path
self.backup_path = backup_path
def on_moved(self, event):
"""
Détection de mouvement de fichier
:param event:
:return:
"""
src_path = event.src_path
src_name = os.path.basename(src_path)
logger.info(f'{src_name}A bougé')
def on_created(self, event):
"""
Détection de création de fichier
:param event:
:return:
"""
#Obtenir le nom du fichier
src_name = os.path.basename(event.src_path)
logger.info(f'{src_name}A été fait')
#Générer le chemin du dossier de la source de surveillance
src_path = pathlib.Path(self.watch_path) / pathlib.Path(f'{src_name}')
#copie(Bouge toi)Générer le chemin du dossier de destination
copy_path = pathlib.Path(self.copy_to_path) / pathlib.Path(f'{src_name}')
#Générer un chemin de dossier de destination de sauvegarde
backup_link = pathlib.Path(self.backup_path)
try:
#Exécuter le traitement
self._run(src_path, copy_path, backup_link)
except TimeoutError as e:
#C'est trop grand!
logger.error('C'est trop grand!')
logger.error(e)
except Exception as e:
logger.error(e)
def on_modified(self, event):
"""
Détection de changement de fichier
:param event:
:return:
"""
src_path = event.src_path
src_name = os.path.basename(src_path)
logger.info(f'{src_name}Modifié')
def on_deleted(self, event):
"""
Détection de suppression de fichier
:param event:
:return:
"""
src_path = event.src_path
src_name = os.path.basename(src_path)
logger.info(f'{src_name}Supprimé s')
def _run(self, src: Path, copy: Path, bk: Path):
"""
Copier, vérifier, supprimer lorsque le fichier est détecté/Bouge toi
:param src:
:param copy:
:return:
"""
#Attendez la fin du placement(Pour une certaine période de temps[600s]Attendre)
if not self._wait_for_file_created_finished_windows(file_path=src, time_out=600):
raise TimeoutError
#Copiez le fichier placé
if not self._copy_to_file(src, copy):
return
#Obtenez le hachage de deux fichiers
src_hash = self._get_md5_hash(src)
copy_hash = self._get_md5_hash(copy)
if self._check_hash(src_hash, copy_hash):
#Matchs de hachage
#Supprimer le fichier d'origine
self._del_original_file(src)
else:
#Non-concordance de hachage
#Déplacer pour enregistrer la destination
self._move_original_file(bk)
def _copy_to_file(self, src, copy):
"""
Copiez le fichier placé dans le dossier spécifié
:param src:
:param copy_to:
:return:
"""
#S'il n'y a pas de fichier placé, aucun autre traitement ne sera effectué.
if not src.exists():
return False
#Métadonnées du fichier(Heure de création, heure de changement, etc.)Copie y compris
copy_link = shutil.copy2(src, copy, follow_symlinks=True)
#Vérifiez que le chemin que vous tentiez de copier correspond au chemin que vous avez copié
if copy != copy_link:
return False
if not copy.exists():
return False
return True
@staticmethod
def _wait_for_file_created_finished_linux(file_path, time_out):
"""
Non confirmé pour fonctionner sous Linux
Méthode de jugement de fin de création du fichier placé
URL de référence:https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes
:param file_path:
:param time_out:
:return:
"""
size_now = 0
size_past = -1
start = time.time()
while True:
size_now = os.path.getsize(file_path)
time.sleep(1)
elapsed_time = time.time() - start
logger.info(f"size_now: {size_now}")
logger.info(f"size_past: {size_past}")
if size_now == size_past and os.access(file_path, os.R_OK):
logger.info("file has copied completely now size: %s", size_now)
return True
else:
size_past = os.path.getsize(file_path)
if elapsed_time >= time_out:
logger.info('time out error')
return False
@staticmethod
def _wait_for_file_created_finished_windows(file_path: Path, time_out):
"""
Création du fichier placé(copie)Méthode de jugement d'achèvement
URL de référence:https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying
:param file_path:
:param time_out:
:return:
"""
start = time.time()
while True:
try:
elapsed_time = time.time() - start
new_path = str(file_path) + "_"
os.rename(file_path, new_path)
os.rename(new_path, file_path)
time.sleep(1)
logger.info('file copy...')
return True
except OSError:
time.sleep(1)
if elapsed_time >= time_out:
logger.info('time out error')
return False
@staticmethod
def _get_md5_hash(file_path):
"""
Valeur de hachage du fichier md5(Format hexagonal)Avoir
:param file_path:
:return:
"""
with open(file_path, 'rb') as file:
binary_data = file.read()
#Obtenir la valeur de hachage au format hexadécimal
md5 = hashlib.md5(binary_data).hexdigest()
logger.info(f'Fichier:{file_path} -Valeur de hachage- {md5}')
return md5
@staticmethod
def _check_hash(src_hash, target_hash):
"""
Comparez deux hachages
:param src_hash:
:param target_hash:
:return:
"""
return src_hash == target_hash
@staticmethod
def _del_original_file( src):
"""
Supprimer le fichier source de la copie
:param src:
:return:
"""
os.remove(src)
@staticmethod
def _move_original_file(src_path, move_path):
"""
Déplacer le fichier source de la copie(Évacuation)
:return:
"""
shutil.move(src_path, move_path)
def watch_start(from_watch_path, to_copy_path, backup_path):
"""
Le processus de surveillance des dossiers a commencé
:param from_watch_path :Suivi du chemin du dossier
:param to_copy_path :Chemin du dossier de destination
:param backup_path :Enregistrer le chemin du dossier de destination
:return:
"""
event_handler = WacthFileHandler(from_watch_path, to_copy_path, backup_path)
observer = Observer()
observer.schedule(event_handler, from_watch_path, recursive=True)
logger.info(f'Démarrage de la surveillance des dossiers')
observer.start()
try:
while True:
time.sleep(5)
except KeyboardInterrupt:
observer.stop()
except Exception as e:
observer.stop()
raise e
finally:
# finaly =Dernier traitement indépendamment de l'occurrence d'une exception
logger.info(f'Surveillance des dossiers terminée')
observer.join()
def make_log_folder():
"""
Créer s'il n'y a pas de dossier de journaux au démarrage
:return:
"""
p = pathlib.Path(sys.argv[0])
p2 = pathlib.Path(p.parent) / pathlib.Path('logs')
if not p2.exists():
os.makedirs(str(p2))
def interpret_args():
"""
Méthode d'interprétation des arguments au moment de l'exécution
:return:Arguments d'exécution
"""
#Création d'objets
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
#Réglage de l'argument
#Suivi du chemin du dossier(Obligatoire)
parser.add_argument("-wp", "--watch_path", help=textwrap.dedent(
'''\
please set me.
this is essential argument.
this is watch folder path'''), type=str)
#Copier le chemin du dossier de destination(Obligatoire)
parser.add_argument("-cp", "--copy_to_path", help=textwrap.dedent(
'''\
please set me.
this is essential argument.
this is copy to folder path'''), type=str)
#Enregistrer le chemin du dossier de destination(Obligatoire)
parser.add_argument("-bk", "--backup_path", help=textwrap.dedent(
'''\
please set me.
this is essential argument.
this is backup folder path'''), type=str)
#Renvoyer le résultat
return parser.parse_args()
def check_args(args):
"""
Méthode de jugement de l'argument d'exécution
:param args:
:return: True or False
"""
#Erreur si le chemin du dossier surveillé n'est pas spécifié
if not hasattr(args, 'watch_path') and args.watch_path is None:
raise argparse.ArgumentError('Je ne spécifie pas de dossier de surveillance!')
#Erreur si le chemin du dossier de destination n'est pas spécifié
if not hasattr(args, 'copy_to_path') and args.copy_to_path is None:
raise argparse.ArgumentError('Je ne spécifie pas le dossier de destination!')
#Erreur si le chemin du dossier de destination d'enregistrement n'est pas spécifié
if not hasattr(args, 'backup_path') and args.backup_path is None:
raise argparse.ArgumentError('Je ne spécifie pas le dossier de destination de sauvegarde!')
#Génération d'objets pour chaque chemin
watch_path = pathlib.Path(args.watch_path)
copy_to_path = pathlib.Path(args.copy_to_path)
backup_path = pathlib.Path(args.backup_path)
#Vérifiez si le chemin du dossier de surveillance existe
if not watch_path.exists():
raise FileNotFoundError('Il n'y a pas de dossier de surveillance!')
#Vérifiez si le dossier de surveillance est un répertoire
if not watch_path.is_dir():
raise TypeError('Le dossier de surveillance spécifié n'est pas un dossier!')
#Vérifiez si le chemin du dossier de destination existe
if not copy_to_path.exists():
raise FileNotFoundError('Il n'y a pas de dossier de destination!')
#Vérifiez si le dossier de destination est un répertoire
if not copy_to_path.is_dir():
raise TypeError('Le dossier de destination spécifié n'est pas un dossier!')
#Vérifiez si le chemin du dossier de destination existe
if not backup_path.exists():
raise FileNotFoundError('Il n'y a pas de dossier de destination de sauvegarde!')
#Vérifiez si le dossier de destination est un répertoire
if not backup_path.is_dir():
raise TypeError('Le dossier de destination d'enregistrement spécifié n'est pas un dossier!')
#Traitement d'exécution
if __name__ == '__main__':
#Créer s'il n'y a pas de dossier de journal(* La journalisation ne crée pas de dossiers.)
make_log_folder()
#Paramètres de journalisation
# get the root logger
root_logger = logging.getLogger()
# set overall level to debug, default is warning for root logger
root_logger.setLevel(logging.DEBUG)
# setup logging to file, rotating at midnight
file_log = MyTimedRotatingFileHandler(f'./logs/log_')
file_log.setLevel(logging.DEBUG)
file_formatter = logging.Formatter('■%(asctime)s - %(levelname)s - [%(funcName)s() %(lineno)ligne d] : %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
file_log.setFormatter(file_formatter)
root_logger.addHandler(file_log)
# setup logging to console
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter('■%(asctime)s - %(levelname)s - [%(funcName)s() %(lineno)ligne d] : %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
console.setFormatter(formatter)
root_logger.addHandler(console)
# get a logger for my script
logger = logging.getLogger(__name__)
try:
#Interprétation des arguments,Jugement
args = interpret_args()
#Vérification des arguments
check_args(args)
#Suivi de l'exécution
watch_start(args.watch_path, args.copy_to_path, args.backup_path)
except argparse.ArgumentError as e:
logger.error(e)
except FileNotFoundError as e:
logger.error(e)
except TypeError as e:
logger.error(e)
except Exception as e:
logger.error(e)
C'est difficile à exécuter tel quel Créez une chauve-souris comme ↓ comme d'habitude.
execute.bat
@echo off
setlocal
Faire de l'emplacement où se trouve le script rem le répertoire actuel
cd /d %~dp0
SET APP_TITLE=folder_watch
SET EXE_SCRIPT=folder_watch.py
dossier de surveillance rem
SET WATCH_PATH=.\test\from
rem copier le dossier de destination
SET COPY_PATH=.\test\to
dossier de destination de sauvegarde rem
SET BK_PATH=.\test\bk
exécution de la surveillance du dossier rem
START "%APP_TITLE%" ./python-3.8.2-embed-amd64/python.exe %EXE_SCRIPT% -wp %WATCH_PATH% -cp %COPY_PATH% -bk %BK_PATH%
Je me demande si c'est la fin si je le mets correctement dans le planificateur de tâches Je me posais la question, mais cela semble être le cas. Cette application est résidente et j'aimerais que vous la lanciez immédiatement même si elle tombe. De plus, si vous utilisez le planificateur de tâches, de nombreux processus seront lancés. .. .. J'ai donc fait la chauve-souris et la source comme suit.
execute.bat
@echo off
setlocal
Faire de l'emplacement où se trouve le script rem le répertoire actuel
cd /d %~dp0
SET APP_TITLE=folder_watch
SET EXE_SCRIPT=folder_watch.py
dossier de surveillance rem
SET WATCH_PATH=.\test\from
rem copier le dossier de destination
SET COPY_PATH=.\test\to
dossier de destination de sauvegarde rem
SET BK_PATH=.\test\bk
Rem Dossier d'écriture PID
SET PID_FILD=pid
Exécuter immédiatement s'il n'y a pas de fichier pid rem
IF EXIST %PID_FILD% (GOTO FILE_TRUE) ELSE GOTO FILE_FALSE
Le fichier pid rem existe déjà
:FILE_TRUE
Obtenir le numéro pid du fichier rem
SET /p PID_VAL=<pid
indicateur de présence rem pid(true=1, false=0)
SET IS_EXIST=0
nom de l'image rem:"python.exe"Rechercher le pid avec(Seule la partie numérique)
for /F "usebackq tokens=2" %%a in (
`tasklist /fi "IMAGENAME eq python.exe" ^| findstr "[0-9]"`) do (
rem ECHO %%a
if %%a==%PID_VAL% SET IS_EXIST=1
)
rem ECHO %PID_VAL%
rem ECHO %IS_EXIST%
rem il y a un match=Ne rien faire car il fonctionne déjà
rem Il n'y a pas de correspondance=Exécution du script car il n'est pas démarré
IF %IS_EXIST%==1 (GOTO EOF) ELSE (GOTO APPT_START)
Si le fichier pid rem n'existe pas
:FILE_FALSE
GOTO APPT_START
exécution de la surveillance du dossier rem
:APPT_START
START "%APP_TITLE%" ./python-3.8.2-embed-amd64/python.exe %EXE_SCRIPT% -w %WATCH_PATH% -cp %COPY_PATH% -bk %BK_PATH%
GOTO EOF
rem fin
:EOF
rem pause
folder_watch.py
try:
#Ajoutez ici!
with open('pid', mode='w') as f:
logger.info(f'pid = [{str(os.getpid())}]')
f.write(str(os.getpid()))
#Interprétation des arguments,Jugement
args = interpret_args()
#Vérification des arguments
check_args(args)
#Suivi de l'exécution
watch_start(args.watch_path, args.copy_to_path, args.backup_path)
except argparse.ArgumentError as e:
logger.error(e)
except FileNotFoundError as e:
logger.error(e)
except TypeError as e:
logger.error(e)
except Exception as e:
logger.error(e)
Enregistrez le pid dans un fichier au démarrage côté python C'est un processus qui ne démarre pas si le contenu du fichier et le pid actuel correspondent du côté du lot. Si vous démarrez le planificateur de tâches à des intervalles d'une minute, vous devriez pouvoir y faire quelque chose! C'est donc la fin de ce temps. -Il ne prend pas en charge la copie de dossiers ・ Le traitement parallèle est meilleur -Il y a eu une Comparaison de fichiers en python sans utiliser MD5. Il y a plusieurs trous, mais j'y repenserai si j'en ai envie. Le journal est également assez approprié. .. .. Nous n'avons pas tellement confirmé l'opération, veuillez donc l'utiliser comme référence uniquement. Github peut être trouvé à ici
Le point dans lequel je suis resté coincé était lorsque le chien de garde a détecté la création (déplacement / copie) d'un fichier dans un dossier. L'événement allait se dérouler lorsqu'il n'était pas encore terminé. Lorsqu'un gros fichier arrive, la copie n'est pas encore terminée J'obtiens une erreur en essayant de le déplacer. Dans un premier temps, le processus de jugement a été fait en se référant à cet article. Apparemment, dans le cas des fenêtres, cela ne fonctionne pas comme prévu et il semble que la taille du fichier soit acquise même pendant la copie. (Est-ce que ce sera le mouvement que vous attendiez pour Linux?)
Quand j'ai eu des problèmes, j'ai trouvé le [prochain article](https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size- while-copying).
La réponse au processus de jugement de copie dans Windows a été écrite. Merci stackoverflow!
C'est un processus qui renomme si la copie (déplacement) est en cours et détermine qu'il est toujours copié (déplacé) si une erreur se produit.
Je me demande s'il y avait un tel moyen.
Cependant, le goulot d'étranglement est que le coût est élevé car c'est parce qu'une exception est générée.
Si quelqu'un connaît une autre méthode, apprenez-moi s'il vous plaît.
<détails>
folder_watch.py
@staticmethod
def _wait_for_file_created_finished_linux(file_path, time_out):
"""
Non confirmé pour fonctionner sous Linux
Méthode de jugement de fin de création du fichier placé
URL de référence:https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes
:param file_path:
:param time_out:
:return:
"""
size_now = 0
size_past = -1
start = time.time()
while True:
size_now = os.path.getsize(file_path)
time.sleep(1)
elapsed_time = time.time() - start
if size_now == size_past and os.access(file_path, os.R_OK):
return True
else:
size_past = os.path.getsize(file_path)
if elapsed_time >= time_out:
return False
@staticmethod
def _wait_for_file_created_finished_windows(file_path: Path, time_out):
"""
Création du fichier placé(copie)Méthode de jugement d'achèvement
URL de référence:https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying
:param file_path:
:param time_out:
:return:
"""
start = time.time()
while True:
try:
elapsed_time = time.time() - start
new_path = str(file_path) + "_"
os.rename(file_path, new_path)
os.rename(new_path, file_path)
time.sleep(1)
return True
except OSError:
time.sleep(1)
if elapsed_time >= time_out:
return False
https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying Merci pour l'URL ci-dessus.
Recommended Posts