Manipuler des tables BigQuery à partir d'un client Python

Préparation préalable

1. Obtention d'un identifiant

Créez un compte de service pour utiliser l'API BigQuery à l'avance et téléchargez les informations d'identification (JSON). https://cloud.google.com/docs/authentication/getting-started?hl=ja

2. Installation des packages requis

Pipfile


[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
google-cloud-bigquery = "*"
google-cloud-bigquery-datatransfer = "*"

[requires]
python_version = "3.8"

Recréer la table (Drop⇒Create)

1. Initialisation du client

migrate_table.py


import json

from google.cloud import bigquery
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)

Pour «[PATH TO CREDENTIAL]», spécifiez le chemin JSON des informations d'identification.

2. Régénérez la table

migrate_table.py


table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')

#Supprimer la table existante
client.delete_table(table, not_found_ok=True)

#Créer une table
schema = [
    bigquery.SchemaField('id', 'INT64'),
    bigquery.SchemaField('name', 'STRING')
]
client.create_table(bigquery.Table(table, schema=schema))

Pour [DATASET NAME] et [TABLE NAME], spécifiez le nom de l'ensemble de données et le nom de la table de la destination de création.

Si l'indicateur not_found_ok est False lors de la suppression d'une table existante, une erreur se produira si la table n'existe pas, alors laissez-la sur True. Une image comme DROP TABLE IF EXISTS dans DDL.

Vous devez spécifier le nom et le type de colonne comme définition de schéma lors de la création de la table. Par défaut, la colonne est Nullable, donc si vous voulez en faire une colonne Not Null, spécifiez le mode.

bigquery.SchemaField('id', 'INT64', mode='REQUIRED')

ça ira.

3. Importer des données

Importez les données CSV en tant que données initiales. Préparez à l'avance les données correspondant au type de définition de schéma dans CSV.

import.csv


id, name
1, hogehoge
2, fugafuga

migrate_table.py


#Importer les données initiales
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
with open('import.csv', 'rb') as sourceData:
    job = client.load_table_from_file(sourceData, table, job_config=job_config)
job.result()

Si vous souhaitez ignorer l'en-tête CSV, vous pouvez spécifier le nombre de lignes ignorées avec skip_leading_rows.

Afficher la régénération

migrate_view.py


import json

from google.cloud import bigquery
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)

table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')

client.delete_table(table, not_found_ok=True)
view = bigquery.Table(table)
view.view_query = 'SELECT * FROM dataset_name.table_name'
client.create_table(view)

Le flux des vues est presque le même que celui des vraies tables. La différence est que dans le cas d'une vue, la requête SQL est spécifiée directement dans view_query. La définition de schéma est automatiquement construite à partir de requêtes SQL et n'a pas besoin d'être spécifiée.

Planifier la mise à jour

Lorsque vous souhaitez modifier la requête d'exécution et les paramètres de planification dans une scène où les données sont régulièrement actualisées à l'aide de la requête planifiée.

migrate_schedule.py



import json

from google.cloud import bigquery
from google.oauth2 import service_account

from google.cloud import bigquery_datatransfer_v1
import google.protobuf.json_format

credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery_datatransfer_v1.DataTransferServiceClient(
    credentials=credentials
)

config = google.protobuf.json_format.ParseDict(
    {
        "name": '[RESOURCE NAME]',
        "destination_dataset_id": '[DATASET NAME]',
        "display_name": '[DISPLAY NAME]',
        "data_source_id": "scheduled_query",
        "params": {
            "query": "SELECT * FROM dataset_name.table_name",
            "destination_table_name_template": '[TABLE NAME]',
            "write_disposition": "WRITE_TRUNCATE",
            "partitioning_field": "",
        },
        "schedule": "every 24 hours",
    },
    bigquery_datatransfer_v1.types.TransferConfig(),
)

update_mask = {"paths": ["display_name", "params", "schedule"]}

response = client.update_transfer_config(
    config, update_mask 
) 

Je passerai les paramètres de planification (config) et le masque de mise à jour (ʻupdate_mask) à ʻupdate_transfer_config. ʻUpdate_mask spécifie le champ à mettre à jour dans config. Les paramètres qui ne sont pas inclus dans ʻupdate_mask ne seront pas mis à jour.

Les détails de la valeur de réglage de config sont les suivants.

