基本概念
树 TreeNode
解析过程中的各种计划(plan)都是树,树的节点是各种表达式,比如 x + (1 + 2)
由三种表达式(expression)组合:
- Literal(value: Int): 一个常量,比如 1 和 2
- Attribute(name: String): 字段,比如 x
- Add(left: TreeNode, right: TreeNode): 一个加法表达式
x + (1 + 2)
即:Add(Attribute(x), Add(Literal(1), Literal(2)))
注意括号顺序
转为 Spark 的 TreeNode 就是如下的形式:
规则 Rule
规则一般是指对 plan 的转换,比如一个简单的常量替换(ConstantFolding)规则如下(非真实代码):
1 2 3 4 5
| plan.transform { case Add(Literal(c1), Literal(c2)) => Literal(c1+c2) case Add(left, Literal(0)) => left case Add(Literal(0), right) => right }
|
这里的 transform 是对 plan 中节点进行遍历,通过使用 Scala 语法中的模式匹配功能,只对 plan 中匹配到的节点进行优化处理。
Rule 都会多次执行,直到 plan 不再变化,比如 (x+0) + (1+2+3)
需要多次执行上面的 Rule 优化才能完全替换所有常量。
一. Antlr4 语法解析 Parser
通过将 Antlr4 语法文件(Calculator.g4)编译成 java 代码:
- CalculatorBaseListener.java
- CalculatorBaseVisitor.java
- CalculatorLexer.java
- CalculatorListener.java
- CalculatorParser.java
- CalculatorVisitor.java
通过 visitor 模式和 listener 模式对 AST 进行遍历。Demo 示例(非 Spark 源码):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| String expression = "1.8+2*3";
Lexer lexer = new CalculatorLexer(CharStreams.fromString(expression));
TokenStream tokenStream = new CommonTokenStream(lexer);
CalculatorParser parser = new CalculatorParser(tokenStream);
CalculatorParser.ExpressionContext context = parser.expression(); System.out.println(context.toStringTree(parser));
MyVisitor myVisitor = new MyVisitor(); myVisitor.visit(context);
ParseTreeWalker walker = new ParseTreeWalker(); MyListener myListener = new MyListener(); walker.walk(myListener, context);
|
MyListener.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class MyListener extends CalculatorBaseListener { @Override public void enterEquation(CalculatorParser.EquationContext ctx) { super.enterEquation(ctx); } @Override public void exitEquation(CalculatorParser.EquationContext ctx) { super.exitEquation(ctx); } @Override public void enterExpression(CalculatorParser.ExpressionContext ctx) { System.out.println("enterExpression = " + ctx.getText()); super.enterExpression(ctx); } @Override public void exitExpression(CalculatorParser.ExpressionContext ctx) { System.out.println("exitExpression = " + ctx.getText()); super.exitExpression(ctx); } }
|
MyVisitor.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class MyVisitor extends CalculatorBaseVisitor<Double> { @Override public Double visitExpression(CalculatorParser.ExpressionContext ctx) { Double result = super.visitExpression(ctx); System.out.println("visitExpression = " + ctx.getText() + ", return " + result); return result; } @Override public Double visitMultiplyingExpression(CalculatorParser.MultiplyingExpressionContext ctx) { Double result = super.visitMultiplyingExpression(ctx); System.out.println("visitMultiplyingExpression = " + ctx.getText() + ", return " + result); return result; } @Override public Double visitPowExpression(CalculatorParser.PowExpressionContext ctx) { System.out.println("visitPowExpression = " + ctx.getText()); return super.visitPowExpression(ctx); } @Override public Double visitConstant(CalculatorParser.ConstantContext ctx) { return super.visitConstant(ctx); } @Override public Double visit(ParseTree tree) { System.out.println("visit = " + tree.getText()); return super.visit(tree); } @Override public Double visitChildren(RuleNode node) { System.out.println("visitChildren = " + node.getText()); return super.visitChildren(node); } }
|
Spark 经过这一步得到 Unresolved Logical Plan
二. 分析验证 Analyzer
这一步主要是根据表的元数据信息对 Unresolved Logical Plan 中的节点进行匹配,包含一系列的转换规则:
- 根据表名找到元数据信息
- 对列名进行映射,校验是否合法
- 判断列名关系,分配唯一 id
- 判断数据类型
详见:Analyzer.scala,这步之后得到 Logical Plan
三. 优化逻辑计划 Optimizer
这一步对 Logical Plan 进行 RBO 优化,常见的优化包括:常量替换、消除子查询、谓词下推、列剪裁等。详见: Optimizer.scala,得到 Optimized Logical Plan
四. 生成物理计划 SparkPlanner
计算所有可能的物理计划,分别计算其代价,通过 CBO 挑选代价最小的物理计划,每个节点的代价包括如下两部分
该执行节点对数据集的影响
或者说该节点输出数据集的大小与分布,而数据的统计信息依赖 ANALYZE 命令:
1
| ANALYZE TABLE table_name COMPUTE STATISTICS [FOR COLUMNS col1, col2];
|
可以得到表的大小、行数、甚至每个字段的直方图(数据分布),方便后续的代价估计。
该执行节点操作算子的代价
算子代价较为固定,比如 join 算子的代价计算公式:
Cost = rows * weight + size * (1 - weight)
其中:rows 行数代表 CPU 代价,size 代表 IO 代价,即等于如下公式:
Cost = CostCPU * weight + CostIO * (1 - weight)
weight 由参数 spark.sql.cbo.joinReorder.card.weight 控制,默认值为 0.7
详见:SparkStrategies.scala,得到 Selected Physical Plan
五. 代码生成 WSCG
最后一步是将物理计划翻译成 Java 字节码去运行,这中间会生成 Scala 代码去操作 rdd,最终由 Scala 解释器生成字节码。
比如 前面的表达式 翻译成代码:
1 2 3 4 5
| def compile(node: Node): AST = node match { case Literal(value) => q"$value" case Attribute(name) => q"row.get($name)" case Add(left, right) => q"${compile(left)} + ${compile(right)}" }
|
详见:CodeGenerator.scala,至此,完成了整个解析执行流程。
写的比较简单,详细的内容可以参考代码,以及下面的参考资料。
参考资料