Depuis Spark Ver 1.3, une fonction appelée Spark Dataframe a été ajoutée. Les caractéristiques sont les suivantes.
filter
et select
.groupBy → agg
En d'autres termes, il s'agit d'un code simple et d'un traitement plus rapide que l'écriture en RDD map
ou filter
. En supposant que le prétraitement des données soit effectué par RDD, il est préférable de les lire immédiatement dans Dataframe avec maji. Étant donné que les mémos de Dataframe sont dispersés, je sauvegarderai l'exemple de code lorsque j'aurai un mémorandum.
De plus, il convient de noter
Utilisez le journal d'accès comme sujet. Journal d'accès (csv) utilisé dans Technical Review Company's Book, au fichier csv Cliquez ici pour Nao Rin. Le contenu de csv est le journal suivant avec 3 informations de date, User_ID, Campaign_ID
click.at user.id campaign.id
2015/4/27 20:40 144012 Campaign077
2015/4/27 0:27 24485 Campaign063
2015/4/27 0:28 24485 Campaign063
2015/4/27 0:33 24485 Campaign038
Lisez csv et faites-le RDD. Supprimez l'en-tête de la première ligne et lisez la première colonne en tant qu'objet datetime.
import json, os, datetime, collections, commands
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
if not os.path.exists("./click_data_sample.csv"):
print "csv file not found at master node, will download and copy to HDFS"
commands.getoutput("wget -q http://image.gihyo.co.jp/assets/files/book/2015/978-4-7741-7631-4/download/click_data_sample.csv")
commands.getoutput("hadoop fs -copyFromLocal -f ./click_data_sample.csv /user/hadoop/")
whole_raw_log = sc.textFile("/user/hadoop/click_data_sample.csv")
header = whole_raw_log.first()
whole_log = whole_raw_log.filter(lambda x:x !=header).map(lambda line: line.split(","))\
.map(lambda line: [datetime.datetime.strptime(line[0].replace('"', ''), '%Y-%m-%d %H:%M:%S'), int(line[1]), line[2].replace('"', '')])
whole_log.take(3)
#[[datetime.datetime(2015, 4, 27, 20, 40, 40), 144012, u'Campaign077'],
# [datetime.datetime(2015, 4, 27, 0, 27, 55), 24485, u'Campaign063'],
# [datetime.datetime(2015, 4, 27, 0, 28, 13), 24485, u'Campaign063']]
Dataframe peut être créé avec sqlContext.createDataFrame (my_rdd, my_schema)
en spécifiant le nom de la colonne et chaque type (TimestampType
, ʻIntegerType,
StringType`, etc.) s'il existe un RDD original. Je vais. Voir ici pour la définition de Schéma.
printSchema ()
, dtypes
sont les informations du schéma, count () ʻest le nombre de lignes, et
show (n) ʻest les n premiers enregistrements.
fields = [StructField("access_time", TimestampType(), True), StructField("userID", IntegerType(), True), StructField("campaignID", StringType(), True)]
schema = StructType(fields)
whole_log_df = sqlContext.createDataFrame(whole_log, schema)
print whole_log_df.count()
print whole_log_df.printSchema()
print whole_log_df.dtypes
print whole_log_df.show(5)
#327430
#root
# |-- access_time: timestamp (nullable = true)
# |-- userID: integer (nullable = true)
# |-- campaignID: string (nullable = true)
#
#[('access_time', 'timestamp'), ('userID', 'int'), ('campaignID', 'string')]
#
#+--------------------+------+-----------+
#| access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-04-27 20:40:...|144012|Campaign077|
#|2015-04-27 00:27:...| 24485|Campaign063|
#|2015-04-27 00:28:...| 24485|Campaign063|
#|2015-04-27 00:33:...| 24485|Campaign038|
#|2015-04-27 01:00:...| 24485|Campaign063|
Pour convertir les données lues à partir de csv en un Dataframe tel quel, utilisez spark-csv
, qui est l'un des Spark Package. Il est plus facile d'utiliser databricks / spark-csv). Sauf indication contraire, tous sont lus comme des chaînes, mais si vous spécifiez ʻinfer Schema`, ce sera une bonne analogie.
whole_log_df_2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/user/hadoop/click_data_sample.csv")
print whole_log_df_2.printSchema()
print whole_log_df_2.show(5)
#root
# |-- click.at: string (nullable = true)
# |-- user.id: string (nullable = true)
# |-- campaign.id: string (nullable = true)
#
#+-------------------+-------+-----------+
#| click.at|user.id|campaign.id|
#+-------------------+-------+-----------+
#|2015-04-27 20:40:40| 144012|Campaign077|
#|2015-04-27 00:27:55| 24485|Campaign063|
#|2015-04-27 00:28:13| 24485|Campaign063|
#|2015-04-27 00:33:42| 24485|Campaign038|
#|2015-04-27 01:00:04| 24485|Campaign063|
whole_log_df_3 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/user/hadoop/click_data_sample.csv")
print whole_log_df_3.printSchema()
#root
# |-- click.at: timestamp (nullable = true)
# |-- user.id: integer (nullable = true)
# |-- campaign.id: string (nullable = true)
En passant, il est gênant d'avoir .
dans le nom de la colonne, vous pouvez donc le renommer avec withColumnRenamed
(vous pouvez créer un autre Dataframe renommé).
whole_log_df_4 = whole_log_df_3.withColumnRenamed("click.at", "access_time")\
.withColumnRenamed("user.id", "userID")\
.withColumnRenamed("campaign.id", "campaignID")
print whole_log_df_4.printSchema()
#root
# |-- access_time: timestamp (nullable = true)
# |-- userID: integer (nullable = true)
# |-- campaignID: string (nullable = true)
Utilisez sqlContext.read.json
pour convertir les données lues à partir du fichier json en un Dataframe tel quel. Traitez chaque ligne de fichier comme un objet json 1, s'il existe une clé qui n'existe pas, «null» sera entré.
# test_json.json contains following 3 lines, last line doesn't have "campaignID" key
#
#{"access_time": "2015-04-27 20:40:40", "userID": "24485", "campaignID": "Campaign063"}
#{"access_time": "2015-04-27 00:27:55", "userID": "24485", "campaignID": "Campaign038"}
#{"access_time": "2015-04-27 00:27:55", "userID": "24485"}
df_json = sqlContext.read.json("/user/hadoop/test_json.json")
df_json.printSchema()
df_json.show(5)
#root
# |-- access_time: string (nullable = true)
# |-- campaignID: string (nullable = true)
# |-- userID: string (nullable = true)
#
#+-------------------+-----------+------+
#| access_time| campaignID|userID|
#+-------------------+-----------+------+
#|2015-04-27 20:40:40|Campaign063| 24485|
#|2015-04-27 00:27:55|Campaign038| 24485|
#|2015-04-27 00:27:55| null| 24485|
#+-------------------+-----------+------+
Utilisez sqlContext.read.parquet
pour convertir les données lues à partir du fichier parquet en un Dataframe tel quel. Si vous spécifiez le dossier dans lequel se trouve le fichier parquet, les fichiers parquet sous ce dossier seront lus dans un lot.
sqlContext.read.parquet("/user/hadoop/parquet_folder/")
Ceci est un exemple qui interroge le Dataframe avec des instructions SQL. Si vous donnez le nom de table SQL à Dataframe avec registerTempTable
, vous pouvez y faire référence en tant que nom de table SQL. La valeur de retour de sqlContext.sql (instruction SQL)
est également un Dataframe.
Il est possible de décrire la sous-requête, mais sachez que si vous n'ajoutez pas d'alias au côté de la sous-requête, une erreur de syntaxe se produira pour une raison quelconque.
#Requête SQL simple
whole_log_df.registerTempTable("whole_log_table")
print sqlContext.sql(" SELECT * FROM whole_log_table where campaignID == 'Campaign047' ").count()
#18081
print sqlContext.sql(" SELECT * FROM whole_log_table where campaignID == 'Campaign047' ").show(5)
#+--------------------+------+-----------+
#| access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:27:...| 14151|Campaign047|
#|2015-04-27 05:28:...| 14151|Campaign047|
#+--------------------+------+-----------+
#Lors de la mise en place de variables dans des instructions SQL
for count in range(1, 3):
print "Campaign00" + str(count)
print sqlContext.sql("SELECT count(*) as access_num FROM whole_log_table where campaignID == 'Campaign00" + str(count) + "'").show()
#Campaign001
#+----------+
#|access_num|
#+----------+
#| 2407|
#+----------+
#
#Campaign002
#+----------+
#|access_num|
#+----------+
#| 1674|
#+----------+
#Pour la sous-requête:
print sqlContext.sql("SELECT count(*) as first_count FROM (SELECT userID, min(access_time) as first_access_date FROM whole_log_table GROUP BY userID) subquery_alias WHERE first_access_date < '2015-04-28'").show(5)
#+------------+
#|first_count |
#+------------+
#| 20480|
#+------------+
Il s'agit d'une fonction de recherche simple pour Dataframe. L'instruction SQL ci-dessus est similaire en fonction à Query, mais "filter" et "select" sont positionnés comme des fonctions de recherche simples. filter
extrait les lignes qui remplissent les conditions et select
extrait les colonnes. Notez que la grammaire est légèrement différente du «filtre» RDD.
#Sample for filter
print whole_log_df.filter(whole_log_df["access_time"] < "2015-04-28").count()
#41434
print whole_log_df.filter(whole_log_df["access_time"] > "2015-05-01").show(3)
#+--------------------+------+-----------+
#| access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-05-01 22:11:...|114157|Campaign002|
#|2015-05-01 23:36:...| 93708|Campaign055|
#|2015-05-01 22:51:...| 57798|Campaign046|
#+--------------------+------+-----------+
#Sample for select
print whole_log_df.select("access_time", "userID").show(3)
#+--------------------+------+
#| access_time|userID|
#+--------------------+------+
#|2015-04-27 20:40:...|144012|
#|2015-04-27 00:27:...| 24485|
#|2015-04-27 00:28:...| 24485|
#+--------------------+------+
groupBy
fournit des fonctionnalités similaires à reductionByKey
de RDD, mais groupBy
est la méthode ici. En appelant .sql.html # pyspark.sql.GroupedData) après cela, diverses fonctions d'agrégation peuvent être réalisées. Des exemples typiques sont «agg» et «count».
Exécute groupBy
avec campaignID
comme clé et compte le nombre d'enregistrements aveccount ()
. Si vous énumérez plusieurs clés dans groupBy
, la combinaison sera utilisée comme clé pour groupBy.
print whole_log_df.groupBy("campaignID").count().sort("count", ascending=False).show(5)
#+-----------+-----+
#| campaignID|count|
#+-----------+-----+
#|Campaign116|22193|
#|Campaign027|19206|
#|Campaign047|18081|
#|Campaign107|13295|
#|Campaign131| 9068|
#+-----------+-----+
print whole_log_df.groupBy("campaignID", "userID").count().sort("count", ascending=False).show(5)
#+-----------+------+-----+
#| campaignID|userID|count|
#+-----------+------+-----+
#|Campaign047| 30292| 633|
#|Campaign086|107624| 623|
#|Campaign047|121150| 517|
#|Campaign086| 22975| 491|
#|Campaign122| 90714| 431|
#+-----------+------+-----+
Vous pouvez exécuter GroupBy avec ʻuserIDcomme clé et calculer la moyenne et le maximum / minimum des résultats agrégés. Renvoie le résultat de l'exécution de la fonction
value (
min,
sum, ʻave
etc) sur la colonne key
avec ʻagg ({key: value}). Puisque la valeur de retour est un Dataframe, il est possible de réduire davantage les lignes avec
.filter ()`.
print whole_log_df.groupBy("userID").agg({"access_time": "min"}).show(3)
#+------+--------------------+
#|userID| min(access_time)|
#+------+--------------------+
#| 4831|2015-04-27 22:49:...|
#| 48631|2015-04-27 22:15:...|
#|143031|2015-04-27 21:52:...|
#+------+--------------------+
print whole_log_df.groupBy("userID").agg({"access_time": "min"}).filter("min(access_time) < '2015-04-28'").count()
#20480
Pivot est une nouvelle fonctionnalité de Spark v1.6 Fournit des fonctionnalités similaires à SQL Pivot -spark.html). Dans le cas de Pivot of Sample code, les modifications verticales et horizontales comme suit.
pivot_df
)Vous devez toujours appeler groupBy ("colonnes qui restent verticales"). Pivot ("colonnes que vous voulez convertir de portrait en paysage"). Sum ("colonnes de valeurs agrégées") et trois méthodes de la chaîne.
agged_df = whole_log_df.groupBy("userID", "campaignID").count()
print agged_df.show(3)
#+------+-----------+-----+
#|userID| campaignID|count|
#+------+-----------+-----+
#|155812|Campaign107| 4|
#|103339|Campaign027| 1|
#|169114|Campaign112| 1|
#+------+-----------+-----+
#Une cellule sans valeur sera nulle
pivot_df = agged_df.groupBy("userID").pivot("campaignID").sum("count")
print pivot_df.printSchema()
#root
# |-- userID: integer (nullable = true)
# |-- Campaign001: long (nullable = true)
# |-- Campaign002: long (nullable = true)
# ..
# |-- Campaign133: long (nullable = true)
#Lorsque vous souhaitez remplir une cellule sans valeur avec 0
pivot_df2 = agged_df.groupBy("userID").pivot("campaignID").sum("count").fillna(0)
UDF peut être utilisé dans Spark Dataframe, je pense que l'utilisation principale est d'ajouter des colonnes. Étant donné que le Dataframe est fondamentalement immuable, vous ne pouvez pas modifier le contenu de la colonne et vous allez créer un autre Dataframe avec la colonne ajoutée.
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
def add_day_column(access_time):
return int(access_time.strftime("%Y%m%d"))
my_udf = UserDefinedFunction(add_day_column, IntegerType())
print whole_log_df.withColumn("access_day", my_udf("access_time")).show(5)
#+--------------------+------+-----------+----------+
#| access_time|userID| campaignID|access_day|
#+--------------------+------+-----------+----------+
#|2015-04-27 20:40:...|144012|Campaign077| 20150427|
#|2015-04-27 00:27:...| 24485|Campaign063| 20150427|
#|2015-04-27 00:28:...| 24485|Campaign063| 20150427|
#|2015-04-27 00:33:...| 24485|Campaign038| 20150427|
#|2015-04-27 01:00:...| 24485|Campaign063| 20150427|
#+--------------------+------+-----------+----------+
La notation UDF peut également être écrite à l'aide de la fonction lambda.
my_udf2 = UserDefinedFunction(lambda x: x + 5, IntegerType())
print whole_log_df.withColumn("userID_2", my_udf2("userID")).show(5)
#+--------------------+------+-----------+--------+
#| access_time|userID| campaignID|userID_2|
#+--------------------+------+-----------+--------+
#|2015-04-27 20:40:...|144012|Campaign077| 144017|
#|2015-04-27 00:27:...| 24485|Campaign063| 24490|
#|2015-04-27 00:28:...| 24485|Campaign063| 24490|
#|2015-04-27 00:33:...| 24485|Campaign038| 24490|
#|2015-04-27 01:00:...| 24485|Campaign063| 24490|
#+--------------------+------+-----------+--------+
Inversement, utilisez df.drop ()
pour créer un Dataframe qui souhaite supprimer une colonne particulière.
print whole_log_df.drop("userID").show(3)
#+--------------------+-----------+
#| access_time| campaignID|
#+--------------------+-----------+
#|2015-04-27 20:40:...|Campaign077|
#|2015-04-27 00:27:...|Campaign063|
#|2015-04-27 00:28:...|Campaign063|
#+--------------------+-----------+
Il est également possible de joindre deux Dataframes. Ici, considérons le cas où seul le journal d'un utilisateur lourd (utilisateur avec 100 accès ou plus) est extrait du journal entier.
Premièrement, l'ID utilisateur d'un utilisateur qui a 100 accès ou plus et le nombre d'accès sont agrégés par `.groupBy (" userID "). Count ()" et réduit à 100 ou plus par "filtre".
heavy_user_df1 = whole_log_df.groupBy("userID").count()
heavy_user_df2 = heavy_user_df1.filter(heavy_user_df1 ["count"] >= 100)
print heavy_user_df2 .printSchema()
print heavy_user_df2 .show(3)
print heavy_user_df2 .count()
#root
# |-- userID: integer (nullable = true)
# |-- count: long (nullable = false)
#
#+------+-----+
#|userID|count|
#+------+-----+
#| 84231| 134|
#| 13431| 128|
#|144432| 113|
#+------+-----+
#
#177
Si vous appelez la méthode join
dans le Dataframe d'origine (qui sera à gauche) et écrivez la condition de jointure avec le partenaire de jointure (qui sera à droite), vous pouvez rejoindre le Dataframe comme une jointure SQL.
Le format de jointure doit être ʻinner, ʻouter
, left_outer
, rignt_outer
, etc., mais autre que ʻinner ne fonctionne pas comme prévu (et est traité comme ʻouter
). Pour le moment, j'essaye de joindre à l'extérieur avec ʻinner, puis de supprimer les colonnes inutiles avec
drop`. Veuillez consulter la page officielle pour les options détaillées, etc..
Par le processus de jointure suivant, nous avons pu récupérer le journal de 38 729 lignes correspondant aux utilisateurs (177 personnes) qui ont 100 accès ou plus (le journal total est d'environ 320 000 lignes).
joinded_df = whole_log_df.join(heavy_user_df2, whole_log_df["userID"] == heavy_user_df2["userID"], "inner").drop(heavy_user_df2["userID"]).drop("count")
print joinded_df.printSchema()
print joinded_df.show(3)
print joinded_df.count()
#root
# |-- access_time: timestamp (nullable = true)
# |-- campaignID: string (nullable = true)
# |-- userID: integer (nullable = true)
#None
#+--------------------+-----------+------+
#| access_time| campaignID|userID|
#+--------------------+-----------+------+
#|2015-04-27 02:07:...|Campaign086| 13431|
#|2015-04-28 00:07:...|Campaign086| 13431|
#|2015-04-29 06:01:...|Campaign047| 13431|
#+--------------------+-----------+------+
#
#38729
.distinct ()
à la fin du Dataframe.print whole_log_df.columns
#['access_time', 'userID', 'campaignID']
print whole_log_df.select("userID").map(lambda x: x[0]).collect()[:5]
#[144012, 24485, 24485, 24485, 24485]
print whole_log_df.select("userID").distinct().map(lambda x:x[0]).collect()[:5]
#[4831, 48631, 143031, 39631, 80831]
Il existe deux manières principales de convertir un Dataframe en RDD.
.map
.rdd
my_rdd.rdd.map (lambda x: x.asDict ())
et .asDict ()
pour l'objet Row, vous pouvez le convertir en RDD clé-valeur.#convert to rdd by ".map"
print whole_log_df.groupBy("campaignID").count().map(lambda x: [x[0], x[1]]).take(5)
#[[u'Campaign033', 786], [u'Campaign034', 3867], [u'Campaign035', 963], [u'Campaign036', 1267], [u'Campaign037', 1010]]
# rdd -> normal list can be done with "collect".
print whole_log_df.groupBy("campaignID").count().map(lambda x: [x[0], x[1]]).collect()[:5]
#[[u'Campaign033', 786], [u'Campaign034', 3867], [u'Campaign035', 963], [u'Campaign036', 1267], [u'Campaign037', 1010]]
#convert to rdd by ".rdd" will return "Row" object
print whole_log_df.groupBy("campaignID").rdd.take(3)
#[Row(campaignID=u'Campaign033', count=786), Row(campaignID=u'Campaign034', count=3867), Row(campaignID=u'Campaign035', count=963)]
#`.asDict()` will convert to Key-Value RDD from Row object
print whole_log_df.groupBy("campaignID").rdd.map(lambda x:x.asDict()).take(3)
#[{'count': 786, 'campaignID': u'Campaign033'}, {'count': 3867, 'campaignID': u'Campaign034'}, {'count': 963, 'campaignID': u'Campaign035'}]
Si vous exportez le Dataframe vers un fichier au format Parquet, vous pouvez exporter vers un fichier tout en conservant les informations de schéma. Si le répertoire du compartiment S3 à exporter existe déjà, l'écriture échouera. Spécifiez un nom de répertoire qui n'existe pas encore.
#write to parquet filed
whole_log_df.select("access_time", "userID").write.parquet("s3n://my_S3_bucket/parquet_export")
#reload from parquet filed
reload_df = sqlContext.read.parquet("s3n://my_S3_bucket/parquet_export")
print reload_df.printSchema()
Recommended Posts