BigQuery is Google's very fast data-analysis service. Queries are apparently distributed across thousands of machines, on a scale that probably only Google can offer. Through a contact I had the opportunity to evaluate BigQuery, and it was quite fast when I tried it. I accessed it from Python, so I would like to share the method.
Note that instead of following the steps below, there also seems to be a library for this:
https://github.com/tylertreat/BigQuery-Python
I will leave my own walkthrough below for reference.
I tried this on Ubuntu 13.10.
To use BigQuery from Python, install the following libraries (install pip beforehand). Note that the client needs httplib2, not the standard-library httplib:
sudo pip install --upgrade google-api-python-client
sudo pip install --upgrade oauth2client
sudo pip install --upgrade httplib2
Set up OAuth authentication for BigQuery and obtain the p12 key.
Create a service-account client key in the Google Developers Console. A .p12 key file is generated, and that is what the script uses.
I referred to this Stack Overflow question:
http://stackoverflow.com/questions/13212991/how-to-query-bigquery-programmatically-from-python-without-end-user-interaction
I tried both a synchronous (sync) and an asynchronous (async) method. The environment I tested against was my own Nico Nico Douga dataset, so the queries are not very general. Japanese (UTF-8) strings seem to match normally in queries.
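As a small standalone illustration of the UTF-8 point, here is how such a LIKE query string can be built. This is a sketch (shown in Python 3 syntax, while the full script below is Python 2); the table name is the one from my dataset and the keyword is just an example:

```python
# -*- coding: utf-8 -*-
# Embedding a Japanese (UTF-8) keyword into a BigQuery LIKE query string.
keyword = 'ボカロ'  # illustrative keyword; any UTF-8 string works the same way
q = 'SELECT * FROM [nicodata_test.videoinfo] WHERE title LIKE "%{}%" LIMIT 10;'.format(keyword)
print(q)
```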
#!/usr/bin/python
# -*- coding:utf-8 -*-
import httplib2
import time
from apiclient.discovery import build
from oauth2client.client import SignedJwtAssertionCredentials
import pprint

# REPLACE WITH YOUR Project ID
PROJECT_NUMBER = 'XXXXXXXXXXXX'
# REPLACE WITH THE SERVICE ACCOUNT EMAIL FROM GOOGLE DEV CONSOLE
SERVICE_ACCOUNT_EMAIL = '[email protected]'
KEYFILE = 'xxxxxxxxxxxxxxxx.p12'


class BQC(object):
    def __init__(self):
        # Read the p12 private key downloaded from the Developers Console.
        with open(KEYFILE, 'rb') as f:
            self.key = f.read()

    def _credential(self):
        # Build an authorized Http object from the service-account credentials.
        credentials = SignedJwtAssertionCredentials(
            SERVICE_ACCOUNT_EMAIL,
            self.key,
            scope='https://www.googleapis.com/auth/bigquery')
        http = httplib2.Http()
        http = credentials.authorize(http)
        service = build('bigquery', 'v2', http=http)
        return http, service

    def datalist(self, service, http):
        # List the datasets in the project.
        datasets = service.datasets()
        response = datasets.list(projectId=PROJECT_NUMBER).execute(http)
        print('Dataset list:\n')
        for dataset in response['datasets']:
            print("%s\n" % dataset['id'])

    def show_result(self, result):
        pprint.pprint(result)
        print 'Query Results:'
        for row in result['rows']:
            result_row = []
            for field in row['f']:
                if field['v']:
                    result_row.append(field['v'])
            print ('\t').join(result_row)

    def sync_method(self, service, http):
        # jobs().query() blocks until the query finishes (or times out).
        q = 'SELECT * FROM [nicodata_test.videoinfo] WHERE title like "%{}%" LIMIT 10;'.format(u'Vocaloid'.encode('utf-8'))
        print q
        #q = 'SELECT TOP(title, 30) as title, COUNT(*) as revision_count FROM [publicdata:samples.wikipedia] WHERE wp_namespace = 0;'
        query_request = service.jobs()
        query_data = {'query': q}
        query_response = query_request.query(projectId=PROJECT_NUMBER,
                                             body=query_data).execute(http)
        return query_response

    def async_method(self, service, http):
        # jobs().insert() starts the job and returns immediately; poll until DONE.
        #q = 'SELECT * FROM [nicodata_test.videoinfo] WHERE title like "%{}%" LIMIT 90;'.format(u'Vocaloid'.encode('utf-8'))
        q = 'SELECT * FROM [nicodata_test.comment_data] WHERE comment like "%{}%" LIMIT 90;'.format(u'Vocaloid'.encode('utf-8'))
        print q
        query_request = service.jobs()
        query_data = {
            'configuration': {
                'query': {
                    'query': q,
                }
            }
        }
        ft = time.time()
        insertResponse = query_request.insert(projectId=PROJECT_NUMBER,
                                              body=query_data).execute(http)
        print 'start query diff={}'.format(time.time() - ft)
        while True:
            print 'stat_get'
            ft2 = time.time()
            status = query_request.get(projectId=PROJECT_NUMBER,
                                       jobId=insertResponse['jobReference']['jobId']).execute(http)
            print 'end get diff={}'.format(time.time() - ft2)
            currentStatus = status['status']['state']
            if 'DONE' == currentStatus:
                print 'sql done'
                break
            else:
                print 'Waiting for the query to complete...'
                print 'Current status: ' + currentStatus
                print time.ctime()
                time.sleep(0.1)
        # Page through the results with startIndex (totalRows comes back as a string).
        currentRow = 0
        queryReply = query_request.getQueryResults(
            projectId=PROJECT_NUMBER,
            jobId=insertResponse['jobReference']['jobId'],
            startIndex=currentRow).execute(http)
        while ('rows' in queryReply) and currentRow < int(queryReply['totalRows']):
            #self.show_result(queryReply)
            currentRow += len(queryReply['rows'])
            queryReply = query_request.getQueryResults(
                projectId=PROJECT_NUMBER,
                jobId=queryReply['jobReference']['jobId'],
                startIndex=currentRow).execute(http)
        print currentRow

    def show(self):
        http, service = self._credential()
        #self.datalist(service, http)
        #result = self.sync_method(service, http)
        result = self.async_method(service, http)
        #self.show_result(result)


def main():
    bqc = BQC()
    bqc.show()


if __name__ == '__main__':
    main()
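The polling loop in async_method can also be factored into a small reusable helper. Here is a sketch of that same pattern (the helper name is my own; the status check is injected as a callable, which lets you try it with a stub instead of a live jobs().get() call):

```python
import time

def wait_for_done(get_state, interval=0.1, timeout=60.0):
    """Poll get_state() until it returns 'DONE', the same pattern as the
    while loop in async_method. get_state is any callable returning the
    job's current state string ('PENDING', 'RUNNING', 'DONE', ...)."""
    deadline = time.time() + timeout
    while True:
        state = get_state()
        if state == 'DONE':
            return state
        if time.time() >= deadline:
            raise RuntimeError('query did not finish within {}s'.format(timeout))
        time.sleep(interval)

# Usage with a stubbed status check standing in for jobs().get():
states = iter(['PENDING', 'RUNNING', 'DONE'])
print(wait_for_done(lambda: next(states), interval=0))  # prints DONE
```

In the real script, get_state would wrap the jobs().get() request and return status['status']['state'].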