Spark DataSet DataFrame 操作

本文基于 Spark 2.x,除特殊说明,代码均为 Scala

函数详解

import

使用 DataFrame 通常需要 import org.apache.spark.sql.functions._,这里引入了 col, lit, udf, sum, max, avg, agg 等常用函数。函数源码

PySpark UDF

不推荐,与 Scala 版的 UDF 相比性能非常差。

1
2
3
4
5
6
7
8
9
10
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def filter_xx(xx):
if xx == "abc":
return True
return False

my_udf = udf(filter_xx, BooleanType())
df.filter(my_udf(df.xx))

Scala UDF

这里是一个 scala 版的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 一个过滤函数,不需要关心细节
// 有两个参数,第二个参数是程序运行时传入的,不能写死
def filter_xx(str: String, xx: String): Boolean = {
if(str == "") return false
val list_xx = xx.split(",", -1)
str.split(",", -1).foreach(id => {
if(list_xx.contains(id)) return true
})
false
}

val my_udf = udf(filter_xx _) // 最后这个 _ 传参数
val xx = "a,b,c"

// 由于 my_udf 的第二个参数是外部传入的,这里通过 lit 来传给我们自定义的 udf
df.filter(my_udf($"str", lit(xx)))

lit

lit 在使用中,可以传入一个自定义的值,参见上面的 UDF 。

col 和 语法糖 $”fields”

下面这两种写法是一样的:

1
2
3
4
df.select(df.col("field"))

import spark.implicits._
df.select($"field")

cube rollup groupBy

这段部分节选自 What is the difference between cube, rollup and groupBy operators?

首先我们创建一个 DataFrame

1
2
3
4
5
6
7
8
9
10
> val df = Seq(("foo", 1L), ("foo", 2L), ("bar", 2L), ("bar", 2L)).toDF("x", "y")
> df.show
// +---+---+
// | x| y|
// +---+---+
// |foo| 1|
// |foo| 2|
// |bar| 2|
// |bar| 2|
// +---+---+

cube 是这样一个聚合函数:对所有输入的列进行所有可能性的组合,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
> df.cube($"x", $"y").count.show
// +----+----+-----+
// | x| y|count|
// +----+----+-----+
// |null| 1| 1| <- count of records where y = 1
// |null| 2| 3| <- count of records where y = 2
// | foo|null| 2| <- count of records where x = foo
// | bar| 2| 2| <- count of records where x = bar AND y = 2
// | foo| 1| 1| <- count of records where x = foo AND y = 1
// | foo| 2| 1| <- count of records where x = foo AND y = 2
// |null|null| 4| <- total count of records
// | bar|null| 2| <- count of records where x = bar
// +----+----+-----+

rollupcube 相比,没有了 x=null 的情况,也就是没有单独统计 y 的数量(有点类似 left join ?)

1
2
3
4
5
6
7
8
9
10
11
> df.rollup($"x", $"y").count.show
+----+----+-----+
| x| y|count|
+----+----+-----+
| foo|null| 2| <- count where x is fixed to foo
| bar| 2| 2| <- count where x is fixed to bar and y is fixed to 2
| foo| 1| 1| ...
| foo| 2| 1| ...
|null|null| 4| <- count where no column is fixed
| bar|null| 2| <- count where x is fixed to bar
+----+----+-----+

groupBy 和我们熟悉的 SQL 里面的 group by 是一样的。

1
2
3
4
5
6
7
8
9
df.groupBy($"x", $"y").count.show

// +---+---+-----+
// | x| y|count|
// +---+---+-----+
// |foo| 1| 1| <- this is identical to x = foo AND y = 1 in CUBE or ROLLUP
// |foo| 2| 1| <- this is identical to x = foo AND y = 2 in CUBE or ROLLUP
// |bar| 2| 2| <- this is identical to x = bar AND y = 2 in CUBE or ROLLUP
// +---+---+-----+