name Spécifiez le nom de la ressource de la planification. Le nom de la ressource peut être confirmé à partir des informations de configuration de la planification correspondante, car la liste des planifications est affichée à partir de "Requête planifiée" de la console BigQuery. bigquery.png

destination_dataset_id Spécifiez le nom de l'ensemble de données pour enregistrer les résultats de la requête exécutés dans la planification.

display_name Puisqu'il s'agit du nom d'affichage de la planification, donnez-lui n'importe quel nom.

params.query Spécifie la requête à exécuter.

params.destination_table_name_template Spécifiez le nom de la table dans laquelle les résultats de la requête exécutés dans la planification sont enregistrés.

params.write_disposition Spécifie comment enregistrer dans la table. Si vous spécifiez WRITE_TRUNCATE, la table existante sera remplacée par le résultat de la requête d'exécution. Si WRITE_APPEND est spécifié, le résultat de la requête d'exécution sera ajouté à la table existante.

schedule Spécifiez le moment d'exécution de la planification. ex. Exécuter toutes les heures ・ ・ ・ toutes les 1 heure Courir à minuit tous les jours ... tous les jours 00h00

en conclusion

Lorsque vous utilisez BigQuery comme magasin de données, il est difficile de gérer les modifications si vous créez une table à partir de la console. Il est donc recommandé de la gérer avec git si vous la déposez dans le code.

Recommended Posts

Manipuler des tables BigQuery à partir d'un client Python
Manipuler riak depuis python
Comment lancer AWS Batch à partir de l'application cliente Python
Toucher les objets Python d'Elixir
python / Créer un dict à partir d'une liste.
Un client HTTP simple implémenté en Python
Vider les tables BigQuery dans GCS à l'aide de Python
"Première recherche élastique" commençant par un client python
Appeler des commandes depuis Python (édition Windows)
Envoyer un message de Slack à un serveur Python
Exécuter des scripts Python à partir d'applications C # GUI
Modifier Excel à partir de Python pour créer un tableau croisé dynamique
Comment ouvrir un navigateur Web à partir de python
Créer un tableau C à partir d'une feuille Python> Excel
[Python] La route du serpent (6) Manipuler les pandas
Un mémorandum sur l'appel de Python à partir de Common Lisp
Créer une nouvelle tâche Todoist à partir d'un script Python
Comment générer un objet Python à partir de JSON
sql à sql
"Kit Python" qui appelle des scripts Python depuis Swift
MeCab de Python
EC2 (Python3) -> BigQuery
Créer un arbre de décision à partir de 0 avec Python (1. Présentation)
Appel de scripts Python à partir de Python intégré en C ++ / C ++
Créer un objet datetime à partir d'une chaîne en Python (Python 3.3)
Exécutez des fichiers Python à partir de HTML en utilisant Django
Lire ligne par ligne à partir d'un fichier avec Python
Exécutez des scripts Python à partir d'Excel (en utilisant xlwings)
Extraire des données d'une page Web avec Python
Une application cliente python qui télécharge et supprime des fichiers de S3 en spécifiant un compartiment
BigQuery-Python s'est avéré utile lors de l'utilisation de BigQuery à partir de Python
Faire une copie d'un fichier Google Drive à partir de Python
Recevez des données de dictionnaire à partir de programmes Python avec AppleScript
Manipuler des fichiers Excel à partir de python avec xlrd (mémo personnel)
J'ai essayé d'exécuter python à partir d'un fichier chauve-souris
Points Python du point de vue d'un programmeur en langage C
D'un livre que les programmeurs peuvent apprendre ... (Python): Pointer
Étapes de l'installation de Python 3 à la création d'une application Django
De l'achat d'un ordinateur à l'exécution d'un programme sur python
Envisagez la conversion de Python récursif en non récursif
Script Python qui crée un fichier JSON à partir d'un fichier CSV
[Python] Comment appeler une fonction de c depuis python (édition ctypes)
[Python] Démarrez un fichier de commandes à partir de Python et passez des variables.
Utilisez Thingsspeak de Python
Algorithme A * (édition Python)
Touchez MySQL depuis Python 3
[Python] Prenez une capture d'écran
Exploitez Filemaker depuis Python
Utiliser fluentd de python
Configuration du client Python StatsD
Créer un module Python
Accéder à bitcoind depuis python
Changements de Python 3.0 à Python 3.5
Changements de Python 2 à Python 3.0
Python depuis ou import
Utilisez MySQL depuis Python
Exécutez Python à partir d'Excel