In February 2017, Yahoo! released a library that distributes TensorFlow with Spark. http://yahoohadoop.tumblr.com/post/157196317141/open-sourcing-tensorflowonspark-distributed-deep
I personally played with Docker, but what about Keras? So, this time I will try to run Dist-Keras that distributes Keras with Spark with Docker.
The goal is to make use of the deep learning library in a scalable manner. By utilizing GPGPU, I feel that deep learning is focusing on scale-up.
However, if we combine scale-up, which requires the performance of arithmetic units to speed up processing, and scale-out, which requires the number of arithmetic units, we should be able to perform faster processing. Furthermore, I personally think that poor people who do not have a GPU will try their best at scale-out (this one is more urgent (laughs)). Therefore, I installed Keras, a deep learning library, in a Docker container, started multiple containers, and tried to distribute the load by scale-out. Since it is not possible to scale out with normal Keras, this time I will use a library that runs Keras with Spark.
Keras on Spark
It seems that there are two libraries that distribute Keras with Spark.
The first is Elephas, which seems to have been around for about a year. http://maxpumperla.github.io/elephas/
The other is Dist-Keras, which is issued by CERN (European Organization for Nuclear Research!). http://joerihermans.com/work/distributed-keras/ https://db-blog.web.cern.ch/blog/joeri-hermans/2017-01-distributed-deep-learning-apache-spark-and-keras
CERN is the original story of STEINS; GATE's SERN. CERN also seems to know Rintarou Okabe. http://gigazine.net/news/20140623-ama-cern/
This time, I will run CERN's Dist-Keras with Docker and run MNIST. I chose Dist-Keras simply because it's been updated recently and it works.
I also touched Elephas halfway, so I may start playing again soon.
The reason Dist-Keras runs on Docker is that containers make it easier to set up an environment than virtual machines. It's okay to have multiple EC2s, but it's a problem if the cost is high, so I started 3 containers of Docker in one virtual machine. One is a Spark master / worker (slave) and the rest are workers. It will be configured like this.
Dist-Keras Read more about Dist-Keras, as CERN is enthusiastic about it.
https://db-blog.web.cern.ch/blog/joeri-hermans/2017-01-distributed-deep-learning-apache-spark-and-keras
I will translate some of them.
Distributed Keras Distributed Keras is a distributed deep learning framework created using Apache Spark and Keras. The purpose is to greatly improve the learning time of machine learning and make it possible to analyze datasets that exceed the memory capacity. The project started in August 2016 in collaboration with CMS.
Architecture Basically, the learning process is passed to the Spark worker by a Lambda function. However, when passing multiple parameters, such as the parameter server port number, we wrapped them all in an object and made it a learning function that could use the parameters needed for Spark. To get a bird's eye view of the whole, let's explain with the following figure. The learning object first starts the parameter server with the Spark driver. Then start the worker process. It contains all the parameters and processes needed to train a Keras model. Furthermore, in order to prepare a worker who performs the required number of parallel processes, the data set is divided by a predetermined capacity. However, when processing big data, it is desirable to increase the elements of parallel processing. By doing so, you can avoid a situation where one worker is idle while another less capable worker has not finished spinning the batch (this is called the struggler problem). In this case, go to Spark Documentation As you can see, it is recommended to set the distributed element to 3.
However, there may be times when you need to consider using larger distributed elements. Basically, distributed processing is proportional to the number of partitions (number of partitions). For example, suppose you assign 20 workers to a task and set the variance element to 3. Spark divides the dataset into 60 shards. And before workers can start processing partitions, they must first load all the Python libraries needed to handle the task, and then deserialize and compile the Keras model. This process adds a large amount of overhead. As such, this technique is only useful when dealing with large datasets on non-heterogeneous systems that require long warm-up overhead (that is, when each worker has different hardware or value loads).
Dist-Keras Instrallation Dist-Keras is published on Github. https://github.com/cerndb/dist-keras
Spark and Keras must be installed before installing Dist-Keras. The installation method of these is as follows. https://spark.apache.org/docs/latest/index.html https://keras.io/ja/
To install Dist-Keras, use the following command.
## if you need git, run "yum -y install git" beforehand
git clone https://github.com/JoeriHermans/dist-keras
cd dist-keras
pip install -e .
I tried it on the virtual machine CentOS 7.3 and the Docker container (CentOS 7.3) and it installed without getting stuck.
Well, it's been a long time, but what we're doing this time is to run Dist-Keras in a Docker container and distribute Keras model learning among the containers. We will deploy the following configuration again.
Dist-Keras on Docker Dist-Keras doesn't have a Dockerfile or Docker image, so I made it myself. https://github.com/shibuiwilliam/distkeras-docker
A git clone will download the Dockerfile and spark_run.sh.
git clone https://github.com/shibuiwilliam/distkeras-docker.git
Go to the distkeras-docker directory and do docker build.
cd distkeras-docker
docker build -t distkeras .
It will take some time, but let's wait slowly. The main things that docker build does are:
--Installation of necessary tools --Installation and configuration of Python and Jupyter Notebook --Installation and configuration of Standalone Spark --Installing Dist keras --Added SparkMaster & Jupyter Notebook startup scripts spark_master.sh and spark_slave.sh
The base container is CentOS. Spark is 2.1.0 and Keras is 2.0.2, both of which are using the latest version.
Let's move it as soon as docker build is completed. This time, we will start 3 Docker containers to scale out.
# docker dist-keras for spark master and slave
docker run -it -p 18080:8080 -p 17077:7077 -p 18888:8888 -p 18081:8081 -p 14040:4040 -p 17001:7001 -p 17002:7002 \
-p 17003:7003 -p 17004:7004 -p 17005:7005 -p 17006:7006 --name spmaster -h spmaster distkeras /bin/bash
# docker dist-keras for spark slave1
docker run -it --link spmaster:master -p 28080:8080 -p 27077:7077 -p 28888:8888 -p 28081:8081 -p 24040:4040 -p 27001:7001 \
-p 27002:7002 -p 27003:7003 -p 27004:7004 -p 27005:7005 -p 27006:7006 --name spslave1 -h spslave1 distkeras /bin/bash
# docker dist-keras for spark slave2
docker run -it --link spmaster:master -p 38080:8080 -p 37077:7077 -p 38888:8888 -p 38081:8081 -p 34040:4040 -p 37001:7001 \
-p 37002:7002 -p 37003:7003 -p 37004:7004 -p 37005:7005 -p 37006:7006 --name spslave2 -h spslave2 distkeras /bin/bash
It consists of Spark master / worker (spmaster), worker 1 (spslave1), and worker 2 (spslave2). Run the Spark master and worker with spmaster, and join spslave1 and spslave2 as workers (slave) to the Spark cluster.
Spark master and worker startup scripts are provided as /opt/spark_master.sh and /opt/spark_slave.sh, respectively. Since it is moved to the / opt / directory when each container is started, the master and worker can be started by executing the command there.
#Run on Spark master
#Master and worker start, worker joins cluster
sh spark_master.sh
# Spark spslave1,Run in 2
#Worker starts and joins master cluster
sh spark_slave.sh
You now have one master and three workers in your Spark cluster. You can check it on the Spark Master console. http://<spark master>:18080
In this area, I would like to start multiple units at once with Docker compose or Swarm, but this is also my homework at a later date.
Let's try MNIST with Dist-Keras. MNIST sample code is provided by Dist-Keras. The directory is / opt / dist-keras / examples, which contains the following sample data and programs.
[root@spm examples]# tree
.
|-- cifar-10-preprocessing.ipynb
|-- data
| |-- atlas_higgs.csv
| |-- mnist.csv
| |-- mnist.zip
| |-- mnist_test.csv
| `-- mnist_train.csv
|-- example_0_data_preprocessing.ipynb
|-- example_1_analysis.ipynb
|-- kafka_producer.py
|-- kafka_spark_high_throughput_ml_pipeline.ipynb
|-- mnist.ipynb
|-- mnist.py
|-- mnist_analysis.ipynb
|-- mnist_preprocessing.ipynb
|-- spark-warehouse
`-- workflow.ipynb
This time, mnist.py is executed, but some code needs to be changed according to the environment.
Copy the original file, back it up, and apply the following changes.
cp mnist.py mnist.py.bk
Add the following at the beginning.
from pyspark.sql import SparkSession
Change Spark parameters for this environment. The intent of the change is as follows.
--Using Spark2 --Using the local environment --Define master url in local environment --Changed the number of workers from 1 to 3
# Modify these variables according to your needs.
application_name = "Distributed Keras MNIST"
using_spark_2 = True # False→True
local = True # False→True
path_train = "data/mnist_train.csv"
path_test = "data/mnist_test.csv"
if local:
# Tell master to use local resources.
# master = "local[*]" comment out
master = "spark://spm:7077" # add
num_processes = 1
num_executors = 3 # 1→3
else:
# Tell master to use YARN.
master = "yarn-client"
num_executors = 20
num_processes = 1
Change worker memory from 4G to 2G. This is not a mandatory change as it is simply tailored to your environment.
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
conf.set("spark.executor.cores", `num_processes`)
conf.set("spark.executor.instances", `num_executors`)
conf.set("spark.executor.memory", "2g") # 4G→2G
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
You are now ready. Let's run MNIST.
python mnist.py
You can check the execution status on the Spark Master console. http://<spark master>:18080
Of course, you can also open details such as Jobs and Executer.
Jobs screen
Executer screen
Data is loaded into 3 workers and MNIST is being learned. By the way, the model of MNIST is as follows.
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
conv2d_1 (Conv2D) (None, 26, 26, 32) 320
_________________________________________________________________
activation_1 (Activation) (None, 26, 26, 32) 0
_________________________________________________________________
conv2d_2 (Conv2D) (None, 24, 24, 32) 9248
_________________________________________________________________
activation_2 (Activation) (None, 24, 24, 32) 0
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 12, 12, 32) 0
_________________________________________________________________
flatten_1 (Flatten) (None, 4608) 0
_________________________________________________________________
dense_1 (Dense) (None, 225) 1037025
_________________________________________________________________
activation_3 (Activation) (None, 225) 0
_________________________________________________________________
dense_2 (Dense) (None, 10) 2260
_________________________________________________________________
activation_4 (Activation) (None, 10) 0
=================================================================
Total params: 1,048,853.0
Trainable params: 1,048,853.0
Non-trainable params: 0.0
_________________________________________________________________
The program will be completed in about 30 minutes.
The result of trying with 3 workers is as follows.
Training time: 1497.86584091
Accuracy: 0.9897
Number of parameter server updates: 3751
With a learning time of less than 25 minutes, the accuracy of the correct answer rate was 0.9897. so-so.
By the way, the result of trying with one worker is as follows.
Training time: 1572.04011703
Accuracy: 0.9878
Number of parameter server updates: 3751
~~ ・ ・ ・ 3 workers or 1 worker can only take about 1 minute (-_-;) ~~
~~ Is this ... the choice of Steins Gate ... ~~
This time, I started the Docker container on the same server, but I think that the original load distribution is realized across servers. In the future, I would like to start the Dist-Keras container on multiple servers and ECS, configure a Spark cluster, and distribute the load.
[2017/04/16 postscript] We have verified how to improve performance. http://qiita.com/cvusk/items/f54ce15f8c76a396aeb1
[2017/05/26 postscript] I clustered it with Kubernetes. http://qiita.com/cvusk/items/42a5ffd4e3228963234d
Recommended Posts