From Spark 1.3, the Spark DataFrame API has been added. Its main features are `filter`, `select`, and `groupBy` → `agg`; in other words, the code is simpler and the processing faster than writing the equivalent with RDD `map` or `filter`. Even when data pre-processing is done with RDDs, it is better to load the data into a DataFrame as early as possible. My notes on DataFrame had become scattered, so I am collecting the sample code here as a memorandum.
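As a minimal sketch of the difference (using the `whole_log` RDD and `whole_log_df` DataFrame that are built below), counting accesses per campaign looks like this in the two styles:
#RDD style: map to (key, 1) pairs, then reduceByKey
counts_rdd = whole_log.map(lambda x: (x[2], 1)).reduceByKey(lambda a, b: a + b)
#DataFrame style: one readable groupBy + count chain
counts_df = whole_log_df.groupBy("campaignID").count()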
In addition, note that an access log is used as the subject throughout: the csv access log from a Gijutsu-Hyoron-sha (Gihyo) book, whose direct download link is the URL passed to wget in the code below. The csv is a log with three fields: date, User_ID, and Campaign_ID.
click.at user.id campaign.id
2015/4/27 20:40 144012 Campaign077
2015/4/27 0:27 24485 Campaign063
2015/4/27 0:28 24485 Campaign063
2015/4/27 0:33 24485 Campaign038
First, read the csv and turn it into an RDD. Remove the header in the first row and parse the first column as a datetime object.
import json, os, datetime, collections, commands
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
if not os.path.exists("./click_data_sample.csv"):
    print "csv file not found at master node, will download and copy to HDFS"
    commands.getoutput("wget -q http://image.gihyo.co.jp/assets/files/book/2015/978-4-7741-7631-4/download/click_data_sample.csv")
    commands.getoutput("hadoop fs -copyFromLocal -f ./click_data_sample.csv /user/hadoop/")

whole_raw_log = sc.textFile("/user/hadoop/click_data_sample.csv")
header = whole_raw_log.first()
whole_log = whole_raw_log.filter(lambda x: x != header).map(lambda line: line.split(","))\
    .map(lambda line: [datetime.datetime.strptime(line[0].replace('"', ''), '%Y-%m-%d %H:%M:%S'), int(line[1]), line[2].replace('"', '')])
whole_log.take(3)
#[[datetime.datetime(2015, 4, 27, 20, 40, 40), 144012, u'Campaign077'],
# [datetime.datetime(2015, 4, 27, 0, 27, 55), 24485, u'Campaign063'],
# [datetime.datetime(2015, 4, 27, 0, 28, 13), 24485, u'Campaign063']]
If there is an existing RDD, a DataFrame can be created with `sqlContext.createDataFrame(my_rdd, my_schema)`, specifying the column names and the type of each column (`TimestampType`, `IntegerType`, `StringType`, etc.). See the Spark SQL documentation for how to define a schema. `printSchema()` and `dtypes` show the schema information, `count()` returns the number of rows, and `show(n)` displays the first n records.
fields = [StructField("access_time", TimestampType(), True), StructField("userID", IntegerType(), True), StructField("campaignID", StringType(), True)]
schema = StructType(fields)
whole_log_df = sqlContext.createDataFrame(whole_log, schema)
print whole_log_df.count()
print whole_log_df.printSchema()
print whole_log_df.dtypes
print whole_log_df.show(5)
#327430
#root
# |-- access_time: timestamp (nullable = true)
# |-- userID: integer (nullable = true)
# |-- campaignID: string (nullable = true)
#
#[('access_time', 'timestamp'), ('userID', 'int'), ('campaignID', 'string')]
#
#+--------------------+------+-----------+
#| access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-04-27 20:40:...|144012|Campaign077|
#|2015-04-27 00:27:...| 24485|Campaign063|
#|2015-04-27 00:28:...| 24485|Campaign063|
#|2015-04-27 00:33:...| 24485|Campaign038|
#|2015-04-27 01:00:...| 24485|Campaign063|
To turn csv data into a DataFrame directly as it is read, use spark-csv, one of the Spark Packages (databricks/spark-csv). Unless otherwise specified every column is read as a string, but if you set the `inferSchema` option the types are inferred for you.
whole_log_df_2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/user/hadoop/click_data_sample.csv")
print whole_log_df_2.printSchema()
print whole_log_df_2.show(5)
#root
# |-- click.at: string (nullable = true)
# |-- user.id: string (nullable = true)
# |-- campaign.id: string (nullable = true)
#
#+-------------------+-------+-----------+
#| click.at|user.id|campaign.id|
#+-------------------+-------+-----------+
#|2015-04-27 20:40:40| 144012|Campaign077|
#|2015-04-27 00:27:55| 24485|Campaign063|
#|2015-04-27 00:28:13| 24485|Campaign063|
#|2015-04-27 00:33:42| 24485|Campaign038|
#|2015-04-27 01:00:04| 24485|Campaign063|
whole_log_df_3 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/user/hadoop/click_data_sample.csv")
print whole_log_df_3.printSchema()
#root
# |-- click.at: timestamp (nullable = true)
# |-- user.id: integer (nullable = true)
# |-- campaign.id: string (nullable = true)
By the way, having `.` in the column names is troublesome, so you can rename them with `withColumnRenamed` (which creates another, renamed DataFrame).
whole_log_df_4 = whole_log_df_3.withColumnRenamed("click.at", "access_time")\
    .withColumnRenamed("user.id", "userID")\
    .withColumnRenamed("campaign.id", "campaignID")
print whole_log_df_4.printSchema()
#root
# |-- access_time: timestamp (nullable = true)
# |-- userID: integer (nullable = true)
# |-- campaignID: string (nullable = true)
Use sqlContext.read.json
to turn data read from a json file directly into a DataFrame. Each line of the file is treated as one json object; if a key is missing in a line, null
is entered for it.
# test_json.json contains following 3 lines, last line doesn't have "campaignID" key
#
#{"access_time": "2015-04-27 20:40:40", "userID": "24485", "campaignID": "Campaign063"}
#{"access_time": "2015-04-27 00:27:55", "userID": "24485", "campaignID": "Campaign038"}
#{"access_time": "2015-04-27 00:27:55", "userID": "24485"}
df_json = sqlContext.read.json("/user/hadoop/test_json.json")
df_json.printSchema()
df_json.show(5)
#root
# |-- access_time: string (nullable = true)
# |-- campaignID: string (nullable = true)
# |-- userID: string (nullable = true)
#
#+-------------------+-----------+------+
#| access_time| campaignID|userID|
#+-------------------+-----------+------+
#|2015-04-27 20:40:40|Campaign063| 24485|
#|2015-04-27 00:27:55|Campaign038| 24485|
#|2015-04-27 00:27:55| null| 24485|
#+-------------------+-----------+------+
Use sqlContext.read.parquet
to turn data read from a parquet file directly into a DataFrame. If you specify the folder containing the parquet files, all parquet files under that folder are read in one batch.
sqlContext.read.parquet("/user/hadoop/parquet_folder/")
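A minimal usage sketch (the folder path is the same hypothetical one as above); since parquet stores the schema, the loaded DataFrame comes back with its original column names and types:
#A sketch: the schema is restored from the parquet metadata
parquet_df = sqlContext.read.parquet("/user/hadoop/parquet_folder/")
parquet_df.printSchema()
parquet_df.show(3)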
This is a sample of querying the DataFrame with SQL statements. If you register the DataFrame under an SQL table name with registerTempTable
, you can refer to it by that table name in SQL. The return value of sqlContext.sql(SQL statement)
is also a DataFrame.
Subqueries can be written as well, but be aware that if you do not attach an alias to the subquery, a syntax error occurs for some reason.
#Simple SQL query
whole_log_df.registerTempTable("whole_log_table")
print sqlContext.sql(" SELECT * FROM whole_log_table where campaignID == 'Campaign047' ").count()
#18081
print sqlContext.sql(" SELECT * FROM whole_log_table where campaignID == 'Campaign047' ").show(5)
#+--------------------+------+-----------+
#| access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:27:...| 14151|Campaign047|
#|2015-04-27 05:28:...| 14151|Campaign047|
#+--------------------+------+-----------+
#When putting variables in SQL statements
for count in range(1, 3):
    print "Campaign00" + str(count)
    print sqlContext.sql("SELECT count(*) as access_num FROM whole_log_table where campaignID == 'Campaign00" + str(count) + "'").show()
#Campaign001
#+----------+
#|access_num|
#+----------+
#| 2407|
#+----------+
#
#Campaign002
#+----------+
#|access_num|
#+----------+
#| 1674|
#+----------+
#For Sub Query:
print sqlContext.sql("SELECT count(*) as first_count FROM (SELECT userID, min(access_time) as first_access_date FROM whole_log_table GROUP BY userID) subquery_alias WHERE first_access_date < '2015-04-28'").show(5)
#+------------+
#|first_count |
#+------------+
#| 20480|
#+------------+
These are the simple search functions of a DataFrame. Functionally they overlap with the SQL queries above, but filter
and select
are positioned as lightweight search operations: filter
extracts the rows that satisfy a condition, and select
extracts columns. Note that the grammar differs slightly from the RDD filter
.
#Sample for filter
print whole_log_df.filter(whole_log_df["access_time"] < "2015-04-28").count()
#41434
print whole_log_df.filter(whole_log_df["access_time"] > "2015-05-01").show(3)
#+--------------------+------+-----------+
#| access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-05-01 22:11:...|114157|Campaign002|
#|2015-05-01 23:36:...| 93708|Campaign055|
#|2015-05-01 22:51:...| 57798|Campaign046|
#+--------------------+------+-----------+
#Sample for select
print whole_log_df.select("access_time", "userID").show(3)
#+--------------------+------+
#| access_time|userID|
#+--------------------+------+
#|2015-04-27 20:40:...|144012|
#|2015-04-27 00:27:...| 24485|
#|2015-04-27 00:28:...| 24485|
#+--------------------+------+
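Since both filter and select return a DataFrame, they can also be chained freely; a minimal sketch (output omitted):
#Sample for chaining filter and select (a sketch)
print whole_log_df.filter(whole_log_df["access_time"] < "2015-04-28")\
    .select("access_time", "userID").show(3)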
groupBy
provides functionality similar to RDD's reduceByKey
. It returns a GroupedData object, and calling one of its aggregation methods (see pyspark.sql.GroupedData in the API docs) gives the various aggregations; typical examples are agg
and count
.
Below, groupBy
is executed with campaignID
as the key and the number of records is counted with count()
. If you list multiple keys in groupBy
, the combination of those columns becomes the group key.
print whole_log_df.groupBy("campaignID").count().sort("count", ascending=False).show(5)
#+-----------+-----+
#| campaignID|count|
#+-----------+-----+
#|Campaign116|22193|
#|Campaign027|19206|
#|Campaign047|18081|
#|Campaign107|13295|
#|Campaign131| 9068|
#+-----------+-----+
print whole_log_df.groupBy("campaignID", "userID").count().sort("count", ascending=False).show(5)
#+-----------+------+-----+
#| campaignID|userID|count|
#+-----------+------+-----+
#|Campaign047| 30292| 633|
#|Campaign086|107624| 623|
#|Campaign047|121150| 517|
#|Campaign086| 22975| 491|
#|Campaign122| 90714| 431|
#+-----------+------+-----+
You can also run groupBy with userID
as the key and compute the average, maximum, or minimum of the aggregated results. agg({key: value})
returns the result of applying the value function (min
, sum
, avg
, etc.) to the key column. Since the return value is again a DataFrame, the rows can be narrowed down further with .filter()
.
print whole_log_df.groupBy("userID").agg({"access_time": "min"}).show(3)
#+------+--------------------+
#|userID| min(access_time)|
#+------+--------------------+
#| 4831|2015-04-27 22:49:...|
#| 48631|2015-04-27 22:15:...|
#|143031|2015-04-27 21:52:...|
#+------+--------------------+
print whole_log_df.groupBy("userID").agg({"access_time": "min"}).filter("min(access_time) < '2015-04-28'").count()
#20480
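The dict form of agg applies one function per column, so if you want several aggregations at once (for example both the minimum and maximum access_time), one option is the column functions in pyspark.sql.functions; a sketch, assuming Spark 1.4 or later:
from pyspark.sql import functions as F

#A sketch: several aggregations in one groupBy, with readable column names
print whole_log_df.groupBy("userID")\
    .agg(F.min("access_time").alias("first_access"),
         F.max("access_time").alias("last_access"),
         F.count("access_time").alias("access_num")).show(3)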
Pivot is a new feature from Spark v1.6 that provides functionality similar to SQL's pivot. In the sample code below, the pivot reshapes the table from long to wide form (pivot_df
). You must always chain the three methods groupBy("column that stays vertical"), pivot("column to turn from vertical to horizontal"), and sum("column of aggregated values").
agged_df = whole_log_df.groupBy("userID", "campaignID").count()
print agged_df.show(3)
#+------+-----------+-----+
#|userID| campaignID|count|
#+------+-----------+-----+
#|155812|Campaign107| 4|
#|103339|Campaign027| 1|
#|169114|Campaign112| 1|
#+------+-----------+-----+
#A Cell with no value will be null
pivot_df = agged_df.groupBy("userID").pivot("campaignID").sum("count")
print pivot_df.printSchema()
#root
# |-- userID: integer (nullable = true)
# |-- Campaign001: long (nullable = true)
# |-- Campaign002: long (nullable = true)
# ..
# |-- Campaign133: long (nullable = true)
#When you want to fill a Cell with no value with 0
pivot_df2 = agged_df.groupBy("userID").pivot("campaignID").sum("count").fillna(0)
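If the campaign IDs you care about are known in advance, they can be passed as an explicit list in the second argument of pivot (a sketch; the listed campaign IDs are just for illustration). Spark then skips the extra job that determines the distinct values, and only those columns appear in the result:
#A sketch: restrict the pivoted columns by passing the values explicitly
pivot_df3 = agged_df.groupBy("userID")\
    .pivot("campaignID", ["Campaign001", "Campaign002", "Campaign003"])\
    .sum("count").fillna(0)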
UDFs can be used with Spark DataFrames; I think their main use is adding columns. Since a DataFrame is basically immutable, you cannot change the contents of a column in place; instead you create another DataFrame with the new column added.
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType

def add_day_column(access_time):
    return int(access_time.strftime("%Y%m%d"))

my_udf = UserDefinedFunction(add_day_column, IntegerType())
print whole_log_df.withColumn("access_day", my_udf("access_time")).show(5)
#+--------------------+------+-----------+----------+
#| access_time|userID| campaignID|access_day|
#+--------------------+------+-----------+----------+
#|2015-04-27 20:40:...|144012|Campaign077| 20150427|
#|2015-04-27 00:27:...| 24485|Campaign063| 20150427|
#|2015-04-27 00:28:...| 24485|Campaign063| 20150427|
#|2015-04-27 00:33:...| 24485|Campaign038| 20150427|
#|2015-04-27 01:00:...| 24485|Campaign063| 20150427|
#+--------------------+------+-----------+----------+
The UDF notation can also be written using the lambda function.
my_udf2 = UserDefinedFunction(lambda x: x + 5, IntegerType())
print whole_log_df.withColumn("userID_2", my_udf2("userID")).show(5)
#+--------------------+------+-----------+--------+
#| access_time|userID| campaignID|userID_2|
#+--------------------+------+-----------+--------+
#|2015-04-27 20:40:...|144012|Campaign077| 144017|
#|2015-04-27 00:27:...| 24485|Campaign063| 24490|
#|2015-04-27 00:28:...| 24485|Campaign063| 24490|
#|2015-04-27 00:33:...| 24485|Campaign038| 24490|
#|2015-04-27 01:00:...| 24485|Campaign063| 24490|
#+--------------------+------+-----------+--------+
Conversely, use df.drop()
to create a DataFrame with a particular column removed.
print whole_log_df.drop("userID").show(3)
#+--------------------+-----------+
#| access_time| campaignID|
#+--------------------+-----------+
#|2015-04-27 20:40:...|Campaign077|
#|2015-04-27 00:27:...|Campaign063|
#|2015-04-27 00:28:...|Campaign063|
#+--------------------+-----------+
It is also possible to join two DataFrames. Here, consider extracting from the whole log only the log of heavy users (users with 100 or more accesses).
First, aggregate with .groupBy("userID") to get the user IDs of users with 100 or more accesses together with their access counts.
heavy_user_df1 = whole_log_df.groupBy("userID").count()
heavy_user_df2 = heavy_user_df1.filter(heavy_user_df1["count"] >= 100)

print heavy_user_df2.printSchema()
print heavy_user_df2.show(3)
print heavy_user_df2.count()
#root
# |-- userID: integer (nullable = true)
# |-- count: long (nullable = false)
#
#+------+-----+
#|userID|count|
#+------+-----+
#| 84231| 134|
#| 13431| 128|
#|144432| 113|
#+------+-----+
#
#177
If you call the join
method on the original DataFrame (which becomes the left side) and pass the join partner (which becomes the right side) together with the join condition, you can join DataFrames much like an SQL join.
The join type should accept inner
, outer
, left_outer
, right_outer
, etc., but in my case anything other than inner did not work as intended (it was processed as outer
). For the time being, my approach is to join with inner and then delete the unnecessary columns with drop. Please refer to the official page for the detailed options.
The following join retrieves the 38,729 rows of log belonging to the 177 users with 100 or more accesses (out of roughly 320,000 rows in the whole log).
joined_df = whole_log_df.join(heavy_user_df2, whole_log_df["userID"] == heavy_user_df2["userID"], "inner")\
    .drop(heavy_user_df2["userID"]).drop("count")

print joined_df.printSchema()
print joined_df.show(3)
print joined_df.count()
#root
# |-- access_time: timestamp (nullable = true)
# |-- campaignID: string (nullable = true)
# |-- userID: integer (nullable = true)
#None
#+--------------------+-----------+------+
#| access_time| campaignID|userID|
#+--------------------+-----------+------+
#|2015-04-27 02:07:...|Campaign086| 13431|
#|2015-04-28 00:07:...|Campaign086| 13431|
#|2015-04-29 06:01:...|Campaign047| 13431|
#+--------------------+-----------+------+
#
#38729
You can list the column names of a DataFrame with .columns
. To extract the values of a single column, select it and map over the rows; to keep only the unique values, append .distinct()
to the end of the DataFrame.
print whole_log_df.columns
#['access_time', 'userID', 'campaignID']
print whole_log_df.select("userID").map(lambda x: x[0]).collect()[:5]
#[144012, 24485, 24485, 24485, 24485]
print whole_log_df.select("userID").distinct().map(lambda x:x[0]).collect()[:5]
#[4831, 48631, 143031, 39631, 80831]
There are two main ways to convert a DataFrame back to an RDD: applying .map
directly, or calling .rdd
. The latter returns an RDD of Row objects; by calling .asDict()
on each Row, as in my_rdd.rdd.map(lambda x: x.asDict()), it can be converted to a key-value RDD.
#convert to rdd by ".map"
print whole_log_df.groupBy("campaignID").count().map(lambda x: [x[0], x[1]]).take(5)
#[[u'Campaign033', 786], [u'Campaign034', 3867], [u'Campaign035', 963], [u'Campaign036', 1267], [u'Campaign037', 1010]]
# rdd -> normal list can be done with "collect".
print whole_log_df.groupBy("campaignID").count().map(lambda x: [x[0], x[1]]).collect()[:5]
#[[u'Campaign033', 786], [u'Campaign034', 3867], [u'Campaign035', 963], [u'Campaign036', 1267], [u'Campaign037', 1010]]
#convert to rdd by ".rdd" will return "Row" object
print whole_log_df.groupBy("campaignID").rdd.take(3)
#[Row(campaignID=u'Campaign033', count=786), Row(campaignID=u'Campaign034', count=3867), Row(campaignID=u'Campaign035', count=963)]
#`.asDict()` will convert to Key-Value RDD from Row object
print whole_log_df.groupBy("campaignID").rdd.map(lambda x:x.asDict()).take(3)
#[{'count': 786, 'campaignID': u'Campaign033'}, {'count': 3867, 'campaignID': u'Campaign034'}, {'count': 963, 'campaignID': u'Campaign035'}]
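Once the data is in key-value form, it can also be collected into an ordinary Python dict on the driver; a sketch (reasonable here because the number of campaigns is small):
#A sketch: collect the (campaignID, count) pairs into a plain dict on the driver
campaign_counts = dict(whole_log_df.groupBy("campaignID").count()
                       .map(lambda x: (x[0], x[1])).collect())
print campaign_counts["Campaign047"]
#18081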
If you export a DataFrame to a file in parquet format, the schema information is retained in the exported file. Note that if the target directory in the S3 bucket already exists, the write fails, so specify a directory name that does not exist yet.
#write to parquet file
whole_log_df.select("access_time", "userID").write.parquet("s3n://my_S3_bucket/parquet_export")
#reload from parquet file
reload_df = sqlContext.read.parquet("s3n://my_S3_bucket/parquet_export")
print reload_df.printSchema()
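If you would rather overwrite an existing export path than choose a new directory name every time, the writer's save mode can be set to overwrite (a sketch, assuming Spark 1.4 or later; the bucket path is the same placeholder as above):
#A sketch: overwrite the export directory instead of failing when it already exists
whole_log_df.select("access_time", "userID")\
    .write.mode("overwrite").parquet("s3n://my_S3_bucket/parquet_export")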