I will prepare a Spark cluster and write some sample code. Many people use Jupyter as an execution environment for data analysis and machine learning in Python. The goal here is to use Apache Toree so that Spark apps can be written interactively from Jupyter as well. This also gives you a Scala REPL that runs in the browser.
Spark
Build a Spark cluster with Docker Compose. Many Spark Standalone Cluster images and docker-compose.yml files are available on Docker Hub and GitHub. I tried a few, and semantive/spark is simple and easy to use.
Docker Compose
How to use the semantive/spark image is described in Docker Images For Apache Spark. The image is available on Docker Hub, and the source is on GitHub.
I made some changes to the docker-compose.yml in the repository. The main changes are to set the public IP address of the cloud virtual machine in the SPARK_PUBLIC_DNS and SPARK_MASTER_HOST environment variables, and to explicitly pin the image tag to match the Spark version.
docker-compose.yml
version: '2'
services:
  master:
    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.master.Master -h master
    hostname: master
    environment:
      MASTER: spark://master:7077
      SPARK_CONF_DIR: /conf
      SPARK_PUBLIC_DNS: <Virtual machine public IP address>
      SPARK_MASTER_HOST: <Virtual machine public IP address>
    ports:
      - 4040:4040
      - 6066:6066
      - 7077:7077
      - 8080:8080
    volumes:
      - spark_data:/tmp/data
  worker1:
    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    hostname: worker1
    environment:
      SPARK_CONF_DIR: /conf
      SPARK_WORKER_CORES: 4
      SPARK_WORKER_MEMORY: 2g
      SPARK_WORKER_PORT: 8881
      SPARK_WORKER_WEBUI_PORT: 8081
      SPARK_PUBLIC_DNS: <Virtual machine public IP address>
    depends_on:
      - master
    ports:
      - 8081:8081
    volumes:
      - spark_data:/tmp/data
  worker2:
    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    hostname: worker2
    environment:
      SPARK_CONF_DIR: /conf
      SPARK_WORKER_CORES: 4
      SPARK_WORKER_MEMORY: 2g
      SPARK_WORKER_PORT: 8882
      SPARK_WORKER_WEBUI_PORT: 8082
      SPARK_PUBLIC_DNS: <Virtual machine public IP address>
    depends_on:
      - master
    ports:
      - 8082:8082
    volumes:
      - spark_data:/tmp/data
volumes:
  spark_data:
    driver: local
Start the Spark Standalone Cluster.
$ docker-compose up -d
Open the Spark Master UI and check the status of the cluster.
http://<Virtual machine public IP address>:8080
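The master web UI also serves its status as JSON at /json on the same port, so the cluster can be checked from a script as well. Below is a minimal sketch in Python; the URL placeholder is the same virtual machine IP address as above, and the field names (status, workers, state) are as observed on Spark 2.x, so adjust if your version differs.
import json
import urllib.request

# The Spark standalone master UI exposes its status as JSON at /json.
url = "http://<Virtual machine public IP address>:8080/json"
with urllib.request.urlopen(url) as res:
    status = json.load(res)

print(status["status"])                         # e.g. ALIVE
print([w["state"] for w in status["workers"]])  # e.g. ['ALIVE', 'ALIVE']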
Run spark-shell on the master container to check the Scala and Spark versions. Spark development moves quickly, so you will run into unexpected errors if you do not check the versions carefully, including the Scala version.
$ docker-compose exec master spark-shell
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Jupyter
For Jupyter, use the official jupyter/all-spark-notebook Docker image. It covers everything from Python up to Scala and Spark.
Apache Toree
Apache Toree is a tool for connecting to Spark clusters from Jupyter. In addition to PySpark, it provides Scala, SparkR, and SQL kernels. Looking at the Dockerfile, you can see that Apache Toree is installed:
# Apache Toree kernel
RUN pip --no-cache-dir install https://dist.apache.org/repos/dist/dev/incubator/toree/0.2.0/snapshots/dev1/toree-pip/toree-0.2.0.dev1.tar.gz
RUN jupyter toree install --sys-prefix
docker-compose.yml
Add the Jupyter service to docker-compose.yml for Spark Standalone Cluster.
docker-compose.yml
  jupyter:
    image: jupyter/all-spark-notebook:c1b0cf6bf4d6
    depends_on:
      - master
    ports:
      - 8888:8888
    volumes:
      - ./notebooks:/home/jovyan/work
      - ./ivy2:/home/jovyan/.ivy2
    env_file:
      - ./.env
    environment:
      TINI_SUBREAPER: 'true'
      SPARK_OPTS: --master spark://master:7077 --deploy-mode client --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
    command: start-notebook.sh --NotebookApp.password=sha1:xxx --NotebookApp.iopub_data_rate_limit=10000000
Since this Spark Standalone Cluster does not use Hadoop, we add settings to use Amazon S3 as the distributed file system. It is a convenient place to store sample data and Parquet files.
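For example, once the S3 settings described later in this article are in place, saving and reloading a DataFrame as Parquet takes only a couple of lines. A minimal sketch in Python, with a hypothetical bucket and path to replace with your own:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A tiny throwaway DataFrame just to demonstrate the round trip.
df = spark.createDataFrame([("B02598", 1)], ["base", "n"])
df.write.mode("overwrite").parquet("s3a://<Bucket name>/sample-parquet/")
spark.read.parquet("s3a://<Bucket name>/sample-parquet/").show()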
image
The jupyter/all-spark-notebook image is updated frequently. If the Spark version in the image differs from the Spark version of the cluster, Apache Toree fails and will not start. The cluster here runs Spark 2.1.1, so specify the image tag of the matching version. Inconveniently, the tags of jupyter/all-spark-notebook are only commit IDs. The latest image has already moved to Spark 2.2.0, so specify the tag c1b0cf6bf4d6, which contains 2.1.1.
Pull the Docker image with that tag and check it with spark-shell.
$ docker pull jupyter/all-spark-notebook:c1b0cf6bf4d6
$ docker run -it --rm \
jupyter/all-spark-notebook:c1b0cf6bf4d6 \
/usr/local/spark-2.1.1-bin-hadoop2.7/bin/spark-shell
This lets us confirm that the Spark and Scala versions are the same as those of the Spark cluster.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
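Once a notebook is running, the versions can also be checked programmatically. A small sketch, assuming a SparkSession named spark as created in the samples later in this article:
# Programmatic version check from a notebook cell.
print(spark.version)               # Spark version, e.g. 2.1.1
print(spark.sparkContext.version)  # the same, via the SparkContext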
Also check the Jupyter version.
$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 jupyter --version
4.3.0
TINI_SUBREAPER and SPARK_OPTS are the two settings required for connecting to a remote Spark cluster from Jupyter with Apache Toree. The TINI_SUBREAPER environment variable makes the container use Tini as its init process.
If Spark does not need additional Jar files, you can connect to a remote Spark Standalone Cluster simply by specifying the following in the SPARK_OPTS environment variable. The options are the same as for a normal spark-submit.
--master spark://master:7077 --deploy-mode client
Add the --packages flag if you need additional Jar files. In this case, these are the packages required to connect to Amazon S3.
--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
--NotebookApp.iopub_data_rate_limit
Specify this Jupyter startup option when dealing with large images from visualization tools such as Bokeh.
--NotebookApp.password
Jupyter's default authentication method is a token. Since it is troublesome to enter a different token each time when containers are frequently started and destroyed, as with Docker, I switched to password authentication. Use ipython to generate the hash value of the password.
$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 ipython
Python 3.6.1 | packaged by conda-forge | (default, May 23 2017, 14:16:20)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.
Generate the password as follows, and specify the output hash value in the Jupyter startup option.
In [1]: from notebook.auth import passwd
In [2]: passwd()
Enter password:
Verify password:
Out[2]: 'sha1:xxx'
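passwd() also accepts the passphrase as an argument, so a non-interactive one-liner works too; note that the plain-text password then remains in the session history.
# Non-interactive variant; 'your-password' is a placeholder.
from notebook.auth import passwd
print(passwd('your-password'))  # -> 'sha1:...'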
volumes
/home/jovyan is the home directory of the user running in the Jupyter container. Mount directories on the Docker host there to keep the notebooks you create and the Jar files you download.
env_file
Write environment variables in the .env file to pass them to the container. Here we specify the access key and secret key used to connect to Amazon S3.
AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx
Don't forget to add the file to .gitignore so that it is not committed to Git.
.env
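Incidentally, instead of relying on the environment variables, the credentials can also be handed to the s3a connector explicitly through its fs.s3a.access.key and fs.s3a.secret.key settings. A minimal sketch, assuming the SparkSession setup used in the samples below:
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
hconf = spark.sparkContext._jsc.hadoopConfiguration()

# Pass the keys from the .env file explicitly to the s3a connector.
hconf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
hconf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])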
I'm going to write samples in Scala and Python that use Spark and Amazon S3 from Jupyter. As sample data, we use the Uber pickup data from the article Monitoring Real-Time Uber Data Using Apache APIs, Part 1: Spark Machine Learning. Here we simply read the CSV file from S3 and display it.
Start all services defined in docker-compose.yml.
$ docker-compose up -d
Open Jupyter in your browser and log in with the password you created earlier.
http://<Virtual machine public IP address>:8888
After cloning the repository, put the uber.csv file into a suitable bucket with s3cmd.
$ git clone https://github.com/caroljmcdonald/spark-ml-kmeans-uber
$ cd spark-ml-kmeans-uber/data
$ s3cmd put uber.csv s3://<Bucket name>/uber-csv/
Scala
To write a Scala notebook, select Apache Toree - Scala from the New button in the upper right. Split the following code into cells at appropriate points and execute them interactively.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.
  builder.
  getOrCreate()

sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3a.fast.upload", "true")

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val schema = StructType(
  StructField("dt", TimestampType, true) ::
  StructField("lat", DoubleType, true) ::
  StructField("lon", DoubleType, true) ::
  StructField("base", StringType, true) :: Nil
)

val df =
  spark.read.
    option("header", false).
    schema(schema).
    csv("s3a://<Bucket name>/uber-csv/uber.csv")

df.printSchema
df.cache
df.show(false)
In Scala, the StructType of the schema can also be written as follows:
val schema = (new StructType).
  add("dt", "timestamp", true).
  add("lat", "double", true).
  add("lon", "double", true).
  add("base", "string", true)
This is the final output of df.show(false).
+---------------------+-------+--------+------+
|dt |lat |lon |base |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows
Python
When writing a Python 3 notebook, select Python 3 from the New button in the upper right. Split the following code into cells at appropriate points and execute them. It differs from Scala in that the additional Jars are specified in the PYSPARK_SUBMIT_ARGS environment variable. Apart from that, you can write a Spark app in Python in much the same way as in Scala, as shown below.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell'

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .getOrCreate()
)

sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.fast.upload", "true")

from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = StructType([
    StructField("dt", TimestampType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True),
    StructField("base", StringType(), True)
])

df = (
    spark.read
    .option("header", False)
    .schema(schema)
    .csv("s3a://<Bucket name>/uber-csv/uber.csv")
)

df.printSchema()
df.cache()
df.show(truncate=False)
The final output of df.show(truncate=False) is the same as that of the Scala code above.
+---------------------+-------+--------+------+
|dt |lat |lon |base |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows
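From here you can continue the analysis in the same notebook. For example, a quick sanity check that counts pickups per base; this assumes the df from the cells above:
# Count rows per base station as a quick sanity check.
df.groupBy("base").count().orderBy("base").show()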