对于一个数据集,map
是对每行进行操作,为每行得到一个结果;reduce
则是对多行进行操作,得到一个结果;而 window
函数则是对多行进行操作,得到多个结果(每行一个)。本文会以实例介绍 window
函数的基本概念和用法。
示例:计算成绩排名
例如大学里有许多专业,每个专业有若干个班级,每个班级又有许多学生,这次考试,每个学生的成绩用 pyspark 表示如下:
df = sqlContext.createDataFrame([ |
现在我们的需求是:计算每个学生在专业里的成绩排名。
首先,我们将学生按专业分成两组:
接着我们按分数从高到低进行排序:
之后是执行窗口函数。对于每个学生,我们将排在它之前的所有学生取出,再计算当前学生排在第几名:
对应的 pyspark 代码如下:
windowSpec = Window.partitionBy(df.subject) |
如何定义窗口
一个窗口需要定义三个部分:
- 分组,如何将行分组?在选取窗口数据时,只对组内数据生效
- 排序,按何种方式进行排序?选取窗口数据时,会首先按指定方式排序
- 帧(frame)选取,以当前行为基准,如何选取周围行?
Row Frame(行帧)
行帧,即选择帧的时候通过行数指定。语法为 rowsBetween(x, y)
,其中 x, y
可以是数字,-n
表示向前数 n
行,n
表示向后数 n
行。除此之外,还可以是:
Window.unboundedPreceding
表示当前行之前的无限行Window.currentRow
表示当前行Window.unboundedFollowing
表示当前行之后的无限行
例如,要选择当前行的前一行和后一行,则 pyspark 的写法为 rowsBetween(-1, 1)
,对应 SQL 的写法为 ROWS BETWEEN 1 PRECEEDING AND 1 FOLLOWING
,表示如下图:
Range Frame(范围帧)
有时,我们想根据当前行列值的范围来选取窗口,语法为 rangeBetween(x, y)
。例如,当前的分数为 60
,选择范围帧 rangeBetween(-20, 20)
,则会选择所有分数落在
[40, 80]
范围内的行。如下图:
窗口函数
从通用性的角度来说,选定帧内数据后,做何种计算,需要让用户自行定义。考虑到效率和便利性等因素,Spark SQL 不支持自定义的窗口函数[1],而是提供了一些内置的优化过的函数,来满足日常的需求。
Spark SQL 支持三种类型的窗口函数:排名函数(ranking function)、分析函数
(analytic functions)和聚合函数(aggregate functions)。其中聚合函数(如 max
,
min
, avg
等)常用在 reduce
操作中,不再介绍,其它函数如下:
SQL | Dataframe API | |
---|---|---|
Ranking functions | rank | rank |
dense_rank | denseRank | |
percent_rank | percentRank | |
ntile | ntile | |
row_number | rowNumber | |
Analytic functions | cume_dist | cumeDist |
first_value | firstValue | |
last_value | lastValue | |
lag | lead |
这些函数在使用时,只需要将函数应用在窗口定义上,例如
avg(df.score).over(windowSpec)
。
小结
文章给出一个使用窗口函数的示例,并尝试说清如何定义一个窗口,包括帧的选择,最后给出一些常用的窗口函数。
注意的是窗口函数在 Spark 1.4 开始支持,一些窗口函数在 Spark 1.* 中需要使用
HiveContext
才能运行。
参考
- Introducing Window Functions in Spark SQL 入门必看
- https://knockdata.github.io/spark-window-function/ 也是不错的入门材料,可以相互佐证
- https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions.html 窗口函数汇总
StackOverflow 看到说 pyspark >= 2.4 后才支持窗口函数中使用 UDF,这里不深究了 ↩