Ce que je voulais faire, c'était importer les informations de la feuille de calcul dans BigQuery et mettre à jour les données de différence.
À l'origine, le processus était exécuté selon le flux suivant.
Cependant, dans ce cas, les données n'étaient souvent pas mises à jour même si les données étaient insérées immédiatement après la création d'une nouvelle table.
Lors de l'exécution de SQL, bien que l'on puisse voir que l'exécution est terminée avec le statut de job id
,
Parfois, le décalage horaire était assez grand et je pouvais l'insérer avec le sommeil,
Je cherchais une méthode stable car je ne pouvais pas le faire.
Il a été résolu en changeant le flux ci-dessus en flux suivant.
Pour un traitement qui crée une table et télécharge les données séparément, mais en même temps Quand je l'ai changé, ça a bien fonctionné. C'est naturel de demander, mais je ne savais pas comment l'écrire, alors comment après avoir créé le tableau Je regardais juste les méthodes de "comment saisir l'achèvement de l'exécution?" Et "comment effectuer le traitement en attente?".
Cette fois, seules les données de différence des informations mises à jour dans la feuille de calcul J'écris le contenu pour réfléchir.
Les données de la feuille de calcul sont disponibles ci-dessous.
Nom japonais sur la première ligne, colonne pour BigQuery sur la deuxième ligne, type de données sur la troisième ligne, C'est une structure de données que les données réelles sont contenues dans la 4ème ligne et les suivantes.
Je voulais vraiment inclure les paramètres PK, mais Cette fois, il n'est pas inclus comme objectif, donc si l'ID dans la colonne A correspond, il sera mis à jour S'ils ne correspondent pas, ils seront mis à jour.
La table BigQuery sera mise à jour comme suit.
J'ai pratiqué les quatre choses suivantes.
① Créer une table en exécutant SQL ⇒ Mettre à jour ② Créez une table avec google-cloud-bigquery ⇒ Mise à jour à partir de pandas ③ Créer / mettre à jour une table avec pandas-gbq ④ Créer / mettre à jour une table avec une requête google-cloud-big
① et ② sont des modèles qui ne peuvent pas être téléchargés après la création d'une table dans le flux initialement prévu. ③ et ④ sont des modèles qui ont réussi en téléchargeant des données en même temps que la création de la table.
Je pense que certaines personnes ont les mêmes problèmes, j'ai donc énuméré ① et ② également.
Vous trouverez ci-dessous l'introduction du code.
Authentification
from google.colab import auth
auth.authenticate_user()
Lorsque vous exécutez ce qui précède, le code d'authentification suivant s'affiche.
Un autre article montre également comment le faire sans code d'autorisation, Si vous souhaitez fusionner avec des données de tables de différents PJ Je pense que vous devriez entrer le code de vérification à chaque fois.
Référence: Comment exécuter avec Google Colaboratory sans code d'autorisation
L'opération de feuille de calcul et l'opération BigQuery sont écrites dans un article séparé, les détails sont donc omis. Comment utiliser les feuilles de calcul en Python (comment utiliser gspread) Comment utiliser BigQuery avec Python
Se préparer à utiliser la feuille de calcul
!pip install gspread
from google.colab import auth
from oauth2client.client import GoogleCredentials
import gspread
#Processus d'authentification
auth.authenticate_user()
gc = gspread.authorize(GoogleCredentials.get_application_default())
ss_id = `ID de la feuille de calcul`
sht_name = 'Nom de la feuille'
workbook = gc.open_by_key(ss_id)
worksheet = workbook.worksheet(sht_name)
values = worksheet.get_all_values()
values
# [['ID', 'Nom', 'âge', 'Choses favorites'],
# ['id', 'name', 'age', 'favorit'],
# ['int64', 'string', 'int64', 'string'],
# ['1', 'Bob', '25', 'apple'],
# ['2', 'Tom', '32', 'orange'],
# ['3', 'Jay', '28', 'grape']]
Prêt à utiliser BigQuery
project_id = 'ID du projet GCP'
dateset = 'Nom du jeu de données'
table_name = 'nom de la table'
from google.cloud import bigquery
client = bigquery.Client(project=project_id)
Jusque-là, ce sera la partie partagée entre ① et ④.
Je ne sais pas si l'exécution de l'expression SQL est correcte, C'est une méthode qui peut être exécutée avec le même SQL que celui utilisé avec l'éditeur de requêtes.
Je pense que le mérite est que vous pouvez voir le statut par «facile à écrire» et «ID de poste».
Création SQL pour la création de table temporaire
table_id = f'{project_id}.{dateset}.{table_name}'
query = f'create or replace table `{table_id}_temp`(\n'
for column, dtype in zip(values[1],values[2]):
query += f'{column} {dtype},\n'
query = query[:-2] + ')'
query
# create or replace table `{project_id}.{dateset}.{table_name}_temp`(
# id int64,
# name string,
# age int64,
# favorit string)
Exécution SQL
#Courir
result = client.query(query)
#Vérifier l'état du travail
client.get_job(result.job_id, location='asia-northeast1').state
# 'DONE'
La méthode de création du tableau jusqu'à présent est différente entre ① et ③, Le SQL pour mettre à jour les différences suivantes est couramment utilisé.
Création SQL pour mise à jour
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''
insert_query = ''
for val in values[1][1:]:
query += f'{val} = S.{val},\n'
insert_query += f'{val}, '
insert_query = insert_query[:-2]
query = query[:-2] + '\n'
query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
query
# merge into `{project_id}.{dateset}.{table_name}` as T
# using `{project_id}.{dateset}.{table_name}_temp` as S
# on T.id = S.id
# when matched then
# update set
# name = S.name,
# age = S.age,
# favorit = S.favorit
# when not matched then
# insert(id, name, age, favorit)
# values(id, name, age, favorit)
Exécution SQL pour la mise à jour
client.query(query)
Après avoir tout exécuté, lorsque vous regardez le résultat de la requête sur BigQuery Il a réussi comme le montre l'image ci-dessous.
Cependant, si vous regardez le tableau, il sera dans un état vide comme indiqué ci-dessous. Le statut du travail est "DONE", veuillez donc le refléter ...
Les données sont reflétées après environ 5 minutes en raison d'un problème de serveur. C'était inutile car je voulais le faire fonctionner en continu immédiatement.
Il s'agit d'un cas où les informations de la feuille de calcul sont téléchargées avec le contenu répertorié ici. https://googleapis.dev/python/bigquery/latest/usage/tables.html
Supprimer la table et créer un schéma
table_id = f'{project_id}.{dateset}.{table_name}'
client = bigquery.Client(project = project_id)
#Supprimer s'il y a une table
client.delete_table(f'{table_id}_temp', not_found_ok=True)
schema = []
for column, dtype in zip(values[1],values[2]):
schema.append(bigquery.SchemaField(column, dtype, mode="NULLABLE"))
schema
# [SchemaField('id', 'INT64', 'NULLABLE', None, ()),
# SchemaField('name', 'STRING', 'NULLABLE', None, ()),
# SchemaField('age', 'INT64', 'NULLABLE', None, ()),
# SchemaField('favorit', 'STRING', 'NULLABLE', None, ())]
Créer une table temporaire
table = bigquery.Table(f'{table_id}_temp', schema=schema)
client.create_table(table)
Ce qui suit est le même code que ①, il est donc résumé.
Créer et exécuter SQL pour la mise à jour
df = pd.DataFrame(values[3:], columns=values[1])
#Suppression des blancs, traitement des blancs
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])
#Mettre à jour la table temporaire
client.insert_rows(table, df.values.tolist())
#Création de SQL pour la mise à jour
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''
insert_query = ''
for val in values[1][1:]:
query += f'{val} = S.{val},\n'
insert_query += f'{val}, '
insert_query = insert_query[:-2]
query = query[:-2] + '\n'
query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
#Exécutez SQL pour mettre à jour les données
client.query(query)
Même avec cela, la probabilité qu'une table de person_temp
soit créée était faible comme dans ①.
J'utilisais beaucoup ce code lors de la liaison avec DP,
À ce moment-là, je m'endors environ 20 secondes pour que ça marche.
Je ne m'attendais pas à grand-chose car certains autres blogs disaient que c'était mal. Je suis content de l'avoir essayé.
Créer un DataFrame
#Nécessite une installation pour utiliser
!pip install pandas-gbq
df = pd.DataFrame(values[3:], columns=values[1])
#Suppression des blancs, traitement des blancs
df = df.dropna(subset=[values[1][0]])
df = df.replace({'': None})
La création de ce schéma crée un type de dictionnaire de «nom» et «type».
Créer un schéma
schema = []
for column, dtype in zip(values[1],values[2]):
schema.append({'name': column, 'type': dtype})
schema
# [{'name': 'id', 'type': 'int64'},
# {'name': 'name', 'type': 'string'},
# {'name': 'age', 'type': 'int64'},
# {'name': 'favorit', 'type': 'string'}]
Mettre à jour vers la table temporaire
df.to_gbq(f'{dateset}.{table_name}_temp', project_id, if_exists='replace', table_schema=schema)
Créer et exécuter SQL pour la mise à jour
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''
insert_query = ''
for val in values[1]:
query += f'{val} = S.{val},\n'
insert_query += f'{val}, '
insert_query = insert_query[:-2]
query = query[:-2] + '\n'
query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
client.query(query)
J'ai pu mettre à jour ici sans aucun problème.
Cependant, si vous regardez le site ci-dessous, il est dit que vous devriez passer de pandas-gbq
à google-cloud-bigquery
.
https://cloud.google.com/bigquery/docs/pandas-gbq-migration?hl=ja
Donc, ensuite, j'ai décrit la méthode en utilisant google-cloud-bigquery
.
④ Créer / mettre à jour une table avec une requête google-cloud-big
Installez pyarrow
#Installez pyarrow
!pip install pyarrow
table_id = f'{project_id}.{dateset}.{table_name}'
df = pd.DataFrame(values[3:], columns=values[1])
#Suppression des blancs, traitement des blancs
df = df.replace({'': None})
df = df.dropna(subset=[values[1][0]])
Création d'un schéma et modification du type de données d'un DataFrame
schema = []
for column, dtype in zip(values[1],values[2]):
schema.append(bigquery.SchemaField(column,dtype))
if dtype != 'string':
df[column] = df[column].astype(dtype)
schema
google-cloud-bigquery est différent de pandas-gbq Une erreur se produit si les types de données de la table et DataFrame ne correspondent pas. Le cadre officiel est plus fin ...
Vous pouvez vérifier si le type de données a changé avec df.dtpyes.
dtypes
df.dtypes
# id int64
# name object
# age int64
# favorit object
# dtype: object
Ensuite, effectuez les réglages à effectuer avec load_table_from_dataframe ()
pour être exécutés plus tard.
«config» signifie «config» en japonais.
Vous trouverez ci-dessous une liste d'attributs pouvant être définis. https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.LoadJobConfig.html
Cette fois, schema
et write_disposition
seront définis pour écraser la table existante lorsqu'elle existe.
paramètres Jon
job_config = bigquery.LoadJobConfig(
schema = schema,
write_disposition = 'WRITE_TRUNCATE'
)
Télécharger les données df
job = client.load_table_from_dataframe(
df, f'{table_id}_temp', job_config=job_config
)
# Wait for the load job to complete.
job.result()
Ce qui suit est le même code que ①, il est donc résumé.
Créer et exécuter SQL pour la mise à jour
query = f'''merge into `{project_id}.{dateset}.{table_name}` as T
using `{project_id}.{dateset}.{table_name}_temp` as S
on T.{values[1][0]} = S.{values[1][0]}
when matched then
update set
'''
insert_query = ''
for val in values[1]:
query += f'{val} = S.{val},\n'
insert_query += f'{val}, '
insert_query = insert_query[:-2]
query = query[:-2] + '\n'
query += 'when not matched then\n'
query += f'insert({insert_query})\n'
query += f'values({insert_query})'
client.query(query)
Je pense que ③ est meilleur car il est plus facile de spécifier le type de données. ④ va-t-il plus vite?
Une autre chose qui m'inquiète, c'est quand la quantité de données augmente. Étant donné que la limite d'insertion dans la requête google-cloud-big était de 10 000 enregistrements Si vous voulez faire plus que cela, vous devrez peut-être effectuer chaque traitement.
Dans ce cas, remplacez la table uniquement la première fois. A partir de la deuxième fois, il ne semble y avoir aucun problème si le processus est inséré.
Recommended Posts