zhangnew

认真你就赢了

书接上回,如何不修改 Spark 源码对 Spark Catalyst 引擎扩展 中介绍了如何自定义 Spark Catalyst 引擎,这次再来一个实践中的场景:在数据量很大的接入任务中,对表进行分区,此分区对用户隐藏,当用户根据指定的时间字段过滤查询的时候,自动加上分区过滤条件。

题图 by DALL·E 3

概述

这里优化的一个前提是用户在配置接入任务过程中,需要标记一个时间字段,用于分区存储优化,然后再接入过程中对此字段进行解析,保存到分区 year=yyyy/month=MM/day=dd 中。

比如时间字段是 ctime,用户 SQL 是 where ctime > '2022-11-30 12:00:00' 进过我们的优化规则之后,会变成 where ctime > '2022-11-30 12:00:00' AND concat(year, month, day) >= '20221130';甚至用户可以使用 substring 函数,比如 where substring(ctime, 1, 10) >= '2021-12-12' 也可以优化为 where substring(ctime, 1, 10) >= '2021-12-12' AND concat(year, month, day) >= '20211212',当然这里其实不是修改 SQL,而是修改的逻辑执行计划。

一个测试 SQL:select * from test where dt > '2022-11-30 00:00:00'

未经优化的逻辑计划:

