Précautions lors de la sortie vers une table BigQuery toutes les heures avec Luigi

Process-> Local-> Local to GCS-> Création d'une tâche appelée GCS to BigQuery

Référence Luigi Reverse Veuillez vous référer à ce post J'ai créé une tâche appelée Local-> GCS-> BigQuery pour le traitement des résultats toutes les heures.

import luigi
import luigi_bigquery
import pandas as pd
from luigi.contrib.gcs import GCSClient, GCSTarget
from lib.gcp_client import GCPClient, BigqueryLoadTaskEx
from luigi.contrib.bigquery import BigqueryTarget, CreateDisposition, WriteDisposition, SourceFormat

class LoadToGcs( luigi_bigquery.Query ):

    time      = luigi.DateHourParameter()
    lconfig   = luigi.configuration.get_config()

    def requires( self ):
        #Emploi précédent. Sortie de fichiers localement au format TSV
        return Calculate( time = self.time)

    def output( self ):
        path = 'gs://test_xxx/output' + self.time.strftime( '/%Y/%m/%d/%H' ) + '/output.txt'
        client = GCSClient( oauth_credentials = GCPClient( self.lconfig ).get_credentials() )
        return GCSTarget( path, client = client )

    def run( self ):
        with self.input().open('r') as input:
            results = pd.read_csv( input,  sep='\t' )

        with self.output().open('w') as output:
            results.to_csv( output, index=False, encoding='utf8' )


class LoadToTable( BigqueryLoadTaskEx ):

    time               = luigi.DateHourParameter()

    source_format      = SourceFormat.CSV
    #Je veux que tu ajoutes au tableau toutes les heures
    write_disposition  = WriteDisposition.WRITE_APPEND
    create_disposition = CreateDisposition.CREATE_IF_NEEDED
    max_bad_records    = 0
    skip_leading_rows  = 1

    def requires( self ):
        #Lire des fichiers depuis GCS
        return LoadToGcs( time = self.time, )

    @property
    def schema_json( self ):
        return 'schemas/xxxx.json'

    def source_uris(self):
        return [ self.input().path ]

    def output(self):
        
        return BigqueryTarget(
                project_id = 'test_project',
                dataset_id = 'test_dataset',
                #Image du nom de la table: test_table_20161013
                table_id   = self.time.strftime( 'test_table' + '_%Y%m%d' ),
                client     = self.get_client()
        )

BigqueryLoadTask de Luigi a rendu le code vraiment simple et impressionné.

Point de trébuchement

Seul minuit est téléchargé sur la table du jour

Pourquoi

Parce que la cible est une grande requête.

** En raison des spécifications de BigQuery, il est recommandé d'ajouter _% Y% m% d à la fin lors de la saisie de la date dans le nom de la table ** La raison en est qu'il est résumé dans une liste déroulante, il est donc très facile de le voir en termes d'interface utilisateur lorsque le nombre de tables est important. Référence: http://tech.vasily.jp/entry/bigquery_data_platform

Pour une table par jour, un traitement supplémentaire sera effectué 24 fois. Cependant, comme la sortie est BigqueryTarget, si elle s'ajoute après 1 heure (si la table existe), elle est considérée comme ayant été exécutée et le travail se termine sans chargement.

write_disposition  = WriteDisposition.WRITE_APPEND

Je pensais que ce serait une annexe si j'écrivais ceci, mais il est absolument nécessaire d'aller voir Target en premier en raison des spécifications de Luigi. (Bien sûr)

table_id   = self.time.strftime( 'test_table' + '_%Y%m%d%H' )

La solution la plus rapide est de le mettre dans le nom de la table jusqu'au moment, mais le nombre de tables continue d'augmenter de 24 par jour. Je ne veux pas le faire, car cela ne tombe pas et cela devient assez sale sur BigQuery.

Comment charger dans BigQuery à l'aide de BigqueryClient

Définissez Target sur local ou GCS pour créer un fichier vide


#Luigi ordinaire.Utiliser la tâche
class LoadToTable( luigi.Task ):

    time      = luigi.DateHourParameter()
    lconfig   = luigi.configuration.get_config()

    @property
    def schema_json( self ):
        return 'schemas/xxxx.json'

    def requires( self ):
        return LoadToGcs( time = self.time, )

    def run( self ):
        #Utiliser BigqueryClient
        bq_client = BigqueryClient(oauth_credentials=GCPClient(self.lconfig).get_credentials())
        with open( self.schema_json, 'r' ) as f:
            schema = json.load( f )

        project_id = 'test_project'
        dataset_id = 'test_dataset'
        table_id = 'test_table'

        job = {
            'configuration': {
                'load': {
                    'sourceUris': [
                        self.input().path
                    ],
                    'schema': {
                        'fields': schema
                    },
                    'destinationTable': {
                        'projectId': project_id,
                        'datasetId': dataset_id,
                        'tableId': table_id
                    },
                    'sourceFormat': SourceFormat.CSV,
                    'writeDisposition': WriteDisposition.WRITE_APPEND,
                    #Puisque la première ligne des données d'origine est le nom de la colonne, ajoutez-le
                    'skipLeadingRows': 1,
                    'allowQuotedNewlines': 'true'
                }
            }
        }

        #Charger dans BigQuery
        bq_client.run_job(project_id, job, dataset=BQDataset(project_id=project_id, dataset_id=dataset_id))

        #Créer un fichier vide
        if not self.dry:
            self.output().open('w').close()

    def output( self ):
        output_path = os.path.join(
                '/tmp/work', #Directeur de travail
                self.time.strftime( '%Y-%m-%d' ),        #Date
                self.time.strftime( '%H' ),              #temps
                str( self )                              #Nom de la tâche
        )
        return luigi.LocalTarget( output_path )

Target place simplement un fichier vide localement comme preuve de l'exécution du travail. Ne comptez pas sur BigqueryLoadTask C'est fait.

Recommended Posts

Précautions lors de la sortie vers une table BigQuery toutes les heures avec Luigi
Précautions lors de la saisie à partir de CSV avec Python et de la sortie vers json pour faire exe
Une note à laquelle j'étais accro lors de la création d'une table avec SQL Alchemy
Précautions lors de l'installation de tensorflow avec anaconda
Précautions lors de l'utilisation de six avec Python 2.5
[mémo] Envoyez des informations de table à GCS avec BigQuery Schedule et Cloud Functions
Points à noter lors de la résolution de problèmes DP avec Python
Comment utiliser BigQuery en Python
Il est plus pratique d'utiliser csv-table lors de l'écriture d'une table avec python-sphinx