CYBIRD Engineer Advent Calendar 2016, cette année est aussi @yuichi_komatsu en charge du 16e jour. Je suis ingénieur en analyse de données. Nous recherchons également des amis qui peuvent se consacrer ensemble! Si vous êtes intéressé, ici! !!
Hier, c'était @sakamoto_koji's "Findings gagned from server-side development of subscriptions". .. C'est un conseil pratique et précieux car nous avons eu du mal sur le terrain! génial! !!
Ensuite, c'est le sujet principal.
L'année dernière, j'ai écrit "L'histoire d'embulk et BigQuery étant trop dorée" (j'ai accidentellement supprimé ceci lorsque j'ai écrit un commentaire ... je suis désolé) Il y a une nouvelle force ** La combinaison Embulk et BigQuery sera encore améliorée en créant un flux de travail avec Digdag! ** ** C'est une histoire.
Ce n'est pas un jeu de fouille. C'est un moteur de workflow en OSS de Treasure Data du monde. Jenkins est utilisé par beaucoup de nos départements, mais contrairement à cela, il n'y a pas d'interface graphique (en cours de développement?), Et un fichier appelé dig est créé avec une description de type YAML et JOB est exécuté. Il existe des produits similaires tels que Luigi et AirFlow, et Luigi a été utilisé temporairement dans le département, mais comparé à cela, il est très intuitif, n'hésite pas et se sent flexible (individuel). Bien que ce soit le cas). Vous n'avez pas besoin de la puissance Python comme luigi. .. Veuillez vous référer à here pour la documentation comprenant l'installation de Digdag.
· Mode local ・ Mode serveur ・ Mode client Cependant, pour le moment, nous fonctionnons sur un serveur car il répond aux exigences en mode local. Cette fois, je voudrais vous présenter certaines des utilisations au sein de notre équipe d'analyse.
Donc, tout à coup, c'est un exemple de mise en scène.
timezone: Asia/Tokyo
schedule:
daily>: 1:00:00
+main:
_export:
host: 'XXX.XXX.XXX.XXX'
user: 'hoge'
password: 'hoge_password'
database: 'testdb'
project_id: 'sample_project'
dataset: 'hoge_dataset'
+date:
py>: date.SetDate.set_date
+all_load:
_parallel: true
+load_log:
!include : 'hoge/log.dig'
+load_user:
!include : 'hoge/user.dig'
+load_master:
!include : 'hoge/master.dig'
Il s'agit du fichier dig utilisé lors du chargement du journal DB (MySQL) de l'un de nos jeux dans BigQuery avec embulk, et c'est le fichier dig parent qui définit les parties communes.
JOB est planifié en exécutant le planificateur en arrière-plan avec ./digdag scheduler &
à l'avance et en définissant schedule:
comme décrit ci-dessus.
Sous la tâche + main
,_export:
définit d'abord les variables utilisées par la suite.
Ici, les informations d'accès de MySQL utilisées pour l'entrée d'embulk et le project_id, l'ensemble de données, etc. de BigQuery utilisé pour la sortie sont définis.
Le py>:
de + date
obtient la date cible en Python.
Étant donné que les données de date stockées dans la base de données diffèrent selon le jeu d'unixtime et datetime, l'une ou l'autre peut être spécifiée.
Pour référence, ce script Python est également inclus.
__init__.py
# -*- coding: utf-8 -*-
import digdag
import time
from datetime import datetime,timedelta
from pytz import timezone
class SetDate(object):
def set_date(self, target_date = ''):
# target_S'il y a un argument pour la date
if target_date:
#Condition d'initiation
start_datetime = datetime.strptime(target_date, '%Y-%m-%d')
#Conditions de sortie
end_datetime = datetime.strptime(target_date, '%Y-%m-%d') + timedelta(days=1)
# target_S'il n'y a pas d'argument de date
else:
#Heure actuelle
utc_now = datetime.now(timezone('UTC'))
jst_now = datetime.now(timezone('Asia/Tokyo'))
#Jour correspondant (il y a 1 jour)
target_date = (jst_now - timedelta(days=1)).strftime('%Y-%m-%d')
#Condition d'initiation
start_datetime = datetime.strptime((jst_now - timedelta(days=1)).strftime('%Y-%m-%d'), '%Y-%m-%d')
#Conditions de sortie
end_datetime = datetime.strptime(jst_now.strftime('%Y-%m-%d'), '%Y-%m-%d')
#Convertir en unixtime
start_unixtime = int(time.mktime(start_datetime.timetuple()))
end_unixtime = int(time.mktime(end_datetime.timetuple()))
#conversion str
start_datetime = str(start_datetime)
end_datetime = str(end_datetime)
#Définir dans la variable d'environnement
digdag.env.store({"target_date": target_date, "start_unixtime": start_unixtime, "end_unixtime": end_unixtime, "start_datetime": start_datetime, "end_datetime": end_datetime})
En faisant ʻimport digdag et
digdag.env.store`, vous pouvez utiliser la valeur définie comme variable d'environnement.
Ici, les données de date utilisées dans les scripts de liaison embulk yml et Chatwork sont acquises.
Placez le script comme «__init __. Py» sous le répertoire d'exécution de digdag.
Dans l'exemple, il est placé sous la forme «date / __ init __. Py».
Revenant au fichier dig parent,
Dans + all_load
, les tâches enfants suivantes sont exécutées en parallèle en définissant true dans _parallel:
.
Vous pouvez également charger d'autres fichiers dig avec ! Include:
.
Ici, log.dig
, ʻuser.dig et
master.dig` fonctionnent en parallèle.
Voici un exemple de log.dig
.
+log:
_export:
#----------------#
# Config by TYPE #
#----------------#
process: 'log'
+sample1_log:
_export:
table: 'sample1_log'
_error:
_export:
status: 'ERROR'
py>: info.ChatWork.post
embulk>: hoge/yml/log/sample1_log.yml
+sample2_log:
_export:
table: 'sample2_log'
_error:
_export:
status: 'ERROR'
py>: info.ChatWork.post
embulk>: hoge/yml/log/sample2_log.yml
#(Omis)
+post:
# SUCCESS info to Chatwork
_export:
job: 'ALL'
status: 'SUCCESS'
py>: info.ChatWork.post
La variable table
est définie dans _export: ʻof
+ sample1_log et
+ sample2_log, et embulk est exécuté. Les variables définies sont utilisées dans le yml d'embulk. De plus, si une erreur s'y produit, elle est publiée sur ChatWork avec
py>: info.ChatWork.postafin de pouvoir déterminer dans quelle tâche l'erreur s'est produite. Le JOB lui-même se terminera également si une erreur se produit. digdag gère la session, et si vous l'exécutez dans la même session,
digdag run main.digpassera à la partie erreur telle quelle. Si vous voulez ignorer la session et recommencer depuis le début, utilisez
digdag run main.dig -a. Veuillez consulter la [Documentation](http://docs.digdag.io/index.html) pour les spécifications de cette zone. Dans l'exemple,
target_date peut être défini comme argument, vous pouvez donc également spécifier
digdag run main.dig -p target_date = 2016-12-10`.
L'exemple Embulk yml (entrée: MySQL, sortie: BigQuery) est le suivant.
in:
type: mysql
host: ${host}
user: ${user}
password: "${password}"
database: ${database}
table: ${table}
select: "id,action,zip_url,zip_hash,zip_created_at,zip_size,summary_flg ,image_quality,created_at,updated_at"
where: created_at >= ${start_unixtime} AND created_at < ${end_unixtime}
out:
type: bigquery
mode: append
auth_method: json_key
json_keyfile: /home/hoge/embulk/keyfile/json_keyfile.json
path_prefix: /home/hoge/embulk/tmp/${dataset}/${table}
source_format: CSV
compression: GZIP
project: ${project_id}
dataset: ${dataset}
auto_create_table: true
table: ${table}
schema_file: /home/hoge/embulk/schema/${dataset}/${table}.json
delete_from_local_when_job_end: true
Les variables peuvent être référencées par $ {nom de variable}. Ici, puisque les colonnes sont spécifiées par SELECT, le fichier yml est référencé pour chaque table, mais si vous souhaitez sélectionner toutes les colonnes, vous pouvez le couvrir avec un modèle, afin de pouvoir en faire une configuration plus simple. pense. Les ensembles de données BigQuery, les partitions de table, etc. peuvent également être modifiés de manière dynamique si nécessaire.
Bien qu'il ne soit pas utilisé par notre équipe d'analyse, depuis la version 0.8.18 ou ultérieure, des opérateurs tels que «bq>», «bq_load>» et «gcs_wait>» peuvent être utilisés, de sorte que la gamme de choix lors du chargement dans BigQuery Je pense que cela s'est répandu. Eh bien, l'opérateur semble être capable de faire le sien, donc en ce sens on peut dire qu'il peut tout faire. ..
Digdag vous permet de définir les relations et les dépendances parent-enfant de manière simple et intuitive, et bien sûr, il est parfaitement compatible avec embulk, et vous pouvez effectuer un traitement de flux de travail simple et flexible en acquérant et en définissant dynamiquement des variables. !! Si vous le comparez avec le capitaine Tsubasa, Digdag est comme Misugi-kun, qui parvient à coopérer avec l'environnement.
Tomorrow's CYBIRD Engineer Advent Calendar 2016, Jour 17 [@ cy-nana-obata](http://qiita.com/cy- C'est nana-obata). Il montrera le matériel jeune et plein d'espoir unique aux nouveaux diplômés! ?? J'ai hâte d'y être! !! !!
De plus, le jeu d'entraînement de football "BFB Champions" fourni par notre société est actuellement lié à "Captain Tsubasa", et Tsubasa-kun et Misaki-kun's Vous pouvez jouer la combinaison d'or originale en plus d'Eleven, donc si vous ne l'avez pas encore jouée, essayez-la! Il y a aussi Misugi-kun! !!
Recommended Posts