INTRODUCTION: Wes McKinney, auteur de pandas, a écrit un blog très intéressant sur les outils de données Python, j'ai donc demandé si je pouvais le traduire et le publier dans la communauté japonaise PyData. J'ai reçu ceci, donc je vais le traduire petit à petit et le publier.
Traduit par: Connectivité HDFS (Native Hadoop File System) en Python
2017/1/3
Jusqu'à présent, de nombreuses bibliothèques Python ont été développées pour interagir avec HDFS, également connu sous le nom de système de fichiers Hadoop. Certains sont via la passerelle Web HDFS HDFS, tandis que d'autres sont des interfaces RPC natives basées sur la mémoire tampon de protocole. Dans cet article, je vais vous donner un aperçu des bibliothèques existantes et vous montrer ce que j'ai fait pour fournir une interface HDFS haute performance dans le développement de l'écosystème d'Arrow.
Ce blog fait suite à un article sur la feuille de route 2017.
HDFS fait partie d'Apache Hadoop et sa conception était à l'origine basée sur le système de fichiers Google décrit dans l'article original de MapReduce. HDFS utilise les tampons de protocole de Google (parfois appelés «protobufs» en abrégé) comme protocole filaire natif pour les appels de procédure à distance, ou RPC.
Les systèmes qui interagissent avec HDFS implémenteront généralement le format de messagerie et le protocole RPC de Protobuf, similaires au client Java principal. WebHDFS a été développé pour faciliter la lecture et l'écriture de fichiers par les applications à faible charge, et il fournit une passerelle HTTP ou HTTPS qui permet aux requêtes PUT et GET d'être utilisées à la place des protobufs RPC.
Pour les applications légères, les RPC WebHDFS et protobufs natifs ont un débit de données comparable, mais les connexions natives sont généralement considérées comme hautement évolutives et adaptées à une utilisation en production.
Python a deux interfaces WebHDFS que j'ai utilisées:
Plus loin dans cet article, nous nous concentrerons sur l'interface client native RPC.
Si vous souhaitez vous connecter à HDFS de manière native à partir d'un langage qui fonctionne bien avec C, comme Python, la manière «officielle» d'Apache Hadoop est d'utiliser libhdfs. libhdfs est un wrapper C basé sur JNI pour les clients Java HDFS. Le principal avantage de libhdfs est qu'il est distribué et pris en charge par les principaux fournisseurs Hadoop et fait partie du projet Apache Hadoop. L'inconvénient est que vous utilisez JNI (la JVM est lancée à partir d'un processus Python) et que vous avez besoin d'une distribution Hadoop Java complète côté client. Il s'agit d'une condition inacceptable pour certains clients et, contrairement à d'autres clients, nécessite un support au niveau de la production. Par exemple, l'application C ++ Apache Impala (Incubation Project) utilise libhdfs pour accéder aux données sur HDFS.
En raison du poids élevé des libhdfs par nature, des interfaces natives alternatives à HDFS ont été développées.
--libhdfs3 est une bibliothèque purement C ++ qui fait maintenant partie d'Apache HAWQ (Incubation Project). libhdfs3 a été développé par Pivotal Labs pour une utilisation dans HAWQ sur les systèmes SQL-on-Hadoop. L'avantage de libhdfs3 est qu'il est hautement compatible avec libhdfs au niveau de l'API C. À un moment donné, libhdfs3 était officiellement susceptible de faire partie d'Apache Hadoop, mais maintenant c'est peu probable (voir HDFS-8707 car une nouvelle bibliothèque C ++ est en cours de développement).
--snakebite: Une implémentation pure Python de l'interface protobuf RPC de Hadoop, développée par Spotify.
Comme snakebite ne fournit pas d'API client complète (par exemple, vous ne pouvez pas écrire de fichiers) et qu'il ne fonctionne pas bien (implémenté uniquement en Python), nous nous concentrerons désormais sur libhdfs et libhdfs3. Je continuerai à le faire.
Il y a eu de nombreuses tentatives pour créer une interface de niveau C vers libhdfs pour la bibliothèque JNI. Parmi eux se trouvent cyhdfs (en utilisant Cython), libpyhdfs (extension Python C normale) et pyhdfs (en utilisant SWIG). L'un des défis de la construction d'une extension C vers libhdfs est que la bibliothèque partagée libhdfs.so est incluse dans la distribution Hdoop et est distribuée, donc $ LD_LIBRARY_PATH est approprié pour charger cette bibliothèque partagée. Doit être réglé sur. De plus, le libjvm.so de la JVM doit également pouvoir être chargé lors de l'importation. Lorsque ces conditions sont réunies, vous tomberez dans «l'enfer du décor».
Quand je pensais créer une interface C ++ HDFS à utiliser avec Apache Arrow (et aussi Python via PyArrow), j'ai trouvé une implémentation de libhdfs dans le projet SFrame de Turi. Il était censé trouver les deux dans une approche sensée lors du chargement de JVM et de libhdfs à l'exécution. J'ai adopté cette approche avec Arrow et cela a fonctionné. En utilisant cette implémentation, les outils de sérialisation de données d'Arrow (comme Apache Parquet) ont une très faible surcharge d'E / S et fournissent également une interface de fichier Python pratique.
Les API C dans les bibliothèques de pilotes libhdfs et libhdfs3 sont à peu près les mêmes, donc j'ai pu changer de pilote en fonction des arguments de mots clés de Python.
from pyarrow import HdfsClient
#Utilisez libhdfs
hdfs = HdfsClient(host, port, username, driver='libhdfs')
#Utilisez libhdfs3
hdfs_alt = HdfsClient(host, port, username, driver='libhdfs3')
with hdfs.open('/path/to/file') as f:
...
En parallèle, les développeurs du projet Dask ont créé hdfs3, une interface pure Python vers libhdfs3. Il a utilisé des ctypes pour éviter l'extension C. hdfs3 donne accès à d'autres fonctionnalités de libhdfs3 ainsi qu'à une interface de fichier Python.
from hdfs3 import HDFileSystem
hdfs = HDFileSystem(host, port, user)
with hdfs.open('/path/to/file', 'rb') as f:
...
Pour un cluster CDH 5.6.0 HDFS local, j'ai calculé la moyenne collective des performances de lecture pour des fichiers de 4 Ko à 100 Mo avec trois paramètres différents.
--hdfs3 (utilisez toujours libhdfs3) --pyarrow.HdfsClient avec driver = 'libhdfs' --pyarrow.HdfsClient avec driver = 'libhdfs3'
Vous pouvez obtenir tous ces packages en procédant comme suit:
conda install pyarrow hdfs3 libhdfs3 -c conda-forge
Remarque: le package pyarrow conda-forge n'est actuellement disponible que sous Linux. En théorie, ce problème aurait dû être résolu le 20 janvier 2017. Veuillez nous faire savoir si quelqu'un peut vous aider avec la prise en charge de Windows.
Le nombre de performances est de mégaoctets / seconde («débit»). Le code de référence est à la fin de cet article. Je suis curieux de voir à quoi ressemble ce résultat dans une plus grande variété d'environnements de production et de paramètres Hadoop.
HDFS RPC data perf
Au moins dans les tests que j'ai effectués, j'ai obtenu les résultats intéressants suivants:
--Libhdfs a montré le débit le plus élevé dans ce test, même s'il est basé sur Java et JNI. --libhdfs3 ne fonctionnait pas bien pour les lectures de petite taille. Cela peut être dû à la latence RPC ou à un problème dont je ne suis pas conscient dans les paramètres.
Ce qui suit est l'axe logarithmique du temps.
HDFS RPC data perf
L'une des raisons de la création d'interfaces d'E / S de type HDFS dans la bibliothèque pyarrow est qu'elles utilisent toutes une couche commune de gestion de la mémoire et ont une surcharge de copie très faible (éventuellement zéro). , Parce que vous pouvez transmettre les données. D'autre part, une bibliothèque qui expose uniquement l'interface de fichier Python entraîne une surcharge car la mémoire est traitée par l'objet de chaîne d'octets dans l'interpréteur Python.
Les détails du système d'E / S C ++ d'Arrow dépassent le cadre de cet article, mais je publierai à ce sujet sur ce blog à l'avenir.
Code du banc
import gc
import random
import time
import pyarrow as pa
import hdfs3
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
DATA_SIZE = 200 * (1 << 20)
data = 'a' * DATA_SIZE
hdfs = pa.HdfsClient('localhost', 20500, 'wesm')
hdfscpp = pa.HdfsClient('localhost', 20500, 'wesm', driver='libhdfs3')
hdfs3_fs = hdfs3.HDFileSystem('localhost', port=20500, user='wesm')
hdfs.delete(path)
path = '/tmp/test-data-file-1'
with hdfs.open(path, 'wb') as f:
f.write(data)
def read_chunk(f, size):
# do a random seek
f.seek(random.randint(0, size))
return f.read(size)
def ensemble_average(runner, niter=10):
start = time.clock()
gc.disable()
data_chunks = []
for i in range(niter):
data_chunks.append(runner())
elapsed = (time.clock() - start) / niter
gc.enable()
return elapsed
def make_test_func(fh, chunksize):
def runner():
return read_chunk(fh, chunksize)
return runner
KB = 1024
MB = 1024 * KB
chunksizes = [4 * KB, MB, 10 * MB, 100 * MB]
iterations = [100, 100, 100, 10]
handles = {
('pyarrow', 'libhdfs'): hdfs.open(path),
('pyarrow', 'libhdfs3'): hdfscpp.open(path),
('hdfs3', 'libhdfs3'): hdfs3_fs.open(path, 'rb')
}
timings = []
for (library, driver), handle in handles.items():
for chunksize, niter in zip(chunksizes, iterations):
tester = make_test_func(handle, chunksize)
timing = ensemble_average(tester, niter=niter)
throughput = chunksize / timing
result = (library, driver, chunksize, timing, throughput)
print(result)
timings.append(result)
results = pd.DataFrame.from_records(timings, columns=['library', 'driver', 'read_size', 'timing', 'throughput'])
results['MB/s'] = results['throughput'] / MB
results
results['type'] = results['library'] + '+' + results['driver']
plt.figure(figsize=(12, 6))
g = sns.factorplot(y='read_size', x='MB/s', hue='type', data=results, kind='bar', orient='h', size=(10))
g.despine(left=True)
#g.fig.get_axes()[0].set_xscale('log', basex=2)
g.fig.set_size_inches(12, 4)
plt.savefig('results2.png')
Recommended Posts