With the vague excitement of "Spark? It sounds cool, it even has lightning in the name!", I tried some simple machine learning with Docker + Spark + Jupyter Notebook. Using the familiar Titanic data, I made predictions with logistic regression.
Spark is one of the distributed processing frameworks. When people hear "distributed processing", many probably think of Hadoop first, but as I understand it, Spark is a framework that makes up for Hadoop's shortcomings. Hadoop first appeared in 2006, and Spark followed in 2014.
Hadoop VS Spark
Spark is said to compensate for Hadoop's shortcomings, but since both have their own strengths and weaknesses, they are briefly summarized in the table below.
| | Merit | Demerit |
|---|---|---|
| Hadoop | Can handle very large amounts of data | Weak at real-time processing because of storage access |
| Spark | Strong at real-time processing thanks to in-memory processing | Cannot handle data as large as Hadoop can |
In other words, use Hadoop when the data is extremely large, and Spark when you need to process it in real time.
Also, Hadoop is typically queried through engines such as Presto and Hive, whereas Spark offers a variety of APIs that can be called easily from languages such as Python and Scala.
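For example, the same DataFrame can be queried with SQL or manipulated directly through the Python API. A minimal illustration, assuming a SparkSession is already available as spark (it is created later in this post):

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])
df.createOrReplaceTempView("t")
spark.sql("SELECT COUNT(*) FROM t").show()  # SQL-style query
df.filter(df["id"] > 1).show()              # the same idea via the DataFrame API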
Docker Setup
First, pull the image below and start a container.
$ docker pull jupyter/pyspark-notebook
$ docker run --name spark_test -it -p 8888:8888 -v $PWD:/home/jovyan/work jupyter/pyspark-notebook
You can open the notebook by accessing the URL (including its token) that the command above prints to the console.
Python
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, QuantileDiscretizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#Data read
titanic_df = spark.read.csv('./titanic/train.csv', header=True, inferSchema=True)
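#Optional sanity check: confirm that inferSchema picked up sensible column types
titanic_df.printSchema()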
#Fill missing values (Embarked defaults to the most common port, 'S')
titanic_df = titanic_df.na.fill({"Embarked" : 'S'})
#Drop unnecessary columns
titanic_df = titanic_df.drop("Cabin")
#Add a constant column
titanic_df = titanic_df.withColumn('Alone', lit(0))
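#The Family_Size and Initial columns used below come from preprocessing steps
#omitted in this post; a minimal sketch of the usual approach (an assumption:
#Family_Size = SibSp + Parch, Initial = the title extracted from Name)
titanic_df = titanic_df.withColumn("Family_Size", col("SibSp") + col("Parch"))
titanic_df = titanic_df.withColumn("Initial", regexp_extract(col("Name"), r"([A-Za-z]+)\.", 1))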
#Set the value conditionally: mark passengers traveling alone
titanic_df = titanic_df.withColumn("Alone", when(titanic_df["Family_Size"] == 0, 1).otherwise(titanic_df["Alone"]))
#Label encoding
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(titanic_df) for column in ["Sex", "Embarked", "Initial"]]
pipeline = Pipeline(stages=indexers)
titanic_df = pipeline.fit(titanic_df).transform(titanic_df)
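#The split below uses feature_vector, which is produced by a VectorAssembler
#step omitted in the post; a minimal sketch (the exact feature list is an
#assumption, and handleInvalid="skip" drops rows that still contain nulls)
feature_cols = ["Pclass", "Age", "SibSp", "Parch", "Fare", "Alone",
                "Sex_index", "Embarked_index", "Initial_index"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="skip")
feature_vector = assembler.transform(titanic_df)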
#Train/test split
trainingData, testData = feature_vector.randomSplit([0.8, 0.2], seed=9)
There are various other preprocessing steps, but I have kept only the notable ones here. See the following notebook for more detailed data processing: https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5722190290795989/3865595167034368/8175309257345795/latest.html
#Training
lr = LogisticRegression(labelCol="Survived", featuresCol="features")
lrModel = lr.fit(trainingData)
#Inference
lr_prediction = lrModel.transform(testData)
lr_prediction.select("prediction", "Survived", "features").show()
#Evaluation
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
lr_accuracy = evaluator.evaluate(lr_prediction)
print("Accuracy of LogisticRegression is = %g"% (lr_accuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy))
In addition, other classifiers from pyspark.ml can be dropped into the same workflow, as shown below.
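For example, here is a sketch that swaps in RandomForestClassifier while reusing the train/test split and the evaluator from above:

from pyspark.ml.classification import RandomForestClassifier

#Same label and feature columns as the logistic regression model
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features")
rfModel = rf.fit(trainingData)
rf_prediction = rfModel.transform(testData)
rf_accuracy = evaluator.evaluate(rf_prediction)
print("Accuracy of RandomForest = %g" % rf_accuracy)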
It's slow locally!
As expected, there was no benefit with data of this size: distributed processing only pays off for large-scale data. I would like to compare speed and accuracy once I come across data that is genuinely large.
- Docker is convenient
- I got the distributed processing framework landscape sorted out in my head
- PySpark worked like this...