This is day 16 of the CYBIRD Engineer Advent Calendar 2016, and as last year it is written by @yuichi_komatsu, a data analysis engineer. We are also looking for colleagues to work alongside us! If you are interested, start here!!
Yesterday's post was @sakamoto_koji's "Knowledge gained from server-side development of subscriptions". These are practical, valuable tips born from real struggles in the field. Great stuff!!
Now, on to the main topic.
Last year I wrote "The story of the Embulk and BigQuery combination being pure gold" (which I accidentally deleted while editing a comment... sorry). This year a new force has joined: **the Embulk and BigQuery combination becomes even stronger when you build the workflow with Digdag!** That is today's story.
Digdag is not a digging game. It is an open-source workflow engine from Treasure Data. Many departments in our company use Jenkins, but unlike Jenkins, Digdag has no GUI (under development?); instead you describe jobs in a YAML-like file called a dig file and Digdag executes them. Similar products include Luigi and Airflow. Our department used Luigi for a while, but compared to it Digdag feels very intuitive, straightforward, and flexible (personal impression), and it does not demand the Python skills that Luigi does. Please refer to the documentation for details, including how to install Digdag.
Digdag has the following execution modes:
・Local mode
・Server mode
・Client mode
At the moment we run everything on a single server in local mode, since that meets our requirements. In this post I would like to introduce some of the ways our analysis team uses it.
So, jumping right in, here is an example configuration (the parent dig file).
```yaml
timezone: Asia/Tokyo

schedule:
  daily>: 1:00:00

+main:
  _export:
    host: 'XXX.XXX.XXX.XXX'
    user: 'hoge'
    password: 'hoge_password'
    database: 'testdb'
    project_id: 'sample_project'
    dataset: 'hoge_dataset'

  +date:
    py>: date.SetDate.set_date

  +all_load:
    _parallel: true

    +load_log:
      !include : 'hoge/log.dig'
    +load_user:
      !include : 'hoge/user.dig'
    +load_master:
      !include : 'hoge/master.dig'
```
This is the dig file we use to load the MySQL logs of one of our games into BigQuery with Embulk; it is the parent dig file that defines the common parts.
The job is scheduled by running the scheduler in the background with `./digdag scheduler &` and defining `schedule:` as shown above.
Under the `+main` task, `_export:` first defines the variables used by the subsequent tasks. Here we define the MySQL connection information used by the Embulk input, and the BigQuery `project_id`, `dataset`, and so on used by the output.
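As an aside, variables defined with `_export:` (and parameters passed with `-p` on the command line) are also visible to `py>` tasks: Digdag fills in keyword arguments whose names match parameter names, which is the same mechanism that lets `set_date` below receive `target_date`. A minimal sketch, using a hypothetical class that is not part of our actual workflow:

```python
# Hypothetical illustration (not from the article): a py> task can receive
# parameters defined in _export: (or passed with -p) as keyword arguments.
class ShowConfig(object):
    def show(self, host='', database='', dataset=''):
        # host, database and dataset are filled in by Digdag from the parent task's _export: block
        print("Loading %s on %s into BigQuery dataset %s" % (database, host, dataset))
```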
The `py>:` in `+date` obtains the target date in Python. The date columns stored in the DB are unixtime for some games and datetime for others, so the script prepares both and either can be referenced. For reference, here is that Python script.
`__init__.py`

```python
# -*- coding: utf-8 -*-
import digdag
import time
from datetime import datetime, timedelta
from pytz import timezone


class SetDate(object):
    def set_date(self, target_date=''):
        # If a target_date argument is given
        if target_date:
            # Start condition
            start_datetime = datetime.strptime(target_date, '%Y-%m-%d')
            # End condition
            end_datetime = datetime.strptime(target_date, '%Y-%m-%d') + timedelta(days=1)
        # If no target_date argument is given
        else:
            # Current time
            utc_now = datetime.now(timezone('UTC'))
            jst_now = datetime.now(timezone('Asia/Tokyo'))
            # Target day (the previous day)
            target_date = (jst_now - timedelta(days=1)).strftime('%Y-%m-%d')
            # Start condition
            start_datetime = datetime.strptime((jst_now - timedelta(days=1)).strftime('%Y-%m-%d'), '%Y-%m-%d')
            # End condition
            end_datetime = datetime.strptime(jst_now.strftime('%Y-%m-%d'), '%Y-%m-%d')
        # Convert to unixtime
        start_unixtime = int(time.mktime(start_datetime.timetuple()))
        end_unixtime = int(time.mktime(end_datetime.timetuple()))
        # Convert to str
        start_datetime = str(start_datetime)
        end_datetime = str(end_datetime)
        # Store as Digdag variables
        digdag.env.store({"target_date": target_date, "start_unixtime": start_unixtime, "end_unixtime": end_unixtime, "start_datetime": start_datetime, "end_datetime": end_datetime})
```
By doing `import digdag` and calling `digdag.env.store`, the stored values can be used as variables in later tasks. Here we compute the date values used by the Embulk yml files and by the ChatWork notification script. Place the script as `__init__.py` under the Digdag execution directory; in this example it lives at `date/__init__.py`, which is why the task is invoked as `date.SetDate.set_date`.
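To make the behaviour concrete, here is a rough illustration (not from the original article) of what `set_date` stores when the workflow is run with `-p target_date=2016-12-10`. The unixtime values assume the server runs in JST, since `time.mktime` converts using local time:

```python
# Illustrative only: approximate values set_date('2016-12-10') stores via digdag.env.store,
# assuming the server's local timezone is Asia/Tokyo.
expected = {
    "target_date": "2016-12-10",
    "start_datetime": "2016-12-10 00:00:00",
    "end_datetime": "2016-12-11 00:00:00",
    "start_unixtime": 1481295600,  # 2016-12-10 00:00:00 JST
    "end_unixtime": 1481382000,    # 2016-12-11 00:00:00 JST
}
```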
Going back to the parent dig file: in `+all_load`, the child tasks below it run in parallel because `_parallel: true` is set. You can also pull in other dig files with `!include :`. Here, `log.dig`, `user.dig`, and `master.dig` are executed in parallel.
Below is a sample of `log.dig`.
```yaml
+log:
  _export:
    #----------------#
    # Config by TYPE #
    #----------------#
    process: 'log'

  +sample1_log:
    _export:
      table: 'sample1_log'
    _error:
      _export:
        status: 'ERROR'
      py>: info.ChatWork.post
    embulk>: hoge/yml/log/sample1_log.yml

  +sample2_log:
    _export:
      table: 'sample2_log'
    _error:
      _export:
        status: 'ERROR'
      py>: info.ChatWork.post
    embulk>: hoge/yml/log/sample2_log.yml

  # (Omitted)

  +post:
    # SUCCESS info to ChatWork
    _export:
      job: 'ALL'
      status: 'SUCCESS'
    py>: info.ChatWork.post
```
The `table` variable is set in `_export:` of `+sample1_log` and `+sample2_log`, and then Embulk runs; these variables are referenced from Embulk's yml. If an error occurs in a task, it is posted to ChatWork via `py>: info.ChatWork.post` in the `_error:` block, so we can tell in which task the error happened. The job itself stops when an error occurs.
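The article does not show `info/__init__.py`, so here is a minimal sketch of what such a ChatWork notification task might look like, assuming the ChatWork v2 REST API is called through the `requests` library; the room ID, token handling, and message wording are my own placeholders, and the parameters (`process`, `table`, `job`, `status`) arrive as keyword arguments from the surrounding `_export:` blocks:

```python
# -*- coding: utf-8 -*-
# Hypothetical info/__init__.py: posts task status to ChatWork.
# Room ID, token and message format are placeholders, not from the article.
import requests


class ChatWork(object):
    API_TOKEN = 'xxxxxxxxxxxxxxxx'   # ChatWork API token (placeholder)
    ROOM_ID = '12345678'             # target room ID (placeholder)

    def post(self, process='', table='', job='', status=''):
        # Build a short message from the variables exported by the calling task
        message = "[%s] process=%s table=%s job=%s" % (status, process, table, job)
        requests.post(
            'https://api.chatwork.com/v2/rooms/%s/messages' % self.ROOM_ID,
            headers={'X-ChatWorkToken': self.API_TOKEN},
            data={'body': message},
        )
```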
Digdag manages sessions: if you re-run within the same session, `digdag run main.dig` resumes from the failed task, skipping the tasks that already succeeded. If you want to ignore the session and run everything from the beginning, use `digdag run main.dig -a`. Please refer to the documentation for the details of this behaviour. In this example `target_date` can also be given as an argument, so you can run `digdag run main.dig -p target_date=2016-12-10`.
The embulk yml sample (input: MySQL, output: BigQuery) is as follows.
```yaml
in:
  type: mysql
  host: ${host}
  user: ${user}
  password: "${password}"
  database: ${database}
  table: ${table}
  select: "id,action,zip_url,zip_hash,zip_created_at,zip_size,summary_flg,image_quality,created_at,updated_at"
  where: created_at >= ${start_unixtime} AND created_at < ${end_unixtime}
out:
  type: bigquery
  mode: append
  auth_method: json_key
  json_keyfile: /home/hoge/embulk/keyfile/json_keyfile.json
  path_prefix: /home/hoge/embulk/tmp/${dataset}/${table}
  source_format: CSV
  compression: GZIP
  project: ${project_id}
  dataset: ${dataset}
  auto_create_table: true
  table: ${table}
  schema_file: /home/hoge/embulk/schema/${dataset}/${table}.json
  delete_from_local_when_job_end: true
```
Variables are referenced as `${variable_name}`. Here the columns are listed explicitly in `select:`, so we keep one yml file per table, but if you are happy selecting all columns you could cover everything with a single template and simplify the configuration further. BigQuery datasets, table partitions, and so on can also be switched dynamically as needed.
Although our analysis team does not use them, operators such as `bq>`, `bq_load>`, and `gcs_wait>` are available since version 0.8.18, so there are now more options when loading data into BigQuery.
It also seems you can write your own operators, so in that sense you can do just about anything.
Digdag lets you define parent-child relationships and dependencies simply and intuitively, it naturally pairs very well with Embulk, and by acquiring and setting variables dynamically you can build simple yet flexible workflows! If I had to compare it to Captain Tsubasa, Digdag is like Misugi-kun, the all-rounder who skillfully coordinates with everyone around him.
Tomorrow, day 17 of the CYBIRD Engineer Advent Calendar 2016, is brought to you by [@cy-nana-obata](http://qiita.com/cy-nana-obata). Expect fresh, hopeful material that only a new graduate can deliver!? I'm looking forward to it!!!
By the way, our company's soccer training game "BFB Champions" is currently running a tie-up with "Captain Tsubasa", and in addition to the regular eleven you can field the original golden combination of Tsubasa-kun and Misaki-kun, so if you haven't played it yet, please give it a try! Misugi-kun is in there too!!