Lightning-fast cluster computing. Spark is a library for distributing large-scale batch processing, and it handles distributed processing well. It supports SQL, streaming data, machine learning, and graph processing, and deep learning libraries can be loaded on top of it. All of this makes full use of memory to distribute work across the cluster at high speed.
Ubuntu
sudo apt-get install -y openjdk-8-jdk
Mac
brew cask install java
Ubuntu
sudo apt install maven
Mac
brew install maven
Use /usr/local/spark as SPARK_HOME. Pick any version from http://ftp.riken.jp/net/apache/spark/
Ubuntu
$ wget http://ftp.riken.jp/net/apache/spark/spark-1.6.2/spark-1.6.2-bin-hadoop2.6.tgz
$ tar zxvf spark-1.6.2-bin-hadoop2.6.tgz
$ sudo mv spark-1.6.2-bin-hadoop2.6 /usr/local/
$ sudo ln -s /usr/local/spark-1.6.2-bin-hadoop2.6 /usr/local/spark
Add the following to .bashrc
Ubuntu
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
Mac
brew install apache-spark
$ spark-shell --master local[*]
(Omission)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/
Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
(Omission)
scala> val textFile = sc.textFile("/usr/local/spark/README.md")
textFile: org.apache.spark.rdd.RDD[String] = /usr/local/spark/README.md MapPartitionsRDD[1] at textFile at <console>:27
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:29
scala> wordCounts.collect()
res0: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing.,1), ...(Omission)..., (>>>,1), (programming,1), (T...
scala>
Check that it works in the console. To use it with Python:
./bin/pyspark
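As a quick check, the Scala word count above can be reproduced in the pyspark shell (sc is predefined there). A minimal sketch, assuming the /usr/local/spark layout from the steps above:
Python
# Run inside the pyspark shell; sc is already defined.
text_file = sc.textFile("/usr/local/spark/README.md")
word_counts = text_file.flatMap(lambda line: line.split(" ")) \
                       .map(lambda word: (word, 1)) \
                       .reduceByKey(lambda a, b: a + b)
print(word_counts.take(10))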
Add the following to .bashrc
#spark
export SPARK_HOME=/usr/local/spark/spark-1.6.2-bin-hadoop2.6
export PATH=$PATH:$SPARK_HOME/bin
#jupyter spark
export PYSPARK_PYTHON=$PYENV_ROOT/shims/python # Adjust the path to your environment
export PYSPARK_DRIVER_PYTHON=$PYENV_ROOT/shims/jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
source .bashrc
pyspark
Running the pyspark command now launches Jupyter. If you get an error saying that Spark's RDD cannot be found, restarting the kernel fixed it for me.
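To confirm that a notebook cell can actually reach Spark, a minimal sanity check like the following can be run first (this assumes the SparkContext is exposed as sc, which is the case when Jupyter is launched through pyspark as configured above):
Python
# Sanity check in the first notebook cell.
print(sc.version)                        # the running Spark version
print(sc.parallelize(range(100)).sum())  # should print 4950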
Parallelized collections make parallel execution possible:
Scala
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
Java
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
Python
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
Scala
val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt
Java
JavaRDD<String> distFile = sc.textFile("data.txt");
Python
distFile = sc.textFile("data.txt")
Load the data into an RDD with textFile, transform it with map, and aggregate it with reduce:
Scala
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
Java
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
Python
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
Scala
object MyFunctions {
  def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
Java
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});
Python
"""MyScript.py"""
if __name__ == "__main__":
def myFunc(s):
words = s.split(" ")
return len(words)
sc = SparkContext(...)
sc.textFile("file.txt").map(myFunc)
Scala
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
Java
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);
// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
println("Counter value: " + counter);
Python
counter = 0
rdd = sc.parallelize(data)
# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x

rdd.foreach(increment_counter)
print("Counter value: ", counter)
Scala
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
Java
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
Python
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
Transformation | Meaning |
---|---|
map(func) | Returns a new distributed dataset formed by passing each element of the source through the function func. |
filter(func) | Returns a new dataset formed by selecting those elements of the source on which func returns true. |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so when running on an RDD of type T, func must be of type Iterator<T> => Iterator<U>. |
mapPartitionsWithIndex(func) | Similar to mapPartitions, but func is also given an integer value representing the index of the partition, so when running on an RDD of type T, func must be of type (Int, Iterator<T>) => Iterator<U>. |
sample(withReplacement, fraction, seed) | Samples a fraction of the data, with or without replacement, using the given random number generator seed. |
union(otherDataset) | Returns a new dataset that contains the union of the elements in the source dataset and the argument. |
intersection(otherDataset) | Returns a new RDD that contains the intersection of the elements in the source dataset and the argument. |
distinct([numTasks]) | Returns a new dataset that contains the distinct elements of the source dataset. |
groupByKey([numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. |
reduceByKey(func, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V. As with groupByKey, the number of reduce tasks can be set via an optional second argument. |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different from the input value type, while avoiding unnecessary allocations. As with groupByKey, the number of reduce tasks can be set via an optional second argument. |
sortByKey([ascending], [numTasks]) | When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by key in ascending or descending order, as specified by the boolean ascending argument. |
join(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), it returns a dataset of pairs (K, (V, W)) that contains all the element pairs for each key. Outer joins are supported by leftOuterJoin, rightOuterJoin, and fullOuterJoin. |
cogroup(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith. |
cartesian(otherDataset) | When called against a dataset of type T and type U, it returns a dataset of (T, U) pairs (all pairs of elements). |
pipe(command, [envVars]) | Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin, and lines printed to its stdout are returned as an RDD of strings. |
coalesce(numPartitions) | Reduce the number of partitions in the RDD to numPartitions. This is useful for performing operations more efficiently after filtering large datasets. |
repartition(numPartitions) | Randomly reshuffle the data in the RDD to create more or fewer partitions and balance between those partitions. This will always shuffle all the data on your network. |
repartitionAndSortWithinPartitions(partitioner) | Repartitions the RDD according to the given partitioner and, within each resulting partition, sorts records by their keys. This is more efficient than calling repartition and then sorting within each partition, because it can push the sort down into the shuffle machinery. |
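A handful of these transformations chained together in PySpark, as a minimal sketch (the input values are made up for illustration):
Python
lines = sc.parallelize(["a b", "b c", "a b"])   # illustrative data
words = lines.flatMap(lambda s: s.split(" "))   # flatMap: one line -> many words
pairs = words.map(lambda w: (w, 1))             # map: word -> (word, 1)
counts = pairs.reduceByKey(lambda a, b: a + b)  # reduceByKey: sum per key
print(counts.sortByKey().collect())             # [('a', 2), ('b', 3), ('c', 1)]
print(words.distinct().collect())               # distinct: unique words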
Action | Meaning |
---|---|
reduce(func) | Use the func function (which takes two arguments and returns one) to aggregate the elements of the dataset. Functions must be commutative and associative so that they can be calculated correctly in parallel. |
collect() | Returns all the elements of the dataset as an array to the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. |
count() | Returns the number of elements in the dataset. |
first() | Returns the first element of the dataset (similar to take (1)). |
take(n) | Returns an array containing the first n elements of the dataset. |
takeSample(withReplacement, num, [seed]) | Returns an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed. |
takeOrdered(n, [ordering]) | Returns the first n elements of the RDD, using either natural order or a custom comparator. |
saveAsTextFile(path) | Writes the elements of the dataset as a text file (or set of text files) in a given directory on the local filesystem, HDFS, or any other Hadoop-supported file system. Spark calls toString on each element to convert it to a line of text in the file. |
saveAsSequenceFile(path) (Java and Scala) | Writes the elements of the dataset as a Hadoop SequenceFile to the given path on the local filesystem, HDFS, or any other file system supported by Hadoop. Available on RDDs of key/value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types such as Int, Double, and String). |
saveAsObjectFile(path) (Java and Scala) | Writes the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile(). |
countByKey() | Only available for type (K, V) RDDs. Returns a hashmap of (K, Int) pairs, counting each key. |
foreach(func) | Run the func function for each element of the dataset. This is typically done for side effects such as accumulator updates and interactions with external storage systems. Note: Changing variables other than Accumulators outside of foreach () can lead to undefined behavior. For more information, see Understanding Closures. |
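And a few of the actions, again as a minimal sketch with made-up input:
Python
nums = sc.parallelize([5, 3, 1, 4, 2])
print(nums.count())                      # 5
print(nums.reduce(lambda a, b: a + b))   # 15
print(nums.takeOrdered(3))               # [1, 2, 3]
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
print(pairs.countByKey())                # counts per key: {'a': 2, 'b': 1}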
I forked this repository because it explains very clearly what is used in overseas competitions. Since it is Jupyter, just execute the cells in order from the top.
Source: https://github.com/miyamotok0105/spark-py-notebooks
About reading and parallelizing files
About map, filter, collect
Explains the RDD sampling method.
A brief introduction to some RDD pseudo-set operations.
About RDD actions reduce, fold, aggregate.
How to handle key / value pairs for aggregating and exploring data.
A notebook that presents MLlib's basic statistics for local vector types, Exploratory Data Analysis and model selection.
Classification of labeled points and logistic regression for network attacks in MLlib. Application of model selection method using correlation matrix and hypothesis test.
Explains the use of tree-based methods and how they help with model and feature selection.
This notebook infers a schema for a dataset of network interactions and, based on it, uses Spark's SQL DataFrame abstraction for more structured exploratory data analysis (the general pattern is sketched after this list).
Iris data clustering process.
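For the SQL DataFrame notebook mentioned above, the general Spark 1.6 pattern looks roughly like the following. This is a minimal sketch, not the notebook's actual code; the table and column names here are made up:
Python
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
# Build a DataFrame from an RDD of Rows; the schema is inferred from the Row fields.
rows = sc.parallelize([Row(protocol="tcp", duration=10),
                       Row(protocol="udp", duration=3)])
df = sqlContext.createDataFrame(rows)
df.registerTempTable("interactions")
sqlContext.sql("SELECT protocol, COUNT(*) AS n FROM interactions GROUP BY protocol").show()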
data_file = "./kddcup.data_10_percent.gz"
# Standard creation from a file
raw_data = sc.textFile(data_file)
# Creation from a parallelized collection
raw_data = sc.parallelize(data_file)
# filter transformation
normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)
# map transformation
csv_data = raw_data.map(lambda x: x.split(","))
takeSample returns an array containing a random sample of num elements of the dataset, optionally pre-specifying a random number generator seed.
raw_data_sample = raw_data.takeSample(False, 400000, 1234)
normal_raw_data = raw_data.filter(lambda x: "normal." in x)
# Set difference (subtract)
attack_raw_data = raw_data.subtract(normal_raw_data)
# Cartesian product
product = protocols.cartesian(services).collect()
print("There are {} combinations of protocol X service".format(len(product)))
Recommend products to customers using a matrix of users and items. The idea is that this matrix lets you analyze correlations between users and make recommendations on the assumption that similar users buy the same products. Reference:
Collaborative filtering: recommendations based on user behavior
Content-based filtering: recommendations based on the similarity of item feature vectors
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
model = ALS.train(ratings, rank, numIterations)
predictions_all = model.predictAll(sc.parallelize(f_XY)).map(lambda r: ((r[0], r[1]), limitter(r[2]) ))
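A self-contained sketch of that ALS flow (the ratings below are made up for illustration, and limitter in the snippet above is a clipping helper from the original code that is not reproduced here):
Python
from pyspark.mllib.recommendation import ALS, Rating
# Made-up (user, item, rating) triples purely for illustration.
ratings = sc.parallelize([Rating(0, 0, 4.0), Rating(0, 1, 2.0),
                          Rating(1, 1, 3.0), Rating(1, 2, 5.0)])
rank, numIterations = 10, 10
model = ALS.train(ratings, rank, numIterations)
# Predict a rating for every (user, item) pair of interest.
to_predict = sc.parallelize([(0, 2), (1, 0)])
predictions = model.predictAll(to_predict).map(lambda r: ((r.user, r.product), r.rating))
print(predictions.collect())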
https://www.oreilly.co.jp/books/9784873117508/
Download the source. It is entirely Scala, and the book has a pretty strong Scala flavor; it is written on the assumption that you already know Spark.
https://github.com/sryza/aas.git
git checkout 1st-edition
Get data
wget http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
tar xzvf profiledata_06-May-2005.tar.gz
result
profiledata_06-May-2005/
profiledata_06-May-2005/artist_data.txt
profiledata_06-May-2005/README.txt
profiledata_06-May-2005/user_artist_data.txt
profiledata_06-May-2005/artist_alias.txt
Source https://github.com/sryza/aas/blob/1st-edition/ch03-recommender/src/main/scala/com/cloudera/datascience/recommender/RunRecommender.scala
BigDL (Torch-based): https://github.com/intel-analytics/BigDL
TensorFlowOnSpark: https://github.com/yahoo/TensorFlowOnSpark
Keras (Elephas): https://github.com/maxpumperla/elephas
http://qiita.com/joemphilips/items/de5d12723b9b88b5b090
This certainly works. I got stuck on a permission error, but I suspect the real cause was that some folders and files were missing in the first place. I remember adding something after checking Spark's error log.
Spark programming guide: http://spark.apache.org/docs/latest/programming-guide.html
Mastering Apache Spark 2: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-mllib/spark-mllib-transformers.html
http://yuroyoro.hatenablog.com/entry/20100317/1268819400
Lists are important in Scala. Use case classes. Write immutable programs.
http://en.wikipedia.org/wiki/Collaborative_filtering
http://d.hatena.ne.jp/EulerDijkstra/20130407/1365349866
https://www.slideshare.net/hoxo_m/ss-53305070
https://www.youtube.com/watch?v=qIs4nNFgi0s