- Introduction to machine learning using **Java** and **Apache Spark**
- A hands-on, step-up series based on "price estimation", covering everything from how to use **Apache Spark** to actual machine learning (training and regression)
- Uses the **gradient boosting tree**, a **supervised learning** algorithm, for machine learning
- Published step by step over multiple installments
- Click here for the full source code: https://github.com/riversun/spark-gradient-boosting-tree-regression-example
It's finally the machine learning phase. This time, we will create a "price estimation engine" that actually works.
Last time, we ran a pipeline that replaced the categorical variables represented as strings with numeric values.
Since each numericized categorical variable had **Index** appended to the end of its name (as a suffix), the Dataset now has **materialIndex, shapeIndex, brandIndex, shopIndex, weight, price** as numeric variables.
In this example we want to estimate (predict) the price of an accessory, and the variable to be predicted (**price** here) is called the **objective variable**.
When you want to estimate (predict) the price of an accessory, what determines the price of the accessory?
If an accessory is made of diamond, it will be expensive, and the heavier the gem or precious metal, the more valuable it is. Likewise, a product made by a famous brand is likely to carry a premium.
In other words, there are reasons, that is, **causes**, behind the rise and fall of accessory prices.
The data that represent these causes are called **explanatory variables**.
Cause … explanatory variables
Result … objective variable
So the variables in the Dataset are divided into explanatory variables and an objective variable as follows.
Explanatory variables … **materialIndex, shapeIndex, brandIndex, shopIndex, weight**
Objective variable … **price**
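As a quick sanity check (a sketch; the name `indexed` is an assumption standing in for the Dataset produced by last time's indexing pipeline), you could view the explanatory variables and the objective variable side by side:

```java
// "indexed" is assumed to be the Dataset<Row> produced by last time's StringIndexer pipeline
indexed.select("materialIndex", "shapeIndex", "brandIndex", "shopIndex", "weight", "price").show(5);
```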
From now on, we will use this data for learning.
Learning means using mathematical formulas and **learning algorithms** to discover and express the relationships and rules between the explanatory variables and the objective variable (though not always in a form humans can understand), and a wide variety of such methods have been proposed.
The relationship between the explanatory variables and the objective variable acquired through learning is called a **learning model** (or machine learning model, or simply a model).
Of course, to uncover and approximate unknown relationships there must be a considerable amount of data, which is why it is often said that in machine learning **the quantity and quality of the data are just as important as** the algorithm, if not more so.
This time, we will use the **gradient boosting tree** as the learning algorithm, and **500 records of accessory price data (with no missing values)** as the data.
When the objective variable you want to predict (here, **price**) is a continuous value, as in this case, the prediction task is called **regression**.
A learning model for regression is called a **Regressor**.
On the other hand, when the objective variable is binary or takes one of multiple classes (categories), the task is called **classification**, and a learning model for classification is called a **Classifier**.
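As an aside, Spark ML ships a gradient-boosting-tree learner for both tasks. Here is a minimal sketch contrasting the two (the column names are illustrative; this article uses only the regressor):

```java
import org.apache.spark.ml.classification.GBTClassifier;
import org.apache.spark.ml.regression.GBTRegressor;

// Regression: the label column holds a continuous value such as a price
GBTRegressor regressor = new GBTRegressor()
        .setLabelCol("price")
        .setFeaturesCol("features");

// Classification: the label column holds a class index
// (Spark's GBTClassifier supports binary classification)
GBTClassifier classifier = new GBTClassifier()
        .setLabelCol("label")
        .setFeaturesCol("features");
```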
To use these variables for learning, we first combine the multiple variable columns into a single column.
```java
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class GBTRegressionStep03_part01 {

    public static void main(String[] args) {

        System.setProperty("hadoop.home.dir", "c:\\Temp\\winutil\\");// for windows

        org.apache.log4j.Logger.getLogger("org").setLevel(org.apache.log4j.Level.ERROR);
        org.apache.log4j.Logger.getLogger("akka").setLevel(org.apache.log4j.Level.ERROR);

        SparkSession spark = SparkSession
                .builder()
                .appName("GradientBoostingTreeRegression")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> dataset = spark
                .read()
                .format("csv")
                .option("header", "true")
                .option("inferSchema", "true")
                .load("dataset/gem_price_ja.csv");

        List<String> categoricalColNames = Arrays.asList("material", "shape", "brand", "shop");

        List<StringIndexer> stringIndexers = categoricalColNames.stream()
                .map(col -> new StringIndexer()
                        .setStringOrderType("frequencyDesc")
                        .setInputCol(col)
                        .setOutputCol(col + "Index"))
                .collect(Collectors.toList());

        String[] indexedCategoricalColNames = stringIndexers// (1)
                .stream()
                .map(StringIndexer::getOutputCol)
                .toArray(String[]::new);

        String[] numericColNames = new String[] { "weight" };// (2)

        VectorAssembler assembler = new VectorAssembler()// (3)
                .setInputCols(array(indexedCategoricalColNames, numericColNames))
                .setOutputCol("features");

        PipelineStage[] indexerStages = stringIndexers.toArray(new PipelineStage[0]);// (5)

        PipelineStage[] pipelineStages = array(indexerStages, assembler);// (6)

        Pipeline pipeline = new Pipeline().setStages(pipelineStages);// (7)

        pipeline.fit(dataset).transform(dataset).show(10);// (8)
    }

    @SuppressWarnings("unchecked")
    public static <T> T[] array(final T[] array1, final T... array2) {// (4) joins two arrays into one
        final Class<?> type1 = array1.getClass().getComponentType();
        final T[] joinedArray = (T[]) Array.newInstance(type1, array1.length + array2.length);
        System.arraycopy(array1, 0, joinedArray, 0, array1.length);
        System.arraycopy(array2, 0, joinedArray, array1.length, array2.length);
        return joinedArray;
    }
}
```
**(1)** … Builds an array of the output column names (outputCol) of the ***StringIndexer***s. It becomes an array of the column names of the numericized categorical variables, as shown below.
[materialIndex,shapeIndex,brandIndex,shopIndex]
**(2)** … An array of the column names of the numeric variables. This time, only **weight**.
**(3)** … Combines the numeric variables into a single vector. ***VectorAssembler#setInputCols*** takes the names of the columns you want to vectorize. Here, all the columns that are candidate explanatory variables (**materialIndex, shapeIndex, brandIndex, shopIndex, weight**) are specified[^1].
[^1]: Depending on the machine learning algorithm, learning may not go well unless you understand the explanatory variables and perform feature selection. For example, a high correlation between explanatory variables can cause the problem of multicollinearity. This time the algorithm is from the decision-tree family (a non-linear model) and this is an introductory article, so all candidate explanatory variables are included. A quick way to check pairwise correlation is sketched after the next code block.

**(4)** … The ***array*** function is a helper that joins two arrays into one. The vectorized result is added to the Dataset under the column name specified by ***setOutputCol***.
```java
VectorAssembler assembler = new VectorAssembler()// (3)
        .setInputCols(array(indexedCategoricalColNames, numericColNames))
        .setOutputCol("features");
```
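Regarding the multicollinearity note in the footnote, here is a minimal sketch for checking the correlation between two explanatory-variable columns (the name `transformed` is an assumption standing in for the Dataset after the pipeline has been applied):

```java
// Pearson correlation between two explanatory-variable columns;
// "transformed" is assumed to be the result of pipeline.fit(dataset).transform(dataset)
double corr = transformed.stat().corr("weight", "materialIndex");
System.out.println("corr(weight, materialIndex) = " + corr);
```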
**(5)-(7)** … Sets the **StringIndexer**s and the **VectorAssembler** as the stages of the **Pipeline**.
```java
PipelineStage[] indexerStages = stringIndexers.toArray(new PipelineStage[0]);// (5)
PipelineStage[] pipelineStages = array(indexerStages, assembler);// (6)
Pipeline pipeline = new Pipeline().setStages(pipelineStages);// (7)
```
**(8)** … Executes the pipeline: fits it to the dataset, transforms the dataset, and shows the first 10 rows.
The execution result is as follows.
You can see that a vector column called **features** has been added.
In other words, ***VectorAssembler*** packs the multiple variables **materialIndex, shapeIndex, brandIndex, shopIndex, weight** into a vector under the single name **features**.
Data like this **features** column is called a **feature vector**.
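To make the idea concrete, here is a small sketch of what one row's feature vector looks like (the values are hypothetical, laid out in the same order as the **features** column):

```java
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;

// Hypothetical values in the layout [materialIndex, shapeIndex, brandIndex, shopIndex, weight]
Vector features = Vectors.dense(2.0, 1.0, 0.0, 3.0, 4.5);
System.out.println(features.size());   // 5 elements
System.out.println(features.apply(4)); // 4.5 (the weight element)
```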
Now the preparation is finally complete. Let's **build a learning model** and actually **predict accessory prices** (regression).
Without further ado, the learning and prediction code looks like this:
```java
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.GBTRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class GBTRegressionStep03_part02 {

    public static void main(String[] args) {

        SparkSession spark = SparkSession
                .builder()
                .appName("GradientBoostingTreeRegression")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> dataset = spark
                .read()
                .format("csv")
                .option("header", "true")
                .option("inferSchema", "true")
                .load("dataset/gem_price_ja.csv");

        List<String> categoricalColNames = Arrays.asList("material", "shape", "brand", "shop");

        List<StringIndexer> stringIndexers = categoricalColNames.stream()
                .map(col -> new StringIndexer()
                        .setStringOrderType("frequencyDesc")
                        .setInputCol(col)
                        .setOutputCol(col + "Index"))
                .collect(Collectors.toList());

        String[] indexedCategoricalColNames = stringIndexers
                .stream()
                .map(StringIndexer::getOutputCol)
                .toArray(String[]::new);

        String[] numericColNames = new String[] { "weight" };

        VectorAssembler assembler = new VectorAssembler()
                .setInputCols(array(indexedCategoricalColNames, numericColNames))
                .setOutputCol("features");

        GBTRegressor gbtr = new GBTRegressor()// (1)
                .setLabelCol("price")
                .setFeaturesCol("features")
                .setPredictionCol("prediction");

        PipelineStage[] indexerStages = stringIndexers.toArray(new PipelineStage[0]);

        PipelineStage[] pipelineStages = array(indexerStages, assembler, gbtr);// (2)

        Pipeline pipeline = new Pipeline().setStages(pipelineStages);

        long seed = 0;

        Dataset<Row>[] splits = dataset.randomSplit(new double[] { 0.7, 0.3 }, seed);// (3)

        Dataset<Row> trainingData = splits[0];// (4)

        Dataset<Row> testData = splits[1];// (5)

        PipelineModel pipelineModel = pipeline.fit(trainingData);// (6)

        Dataset<Row> predictions = pipelineModel.transform(testData);// (7)

        predictions.select("id", "material", "shape", "weight", "brand", "shop", "price", "prediction").show(10);// (8)
    }

    @SuppressWarnings("unchecked")
    public static <T> T[] array(final T[] array1, final T... array2) {
        final Class<?> type1 = array1.getClass().getComponentType();
        final T[] joinedArray = (T[]) Array.newInstance(type1, array1.length + array2.length);
        System.arraycopy(array1, 0, joinedArray, 0, array1.length);
        System.arraycopy(array2, 0, joinedArray, array1.length, array2.length);
        return joinedArray;
    }
}
```
**(1)** … Creates a **gradient boosting tree** learner for supervised learning. ***setLabelCol*** specifies the column you want to predict (**price**), ***setFeaturesCol*** specifies the feature-vector column (**features**), and ***setPredictionCol*** specifies the name of a new column (**prediction**) in which to store the prediction results.
```java
GBTRegressor gbtr = new GBTRegressor()// (1)
        .setLabelCol("price")
        .setFeaturesCol("features")
        .setPredictionCol("prediction");
```
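This article simply uses the default hyperparameters, but for reference, here is a sketch of the ones most commonly adjusted (the values below are illustrative, not tuned):

```java
// Commonly adjusted GBTRegressor hyperparameters (illustrative values)
GBTRegressor tuned = new GBTRegressor()
        .setLabelCol("price")
        .setFeaturesCol("features")
        .setMaxIter(50)   // number of boosting iterations (trees)
        .setMaxDepth(5)   // maximum depth of each tree
        .setStepSize(0.1) // learning rate
        .setSeed(0);
```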
**(2)** … Adds the learner (**gbtr**) to the Pipeline. Now, in addition to indexing the categorical variables and vectorizing, the pipeline also performs training.
```java
PipelineStage[] pipelineStages = array(indexerStages, assembler, gbtr);// (2)
Pipeline pipeline = new Pipeline().setStages(pipelineStages);
```
**(3)** … ***dataset.randomSplit*** randomly splits the original Dataset at a ratio of 70%:30% (passing a fixed `seed` makes the split reproducible).
**(4)** … 70% of the split data is used as training data.
**(5)** … The remaining 30% is used as test data.
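The corresponding lines are:

```java
long seed = 0;
Dataset<Row>[] splits = dataset.randomSplit(new double[] { 0.7, 0.3 }, seed);// (3)
Dataset<Row> trainingData = splits[0];// (4)
Dataset<Row> testData = splits[1];// (5)
```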
**(6)** … ***Pipeline.fit*** feeds the training data into the learner and trains it. After training you obtain ***pipelineModel***, the trained **learning model** (machine learning model). Using this **learning model**, you can now perform regression (prediction of **price**).
```java
PipelineModel pipelineModel = pipeline.fit(trainingData);// (6)
```
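As an aside, a trained ***PipelineModel*** can also be saved to disk and loaded back later. A minimal sketch (the path is just an example):

```java
// Persist the trained pipeline model and load it again (path is illustrative)
pipelineModel.write().overwrite().save("model/gem-price-gbt");
PipelineModel loaded = PipelineModel.load("model/gem-price-gbt");
```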
**(7)** … ***pipelineModel.transform(testData)*** executes **prediction** (regression) with the trained model! It predicts the price of the accessories in the test data specified here, and returns a Dataset containing the prediction results (specifically, a **prediction** column).
```java
Dataset<Row> predictions = pipelineModel.transform(testData);// (7)
```
**(8)** … Displays the execution results: ***select*** picks the columns you want to show, and ***show*** prints the first 10 rows.
The prediction results look like the following.
The **prediction** column on the far right of the table is the prediction produced by machine learning, and the **price** column second from the right is the answer contained in the original data.
The gap between **prediction** and **price** is the prediction error, and you can see that the model predicts values fairly close to the answers.
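If you want to eyeball that gap directly, a minimal sketch is to add an error column (prediction minus answer) to the results:

```java
import static org.apache.spark.sql.functions.col;

// Add an "error" column showing prediction minus answer
predictions
        .withColumn("error", col("prediction").minus(col("price")))
        .select("id", "price", "prediction", "error")
        .show(10);
```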
I haven't done any particular tuning, but we now have a simple "accessory price estimation engine" ^_^
**Continued next time**
Next time, I would like to cover **evaluation metrics for learning results, hyperparameter tuning, and grid search**.
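As a small preview (a sketch, not the next article's actual code), Spark's ***RegressionEvaluator*** can compute such a metric, for example the RMSE of the predictions above:

```java
import org.apache.spark.ml.evaluation.RegressionEvaluator;

// Root mean squared error of the predictions against the "price" answers
RegressionEvaluator evaluator = new RegressionEvaluator()
        .setLabelCol("price")
        .setPredictionCol("prediction")
        .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("RMSE = " + rmse);
```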