Streaming Python and SensorTag, Kafka, Spark Streaming-Part 5: Connecting from Jupyter to Spark with Apache Toree

I will prepare a Spark cluster and write some sample code. I think that many people use Jupyter as an execution environment for Python data analysis and machine learning. The purpose is to write Spark apps interactively from Jupyter as well with Apache Toree. You can also use Jupyter with a Scala REPL that can be run from your browser.


Build a Spark cluster with Docker Compose. Many Spark Standalone Cluster images and docker-compose.yml are available on Docker Hub and GitHub.

I tried a few, but semantive / spark is simple and easy to use.

Docker Compose

How to use the semantive / spark image is described in Docker Images For Apache Spark. Docker Hub will be here and GitHub will be here.

I made some changes from docker-compose.yml in the repository. The main change is to specify the public IP address of the virtual machine on the cloud in the SPARK_PUBLIC_DNS and SPARK_MASTER_HOST environment variables, which explicitly specify the image tag to match the Spark version.


version: '2'
    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.master.Master -h master
    hostname: master
      MASTER: spark://master:7077
      SPARK_CONF_DIR: /conf
      SPARK_PUBLIC_DNS: <Virtual machine public IP address>
      SPARK_MASTER_HOST: <Virtual machine public IP address>
      - 4040:4040
      - 6066:6066
      - 7077:7077
      - 8080:8080
      - spark_data:/tmp/data

    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    hostname: worker1
      SPARK_CONF_DIR: /conf
      SPARK_PUBLIC_DNS: <Virtual machine public IP address>
      - master
      - 8081:8081
      - spark_data:/tmp/data

    image: semantive/spark:spark-2.1.1-hadoop-2.7.3
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    hostname: worker2
      SPARK_CONF_DIR: /conf
      SPARK_PUBLIC_DNS: <Virtual machine public IP address>
      - master
      - 8082:8082
      - spark_data:/tmp/data

    driver: local

Start Spark Standalone Cluster.

$ docker-compose up -d

Open the Spark Master UI and check the status of the cluster.

http://<Virtual machine public IP address>:8080

Run spark-shell on the Master container to see the Scala and Spark versions. Spark is very fast to develop, and you will run into unexpected errors if you don't check carefully, including the Scala version.

$ docker-compose exec master spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.



Jupyter's Docker image uses the official jupyter / all-spark-notebook. It is an image that can be used up to Scala and Spark.

Apache Toree

Apache Toree is a tool for connecting to Spark clusters from Jupyter. In addition to PySpark, Scala, SparkR and SQL Kernels are provided.

If you look at the Dockerfile, Apache Toree is also installed.

# Apache Toree kernel
RUN pip --no-cache-dir install
RUN jupyter toree install --sys-prefix


Add the Jupyter service to docker-compose.yml for Spark Standalone Cluster.


    image: jupyter/all-spark-notebook:c1b0cf6bf4d6
      - master
      - 8888:8888
      - ./notebooks:/home/jovyan/work
      - ./ivy2:/home/jovyan/.ivy2
      - ./.env
      TINI_SUBREAPER: 'true'
      SPARK_OPTS: --master spark://master:7077 --deploy-mode client --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3
    command: --NotebookApp.password=sha1:xxx --NotebookApp.iopub_data_rate_limit=10000000

About Jupyter service options

Since Spark Standalone Cluster does not use Hadoop, we have added a setting to use Amazon S3 to the distributed file system. It is convenient to have it in the save destination of sample data and Parquet files.


The jupyter / all-spark-notebook image is updated frequently. The version of Spark and Spark cluster used by Apache Toree will fail and will not start. This time, the version of Spark cluster is 2.1.1, so specify the tag of the image of the same version. jupyter / all-spark-notebook It is inconvenient to know only the ID of the image tag.

Since the version of Spark has already been upgraded to 2.2.0, specify the tag 2.1.1.   Pull the Docker image of the tag and check it with spark-shell.

$ docker pull jupyter/all-spark-notebook:c1b0cf6bf4d6
$ docker run -it --rm \
  jupyter/all-spark-notebook:c1b0cf6bf4d6 \

We have confirmed that the versions of Spark cluster and Spark and Scala are the same.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.


Also check the Jupyter version.

$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 jupyter --version


These are the two required settings for connecting to a remote Spark from Jupyter using Apache Toree. The TINI_SUBREAPER environment variable uses Tini for init.

If Spark does not use additional Jar files, you can connect to a remote Spark Standalone Cluster simply by specifying the following in the SPARK_OPTS environment variable. Same as normal spark-submit options.

--master spark://master:7077 --deploy-mode client

Add more --packages flags if you have additional Jar files. In this case, it's the package you need to connect to Amazon S3.

--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3


When dealing with large images with visualization tools such as Bokeh, specify the option of Jupyter startup script.


