Create a service account to use the BigQuery API in advance and download the credentials (JSON). https://cloud.google.com/docs/authentication/getting-started?hl=ja
Pipfile
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
[packages]
google-cloud-bigquery = "*"
google-cloud-bigquery-datatransfer = "*"
[requires]
python_version = "3.8"
migrate_table.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
Replace [PATH TO CREDENTIAL] with the path to the credentials JSON file you downloaded.
migrate_table.py
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')
# Delete the existing table
client.delete_table(table, not_found_ok=True)
# Create the table
schema = [
bigquery.SchemaField('id', 'INT64'),
bigquery.SchemaField('name', 'STRING')
]
client.create_table(bigquery.Table(table, schema=schema))
Replace [DATASET NAME] and [TABLE NAME] with the name of the dataset and of the table to create.
If the not_found_ok flag is False when deleting the existing table, an error occurs if the table does not exist, so leave it set to True. Think of it as DROP TABLE IF EXISTS in DDL.
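As a reference, the behavior with not_found_ok=False can be sketched like this (the NotFound exception comes from google-api-core, and client / table are the objects defined above):
from google.api_core.exceptions import NotFound
# not_found_ok=False (the default) raises NotFound if the table is missing
try:
    client.delete_table(table, not_found_ok=False)
except NotFound:
    print('table does not exist')
# not_found_ok=True succeeds silently, like DROP TABLE IF EXISTS
client.delete_table(table, not_found_ok=True)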
When creating a table, you need to specify the column name and type of each field as the schema definition. Columns are NULLABLE by default, so if you want a NOT NULL column, specify the mode:
bigquery.SchemaField('id', 'INT64', mode='REQUIRED')
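As a sketch, the schema above rewritten so that id becomes NOT NULL while name stays nullable would look like this:
schema = [
    bigquery.SchemaField('id', 'INT64', mode='REQUIRED'),    # NOT NULL
    bigquery.SchemaField('name', 'STRING', mode='NULLABLE')  # default mode
]
client.create_table(bigquery.Table(table, schema=schema))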
Next, import CSV data as initial data. Prepare a CSV file in advance whose columns match the types in the schema definition.
import.csv
id,name
1,hogehoge
2,fugafuga
migrate_table.py
# Import the initial data
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
with open('import.csv', 'rb') as sourceData:
job = client.load_table_from_file(sourceData, table, job_config=job_config)
job.result()
If you want to skip the CSV header, specify the number of rows to skip with skip_leading_rows.
migrate_view.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')
client.delete_table(table, not_found_ok=True)
view = bigquery.Table(table)
view.view_query = 'SELECT * FROM dataset_name.table_name'
client.create_table(view)
For a view, the flow is almost the same as for a regular table. The difference is that you specify the SQL query directly in view_query. The schema definition is built automatically from the SQL query, so it does not need to be specified.
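If you want to check the schema that was derived automatically, you can fetch the created view with get_table and inspect it (a minimal sketch using the client and table objects above):
created_view = client.get_table(table)
for field in created_view.schema:
    print(field.name, field.field_type, field.mode)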
If you refresh data regularly with Scheduled Query, you may also want to change the executed query and the scheduling settings from code.
migrate_schedule.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud import bigquery_datatransfer_v1
import google.protobuf.json_format
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery_datatransfer_v1.DataTransferServiceClient(
credentials=credentials
)
config = google.protobuf.json_format.ParseDict(
{
"name": '[RESOURCE NAME]',
"destination_dataset_id": '[DATASET NAME]',
"display_name": '[DISPLAY NAME]',
"data_source_id": "scheduled_query",
"params": {
"query": "SELECT * FROM dataset_name.table_name",
"destination_table_name_template": '[TABLE NAME]',
"write_disposition": "WRITE_TRUNCATE",
"partitioning_field": "",
},
"schedule": "every 24 hours",
},
bigquery_datatransfer_v1.types.TransferConfig(),
)
update_mask = {"paths": ["display_name", "params", "schedule"]}
response = client.update_transfer_config(
config, update_mask
)
Pass the schedule settings (config) and the update mask (update_mask) to update_transfer_config.
update_mask specifies which fields of config to update; settings that are not listed in update_mask are not updated.
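For example, to change only the execution timing and leave the query and display name untouched, a sketch like the following should work (continuing from the code above; the value every 12 hours is only an illustration):
# Only "schedule" is listed in the update mask, so the other settings stay as they are
config.schedule = "every 12 hours"
update_mask = {"paths": ["schedule"]}
response = client.update_transfer_config(config, update_mask)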
The settings in config are as follows.
name: Specify the resource name of the schedule. The resource name is shown in the list of schedules under "Scheduled Queries" in the BigQuery console, so you can check it in the configuration information of the corresponding schedule.
destination_dataset_id: Specify the dataset in which the results of the scheduled query are saved.
display_name: The display name of the schedule; give it any name you like.
params.query: Specify the query to execute.
params.destination_table_name_template: Specify the name of the table in which the results of the scheduled query are saved.
params.write_disposition: Specify how results are written to the table. With WRITE_TRUNCATE, the existing table is replaced with the result of the executed query. With WRITE_APPEND, the result of the executed query is appended to the existing table.
schedule: Specify when the schedule runs, e.g. every 1 hour to run every hour, or every day 00:00 to run every day at midnight.
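As an illustration only (these values are not from the settings above), a variation of params and schedule that appends the query result once a week instead of replacing the table every day might look like this:
params_variant = {
    "query": "SELECT * FROM dataset_name.table_name",
    "destination_table_name_template": '[TABLE NAME]',
    "write_disposition": "WRITE_APPEND",  # append to the existing table
    "partitioning_field": "",
}
schedule_variant = "every monday 09:00"  # run once a week instead of every 24 hours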
When using BigQuery as a data mart, changes are hard to track if you create tables from the console, so writing the definitions as code like this and managing them with git is recommended.