INTRODUCTION: Wes McKinney, the author of pandas, wrote a very interesting blog post about Python data tools, so I asked if I could translate it and publish it for the Japanese PyData community. He gave his permission, so I will translate it and publish it a little at a time.
Translated from: Native Hadoop file system (HDFS) connectivity in Python
2017/1/3
So far, many Python libraries have been developed for interacting with HDFS, the Hadoop Distributed File System. Some go through the WebHDFS gateway, while others use the native Protocol Buffer-based RPC interface. In this post, I'll give an overview of the existing libraries and show what I've done to provide a high-performance HDFS interface as part of developing the Apache Arrow ecosystem.
This post is a follow-up to the post on the 2017 roadmap.
HDFS is part of Apache Hadoop, and its design was originally based on the Google File System described in the original MapReduce paper. HDFS uses Google's Protocol Buffers (sometimes called "protobufs" for short) as a native wire protocol for remote procedure calls, or RPCs.
Systems that interact with HDFS typically implement the same protobuf messaging format and RPC protocol as the main Java client. WebHDFS was developed to make it easier for lightweight applications to read and write files; it provides an HTTP or HTTPS gateway through which PUT and GET requests can be used instead of the protobuf RPCs.
For lightweight applications, WebHDFS and the native protobuf RPC have comparable data throughput, but native connectivity is generally considered more scalable and better suited for production use.
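As a rough sketch of what the WebHDFS gateway looks like from Python (this is just an illustration, not one of the clients benchmarked below; the NameNode address, port, path, and user name are placeholders), reading a file comes down to an HTTP GET against the /webhdfs/v1 REST endpoint:

import requests

# Placeholder NameNode address and file path, for illustration only.
# The WebHDFS HTTP port depends on the Hadoop version and configuration.
namenode = 'http://namenode.example.com:50070'
path = '/tmp/example.txt'

# WebHDFS OPEN: the NameNode redirects to a DataNode, requests follows the
# redirect automatically, and the response body is the file's contents.
resp = requests.get(namenode + '/webhdfs/v1' + path,
                    params={'op': 'OPEN', 'user.name': 'someuser'})
resp.raise_for_status()
print(len(resp.content))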
Python has two WebHDFS interfaces that I've used in the past. In the rest of this article, we'll focus on the native RPC client interfaces.
If you want to connect to HDFS natively from a language that interoperates well with C, like Python, the "official" way in Apache Hadoop is to use libhdfs, a JNI-based C wrapper around the HDFS Java client. The main benefit of libhdfs is that it is distributed and supported by the major Hadoop vendors and is part of the Apache Hadoop project. The downsides are that it uses JNI (a JVM is launched inside the Python process) and requires a complete Hadoop Java distribution on the client side. These requirements are unacceptable for some clients, but unlike the alternative clients, libhdfs does carry production-level support. For example, Apache Impala (incubating), a C++ application, uses libhdfs to access data in HDFS.
Due to the heavy nature of libhdfs, alternative native interfaces to HDFS have been developed.
- libhdfs3: a pure C++ library that is now part of Apache HAWQ (incubating). libhdfs3 was developed by Pivotal Labs for use in HAWQ, a SQL-on-Hadoop system. A nice thing about libhdfs3 is that it is highly compatible with libhdfs at the C API level. At one point libhdfs3 seemed likely to officially become part of Apache Hadoop, but that now looks unlikely (see HDFS-8707, where a new C++ library is under development).
- snakebite: a pure Python implementation of Hadoop's protobuf RPC interface, developed by Spotify.
Snakebite doesn't provide a comprehensive client API (for example, you can't write files) and it doesn't perform well (it's implemented in pure Python), so from here on we'll focus on libhdfs and libhdfs3.
There have been many attempts to build C-level interfaces to the JNI-based libhdfs. Among them are cyhdfs (using Cython), libpyhdfs (a plain Python C extension), and pyhdfs (using SWIG). One of the challenges in building a C extension to libhdfs is that the shared library libhdfs.so is included in and distributed with the Hadoop distribution, so $LD_LIBRARY_PATH must be set appropriately for this shared library to be loaded. In addition, the JVM's libjvm.so must also be loadable at import time. Combined, these conditions land you in "configuration hell".
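As a minimal sketch of what that configuration involves (which variables are actually required depends on the Hadoop distribution and client library, so treat this as an assumption rather than a recipe), a preflight check from Python might look like the following; the variables themselves generally have to be set in the shell before the Python process starts:

import os

# JNI-based libhdfs setups typically rely on these variables to locate
# libjvm.so, libhdfs.so, and the Hadoop jars. This only reports what is
# (or isn't) set in the current environment; it does not change anything.
for var in ('JAVA_HOME', 'HADOOP_HOME', 'CLASSPATH', 'LD_LIBRARY_PATH'):
    print('%s=%s' % (var, os.environ.get(var, '<not set>')))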
When I was thinking about building a C++ HDFS interface for use with Apache Arrow (and with Python via PyArrow), I found a libhdfs implementation in Turi's SFrame project, which takes the clever approach of finding and loading both the JVM and libhdfs at runtime. I took this approach in Arrow, and it works. With this implementation, Arrow's data serialization tools (such as Apache Parquet) have very low I/O overhead, and it also provides a convenient Python file interface.
The C APIs in the libhdfs and libhdfs3 driver libraries are pretty much the same, so the driver can be switched with a Python keyword argument:
from pyarrow import HdfsClient

# Use libhdfs
hdfs = HdfsClient(host, port, username, driver='libhdfs')

# Use libhdfs3
hdfs_alt = HdfsClient(host, port, username, driver='libhdfs3')

with hdfs.open('/path/to/file') as f:
    ...
In parallel with this, the developers of the Dask project created hdfs3, a pure Python interface to libhdfs3. It uses ctypes, avoiding the need for C extensions. hdfs3 provides a Python file interface as well as access to the rest of libhdfs3's functionality.
from hdfs3 import HDFileSystem
hdfs = HDFileSystem(host, port, user)
with hdfs.open('/path/to/file', 'rb') as f:
    ...
Against a local CDH 5.6.0 HDFS cluster, I computed ensemble means of read performance for sizes from 4 KB to 100 MB under three different settings:
- hdfs3 (which always uses libhdfs3)
- pyarrow.HdfsClient with driver='libhdfs'
- pyarrow.HdfsClient with driver='libhdfs3'
You can get all of these packages by doing the following:
conda install pyarrow hdfs3 libhdfs3 -c conda-forge
Note: The pyarrow conda-forge packages are currently only available for Linux. In theory, this issue should have been resolved by January 20, 2017. Please let me know if anyone can help with Windows support.
The performance numbers are in megabytes per second ("throughput"). The benchmark code is at the end of this post. I'm curious how these results would look across a wider variety of production environments and Hadoop configurations.
[Figure: HDFS RPC data perf (throughput in MB/s)]
At least in the tests I did, I got the following interesting results:
- libhdfs showed the highest throughput in this test, even though it is Java- and JNI-based.
- libhdfs3 did not perform well on small-size reads. This may be due to RPC latency or a configuration problem I'm not aware of.
- Comparing strictly on libhdfs3, pyarrow is about 10-15% faster than hdfs3. This seems to be mainly due to differences in memory handling/copying between ctypes (hdfs3) and C++ (pyarrow).
The following plot shows the timings on a logarithmic axis.
[Figure: HDFS RPC data perf (timings, log scale)]
One of the reasons to build HDFS-style I/O interfaces inside the pyarrow library is that they all share a common memory-management layer and can pass data around with very low (potentially zero) copy overhead. By contrast, a library that only exposes a Python file interface incurs some overhead because memory is handled as byte string objects in the Python interpreter.
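To make the copy-overhead point concrete, here is a small, self-contained sketch (independent of HDFS, and assuming a reasonably recent pyarrow that provides pa.py_buffer; it is not part of the HDFS client API shown above) contrasting a zero-copy Arrow buffer view with a Python byte string copy:

import pyarrow as pa

data = b'x' * (16 * 1024 * 1024)

buf = pa.py_buffer(data)       # wraps the existing memory, no copy is made
window = buf.slice(0, 1024)    # zero-copy view into the same memory
copied = data[:1024]           # a Python-level copy of those bytes

print(buf.size, window.size, len(copied))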
The details of Arrow's C++ I/O system are beyond the scope of this article, but I'll post about it on this blog in the future.
Benchmark code
import gc
import random
import time
import pyarrow as pa
import hdfs3
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
DATA_SIZE = 200 * (1 << 20)
data = b'a' * DATA_SIZE
hdfs = pa.HdfsClient('localhost', 20500, 'wesm')
hdfscpp = pa.HdfsClient('localhost', 20500, 'wesm', driver='libhdfs3')
hdfs3_fs = hdfs3.HDFileSystem('localhost', port=20500, user='wesm')
path = '/tmp/test-data-file-1'
hdfs.delete(path)
with hdfs.open(path, 'wb') as f:
    f.write(data)
def read_chunk(f, size):
    # do a random seek
    f.seek(random.randint(0, size))
    return f.read(size)
def ensemble_average(runner, niter=10):
    start = time.clock()
    gc.disable()
    data_chunks = []
    for i in range(niter):
        data_chunks.append(runner())
    elapsed = (time.clock() - start) / niter
    gc.enable()
    return elapsed
def make_test_func(fh, chunksize):
    def runner():
        return read_chunk(fh, chunksize)
    return runner
KB = 1024
MB = 1024 * KB
chunksizes = [4 * KB, MB, 10 * MB, 100 * MB]
iterations = [100, 100, 100, 10]
handles = {
    ('pyarrow', 'libhdfs'): hdfs.open(path),
    ('pyarrow', 'libhdfs3'): hdfscpp.open(path),
    ('hdfs3', 'libhdfs3'): hdfs3_fs.open(path, 'rb')
}
timings = []
for (library, driver), handle in handles.items():
    for chunksize, niter in zip(chunksizes, iterations):
        tester = make_test_func(handle, chunksize)
        timing = ensemble_average(tester, niter=niter)
        throughput = chunksize / timing
        result = (library, driver, chunksize, timing, throughput)
        print(result)
        timings.append(result)
results = pd.DataFrame.from_records(timings, columns=['library', 'driver', 'read_size', 'timing', 'throughput'])
results['MB/s'] = results['throughput'] / MB
results
results['type'] = results['library'] + '+' + results['driver']
plt.figure(figsize=(12, 6))
g = sns.factorplot(y='read_size', x='MB/s', hue='type', data=results, kind='bar', orient='h', size=(10))
g.despine(left=True)
#g.fig.get_axes()[0].set_xscale('log', basex=2)
g.fig.set_size_inches(12, 4)
plt.savefig('results2.png')