(Traduction) Connexion native de Python au système de fichiers Hadoop (HDFS)

INTRODUCTION: Wes McKinney, auteur de pandas, a écrit un blog très intéressant sur les outils de données Python, j'ai donc demandé si je pouvais le traduire et le publier dans la communauté japonaise PyData. J'ai reçu ceci, donc je vais le traduire petit à petit et le publier.

Traduit par: Connectivité HDFS (Native Hadoop File System) en Python

2017/1/3

Jusqu'à présent, de nombreuses bibliothèques Python ont été développées pour interagir avec HDFS, également connu sous le nom de système de fichiers Hadoop. Certains sont via la passerelle Web HDFS HDFS, tandis que d'autres sont des interfaces RPC natives basées sur la mémoire tampon de protocole. Dans cet article, je vais vous donner un aperçu des bibliothèques existantes et vous montrer ce que j'ai fait pour fournir une interface HDFS haute performance dans le développement de l'écosystème d'Arrow.

Ce blog fait suite à un article sur la feuille de route 2017.

Protocole du système de fichiers Hadoop

HDFS fait partie d'Apache Hadoop et sa conception était à l'origine basée sur le système de fichiers Google décrit dans l'article original de MapReduce. HDFS utilise les tampons de protocole de Google (parfois appelés «protobufs» en abrégé) comme protocole filaire natif pour les appels de procédure à distance, ou RPC.

Les systèmes qui interagissent avec HDFS implémenteront généralement le format de messagerie et le protocole RPC de Protobuf, similaires au client Java principal. WebHDFS a été développé pour faciliter la lecture et l'écriture de fichiers par les applications à faible charge, et il fournit une passerelle HTTP ou HTTPS qui permet aux requêtes PUT et GET d'être utilisées à la place des protobufs RPC.

Pour les applications légères, les RPC WebHDFS et protobufs natifs ont un débit de données comparable, mais les connexions natives sont généralement considérées comme hautement évolutives et adaptées à une utilisation en production.

Python a deux interfaces WebHDFS que j'ai utilisées:

Plus loin dans cet article, nous nous concentrerons sur l'interface client native RPC.

Accès depuis Python avec RPC natif

Si vous souhaitez vous connecter à HDFS de manière native à partir d'un langage qui fonctionne bien avec C, comme Python, la manière «officielle» d'Apache Hadoop est d'utiliser libhdfs. libhdfs est un wrapper C basé sur JNI pour les clients Java HDFS. Le principal avantage de libhdfs est qu'il est distribué et pris en charge par les principaux fournisseurs Hadoop et fait partie du projet Apache Hadoop. L'inconvénient est que vous utilisez JNI (la JVM est lancée à partir d'un processus Python) et que vous avez besoin d'une distribution Hadoop Java complète côté client. Il s'agit d'une condition inacceptable pour certains clients et, contrairement à d'autres clients, nécessite un support au niveau de la production. Par exemple, l'application C ++ Apache Impala (Incubation Project) utilise libhdfs pour accéder aux données sur HDFS.

En raison du poids élevé des libhdfs par nature, des interfaces natives alternatives à HDFS ont été développées.

--libhdfs3 est une bibliothèque purement C ++ qui fait maintenant partie d'Apache HAWQ (Incubation Project). libhdfs3 a été développé par Pivotal Labs pour une utilisation dans HAWQ sur les systèmes SQL-on-Hadoop. L'avantage de libhdfs3 est qu'il est hautement compatible avec libhdfs au niveau de l'API C. À un moment donné, libhdfs3 était officiellement susceptible de faire partie d'Apache Hadoop, mais maintenant c'est peu probable (voir HDFS-8707 car une nouvelle bibliothèque C ++ est en cours de développement).

--snakebite: Une implémentation pure Python de l'interface protobuf RPC de Hadoop, développée par Spotify.

Comme snakebite ne fournit pas d'API client complète (par exemple, vous ne pouvez pas écrire de fichiers) et qu'il ne fonctionne pas bien (implémenté uniquement en Python), nous nous concentrerons désormais sur libhdfs et libhdfs3. Je continuerai à le faire.

Python instar face à libhdfs et libhdfs3

Il y a eu de nombreuses tentatives pour créer une interface de niveau C vers libhdfs pour la bibliothèque JNI. Parmi eux se trouvent cyhdfs (en utilisant Cython), libpyhdfs (extension Python C normale) et pyhdfs (en utilisant SWIG). L'un des défis de la construction d'une extension C vers libhdfs est que la bibliothèque partagée libhdfs.so est incluse dans la distribution Hdoop et est distribuée, donc $ LD_LIBRARY_PATH est approprié pour charger cette bibliothèque partagée. Doit être réglé sur. De plus, le libjvm.so de la JVM doit également pouvoir être chargé lors de l'importation. Lorsque ces conditions sont réunies, vous tomberez dans «l'enfer du décor».

