TL;DR
To estimate the number of bytes processed by a single run of a given DAG on a given day, execute the following:
$ pip3 install git+https://github.com/dtws/airflow-dag-profiler
$ airflow-dag-profiler <dag_id> <date in the format %Y-%m-%d>
Problem
At work I often use BigQuery and Airflow. However, BigQuery is not free and is billed based on the amount of data processed. Therefore, it is often necessary to know how many bytes a run of a given DAG will process, so as to estimate the price of a backfill. For this, I find the following script useful:
#!/usr/bin/env python3
"""===============================================================================
FILE: airflow-dag-profiler.py
USAGE: ./airflow-dag-profiler <DAG_ID> <DATE in format %Y-%m-%d>
DESCRIPTION:
OPTIONS: ---
REQUIREMENTS: ---
BUGS: ---
NOTES: ---
AUTHOR: Alex Leontiev ([email protected])
ORGANIZATION: Datawise Inc.
VERSION: ---
CREATED: 2020-12-11T10:15:42.030633
REVISION: ---
==============================================================================="""
import click
import logging
import os
import subprocess
from collections import namedtuple
import re
from tqdm import tqdm
from google.cloud import bigquery
import pandas as pd
from jinja2 import Template


def _add_logger(f):
    # decorator that injects a per-function logger as the `logger` keyword argument
    logger = logging.getLogger(f.__name__)

    def _f(*args, **kwargs):
        return f(*args, logger=logger, **kwargs)
    _f.__name__ = f.__name__
    return _f


RetCodeAndOutput = namedtuple("RetCodeAndOutput", "retcode output")


@_add_logger
def _system(cmd, logger=None):
    """return (exitcode,output)"""
    logger.info(f"> {cmd}")
    exitcode, output = subprocess.getstatusoutput(cmd)
    return RetCodeAndOutput(retcode=exitcode, output=output)


@click.command()
@click.option("--debug/--no-debug", default=False)
@click.argument("dag_id")
@click.argument("date", type=click.DateTime())
@click.option("--airflow-render-command", envvar="AIRFLOW_DAG_PROFILER__AIRFLOW_RENDER_COMMAND", default="airflow render {{dag_id}} {{bq_task}} {{ds}}")
@click.option("--airflow-list-tasks-command", envvar="AIRFLOW_DAG_PROFILER__AIRFLOW_LIST_TASKS_COMMAND", default="airflow list_tasks -t {{dag_id}} 2>/dev/null")
def airflow_dag_profiler(dag_id, debug, date, airflow_list_tasks_command, airflow_render_command):
    if debug:
        logging.basicConfig(level=logging.INFO)
    # list the DAG's tasks via the (templated) airflow command
    tasktree = _system(Template(airflow_list_tasks_command).render(
        {"dag_id": dag_id, "date": date})).output
    client = bigquery.Client()
    # lines of interest look like: <Task(BigQueryOperator): do_filter>
    bq_tasks = [t for t in re.findall(
        r"<Task\(BigQueryOperator\): ([a-zA-Z0-9_]+)>", tasktree)]
    bq_tasks = list(set(bq_tasks))
    quota = []
    for bq_task in tqdm(bq_tasks):
        # render the task; its SQL sits between the `sql` and
        # `destination_dataset_table` property markers of the rendered output
        sql = _system(
            Template(airflow_render_command).render({"dag_id": dag_id, "bq_task": bq_task, "date": date, "ds": date.strftime("%Y-%m-%d")})).output
        lines = sql.split("\n")
        start = next(i for i, line in enumerate(lines) if re.match(
            "^ *# property: sql *$", line) is not None)
        end = next(i for i, line in enumerate(lines) if re.match(
            "^ *# property: destination_dataset_table *$", line) is not None)
        sql = "\n".join(lines[start+2:end-1])
        # dry-run the query: nothing is executed, but BigQuery reports the bytes it would scan
        total_bytes_processed = client.query(sql, job_config=bigquery.job.QueryJobConfig(
            dry_run=True, use_query_cache=False)).total_bytes_processed
        quota.append(
            {"bq_task": bq_task, "total_bytes_processed": total_bytes_processed})
    df = pd.DataFrame(quota)
    print(df)
    print(f"total: {sum(df.total_bytes_processed)} bytes")


if __name__ == "__main__":
    airflow_dag_profiler()
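The heart of the script is BigQuery's dry-run mode: a query submitted with dry_run=True is never executed (and costs nothing), but the job still reports total_bytes_processed. A minimal standalone sketch, assuming default GCP credentials are configured; the table below is just a public sample dataset:

from google.cloud import bigquery

client = bigquery.Client()
# dry-run: the query is only planned, and BigQuery returns how many bytes it would scan
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
job = client.query(
    "SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare`",
    job_config=job_config,
)
print(f"{job.total_bytes_processed} bytes would be processed")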
The online version can be found at https://github.com/dtws/airflow-dag-profiler. The script can also be installed via pip, as shown in the TL;DR section above.
Under the hood, the script does the following:
- asks Airflow for the list of the DAG's tasks and keeps only the BigQueryOperator tasks;
- renders each such task for the given date to recover its SQL;
- uses the google.cloud.bigquery API (dry-run queries) to get the estimate for the number of bytes processed by each SQL.
By default, the script queries the local Airflow via the airflow command. However, it can also be used in scenarios where your Airflow is deployed remotely. In this case, you will need to provide the --airflow-render-command and --airflow-list-tasks-command keys (or, alternatively, the AIRFLOW_DAG_PROFILER__AIRFLOW_RENDER_COMMAND and AIRFLOW_DAG_PROFILER__AIRFLOW_LIST_TASKS_COMMAND environment variables); an example follows below.
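For example, if the production Airflow is only reachable over ssh, one could wrap both commands in ssh calls (the host name and DAG id below are hypothetical):
$ export AIRFLOW_DAG_PROFILER__AIRFLOW_LIST_TASKS_COMMAND="ssh my-airflow-host 'airflow list_tasks -t {{dag_id}}' 2>/dev/null"
$ export AIRFLOW_DAG_PROFILER__AIRFLOW_RENDER_COMMAND="ssh my-airflow-host 'airflow render {{dag_id}} {{bq_task}} {{ds}}'"
$ airflow-dag-profiler my_dag 2020-12-01
Both commands are Jinja2 templates: the script substitutes {{dag_id}} (and, for the render command, {{bq_task}} and {{ds}}) before running them.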
Current Limitations
If the DAG's tasks depend on each other (say, there are tasks A and B, and A creates a table a that B then uses), then you can only use the script after you have run the DAG at least once, since B's rendered SQL can only be dry-run once table a exists. Although this makes the script pretty useless in that situation, I often find myself in a situation where I need to estimate the price of a long backfill (~200 days). In this case, I can first backfill the first day and then use this data for the estimate, exploiting the fact that the number of bytes processed every day is about the same.