That Time I, an AWS Professional, Was Reincarnated into the Demon World of Spark Running on K8s (1) -I Still Do It on a Mac-

Hi, my name is Yoda and I am in charge of big data in the DoCoMo Service Innovation Department. My AWS Professional and SA certifications have expired; these days it is my Smash Bros. [^1] game that is over-leveled. [^1]: Four people on my team, myself included, have reached VIP (roughly the top 5% of online players). We are a fine (unlisted) company where you can build up that kind of real-world combat power, so students, please come visit us.

As the title suggests, we are developing a commercial service with Apache Spark on K8s. It has been two and a half years since I was reincarnated [^3] into this demon world [^2], and piecing knowledge together from scattered sources was painful, so I put together a hands-on for my juniors. [^2]: Just when I had decided it was a demon world, AWS, a.k.a. Gojo-sensei, casually unleashed its Domain Expansion (EMR on EKS). Come to Tokyo. [^3]: A so-called internal transfer. I used to build web applications with React/Redux, so how did it come to this?

**(1) ~~Try Spark and Kubernetes on a Mac + machine learning inference~~ I Still Do It on a Mac** (this article)
(2) ~~Build a Spark application and deploy it to Amazon EKS~~ The Irregular at Managed K8s (planned for this winter)
(3) ~~Distributed real-time inference with Spark Streaming~~ Welcome to the Classroom of Speed Supremacy (planned for 2021)

I do want to finish the series, so please like and subscribe!

1. Overview

image.png

Deploy a Spark cluster on your Mac using the Kubernetes (K8s) feature bundled with Docker for Mac. While getting a feel for distributed processing from JupyterLab, we will run inference with XGBoost, which leads into the final chapter (planned). I switch between Python (PySpark) and Scala along the way, but Spark's API is well designed, so I think you can move between the two without much discomfort.

2. Preparation

2.1. Docker for Mac with K8s

At a minimum, this is all you need. Which also means: **sorry, M1 Mac users**, this hands-on is not supported as of December 2020. (And if this feature ever becomes unusable because of the deprecation [^4] that has been a hot topic lately, the hands-on itself will be so much scrap paper.)

This hands-on assumes a certain amount of CPU and memory is allocated to Docker for Mac. If you have room to spare, increasing each item will make the hands-on go more smoothly, so please do. You can adjust the numbers from the Dashboard under Resources > Advanced.

Next, enable K8s from the Dashboard and restart Docker, following https://docs.docker.com/docker-for-mac/#kubernetes.

2.2. kubectl

This is the command used to operate K8s. With Homebrew installed on your Mac, running

brew install kubectl

is probably the easiest way to get it. To check that it works, run

kubectl cluster-info

and if output like the following comes back,

Kubernetes master is running at https://kubernetes.docker.internal:6443
KubeDNS is running at https://kubernetes.docker.internal:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy

then it has succeeded.

Note that https://kubernetes.docker.internal:6443 is the endpoint (FQDN) of the API server that receives requests to K8s; we will use it again later.

2.3. Scala/Java/Python

Strictly speaking these are not required, since the applications run inside containers, but having Scala and Java on your Mac is convenient: you can try out whatever you are curious about locally.

You can follow this hands-on either way; if you install only one of them, Java [^5] alone is enough.

[^5]: You will probably end up with multiple Java versions on your Mac, so google something like "mac java version switch".

3. Hands-on

Let's move on to hands-on.

3.1. Build Spark image

K8s controls applications by managing containers [^6]: isolated environments that run according to a defined runtime. [^6]: If this gets a "?" out of you, grab one of the seniors sitting nearby, or look up an introduction to container technology aimed at newcomers with no container experience.

So, first, we build the container image that Spark runs in, using Docker. How to build a Docker image for Spark is described in the Spark documentation (https://spark.apache.org/docs/latest/running-on-kubernetes.html#docker-images).

  1. Get the Spark distribution from the Download page (https://spark.apache.org/downloads.html).
  2. Run the shell script included in the distribution in an environment where Docker is available.

These two steps create the image. However, the downloadable distribution includes libraries that are unnecessary here (those for resource managers other than K8s, such as YARN) and, conversely, does not include libraries we do want for this hands-on (those for reading and writing cloud storage such as Amazon S3). This is inconvenient.

3.1.1. Build Spark Library

It takes a bit of time and effort, but let's build Spark from source and create a package containing exactly the features we want. As in the documentation, you download the source from GitHub and run a shell script, and the distribution gets built. You could install Java and Maven on your Mac and build there, but since that is a hassle, let's do the build inside Docker instead.

Create a working directory and move into it.

mkdir spark-images && cd spark-images

Please put the following Dockerfile in it.

spark-images/SparkLib.Dockerfile


ARG JDK_VERSION

FROM maven:3.6.3-jdk-${JDK_VERSION} AS builder

ARG SPARK_VERSION
ARG HADOOP_VERSION
ARG HADOOP_BINARY_VERSION
ARG SCALA_BINARY_VERSION

WORKDIR /workdir
RUN git clone https://github.com/apache/spark.git

WORKDIR /workdir/spark
RUN git checkout v${SPARK_VERSION}
# From 3.0.0 the default Scala version is 2.12, so this step only matters if you target a different version (e.g. 2.13)
RUN dev/change-scala-version.sh ${SCALA_BINARY_VERSION}
RUN dev/make-distribution.sh \
    --tgz \
    -Dhadoop.version=${HADOOP_VERSION} \
    -Pscala-${SCALA_BINARY_VERSION} \
    -Phadoop-${HADOOP_BINARY_VERSION} \
    -Phadoop-cloud \
    -Pkubernetes \
    -Pkinesis-asl

What gets built is controlled by the options passed to make-distribution.sh. Here we specify that K8s, cloud storage, and Amazon Kinesis support should be included.

Run docker build to build the Spark distribution inside Docker.

JDK_VERSION="11"
SPARK_VERSION="3.0.1"
HADOOP_VERSION="3.2.0"
HADOOP_BINARY_VERSION="3.2"
SCALA_BINARY_VERSION="2.12"

docker build \
  -f ./SparkLib.Dockerfile \
  -t spark${SPARK_VERSION}-hadoop${HADOOP_VERSION}-libraries:java${JDK_VERSION} \
  --build-arg JDK_VERSION=${JDK_VERSION} \
  --build-arg SPARK_VERSION=${SPARK_VERSION} \
  --build-arg HADOOP_VERSION=${HADOOP_VERSION} \
  --build-arg HADOOP_BINARY_VERSION=${HADOOP_BINARY_VERSION} \
  --build-arg SCALA_BINARY_VERSION=${SCALA_BINARY_VERSION} \
  .

As specified with the -t option, you should now have an image named spark3.0.1-hadoop3.2.0-libraries:java11. It contains a tar.gz like the one you can download from Spark's web page, except with the unnecessary libraries removed and libraries for operating on public-cloud storage such as AWS added.

3.1.2. Build Spark image

Extract the resulting tar.gz and run the Docker-image build script included in it.

SPARK_LIB_IMAGE="spark3.0.1-hadoop3.2.0-libraries:java11"  # the image name you just built

SPARK_LIB=$(docker create ${SPARK_LIB_IMAGE})  # create (but do not start) a container from the image
docker cp ${SPARK_LIB}:/workdir/spark/spark-${SPARK_VERSION}-bin-${HADOOP_VERSION}.tgz /tmp  # copy the tar.gz out to /tmp on the Mac

cd ..  # back up to the working root above spark-images

mkdir spark-lib
tar -xvf /tmp/spark-${SPARK_VERSION}-bin-${HADOOP_VERSION}.tgz -C spark-lib

# Only needed if you want to run Spark Streaming jobs with Kinesis as the source: for license
# reasons this jar is not packaged into the tar.gz, so copy it in at this point. Skippable this time.
docker cp ${SPARK_LIB}:/workdir/spark/external/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_${SCALA_BINARY_VERSION}-${SPARK_VERSION}.jar spark-lib/spark-${SPARK_VERSION}-bin-${HADOOP_VERSION}/jars

docker rm -v ${SPARK_LIB}  # remove the container, which is no longer needed

cd spark-lib/spark-${SPARK_VERSION}-bin-${HADOOP_VERSION}

./bin/docker-image-tool.sh  # run with no arguments to print usage and a simple example
./bin/docker-image-tool.sh -t v3.0.1-java11 -b java_image_tag=11-jre-slim build  # build a Spark runtime image on top of the Java 11 (11-jre-slim) base image

Hopefully you now have an image named spark:v3.0.1-java11.

3.2. (Reference) Checking operation with spark-shell

If you have Java 11 on your Mac, you can check that the cluster starts up. (You can skip this, since we will do the same from a Jupyter notebook shortly.)

Invoking the spark-shell command in the distribution you just extracted starts an interactive Spark session (REPL). We use this to confirm startup.

java -version #Check the version of Java installed on your mac, note that it must be 11.
SPARK_IMAGE="spark:v3.0.1-java11"

cd /opt/spark/  # or wherever you placed the extracted Spark distribution (e.g. spark-lib/spark-3.0.1-bin-3.2.0)
./bin/spark-shell \
  --master k8s://https://kubernetes.docker.internal:6443 \
  --deploy-mode client \
  --name spark-shell \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.container.image=${SPARK_IMAGE} \
  --conf spark.driver.extraJavaOptions=-Dio.netty.tryReflectionSetAccessible=true \
  --conf spark.executor.extraJavaOptions=-Dio.netty.tryReflectionSetAccessible=true \
  --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider

After some Warn logs

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/

Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 11.0.9.1)
Type in expressions to have them evaluated.
Type :help for more information.


scala>

a Scala REPL session should start, which is enough for now. Open another tab in your terminal and look at the pod list.

kubectl get pod 

result

NAME                                          READY   STATUS    RESTARTS   AGE
spark-shell-ad3fd57626459aa8-exec-1           1/1     Running   0          2m38s
spark-shell-ad3fd57626459aa8-exec-2           1/1     Running   0          2m38s

If Executors are running like this, communication with K8s has succeeded.

While we are in the REPL, let's try reading the open data hosted on AWS that we will use later.

val textFile = spark.read.textFile("s3a://nyc-tlc/misc/taxi _zone_lookup.csv").count
textFile: Long = 266

It may take a while for the result to appear depending on your environment, but this confirms that things are working.

3.3. Deploying Jupyter to K8s

The REPL is fine, but it is not great for saving results or visualizing data. Let's make it possible to drive Spark from everyone's beloved Jupyter. You could just start it with docker run, but here let's deploy it to K8s and get a feel for how to operate K8s and for (a few of) its resources.

3.3.1. Build JupyterLab image

We use https://hub.docker.com/r/jupyter/all-spark-notebook. Jupyter and Spark are bundled [^7] in this image, so you can operate a Spark cluster from Jupyter... except that, as is, it cannot access S3. Since we want to touch S3, we will roughly swap out the bundled Spark libraries for the ones we built earlier.

[^7]: The latest version at the time of writing (tag: 5cfa60996e84) happened to use Spark 3.0.1 and Java 11. If you are reading this article later and want to try it, it may be safer to at least match the Java version. (If you need other customizations, you can build from the Dockerfile on GitHub, as described in the Jupyter docs.)

Again, create a working directory and move into it.

mkdir jupyter-lab && cd jupyter-lab

Write a Dockerfile in it,

jupyter-lab/Jupyter.Dockerfile


ARG SPARK_IMAGE

FROM ${SPARK_IMAGE} as spark

FROM jupyter/all-spark-notebook

COPY --from=spark /opt/spark/ /usr/local/spark/

RUN pip install s3fs xgboost --user  # needed in a later step, so install it now

Build.

SPARK_IMAGE="spark:v3.0.1-java11"  # set this again if the shell variable from section 3.2 is no longer around

docker build \
  -f ./Jupyter.Dockerfile \
  -t jupyter-allspark:v3.0.1-java11 \
  --build-arg SPARK_IMAGE=${SPARK_IMAGE} \
  .

3.3.2. Deploying JupyterLab to K8s

Write a YAML file like the one below

jupyter-lab/jupyter-lab.yaml


apiVersion: apps/v1
kind: Deployment
metadata:
  name: jupyter-lab
  labels:
    app: jupyter-lab
spec:
  replicas: 1
  selector:
    matchLabels:
      app: jupyter-lab
  template:
    metadata:
      labels:
        app: jupyter-lab
    spec:
      containers:
        - name: jupyter
          image: jupyter-allspark:v3.0.1-java11
          command: ["start-notebook.sh"]
          args: ["--NotebookApp.password=sha1:323c126464df:a12dff0b86cf01c3110ce00a433dab7c9d438593"]
          imagePullPolicy: Never
          env:
          - name: JUPYTER_ENABLE_LAB
            value: "true"
          ports:
          - name: jupyter
            containerPort: 8888
          - name: spark-webui
            containerPort: 4040
          - name: spark-driver
            containerPort: 51810

---

apiVersion: v1
kind: Service
metadata:
  name: jupyter-lab
spec:
  type: LoadBalancer
  selector:
    app: jupyter-lab
  ports:
  - name: jupyter
    protocol: TCP
    port: 8888
    targetPort: 8888
  - name: spark-webui
    protocol: TCP
    port: 4040
    targetPort: 4040

---

apiVersion: v1
kind: Service
metadata:
  name: spark-driver-headless
spec:
  type: ClusterIP
  clusterIP: None
  ports:
  - name: "spark-driver"
    protocol: "TCP"
    port: 51810
    targetPort: 51810
  selector:
    app: jupyter-lab

Start JupyterLab by passing this file to kubectl apply.

kubectl apply -f jupyter-lab.yaml
deployment.apps/jupyter-lab created
service/jupyter-lab created
service/spark-driver-headless created

If you get a response like this, you are good. Access http://localhost:8888 in your browser and the Jupyter screen will ask for a password; log in by entering my-spark-on-k8s. image.png

3.3.3. Brief explanation: K8s

You may be going "???" at this point, but for now it is enough to understand that **by expressing the resources surrounding your containers in YAML, you can control multiple containers simply and flexibly**. This hands-on uses only a single Mac, but the nice thing about K8s is that you can deploy applications just as easily with YAML and kubectl when many machines are lined up.

To briefly explain the resources we created:

- **Pod**: The smallest unit of deployment, belonging to the Workload resources (the resources that do the computing). Even though it is the smallest unit, it is called a pod (as in a pod of whales) because multiple containers can be bundled into one. Writing the YAML is easy if you picture the same things you would set when running with Docker alone: environment variables, the startup command, which ports to expose, and so on.
- **ReplicaSet**: A resource that copies a Pod and runs the copies in parallel. This time the replica count is 1, so you will not feel it doing much.
- **Deployment**: What we actually wrote in the YAML. It manages a ReplicaSet underneath, and therefore Pods underneath that. Using a Deployment lets you update the Pods below it with nice rolling updates. (I don't know about other places, but our team uses this as the basic unit of deployment. Hence the name Deployment.)

- **Service**: A resource for exposing started containers to the outside. By default, K8s Pods cannot be reached from outside the cluster, but by creating a Service, a user can reach them from a browser. There are several types of Service; here we first created a **LoadBalancer** so we can access the Jupyter and Spark web UIs. In addition, a **Headless Service** is started so that other Pods can reach the target Pod by FQDN. Through it, the Spark Executors reach the Jupyter container acting as the Driver (the Executors need a hostname to talk back to the Driver). As posted at the beginning of the article, it looks like this. image.png

Let's look at a few of the most frequently used kubectl commands.

kubectl get pod #Get a list of pods

NAME                           READY   STATUS    RESTARTS   AGE
jupyter-lab-5ddfbc9db9-tvzqv   1/1     Running   0          2m49s

Those of you who did the local spark-shell check typed this earlier; it lists the Pods. Unless you specify otherwise, each Pod gets the name from the YAML with a random suffix appended.

kubectl describe pod jupyter-lab-5ddfbc9db9-tvzqv  # show Pod details; the random suffix differs every time a Pod is created, so don't just copy-paste this name

Name:           jupyter-lab-5ddfbc9db9-tvzqv
Namespace:      default

...

Events:
  Type    Reason     Age        From                     Message
  ----    ------     ----       ----                     -------
  Normal  Scheduled  <unknown>                           Successfully assigned default/jupyter-lab-5ddfbc9db9-tvzqv to docker-desktop
  Normal  Pulled     3m35s      kubelet, docker-desktop  Container image "jupyter-allspark:v3.0.1-java11" already present on machine
  Normal  Created    3m35s      kubelet, docker-desktop  Created container jupyter
  Normal  Started    3m35s      kubelet, docker-desktop  Started container jupyter

When something seems wrong, the basic move is to look at the last section of describe pod: the Events that occurred on the Pod. A common case is starting too many containers and running out of resources.

If the container itself seems to be running properly..., check the Pod's logs, or connect into the Pod and try commands there.

kubectl logs jupyter-lab-5ddfbc9db9-tvzqv  # view the logs the Pod writes to standard output

kubectl exec -it jupyter-lab-5ddfbc9db9-tvzqv bash  # open a shell inside the Pod

If you want to stop Jupyter, delete the whole Deployment; if you delete just the Pod, the ReplicaSet will simply start a new one. If you want to tear everything down, delete the Services as well. (You don't need to run these now.)

kubectl delete deployments.apps jupyter-lab

kubectl delete -f jupyter-lab.yaml  #Delete all resources listed in YAML

If you did run them, restart everything with kubectl apply.

3.4. Data manipulation with PySpark

My impression is that most data scientists these days use Python (or R). The Jupyter we just started can also drive Scala, the language Spark is developed in, but let's first experience data manipulation in Python, i.e. with PySpark.

(As you can see after this, Spark's API is well-developed, so I think you don't have to worry too much about the language for analysis purposes.)

3.4.1. Starting Executors and checking operation

Create a Python notebook with any name you like, and in the first cell enter:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('k8s://https://kubernetes.docker.internal:6443') \
    .appName('pyspark') \
    .config('spark.driver.extraJavaOptions', '-Dio.netty.tryReflectionSetAccessible=true') \
    .config('spark.driver.host', 'spark-driver-headless.default.svc.cluster.local') \
    .config('spark.driver.port', 51810) \
    .config('spark.executor.extraJavaOptions', '-Dio.netty.tryReflectionSetAccessible=true') \
    .config('spark.executor.instances', 2) \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider') \
    .config('spark.kubernetes.container.image', 'spark:v3.0.1-java11') \
    .getOrCreate()

sc = spark.sparkContext

Enter these settings, run the cell, and the session gets initialized.

- master is set to the endpoint of the K8s API server. This lets Jupyter (the Driver) call K8s to launch Executors.
- driver.host and driver.port are specified so that Executors can reach the Driver through the Headless Service.
- Two Executor containers are started.
- Anonymous credentials are set so the public S3 data can be read.
- spark:v3.0.1-java11 is used as the Executor image.

Those are the points to keep in mind.
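If you want to double-check which settings actually took effect, you can ask the running session itself. A minimal sketch (not in the original article), assuming the spark session built in the cell above:

for key, value in sorted(spark.sparkContext.getConf().getAll()):
    print(key, '=', value)  # lists every explicitly set Spark property, including the ones above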

If you list the Pods at this point, you should see the Executor Pods running.

kubectl get pod
NAME                              READY   STATUS    RESTARTS   AGE
jupyter-lab-5ddfbc9db9-tvzqv      1/1     Running   0          40m
pyspark-6e181b764a13babe-exec-1   1/1     Running   0          22s
pyspark-6e181b764a13babe-exec-2   1/1     Running   0          21s

Let's access the open data hosted on AWS and check that it works.

text_file = spark.read.text('s3a://nyc-tlc/misc/taxi _zone_lookup.csv')
text_file.count()

If, after a short wait, the value 266 comes back, it worked (it takes a moment because the file is fetched from S3).

3.4.2. DataFrames operation

Now let's manipulate data with Spark. Spark lets you work through DataFrames, a tabular representation similar to the ones in R and Pandas. (You can also play with lower-level data structures, but we won't need them this time.)

As the target data we will use the New York City Taxi and Limousine Commission (TLC) Trip Record Data hosted on AWS as a public dataset, the same data used in the check above. It records taxi pickups and drop-offs in New York and is often used in tutorials for AWS services. It is split into several types by kind of taxi; we will use the CSVs for the yellow cabs that mainly run in Manhattan, the ones you often see in movies (googling the background of this classification is interesting in itself).

The column definitions of the CSV files are described here. Following them, first define the data type of each column, i.e. the schema.

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType
schema = StructType([
    StructField('vendor_id', IntegerType() , True),
    StructField('tpep_pickup_datetime', TimestampType() , True),
    StructField('tpep_dropoff_datetime', TimestampType() , True),
    StructField('passenger_count', IntegerType() , True),
    StructField('trip_distance', DoubleType() , True),
    StructField('rate_code_id', IntegerType() , True),
    StructField('store_and_fwd_flag', StringType() , True),
    StructField('pu_location_id', IntegerType() , True),
    StructField('do_location_id', IntegerType() , True),
    StructField('payment_type', IntegerType() , True),
    StructField('fare_amount', DoubleType() , True),
    StructField('extra', DoubleType() , True),
    StructField('mta_tax', DoubleType() , True),
    StructField('tip_amount', DoubleType() , True),
    StructField('tolls_amount', DoubleType() , True),
    StructField('improvement_surcharge', DoubleType() , True),
    StructField('total_amount', DoubleType() , True),
    StructField('congestion_surcharge', DoubleType() , True),
])

Next, read the data on S3 into a DataFrame according to the schema we defined. (We use 2020 data, which is small, probably thanks to Corona...)

df_yt_202006 = spark.read.format('csv').schema(schema) \
    .option('delimiter', ',') \
    .option('header', 'true') \
    .option('timestampFormat', "yyyy-MM-dd HH:mm:ss") \
    .load('s3a://nyc-tlc/trip data/yellow_tripdata_2020-06.csv')

Spark only starts reading data once it is actually needed (lazy evaluation), so no read is happening at this point (the notebook cell should finish in an instant). The data is read for the first time (here, fetched from S3) when you perform an operation like the ones below.

df_yt_202006.show()    # show the first 20 rows
df_yt_202006.show(50)  # specify the number of rows
df_yt_202006.first()   # first row
df_yt_202006.tail(10)  # last 10 rows: takes a while

df_yt_202006.filter(df_yt_202006['passenger_count'] >= 7).show()  # 7 or more passengers; fine if it's a minivan, but...
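You can also see the lazy evaluation directly: explain() prints the plan Spark has built without reading any data, and only actions such as show() or count() trigger execution. A one-line sketch:

df_yt_202006.filter(df_yt_202006['passenger_count'] >= 7).explain()  # prints the query plan; nothing is read from S3 yet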

When working interactively, you will notice that the data gets re-read, slowly, again and again. If you have plenty of memory, you can speed things up by caching frequently used DataFrames in memory. Note that because of lazy evaluation, the cache is only populated after the DataFrame has been computed once following the cache instruction.

# In Jupyter, put these in separate cells; you can measure execution time by adding %%time as the first line of a cell.
df_yt_202006.filter(df_yt_202006['passenger_count'] >= 7).count()  # about 30 seconds
df_yt_202006.cache()  # cache instruction
df_yt_202006.filter(df_yt_202006['passenger_count'] >= 7).count()  # about 30 seconds; the cache is populated once this finishes
df_yt_202006.filter(df_yt_202006['passenger_count'] >= 7).count()  # dramatically faster: about 300 milliseconds

image.png

If the slowness bothers you, keep df_yt_202006 cached for the rest of the hands-on. If you run short of memory, remove it as follows.

df_yt_202006.unpersist()  # purge
from pyspark.sql import SQLContext
SQLContext.getOrCreate(sc).clearCache()  # use this if you want to forcibly purge every cache
df_yt_202006.filter(df_yt_202006['passenger_count'] >= 7).count()  # back to about 30 seconds
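If you lose track of what is cached, the DataFrame can tell you. A small sketch (not from the original article; the exact StorageLevel printout depends on your Spark version), also showing MEMORY_AND_DISK, which spills partitions to disk instead of dropping them when memory is tight:

from pyspark import StorageLevel

print(df_yt_202006.is_cached)     # True or False depending on the current cache state
print(df_yt_202006.storageLevel)  # all flags False when nothing is cached

df_yt_202006.persist(StorageLevel.MEMORY_AND_DISK)  # cache again, spilling to disk if memory runs out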

3.4.3. Operations with Spark SQL

More complicated operations can be expressed and executed in SQL. This is one of the amazing parts of Spark.

Calling createOrReplaceTempView() creates a table (view) that you can issue SQL against within the session.

df_yt_202006.createOrReplaceTempView('yellow_tripdata_2020_06')  #The table name is given as an argument
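To confirm the view was registered, you can ask the session catalog; a quick sketch:

[t.name for t in spark.catalog.listTables()]  # should include 'yellow_tripdata_2020_06'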

Let's display the times and distances in descending order of total fare.

df = spark.sql(
    """
    SELECT
      tpep_pickup_datetime,
      tpep_dropoff_datetime,
      trip_distance,
      pu_location_id,
      do_location_id,
      total_amount
    FROM
      yellow_tripdata_2020_06
    ORDER BY
      total_amount DESC
    LIMIT
      10
    """
)

df.show()  # throwaway DataFrames are all named df from here on, so mind the cell execution order when re-running the notebook

image.png Nearly 200 miles over 6 hours, for a total of roughly $1,100.

We can't tell where the pickups and drop-offs happened, so let's load another table and join it.

df_tzone_lookup = spark.read.format('csv') \
    .option('delimiter', ',') \
    .option('header', 'true') \
    .option('inferschema', 'true') \
    .load('s3a://nyc-tlc/misc/taxi _zone_lookup.csv')

If you turn on inferschema, the schema is estimated for you, which is convenient for throwaway work. (Note that inference takes a little extra time, and that readers of your code on GitHub, for example, will not be able to see the column names and types.)
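If you are curious what was inferred, printSchema() shows it. The output below is what I would expect for this lookup file, so treat it as illustrative rather than guaranteed:

df_tzone_lookup.printSchema()
# root
#  |-- LocationID: integer (nullable = true)
#  |-- Borough: string (nullable = true)
#  |-- Zone: string (nullable = true)
#  |-- service_zone: string (nullable = true)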

df_tzone_lookup.createOrReplaceTempView('taxi_zone_lookup')
df_joined = spark.sql(
    """
    SELECT
      tpep_pickup_datetime,
      tpep_dropoff_datetime,
      trip_distance,
      pu_location_id,
      pu_lookup.Zone AS pu_zone,
      do_location_id,
      do_lookup.Zone AS do_zone,
      total_amount
    FROM
      yellow_tripdata_2020_06
    LEFT JOIN 
      taxi_zone_lookup AS pu_lookup
    ON 
      pu_location_id = pu_lookup.LocationID
    LEFT JOIN 
      taxi_zone_lookup AS do_lookup
    ON 
      do_location_id = do_lookup.LocationID
    ORDER BY
      total_amount DESC
    """
)
df_joined.show()

image.png

As it turns out, the top pickup/drop-off locations are NA. Out of state (can you even do that?), or outliers?

Since DataFrames can be converted to Pandas, you can also visualize them in Jupyter. After sampling, convert to a Pandas DataFrame and plot it. (Bokeh is used this time.)

df = df_joined.select(['trip_distance','total_amount']) \
    .sample(fraction=(1000/df_joined.count())) \
    .toPandas()

from bokeh.plotting import figure, output_notebook, show

output_notebook()
p = figure(
    title='Trip Distance VS Total Amount',
    x_axis_label='Trip Distance [Mile]',
    y_axis_label='Total Amount [$]'
)
p.circle(df['trip_distance'], df['total_amount'])
show(p)

image.png

There does seem to be a correlation, but there are also rides charged despite a distance of 0 and rides with negative charges despite actual driving. Some preprocessing looks necessary (not done here; see the sketch below).
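As a minimal cleaning sketch (not applied in the rest of this article), you could simply drop the rows with non-positive distance or fare before doing anything model-related:

df_cleaned = df_joined.filter(
    (df_joined['trip_distance'] > 0) & (df_joined['total_amount'] > 0)
)
df_cleaned.count()  # fewer rows than df_joined.count()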

3.4.4. Reading multiple files

With a Mac's memory you can't read all that much data, but let me introduce it anyway.

# pass a list of paths
df = spark.read.format('csv').schema(schema) \
    .option('delimiter', ',') \
    .option('header', 'true') \
    .option('timestampFormat', "yyyy-MM-dd HH:mm:ss") \
    .load([
        's3a://nyc-tlc/trip data/yellow_tripdata_2020-04.csv',
        's3a://nyc-tlc/trip data/yellow_tripdata_2020-05.csv',
        's3a://nyc-tlc/trip data/yellow_tripdata_2020-06.csv',
    ])
# or use pattern matching
df = spark.read.format('csv').schema(schema) \
    .option('delimiter', ',') \
    .option('header', 'true') \
    .option('timestampFormat', "yyyy-MM-dd HH:mm:ss") \
    .load('s3a://nyc-tlc/trip data/yellow_tripdata_2020-0{[4-6]*}.csv')

Running df.count() should give you 1,136,124. And when there are far more files and you want them processed faster... that is exactly why you use Spark and K8s.
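How the work is split is visible from the number of partitions, which is the unit Spark hands out to Executor tasks. A quick check (not in the original article):

df.rdd.getNumPartitions()  # roughly one or more partitions per input file split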

When you are done, press the Restart Kernel button (the circular-arrow icon) in the notebook to disconnect this session (and free resources). The Executor Pods should disappear as well, but just in case, check with kubectl get pod, and if they linger, drop them with kubectl delete pod xxx.
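As an alternative to restarting the kernel, you can also stop the session explicitly from a cell; Spark's K8s backend should then clean up the Executor Pods (still worth confirming with kubectl get pod):

spark.stop()  # ends the session and releases the Executors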

3.5. Inference by XGBoost

Spark can also run machine-learning-related tasks in a distributed way. Here we take a model (trained in Python) handed over by a data scientist, apply it to DataFrames, and run inference from Scala.

3.5.1. Model preparation

Spark could also do the training in a distributed manner, but that would make this article even longer, so let's just prepare a model locally, the easy way. Using the April 2020 data, we create a regression model that predicts total_amount from trip_distance, pu_location_id, and do_location_id as explanatory variables.

If you run this in the Jupyter deployed on K8s as is, you may run out of memory. In that case,

  1. Download any notebooks you want to keep, then take Jupyter down with kubectl delete -f jupyter-lab.yaml.
  2. Start Jupyter directly with docker run --rm -p 8888:8888 jupyter-allspark:v3.0.1-java11 jupyter-lab, so it can use the full resources assigned to Docker, and access it there.

or do something along those lines.

Split the following into appropriately sized cells and run it.

from sklearn.model_selection import train_test_split
import pandas as pd
import xgboost as xgb


"""
Parameters
"""
#Hyperparameters, we will tune after this
params = {
    'max_depth':6,
    'min_child_weight': 1,
    'eta':.3,
    'subsample': 1,
    'colsample_bytree': 1,
    'objective':'reg:squarederror',
}
params['eval_metric'] = 'mae'  # evaluation metric: MAE
# Parameters other than the above
num_boost_round = 999  # set a large value; early stopping decides the actual number
early_stopping_rounds = 10  # stop early to avoid overfitting
nfold = 5  # number of cross-validation folds
# Where the best parameters will be stored
best_params = {
    'max_depth': None,
    'min_child_weight': None,
    'subsample': None,
    'colsample_bytree': None,
    'eta': None,
}

"""
Data preparation
"""
df = pd.read_csv(
    's3://nyc-tlc/trip data/yellow_tripdata_2020-04.csv'
)[['trip_distance', 'PULocationID', 'DOLocationID', 'total_amount']] \
.rename(columns={'PULocationID': 'pu_location_id', 'DOLocationID': 'do_location_id'})
#Data split
X_train, X_test, y_train, y_test = train_test_split(
    df.drop('total_amount', axis=1), 
    df['total_amount'].ravel(),
    test_size=0.2
)
#Convert to matrix format handled by XGB
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)

"""
Learn normally (for comparison)
"""
model = xgb.train(
    params,
    dtrain,
    num_boost_round=num_boost_round,
    evals=[(dtest, 'Test')],
    early_stopping_rounds=early_stopping_rounds,
)

print('Best MAE: {:.2f} with {} rounds'.format(
    model.best_score,
    model.best_iteration+1
))  #Best score and best Boosting Round

"""
Hyperparameter tuning
"""
# max_depth: maximum tree depth
# min_child_weight: minimum sum of instance weights required in a child
gridsearch_params = [
    (max_depth, min_child_weight)
    for max_depth in range(8,12)
    for min_child_weight in range(5,9)
]
min_mae = float('Inf')
for max_depth, min_child_weight in gridsearch_params:
    #Grid search within the defined range of values
    print('CV with max_depth={}, min_child_weight={}'.format(
        max_depth,
        min_child_weight
    ))
    params['max_depth'] = max_depth
    params['min_child_weight'] = min_child_weight
    #Cross-validation
    cv_results = xgb.cv(
        params,
        dtrain,
        num_boost_round=num_boost_round,
        seed=42,
        nfold=5,
        metrics={'mae'},
        early_stopping_rounds=10
    )
    mean_mae = cv_results['test-mae-mean'].min()
    boost_rounds = cv_results['test-mae-mean'].argmin()
    print('\tMAE {} for {} rounds'.format(mean_mae, boost_rounds))
    #Keeping the best score
    if mean_mae < min_mae:
        min_mae = mean_mae
        best_params.update(max_depth=max_depth, min_child_weight=min_child_weight)
        
# subsample:Data usage rate in the row direction
# colsample:Data usage rate in the column direction
gridsearch_params = [
    (subsample, colsample)
    for subsample in [i/10. for i in range(7,11)]
    for colsample in [i/10. for i in range(7,11)]
]
min_mae = float('Inf')
for subsample, colsample in reversed(gridsearch_params):
    #Grid search in descending order
    print('CV with subsample={}, colsample={}'.format(
        subsample,
        colsample
    ))
    params['subsample'] = subsample
    params['colsample_bytree'] = colsample
    #Cross-validation
    cv_results = xgb.cv(
        params,
        dtrain,
        num_boost_round=num_boost_round,
        seed=42,
        nfold=nfold,
        metrics={'mae'},
        early_stopping_rounds=early_stopping_rounds,
    )
    mean_mae = cv_results['test-mae-mean'].min()
    boost_rounds = cv_results['test-mae-mean'].argmin()
    print('\tMAE {} for {} rounds'.format(mean_mae, boost_rounds))
    if mean_mae < min_mae:
        min_mae = mean_mae
        best_params.update(subsample=subsample, colsample_bytree=colsample)

# ETA:Learning rate
min_mae = float('Inf')
for eta in [.3, .2, .1, .05, .01, .005]:
    print('CV with eta={}'.format(eta))
    params['eta'] = eta
    %time cv_results = xgb.cv(params, dtrain, num_boost_round=num_boost_round, seed=42, nfold=nfold, metrics=['mae'], early_stopping_rounds=early_stopping_rounds)
    mean_mae = cv_results['test-mae-mean'].min()
    boost_rounds = cv_results['test-mae-mean'].argmin()
    print('\tMAE {} for {} rounds\n'.format(mean_mae, boost_rounds))
    if mean_mae < min_mae:
        min_mae = mean_mae
        best_params.update(eta=eta)

print(params)
params.update(
    max_depth=best_params['max_depth'],
    min_child_weight =best_params['min_child_weight'],
    subsample=best_params['subsample'],
    colsample_bytree=best_params['colsample_bytree'],
    eta=best_params['eta'],
)
print(params)

"""
Relearn with tuned parameters
"""
model_t = xgb.train(
    params,
    dtrain,
    num_boost_round=num_boost_round,
    evals=[(dtest, "Test")],
    early_stopping_rounds=early_stopping_rounds,
)
print('Best MAE: {:.2f} with {} rounds'.format(
    model_t.best_score,
    model_t.best_iteration+1
))
num_boost_round = model_t.best_iteration + 1

best_model = xgb.train(
    params,
    dtrain,
    num_boost_round=num_boost_round,
    evals=[(dtest, "Test")],
)
"""
Save
"""
best_model.save_model('xgb.model')

When I showed this to a junior data scientist and asked "is this OK?", the feedback was:

- For the initial comparison model, **the test data is passed to training via evals, so it is not a fair comparison** (and indeed the "for comparison" model scored better).
- If you are serious about tuning, **shouldn't you search all the combinations at once**?

Fair points, but I just wanted a model, so forgive me; I couldn't beat Dedede.

After saving, right-click the file in Jupyter > Download to get xgb.model out of Jupyter.

Also, check the version of xgboost with pip freeze | grep xgb (this time it was 1.2.1).

3.5.2. Spark Image Update

XGBoost needs OpenMP to run, so we add that library on top of the Spark image we built earlier.

spark-images/SparkXgb.Dockerfile


ARG SPARK_IMAGE

FROM ${SPARK_IMAGE}

RUN apt-get update && apt-get install -y \
  libgomp1

Then build it:

cd spark-images  # place the Dockerfile above next to the first Spark Dockerfile
SPARK_IMAGE="spark:v3.0.1-java11"  # re-set in case the shell variable from earlier is gone
docker build \
  -f ./SparkXgb.Dockerfile \
  -t spark:v3.0.1-java11-xgb \
  --build-arg SPARK_IMAGE=${SPARK_IMAGE} \
  .

cd ..  # back to the working root

3.5.3. Uploading jars and models

Get the libraries for running XGBoost on Spark. It is probably best to match the version of XGBoost used on the Python side (although there seems to be some compatibility across versions).

wget https://repo1.maven.org/maven2/ml/dmlc/xgboost4j_2.12/1.2.1/xgboost4j_2.12-1.2.1.jar
wget https://repo1.maven.org/maven2/ml/dmlc/xgboost4j-spark_2.12/1.2.1/xgboost4j-spark_2.12-1.2.1.jar

Spark can ship jars and other files from the Driver to the Executors at launch, so we use that mechanism to distribute the library and the model. (I wish OpenMP could be distributed the same way, but that seemed difficult.)

Upload the two jar files and xgb.model to the directory where you plan to create your Jupyter notebook by dragging and dropping via your browser.

image.png (Being able to do loose things like this is what makes notebooks nice.)

If you are a "CLI or bust" person, look up the kubectl cp command and try that instead.

3.5.4. Distributed inference in Spark

Now, finally, create a notebook with spylon-kernel for the main subject. (spylon-kernel is the kernel that lets you run Scala on Jupyter. It appears to be unmaintained, yet somehow it runs the latest Spark just fine. The Jupyter-distributed Docker images used to ship a similar plugin called Apache Toree, but it seems to have been removed.)

Enter the initial settings and start the Spark cluster. (Note that this init cell is apparently evaluated as Python, not Scala.)

%%init_spark
launcher.master = 'k8s://https://kubernetes.docker.internal:6443'
launcher.deploy_mode = 'client'
launcher.conf.set('spark.app.name', 'spark-from-jupyter')
launcher.conf.set('spark.driver.extraJavaOptions', '-Dio.netty.tryReflectionSetAccessible=true')
launcher.conf.set('spark.driver.host', 'spark-driver-headless.default.svc.cluster.local')
launcher.conf.set('spark.driver.port', 51810)
launcher.conf.set('spark.executor.extraClassPath', '/opt/spark/work-dir/xgboost4j_2.12-1.2.1.jar:/opt/spark/work-dir/xgboost4j-spark_2.12-1.2.1.jar')
launcher.conf.set('spark.executor.extraJavaOptions', '-Dio.netty.tryReflectionSetAccessible=true')
launcher.conf.set('spark.executor.instances', 2)
launcher.conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider')
launcher.conf.set('spark.kubernetes.container.image', 'spark:v3.0.1-java11-xgb')
launcher.jars = ['/home/jovyan/xgboost4j_2.12-1.2.1.jar', 'xgboost4j-spark_2.12-1.2.1.jar']
launcher.files = ['file:///home/jovyan/xgb.model']

Import the required libraries.

import ml.dmlc.xgboost4j.scala.Booster
import ml.dmlc.xgboost4j.scala.XGBoost
import ml.dmlc.xgboost4j.scala.spark.XGBoostRegressionModel
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, DoubleType, StringType, TimestampType}

Cast some black magic to call XGBoost. (This is only needed because a notebook is processed like a REPL; rest assured that when you run this as a proper application it can be called in a decent way.)

import scala.reflect.runtime.{universe => ru}
val m = ru.runtimeMirror(getClass.getClassLoader)
val classXGBoostRegressionModel = ru.typeOf[XGBoostRegressionModel].typeSymbol.asClass
val cm = m.reflectClass(classXGBoostRegressionModel)
val ctor = ru.typeOf[XGBoostRegressionModel].decl(ru.termNames.CONSTRUCTOR).asTerm.alternatives(0).asMethod
val ctorm = cm.reflectConstructor(ctor)

val regressionModelRef = ctorm("model", XGBoost.loadModel("xgb.model")).asInstanceOf[XGBoostRegressionModel]

Set the input column name and the output column name.

val regressionModel = regressionModelRef
    .setFeaturesCol("features")
    .setPredictionCol("prediction")

Create a DataFrame just as we did with PySpark earlier.

val schema = StructType(List(
    StructField("vendor_id", IntegerType, true),
    StructField("tpep_pickup_datetime", TimestampType, true),
    StructField("tpep_dropoff_datetime", TimestampType, true),
    StructField("passenger_count", IntegerType, true),
    StructField("trip_distance", DoubleType, true),
    StructField("rate_code_id", IntegerType, true),
    StructField("store_and_fwd_flag", StringType, true),
    StructField("pu_location_id", IntegerType, true),
    StructField("do_location_id", IntegerType, true),
    StructField("payment_type", IntegerType, true),
    StructField("fare_amount", DoubleType, true),
    StructField("extra", DoubleType, true),
    StructField("mta_tax", DoubleType, true),
    StructField("tip_amount", DoubleType, true),
    StructField("tolls_amount", DoubleType, true),
    StructField("improvement_surcharge", DoubleType, true),
    StructField("total_amount", DoubleType, true),
    StructField("congestion_surcharge", DoubleType, true),
))

val df = spark.read.format("csv").schema(schema)
    .option("delimiter", ",")
    .option("header", "true")
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
    .load("s3a://nyc-tlc/trip data/yellow_tripdata_2020-06.csv")

Create a column that bundles the inputs to XGBoost into a single vector. (If you are curious, add a .show; if you have memory to spare, a .cache makes things easier.)

val vectorAssembler = new VectorAssembler()
    .setInputCols(Array("trip_distance", "pu_location_id", "do_location_id"))
    .setOutputCol("features")

val df_w_features = vectorAssembler.transform(df)

You could run inference as is, but let's narrow down the columns to make the output easier to read.

val xgbInput = df_w_features.select("features", "total_amount")

Finally, perform inference.

val result = regressionModel.transform(xgbInput)

result.show

image.png

It seems to work. Everyone around me draws CDFs, so let's give that a try too: roughly compute the cumulative frequency of the absolute residual.

import org.apache.spark.sql.functions.{abs, asc, row_number}
import org.apache.spark.sql.expressions.Window

val result_w_ar = result
  .withColumn("abs_residual", abs(result("prediction")-result("total_amount")))
val windowSpec  = Window.orderBy(asc("abs_residual"))
val result_cdf = result_w_ar
  .withColumn("cdf", row_number.over(windowSpec)/result_w_ar.count)

result_cdf.select("abs_residual", "cdf").createOrReplaceTempView("cdf")

It is a bit hacky, but in this notebook you can also create a TempView on the Spark side, then open a Python cell and pick it up from PySpark as a Pandas DataFrame.

%%python
df = spark.sql("SELECT * FROM cdf").toPandas()
from bokeh.plotting import figure, output_file, show

output_file("cdf.html")

p = figure(
    title='abs_residual CDF',
    x_axis_label='abs residual of total_amount prediction [$]',
    y_axis_label='[%]',
)
p.line(df['abs_residual'], df['cdf']*100)
show(p)

With bokeh in this setup, it seems the only option is to write out an HTML file once. Sorry. image.png Some predictions are wildly off (again, it screams for preprocessing).

3.5.5. Brief explanation: Spark

I have been charging ahead so far, so let me briefly explain Spark.

Put simply, one of its big features is that you can describe your processing through DataFrames without being conscious of whether it spans multiple machines (here, a pretend-multi-machine K8s on one Mac). It feels like you are just driving a platform... (in reality there are times when you do have to care about what happens underneath, which is where it gets hard again).

If you open http://localhost:4040, you can browse the Spark Web UI and get a sense of what the processing looks like. (Until you get used to it, it is hard to tell which screen corresponds to which processing, so click around.)

image.png

↑ This is a capture of the Stages during XGB inference; you can see the work split in two and processed in parallel. Even if the data becomes huge, you can split it up and process it quickly as long as you line up enough machines.
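The unit of that splitting is the partition, and you can control it from the DataFrame API when the default split does not match your cluster. A one-line sketch (PySpark syntax; the Scala call is identical, and df stands for whichever DataFrame you are processing):

df_repartitioned = df.repartition(8)  # re-split into 8 partitions, e.g. to keep more Executor cores busy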

We will go into more detail next time, when we actually run on multiple machines. For now, it is enough to know that even complicated processing can be written relatively easily.

3.6. Finally

It was a rushed tour, but I think it packed in the essentials. From here, please google things as needed and wait patiently for the next installment.
