Je veux prendre note de l'API fréquemment utilisée de Spark (principalement pour moi-même) afin qu'elle puisse être utilisée rapidement même lors du développement pour la première fois depuis longtemps. Je vais résumer la version Python pour le moment (je peux ajouter la version Scala si j'ai le temps)
** Cette feuille de triche est juste une feuille de triche ** (les arguments peuvent être omis), donc si vous avez le temps, veuillez vous assurer [Official API Document (Spark Python API Docs)](http: //spark.apache. Voir org / docs / latest / api / python / index.html).
Ce qui suit suppose ce qui suit
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)
RDD
parallelize
sc.parallelize(collection)Créer un RDD à partir d'une liste ou d'une touche
```py
>>> a = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(a)
textFile
sc.textFile(file)Lisez le fichier. Vous pouvez également utiliser des caractères génériques et des expressions régulières.
```py
>>> rdd = sc.textFile("./words.txt")
wholeTextFiles
sc.wholeTextFiles(dierctory)Entrez le contenu entier de chaque fichier dans le répertoire dans un élément du RDD
```py
# $ ls
# a.json b.json c.json
>>> rdd = sc.textWholeFiles("./")
Action
La transformation est exécutée dans l'ordre pour la première fois lorsque l'action est exécutée (exécution retardée)
collect
collect()
Renvoie tous les éléments
>>> print(rdd.collect())
[1, 2, 3, 4, 5]
take
take(n)
Retourne d'abord n éléments
>>> print(rdd.take(3))
[1, 2, 3]
first
first()
Renvoie le tout premier élément
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.first()
1
top
top(n)
Renvoie n éléments du plus grand
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.top(2)
[3, 2]
count
count()
Compter et renvoyer le nombre d'éléments
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.count()
3
mean
mean()
Renvoie la moyenne
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.mean()
3.0
sum
sum()
Renvoie le total
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.sum()
6
variance
variance()
Renvoie la distribution
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.variance()
0.6666666666666666
stdev
stdev()
Renvoie l'écart type
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.stdev()
0.816496580927726
saveAsTextFile
saveastextfile(file)
Enregistrez le fichier
>>> rdd.saveAsTextFile("./a.txt")
Transformation
La transformation renvoie un nouveau RDD immuable
filter/map/reduce
filter
filter(f)
Renvoie un rdd contenant uniquement les éléments pour lesquels f est vrai
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2]
map
map(f)
Renvoie rdd avec f agissant sur tous les éléments
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.map(lambda x: x * 2).collect()
[2, 4, 6]
flatMap
flatmap(f)
Après avoir appliqué f à tous les éléments, retournez rdd qui a développé la liste dans l'élément
>>> rdd = sc.parallelize(["This is a pen", "This is an apple"])
>>> rdd.flatMap(lambda x: x.split()).collect()
['This', 'is', 'a', 'pen', 'This', 'is', 'an', 'apple']
Reduce
reduce(f)
Continuez à agir sur deux éléments pour obtenir une valeur de retour
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.reduce(lambda x, y: x + y)
6
Pair RDD est un RDD qui a Tuple comme élément en Python. Peut gérer la clé et la valeur.
Pour le faire, utilisez
keyBy ou utilisez `` map
pour renvoyer un taple avec 2 éléments à l'élément.
keyBy(PairRDD)
keyby(f)
Laisser f agir sur l'élément de rdd ordinaire et renvoyer rdd avec sa valeur de retour comme clé et l'élément d'origine comme valeur.
>>> rdd = sc.parallelize(["Ken 27 180 83", "Bob 32 170 65", "Meg 29 165 45"])
>>> rdd.keyBy(lambda x: x.split()[0]).collect()
[('Ken', 'Ken 27 180 83'), ('Bob', 'Bob 32 170 65'), ('Meg', 'Meg 29 165 45')]
keys
keys
Renvoie un rdd composé uniquement des clés de la paire rdd
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().collect()
['Ken', 'Bob', 'Taka', 'Ken', 'Bob']
values
values
Renvoie un rdd constitué uniquement de vlaue de rdd apparié
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.values().collect()
[2, 3, 1, 3, 2]
flatMapValues
flatmapvalues(f)
Appliquez flatmap à la valeur de pairrdd pour dupliquer la clé et en faire ce que l'on appelle un maintien vertical
>>> rdd = sc.parallelize([("Ken", "Yumi,Yukiko"), ("Bob", "Meg, Tomomi, Akira"), ("Taka", "Yuki")])
>>> rdd.flatMapValues(lambda x: x.split(","))
[('Ken', 'Yumi'),
('Ken', 'Yukiko'),
('Bob', 'Meg'),
('Bob', ' Tomomi'),
('Bob', ' Akira'),
('Taka', 'Yuki')]
reduceByKey
reducebykey(f)
Regrouper par éléments avec la même clé et appliquer la réduction à la valeur
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.reduceByKey(lambda x, y: x + y).collect()
[('Taka', 1), ('Bob', 5), ('Ken', 5)]
countByKey
countbykey()
Compter le nombre de valeurs de la même clé et renvoyer avec dict
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.countByKey()
defaultdict(<type 'int'>, {'Ken': 2, 'Bob': 2, 'Taka': 1})
sortByKey
sortbykey
Trier la paire rdd par clé
>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb",
>>> rdd.sortByKey().collect()
[('aaa', 2), ('abc', 3), ('bac', 1), ('bbb', 3), ('cba', 2)]
leftOuterJoin L'extérieur gauche joint deux RDD et renvoie une paire RDD avec un tuple de deux éléments en valeur
>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.leftOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1))]
rightOuterJoin Jointure externe droite de deux RDD et retourne une paire de RDD avec un tuple de deux éléments en valeur
>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.rightOuterJoin(rdd2).collect()
[('Ken', (1, 1)), ('Kaz', (3, None))]
fullOuterJoin Jointure externe complète de deux RDD et retour d'une paire de RDD avec un tuple de deux éléments en valeur
>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.fullOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1)), ('Kaz', (None, 3))]
sortBy
sortby(f)
Trier par la valeur renvoyée par f
>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb",
>>> rdd.sortBy(lambda (x, y): x).collect() #Identique à sortByKey
intersection
intersection(rdd)
Renvoie une intersection de deux rdd
union
union(rdd)
Renvoie l'union de deux rdd
zip
zip(rdd)
Renvoie une paire rdd avec chaque élément de l'argument rdd comme vlaue
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().zip(rdd.values())
[('Ken', 2), ('Bob', 3), ('Taka', 1), ('Ken', 3), ('Bob', 2)]
distinct Renvoie un RDD qui ne contient pas les mêmes éléments
sample
sample(bool, frac)
Renvoie le rdd échantillonné. Le premier argument détermine si le même élément peut être dupliqué.
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.sample(True, 0.5).collect()
[1, 5, 5]
>>> rdd.sample(False, 0.5).collect()
[1, 3, 5]
takeSample
takesmaple(bool, size)
Renvoie une liste d'échantillons de taille fixe. Le premier argument détermine si le même élément peut être dupliqué.
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.takeSample(True, 2)
[5, 5]
>>> rdd.takeSample(False, 2)
[3, 5]
toDebugString
todebugstring()
Renvoie le plan d'exécution
print(rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).toDebugString())
(1) PythonRDD[190] at RDD at PythonRDD.scala:43 []
| MapPartitionsRDD[189] at mapPartitions at PythonRDD.scala:374 []
| ShuffledRDD[188] at partitionBy at null:-1 []
+-(1) PairwiseRDD[187] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
| PythonRDD[186] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
| ParallelCollectionRDD[141] at parallelize at PythonRDD.scala:423 []
persist
persist()
Cache rdd tel quel (en mémoire par défaut). Vous pouvez définir uniquement la mémoire, le disque si la mémoire n'est pas possible, le disque uniquement, etc. (storagelevel
Spécifié par)
>>> rdd.persist()
unpersist
unpersist()
Résolvez la persistance de rdd. Utilisé lors du changement du niveau de persistance.
>>> from pyspark import StorageLevel
>>> rdd.persist()
>>> rdd.unpersist()
>>> rdd.persist(StorageLevel.DISK_ONLY)
Sera ajouté à tout moment
word count
>>> rdd.flatMap(lambda x: x.split())\
.map(lambda x: (x, 1))\
.reduceByKey(lambda x, y: x + y)\
.take(5)
DataFrame Ceci est particulièrement pratique lorsqu'il s'agit de données structurées.
read.json
read.json(file)Lire les données de json
```py
# $ cat a.json
# {"name":"Ken", "age":35}
# {"name":"Bob", "age":30, "weight":80}
# {"name":"Meg", "age":29, "weight":45}
df = sqlContext.read.json("a.json")
Il y a show '' en plus de
collect '', `` take '' qui est identique à RDD
show
show(n)
Afficher n lignes (n vaut 20 par défaut)
>>> df.show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
| 29| Meg| 45|
+---+----+------+
select
select(column)
Renvoie le dataframe sélectionné, en passant un objet chaîne ou colonne. Vous pouvez également énumérer des colonnes pour obtenir plusieurs colonnes ou effectuer des calculs.
>>> df.select("age").show()
+---+
|age|
+---+
| 35|
| 30|
| 29|
+---+
#Idem pour le suivant
>>> df.select(df.age).show() #Passer un objet Column
>>> df.select(df["age"]).show() #Passer un objet Column
>>> df.select(df.name, df.age).show()
+----+---+
|name|age|
+----+---+
| Ken| 35|
| Bob| 30|
| Meg| 29|
+----+---+
Il existe deux modèles en Python pour accéder à l'objet Column passé par select:
>>> df.age
Column<age>
>>> df["age"]
Column<age>
where/filter
filter(condition)
Renvoie une trame de données composée uniquement de lignes répondant aux critères de chaîne.where
Estfilter
C'est un alias de.
>>> df.where(df.age >=30).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
+---+----+------+
sort
sort(column)
Renvoie un dataframe trié par la colonne spécifiée
>>> df.sort(df.age)
+---+----+------+
|age|name|weight|
+---+----+------+
| 29| Meg| 45|
| 30| Bob| 80|
| 35| Ken| null|
+---+----+------+
limit
limit(n)
Renvoie une trame de données limitée aux n premières lignes uniquement
>>> df.limit(1).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
+---+----+------+
distinct
distinct()
Renvoie une trame de données constituée uniquement des lignes de résultat distinctes
>>> df.distinct().count()
3
join
join(dataframe, on, how)
comment la valeur par défaut est intérieure
--on: Colonne ou liste de colonnes
--how: `" intérieur "`
, "extérieur"
,
"gauche_outer"
,
`right_outer"
,
`leftsemi" ` L'un des
Puisque le DataFrame est construit sur le RDD, le RDD original peut être récupéré.
>>> print(df.rdd.collect())
[Row(age=35, name=u'Ken', weight=None),
Row(age=30, name=u'Bob', weight=80),
Row(age=29, name=u'Meg', weight=45)]
Accédez aux attributs correspondants de l'objet `` Row '' pour récupérer uniquement des colonnes spécifiques
df.rdd.map(lambda row: (row.age, row.weight)).collect()
[(35, None), (30, 80), (29, 45)]
toJson
tojson()
Convertissez en rdd sous la forme de json. après çasaveastextfile
Vous pouvez l'enregistrer au format json en appelant.
>>> df.toJSON().saveAsTextFile("b.json")
>>> df2 = sqlContext.read.json("/b.json")
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
| 29| Meg| 45|
+---+----+------+
Des éléments liés à Spark Streaming et Mllib peuvent être ajoutés ici.
Recommended Posts