$ python3 -m venv test
$ source test/bin/activate
(test)$
(test)$ deactivate
$
# Source date in "YYYY-MM-DD" form.
s = "2019-06-01"
# Slice out the year, month and day fields, then reassemble them.
year, month, day = s[0:4], s[5:7], s[8:10]
print(f"{year}-{month}-{day}")
--Pour afficher une accolade littérale dans une f-string, doublez-la ({{ et }}).
var = 'aiuto'
# Doubled braces are emitted as literal braces; the innermost pair interpolates var.
message = f"val is {{{var}}}"
print(message)
import os
# Create the directory; exist_ok=True makes the call succeed when it already exists.
os.makedirs('tmp', exist_ok=True)
class
classsample
├── main.py
└── prop
└── user_property.py
main.py
from prop.user_property import UserProperty
# Build the property object from a plain dict, then print family name followed by first name.
profile = {'first_name': 'Ichiro', 'family_name': 'tester'}
user_property = UserProperty(profile)
print(f'{user_property.FAMILY_NAME} {user_property.FIRST_NAME}')
prop/user_property.py
from typing import ClassVar, Dict, List, Any
class UserProperty:
    """Hold a user's first name and family name taken from a dict."""

    def __init__(self, kwargs: Dict[str, Any]):
        """Read the 'first_name' and 'family_name' entries of kwargs.

        A missing key raises KeyError.
        """
        first_name = kwargs['first_name']
        family_name = kwargs['family_name']
        self.FIRST_NAME = first_name
        self.FAMILY_NAME = family_name
Résultat d'exécution
$ python main.py
tester Ichiro
subprocess
import subprocess
# Delete a directory on HDFS; the command is given as an argument list, so no shell is involved.
c = ['hadoop', 'fs', '-rm', '-r', '/tmp/test']
subprocess.run(c)

# Run a shell pipeline: the glob, the pipes and xargs need a shell, hence shell=True.
# The inline script handed to `bash -c` must be closed by a matching single quote,
# otherwise the shell sees an unterminated quoted string.
c = 'ls tmp/* | xargs -L 1 -P 10 -t bash -c \'cat $0 | test.sh -\''
p = subprocess.Popen(c, shell=True)
# Block until the pipeline has finished.
p.wait()
# Subsequent processing goes here.
click
click
├── cli.py
└── command
└── hello
└── cli.py
click/cli.py
import click
from command.hello.cli import hello
# Root command group. No docstring on purpose: click would surface it as help text.
@click.group()
def entry_point():
    print('click/cli.message py.')


# Register the imported `hello` command under the root group.
entry_point.add_command(hello)
def init():
    """Invoke the root click group with no extra keyword arguments."""
    entry_point()


if __name__ == '__main__':
    init()
click/command/hello/cli.py
import click
@click.command('hello')
@click.option(
    '--msg',
    '-m',
    'msg',
    type=str,
    help='Saisissez le message que vous souhaitez afficher.',
)
def hello(**kwargs):
    # No docstring on purpose: click would surface it as the command's help text.
    message = kwargs["msg"]
    print(f'Message entré:{message}')
    print('click/cmd/hello/cli.message py.')
$ python cli.py hello -m 'tester'
click/cli.message py.
Message entré:tester
click/cmd/hello/cli.message py.
pandas
import pandas as pd
# Load a headerless TSV, naming the two columns and reading both as strings.
df = pd.read_csv(
    'user.tsv',
    delimiter='\t',
    header=None,
    names=['id', 'name'],
    dtype={'id': str, 'name': str},
    low_memory=False,
)
# Write every column (plus the index) out as TSV.
df.to_csv('test.tsv', sep='\t')
# Write only the selected columns, without the index.
columns = ['id', 'name']
df[columns].to_csv('test.tsv', sep='\t', index=False)
# Write a random sample of 100 rows.
df.sample(n=100).to_csv('test.tsv', sep='\t')
# drop_duplicates() returns a new DataFrame; assign the result to keep it.
df.drop_duplicates()
# Filter rows with a string-contains test on a column.
# NOTE(review): `row_name` looks like a placeholder; df as loaded above only has
# `id` and `name`. Confirm the intended column before using this line.
df.query('row_name.str.contains("\\\"keyword\\\"")')
import sys
# Terminate the interpreter with exit status 1.
sys.exit(1)
―― Utilisé pour voir si vous disposez des entrées nécessaires avant d'effectuer l'analyse des données.
import os
# Path of the input file the later processing depends on.
# NOTE(review): 'user.tsv' is a placeholder; point this at the real input.
input_path = 'user.tsv'
# os.path.exists() requires the path to test as its argument.
if os.path.exists(input_path):
    print('Le fichier existe. Effectue le traitement ultérieur.')
