Informatique en cluster ultra-rapide. Une bibliothèque qui distribue le traitement par lots à grande échelle. Il fait un bon travail de traitement distribué. Vous pouvez utiliser SQL. Les données en streaming peuvent être utilisées. Peut être utilisé pour l'apprentissage automatique. La théorie des graphes peut être utilisée. L'apprentissage profond peut être placé. Ceux-ci utilisent pleinement la mémoire et distribuent le cluster à grande vitesse.
Ubuntu
sudo apt-get install -y openjdk-8-jdk
Mac
brew cask install java
Ubuntu
sudo apt install maven
mac
brew install maven
Soit / usr / local / spark SPARK_HOME. Sélectionnez n'importe quelle version. http://ftp.riken.jp/net/apache/spark/
Ubuntu
wget http://ftp.riken.jp/net/apache/spark/spark-1.6.2/spark-1.6.2-bin-hadoop2.6.tgz
$ tar zxvf spark-1.6.2-bin-hadoop2.6.tgz
$ sudo mv spark-1.6.2-bin-hadoop2.6 /usr/local/
$ sudo ln -s /usr/local/spark-1.6.2-bin-hadoop2.6 /usr/local/spark
Ajoutez ce qui suit à .bashrc
Ubuntu
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
Mac
brew install apache-spark
python
$ spark-shell --master local[*]
(Omission)
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.1
/_/
Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
(Omission)
scala> val textFile = sc.textFile("/usr/local/spark/README.md")
textFile: org.apache.spark.rdd.RDD[String] = /usr/local/spark/README.md MapPartitionsRDD[1] at textFile at <console>:27
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:29
scala> wordCounts.collect()
res0: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing.,1), ...(Omission)..., (>>>,1), (programming,1), (T...
scala>
Vérifiez si cela fonctionne sur la console. Lors de l'utilisation avec python
python
./bin/pyspark
Ajoutez ce qui suit à .bashrc
python
#spark
export SPARK_HOME=/usr/local/spark/spark-1.6.2-bin-hadoop2.6
export PATH=$PATH:$SPARK_HOME/bin
#jupyter spark
export PYSPARK_PYTHON=$PYENV_ROOT/shims/python #Faites correspondre le chemin en fonction de l'environnement
export PYSPARK_DRIVER_PYTHON=$PYENV_ROOT/shims/jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
python
source .bashrc
pyspark
L'exécution de la commande pyspark lance jupyter. Si vous obtenez une erreur qui ne récupère pas le RDD de spark, vous pouvez la corriger en redémarrant le noyau.
L'exécution parallèle devient possible.
Scala
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
Java
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
Python
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
Scala
val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt
Java
JavaRDD<String> distFile = sc.textFile("data.txt");
Python
distFile = sc.textFile("data.txt")
Récupérez les données avec textFile et mettez-les sur rdd Convertir avec la carte Agréger avec réduire
Scala
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
Java
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
Python
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
Scala
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
Java
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
Python
"""MyScript.py"""
if __name__ == "__main__":
def myFunc(s):
words = s.split(" ")
return len(words)
sc = SparkContext(...)
sc.textFile("file.txt").map(myFunc)
Scala
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
Java
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);
// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
println("Counter value: " + counter);
Python
counter = 0
rdd = sc.parallelize(data)
# Wrong: Don't do this!!
def increment_counter(x):
global counter
counter += x
rdd.foreach(increment_counter)
print("Counter value: ", counter)
Scala
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
Java
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
Python
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
conversion | sens |
---|---|
map(func) | Convertissez en un nouvel ensemble de données distribué formé en passant chaque élément de la source avec la fonction func. |
filter(func) | Sélectionne l'élément source pour lequel func renvoie true et renvoie un nouvel ensemble de données. |
flatMap(func) | Similaire à map, mais chaque élément d'entrée peut être mappé à 0 ou plusieurs éléments de sortie (func doit renvoyer Seq au lieu d'un seul élément). |
mapPartitions(func) | Similaire à une carte, mais exécuté individuellement sur chaque partition (bloc) du RDD, donc lors de l'exécution sur un RDD de type T, func est un itérateur |
mapPartitionsWithIndex(func) | Similaire à mapPartitions, mais func reçoit une valeur entière qui représente l'index de la partition. Par conséquent, lors de l'exécution avec le type T RDD, func est (Int, Iterator) |
sample(withReplacement, fraction, seed) | Utilise le générateur de nombres aléatoires spécifié pour échantillonner une partie fractionnaire des données avec ou sans substitution. |
union(otherDataset) | Renvoie un nouvel ensemble de données contenant la somme des éléments et des arguments de l'ensemble de données source. |
intersection(otherDataset) | Renvoie un nouveau RDD contenant les parties communes des éléments et arguments de l'ensemble de données source. |
distinct([numTasks])) | Renvoie un nouvel ensemble de données contenant différents éléments de l'ensemble de données source. |
groupByKey([numTasks]) | Lorsqu'il est appelé avec l'ensemble (K, V) d'ensembles de données, (K, Iterable) |
reduceByKey(func, [numTasks]) | Lorsqu'elle est appelée avec une paire (K, V) d'ensembles de données, la valeur de chaque clé est (V, V)=>Agrégé à l'aide de la fonction de réduction typée func (K, V) V.Comme avec groupByKey, le nombre de tâches de réduction peut être défini via le deuxième argument facultatif. |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | Lorsqu'il est appelé avec une paire (K, V) d'ensembles de données, il renvoie une paire (K, U) d'ensembles de données. Ici, la valeur de chaque clé est agrégée à l'aide de la fonction de jointure spécifiée et d'une valeur neutre "zéro". Autorisez les types de valeur agrégée qui sont différents des types de valeur d'entrée, tout en évitant les allocations inutiles. Comme avec groupByKey, le nombre de tâches de réduction peut être défini avec le deuxième argument facultatif. |
sortByKey([ascending], [numTasks]) | Lorsque K est appelé avec un ensemble de données de paires (K, V) qui implémente Ordered, un ensemble de données de paires (K, V) triées par clé ascendante ou décroissante, comme spécifié par l'argument ascendant booléen. Retour. |
join(otherDataset, [numTasks]) | Lorsqu'il est appelé avec des ensembles de données de type (K, V) et (K, W), il renvoie un ensemble de données de paires (K, (V, W)) contenant toutes les paires d'éléments pour chaque clé. Les jointures externes sont prises en charge par leftOuterJoin, rightOuterJoin et fullOuterJoin. |
cogroup(otherDataset, [numTasks]) | (K、(Iterable |
cartesian(otherDataset) | Lorsqu'il est appelé sur un ensemble de données de type T et de type U, il renvoie un ensemble de données de paires (T, U) (toutes les paires d'éléments). |
pipe(command, [envVars]) | Commandes shell pour chaque partition du RDD (par exemple, script Perl ou bash. Les éléments RDD sont écrits dans le stdin du processus et les lignes imprimées sur stdout sont renvoyées sous forme de RDD. |
coalesce(numPartitions) | Réduisez le nombre de partitions dans le RDD à numPartitions. Ceci est utile pour effectuer des opérations plus efficacement après le filtrage d'un grand ensemble de données. |
repartition(numPartitions) | Remaniez aléatoirement les données dans le RDD pour créer plus ou moins de partitions et équilibrer ces partitions. Cela garantit que toutes les données du réseau sont toujours mélangées. |
repartitionAndSortWithinPartitions(partitioner) | Repartitionne le RDD en fonction du partitionneur spécifié et trie les enregistrements par clé dans chaque partition résultante. C'est plus efficace que d'appeler la subdivision et il est plus efficace de trier dans chaque partition car vous pouvez pousser le tri vers le mécanisme de lecture aléatoire. |
action | sens |
---|---|
reduce(func) | Utilisez la fonction func (qui prend deux arguments et en renvoie un) pour agréger les éléments de l'ensemble de données. Les fonctions doivent être convertibles et associatives pour pouvoir être calculées correctement en parallèle. |
collect() | Le programme pilote renvoie tous les éléments de l'ensemble de données sous forme de tableau. Cela est généralement utile après les filtres et autres opérations qui renvoient un sous-ensemble suffisamment petit de données. |
count() | Renvoie le nombre d'éléments dans l'ensemble de données. |
first() | Renvoie le premier élément de l'ensemble de données (similaire à take (1)). |
take(n) | Renvoie un tableau contenant les n premiers éléments de l'ensemble de données. |
takeSample(withReplacement, num, [seed]) | Renvoie un tableau contenant des échantillons aléatoires des nombres éléments de l'ensemble de données, pré-spécifiés avec des graines de générateur aléatoires, avec ou sans substitutions. |
takeOrdered(n, [ordering]) | Renvoie les n premiers éléments du RDD, en utilisant l'ordre naturel ou un comparateur personnalisé. |
saveAsTextFile(path) | Décrivez les éléments du fichier de données sous forme de fichier texte (ou ensemble de fichiers texte) dans un répertoire spécifique sur votre système de fichiers local, HDFS ou tout autre système de fichiers pris en charge par Hadoop. Spark appelle le toString de chaque élément pour le convertir en une ligne de texte dans le fichier. |
saveAsSequenceFile(path) | |
(Java and Scala) | Écrit les éléments du fichier de données sous forme de fichier de séquence Hadoop dans le chemin spécifié du système de fichiers local, HDFS ou de tout autre système de fichiers pris en charge par Hadoop. Il est disponible dans le RDD des paires clé / valeur qui implémentent l'interface Writable de Hadoop. Dans Scala, vous pouvez également utiliser des types qui peuvent être implicitement convertis en Writable (Spark inclut la conversion de types de base tels que Int, Double, String). |
saveAsObjectFile(path) | |
(Java and Scala) | Utilisez la sérialisation Java pour décrire les éléments d'un ensemble de données dans un format simple.La sérialisation Java est SparkContext.Il peut être chargé en utilisant objectFile (). |
countByKey() | Uniquement disponible pour les RDD de type (K, V). Renvoie une table de hachage de paires (K, Int), en comptant chaque clé. |
foreach(func) | Exécutez la fonction func pour chaque élément de l'ensemble de données. Ceci est généralement effectué pour les effets secondaires tels que les mises à jour des accumulateurs et les interactions avec les systèmes de stockage externes. Remarque: la modification de variables autres que les accumulateurs en dehors de foreach () peut entraîner un comportement indéfini. Pour plus d'informations, voir Comprendre les fermetures. |
J'ai bifurqué parce qu'il était très facile de comprendre ce qui était utilisé dans les compétitions à l'étranger. Puisqu'il s'agit d'un Jupyter, exécutez-le uniquement dans l'ordre du haut.
Cliquez ici pour la source https://github.com/miyamotok0105/spark-py-notebooks
A propos de la lecture et de la parallélisation des fichiers
À propos de la carte, du filtrage, de la collecte
Explique la méthode d'échantillonnage RDD.
Une brève introduction à certaines opérations de pseudo-ensembles RDD.
À propos des actions RDD réduire, plier, agréger.
Comment gérer les paires clé / valeur pour agréger et explorer les données.
Un cahier qui présente les statistiques de base de MLlib pour les types de vecteurs locaux, l'analyse exploratoire des données et la sélection de modèles.
Classification des points étiquetés et régressions logistiques pour les attaques réseau dans MLlib. Application de la méthode de sélection du modèle utilisant la matrice de corrélation et le test d'hypothèse.
Une méthode qui aide à expliquer l'utilisation de méthodes basées sur des arbres et la sélection de modèles et de fonctionnalités.
Ce bloc-notes déduit le schéma d'un ensemble de données d'interactions réseau. Sur cette base, nous utilisons l'abstraction SQL DataFrame de Spark pour effectuer une analyse de données exploratoire plus structurée.
Processus de regroupement des données d'iris.
data_file = "./kddcup.data_10_percent.gz"
#Création générale
raw_data = sc.textFile(data_file)
#Créer un parallèle
raw_data = sc.parallelize(data_file)
#Conversion de filtre
normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)
#Conversion de carte
csv_data = raw_data.map(lambda x: x.split(","))
Renvoie un tableau contenant des échantillons aléatoires des nombres éléments de l'ensemble de données, pré-spécifiés avec une graine de générateur aléatoire.
raw_data_sample = raw_data.takeSample(False, 400000, 1234)
normal_raw_data = raw_data.filter(lambda x: "normal." in x)
#Soustraire
attack_raw_data = raw_data.subtract(normal_raw_data)
#Produit cartésien (ensemble de produits direct)
product = protocols.cartesian(services).collect()
print "There are {} combinations of protocol X service".format(len(product))
Il s'agit d'une recommandation de produit aux clients en utilisant une matrice d'utilisateurs et d'articles. À partir de cette matrice, on peut dire qu'il s'agit d'un mécanisme pour analyser la corrélation des utilisateurs et faire des recommandations basées sur l'hypothèse que des utilisateurs similaires achèteront les produits qu'ils achètent. Référence
Filtrage coopératif --Recommandation basée sur le comportement de l'utilisateur
Filtrage basé sur le contenu (basé sur le contenu) --Similarité triée par vecteur de caractéristique d'élément et recommandée
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
model = ALS.train(ratings, rank, numIterations)
predictions_all = model.predictAll(sc.parallelize(f_XY)).map(lambda r: ((r[0], r[1]), limitter(r[2]) ))
https://www.oreilly.co.jp/books/9784873117508/
Télécharger la source. Complètement Scala. Et ce livre a une couleur Scala assez forte. J'écris en partant du principe que je connais Spark.
https://github.com/sryza/aas.git
git checkout 1st-edition
Obtenez des données
wget http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz
tar xzvf profiledata_06-May-2005.tar.gz
résultat
profiledata_06-May-2005/
profiledata_06-May-2005/artist_data.txt
profiledata_06-May-2005/README.txt
profiledata_06-May-2005/user_artist_data.txt
profiledata_06-May-2005/artist_alias.txt
La source https://github.com/sryza/aas/blob/1st-edition/ch03-recommender/src/main/scala/com/cloudera/datascience/recommender/RunRecommender.scala
BigDL(torch base) https://github.com/intel-analytics/BigDL TensorFlow https://github.com/yahoo/TensorFlowOnSpark keras https://github.com/maxpumperla/elephas
http://qiita.com/joemphilips/items/de5d12723b9b88b5b090
Cela fonctionne certainement. J'étais en difficulté parce que j'ai eu une erreur avec l'autorisation, mais j'ai l'impression d'avoir une erreur parce que je n'avais pas assez de dossiers et de fichiers au départ. Je me souviens avoir ajouté quelque chose en regardant le journal des erreurs de Spark ou quelque chose.
http://spark.apache.org/docs/latest/programming-guide.html Mastering Apache Spark 2 https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-mllib/spark-mllib-transformers.html
http://yuroyoro.hatenablog.com/entry/20100317/1268819400
La liste est importante dans Scala Utilisez la classe Case Programme immuable
http://en.wikipedia.org/wiki/Collaborative_filtering http://d.hatena.ne.jp/EulerDijkstra/20130407/1365349866 https://www.slideshare.net/hoxo_m/ss-53305070
https://www.youtube.com/watch?v=qIs4nNFgi0s
Recommended Posts