这三个函数的返回值类型都是 org.apache.spark.sql.RelationalGroupedDataset
后面可以接如下几个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
> import org.apache.spark.sql.functions._
> val df = Seq(("foo", 1L, 2), ("foo", 2L, 3), ("bar", 2L, 4), ("bar", 2L, 5)).toDF("key", "x", "y")
> df.show
// +---+---+---+
// |key| x| y|
// +---+---+---+
// |foo| 1| 2|
// |foo| 2| 3|
// |bar| 2| 4|
// |bar| 2| 5|
// +---+---+---+

// avg 平均值 = mean
> df.groupBy("key").avg("x", "y").show
// +---+------+------+
// |key|avg(x)|avg(y)|
// +---+------+------+
// |bar| 2.0| 4.5|
// |foo| 1.5| 2.5|
// +---+------+------+

// count 计数
> df.groupBy("key", "x").count.show
// +---+---+-----+
// |key| x|count|
// +---+---+-----+
// |foo| 2| 1|
// |bar| 2| 2|
// |foo| 1| 1|
// +---+---+-----+

// max 最大值
> df.groupBy("key").max("x","y").show
// +---+------+------+
// |key|max(x)|max(y)|
// +---+------+------+
// |bar| 2| 5|
// |foo| 2| 3|
// +---+------+------+

// min 最小值
> df.groupBy("key").min("x","y").show
// +---+------+------+
// |key|min(x)|min(y)|
// +---+------+------+
// |bar| 2| 4|
// |foo| 1| 2|
// +---+------+------+

// sum 求和
> df.groupBy("key").sum("x","y").show
// +---+------+------+
// |key|sum(x)|sum(y)|
// +---+------+------+
// |bar| 4| 9|
// |foo| 3| 5|
// +---+------+------+

// pivot 透视
// Seq 是 x 列的所有字段值,当然也可以只写一部分,也可以写不在 x 值域的值
// 这里把 x 的值都变成列字段名,对应的值则是对 y 的求和
// 比如第一行 bar 里面 2 对应的 9,表示当 key=bar, x=2 的时候对 y 求和 4+5=9
> df.groupBy("key").pivot("x", Seq("1", "2")).sum("y").show()
// +---+----+---+
// |key| 1| 2|
// +---+----+---+
// |bar|null| 9|
// |foo| 2| 3|
// +---+----+---+

// agg 对不同的字段做不同的聚合操作
// org.apache.spark.sql.functions 里面的算子应该都可以,未测试
// 比如:对 x 列求 sum,对 y 列求 avg,下面几种写法都可以
> df.groupBy("key").agg(sum("x"), avg("y")).show
> df.groupBy("key").agg("x" -> "sum", "y" -> "avg").show
> df.groupBy("key").agg(Map("x" -> "sum", "y" -> "avg")).show
+---+------+------+
|key|sum(x)|avg(y)|
+---+------+------+
|bar| 4| 4.5|
|foo| 3| 2.5|
+---+------+------+

RDD 中的 reduceByKey,可以用如下方法替换

1
2
3
4
ds.map(x => (x.someKey, x.someField))
.groupByKey(_._1)
.reduceGroups((a, b) => (a._1, a._2 + b._2))
.map(_._2)

对应的 PySpark 操作

1
2
3
4
5
6
7
8
rdd = sc.parallelize([("foo", 1, 2), ("foo", 2, 3), ("bar", 2, 4), ("bar", 2, 5)])
df = sqlContext.createDataFrame(rdd).toDF('key','x','y')
df.show() # 注意在 Python 中,调用方法后面一定要加 () 括号
df.cube("x", "y").count().show() # count show 后面都有括号
df.rollup("x", "y").count().show()
df.groupBy("key").avg("x", "y").show()
df.groupBy("key").pivot("x", ["1", "2", "3"]).sum("y").show() # 3 是多余的
df.groupBy("key").agg({"x": "sum", "y": "avg"}).show() # 对 x 求和,对 y 求平均