Stream Processing with Python and SensorTag, Kafka, Spark Streaming, Part 5: Connecting from Jupyter to Spark with Apache Toree

In this part I prepare a Spark cluster and write some sample code. Many people use Jupyter as the execution environment for data analysis and machine learning in Python. The goal here is to write Spark apps interactively from Jupyter as well, using Apache Toree. With it, Jupyter also serves as a Scala REPL that you can run from your browser.

Spark

Build a Spark cluster with Docker Compose. Many Spark Standalone Cluster images and docker-compose.yml files are available on Docker Hub and GitHub.

I tried several, and found semantive/spark simple and easy to use.

Docker Compose

How to use the semantive/spark image is described in Docker Images For Apache Spark. The image is published on Docker Hub and its source is on GitHub.

I made some changes to the docker-compose.yml in the repository. The main changes are pinning the image tag explicitly so that it matches the Spark version, and setting the SPARK_PUBLIC_DNS and SPARK_MASTER_HOST environment variables to the public IP address of the virtual machine in the cloud.

docker-compose.yml


version: '2'
services:
  master:
    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.master.Master -h master
    hostname: master
    environment:
      MASTER: spark://master:7077
      SPARK_CONF_DIR: /conf
      SPARK_PUBLIC_DNS: <Virtual machine public IP address>
      SPARK_MASTER_HOST: <Virtual machine public IP address>
    ports:
      - 4040:4040
      - 6066:6066
      - 7077:7077
      - 8080:8080
    volumes:
      - spark_data:/tmp/data

  worker1:
    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    hostname: worker1
    environment:
      SPARK_CONF_DIR: /conf
      SPARK_WORKER_CORES: 4
      SPARK_WORKER_MEMORY: 2g
      SPARK_WORKER_PORT: 8881
      SPARK_WORKER_WEBUI_PORT: 8081
      SPARK_PUBLIC_DNS: <Virtual machine public IP address>
    depends_on:
      - master
    ports:
      - 8081:8081
    volumes:
      - spark_data:/tmp/data

  worker2:
    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    hostname: worker2
    environment:
      SPARK_CONF_DIR: /conf
      SPARK_WORKER_CORES: 4
      SPARK_WORKER_MEMORY: 2g
      SPARK_WORKER_PORT: 8882
      SPARK_WORKER_WEBUI_PORT: 8082
      SPARK_PUBLIC_DNS: <Virtual machine public IP address>
    depends_on:
      - master
    ports:
      - 8082:8082
    volumes:
      - spark_data:/tmp/data

volumes:
  spark_data:
    driver: local

Start the Spark Standalone Cluster.

$ docker-compose up -d

Open the Spark Master UI and check the status of the cluster.

http://<Virtual machine public IP address>:8080
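The same check can be scripted. The sketch below assumes that the standalone Master web UI also serves a JSON summary under /json/ on port 8080, and that the payload contains a `status` field and a `workers` list whose entries have a `state` field. Those field names, and the helper names `fetch_master_state` and `summarize`, are assumptions for illustration, so adjust them to whatever your Master actually returns.

```python
import json
from urllib.request import urlopen


def fetch_master_state(host, port=8080, timeout=5):
    """Fetch the Master's JSON summary (the /json/ path is an assumption)."""
    with urlopen(f"http://{host}:{port}/json/", timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def summarize(state, expected_workers=2):
    """Reduce the payload to the few facts worth checking after `up -d`."""
    workers = state.get("workers", [])
    alive = [w for w in workers if w.get("state") == "ALIVE"]
    return {
        "master_alive": state.get("status") == "ALIVE",
        "alive_workers": len(alive),
        "all_workers_up": len(alive) >= expected_workers,
    }


# Hypothetical payload shaped like the assumed response, so the
# summary logic can be tried without a running cluster.
sample = {
    "status": "ALIVE",
    "workers": [
        {"id": "worker1", "state": "ALIVE"},
        {"id": "worker2", "state": "ALIVE"},
    ],
}
print(summarize(sample))
```

Against a real cluster you would call `summarize(fetch_master_state("<Virtual machine public IP address>"))` instead of using the sample payload.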

Run spark-shell on the Master container to check the Scala and Spark versions. Spark evolves very quickly, and you will run into unexpected errors unless you check the versions carefully, including the Scala version.

$ docker-compose exec master spark-shell
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Jupyter

For Jupyter, use the official jupyter/all-spark-notebook Docker image. It is an image that comes with everything up through Scala and Spark.

Apache Toree

Apache Toree is a tool for connecting to Spark clusters from Jupyter. It provides Scala, SparkR, and SQL kernels in addition to PySpark.

The image's Dockerfile shows that Apache Toree is installed as well.

# Apache Toree kernel
RUN pip --no-cache-dir install https://dist.apache.org/repos/dist/dev/incubator/toree/0.2.0/snapshots/dev1/toree-pip/toree-0.2.0.dev1.tar.gz
RUN jupyter toree install --sys-prefix

docker-compose.yml

Add the Jupyter service to the docker-compose.yml for the Spark Standalone Cluster.

docker-compose.yml


  jupyter:
    image: jupyter/all-spark-notebook:c1b0cf6bf4d6
    depends_on:
      - master
    ports:
      - 8888:8888
    volumes:
      - ./notebooks:/home/jovyan/work
      - ./ivy2:/home/jovyan/.ivy2
    env_file:
      - ./.env
    environment:
      TINI_SUBREAPER: 'true'
      SPARK_OPTS: --master spark://master:7077 --deploy-mode client --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
    command: start-notebook.sh --NotebookApp.password=sha1:xxx --NotebookApp.iopub_data_rate_limit=10000000

About Jupyter service options

Because the Spark Standalone Cluster does not use Hadoop, I added settings to use Amazon S3 as the distributed file system. It is a convenient place to store sample data and Parquet files.

image

The jupyter/all-spark-notebook image is updated frequently. If the Spark version used by Apache Toree differs from that of the Spark cluster, the kernel fails and does not start. The Spark cluster here runs 2.1.1, so specify the tag of an image that ships the same version. Inconveniently, jupyter/all-spark-notebook image tags are only IDs, so the Spark version cannot be read from the tag.

The latest image has already been upgraded to Spark 2.2.0, so specify the tag of the image that contains 2.1.1. Pull the Docker image with that tag and check it with spark-shell.

$ docker pull jupyter/all-spark-notebook:c1b0cf6bf4d6
$ docker run -it --rm \
  jupyter/all-spark-notebook:c1b0cf6bf4d6 \
  /usr/local/spark-2.1.1-bin-hadoop2.7/bin/spark-shell

This confirms that the Spark and Scala versions in the image match those of the Spark cluster.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
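Comparing the two banners by eye is easy to get wrong. As a minimal sketch, the version strings can be pulled out of the spark-shell banner text and compared in code. The function names `parse_banner` and `versions_match` are made up for this example, and the regular expressions assume the banner layout shown above.

```python
import re


def parse_banner(text):
    """Return (spark_version, scala_version) found in a spark-shell banner."""
    # The Spark version sits at the end of the ASCII-art line.
    spark = re.search(r"version\s+(\d+\.\d+\.\d+)\s*$", text, re.MULTILINE)
    # The Scala version follows the fixed phrase "Using Scala version".
    scala = re.search(r"Using Scala version\s+(\d+\.\d+\.\d+)", text)
    return (
        spark.group(1) if spark else None,
        scala.group(1) if scala else None,
    )


def versions_match(banner_a, banner_b):
    """True only when both banners parse and report identical versions."""
    a, b = parse_banner(banner_a), parse_banner(banner_b)
    return None not in a and a == b


# Two lines copied from the banner printed by spark-shell in this article.
cluster_banner = r"""
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
"""

print(parse_banner(cluster_banner))  # ('2.1.1', '2.11.8')
```

Paste the banner from the Master container and the banner from the Jupyter image into `versions_match` to confirm they agree before starting a kernel.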

Also check the Jupyter version.

$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 jupyter --version
4.3.0

TINI_SUBREAPER and SPARK_OPTS

These two settings are required to connect to a remote Spark from Jupyter using Apache Toree. The image uses Tini as its init process, and the TINI_SUBREAPER environment variable tells Tini to register itself as a process subreaper.

If Spark does not need additional Jar files, you can connect to a remote Spark Standalone Cluster simply by specifying the following in the SPARK_OPTS environment variable. These are the same as the normal spark-submit options.

--master spark://master:7077 --deploy-mode client

Add the --packages flag if you need additional Jar files. Here it lists the packages required to connect to Amazon S3.

--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
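When the package list grows, assembling the option string by hand becomes error-prone. The following is a small sketch of a helper that builds the SPARK_OPTS value from its parts. The name `build_spark_opts` is hypothetical, and the check that each coordinate has the form group:artifact:version is a convenience added for this example.

```python
def build_spark_opts(master, packages=(), deploy_mode="client"):
    """Assemble a SPARK_OPTS string from a master URL and Maven coordinates."""
    for coordinate in packages:
        # Maven coordinates are written as group:artifact:version.
        if len(coordinate.split(":")) != 3:
            raise ValueError(f"not a group:artifact:version coordinate: {coordinate}")
    opts = ["--master", master, "--deploy-mode", deploy_mode]
    if packages:
        # --packages takes a single comma-separated list.
        opts += ["--packages", ",".join(packages)]
    return " ".join(opts)


spark_opts = build_spark_opts(
    "spark://master:7077",
    packages=[
        "com.amazonaws:aws-java-sdk:1.7.4",
        "org.apache.hadoop:hadoop-aws:2.7.3",
    ],
)
print(spark_opts)
```

The printed value is the same string used for SPARK_OPTS in the docker-compose.yml above.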

--NotebookApp.iopub_data_rate_limit

When handling large images with visualization tools such as Bokeh, raise the IOPub data rate limit with this option of the Jupyter startup script.

--NotebookApp.password

Jupyter authenticates with a token by default. Because Docker containers are started and destroyed frequently and entering a different token every time is tedious, I switched to password authentication. Use ipython to get the hash value of the password.

$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 ipython
Python 3.6.1 | packaged by conda-forge | (default, May 23 2017, 14:16:20)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.

The password is generated as follows. Specify the output hash value in the Jupyter startup option.

In [1]: from notebook.auth import passwd
In [2]: passwd()

Enter password:
Verify password:
Out[2]: 'sha1:xxx'
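The value returned by passwd() has the shape algorithm:salt:digest. To make that shape less opaque, here is a standard-library sketch of a salted hash in the same format. It is an illustration only: the way the salt is generated and combined here is an assumption and may not match notebook.auth exactly, so keep using passwd() to produce the value you pass to --NotebookApp.password. The names `make_hash` and `check_hash` are made up for this example.

```python
import hashlib
import secrets


def make_hash(passphrase, algorithm="sha1", salt_len=12):
    """Return 'algorithm:salt:digest' for a passphrase (illustrative only)."""
    salt = secrets.token_hex(salt_len // 2)  # salt_len hex characters
    digest = hashlib.new(algorithm, (passphrase + salt).encode("utf-8")).hexdigest()
    return ":".join((algorithm, salt, digest))


def check_hash(stored, passphrase):
    """Recompute the digest with the stored salt and compare the results."""
    try:
        algorithm, salt, digest = stored.split(":", 2)
    except ValueError:
        return False  # not in algorithm:salt:digest form
    candidate = hashlib.new(algorithm, (passphrase + salt).encode("utf-8")).hexdigest()
    return secrets.compare_digest(candidate, digest)


stored = make_hash("example-password")
print(stored.split(":")[0])  # sha1
print(check_hash(stored, "example-password"))  # True
```

Because the salt is random, the same password gives a different string on every run, which is why the hash is generated once and then pasted into the startup option.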

volumes

/home/jovyan is the home directory of the user that runs the Jupyter container. Mount directories on the Docker host there to hold the notebooks you create and the Jar files that are downloaded.

env_file

Write environment variables in the .env file and pass them to the container. Specify the access key and secret key to use to connect to Amazon S3.

AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx

Don't forget to add the file to .gitignore so that you don't commit it to Git.

.env
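A missing or empty key in .env only shows up later as an S3 access error inside a notebook. As a hedged sketch, the file can be checked before starting the containers. The parser below handles only plain KEY=value lines and comments; it does not reproduce every quoting rule Docker Compose applies, and the names `parse_env` and `missing_keys` are hypothetical.

```python
REQUIRED = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def parse_env(text):
    """Parse simple KEY=value lines, skipping blanks and # comments."""
    values = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def missing_keys(text, required=REQUIRED):
    """List required keys that are absent or set to an empty value."""
    values = parse_env(text)
    return [key for key in required if not values.get(key)]


# Placeholder content in the same shape as the .env file above.
example = "AWS_ACCESS_KEY_ID=xxx\nAWS_SECRET_ACCESS_KEY=\n"
print(missing_keys(example))  # ['AWS_SECRET_ACCESS_KEY']
```

To check the real file, pass `open(".env").read()` to `missing_keys` and stop if the returned list is not empty.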

Use Spark and Amazon S3 from Jupyter

I'm going to write samples in Scala and Python that use Spark and Amazon S3 from Jupyter. As sample data, I use the Uber pickup data from the article Monitoring Real-Time Uber Data Using Apache APIs, Part 1: Spark Machine Learning. Here, we simply read the CSV file from S3 and display it.

Start all services defined in docker-compose.yml.

$ docker-compose up -d

Open Jupyter in your browser and log in with the password you created earlier.

http://<Virtual machine public IP address>:8888

Data preparation

After cloning the repository, use s3cmd to put the uber.csv file into a suitable bucket.

$ git clone https://github.com/caroljmcdonald/spark-ml-kmeans-uber
$ cd spark-ml-kmeans-uber/data
$ s3cmd put uber.csv s3://<Bucket name>/uber-csv/
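Before uploading, it can help to confirm that the local file really has the four columns used later in the schema (dt, lat, lon, base). The sketch below uses only the standard library. The timestamp layout `%Y-%m-%d %H:%M:%S` is an assumption inferred from the output displayed by df.show in this article, and `parse_row` and `check_csv` are hypothetical helper names.

```python
import csv
import io
from datetime import datetime


def parse_row(row):
    """Convert one CSV row into (datetime, float, float, str)."""
    dt, lat, lon, base = row  # raises ValueError unless exactly 4 columns
    return (
        datetime.strptime(dt, "%Y-%m-%d %H:%M:%S"),  # assumed layout
        float(lat),
        float(lon),
        base,
    )


def check_csv(lines, limit=1000):
    """Count rows that parse and rows that do not, over the first `limit` rows."""
    good = bad = 0
    for i, row in enumerate(csv.reader(lines)):
        if i >= limit:
            break
        try:
            parse_row(row)
            good += 1
        except ValueError:
            bad += 1
    return good, bad


# Two rows matching the values shown in this article, plus one malformed row.
sample = io.StringIO(
    "2014-08-01 00:00:00,40.729,-73.9422,B02598\n"
    "2014-08-01 00:00:00,40.7476,-73.9871,B02598\n"
    "not-a-date,40.7,-73.9\n"
)
print(check_csv(sample))  # (2, 1)
```

For the real file, run `check_csv(open("uber.csv", newline=""))` from the data directory; a non-zero second number suggests the schema or the timestamp layout needs adjusting.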

Scala

You can split the code below into cells wherever you want to check a result and run them interactively. To write a Scala notebook, select Apache Toree - Scala from the New button in the upper right.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.
    builder.
    getOrCreate()

sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("fs.s3a.fast.upload", "true")

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val schema = StructType(
    StructField("dt", TimestampType, true) ::
    StructField("lat", DoubleType, true) ::
    StructField("lon", DoubleType, true) ::
    StructField("base", StringType, true) :: Nil
)

val df = 
    spark.read.
    option("header", false).
    schema(schema).
    csv("s3a://<Bucket name>/uber-csv/uber.csv")

df.printSchema

df.cache
df.show(false)

In Scala, the StructType for the schema can also be written as:

val schema = (new StructType).
    add("dt", "timestamp", true).
    add("lat", "double", true).
    add("lon", "double", true).
    add("base", "string", true)

This is the final output of df.show(false).

+---------------------+-------+--------+------+
|dt                   |lat    |lon     |base  |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows

Python

To write a Python 3 notebook, select Python 3 from the New button in the upper right. Split the following code into cells at appropriate points and run it. Unlike Scala, the additional Jars are specified in the PYSPARK_SUBMIT_ARGS environment variable.

You can write a Spark app in Python in much the same way as Scala, as shown below.

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell'

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
        .getOrCreate()
)

sc = spark.sparkContext

sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.fast.upload", "true")

from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = StructType([
    StructField("dt", TimestampType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True),
    StructField("base", StringType(), True)
])

df = (
    spark.read
    .option("header", False)
    .schema(schema)
    .csv("s3a://<Bucket name>/uber-csv/uber.csv")
)

df.printSchema()

df.cache()
df.show(truncate=False)

The final output of df.show(truncate=False) is the same as for the Scala code above.

+---------------------+-------+--------+------+
|dt                   |lat    |lon     |base  |
+---------------------+-------+--------+------+
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
+---------------------+-------+--------+------+
only showing top 20 rows