Quand je pensais créer une interface C ++ HDFS à utiliser avec Apache Arrow (et aussi Python via PyArrow), j'ai trouvé une implémentation de libhdfs dans le projet SFrame de Turi. Il était censé trouver les deux dans une approche sensée lors du chargement de JVM et de libhdfs à l'exécution. J'ai adopté cette approche avec Arrow et cela a fonctionné. En utilisant cette implémentation, les outils de sérialisation de données d'Arrow (comme Apache Parquet) ont une très faible surcharge d'E / S et fournissent également une interface de fichier Python pratique.

Les API C dans les bibliothèques de pilotes libhdfs et libhdfs3 sont à peu près les mêmes, donc j'ai pu changer de pilote en fonction des arguments de mots clés de Python.

from pyarrow import HdfsClient

#Utilisez libhdfs
hdfs = HdfsClient(host, port, username, 	driver='libhdfs')

#Utilisez libhdfs3
hdfs_alt = HdfsClient(host, port, username, 	driver='libhdfs3')

with hdfs.open('/path/to/file') as f:
    ...

En parallèle, les développeurs du projet Dask ont créé hdfs3, une interface pure Python vers libhdfs3. Il a utilisé des ctypes pour éviter l'extension C. hdfs3 donne accès à d'autres fonctionnalités de libhdfs3 ainsi qu'à une interface de fichier Python.

from hdfs3 import HDFileSystem

hdfs = HDFileSystem(host, port, user)
with hdfs.open('/path/to/file', 'rb') as f:
    ...

Performances d'accès aux données de pyarrow.HdfsClient et hdfs3

Pour un cluster CDH 5.6.0 HDFS local, j'ai calculé la moyenne collective des performances de lecture pour des fichiers de 4 Ko à 100 Mo avec trois paramètres différents.

--hdfs3 (utilisez toujours libhdfs3) --pyarrow.HdfsClient avec driver = 'libhdfs' --pyarrow.HdfsClient avec driver = 'libhdfs3'

Vous pouvez obtenir tous ces packages en procédant comme suit:

conda install pyarrow hdfs3 libhdfs3 -c conda-forge

Remarque: le package pyarrow conda-forge n'est actuellement disponible que sous Linux. En théorie, ce problème aurait dû être résolu le 20 janvier 2017. Veuillez nous faire savoir si quelqu'un peut vous aider avec la prise en charge de Windows.

Le nombre de performances est de mégaoctets / seconde («débit»). Le code de référence est à la fin de cet article. Je suis curieux de voir à quoi ressemble ce résultat dans une plus grande variété d'environnements de production et de paramètres Hadoop.

HDFS RPC data perflibhdfs_perf_linear.png

Au moins dans les tests que j'ai effectués, j'ai obtenu les résultats intéressants suivants:

--Libhdfs a montré le débit le plus élevé dans ce test, même s'il est basé sur Java et JNI. --libhdfs3 ne fonctionnait pas bien pour les lectures de petite taille. Cela peut être dû à la latence RPC ou à un problème dont je ne suis pas conscient dans les paramètres.

Ce qui suit est l'axe logarithmique du temps.

HDFS RPC data perflibhdfs_perf_log.png

E / S C ++ natives dans Apache Arrow

L'une des raisons de la création d'interfaces d'E / S de type HDFS dans la bibliothèque pyarrow est qu'elles utilisent toutes une couche commune de gestion de la mémoire et ont une surcharge de copie très faible (éventuellement zéro). , Parce que vous pouvez transmettre les données. D'autre part, une bibliothèque qui expose uniquement l'interface de fichier Python entraîne une surcharge car la mémoire est traitée par l'objet de chaîne d'octets dans l'interpréteur Python.

Les détails du système d'E / S C ++ d'Arrow dépassent le cadre de cet article, mais je publierai à ce sujet sur ce blog à l'avenir.

Code du banc

import gc
import random
import time
import pyarrow as pa
import hdfs3
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

DATA_SIZE = 200 * (1 << 20)
data = 'a' * DATA_SIZE

hdfs = pa.HdfsClient('localhost', 20500, 'wesm')
hdfscpp = pa.HdfsClient('localhost', 20500, 'wesm', driver='libhdfs3')
hdfs3_fs = hdfs3.HDFileSystem('localhost', port=20500, user='wesm')