1
2
3
Project [id#0, dt#1, year#2, month#3, day#4]
+- Filter (isnotnull(dt#1) AND (dt#1 > 2022-11-30 00:00:00))
+- FileScan parquet xx.test[id#0,dt#1,year#2,month#3,day#4] Batched: true, DataFilters: [isnotnull(dt#1), (dt#1 > 2022-11-30 00:00:00)], Format: Parquet, Location: CatalogFileIndex(1 paths)[hdfs://xx.db/test], PartitionFilters: [], PushedFilters: [IsNotNull(dt), GreaterThan(dt,2022-11-30 00:00:00)], ReadSchema: struct<id:string,dt:string>

优化后:PartitionFilters 部分正是我们添加的分区过滤条件

1
2
3
Project [id#0, dt#1, year#2, month#3, day#4]
+- Filter (isnotnull(dt#1) AND (dt#1 > 2022-11-30 00:00:00))
+- FileScan parquet xx.test[id#0,dt#1,year#2,month#3,day#4] Batched: true, DataFilters: [isnotnull(dt#1), (dt#1 > 2022-11-30 00:00:00)], Format: Parquet, Location: InMemoryFileIndex(2 paths)[hdfs://xx.db/test/year=2022/month=12/day..., PartitionFilters: [(concat(year#2, month#3, day#4) >= 20221130)], PushedFilters: [IsNotNull(dt), GreaterThan(dt,2022-11-30 00:00:00)], ReadSchema: struct<id:string,dt:string>
阅读全文 »

太长不看版:Spark 3.2 之后为了兼容 ANSI SQL 标准,修改了 Interval 的数据类型(CalendarIntervalType → YearMonthIntervalType),代码中只写了 String → CalendarIntervalType的隐式转换规则,这里新版需要配置 spark.sql.legacy.interval.enabled=true 来让 Interval 使用旧的数据类型(CalendarIntervalType)。

题图 by DALL·E 3

问题描述

最近将项目从 Spark 2.4 升级到 Spark 3.2.2 之后发现一个 SQL 报错:

1
2
3
4
scala> spark.sql("select '2022-11-12 23:33:55' - INTERVAL 3 YEAR")
org.apache.spark.sql.AnalysisException: cannot resolve '(CAST('2022-11-12 23:33:55' AS DOUBLE) - INTERVAL '3' YEAR)' due to data type mismatch: differing types in '(CAST('2022-11-12 23:33:55' AS DOUBLE) - INTERVAL '3' YEAR)' (double and interval year).; line 1 pos 7;
'Project [unresolvedalias((cast(2022-11-12 23:33:55 as double) - INTERVAL '3' YEAR), None)]
+- OneRowRelation

而在 Spark 2.4 中可以正常执行:

1
2
3
4
5
6
scala> spark.sql("select '2022-11-12 23:33:55' - INTERVAL 3 YEAR").show
+-------------------------------------------------------------------------+
|CAST(CAST(2022-11-12 23:33:55 AS TIMESTAMP) - interval 3 years AS STRING)|
+-------------------------------------------------------------------------+
| 2019-11-12 23:33:55|
+-------------------------------------------------------------------------+

这里可以看到,Spark 2.4 中 StringInterval 类型做运算时,自动将 String 类型转为了 Timestamp 类型,而在 Spark 3.2 中却把 String 类型转为了 Double 类型,导致计算报错。

阅读全文 »

根据 Spark 官方文档,表名是大小写不敏感的,大写会自动转小写:

An identifier is a string used to identify a database object such as a table, view, schema, column, etc. Spark SQL has regular identifiers and delimited identifiers, which are enclosed within backticks. Both regular identifiers and delimited identifiers are case-insensitive.

问题描述

Spark 1.4 之后新增了字段名大小写敏感的开关,经测试,开启之后会导致一些问题;

cannot resolve ‘Tb.xx’ given input columns

开启大小写敏感 spark.sql.caseSensitive=true

如果表名包含大写字母,比如 Tb1 然后执行 select Tb1.xx 就会报错,因为 Tb1 表名会被转为 tb1,Spark 的元数据中只有 tb1.xx 字段。

阅读全文 »

书接上回,上篇文章 概述了一下 Spark SQL 源码中的基本概念和解析流程,这篇文章介绍一种扩展方法。

圆角表示为 Catalyst 部分

从 Spark 2.2 之后,Spark 支持扩展 Catalyst 引擎。扩展点如下表:

Stage Extension description
Parser injectParser 负责 SQL 解析
Analyzer injectResolutionRule
injectPostHocResolutionRule
injectCheckRule
负责逻辑执行计划生成,catalog 绑定,以及进行各种检查
Optimizer injectOptimizerRule 负责逻辑执行计划的优化
Planner injectPlannerStrategy 负责物理执行计划的生成

在 Spark 3.x 之后,又额外提供了一些其他扩展点:

  • e.injectColumnar:底层读写文件相关
  • e.injectFunction:增加内置函数
  • e.injectQueryStagePrepRule:优化 AQE

注意这里只能新增扩展,无法修改已有的规则,实在不行的还得去改 Spark 源码。

阅读全文 »

圆角表示为 Catalyst 部分

基本概念

树 TreeNode

解析过程中的各种计划(plan)都是树,树的节点是各种表达式,比如 x + (1 + 2) 由三种表达式(expression)组合:

  • Literal(value: Int): 一个常量,比如 1 和 2
  • Attribute(name: String): 字段,比如 x
  • Add(left: TreeNode, right: TreeNode): 一个加法表达式

x + (1 + 2) 即:Add(Attribute(x), Add(Literal(1), Literal(2))) 注意括号顺序

转为 Spark 的 TreeNode 就是如下的形式:

TreeNode

规则 Rule

规则一般是指对 plan 的转换,比如一个简单的常量替换(ConstantFolding)规则如下(非真实代码):

1
2
3
4
5
plan.transform {
case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
case Add(left, Literal(0)) => left
case Add(Literal(0), right) => right
}

这里的 transform 是对 plan 中节点进行遍历,通过使用 Scala 语法中的模式匹配功能,只对 plan 中匹配到的节点进行优化处理。
Rule 都会多次执行,直到 plan 不再变化,比如 (x+0) + (1+2+3) 需要多次执行上面的 Rule 优化才能完全替换所有常量。

阅读全文 »

默认情况下,直接运行 Hadoop 命令会出现如下警告:

1
2
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable

当然这个警告信息没有什么实质的影响,一般也不会在本地电脑运行计算任务,消除警告只是强迫症发作罢了。
这里提供了编译好的二进制文件,但是版本不全,需要手动编译的参考下面的内容。
另有 Intel CPU 版本的二进制文件。我上面仓库中编译的 ARM 版本也 merge 到此仓库了。
只需要将对应版本 hadoop-x.x.x/lib/native 下的文件,替换到本地 ${HADOOP_HOME}/lib/native 中即可,不需要重启集群。

另外还有一个更简单的办法,修改日志等级,就不会打印这条警告日志了,只需要在 ${HADOOP_HOME}/etc/hadoop/log4j.properties 里面加上下面这一行即可:

1
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
阅读全文 »

之前很长一段时间都是使用 Isso 作为评论系统,最近发现了一个 Valine 的替代品:Waline,更加符合需求,花了一个周末迁移了一下,写了个脚本,记录一下。

先说下 Waline 的特性,也是比较吸引我的点:

  • 支持 Markdown、表情、公式
  • 支持内容校验、防灌水
  • 允许匿名、也支持登录
  • 可以完全自托管独立部署
  • 支持浏览量统计
  • 多种通知方式
阅读全文 »