Spark's machine learning library is MLlib, but it still seems to lag behind scikit-learn in functionality. For example, scikit-learn can compensate for an imbalance between positive and negative examples during training, but MLlib 1.5 does not have such a feature yet[^class label correction]. In cases like this, I think you can get the benefits of both sklearn and Spark by training a model in advance with scikit-learn on data that fits in memory, and then using that model from PySpark to make predictions on large-scale data.
Let's Try
You could convert the data to an RDD of ndarrays and pass each element to the trained model's predict with map, but doing it that way seems to incur a large per-call overhead, so I want to process the data in batches of a certain size instead.
Prepare a Python environment with scikit-learn available, such as Anaconda, at the same path (e.g. /opt/anaconda) on every Spark node. If you specify PYSPARK_PYTHON=/opt/anaconda/bin/python3 when executing the spark-submit command, this Python will be used.
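As a quick sanity check (not in the original post, and assuming a SparkContext sc has already been created), you can confirm which interpreter the executors are actually running:

import sys

# each task reports the Python executable it is running under
print(sc.parallelize(range(4), 4).map(lambda _: sys.executable).distinct().collect())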
Create the model in advance. This time I used a Random Forest. The data is just random dummy data.
import numpy as np
from sklearn import ensemble

N = 1000
train_x = np.random.randn(N, 10)
# binary labels with only ~10% positives, i.e. an imbalanced dataset
train_y = np.random.binomial(1, 0.1, N)
# class_weight="balanced" compensates for the class imbalance during training
model = ensemble.RandomForestClassifier(10, class_weight="balanced").fit(train_x, train_y)
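The post does not show this step, but since the model is trained ahead of time, you would typically persist it so the spark-submit script can load it back; here is a minimal sketch using the standard library's pickle (the file name is just an example):

import pickle

# save the trained model for the PySpark job
with open("rf_model.pkl", "wb") as f:
    pickle.dump(model, f)

# ...and inside the PySpark script, load it back before broadcasting:
# with open("rf_model.pkl", "rb") as f:
#     model = pickle.load(f)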
Then use this model from PySpark as follows:
from pyspark import SparkContext
sc = SparkContext()
test_x = np.random.randn(N * 100, 10)
n_partitions = 10
# zipWithIndex attaches a unique id to each row so predictions can be matched back later
rdd = sc.parallelize(test_x, n_partitions).zipWithIndex()
# Point 1: gather each partition's rows into a single list so predict() runs once per batch
def batch(xs):
    yield list(xs)

batch_rdd = rdd.mapPartitions(batch)
# Point 2: broadcast the trained model so each executor receives one copy
b_model = sc.broadcast(model)

def split_id_and_data(xs):
    xs = list(xs)
    data = [x[0] for x in xs]
    ids = [x[1] for x in xs]
    return data, ids
# Point 3: predict on each batch at once, then zip the predictions back with their ids
result_rdd = batch_rdd.map(split_id_and_data) \
    .flatMap(lambda x: zip(x[1], b_model.value.predict(x[0])))
for _id, pred in result_rdd.take(10):
    print(_id, pred)
sc.stop()
The key points are the following three:

1. Use mapPartitions to turn the RDD[ndarray] into an RDD[list[ndarray]]. By doing this, you can pass a whole chunk of data to model.predict at once instead of calling it row by row.
2. Broadcast the trained model with sc.broadcast so that it is shipped to every executor, and refer to it there as b_model.value.
3. Split each batch into data and ids, call b_model.value.predict on the data, zip the predictions back with the ids, and flatten the result with flatMap. That's it.

(2016-01-26 postscript) About the part that gathers each partition's rows into a list:
# Point 1
def batch(xs):
    yield list(xs)

batch_rdd = rdd.mapPartitions(batch)
It turns out Spark already provides a method called glom for exactly this, so it can be written more simply as:
batch_rdd = rdd.glom()
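For reference, here is a minimal sketch (not in the original post) of the same prediction pipeline with glom doing the batching, reusing split_id_and_data and b_model from above:

# same result as before, with glom() replacing the hand-written mapPartitions
result_rdd = rdd.glom() \
    .map(split_id_and_data) \
    .flatMap(lambda x: zip(x[1], b_model.value.predict(x[0])))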
(2016-01-26 Addendum 2) DStream also has glom and flatMap methods, so exactly the same approach works with Spark Streaming. For example, you could train an anomaly detector with an SVM in scikit-learn and apply it to streaming data in near real time.
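As a rough sketch of that idea (everything below is illustrative and not from the original post: the socket source, the CSV parsing, and the reuse of sc and the broadcast b_model are assumptions; the model here is the Random Forest from above, but it could just as well be, say, a OneClassSVM):

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 10)  # 10-second micro-batches
lines = ssc.socketTextStream("localhost", 9999)  # hypothetical source of CSV feature rows

def parse(line):
    return [float(v) for v in line.split(",")]

# glom() batches each partition of the micro-batch, just like in the RDD version
preds = lines.map(parse).glom() \
    .flatMap(lambda xs: zip(xs, b_model.value.predict(np.array(xs))) if xs else [])

preds.pprint()
ssc.start()
ssc.awaitTermination()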
[^class label correction]: It may have been requested in JIRA and implemented shortly after, but it cannot be used on CDH 5.5 because its Spark version is 1.5.