hdfs.delete(path)
path = '/tmp/test-data-file-1'
with hdfs.open(path, 'wb') as f:
    f.write(data)

def read_chunk(f, size):
    # do a random seek
    f.seek(random.randint(0, size))
    return f.read(size)

def ensemble_average(runner, niter=10):
    start = time.clock()
    gc.disable()
    data_chunks = []
    for i in range(niter):
        data_chunks.append(runner())
    elapsed = (time.clock() - start) / niter
    gc.enable()
    return elapsed

def make_test_func(fh, chunksize):
    def runner():
        return read_chunk(fh, chunksize)
    return runner

KB = 1024
MB = 1024 * KB
chunksizes = [4 * KB, MB, 10 * MB, 100 * MB]
iterations = [100, 100, 100, 10]

handles = {
    ('pyarrow', 'libhdfs'): hdfs.open(path),
    ('pyarrow', 'libhdfs3'): hdfscpp.open(path),
    ('hdfs3', 'libhdfs3'): hdfs3_fs.open(path, 'rb')
}

timings = []
for (library, driver), handle in handles.items():
    for chunksize, niter in zip(chunksizes, iterations):
        tester = make_test_func(handle, chunksize)
        timing = ensemble_average(tester, niter=niter)
        throughput = chunksize / timing

        result = (library, driver, chunksize, timing, throughput)
        print(result)
        timings.append(result)

results = pd.DataFrame.from_records(timings, columns=['library', 'driver', 'read_size', 'timing', 'throughput'])
results['MB/s'] = results['throughput'] / MB
results
results['type'] = results['library'] + '+' + results['driver']
	
plt.figure(figsize=(12, 6))
g = sns.factorplot(y='read_size', x='MB/s', hue='type', data=results, kind='bar', orient='h', size=(10))
g.despine(left=True)
#g.fig.get_axes()[0].set_xscale('log', basex=2)
g.fig.set_size_inches(12, 4)

plt.savefig('results2.png')

Recommended Posts

(Traduction) Connexion native de Python au système de fichiers Hadoop (HDFS)
Importer un fichier Excel depuis Python (enregistré dans DB)
Du dessin de fichier au graphique en Python. Élémentaire élémentaire
Changements de Python 3.0 à Python 3.5
Changements de Python 2 à Python 3.0
[Python] Changer l'entrée standard du clavier en fichier texte
[Python] Conversion de WGS84 en système de coordonnées orthogonales plan
Procédure pour convertir un fichier python en exe à partir de la construction de l'environnement Ubunts
Script Python qui crée un fichier JSON à partir d'un fichier CSV
Publier de Python vers Slack
Flirter de PHP à Python
[Pour les débutants] Script dans 10 lignes (4. Connexion de python à sqlite3)
Passer de python2.7 à python3.6 (centos7)
Connectez-vous à sqlite depuis python
Système Python OCR Augmentez les caractères des images pour améliorer l'efficacité du travail
Exécuter le script Python à partir du fichier de commandes
[Python] Ecrire dans un fichier csv avec Python
Sortie vers un fichier csv avec Python
[Lambda] [Python] Publier sur Twitter depuis Lambda!
Connectez-vous à la base de données utf8mb4 à partir de python
Python (de la première fois à l'exécution)
Publier une image de Python sur Tumblr
Comment accéder à wikipedia depuis python
Python pour passer d'une autre langue
Téléchargement de fichiers vers Azure Storage (Python)
[Introduction à Python3 Day 21] Chapitre 10 Système (10.1 à 10.5)
N'a pas changé de Python 2 à 3
Mettre à jour Mac Python de 2 à 3
[Python] Comment convertir un fichier db en csv
[Python] Simulation de fluide: de linéaire à non linéaire
De Python à l'utilisation de MeCab (et CaboCha)
[Français] vignette réticulée: interface R vers Python
Script pour générer un répertoire à partir d'un fichier json
Comment mettre à jour Google Sheets à partir de Python
[Python] Convertit les délimiteurs de fichier csv en délimiteurs de tabulation
Convertir un fichier psd en png en Python
Je veux utiliser jar de python
Connexion de python à MySQL sur CentOS 6.4
Comment accéder à RDS depuis Lambda (python)
Créer un fichier deb à partir d'un package python
Python> Numéros de sortie de 1 à 100, 501 à 600> Pour csv
Convertir de Markdown en HTML en Python
Explication API pour toucher mastodonte de python
Connectez-vous à l'API Websocket de Coincheck depuis Python