Computing the BigQuery cost of a single Airflow DAG run

TL;DR

To estimate the number of bytes processed by a single run of a given DAG on a given day, execute the following:

$ pip3 install https://github.com/dtws/[email protected]
$ airflow-dag-profiler <dag_id> <date in the format %Y-%m-%d>

Problem

At work I often use BigQuery and Airflow. However, BigQuery is not free: it is billed based on the amount of data processed. It is therefore often necessary to know how many bytes a run of a given DAG will process, in order to estimate the price of a backfill. For this purpose, I find the following script useful:

#!/usr/bin/env python3
"""===============================================================================

        FILE: airflow-dag-profiler.py

       USAGE: ./airflow-dag-profiler <DAG_ID> <DATE in format %Y-%m-%d>

 DESCRIPTION: 

     OPTIONS: ---
REQUIREMENTS: ---
        BUGS: ---
       NOTES: ---
      AUTHOR: Alex Leontiev ([email protected])
ORGANIZATION: Datawise Inc.
     VERSION: ---
     CREATED: 2020-12-11T10:15:42.030633
    REVISION: ---

==============================================================================="""

import click
import logging
import os
import subprocess
from collections import namedtuple
import re
from tqdm import tqdm
from google.cloud import bigquery
import pandas as pd
from jinja2 import Template


def _add_logger(f):
    logger = logging.getLogger(f.__name__)

    def _f(*args, **kwargs):
        return f(*args, logger=logger, **kwargs)
    _f.__name__ = f.__name__
    return _f


RetCodeAndOutput = namedtuple("RetCodeAndOutput", "retcode output")


@_add_logger
def _system(cmd, logger=None):
    """return (exitcode,output)"""
    logger.info(f"> {cmd}")
    exitcode, output = subprocess.getstatusoutput(cmd)
    return RetCodeAndOutput(retcode=exitcode, output=output)


@click.command()
@click.option("--debug/--no-debug", default=False)
@click.argument("dag_id")
@click.argument("date", type=click.DateTime())
@click.option("--airflow-render-command", envvar="AIRFLOW_DAG_PROFILER__AIRFLOW_RENDER_COMMAND", default="airflow render {{dag_id}} {{bq_task}} {{ds}}")
@click.option("--airflow-list-tasks-command", envvar="AIRFLOW_DAG_PROFILER__AIRFLOW_LIST_TASKS_COMMAND", default="airflow list_tasks -t {{dag_id}} 2>/dev/null")
def airflow_dag_profiler(dag_id, debug, date, airflow_list_tasks_command, airflow_render_command):
    if debug:
        logging.basicConfig(level=logging.INFO)
    tasktree = _system(Template(airflow_list_tasks_command).render(
        {"dag_id": dag_id, "date": date})).output
    client = bigquery.Client()
    # <Task(BigQueryOperator): do_filter>
    bq_tasks = [t for t in re.findall(
        r"<Task\(BigQueryOperator\): ([a-zA-Z0-9_]+)>", tasktree)]
    bq_tasks = list(set(bq_tasks))
    quota = []
    for bq_task in tqdm(bq_tasks):
        sql = _system(Template(airflow_render_command).render({
            "dag_id": dag_id,
            "bq_task": bq_task,
            "date": date,
            "ds": date.strftime("%Y-%m-%d"),
        })).output
        lines = sql.split("\n")
        start = next(i for i, line in enumerate(lines) if re.match(
            "^ *# property: sql *$", line) is not None)
        end = next(i for i, line in enumerate(lines) if re.match(
            "^ *# property: destination_dataset_table *$", line) is not None)
        sql = "\n".join(lines[start+2:end-1])
        total_bytes_processed = client.query(sql, job_config=bigquery.job.QueryJobConfig(
            dry_run=True, use_query_cache=False)).total_bytes_processed
        quota.append(
            {"bq_task": bq_task, "total_bytes_processed": total_bytes_processed})
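    # Name the columns explicitly so that the summary also works when the DAG
    # has no BigQueryOperator tasks (an empty frame would otherwise lack them).
    df = pd.DataFrame(quota, columns=["bq_task", "total_bytes_processed"])
    print(df)
    print(f"total: {df.total_bytes_processed.sum()} bytes")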


if __name__ == "__main__":
    airflow_dag_profiler()

The online version can be found at https://github.com/dtws/airflow-dag-profiler. The script can also be installed via pip, as shown in the TL;DR section above.

Under the hood, the script does the following:

  1. queries Airflow to get the list of tasks in the DAG;
  2. filters the tasks, keeping only the BigQueryOperator ones;
  3. renders the remaining tasks one by one for the given date;
  4. uses the google.cloud.bigquery API in dry-run mode to get an estimate of the number of bytes processed by each SQL query.
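Steps 2 and 3 are plain text processing, so they can be tried out without Airflow or BigQuery. The sketch below reuses the regular expressions from the script; the two sample strings are hypothetical stand-ins for the output of the list-tasks and render commands (only the `<Task(BigQueryOperator): do_filter>` shape comes from the script's own comment), and the function names `find_bq_tasks` and `extract_sql` are made up for illustration.

```python
import re

# Hypothetical output of the task-listing command.
SAMPLE_TASK_TREE = """\
<Task(BigQueryOperator): do_filter>
    <Task(PythonOperator): notify>
    <Task(BigQueryOperator): do_aggregate>
<Task(BigQueryOperator): do_filter>
"""

# Hypothetical output of the render command for a single task.
SAMPLE_RENDERED = """\
    # ----------------------------------------------------------
    # property: sql
    # ----------------------------------------------------------
    SELECT user_id
    FROM `my_project.my_dataset.events`
    WHERE dt = '2020-12-11'

    # ----------------------------------------------------------
    # property: destination_dataset_table
    # ----------------------------------------------------------
    my_project.my_dataset.filtered
"""


def find_bq_tasks(task_tree):
    """Return the sorted, de-duplicated ids of BigQueryOperator tasks."""
    ids = re.findall(r"<Task\(BigQueryOperator\): ([a-zA-Z0-9_]+)>", task_tree)
    return sorted(set(ids))


def extract_sql(rendered):
    """Return the text between the 'sql' and 'destination_dataset_table' headers."""
    lines = rendered.split("\n")
    start = next(i for i, line in enumerate(lines)
                 if re.match(r"^ *# property: sql *$", line))
    end = next(i for i, line in enumerate(lines)
               if re.match(r"^ *# property: destination_dataset_table *$", line))
    # Skip the dashed line after the first header and the one before the second.
    return "\n".join(lines[start + 2:end - 1])


bq_tasks = find_bq_tasks(SAMPLE_TASK_TREE)
sql = extract_sql(SAMPLE_RENDERED)
print(bq_tasks)
print(sql)
```

The extracted SQL is what the script then hands to the BigQuery client with dry_run=True. Note that the slicing assumes each property header is surrounded by dashed lines, as in the sample above; a different render format would need different offsets.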

By default, the script queries the local Airflow installation via the airflow command. However, it can also be used when your Airflow is deployed remotely. In this case, you will need to provide the --airflow-render-command and --airflow-list-tasks-command options (or, alternatively, the AIRFLOW_DAG_PROFILER__AIRFLOW_RENDER_COMMAND and AIRFLOW_DAG_PROFILER__AIRFLOW_LIST_TASKS_COMMAND environment variables).

Current Limitations

  1. The DAG must already be deployed to Airflow (which means that you need a working Airflow deployment).
  2. If your DAG reads tables that it creates itself during the run (e.g. the DAG consists of tasks A and B, and A creates table a that B then uses), then you can only use the script after you have run the DAG. This makes the script of little use for a single run, but I often find myself needing to estimate the price of a long backfill (~200 days). In this case, I can first backfill the first day and then use its data for the estimate, exploiting the fact that the number of bytes processed every day is about the same.
  3. If you use BigQuery through operators other than BigQueryOperator (say, PythonOperator or your own subclass of BigQueryOperator), those tasks will not be included in the estimate.
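
The extrapolation mentioned in limitation 2 is simple arithmetic: multiply the bytes reported for one day by the number of days and convert to a price. Below is a minimal sketch; the function name, the sample numbers and, in particular, the price per TiB are assumptions made for illustration, so substitute the rate from the current BigQuery price list for your region.

```python
TIB = 2 ** 40  # bytes in one tebibyte


def estimate_backfill_cost(bytes_per_run, n_days, usd_per_tib):
    """Extrapolate one day's bytes to an n_days backfill and price it."""
    total_bytes = bytes_per_run * n_days
    return total_bytes / TIB * usd_per_tib


# Hypothetical inputs: 50 GiB per daily run, a 200-day backfill, and an
# assumed (not official) price of 5 USD per TiB.
cost = estimate_backfill_cost(50 * 2 ** 30, 200, usd_per_tib=5.0)
print(f"estimated backfill cost: {cost:.2f} USD")
```

Here bytes_per_run would be the "total" figure printed by the script for the day that has already been backfilled.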
