/**
 * The logical plan obtained by running the session optimizer over a copy of `withCachedData`.
 *
 * Evaluated lazily on first access. The optimizer run happens inside the
 * `QueryPlanningTracker.OPTIMIZATION` phase, and the resulting plan is flagged as analyzed
 * before it is handed back.
 */
lazy val optimizedPlan: LogicalPlan = {
  // optimizedPlan is itself tracked under the optimization phase, so commandExecuted has to be
  // materialized before that phase is entered.
  assertCommandExecuted()

  executePhase(QueryPlanningTracker.OPTIMIZATION) {
    val optimizer = sparkSession.sessionState.optimizer

    // Optimize a clone rather than the original so that the analyzing, optimizing and planning
    // stages never share a single plan instance.
    val optimized = optimizer.executeAndTrack(withCachedData.clone(), tracker)

    // An optimized plan must not be re-analyzed: constant-folded literals and similar rewrites
    // can cause problems during analysis. `clone` should already carry over the `analyzed`
    // state, but it is set explicitly here as an extra safeguard.
    optimized.setAnalyzed()
    optimized
  }
}
defexecute(plan: TreeType): TreeType = { var curPlan = plan val queryExecutionMetrics = RuleExecutor.queryExecutionMeter val planChangeLogger = newPlanChangeLogger[TreeType]() val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get val beforeMetrics = RuleExecutor.getCurrentMetrics()
// Run the structural integrity checker against the initial input if (!isPlanIntegral(plan, plan)) { throwQueryExecutionErrors.structuralIntegrityOfInputPlanIsBrokenInClassError( this.getClass.getName.stripSuffix("$")) }
batches.foreach { batch => val batchStartPlan = curPlan var iteration = 1 var lastPlan = curPlan varcontinue = true
// Run until fix point (or the max number of iterations as specified in the strategy. while (continue) { curPlan = batch.rules.foldLeft(curPlan) { case (plan, rule) => val startTime = System.nanoTime() // 这一行对 plan 执行优化规则,得到 result 是转换之后的 plan val result = rule(plan) val runTime = System.nanoTime() - startTime val effective = !result.fastEquals(plan)
// Record timing information using QueryPlanningTracker tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective))
// Run the structural integrity checker against the plan after each rule. if (effective && !isPlanIntegral(plan, result)) { throwQueryExecutionErrors.structuralIntegrityIsBrokenAfterApplyingRuleError( rule.ruleName, batch.name) }
result } iteration += 1 // 结束的标准之一 if (iteration > batch.strategy.maxIterations) { // Only log if this is a rule that is supposed to run more than once. if (iteration != 2) { val endingMsg = if (batch.strategy.maxIterationsSetting == null) { "." } else { s", please set '${batch.strategy.maxIterationsSetting}' to a larger value." } val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" + s"$endingMsg" if (Utils.isTesting || batch.strategy.errorOnExceed) { thrownewRuntimeException(message) } else { logWarning(message) } } // Check idempotence for Once batches. if (batch.strategy == Once && Utils.isTesting && !excludedOnceBatches.contains(batch.name)) { checkBatchIdempotence(batch, curPlan) } continue = false } // 结束的标准之二 if (curPlan.fastEquals(lastPlan)) { logTrace( s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.") continue = false } lastPlan = curPlan }