First edition: 2020/3/18
Authors: Soichi Takashige, Masahiro Ito, Hitachi, Ltd.
In this series, we introduce design know-how for data preprocessing, together with performance verification results, for systems that incorporate a machine learning model.
In this third installment, we introduce know-how for improving the performance of data preprocessing with Spark, a parallel distributed processing platform, along with the verification results.
In the previous post, we showed that preprocessing a large amount of data with Python on a single node can run out of memory. In such cases, a parallel distributed processing platform is often effective. This time, we introduce data preprocessing with Spark, a typical parallel distributed processing platform.
As described in the first post, developing a system that uses machine learning usually starts with a PoC that confirms the effectiveness of machine learning, and the production system is then developed based on its results. At the PoC stage, data preprocessing is often implemented in Python, so if you want to use Spark in the production system, you need to rewrite the Python code for Spark. This time, we rewrote the Pandas DataFrame preprocessing for BigBench business scenario #5 into Spark processing according to the following guidelines.
Initialization
```python:Before(pandas)
import numpy as np
import pandas as pd
```
```python:After(spark)
import pyspark
import pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.context import SparkContext
spark = SparkSession.builder.master(…).getOrCreate()  # describe the Spark connection settings in master(…)
sc = spark.sparkContext
hive_context = HiveContext(sc)
```
Create a DataFrame
```python:Before(pandas)
df = pd.DataFrame()
```
```python:After(spark)
df = spark.createDataFrame(data, schema)
```
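Read a file (CSV)
```python:Before(pandas)
df = pd.read_csv(filename)
```
```python:After(spark)
# pandas treats the first line as a header by default; Spark needs header=True
df = spark.read.csv(filename, header=True, inferSchema=True)
```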
Get rows whose value in a column is not null
```python:Before(pandas)
df.loc[df["columnname"].notnull(), :]
```
```python:After(spark)
df.filter(df["columnname"].isNotNull())
```
Access a specific row of data
```python:Before(pandas)
df.loc[rownumber]
```
```python:After(spark)
# Accessing a specific row is time-consuming in Spark,
# so modify the logic so that it does not require row access
```
Access a specific column of data (the same in pandas and Spark)
```python
df["columnname"]
```
Filter rows using the value of a column as the search condition (filter, etc.)
```python:Before(pandas)
df.loc[df["columnname"] == value]
```
```python:After(spark)
df.filter(df["columnname"] == value)
```
Merge / inner join
```python:Before(pandas)
pd.merge(df1, df2, how="inner", left_on="l_key", right_on="r_key")
```
```python:After(spark)
df1.join(df2, df1.l_key == df2.r_key, "inner")
```
Data grouping
```python:Before(pandas)
def function(df):
    # Do something
    return data

df.groupby("key").apply(function)
```
```python:After(spark)
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType

out_schema = StructType(...)  # out_schema defines the schema of the data the function returns

@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def function(df):
    # Do something (depending on the Pandas UDF specifications, this may need rewriting; described later)
    return data

df.groupby("key").apply(function)
```
Pandas UDF is one way to execute Python processing that uses Pandas in Spark. It is a mechanism provided by Spark for cooperation between Python and Spark, and it lets Spark run processing implemented in Python (Pandas) in a parallel, distributed manner. The three types of functions shown in Table 1 are available with Pandas UDF and can be called from Spark DataFrame operations.
Table 1 Types of Python functions that can be used with Pandas UDF
# | Function type | Processing the function performs | Typical use |
---|---|---|---|
1 | SCALAR | Receives one pandas.Series, processes it, and returns one pandas.Series. The output must have the same length as the input, and its type must match the declared return type. | Per-element processing of a data column |
2 | GROUPED_AGG | Receives one or more pandas.Series, processes them, and returns a single value. The input and output sizes do not have to match. | Aggregation |
3 | GROUPED_MAP | Receives one pandas.DataFrame, processes it, and returns another pandas.DataFrame. The number of rows, the column names, and so on do not have to match between input and output. | Processing applied to each group of a grouped dataset |
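To make the contracts in Table 1 concrete, here is a minimal sketch of a SCALAR-style and a GROUPED_AGG-style function written in plain pandas and exercised locally. The function names (`add_one`, `weighted_mean`) and the data are hypothetical; the Spark 2.4 registration calls are shown only as comments and assume pyspark is installed.

```python
import pandas as pd

# SCALAR contract: one pandas.Series in, one pandas.Series of the same length out
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

# GROUPED_AGG contract: one or more pandas.Series in, a single value out
def weighted_mean(values: pd.Series, weights: pd.Series) -> float:
    return float((values * weights).sum() / weights.sum())

values = pd.Series([1.0, 2.0, 3.0])
weights = pd.Series([1.0, 1.0, 2.0])
scalar_out = add_one(values)               # same length as the input
agg_out = weighted_mean(values, weights)   # a single value

# Local emulation of "one value per group"
frame = pd.DataFrame({"k": ["a", "a", "b"], "v": [1.0, 3.0, 5.0], "w": [1.0, 1.0, 1.0]})
per_group = {key: weighted_mean(g["v"], g["w"]) for key, g in frame.groupby("k")}
print(scalar_out.tolist(), agg_out, per_group)

# Registration in Spark 2.4 (not executed here):
# from pyspark.sql.functions import pandas_udf, PandasUDFType
# from pyspark.sql.types import DoubleType
# add_one_udf = pandas_udf(DoubleType(), PandasUDFType.SCALAR)(add_one)
# wmean_udf = pandas_udf(DoubleType(), PandasUDFType.GROUPED_AGG)(weighted_mean)
# sdf.select(add_one_udf(sdf["v"]))
# sdf.groupby("k").agg(wmean_udf(sdf["v"], sdf["w"]))
```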
When Pandas code uses groupby.apply, registering the Python function passed to that apply as a GROUPED_MAP function (see Table 1) lets Spark execute it in parallel without the effort of reimplementing it. However, the apply functions of Pandas and Spark have slightly different specifications, so some adjustments may be necessary. In this implementation, we had to handle two such differences, so we rewrote the code accordingly. In the explanation, df is the variable name of the data frame, keyname is a column name of the data frame, and function is the name of the function that defines the iterative processing.
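One adaptation that a GROUPED_MAP function always needs, and that summarize_per_user in Figure 1 also follows, is that it must return a pandas.DataFrame matching the declared output schema, whereas a function passed to Pandas groupby.apply may return a scalar. The sketch below is an illustration under assumptions: the click data and the names `count_clicks`, `count_clicks_frame`, and `spark_clicks` are hypothetical, and only pandas is needed to run it.

```python
import pandas as pd

# Hypothetical click data: one row per click
clicks = pd.DataFrame({"wcs_user_sk": [1, 1, 2], "i_item_sk": [10, 11, 10]})

# Pandas style: the function applied per group may return a scalar
def count_clicks(group: pd.DataFrame) -> int:
    return len(group)

# GROUPED_MAP style: return a DataFrame whose columns match the output schema,
# carrying the group key explicitly. .iloc[0] is positional, so it does not
# depend on the row labels the group happens to have.
def count_clicks_frame(group: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({"wcs_user_sk": [group["wcs_user_sk"].iloc[0]],
                         "clicks": [count_clicks(group)]})

# Local emulation of per-group execution (Spark would run these calls in parallel)
per_user = pd.concat([count_clicks_frame(g) for _, g in clicks.groupby("wcs_user_sk")],
                     ignore_index=True)
print(per_user)

# In the Spark job (not executed here; assumes pyspark 2.4):
# from pyspark.sql.functions import pandas_udf, PandasUDFType
# from pyspark.sql.types import StructType, StructField, LongType
# schema = StructType([StructField("wcs_user_sk", LongType()), StructField("clicks", LongType())])
# count_udf = pandas_udf(schema, PandasUDFType.GROUPED_MAP)(count_clicks_frame)
# result = spark_clicks.groupby("wcs_user_sk").apply(count_udf)
```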
Using the know-how above, we rewrote the Python implementation of data preprocessing for BigBench business scenario #5, shown in the second post, for Spark. Figure 1 shows the resulting code.
```python
import pandas as pd
import numpy as np
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType, when
from pyspark.sql.types import *

sc = SparkContext.getOrCreate()
hive_context = HiveContext(sc)

web_clickstreams = hive_context.read.table("bigbench.web_clickstreams")
item = hive_context.read.table("bigbench.item")
customer = hive_context.read.table("bigbench.customer")
customer_demographics = hive_context.read.table("bigbench.customer_demographics")

# Processing ①: drop clicks without a user ID and join with the item table
data = web_clickstreams.filter(web_clickstreams["wcs_user_sk"].isNotNull())
data = data.join(item, data["wcs_item_sk"] == item["i_item_sk"], 'inner')

# Processing ②: group by user ID
grouped_users = data.groupby('wcs_user_sk')

# Processing ③ type definition: define the output data type of the iterative processing
types = ["wcs_user_sk", "clicks_in_category"] + ["clicks_in_%d" % i for i in range(1, 8)]
out_schema = StructType([StructField(i, IntegerType(), True) for i in types])

# Processing ③ registration: register the iterative processing as a Pandas UDF
@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def summarize_per_user(wcs_user_sk_contents):
    wcs_user_sk_index = wcs_user_sk_contents['wcs_user_sk'][0]
    # Processing ③-1, ③-2
    clicks_in_category = \
        len(wcs_user_sk_contents[wcs_user_sk_contents['i_category'] == i_category_index])
    clicks_in = [0] * 8
    for name, df in wcs_user_sk_contents.groupby('i_category_id'):  # optimized: a single loop
        if name < len(clicks_in):
            clicks_in[name] = len(df.index)
    # Processing ③-3: return one row per user (the row must be a nested list)
    return pd.DataFrame([[wcs_user_sk_index, clicks_in_category] + clicks_in[1:]],
                        columns=types)

# Processing ③ execution
i_category_index = 'Books'
data = grouped_users.apply(summarize_per_user)

# Processing ④
data = data.join(customer, data["wcs_user_sk"] == customer["c_customer_sk"], 'inner')

# Processing ⑤
data = data.join(customer_demographics,
                 data["c_current_cdemo_sk"] == customer_demographics["cd_demo_sk"], 'inner')

# Processing ⑥: withColumn returns a new DataFrame, so reassign the result
data = data.withColumn('college_education',
                       when(data["cd_education_status"] == 'Advanced Degree', 1)
                       .when(data["cd_education_status"] == 'College', 1)
                       .when(data["cd_education_status"] == '4 yr Degree', 1)
                       .when(data["cd_education_status"] == '2 yr Degree', 1)
                       .otherwise(0))
data = data.withColumn('male', when(data["cd_gender"] == 'M', 1).otherwise(0))

# Save results
data.write.mode('append').parquet('answer_q05_0100.parquet')
```
Figure 1 Data preprocessing source code for BigBench scenario # 5 by Spark
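Because `@pandas_udf(...)` only wraps an ordinary function, the per-user logic of Figure 1 can be checked locally before it runs on the cluster. The sketch below is a hypothetical test harness, assuming only pandas: `summarize_per_user_plain` is an undecorated copy of the Figure 1 function body, and the click data are made-up toy values. It uses `.iloc[0]` because a group produced by pandas keeps its original row labels, so label `0` may not exist. The output row is passed as a nested list; a flat list would be read as one column with nine rows, which does not match the nine column names.

```python
import pandas as pd

# Output columns, as in Figure 1
types = ["wcs_user_sk", "clicks_in_category"] + ["clicks_in_%d" % i for i in range(1, 8)]
i_category_index = "Books"

def summarize_per_user_plain(contents: pd.DataFrame) -> pd.DataFrame:
    """Per-user logic of Figure 1 without the @pandas_udf decorator."""
    user_sk = contents["wcs_user_sk"].iloc[0]
    clicks_in_category = len(contents[contents["i_category"] == i_category_index])
    clicks_in = [0] * 8
    for name, df in contents.groupby("i_category_id"):
        if name < len(clicks_in):
            clicks_in[name] = len(df.index)
    # One output row per user: note the nested list
    return pd.DataFrame([[user_sk, clicks_in_category] + clicks_in[1:]], columns=types)

# Hypothetical toy data: user 1 clicks twice in Books (id 1) and once in id 3
clicks = pd.DataFrame({
    "wcs_user_sk":   [1, 1, 1, 2],
    "i_category":    ["Books", "Books", "Music", "Books"],
    "i_category_id": [1, 1, 3, 1],
})

result = pd.concat(
    [summarize_per_user_plain(g) for _, g in clicks.groupby("wcs_user_sk")],
    ignore_index=True,
)
print(result)
```

In the Spark job, the same plain function could then be registered with `pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)(summarize_per_user_plain)`, so the local test and the cluster share one implementation.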
From here, we verify the performance of the Spark data preprocessing implemented in Figure 1. Figure 2 shows the service layout of the Spark cluster built as the parallel distributed processing environment for this verification. Assuming an on-premises use case, we built the Spark cluster with the Cloudera distribution. The single-node Python processing verified in the second post was run on one of the Worker Nodes, while for Spark processing the actual parallel distributed processing runs on all 3 Worker Nodes.
Figure 2 Service layout of this verification environment
Next, Table 2 shows the specifications of the verification environment. We used IaaS (EC2 instances) on AWS as the verification machines. Each Worker Node EC2 instance has five 1TB HDDs (EBS) attached, giving 5TB per node and 15TB across the three nodes. However, because HDFS stores every block in three replicas, the effective data capacity is about 5TB.
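The storage arithmetic above can be written out explicitly. This is a minimal sketch using the figures from Table 2; it assumes the cluster keeps the HDFS default replication factor of 3 (`dfs.replication`), as the text implies.

```python
# Storage figures from Table 2 (Worker Nodes only)
WORKER_NODES = 3
DISKS_PER_NODE = 5
DISK_CAPACITY_TB = 1
# Assumption: HDFS default replication factor (dfs.replication = 3) is unchanged
REPLICATION = 3

raw_per_node_tb = DISKS_PER_NODE * DISK_CAPACITY_TB   # raw capacity of one Worker Node
raw_total_tb = WORKER_NODES * raw_per_node_tb          # raw capacity of the cluster
effective_tb = raw_total_tb / REPLICATION              # every block is stored 3 times

print(f"raw: {raw_total_tb} TB, effective: {effective_tb:.0f} TB")
```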
Table 2 Hardware specifications of verification environment
Item | Manager Node | Master Node | Worker Node×3 |
---|---|---|---|
Verification environment | AWS EC2 | AWS EC2 | AWS EC2 |
OS | CentOS 7 64bit | CentOS 7 64bit | CentOS 7 64bit |
CPU (number of cores) | 2 | 4 | 96 (32 x 3 nodes) |
Memory (GB) | 16 | 32 | 768 (256GB x 3 nodes) |
HDD | 80GB | 80GB | 15TB※ (1TB x 5 HDDs x 3 nodes) |
The software versions used for verification are shown in Table 3 below.
Table 3 Software version of verification environment
software | version |
---|---|
Cloudera distribution | CDH 6.3.0 |
Spark | 2.4.0 |
Hive | 2.1.1 |
YARN | 2.5.0 |
HDFS | 3.0.0 |
Python | 3.7.3 |
Pandas | 0.24.2 |
Numpy | 1.16.4 |
In addition to processing methods 1 and 2 below, whose performance was measured in the second post, we measure the performance of parallel distributed processing with Spark (method 3) and compare them.
1. Run the code in Figure 4 of the second post with Python.
2. Run, with Python, the code in Figure 5 of the second post, which applies logic optimizations to the code in Figure 4.
3. Run parallel distributed processing on Spark using the code in Figure 1, which adapts the logic-optimized single-node Python code from method 2 for Spark.
Table 4 shows the Spark execution parameters used to run the tasks. Each Worker Node starts one worker process (Executor), which is allocated so that it can use the node's memory and cores exclusively.
Table 4 Spark execution parameters
# | item | Set value | Remarks |
---|---|---|---|
1 | Number of Executors | 3 | One Executor started on each Worker Node |
2 | Executor memory size | 128 GB | |
3 | Number of cores per executor | 30 | Allocate and use 30 cores out of 32 cores of the machine |
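The parameters in Table 4 correspond to standard Spark configuration properties and spark-submit options on YARN. The sketch below expresses them that way and checks them against one Worker Node from Table 2. The script name `preprocess_q05.py` is hypothetical, and the memory overhead uses the Spark 2.4 default for `spark.executor.memoryOverhead` (the larger of 384 MB and 10% of executor memory), which is an assumption about this cluster's settings.

```python
# Table 4 expressed as standard Spark configuration properties
executor_conf = {
    "spark.executor.instances": "3",
    "spark.executor.memory": "128g",
    "spark.executor.cores": "30",
}

# The same settings as spark-submit options on YARN ("preprocess_q05.py" is hypothetical)
submit_args = [
    "spark-submit", "--master", "yarn",
    "--num-executors", executor_conf["spark.executor.instances"],
    "--executor-memory", executor_conf["spark.executor.memory"],
    "--executor-cores", executor_conf["spark.executor.cores"],
    "preprocess_q05.py",
]

def parse_gb(size: str) -> float:
    """Parse a Spark size string ending in 'g' (GiB); other units are not handled."""
    if not size.endswith("g"):
        raise ValueError(f"unsupported size: {size}")
    return float(size[:-1])

executors = int(executor_conf["spark.executor.instances"])
cores_per_executor = int(executor_conf["spark.executor.cores"])
heap_gb = parse_gb(executor_conf["spark.executor.memory"])

# Assumed default overhead: max(384 MB, 10% of executor memory)
overhead_gb = max(384 / 1024, 0.10 * heap_gb)
container_gb = heap_gb + overhead_gb

total_task_slots = executors * cores_per_executor  # tasks that can run at the same time

# One Worker Node in Table 2: 32 cores, 256 GB of memory
NODE_CORES, NODE_MEMORY_GB = 32, 256
fits_on_node = cores_per_executor <= NODE_CORES and container_gb <= NODE_MEMORY_GB

print(" ".join(submit_args))
print(total_task_slots, round(container_gb, 1), fits_on_node)
```

Leaving 2 of the 32 cores unassigned, as Table 4 does, keeps some CPU for the OS and the Hadoop daemons running on each node.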
As in the second post, we measure the total time of the following three steps:
Reading data from the data source into memory
Preprocessing the loaded data (joins, aggregation, etc.)
Writing the processing result to the data store
The input data were configured the same way as in the second post (Table 3 of that post).
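A minimal sketch of how the total of the three steps can be timed is shown below. The `timed` helper and the stand-in workloads are hypothetical and use only the Python standard library; they illustrate the measurement scheme, not the harness used in this verification.

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label, timings):
    """Record the wall-clock duration of the with-block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start

timings = {}
with timed("total", timings):
    with timed("read", timings):
        rows = [(i % 100, i) for i in range(100_000)]     # stand-in for reading the tables
    with timed("preprocess", timings):
        totals = {}
        for key, value in rows:                            # stand-in for joins and aggregation
            totals[key] = totals.get(key, 0) + value
    with timed("write", timings):
        output = "\n".join(f"{k},{v}" for k, v in sorted(totals.items()))  # stand-in for writing

for label in ("read", "preprocess", "write", "total"):
    print(f"{label:>10}: {timings[label]:.4f} s")
```

With Spark, reads and transformations are evaluated lazily and only run when an action such as `write` triggers them, so per-step timings would attribute almost everything to the write. Measuring the total, as done here, avoids that distortion.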
Figure 3 shows the processing time measured for each input data size when each of the four types of processing was run on BigBench business scenario #5.
Figure 3 Data preprocessing time measurement results for each input data size
With single-node Python processing, as shown in the second post, increasing the input data size makes processing impossible due to insufficient memory, already at a data set smaller than the assumed production data size (about 50GB). In contrast, parallel distributed processing with Spark completed normally with the production-size data set, and also with larger data sets.
Figure 4 shows how CPU, memory, and disk I/O usage change over time when Spark processes 22GB of data. All CPUs of each Worker Node were used at 100%, which is why the computation time was greatly reduced. Memory usage is also distributed across the nodes. Because each of the three nodes carries the overhead of its own OS functions, the memory usage of the whole cluster is larger than that of a single machine: after the business scenario #5 program starts, each Worker Node uses about 130-140GB, about 410GB in total across the 3 nodes. You can also see disk write I/O occurring as a result of Processing ① (inner join) and Processing ② (group by) (see Figure 3 in the second post). This I/O occurs because the joined or sorted data is stored on disk.
Figure 4 Temporal changes in CPU, memory, and I/O usage in the Spark environment
Introducing Spark made it possible to preprocess large-scale data that would be forcibly terminated for lack of memory when run with Python on a single server. When the data exceeds the memory capacity of one machine, introducing Spark is an appropriate choice.
The Python (Pandas) code used here cannot take advantage of multiple CPUs, so processing runs sequentially on a single CPU no matter how large the data grows. Spark, on the other hand, splits the data across nodes and assigns the processing of each piece to different nodes and CPU cores to run in parallel, which greatly reduces processing time.
When Pandas is used on Python, all the data to be processed is read into memory and processed sequentially by a single CPU. In business scenario #5, the time spent in the loop also grows in direct proportion to the data size.
With Spark, data is read into memory and written to disk in partitions of a fixed size. Whenever a process requires exchanging data between nodes (JOIN, GROUP BY, sorting, etc.), its result is written to disk. Processing large data partition by partition in this way lets the job complete even when the data exceeds the installed physical memory, but disk I/O then becomes a bottleneck and processing time can grow abruptly, faster than linearly.
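Why a GROUP BY forces data exchange can be shown with a single-process simplification. The sketch below hash-partitions records by key so that every record of a key lands in the same partition; each partition can then be aggregated independently, which is what lets Spark hand partitions to different cores. The `crc32` hash is a stand-in (Spark uses its own hash function), the records are hypothetical, and in a real cluster the repartitioned data would be exchanged between nodes and written to local disk.

```python
from collections import Counter
from zlib import crc32

def partition_of(key: str, num_partitions: int) -> int:
    # Deterministic hash: every occurrence of a key maps to the same partition
    return crc32(key.encode("utf-8")) % num_partitions

def shuffle(records, num_partitions):
    """Redistribute (key, value) records so that equal keys share a partition."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        partitions[partition_of(key, num_partitions)].append((key, value))
    return partitions

def count_per_key(partition):
    # Independent per-partition work (in Spark: one task per partition)
    return Counter(key for key, _ in partition)

# Hypothetical clicks: (user, category)
records = [("u1", "Books"), ("u2", "Music"), ("u1", "Music"), ("u3", "Books"), ("u2", "Books")]
partitions = shuffle(records, num_partitions=3)
partial_results = [count_per_key(p) for p in partitions]

# Each key lives in exactly one partition, so the partial results just need concatenating
merged = {}
for counts in partial_results:
    merged.update(counts)
print(merged)
```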
In Figure 3, the processing time increases sharply and non-linearly at an input data size of about 150GB. This is because the data to be handled exceeds the installed memory, so disk I/O occurs and becomes a bottleneck.
We presented performance evaluation results, together with know-how for improving performance, for a sample workload that preprocesses large-scale table data with operations such as joins and aggregation.
If the data to be processed is not very large (up to several tens of GB), single-node Python preprocessing completes within a few hours, so the data can be preprocessed with the Python code as-is. In that case, the logic optimizations introduced in this series can shorten the processing time.
On the other hand, if the amount of data is large, Python may take too long or the process may fail, so applying a distributed processing platform such as Spark is a realistic option. For about 22GB of training data for machine learning, we confirmed that using Spark parallel distributed processing for preprocessing shortened the processing time by about 94% compared with Python running on a single node (single thread). We also confirmed that Spark with 3 Worker Nodes can process about 4 times as much data as single-node Python while maintaining performance.
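Expressed as a speedup ratio, the 94% reduction reads as follows, where $T_{\mathrm{Python}}$ and $T_{\mathrm{Spark}}$ (symbols introduced here) denote the preprocessing times for the 22GB data:

$$
T_{\mathrm{Spark}} \approx (1 - 0.94)\,T_{\mathrm{Python}} = 0.06\,T_{\mathrm{Python}}
\;\Longrightarrow\;
\frac{T_{\mathrm{Python}}}{T_{\mathrm{Spark}}} \approx \frac{1}{0.06} \approx 16.7
$$

That is roughly a 17-fold speedup, obtained with the 90 executor cores (3 Executors x 30 cores) configured in Table 4 instead of a single thread.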