[BigQuery] How to use BigQuery API for Python -Table creation-

In 5 lines

  • Data scientists usually do their analysis in Jupyter.
  • So there is a natural desire to work with the database from Jupyter as well.
  • Which means it is more convenient to use BigQuery from Jupyter through the client library rather than the web UI or the REST API.
  • I decided to investigate what the official google.cloud.bigquery library offers to make that possible.
  • What follows is a summary of how to create tables in BigQuery.

Preparation

from google.cloud import bigquery

#Specify your own GCP Project ID
project_id = 'YourProjectID'
client = bigquery.Client(project=project_id)

If you don't know how to authenticate from Colaboratory, I wrote an article about it earlier, so please refer to that.

If you run this in a GCE environment, authentication should pass by default in the first place.

If you access BigQuery from any other environment, create a service account JSON key and load it as described in the official API reference below.

Three ways to access BigQuery with Colaboratory / Official API Reference
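As a reference, here is a minimal sketch of authenticating outside Colaboratory or GCE with a service account key file; the key path and project ID are placeholders, not values from this article.

Authenticate with a service account key


from google.cloud import bigquery

#Option 1: point the environment variable at the key file and let the client pick it up
# import os
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service_account_key.json"
# client = bigquery.Client(project="YourProjectID")

#Option 2: load the key file explicitly
client = bigquery.Client.from_service_account_json(
    "/path/to/service_account_key.json", project="YourProjectID"
)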

Premise

google.cloud.bigquery : Ver. 1.20.0

Needless to say, please install it like this

pip install google-cloud-bigquery==1.20.0

The DataSet is assumed to be created in the US region.

DataSet operation

**If your project already has a DataSet, you can skip this part completely.** Even if you don't have one yet, you rarely need to recreate a DataSet over and over, so once you have run this once you can forget about these functions.

By the way, a BigQuery DataSet corresponds to what other databases call a "schema". In BQ, however, the word "schema" has a different meaning, so I will not call the DataSet a schema here.
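If you are not sure whether the DataSet already exists, here is a minimal sketch of checking before creating it; it assumes the client from the preparation section and the demo dataset name used below.

Check whether a DataSet exists


from google.api_core.exceptions import NotFound

dataset_id = "{}.{}".format(client.project, "demo")
try:
    client.get_dataset(dataset_id)  #Raises NotFound if the DataSet does not exist
    print("DataSet already exists")
except NotFound:
    print("DataSet not found, safe to create it")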

Creating a DataSet

Create Dataset


#Create a DataSet named [demo]
dataset_name = "demo"
dataset_id = "{}.{}".format(client.project, dataset_name)

dataset = bigquery.Dataset(dataset_id)
#The US location is the cheapest, so I always use it. Change it if you care about the region.
dataset.location = "US"

client.create_dataset(dataset) 

Reference: Managing Datasets

Table operation

About the process of creating a table and loading data into the table

  • Create a table
  • Check the tables
  • Import data into a table
  • Export data from a table
  • Delete a table

Basically it's all written in the official reference, but well... yup... it can't hurt to write it up on Qiita too...

-reference- Managing Tables / Use data definition language statements (https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language?hl=ja)

Creating a table

Here is a code example that assumes we are creating a purchase history table with the following columns.

| # | Column name | Type | Mode | Comment |
|---|---|---|---|---|
| 1 | TRANSACTION_ID | STRING | REQUIRED | Purchase history ID |
| 2 | ORDER_TS | TIMESTAMP | REQUIRED | Purchase time |
| 3 | ORDER_DT | DATE | REQUIRED | Purchase date |
| 4 | ITEM_CODE | STRING | REQUIRED | Product ID |
| 5 | ITEM_NAME | STRING | NULLABLE | Product name |
| 6 | QUANTITY | INTEGER | NULLABLE | Purchase quantity |
| 7 | AMOUNT | FLOAT | NULLABLE | Purchase price |
| 8 | DISCOUNT | FLOAT | NULLABLE | Discount amount |
| 9 | CUSTOMER_ID | STRING | REQUIRED | Customer ID |
| 10 | ITEM_TAG | RECORD | REPEATED | Product tag list |
| 10.1 | TAG_ID | STRING | NULLABLE | Tag ID |
| 10.2 | TAG_NAME | STRING | NULLABLE | Tag name |

  • Ignore field #10 if you don't want to handle nested information

Create table definition (create schema)

In BigQuery, the table definition is expressed as a schema.

Each column definition is therefore written as a bigquery.SchemaField.

Field name and type cannot be omitted

Tag information is defined in a nested format

from google.cloud import bigquery

#Define Schema
schema = [
    bigquery.SchemaField('TRANSACTION_ID', 'STRING', mode='REQUIRED', description='Purchase history ID'),
    bigquery.SchemaField('ORDER_TS', 'TIMESTAMP', mode='REQUIRED', description='Purchase time'),
    bigquery.SchemaField('ORDER_DT', 'DATE', mode='REQUIRED', description='Purchase date'),
    bigquery.SchemaField('ITEM_CODE', 'STRING', mode='REQUIRED', description='Product ID'),
    bigquery.SchemaField('ITEM_NAME', 'STRING', mode='NULLABLE', description='Product name'),
    bigquery.SchemaField('QUANTITY', 'INTEGER', mode='NULLABLE', description='Purchase quantity'),
    bigquery.SchemaField('AMOUNT', 'FLOAT', mode='NULLABLE', description='Purchase price'),
    bigquery.SchemaField('DISCOUNT', 'FLOAT', mode='NULLABLE', description='Discount amount'),
    bigquery.SchemaField('CUSTOMER_ID', 'STRING', mode='REQUIRED', description='Customer ID'),
    
    bigquery.SchemaField('ITEM_TAG', 'RECORD', mode='REPEATED', description='Tag information', 
                         fields= [
                         bigquery.SchemaField('TAG_ID', 'STRING', mode='NULLABLE', description='Tag ID'),
                         bigquery.SchemaField('TAG_NAME', 'STRING', mode='NULLABLE', description='Tag name'),
                         ]
                         )
]

Actually create a table

After creating the schema, the next step is to actually create the table

Other factors to consider besides the schema

  • Make it a split (partitioned) table (if you plan to store data spanning more than 2000 days, it may be better not to)
  • Make it a clustered table (note that clustering can only be applied to split tables)

Things that tripped me up when creating a split table in BigQuery

This time we will create the table as a split & clustered table.

Create table


from google.cloud import bigquery

# project_id = 'YourProjectID'
# client = bigquery.Client(project=project_id)
# dataset_name = "demo"
# dataset_id = "{}.{}".format(client.project, dataset_name)

#Decide on a table name
table_name = "demo_transaction"
# dataset_id already contains the project ID, so just append the table name
table_id = "{}.{}".format(dataset_id, table_name)

#Use the schema defined above
table = bigquery.Table(table_id, schema=schema)

#Split table settings(Here ORDER_DT)
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="ORDER_DT"
)
#Cluster table settings
table.clustering_fields = ["ITEM_CODE", "CUSTOMER_ID"]
table.description = "Demo Data"

#Execute table creation
table = client.create_table(table)

If you check it on the console on the web, you can see that it is defined like this

(Screenshot: the table definition as shown in the BigQuery web console)

Check table list

To check the list of tables, pass client.list_tables either the DataSet name or a DataSet object.

#Check the tables in the [demo] DataSet

#Pattern to check the table name by specifying the DataSet name
dataset_id = "demo"
for table in client.list_tables(dataset=dataset_id):
  print(table.table_id)

#Pattern to specify and confirm DataSet object
dataset_object = client.get_dataset("demo")
for table in client.list_tables(dataset=dataset_object):
  print(table.table_id)

Load data into table

Data scientists frequently import / export data, so I want to understand this as well.

  • Import a local file
    • Read CSV
    • Read JSON
  • Import a GCS file

Import local file

I will describe two patterns: one where the file is stored as CSV and one where it is stored as JSON.

Read CSV file

Storing data as CSV files is probably the most common case, so let's start with that pattern. Note, however, that CSV cannot be used for tables with nested fields, because nested information cannot be expressed in CSV.

So here we load data into a table without nested information, named demo_transaction_csv.

Import csv


#Specify a local file
filename = 'demo_transaction.csv'

#Data set name
detaset_id = "demo"
#Table name to be imported
table_id = "demo_transaction_csv"
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)

#Settings for importing
job_config = bigquery.LoadJobConfig()
#Specify that CSV is the source
job_config.source_format = bigquery.SourceFormat.CSV
#Skip the first line if the file contains headers
job_config.skip_leading_rows = 1

with open(filename, "rb") as source_file:
    job = client.load_table_from_file(source_file, table_ref, job_config=job_config)

#Run
job.result()

By the way, if the load fails for some reason, check the error details with job.errors, fix the data, and load it again.
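A minimal sketch of that kind of error check; it assumes the job variable from the load snippet above and that a failed load raises an exception from job.result().

Check load errors


from google.api_core.exceptions import BadRequest

try:
    job.result()  #Wait for the load job to finish
except BadRequest:
    # job.errors holds a list of dicts describing what went wrong
    for error in job.errors:
        print(error)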

Read JSON file

Tables with nested data are imported from JSON.

The JSON format that can be imported is fixed: it must be newline-delimited JSON, i.e. one record per line, as shown below.

json format


{"TRANSACTION_ID":"t0001","ORDER_TS":"2019-11-02 12:00:00 UTC","ORDER_DT":"2019-11-02","ITEM_CODE":"ITEM001","ITEM_NAME":"YYYYY1","QUANTITY":"29","AMOUNT":2200,"DISCOUNT":0,"CUSTOMER_ID":"F0002","ITEM_TAG":[{"TAG_ID":"XXX1", "TAG_NAME":"XYZ1"},{"TAG_ID":"XXX2", "TAG_NAME":"XYZ2"}]}
{"TRANSACTION_ID":"t0002","ORDER_TS":"2019-11-03 12:00:00 UTC","ORDER_DT":"2019-11-03","ITEM_CODE":"ITEM002","ITEM_NAME":"YYYYY2","QUANTITY":"35","AMOUNT":5700,"DISCOUNT":0,"CUSTOMER_ID":"F0002","ITEM_TAG":[]}
{"TRANSACTION_ID":"t0003","ORDER_TS":"2019-11-04 12:00:00 UTC","ORDER_DT":"2019-11-04","ITEM_CODE":"ITEM003","ITEM_NAME":"YYYYY3","QUANTITY":"48","AMOUNT":4200,"DISCOUNT":0,"CUSTOMER_ID":"F0002","ITEM_TAG":[{"TAG_ID":"XXX3", "TAG_NAME":"XYZ3"}]}

If you have JSON data in this form, a local file can be imported like this:

Import json



#Specify a local file name
filename = 'demo_transaction.json'

#Data set name
detaset_id = "demo"
#Table name with nested information
table_id = "demo_transaction"
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)

job_config = bigquery.LoadJobConfig()
#Specify that newline-delimited JSON is the source format
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON

with open(filename, "rb") as source_file:
    job = client.load_table_from_file(source_file, table_ref, job_config=job_config)

#Run
job.result()

By the way, since it is nested data, it looks like this when viewed on the console

(Screenshot: the nested records as shown in the BigQuery web console)

-reference- Gist: Try Google BigQuery JSON input lightly

Import GCS file

Sometimes you want to load a local file into BigQuery, but since you are on GCP anyway, you should make full use of GCS.

So let's check how to import data that is stored in GCS.

A nice point is that you don't need to call any GCS-related libraries, as long as you know the path of the file stored in GCS.

An example assuming that the CSV file is imported into the demo_transaction_csv table ↓

Load from GCS


#Specify dataset and table name
detaset_id = "demo"
table_id = "demo_transaction_csv"
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)

#Since CSV is loaded, various settings
job_config = bigquery.LoadJobConfig()
job_config.skip_leading_rows = 1
job_config.source_format = bigquery.SourceFormat.CSV

#Specify the path where the GCS file is located
uri = "gs://{yourbacketname}/demo_transaction.csv"

#Generate job
load_job = client.load_table_from_uri(
    uri, table_ref, job_config=job_config
) 

#Run load
load_job.result()

(Maybe Evil) Push DataFrame into BQ with pandas function

Although it is not part of the official API, you can also insert a pd.DataFrame into a BigQuery table using a function on the pandas side (to_gbq).

You can append to an existing table this way, but I suspect it is used more often to write out a DataFrame as a new table after various processing.

As an example, pull out a part of the data of demo.demo_transaction_csv created earlier and write it out as another table.

Export a DataFrame with to_gbq


#Prepare a query to get a part of the data
query = """
    SELECT 
        TRANSACTION_ID
        , ORDER_TS
        , ITEM_CODE
        , QUANTITY
        , AMOUNT
        
    FROM 
        `{YourProjectID}.demo.demo_transaction_csv` 
    LIMIT 200
    ;
    """
#Generate query job
query_job = client.query(
    query, location='US'
)

#Receive results in dataframe
df = query_job.to_dataframe()

#Export the DataFrame under the name [demo_transaciton_csv_extracted]
# if_exists='append'  ->If the table already exists, append to it; otherwise create it
# if_exists='fail'    ->If the table already exists, fail; otherwise create it
# if_exists='replace' ->If the table already exists, replace it; otherwise create it

detaset_id = "demo"
table_id = "demo_transaciton_csv_extracted"

df.to_gbq(destination_table='{dataset}.{table}'.format(dataset=dataset_id, table=table_id),project_id=project_id, if_exists='append')
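Note that to_gbq relies on the separate pandas-gbq package, so if it is not installed yet you will presumably need something like this:

pip install pandas-gbq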

Make sure that Import is working

Check if the table contains data


detaset_id = "demo"
dataset_ref = client.dataset(dataset_id)

table_id = "demo_transaciton_csv_extracted"
table_ref = dataset_ref.table(table_id)
table = client.get_table(table_ref)
print("Table has {} rows".format(table.num_rows))

> Table has 200 rows

Import pd.DataFrame data with API native function

I wrote the evil method first, but it is also possible to insert a DataFrame with the API

The sample code defines a schema, but it can also be executed without defining one.

Import a pd.DataFrame


import pandas as pd

detaset_id = "demo"
dataset_ref = client.dataset(dataset_id)

table_id = "demo_pandas_data"
table_ref = dataset_ref.table(table_id)

#Create some suitable pd.DataFrame data
rows = [
    {"item_id": "xx1", "quantity": 1},
    {"item_id": "xx2", "quantity": 2},
    {"item_id": "xx3", "quantity": 3},
]

dataframe = pd.DataFrame(
    rows,
    columns=["item_id", "quantity"]
)

#Define schema(You can import without it, but it is safer to write it)
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("item_id", "STRING"),
        bigquery.SchemaField("quantity", "INTEGER"),
    ],
)

#Store the pd.DataFrame data in the table
job = client.load_table_from_dataframe(
    dataframe,
    table_ref,
    job_config=job_config,
    location="US",
)
#Run
job.result()

Import existing table data

Above I (evilly) pulled data from an existing table via a DataFrame and wrote it out as a new table, but basically I want to do this with official functionality.

  • Extract data from the existing table with the API functions and write it to a new table
  • Write a new table directly from a query

Write query results with API native functionality

When writing using the API function, just specify a new table name in QueryJobConfig.destination.

**Simple!!!**

Write the query result by specifying the write table


#Generate job config
job_config = bigquery.QueryJobConfig()

detaset_id = "demo"
dataset_ref = client.dataset(dataset_id)

#Define the table name to write to
table_id = "demo_transaciton_csv_direct_extracted"
table_ref = dataset_ref.table(table_id)

# (important)Specify the table to write to
job_config.destination = table_ref

#Define the query to write to
query = """
    SELECT 
        TRANSACTION_ID
        , ORDER_TS
        , ITEM_CODE
        , QUANTITY
        , AMOUNT
    FROM 
        `{YourProjectID}.demo.demo_transaction_csv` 
    LIMIT 300
    ;
    """

#Generate query job
query_job = client.query(
    query,
    location="US",
    job_config=job_config,
)

#Run
query_job.result()

Pattern created by query (CREATE TABLE [TABLE_NAME] AS SELECT)

I feel that defining the destination table with QueryJobConfig.destination is sufficient, but let's also cover the familiar pattern (CREATE TABLE ~ AS SELECT).

After all, I end up using it more often than I expect...

Create a new table with a query


detaset_id = "demo"
#Define the table name to write to
table_id = "demo_transaciton_csv_as_select"

#Define the query to write to
query = """
    DROP TABLE IF EXISTS {dataset}.{table} ;
    CREATE TABLE {dataset}.{table} AS 
    SELECT 
        TRANSACTION_ID
        , ORDER_TS
        , ITEM_CODE
        , QUANTITY
        , AMOUNT
        
    FROM 
        `{YourProjectID}.demo.demo_transaction_csv` 
    LIMIT 400
    ;
    """.format(dataset=dataset_id, table=table_id)

#Generate query job
job_config = bigquery.QueryJobConfig()
query_job = client.query(
    query,
    location="US",
    job_config=job_config,
)

#Run (nothing is returned, of course, but the table is written properly)
query_job.result()

This should cover all the ways to import data...

Creating a split table

BigQuery is a service with column-based, pay-as-you-go pricing, which means:

  1. Even if you set Limit, the billing amount does not change
  2. The billing amount does not change even if you narrow down the conditions with Where
  3. You will be charged for each additional column


This doesn't matter while the data volume is small (the first 1 TB of queries per month is free), but when dealing with tens of TB or more, you need to be careful.

Then what should I do?

  1. **Set up a split table**
  2. **Set up a clustered table**

These are the basic countermeasures.

A table holding several TB of data should always contain some time-series information, so set that field as the partitioning field and create a split table.

Note that you cannot change it later unless you define it as a split table when creating the table.

-reference- Overview of partitioned tables
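To actually get the cost benefit, queries have to filter on the partitioning field so that only the matching partitions are scanned. A minimal sketch, assuming the partitioned demo_transaction table created earlier already contains data; the date range is arbitrary.

Query that prunes partitions


#Only the partitions for the selected dates are scanned, so the billed bytes drop accordingly
query = """
    SELECT
        TRANSACTION_ID
        , AMOUNT
    FROM
        `{YourProjectID}.demo.demo_transaction`
    WHERE
        ORDER_DT BETWEEN '2019-11-01' AND '2019-11-07'
    ;
    """
query_job = client.query(query, location="US")
df = query_job.to_dataframe()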

Pattern to put split option in table definition

First, describe the pattern to set the split option at the stage of creating the table.

Set split options


#Describe table definition (time series column is required)
schema = [
    bigquery.SchemaField('TRANSACTION_ID', 'STRING', mode='REQUIRED', description='Purchase history ID'),
    bigquery.SchemaField('ORDER_TS', 'TIMESTAMP', mode='REQUIRED', description='Purchase time'),
    bigquery.SchemaField('ORDER_DT', 'DATE', mode='REQUIRED', description='Purchase date'),
]

detaset_id = "demo"
table_id = "demo_transaction_time_partition1"
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)

#Table object creation
table = bigquery.Table(table_ref, schema=schema)

#Set split options
table.time_partitioning = bigquery.TimePartitioning(
    #Divided by day
    type_=bigquery.TimePartitioningType.DAY,
    #Set the target field
    field="ORDER_DT"
)
table.description = "Time Partition Data"

#Create a split table
table = client.create_table(table)

Pattern created by query (CREATE TABLE [TABLE_NAME] AS SELECT)

You can also create a split table from an existing table with CREATE TABLE [TABLE_NAME] AS SELECT

The best use case is when **recreating a bloated table that was not set up as a split table**.

`AS SELECT` is preceded by `PARTITION BY [time partition field]`

Table creation query with split option


detaset_id = "demo"
#Define the table name to write to
table_id = "demo_transaciton_csv_as_select_time_partition"

query = """
    DROP TABLE IF EXISTS {dataset}.{table} ;
    CREATE TABLE {dataset}.{table} 
    PARTITION BY
    ORDER_DT
    AS 
    SELECT 
        TRANSACTION_ID
        , ORDER_TS
        , ORDER_DT
        , ITEM_CODE
        , QUANTITY
        , AMOUNT
        
    FROM 
        `{YourProjectID}.demo.demo_transaction_csv` 
    LIMIT 500
    ;
    """.format(dataset=dataset_id, table=table_id)

#Generate query job
query_job = client.query(
    query,
    location="US"
)
#Run
query_job.result()

Easy!!

Creating a cluster table

You can additionally set cluster fields on a split table.

The only addition to the split table options is the cluster field specification, so only that part is excerpted below.

-reference- Creating and using clustered tables (https://cloud.google.com/bigquery/docs/creating-clustered-tables?hl=ja)

Please refer to the following for the effect of setting the cluster option.

[BigQuery] Clustered Table Survey

Pattern to put cluster options in table definition

Specify additional cluster fields


"""Cluster table must be a split table
table = bigquery.Table(table_ref, schema=schema)
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="ORDER_DT"
)
"""
table.clustering_fields = ["ITEM_CODE", "CUSTOMER_ID"]
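For completeness, a minimal sketch of finishing the clustered table; table_ref and schema are assumed to be the same as in the split table example above, and the description string is arbitrary.

Create the clustered table


table.description = "Time Partition and Clustered Data"
#Create the split & clustered table
table = client.create_table(table)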

Pattern created by query (CREATE TABLE [TABLE_NAME] AS SELECT)

When specifying it in SQL as well, just add the CLUSTER BY clustering option.

Excerpt from the cluster option addition part


query = """
    DROP TABLE IF EXISTS {dataset}.{table} ;
    CREATE TABLE {dataset}.{table} 
    PARTITION BY
    ORDER_DT
    CLUSTER BY
    ITEM_CODE, CUSTOMER_ID
    AS 
    SELECT 
        *
    FROM 
        `{YourProjectID}.demo.demo_transaction_csv` 
    LIMIT 500
    ;
    """.format(dataset=dataset_id, table=table_id)

Export table data

Phew... the data-loading part is finally over...

Next is the export part. Exporting the table itself basically means exporting it to GCS.

Export to GCS

Export the contents of the table by specifying a GCS bucket. If you do not specify job_config, it is written out as a CSV file.

Since the output is normally CSV, tables containing nested columns cannot be exported this way.

Export in csv format



#Specify the table to be exported
detaset_id = "demo"
dataset_ref = client.dataset(dataset_id)

table_id = "demo_transaciton_csv"
table_ref = dataset_ref.table(table_id)

#Store files in specified bucket
bucket_name = "{Your Bucket Name}"
output_name = "{}.csv".format(table_id)
destination_uri = "gs://{}/{}".format(bucket_name, output_name)

#Generate export job
extract_job = client.extract_table(
    table_ref,
    destination_uri,
    location="US",
)
#Run
extract_job.result()

Compress the file and export

If you export the table as-is, the output file stays large, so I want to set the compression option.

Use `ExtractJobConfig` to set output options such as compression.

You can control whether to write the header by setting the print_header option (default is True)

Add the compression option (compress with gzip)


destination_uri = "gs://{YourBucket}/{filename}.gz"
job_config = bigquery.ExtractJobConfig(
  compression="GZIP",
  print_header=True
)

#Generate export job
extract_job = client.extract_table(
    table_ref,
    destination_uri,
    job_config=job_config,
    location="US",
)

#Run
extract_job.result()

Export table with nested data with json (avro)

If there is a nested column, it cannot be exported with csv, so export it with json or avro

The JSON output can be compressed, but avro does not support the compression option.

Nested table in json or Avro


output_name = "{}.json".format(table_id)
destination_uri = "gs://{}/{}".format(bucket_name, output_name)

#Export with json(Do not output header)
job_config = bigquery.ExtractJobConfig(
  destination_format = "NEWLINE_DELIMITED_JSON",
  print_header = False
)

#Run
extract_job = client.extract_table(
    table_ref,
    destination_uri,
    job_config=job_config,
)

extract_job.result()

Export with tsv

By the way, the default is csv, but you can also export with tsv

Set tsv option


#Add a delimiter option to job_config
job_config = bigquery.ExtractJobConfig(
    field_delimiter="\t"
)
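For completeness, a minimal sketch of the full TSV export call using this config; the bucket name and output file name are placeholders, and table_ref follows the earlier export examples.

Export in tsv format


destination_uri = "gs://{YourBucketName}/demo_transaction_csv.tsv"

#Generate and run the export job with the tab-delimited config
extract_job = client.extract_table(
    table_ref,
    destination_uri,
    job_config=job_config,
    location="US",
)
extract_job.result()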

Delete table

If you want to delete the table, just specify the table name

Delete table


# from google.cloud import bigquery
# project_id = 'YourProjectID'
# client = bigquery.Client(project=project_id)

detaset_id = "{YourDataSetId}"
dataset_ref = client.dataset(dataset_id)

table_id = "{YourTableId}"
table_ref = dataset_ref.table(table_id)

#Delete table
client.delete_table(table_ref)

This is the end of the story around table creation
