Spark SQL 解析流程概述

圆角表示为 Catalyst 部分

基本概念

树 TreeNode

解析过程中的各种计划(plan)都是树,树的节点是各种表达式,比如 x + (1 + 2) 由三种表达式(expression)组合:

  • Literal(value: Int): 一个常量,比如 1 和 2
  • Attribute(name: String): 字段,比如 x
  • Add(left: TreeNode, right: TreeNode): 一个加法表达式

x + (1 + 2) 即:Add(Attribute(x), Add(Literal(1), Literal(2))) 注意括号顺序

转为 Spark 的 TreeNode 就是如下的形式:

TreeNode

规则 Rule

规则一般是指对 plan 的转换,比如一个简单的常量替换(ConstantFolding)规则如下(非真实代码):

1
2
3
4
5
plan.transform {
case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
case Add(left, Literal(0)) => left
case Add(Literal(0), right) => right
}

这里的 transform 是对 plan 中节点进行遍历,通过使用 Scala 语法中的模式匹配功能,只对 plan 中匹配到的节点进行优化处理。
Rule 都会多次执行,直到 plan 不再变化,比如 (x+0) + (1+2+3) 需要多次执行上面的 Rule 优化才能完全替换所有常量。

一. Antlr4 语法解析 Parser

通过将 Antlr4 语法文件(Calculator.g4)编译成 java 代码:

  • CalculatorBaseListener.java
  • CalculatorBaseVisitor.java
  • CalculatorLexer.java
  • CalculatorListener.java
  • CalculatorParser.java
  • CalculatorVisitor.java

通过 visitor 模式和 listener 模式对 AST 进行遍历。Demo 示例(非 Spark 源码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
String expression = "1.8+2*3";

// 1. 词法分析器,将字符序列转换为 Token(单词)
Lexer lexer = new CalculatorLexer(CharStreams.fromString(expression));
// 2. 语法分析
TokenStream tokenStream = new CommonTokenStream(lexer);
// 3. 语法解析器,将 Token 序列转换为语法树
CalculatorParser parser = new CalculatorParser(tokenStream);
// 4. 语法树遍历
CalculatorParser.ExpressionContext context = parser.expression();
System.out.println(context.toStringTree(parser));

MyVisitor myVisitor = new MyVisitor();
myVisitor.visit(context);

ParseTreeWalker walker = new ParseTreeWalker();
MyListener myListener = new MyListener();
walker.walk(myListener, context);

MyListener.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class MyListener extends CalculatorBaseListener {
@Override
public void enterEquation(CalculatorParser.EquationContext ctx) {
super.enterEquation(ctx);
}
@Override
public void exitEquation(CalculatorParser.EquationContext ctx) {
super.exitEquation(ctx);
}
@Override
public void enterExpression(CalculatorParser.ExpressionContext ctx) {
System.out.println("enterExpression = " + ctx.getText());
super.enterExpression(ctx);
}
@Override
public void exitExpression(CalculatorParser.ExpressionContext ctx) {
System.out.println("exitExpression = " + ctx.getText());
super.exitExpression(ctx);
}
// ... 略
}

MyVisitor.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class MyVisitor extends CalculatorBaseVisitor<Double> {
@Override
public Double visitExpression(CalculatorParser.ExpressionContext ctx) {
Double result = super.visitExpression(ctx);
System.out.println("visitExpression = " + ctx.getText() + ", return " + result);
return result;
}
@Override
public Double visitMultiplyingExpression(CalculatorParser.MultiplyingExpressionContext ctx) {
Double result = super.visitMultiplyingExpression(ctx);
System.out.println("visitMultiplyingExpression = " + ctx.getText() + ", return " + result);
return result;
}
@Override
public Double visitPowExpression(CalculatorParser.PowExpressionContext ctx) {
System.out.println("visitPowExpression = " + ctx.getText());
return super.visitPowExpression(ctx);
}
@Override
public Double visitConstant(CalculatorParser.ConstantContext ctx) {
return super.visitConstant(ctx);
}
@Override
public Double visit(ParseTree tree) {
System.out.println("visit = " + tree.getText());
return super.visit(tree);
}
@Override
public Double visitChildren(RuleNode node) {
System.out.println("visitChildren = " + node.getText());
return super.visitChildren(node);
}
// ... 略
}

Spark 经过这一步得到 Unresolved Logical Plan

二. 分析验证 Analyzer

这一步主要是根据表的元数据信息对 Unresolved Logical Plan 中的节点进行匹配,包含一系列的转换规则:

  • 根据表名找到元数据信息
  • 对列名进行映射,校验是否合法
  • 判断列名关系,分配唯一 id
  • 判断数据类型

详见:Analyzer.scala,这步之后得到 Logical Plan

三. 优化逻辑计划 Optimizer

这一步对 Logical Plan 进行 RBO 优化,常见的优化包括:常量替换、消除子查询、谓词下推、列剪裁等。详见: Optimizer.scala,得到 Optimized Logical Plan

四. 生成物理计划 SparkPlanner

计算所有可能的物理计划,分别计算其代价,通过 CBO 挑选代价最小的物理计划,每个节点的代价包括如下两部分

该执行节点对数据集的影响

或者说该节点输出数据集的大小与分布,而数据的统计信息依赖 ANALYZE 命令:

1
ANALYZE TABLE table_name COMPUTE STATISTICS [FOR COLUMNS col1, col2];

可以得到表的大小、行数、甚至每个字段的直方图(数据分布),方便后续的代价估计。

该执行节点操作算子的代价

算子代价较为固定,比如 join 算子的代价计算公式:

  • Cost = rows * weight + size * (1 - weight)

    其中:rows 行数代表 CPU 代价,size 代表 IO 代价,即等于如下公式:

  • Cost = CostCPU * weight + CostIO * (1 - weight)

weight 由参数 spark.sql.cbo.joinReorder.card.weight 控制,默认值为 0.7

详见:SparkStrategies.scala,得到 Selected Physical Plan

五. 代码生成 WSCG

最后一步是将物理计划翻译成 Java 字节码去运行,这中间会生成 Scala 代码去操作 rdd,最终由 Scala 解释器生成字节码。

比如 前面的表达式 翻译成代码:

1
2
3
4
5
def compile(node: Node): AST = node match {
case Literal(value) => q"$value"
case Attribute(name) => q"row.get($name)"
case Add(left, right) => q"${compile(left)} + ${compile(right)}"
}

详见:CodeGenerator.scala,至此,完成了整个解析执行流程。

写的比较简单,详细的内容可以参考代码,以及下面的参考资料。

参考资料