else:
    # Double quotes so the apostrophe in "n'existe" does not end the string literal.
    print("Le fichier n'existe pas. Le processus se termine.")
    sys.exit(1)
--Utilisez la journalisation du module standard de Python.
test
├── module
│ └── sub.py
└── main.py
main.py
#Module fait maison
import module.sub as sub
from logging import CRITICAL, DEBUG, ERROR, INFO, WARNING
from logging import NullHandler, StreamHandler, basicConfig, getLogger, Formatter
from logging.handlers import TimedRotatingFileHandler
# Module-level logger with its own NullHandler, set to emit DEBUG and above.
logger = getLogger(__name__)
logger.setLevel(DEBUG)
logger.addHandler(NullHandler())

# Console handler, created once at import time.
sh = StreamHandler()
def init() -> None:
    """Configure console and rotating-file logging on the root logger.

    The module-level console handler ``sh`` is installed through
    ``basicConfig``. A ``TimedRotatingFileHandler`` writing ``log/test.log``
    (rotated at midnight, 30 backups kept) is then added, using a format that
    also records the process id. A debug record is emitted once setup is done.
    """
    import os

    basicConfig(
        handlers=[sh],
        format="[%(asctime)s] %(name)s %(levelname)s: %(message)s",
        datefmt="%y-%m-%d %H:%M:%S",
    )
    root_logger = getLogger()
    root_logger.setLevel(DEBUG)

    # The file handler opens its file on construction, so the directory must exist first.
    os.makedirs("log", exist_ok=True)
    rfh = TimedRotatingFileHandler(
        "log/test.log",
        when="midnight",
        backupCount=30,
    )
    # Plain string: the %(...)s fields are filled in by logging, so no f-string is needed.
    format_template = (
        "PID:%(process)d [%(asctime)s] %(name)s %(levelname)s: %(message)s"
    )
    log_format = Formatter(fmt=format_template, datefmt="%y-%m-%d %H:%M:%S")
    rfh.setFormatter(log_format)
    root_logger.addHandler(rfh)

    logger.debug("Lancer l'exécution du script")
if __name__ == "__main__":
    # Set up logging before anything else runs.
    init()

    # Call the function provided by the project's own module.
    sub.hello()
module/sub.py
from logging import getLogger
# Logger named after this module.
logger = getLogger(__name__)


def hello():
    """Print a greeting, then emit a debug-level log record."""
    print('hello! this is sub module.')
    logger.debug('Sortie du sous-module')
$ python main.py
[20-06-25 14:20:56] __main__ DEBUG:Lancer l'exécution du script
hello! this is sub module.
[20-06-25 14:20:56] module.sub DEBUG:Sortie du sous-module
$ head log/test.log
PID:15171 [20-06-25 14:20:56] __main__ DEBUG:Lancer l'exécution du script
PID:15171 [20-06-25 14:20:56] module.sub DEBUG:Sortie du sous-module
# Count the lines of the file; the with-block guarantees the file handle is closed.
with open('test.tsv') as f:
    cnt = str(sum(1 for _ in f))
main.py
import os
def load_file_as_one_line(file, sep):
    """Return the non-empty lines of *file* joined into one string with *sep*.

    Example: a file holding the lines ``a``, ``b`` and ``c`` read with
    ``sep='|'`` gives ``a|b|c``.

    Args:
        file: Path of the text file to read.
        sep: Separator placed between lines; may be any length, including empty.

    Returns:
        The joined string, or an empty string when the file has no non-empty line.
    """
    with open(file) as f:
        # Strip the trailing line terminator from each line.
        stripped = (line.rstrip(os.linesep) for line in f)
        # join() adds no trailing separator, so nothing has to be sliced off
        # afterwards; slicing one character was only right for one-character separators.
        return sep.join(w for w in stripped if w != '')
# Print the lines of data.txt joined with '|'.
print(load_file_as_one_line('data.txt', '|'))
$ cat data.txt
tester
test
texte
text
Goût
taste
$ python main.py
tester|test|texte|text|Goût|taste
main.py
import datetime
from dateutil.relativedelta import relativedelta
def out_term(year, month, term, base_dir):
    """Return comma-separated ``base_dir + 'YYYY/MM'`` paths for consecutive months.

    Args:
        year: Year of the first month.
        month: First month (1-12).
        term: Number of consecutive months to list.
        base_dir: Prefix prepended verbatim to each ``YYYY/MM`` part.

    Returns:
        The paths joined with ``,``; an empty string when ``term`` is 0 or less.

    Raises:
        ValueError: If ``year`` and ``month`` do not form a valid date.
    """
    # Constructing the date validates year and month up front.
    start = datetime.date(year, month, 1)
    # Work in "months since year 0" so adding an offset rolls over year boundaries.
    first = start.year * 12 + (start.month - 1)
    paths = []
    for offset in range(term):
        y, m = divmod(first + offset, 12)
        paths.append(base_dir + datetime.date(y, m + 1, 1).strftime("%Y/%m"))
    return ",".join(paths)