The default Jupyter authentication method is token. I changed to password authentication because it is troublesome to put a different token every time when starting and destroying frequently like Docker container. Use ipython to get the hash value of the password.

$ docker run -it --rm jupyter/all-spark-notebook:c1b0cf6bf4d6 ipython
Python 3.6.1 | packaged by conda-forge | (default, May 23 2017, 14:16:20)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.1.0 -- An enhanced Interactive Python. Type '?' for help.

The password is generated as follows. Specify the output hash value in the Jupyter startup option.

In [1]: from notebook.auth import passwd
In [2]: passwd()

Enter password:
Verify password:
Out[2]: 'sha1:xxx'


/ home / jovyan is the home directory of the user running the Jupyter container. Mount the created notebook or downloaded Jar file on the Docker host.


Write environment variables in the .env file and pass them to the container. Specify the access key and secret key to use to connect to Amazon S3.


Don't forget to add it to .gitignore so you don't commit to Git.


Use Spark and Amazon S3 from Jupyter

I'm going to write a sample in Scala and Python that uses Spark and Amazon S3 with Jupyter. In the article Monitoring Real-Time Uber Data Using Apache APIs, Part 1: Spark Machine Learning Use the Uber pickup data you are using as a sample. Here, we simply read the CSV file from S3 and display it.

Starts all services defined in docker-compose.yml.

$ docker-compose up -d

Open Jupyter in your browser and log in with the password you created earlier.

http://<Virtual machine public IP address>:8888

Data preparation

After cloning the repository, put the ʻuber.csvfile froms3cmd` into a suitable bucket.

$ git clone
$ cd spark-ml-kmeans-uber/data
$ s3cmd put uber.csv s3://<Bucket name>/uber-csv/


You can split the code into cells and execute them interactively where you want to see the code below. To write a Scala Notebook, select ʻApache Toree --Scala from the New` button in the upper right.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.

sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc.hadoopConfiguration.set("", "true")

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val schema = StructType(
    StructField("dt", TimestampType, true) ::
    StructField("lat", DoubleType, true) ::
    StructField("lon", DoubleType, true) ::
    StructField("base", StringType, true) :: Nil

val df =
    option("header", false).
    csv("s3a://<Bucket name>/uber-csv/uber.csv")



For Scala The StructType in the schema can also be written as:

val schema = (new StructType).
    add("dt", "timestamp", true).
    add("lat", "double", true).
    add("lon", "double", true).
    add("base", "string", true)

This is the final output of (false).

|dt                   |lat    |lon     |base  |
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
only showing top 20 rows


When writing a Python 3 Notebook, select Python 3 from the New button in the upper right. Divide the following code into cells at appropriate points and execute it. It differs from Scala in that the additional Jar is specified in the PYSPARK_SUBMIT_ARGS environment variable.

You can write a Spark app in Python in much the same way as Scala, as shown below.

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell'

from pyspark.sql import SparkSession

spark = (

sc = spark.sparkContext

sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("", "true")

from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = StructType([
    StructField("dt", TimestampType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True),
    StructField("base", StringType(), True)

df = (
    .option("header", False)
    .csv("s3a://<Bucket name>/uber-csv/uber.csv")



The final output of (truncate = False) is the same as the Scala code above.

|dt                   |lat    |lon     |base  |
|2014-08-01 00:00:00.0|40.729 |-73.9422|B02598|
|2014-08-01 00:00:00.0|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00.0|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00.0|40.751 |-73.9869|B02598|
|2014-08-01 00:00:00.0|40.7406|-73.9902|B02598|
|2014-08-01 00:00:00.0|40.6994|-73.9591|B02617|
|2014-08-01 00:00:00.0|40.6917|-73.9398|B02617|
|2014-08-01 00:00:00.0|40.7063|-73.9223|B02617|
|2014-08-01 00:00:00.0|40.6759|-74.0168|B02617|
|2014-08-01 00:00:00.0|40.7617|-73.9847|B02617|
|2014-08-01 00:00:00.0|40.6969|-73.9064|B02617|
|2014-08-01 00:00:00.0|40.7623|-73.9751|B02617|
|2014-08-01 00:00:00.0|40.6982|-73.9669|B02617|
|2014-08-01 00:00:00.0|40.7553|-73.9253|B02617|
|2014-08-01 00:00:00.0|40.7325|-73.9876|B02682|
|2014-08-01 00:00:00.0|40.6754|-74.017 |B02682|
|2014-08-01 00:00:00.0|40.7303|-74.0029|B02682|
|2014-08-01 00:00:00.0|40.7218|-73.9973|B02682|
|2014-08-01 00:00:00.0|40.7134|-74.0091|B02682|
|2014-08-01 00:00:00.0|40.7194|-73.9964|B02682|
only showing top 20 rows

