Sous l'ennui de "Spark? C'est cool avec les attributs de tonnerre!", J'ai essayé un simple apprentissage automatique avec Docker + Spark + Jupyter Notebook. En utilisant les données connues du Titanic, nous avons fait une prédiction par régression linéaire.
Spark est l'une des bibliothèques de traitement distribuées. Beaucoup de gens considèrent le traitement distribué comme Hadoop, mais de mon point de vue, Spark est une bibliothèque qui compense les lacunes de Hadoop. Hadoop est apparu pour la première fois en 2006, puis en 2014.
Hadoop VS Spark
On dit que Spark ci-dessus a compensé les lacunes de Hadoop, mais comme les deux présentent des avantages et des inconvénients, ils sont brièvement résumés dans le tableau.
mérite | Démérite | |
---|---|---|
Hadoop | Peut gérer de grandes quantités de données | Pas bon pour le traitement en temps réel en raison de l'accès au stockage |
Spark | Bon pour le traitement en temps réel par traitement en mémoire | Impossible de gérer des données aussi volumineuses que Hadoop |
En d'autres termes, utilisez Hadoop pour les données trop volumineuses et utilisez Spark si vous souhaitez traiter en temps réel.
Les moteurs de requête de Hadoop sont Presto et Hive, mais Spark dispose d'une variété d'API qui peuvent être facilement appelées à partir de langages tels que Python et Scala.
Docker Setup
Tout d'abord, téléchargez l'image ci-dessous et construisez-la.
$ docker pull jupyter/pyspark-notebook
$ docker run --name spark_test -it -p 8888:8888 -v $PWD:/home/jovyan/work jupyter/pyspark-notebook
Vous pouvez ouvrir la note en accédant à l'URL affichée ci-dessus.
Python
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, QuantileDiscretizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#Lecture des données
titanic_df = spark.read.csv('./titanic/train.csv', header='True', inferSchema='True')
#Correspondance de valeur manquante
titanic_df = titanic_df.na.fill({"Embarked" : 'S'})
#Supprimer les colonnes inutiles
titanic_df = titanic_df.drop("Cabin")
#Ajout de colonne par constante
titanic_df = titanic_df.withColumn('Alone', lit(0))
#Insertion de valeur conditionnelle
titanic_df = titanic_df.withColumn("Alone", when(titanic_df["Family_Size"] == 0, 1).otherwise(titanic_df["Alone"]))
#Encodage des étiquettes
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(titanic_df) for column in ["Sex", "Embarked", "Initial"]]
pipeline = Pipeline(stages=indexers)
titanic_df = pipeline.fit(titanic_df).transform(titanic_df)
#Test fractionné
trainingData, testData = feature_vector.randomSplit([0.8, 0.2], seed=9)
Il existe divers autres processus, mais je ne laisserai que les plus remarquables. Voir ci-dessous pour un traitement des données plus détaillé. https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5722190290795989/3865595167034368/8175309257345795/latest.html
#Apprentissage
lr = LogisticRegression(labelCol="Survived", featuresCol="features")
lrModel = lr.fit(trainingData)
#inférence
lr_prediction = lrModel.transform(testData)
lr_prediction.select("prediction", "Survived", "features").show()
#Évaluation
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
lr_accuracy = evaluator.evaluate(lr_prediction)
print("Accuracy of LogisticRegression is = %g"% (lr_accuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy))
De plus, les modèles suivants peuvent être utilisés comme bibliothèque.
C'est lent localement!
Évidemment, il n'y avait aucun avantage à ce niveau de données car il s'agit d'un processus distribué uniquement lors du traitement de données à grande échelle. Je voudrais comparer la vitesse et la précision lorsque je rencontre des données trop volumineuses.
--Docker est pratique --Organisation du système de cadre distribué ――PySpark fonctionnait comme ça ...
Recommended Posts