def out_reverse_term_by_day(d, reverse_term, base_dir):
    """Return comma-separated ``base_dir + 'YYYY/MM/DD'`` paths for the days ending on *d*.

    Args:
        d: Last day of the period (a ``datetime.date``).
        reverse_term: Number of consecutive days to list, counting back from *d*.
        base_dir: Prefix prepended verbatim to each ``YYYY/MM/DD`` part.

    Returns:
        The paths in chronological order joined with ``,``; an empty string
        when ``reverse_term`` is 0 or less.
    """
    # The period includes d itself, so the first day is reverse_term - 1 days earlier.
    first_day = d - datetime.timedelta(days=reverse_term - 1)
    return ",".join(
        base_dir + (first_day + datetime.timedelta(days=i)).strftime("%Y/%m/%d")
        for i in range(reverse_term)
    )
# List the month directories for the 4 months starting at 2019-11.
print(out_term(2019, 11, 4, '/tmp/input/'))
# List the day directories for the 5 days ending on 2019-11-02.
print(out_reverse_term_by_day(datetime.date(2019, 11, 2), 5, '/tmp/input/'))
Résultat d'exécution
$ python main.py
/tmp/input/2019/11,/tmp/input/2019/12,/tmp/input/2020/01,/tmp/input/2020/02
/tmp/input/2019/10/29,/tmp/input/2019/10/30,/tmp/input/2019/10/31,/tmp/input/2019/11/01,/tmp/input/2019/11/02
--Définissez dans un dictionnaire les mots à remplacer et leurs valeurs de remplacement, puis générez un script Pig à partir du modèle.
main.py
def substitute_condition(template, output, target_word, condition):
    """Write *template* to *output* with every *target_word* replaced by *condition*."""
    with open(template) as src:
        original_text = src.read()
    replaced = original_text.replace(target_word, condition)
    with open(output, mode='w') as dst:
        dst.write(replaced)
def translate(template: str, output: str, d: "dict[str, str]"):
    """Render *template* into *output*, applying every replacement in *d*.

    Args:
        template: Path of the template file to read.
        output: Path of the file to write.
        d: Mapping of placeholder text to its replacement. Replacements are
            applied in the mapping's iteration order, each on the result of
            the previous one.

    The template is read once and the output written once. With an empty
    mapping the output is an unmodified copy of the template.
    """
    with open(template) as f:
        text = f.read()
    for target_word, condition in d.items():
        text = text.replace(target_word, condition)
    with open(output, mode='w') as f:
        f.write(text)
# Placeholder -> replacement pairs to apply to the template.
d = {'$INPUT': '/tmp/input', '$COND': 'tester|test', '$OUTPUT': '/tmp/output'}
# Generate output.pig from template.pig.
translate('template.pig', 'output.pig', d)
Exécution
$ python main.py
template.pig
L = LOAD '$INPUT' USING PigStorage('\t');
F = FILTER L BY note matches '$COND';
FS -rm -r -f -skipTrash $OUTPUT
STORE F INTO '$OUTPUT' USING PigStorage('\t', '-schema');
output.pig
L = LOAD '/tmp/input' USING PigStorage('\t');
F = FILTER L BY note matches 'tester|test';
FS -rm -r -f -skipTrash /tmp/output
STORE F INTO '/tmp/output' USING PigStorage('\t', '-schema');
def send_mail(subject: str, body: str, from_: str, to: str, svr: str, port: str, id: str, password: str):
    """Send an HTML e-mail through an SMTP server over SSL.

    Args:
        subject: Value of the Subject header.
        body: Message body, sent with the ``html`` MIME subtype.
        from_: Value of the From header. Named with a trailing underscore
            because ``from`` is a reserved word and cannot be a parameter name.
        to: Value of the To header.
        svr: Host name of the SMTP server.
        port: Port of the SMTP server.
        id: Login name for the SMTP server.
        password: Password for the SMTP server.
    """
    import smtplib
    from email.mime.text import MIMEText

    msg = MIMEText(body, 'html')
    msg['Subject'] = subject
    msg['From'] = from_
    msg['To'] = to
    # NOTE: pass context=ssl.create_default_context() to SMTP_SSL to supply an
    # explicit SSL context.
    # The with-block closes the connection even if login or sending fails.
    with smtplib.SMTP_SSL(svr, port) as server:
        server.login(id, password)
        server.send_message(msg)
Recommended Posts