Python and Treasure Data
Chez Treasure Data, notre vision est de faciliter l'analyse des données pour les analystes et les décideurs. Notre service touche les données brutes avec SQL, mais si vous souhaitez effectuer une analyse plus complexe ou utiliser des scripts et des outils existants, vous voudrez certainement contacter R et Python.
En ce qui concerne R, il peut être connecté à partir du pilote JDBC, et il est en fait utilisé par les clients, mais au fait, je me suis demandé comment le faire depuis Python l'autre jour, et quand j'ai fait un peu de recherche, c'était choquant. Les faits ont été découverts.
~~ Les données Treasure n'ont pas de client Python! ~~ (MISE À JOUR: ** Fabriqué par notre as pythonista **)
Je n'ai même pas essayé de trouver un tiers. Cela tourne le dos aux data scientists qui ont Python / Pandas comme 18e.
Cependant, un autre fait choquant a été découvert ici.
** J'ai trouvé que je pouvais obtenir le résultat d'une requête lancée pour conserver les données en tant que dataframe Pandas en écrivant seulement 60 lignes de code python. ** **
Ce qui suit est mon script écrit en 4 heures, y compris la vérification du comportement de l'API, la révision Python et l'apprentissage de Pandas. C'est aussi grâce à l'API REST pour Treasure Data ... pas à la bibliothèque de requêtes de Python.
td-client-pandas in 60 lines
import requests
import msgpack
from time import sleep
import json
import pandas
class TreasureData:
def __init__(self, apikey):
self._apikey = apikey
self._endpoint = 'https://api.treasuredata.com'
self._base_headers = {"Authorization": "TD1 %s"%self._apikey}
def query(self, database, query, **opts):
job_id = self._query(database, query, **opts)
while True:
job = self._get_job(job_id)
if job["status"] == 'success':
break
sleep(5)
return {"cursor": self.fetch_result(job_id),
"schema": json.loads(job['hive_result_schema'])}
def _get_job(self, job_id):
request_url = "%s/v3/job/show/%d"%(self._endpoint, job_id)
response = requests.get(request_url, headers=self._base_headers)
return response.json()
def fetch_result(self, job_id):
request_url = "%s/v3/job/result/%d"%(self._endpoint, job_id)
response = requests.get(request_url,
params={"format":"msgpack"},
headers=self._base_headers)
unpacker = msgpack.Unpacker()
for chunk in response.iter_content(8192):
unpacker.feed(chunk)
for unpacked in unpacker:
yield unpacked
def _query(self, database, query, **opts):
if opts.has_key("engine") and opts["engine"] == "tqa":
engine = "presto"
else:
engine = "hive"
request_url = "%s/v3/job/issue/%s/%s"%(self._endpoint, engine, database)
response = requests.post(request_url,
params={"query":query},
headers=self._base_headers)
job_id = int(response.json()["job_id"])
return job_id
class TreasureDataConnector:
def __init__(self, td_apikey):
self._td_client = TreasureData(td_apikey)
self._last_job_id = None
def query(self, database, query, **opts):
result = self._td_client.query(database, query, **opts)
columns, _ = zip(*result['schema'])
data = dict(zip(columns, [[] for _ in columns]))
for row in result["cursor"]:
for k, v in zip(columns, row):
data[k].append(v)
return pandas.DataFrame(data)
En fait, ça ressemble à ça.
In [4]: conn = TreasureDataConnector("a05a6256ec6f32949d55271276777f502b53f7a2")
In [5]: df = conn.query('demo', 'select count(1) as foo, host from syslog group by host')
In [6]: df
Out[6]:
foo host
0 1 pulchritude
1 782 pulchritude.local
Yattane!
Result.next()
Pour être honnête, je suis une personne R et Excel, donc je ne connais pas très bien les Pandas (ou plutôt, j'ai écrit Python pour la première fois en 4 ans). Cependant, j'ai pu le faire rapidement, alors j'aimerais que les scientifiques des données Pandas essaient Treasure Data!
Recommended Posts