For an introduction to Luigi, please refer to the post "Luigi Reverse Lookup Reference". I built a job that moves hourly processing results along the path Local -> GCS -> BigQuery.
import luigi
import luigi_bigquery
import pandas as pd
from luigi.contrib.gcs import GCSClient, GCSTarget
from lib.gcp_client import GCPClient, BigqueryLoadTaskEx
from luigi.contrib.bigquery import BigqueryTarget, CreateDisposition, WriteDisposition, SourceFormat
class LoadToGcs( luigi_bigquery.Query ):
    time = luigi.DateHourParameter()
    lconfig = luigi.configuration.get_config()

    def requires( self ):
        # Previous job: outputs its result locally as a TSV file
        return Calculate( time = self.time )

    def output( self ):
        path = 'gs://test_xxx/output' + self.time.strftime( '/%Y/%m/%d/%H' ) + '/output.txt'
        client = GCSClient( oauth_credentials = GCPClient( self.lconfig ).get_credentials() )
        return GCSTarget( path, client = client )

    def run( self ):
        with self.input().open('r') as input:
            results = pd.read_csv( input, sep='\t' )

        with self.output().open('w') as output:
            results.to_csv( output, index=False, encoding='utf8' )
class LoadToTable( BigqueryLoadTaskEx ):
    time = luigi.DateHourParameter()

    source_format = SourceFormat.CSV
    # Append to the table every hour
    write_disposition = WriteDisposition.WRITE_APPEND
    create_disposition = CreateDisposition.CREATE_IF_NEEDED
    max_bad_records = 0
    skip_leading_rows = 1

    def requires( self ):
        # Read the file from GCS
        return LoadToGcs( time = self.time )

    @property
    def schema_json( self ):
        return 'schemas/xxxx.json'

    def source_uris( self ):
        return [ self.input().path ]

    def output( self ):
        return BigqueryTarget(
            project_id = 'test_project',
            dataset_id = 'test_dataset',
            # Table name will look like: test_table_20161013
            table_id = self.time.strftime( 'test_table' + '_%Y%m%d' ),
            client = self.get_client()
        )
Luigi's BigqueryLoadTask made the code remarkably simple, which impressed me.
**Because of how BigQuery works, it is recommended to end the table name with _%Y%m%d.**
Why? Tables that share a prefix and differ only in the date suffix are collapsed into a single dropdown in the BigQuery UI, which keeps the table list easy to scan even when the number of tables grows large.
Reference: http://tech.vasily.jp/entry/bigquery_data_platform
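As a quick sanity check on the naming scheme (a standalone sketch; the date is arbitrary), every hourly timestamp of a given day resolves to the same daily table id:

from datetime import datetime

# All 24 hourly runs of one day map to the same daily table name,
# so a single table receives one append per hour.
for hour in ( 0, 1, 23 ):
    t = datetime( 2016, 10, 13, hour )
    print( t.strftime( 'test_table' + '_%Y%m%d' ) )  # -> test_table_20161013 every time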
So one table receives 24 append jobs per day, one per hour. However, because the output is a BigqueryTarget, every run after the first hour (i.e., once the table already exists) is judged to have been executed already, and the job finishes without loading anything.

write_disposition = WriteDisposition.WRITE_APPEND

I thought setting this would be enough to make the load append, but by Luigi's design the Target is always checked before run() is invoked. (Of course.)
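That check is easy to see in a minimal sketch (a simplified rendering of Luigi's default Task.complete(), not code from this project):

from luigi.task import flatten

def is_complete( task ):
    # Simplified version of luigi.Task.complete(): a task counts as done
    # as soon as all of its outputs exist. With a BigqueryTarget this is
    # true from the moment the daily table is created, so the runs for
    # hours 01-23 are skipped before write_disposition is ever consulted.
    outputs = flatten( task.output() )
    return all( output.exists() for output in outputs )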
table_id = self.time.strftime( 'test_table' + '_%Y%m%d%H' )

The quickest fix is to push the hour into the table name, as above, but then the table count grows by 24 every day. Those tables no longer collapse into a dropdown, and the BigQuery dataset gets pretty messy, so I didn't want to go that way.
# Use an ordinary luigi.Task
import json
import os

from luigi.contrib.bigquery import BigqueryClient, BQDataset

class LoadToTable( luigi.Task ):
    time = luigi.DateHourParameter()
    # Needed for the dry-run guard in run() below
    dry = luigi.BoolParameter( default = False )
    lconfig = luigi.configuration.get_config()

    @property
    def schema_json( self ):
        return 'schemas/xxxx.json'

    def requires( self ):
        return LoadToGcs( time = self.time )

    def run( self ):
        # Use BigqueryClient directly instead of BigqueryLoadTask
        bq_client = BigqueryClient( oauth_credentials = GCPClient( self.lconfig ).get_credentials() )

        with open( self.schema_json, 'r' ) as f:
            schema = json.load( f )

        project_id = 'test_project'
        dataset_id = 'test_dataset'
        table_id = 'test_table'

        job = {
            'configuration': {
                'load': {
                    'sourceUris': [
                        self.input().path
                    ],
                    'schema': {
                        'fields': schema
                    },
                    'destinationTable': {
                        'projectId': project_id,
                        'datasetId': dataset_id,
                        'tableId': table_id
                    },
                    'sourceFormat': SourceFormat.CSV,
                    'writeDisposition': WriteDisposition.WRITE_APPEND,
                    # The first row of the source data holds the column names, so skip it
                    'skipLeadingRows': 1,
                    'allowQuotedNewlines': 'true'
                }
            }
        }

        # Load into BigQuery
        bq_client.run_job( project_id, job, dataset = BQDataset( project_id = project_id, dataset_id = dataset_id ) )

        # Create an empty marker file as proof of execution
        if not self.dry:
            self.output().open('w').close()

    def output( self ):
        output_path = os.path.join(
            '/tmp/work',                        # working directory
            self.time.strftime( '%Y-%m-%d' ),   # date
            self.time.strftime( '%H' ),         # hour
            str( self )                         # task name
        )
        return luigi.LocalTarget( output_path )
The Target simply places an empty file locally as proof that the job ran.
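For reference, here is a sketch of kicking off one hourly run programmatically; the module name load_to_table is a placeholder for wherever the task above actually lives:

import datetime

import luigi

from load_to_table import LoadToTable  # hypothetical module path

# Run the 2016-10-13 05:00 load. Luigi checks the local marker file under
# /tmp/work first and performs the BigQuery load only if it is missing.
luigi.build(
    [ LoadToTable( time = datetime.datetime( 2016, 10, 13, 5 ) ) ],
    local_scheduler = True,
)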
Don't rely on BigqueryLoadTask for everything, and you're done.