Créez un compte de service pour utiliser l'API BigQuery à l'avance et téléchargez les informations d'identification (JSON). https://cloud.google.com/docs/authentication/getting-started?hl=ja
Pipfile
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
[packages]
google-cloud-bigquery = "*"
google-cloud-bigquery-datatransfer = "*"
[requires]
python_version = "3.8"
migrate_table.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
Pour «[PATH TO CREDENTIAL]», spécifiez le chemin JSON des informations d'identification.
migrate_table.py
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')
#Supprimer la table existante
client.delete_table(table, not_found_ok=True)
#Créer une table
schema = [
bigquery.SchemaField('id', 'INT64'),
bigquery.SchemaField('name', 'STRING')
]
client.create_table(bigquery.Table(table, schema=schema))
Pour [DATASET NAME]
et [TABLE NAME]
, spécifiez le nom de l'ensemble de données et le nom de la table de la destination de création.
Si l'indicateur not_found_ok
est False
lors de la suppression d'une table existante, une erreur se produira si la table n'existe pas, alors laissez-la sur True
. Une image comme DROP TABLE IF EXISTS
dans DDL.
Vous devez spécifier le nom et le type de colonne comme définition de schéma lors de la création de la table. Par défaut, la colonne est Nullable, donc si vous voulez en faire une colonne Not Null, spécifiez le mode.
bigquery.SchemaField('id', 'INT64', mode='REQUIRED')
ça ira.
Importez les données CSV en tant que données initiales. Préparez à l'avance les données correspondant au type de définition de schéma dans CSV.
import.csv
id, name
1, hogehoge
2, fugafuga
migrate_table.py
#Importer les données initiales
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
with open('import.csv', 'rb') as sourceData:
job = client.load_table_from_file(sourceData, table, job_config=job_config)
job.result()
Si vous souhaitez ignorer l'en-tête CSV, vous pouvez spécifier le nombre de lignes ignorées avec skip_leading_rows
.
migrate_view.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
credentials=credentials,
project=credentials.project_id,
)
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')
client.delete_table(table, not_found_ok=True)
view = bigquery.Table(table)
view.view_query = 'SELECT * FROM dataset_name.table_name'
client.create_table(view)
Le flux des vues est presque le même que celui des vraies tables.
La différence est que dans le cas d'une vue, la requête SQL est spécifiée directement dans view_query
.
La définition de schéma est automatiquement construite à partir de requêtes SQL et n'a pas besoin d'être spécifiée.
Lorsque vous souhaitez modifier la requête d'exécution et les paramètres de planification dans une scène où les données sont régulièrement actualisées à l'aide de la requête planifiée.
migrate_schedule.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud import bigquery_datatransfer_v1
import google.protobuf.json_format
credentials = service_account.Credentials.from_service_account_file(
'[PATH TO CREDENTIAL]',
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery_datatransfer_v1.DataTransferServiceClient(
credentials=credentials
)
config = google.protobuf.json_format.ParseDict(
{
"name": '[RESOURCE NAME]',
"destination_dataset_id": '[DATASET NAME]',
"display_name": '[DISPLAY NAME]',
"data_source_id": "scheduled_query",
"params": {
"query": "SELECT * FROM dataset_name.table_name",
"destination_table_name_template": '[TABLE NAME]',
"write_disposition": "WRITE_TRUNCATE",
"partitioning_field": "",
},
"schedule": "every 24 hours",
},
bigquery_datatransfer_v1.types.TransferConfig(),
)
update_mask = {"paths": ["display_name", "params", "schedule"]}
response = client.update_transfer_config(
config, update_mask
)
Je passerai les paramètres de planification (config
) et le masque de mise à jour (ʻupdate_mask) à ʻupdate_transfer_config
.
ʻUpdate_mask spécifie le champ à mettre à jour dans
config. Les paramètres qui ne sont pas inclus dans ʻupdate_mask
ne seront pas mis à jour.
Les détails de la valeur de réglage de config
sont les suivants.
name Spécifiez le nom de la ressource de la planification. Le nom de la ressource peut être confirmé à partir des informations de configuration de la planification correspondante, car la liste des planifications est affichée à partir de "Requête planifiée" de la console BigQuery.
destination_dataset_id Spécifiez le nom de l'ensemble de données pour enregistrer les résultats de la requête exécutés dans la planification.
display_name Puisqu'il s'agit du nom d'affichage de la planification, donnez-lui n'importe quel nom.
params.query Spécifie la requête à exécuter.
params.destination_table_name_template Spécifiez le nom de la table dans laquelle les résultats de la requête exécutés dans la planification sont enregistrés.
params.write_disposition
Spécifie comment enregistrer dans la table.
Si vous spécifiez WRITE_TRUNCATE
, la table existante sera remplacée par le résultat de la requête d'exécution.
Si WRITE_APPEND
est spécifié, le résultat de la requête d'exécution sera ajouté à la table existante.
schedule Spécifiez le moment d'exécution de la planification. ex. Exécuter toutes les heures ・ ・ ・ toutes les 1 heure Courir à minuit tous les jours ... tous les jours 00h00
Lorsque vous utilisez BigQuery comme magasin de données, il est difficile de gérer les modifications si vous créez une table à partir de la console. Il est donc recommandé de la gérer avec git si vous la déposez dans le code.
Recommended Posts