Référence Luigi Reverse Veuillez vous référer à ce post J'ai créé une tâche appelée Local-> GCS-> BigQuery pour le traitement des résultats toutes les heures.
import luigi
import luigi_bigquery
import pandas as pd
from luigi.contrib.gcs import GCSClient, GCSTarget
from lib.gcp_client import GCPClient, BigqueryLoadTaskEx
from luigi.contrib.bigquery import BigqueryTarget, CreateDisposition, WriteDisposition, SourceFormat
class LoadToGcs( luigi_bigquery.Query ):
time = luigi.DateHourParameter()
lconfig = luigi.configuration.get_config()
def requires( self ):
#Emploi précédent. Sortie de fichiers localement au format TSV
return Calculate( time = self.time)
def output( self ):
path = 'gs://test_xxx/output' + self.time.strftime( '/%Y/%m/%d/%H' ) + '/output.txt'
client = GCSClient( oauth_credentials = GCPClient( self.lconfig ).get_credentials() )
return GCSTarget( path, client = client )
def run( self ):
with self.input().open('r') as input:
results = pd.read_csv( input, sep='\t' )
with self.output().open('w') as output:
results.to_csv( output, index=False, encoding='utf8' )
class LoadToTable( BigqueryLoadTaskEx ):
time = luigi.DateHourParameter()
source_format = SourceFormat.CSV
#Je veux que tu ajoutes au tableau toutes les heures
write_disposition = WriteDisposition.WRITE_APPEND
create_disposition = CreateDisposition.CREATE_IF_NEEDED
max_bad_records = 0
skip_leading_rows = 1
def requires( self ):
#Lire des fichiers depuis GCS
return LoadToGcs( time = self.time, )
@property
def schema_json( self ):
return 'schemas/xxxx.json'
def source_uris(self):
return [ self.input().path ]
def output(self):
return BigqueryTarget(
project_id = 'test_project',
dataset_id = 'test_dataset',
#Image du nom de la table: test_table_20161013
table_id = self.time.strftime( 'test_table' + '_%Y%m%d' ),
client = self.get_client()
)
BigqueryLoadTask
de Luigi a rendu le code vraiment simple et impressionné.
Pourquoi
** En raison des spécifications de BigQuery, il est recommandé d'ajouter _% Y% m% d
à la fin lors de la saisie de la date dans le nom de la table **
La raison en est qu'il est résumé dans une liste déroulante, il est donc très facile de le voir en termes d'interface utilisateur lorsque le nombre de tables est important.
Référence: http://tech.vasily.jp/entry/bigquery_data_platform
Pour une table par jour, un traitement supplémentaire sera effectué 24 fois.
Cependant, comme la sortie est BigqueryTarget
, si elle s'ajoute après 1 heure (si la table existe), elle est considérée comme ayant été exécutée et le travail se termine sans chargement.
write_disposition = WriteDisposition.WRITE_APPEND
Je pensais que ce serait une annexe si j'écrivais ceci, mais il est absolument nécessaire d'aller voir Target en premier en raison des spécifications de Luigi. (Bien sûr)
table_id = self.time.strftime( 'test_table' + '_%Y%m%d%H' )
La solution la plus rapide est de le mettre dans le nom de la table jusqu'au moment, mais le nombre de tables continue d'augmenter de 24 par jour. Je ne veux pas le faire, car cela ne tombe pas et cela devient assez sale sur BigQuery.
#Luigi ordinaire.Utiliser la tâche
class LoadToTable( luigi.Task ):
time = luigi.DateHourParameter()
lconfig = luigi.configuration.get_config()
@property
def schema_json( self ):
return 'schemas/xxxx.json'
def requires( self ):
return LoadToGcs( time = self.time, )
def run( self ):
#Utiliser BigqueryClient
bq_client = BigqueryClient(oauth_credentials=GCPClient(self.lconfig).get_credentials())
with open( self.schema_json, 'r' ) as f:
schema = json.load( f )
project_id = 'test_project'
dataset_id = 'test_dataset'
table_id = 'test_table'
job = {
'configuration': {
'load': {
'sourceUris': [
self.input().path
],
'schema': {
'fields': schema
},
'destinationTable': {
'projectId': project_id,
'datasetId': dataset_id,
'tableId': table_id
},
'sourceFormat': SourceFormat.CSV,
'writeDisposition': WriteDisposition.WRITE_APPEND,
#Puisque la première ligne des données d'origine est le nom de la colonne, ajoutez-le
'skipLeadingRows': 1,
'allowQuotedNewlines': 'true'
}
}
}
#Charger dans BigQuery
bq_client.run_job(project_id, job, dataset=BQDataset(project_id=project_id, dataset_id=dataset_id))
#Créer un fichier vide
if not self.dry:
self.output().open('w').close()
def output( self ):
output_path = os.path.join(
'/tmp/work', #Directeur de travail
self.time.strftime( '%Y-%m-%d' ), #Date
self.time.strftime( '%H' ), #temps
str( self ) #Nom de la tâche
)
return luigi.LocalTarget( output_path )
Target place simplement un fichier vide localement comme preuve de l'exécution du travail.
Ne comptez pas sur BigqueryLoadTask
C'est fait.
Recommended Posts