Spark DataSet DateFrame 操作

本文基于 Spark 2.x,除特殊说明,代码均为 Scala

函数详解

import

使用 DataFrame 通常需要 import org.apache.spark.sql.functions._,这里引入了 col lit udf 等常用函数

PySpark UDF

不推荐,与 Scala 版的 UDF 相比性能非常差。

1
2
3
4
5
6
7
8
9
10
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def filter_xx(xx):
if xx == "abc":
return True
return False

my_udf = udf(filter_xx, BooleanType())
df.filter(my_udf(df.xx))

Scala UDF

这里是一个 scala 版的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 一个过滤函数,不需要关系细节
// 有两个参数,第二个参数是程序运行时传入的,不能写死
def filter_xx(str: String, xx: String): Boolean = {
if(str == "") return false
val list_xx = xx.split(",", -1)
str.split(",", -1).foreach(id => {
if(list_xx.contains(id)) return true
})
false
}

val my_udf = udf(filter_xx _) // 最后这个 _ 传参数
val xx = "a,b,c"

// 由于 my_udf 的第二个参数是外部传入的,这里通过 lit 来传给我们自定义的 udf
df.filter(my_udf($"str", lit(xx)))

lit

lit 在使用中,可以传入一个自定义的值,参见上面的 UDF 。

col 和 语法糖 $”fields”

下面这两种写法是一样的:

1
2
3
df.select(df.col("field"))

df.select($"field")

cube rollup groupBy

这段翻译自 What is the difference between cube, rollup and groupBy operators?

首先我们创建一个 DateFrame

1
2
3
4
5
6
7
8
9
10
11
12
val df = Seq(("foo", 1L), ("foo", 2L), ("bar", 2L), ("bar", 2L)).toDF("x", "y")

df.show

// +---+---+
// | x| y|
// +---+---+
// |foo| 1|
// |foo| 2|
// |bar| 2|
// |bar| 2|
// +---+---+

cube 是这样一个聚合函数:对所有输入的列进行所有可能性的组合,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
df.cube($"x", $"y").count.show

// +----+----+-----+
// | x| y|count|
// +----+----+-----+
// |null| 1| 1| <- count of records where y = 1
// |null| 2| 3| <- count of records where y = 2
// | foo|null| 2| <- count of records where x = foo
// | bar| 2| 2| <- count of records where x = bar AND y = 2
// | foo| 1| 1| <- count of records where x = foo AND y = 1
// | foo| 2| 1| <- count of records where x = foo AND y = 2
// |null|null| 4| <- total count of records
// | bar|null| 2| <- count of records where x = bar
// +----+----+-----+

rollupcube 相比,没有了 x=null 的情况,也就是没有单独统计 y 的数量(有点类似 left join ?)

1
2
3
4
5
6
7
8
9
10
11
df.rollup($"x", $"y").count.show
// +----+----+-----+
// | x| y|count|
// +----+----+-----+
// | foo|null| 2| <- count where x is fixed to foo
// | bar| 2| 2| <- count where x is fixed to bar and y is fixed to 2
// | foo| 1| 1| ...
// | foo| 2| 1| ...
// |null|null| 4| <- count where no column is fixed
// | bar|null| 2| <- count where x is fixed to bar
// +----+----+-----+

groupBy 和我们熟悉的 SQL 里面的 group by 是一样的。

1
2
3
4
5
6
7
8
9
df.groupBy($"x", $"y").count.show

// +---+---+-----+
// | x| y|count|
// +---+---+-----+
// |foo| 1| 1| <- this is identical to x = foo AND y = 1 in CUBE or ROLLUP
// |foo| 2| 1| <- this is identical to x = foo AND y = 2 in CUBE or ROLLUP
// |bar| 2| 2| <- this is identical to x = bar AND y = 2 in CUBE or ROLLUP
// +---+---+-----+
1
2


1
2


如果你不是非常有钱,请不要捐赠,土豪随意