Python and Treasure Data
At Treasure Data, our vision is to make it easier for analysts and decision makers to analyze data. Our service lets you work with raw data in SQL, but if you want to do more complex analysis or reuse existing scripts and tools, you will definitely want to reach for R or Python.
R can connect through the JDBC driver, and customers actually use it that way. The other day I wondered how to do the same from Python, did a little research, and discovered a shocking fact.
~~Treasure Data does not have a Python client!~~ (UPDATE: **Our ace Pythonista has since written one**)
I haven't even looked for a third-party one. That shuts out data scientists whose specialty is Python/pandas.
But then I discovered an even more shocking fact.
**With only 60 lines of Python, I could get the result of a query submitted to Treasure Data as a pandas DataFrame.**
The script below took 4 hours to write, including checking the API's behavior, brushing up on Python, and learning pandas. This is all thanks to Treasure Data's REST API... no, actually, thanks to Python's requests library.
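In REST terms, the script makes three kinds of requests, all authenticated with an `Authorization: TD1 <apikey>` header: issue the query, poll the job, and download the result as msgpack. As a minimal sketch that sends nothing over the network, the hypothetical helper `td_request_plan` below only assembles those requests; the endpoint paths and parameters are taken from the script, while the function itself and the fixed job ID are illustrative assumptions (in the real flow, the job ID comes back from the first request).

```python
from urllib.parse import urlencode

ENDPOINT = "https://api.treasuredata.com"


def td_request_plan(apikey, database, engine, query, job_id):
    """Hypothetical helper: return the auth headers and the (method, URL)
    pairs the script sends. Nothing is actually requested here."""
    headers = {"Authorization": "TD1 %s" % apikey}
    plan = [
        # 1. Submit the query; the JSON response carries the new job_id.
        ("POST", "%s/v3/job/issue/%s/%s?%s"
         % (ENDPOINT, engine, database, urlencode({"query": query}))),
        # 2. Poll the job until its status is "success".
        ("GET", "%s/v3/job/show/%d" % (ENDPOINT, job_id)),
        # 3. Download the result rows, encoded as msgpack.
        ("GET", "%s/v3/job/result/%d?%s"
         % (ENDPOINT, job_id, urlencode({"format": "msgpack"}))),
    ]
    return headers, plan


headers, plan = td_request_plan("KEY", "demo", "hive", "select 1", 42)
for method, url in plan:
    print(method, url)
```

Note that `urlencode` escapes the SQL for the query string (spaces become `+`), which is what `requests` does for you when you pass `params=`.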
td-client-pandas in 60 lines
```python
import json
from time import sleep

import msgpack
import pandas
import requests


class TreasureData:
    def __init__(self, apikey):
        self._apikey = apikey
        self._endpoint = 'https://api.treasuredata.com'
        self._base_headers = {"Authorization": "TD1 %s" % self._apikey}

    def query(self, database, query, **opts):
        # Issue the query, then poll the job every 5 seconds until it finishes.
        job_id = self._query(database, query, **opts)
        while True:
            job = self._get_job(job_id)
            if job["status"] == 'success':
                break
            if job["status"] in ('error', 'killed'):
                raise RuntimeError("job %d ended with status %s" % (job_id, job["status"]))
            sleep(5)
        return {"cursor": self.fetch_result(job_id),
                "schema": json.loads(job['hive_result_schema'])}

    def _get_job(self, job_id):
        request_url = "%s/v3/job/show/%d" % (self._endpoint, job_id)
        response = requests.get(request_url, headers=self._base_headers)
        response.raise_for_status()
        return response.json()

    def fetch_result(self, job_id):
        # Stream the msgpack-encoded result and yield one row (a list) at a time.
        request_url = "%s/v3/job/result/%d" % (self._endpoint, job_id)
        response = requests.get(request_url,
                                params={"format": "msgpack"},
                                headers=self._base_headers,
                                stream=True)
        response.raise_for_status()
        unpacker = msgpack.Unpacker()
        for chunk in response.iter_content(8192):
            unpacker.feed(chunk)
            for unpacked in unpacker:
                yield unpacked

    def _query(self, database, query, **opts):
        # engine="tqa" selects Presto; anything else falls back to Hive.
        if opts.get("engine") == "tqa":
            engine = "presto"
        else:
            engine = "hive"
        request_url = "%s/v3/job/issue/%s/%s" % (self._endpoint, engine, database)
        response = requests.post(request_url,
                                 params={"query": query},
                                 headers=self._base_headers)
        response.raise_for_status()
        return int(response.json()["job_id"])


class TreasureDataConnector:
    def __init__(self, td_apikey):
        self._td_client = TreasureData(td_apikey)

    def query(self, database, query, **opts):
        result = self._td_client.query(database, query, **opts)
        # The schema is a list of [column_name, column_type] pairs.
        columns = [field[0] for field in result["schema"]]
        # Each row from the cursor is a list of values in column order.
        return pandas.DataFrame(list(result["cursor"]), columns=columns)
```
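The polling loop in `TreasureData.query` waits 5 seconds between checks and has no upper bound, so a job that stays `queued` or `running` would block forever. Here is a minimal sketch of a bounded alternative. It assumes you pass in a zero-argument function that returns the job's status string (with the client above, something like `lambda: client._get_job(job_id)["status"]`). The name `wait_for_job` and its `timeout`/`interval` parameters are hypothetical, not part of any Treasure Data library. `sleep` and `clock` are injectable so the loop can be exercised without actually waiting.

```python
import time


def wait_for_job(get_status, timeout=600.0, interval=5.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Hypothetical helper: poll get_status() until the job succeeds,
    fails, or `timeout` seconds have passed."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status == "success":
            return status
        if status in ("error", "killed"):
            raise RuntimeError("job finished with status %r" % status)
        if clock() >= deadline:
            raise TimeoutError("job still %r after %.0f seconds" % (status, timeout))
        sleep(interval)


# Simulated run: two "not yet" answers, then success; sleeps are recorded.
statuses = iter(["queued", "running", "success"])
slept = []
print(wait_for_job(lambda: next(statuses), sleep=slept.append))  # success
print(slept)  # [5.0, 5.0]
```

Using `time.monotonic` for the deadline keeps the timeout correct even if the system clock is adjusted while a long query runs.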
Here's what it looks like in practice.
```
In [4]: conn = TreasureDataConnector("a05a6256ec6f32949d55271276777f502b53f7a2")

In [5]: df = conn.query('demo', 'select count(1) as foo, host from syslog group by host')

In [6]: df
Out[6]:
   foo               host
0    1        pulchritude
1  782  pulchritude.local
```
We did it!
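Turning the query result into a DataFrame comes down to a small reshaping step. The result arrives as one list per row, and the job's schema is a list of `[name, type]` pairs, of which only the names are needed. A stdlib-only sketch of that step follows, using the two rows from the session above as sample input. The helper name `rows_to_columns` is hypothetical, and the schema's type names are assumed for illustration.

```python
def rows_to_columns(schema, rows):
    """Hypothetical helper: turn row lists into {column_name: [values...]}.

    schema: list of [name, type] pairs (as in a job's hive_result_schema).
    rows:   iterable of lists, each holding one value per column, in order.
    """
    columns = [field[0] for field in schema]
    data = {name: [] for name in columns}
    for row in rows:
        for name, value in zip(columns, row):
            data[name].append(value)
    return data


schema = [["foo", "bigint"], ["host", "string"]]  # type names assumed
rows = [[1, "pulchritude"], [782, "pulchritude.local"]]
print(rows_to_columns(schema, rows))
# {'foo': [1, 782], 'host': ['pulchritude', 'pulchritude.local']}
```

`pandas.DataFrame` accepts either this column dict or the row lists together with `columns=[...]`; both produce the same two-column frame.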
Result.next()
To be honest, I'm an R and Excel person and not very familiar with pandas (in fact, this was the first Python I had written in 4 years). Even so, I got this working quickly, so I'd love for pandas-loving data scientists to give Treasure Data a try!