This is a note of Spark's frequently used APIs (mainly for myself) so that I can get back up to speed quickly even after a long break from development. For now it covers the Python version (I may add a Scala version if I have time).
**This cheat sheet is only a cheat sheet** (arguments may be omitted), so if you have time, please check the [official API documentation (Spark Python API Docs)](http://spark.apache.org/docs/latest/api/python/index.html).
The examples below assume the following setup:
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)
RDD
parallelize
sc.parallelize(collection)
Makes an RDD from a list or tuple.
```py
>>> a = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(a)
```
textFile
sc.textFile(file)
Reads a text file. Wildcards can also be used in the path.
```py
>>> rdd = sc.textFile("./words.txt")
```
wholeTextFiles
sc.wholeTextFiles(directory)
Reads every file in a directory, putting the entire contents of each file into a single element of the RDD.
```py
# $ ls
# a.json b.json c.json
>>> rdd = sc.wholeTextFiles("./")
```
Action
Transformations are only executed, in order, when an action is called (lazy evaluation).
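A minimal illustration of this: the `map` below does nothing until `collect` is called.

```py
>>> rdd = sc.parallelize([1, 2, 3])
>>> doubled = rdd.map(lambda x: x * 2)  # transformation: nothing is computed yet
>>> doubled.collect()                   # the action triggers the computation
[2, 4, 6]
```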
collect
collect()
Returns all elements
>>> print(rdd.collect())
[1, 2, 3, 4, 5]
take
take(n)
Returns the first n elements
>>> print(rdd.take(3))
[1, 2, 3]
first
first()
Returns the very first element
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.first()
1
top
top(n)
Returns the largest n elements (in descending order)
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.top(2)
[3, 2]
count
count()
Count and return the number of elements
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.count()
3
mean
mean()
Returns the average
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.mean()
2.0
sum
sum()
Returns the total
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.sum()
6
variance
variance()
Returns the variance
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.variance()
0.6666666666666666
stdev
stdev()
Returns the standard deviation
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.stdev()
0.816496580927726
saveAsTextFile
saveAsTextFile(file)
Saves the RDD as a text file.
>>> rdd.saveAsTextFile("./a.txt")
Transformation
A transformation returns a new, immutable RDD.
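For example (illustrative): applying a transformation does not modify the original RDD.

```py
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd2 = rdd.map(lambda x: x * 2)  # returns a new RDD; rdd itself is unchanged
>>> rdd.collect()
[1, 2, 3]
>>> rdd2.collect()
[2, 4, 6]
```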
filter/map/reduce
filter
filter(f)
Returns an RDD containing only the elements for which f returns true
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2]
map
map(f)
Returns an RDD with f applied to every element
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.map(lambda x: x * 2).collect()
[2, 4, 6]
flatMap
flatMap(f)
Applies f to every element, then returns an RDD with the resulting lists flattened into individual elements
>>> rdd = sc.parallelize(["This is a pen", "This is an apple"])
>>> rdd.flatMap(lambda x: x.split()).collect()
['This', 'is', 'a', 'pen', 'This', 'is', 'an', 'apple']
reduce
reduce(f)
Repeatedly applies f to two elements at a time until a single value remains
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.reduce(lambda x, y: x + y)
6
A pair RDD is an RDD whose elements are tuples (in Python). It lets you work with keys and values.
To create one, use `keyBy`, or use `map` with a function that returns a 2-element tuple, as in the sketch below.
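A minimal sketch of building a pair RDD with `map` (the data here is made up for illustration):

```py
>>> rdd = sc.parallelize(["Ken 27", "Bob 32"])
>>> rdd.map(lambda x: (x.split()[0], int(x.split()[1]))).collect()
[('Ken', 27), ('Bob', 32)]
```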
keyBy (creates a pair RDD)
keyBy(f)
Applies f to each element of an ordinary RDD and returns a pair RDD whose keys are the return values of f and whose values are the original elements.
>>> rdd = sc.parallelize(["Ken 27 180 83", "Bob 32 170 65", "Meg 29 165 45"])
>>> rdd.keyBy(lambda x: x.split()[0]).collect()
[('Ken', 'Ken 27 180 83'), ('Bob', 'Bob 32 170 65'), ('Meg', 'Meg 29 165 45')]
keys
keys()
Returns an RDD consisting of only the keys of the pair RDD
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().collect()
['Ken', 'Bob', 'Taka', 'Ken', 'Bob']
values
values()
Returns an RDD consisting of only the values of the pair RDD
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.values().collect()
[2, 3, 1, 3, 2]
flatMapValues
flatMapValues(f)
Applies flatMap to the values of the pair RDD, duplicating the key for each resulting value (i.e. converts the data to a "long" format)
>>> rdd = sc.parallelize([("Ken", "Yumi,Yukiko"), ("Bob", "Meg, Tomomi, Akira"), ("Taka", "Yuki")])
>>> rdd.flatMapValues(lambda x: x.split(",")).collect()
[('Ken', 'Yumi'),
('Ken', 'Yukiko'),
('Bob', 'Meg'),
('Bob', ' Tomomi'),
('Bob', ' Akira'),
('Taka', 'Yuki')]
reduceByKey
reduceByKey(f)
Groups elements by key and reduces the values of each key with f
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.reduceByKey(lambda x, y: x + y).collect()
[('Taka', 1), ('Bob', 5), ('Ken', 5)]
countByKey
countByKey()
Counts the number of elements for each key and returns the result as a dict
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.countByKey()
defaultdict(<type 'int'>, {'Ken': 2, 'Bob': 2, 'Taka': 1})
sortByKey
sortByKey()
Sorts the pair RDD by key
>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb",
>>> rdd.sortByKey().collect()
[('aaa', 2), ('abc', 3), ('bac', 1), ('bbb', 3), ('cba', 2)]
leftOuterJoin
leftOuterJoin(rdd)
Left outer joins two RDDs and returns a pair RDD whose value is a 2-element tuple
>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.leftOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1))]
rightOuterJoin
rightOuterJoin(rdd)
Right outer joins two RDDs and returns a pair RDD whose value is a 2-element tuple
>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.rightOuterJoin(rdd2).collect()
[('Ken', (1, 1)), ('Kaz', (3, None))]
fullOuterJoin
fullOuterJoin(rdd)
Full outer joins two RDDs and returns a pair RDD whose value is a 2-element tuple
>>> rdd1 = sc.parallelize([("Ken", 1), ("Bob", 2), ("Meg", 3)])
>>> rdd2 = sc.parallelize([("Ken", 1), ("Kaz", 3)])
>>> rdd1.fullOuterJoin(rdd2).collect()
[('Bob', (2, None)), ('Meg', (3, None)), ('Ken', (1, 1)), ('Kaz', (None, 3))]
sortBy
sortBy(f)
Sorts by the value returned by f
>>> rdd = sc.parallelize([("cba", 2), ("abc", 3), ("bac", 1), ("bbb",
>>> rdd.sortBy(lambda (x, y): x).collect() #Same as sortByKey
intersection
intersection(rdd)
Returns the intersection of two RDDs
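An illustrative example (the ordering of the result is not guaranteed, so it is sorted here):

```py
>>> rdd1 = sc.parallelize([1, 2, 3, 4])
>>> rdd2 = sc.parallelize([3, 4, 5])
>>> sorted(rdd1.intersection(rdd2).collect())
[3, 4]
```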
union
union(rdd)
Returns the union of two RDDs (duplicates are kept)
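An illustrative example (duplicates are kept; apply `distinct` afterwards to remove them):

```py
>>> rdd1 = sc.parallelize([1, 2, 3])
>>> rdd2 = sc.parallelize([3, 4, 5])
>>> rdd1.union(rdd2).collect()
[1, 2, 3, 3, 4, 5]
```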
zip
zip(rdd)
Returns a pair RDD whose values are the elements of the argument RDD, paired element-wise with this RDD's elements as keys
>>> rdd = sc.parallelize([("Ken", 2), ("Bob", 3), ("Taka", 1), ("Ken", 3), ("Bob", 2)])
>>> rdd.keys().zip(rdd.values()).collect()
[('Ken', 2), ('Bob', 3), ('Taka', 1), ('Ken', 3), ('Bob', 2)]
distinct
distinct()
Returns an RDD with duplicate elements removed
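An illustrative example (result order is not guaranteed, so it is sorted here):

```py
>>> rdd = sc.parallelize([1, 1, 2, 2, 3])
>>> sorted(rdd.distinct().collect())
[1, 2, 3]
```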
sample
sample(withReplacement, fraction)
Returns a sampled RDD. The first argument determines whether sampling is done with replacement (the same element may be picked more than once).
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.sample(True, 0.5).collect()
[1, 5, 5]
>>> rdd.sample(False, 0.5).collect()
[1, 3, 5]
takeSample
takeSample(withReplacement, num)
Returns a fixed-size list of sampled elements. The first argument determines whether sampling is done with replacement.
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.takeSample(True, 2)
[5, 5]
>>> rdd.takeSample(False, 2)
[3, 5]
toDebugString
toDebugString()
Returns the execution plan (RDD lineage)
>>> print(rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).toDebugString())
(1) PythonRDD[190] at RDD at PythonRDD.scala:43 []
| MapPartitionsRDD[189] at mapPartitions at PythonRDD.scala:374 []
| ShuffledRDD[188] at partitionBy at null:-1 []
+-(1) PairwiseRDD[187] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
| PythonRDD[186] at reduceByKey at <ipython-input-114-71f5cb742e13>:1 []
| ParallelCollectionRDD[141] at parallelize at PythonRDD.scala:423 []
persist
persist()
Caches the RDD as is (in memory by default). The storage level (memory only, memory with spill to disk, disk only, etc.) is specified with `StorageLevel`.
>>> rdd.persist()
unpersist
unpersist()
Removes the RDD from persistent storage. Use it when changing the persistence level.
>>> from pyspark import StorageLevel
>>> rdd.persist()
>>> rdd.unpersist()
>>> rdd.persist(StorageLevel.DISK_ONLY)
More will be added as needed.
word count
>>> rdd.flatMap(lambda x: x.split())\
.map(lambda x: (x, 1))\
.reduceByKey(lambda x, y: x + y)\
.take(5)
DataFrame
A DataFrame is especially convenient when dealing with structured data.
read.json
read.json(file)
Reads data from a JSON file
```py
# $ cat a.json
# {"name":"Ken", "age":35}
# {"name":"Bob", "age":30, "weight":80}
# {"name":"Meg", "age":29, "weight":45}
>>> df = sqlContext.read.json("a.json")
```
In addition to `collect` and `take`, which work the same as on an RDD, there is `show`.
show
show(n)
Displays the first n rows (20 by default)
>>> df.show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
| 29| Meg| 45|
+---+----+------+
select
select(column)
Returns a DataFrame with the selected columns; pass a column name string or a Column object. You can also list multiple columns, or select computed expressions (a sketch follows the examples below).
>>> df.select("age").show()
+---+
|age|
+---+
| 35|
| 30|
| 29|
+---+
# The following give the same result
>>> df.select(df.age).show() #Pass a Column object
>>> df.select(df["age"]).show() #Pass a Column object
>>> df.select(df.name, df.age).show()
+----+---+
|name|age|
+----+---+
| Ken| 35|
| Bob| 30|
| Meg| 29|
+----+---+
There are two patterns in Python for referring to a Column object to pass to select:
>>> df.age
Column<age>
>>> df["age"]
Column<age>
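As a rough sketch of selecting a computed expression (the column name shown in the output header may differ between Spark versions):

```py
>>> df.select(df.name, df.age + 1).show()  # age incremented by 1 for each row
>>> df.select(df.name, (df.age + 1).alias("age_plus_one")).show()  # same, with an explicit column name
```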
where/filter
filter(condition)
Returns a DataFrame consisting of only the rows that satisfy the condition. `where` is an alias for `filter`.
>>> df.where(df.age >=30).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
+---+----+------+
sort
sort(column)
Returns a dataframe sorted by the specified column
>>> df.sort(df.age).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 29| Meg| 45|
| 30| Bob| 80|
| 35| Ken| null|
+---+----+------+
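To sort in descending order, a Column's `desc()` can be used (a minimal sketch):

```py
>>> df.sort(df.age.desc()).show()  # oldest first
```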
limit
limit(n)
Returns a dataframe limited to only the first n rows
>>> df.limit(1).show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
+---+----+------+
distinct
distinct()
Returns a dataframe consisting only of the distinct result rows
>>> df.distinct().count()
3
join
join(dataframe, on, how)
The default for how is "inner".
- on: a Column or a list of columns
- how: one of `"inner"`, `"outer"`, `"left_outer"`, `"right_outer"`, `"leftsemi"`
Since the DataFrame is built on the RDD, the original RDD can be retrieved.
>>> print(df.rdd.collect())
[Row(age=35, name=u'Ken', weight=None),
Row(age=30, name=u'Bob', weight=80),
Row(age=29, name=u'Meg', weight=45)]
To retrieve only specific columns, access the corresponding attributes of the Row object.
>>> df.rdd.map(lambda row: (row.age, row.weight)).collect()
[(35, None), (30, 80), (29, 45)]
toJSON
toJSON()
Converts the DataFrame to an RDD of JSON strings. You can then save it in JSON format by calling `saveAsTextFile`.
>>> df.toJSON().saveAsTextFile("b.json")
>>> df2 = sqlContext.read.json("b.json")
>>> df2.show()
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
| 29| Meg| 45|
+---+----+------+
Spark Streaming and MLlib related items may be added here later.