1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
// Formatter built from the configured partition-key pattern; used to parse
// the literal side of a comparison into a date.
val parseFormat = DateTimeFormat.forPattern(tablePartitionKeyFormat)

// Comparisons that have already been augmented with a partition predicate.
// Consulted before rewriting so the same comparison is not expanded again.
val resolvedExpr = mutable.Set.empty[Expression]
/**
 * Augments a comparison on the partition-key column with an additional
 * predicate over the partition columns.
 *
 * The rewrite applies when one operand is the attribute named
 * `tablePartitionKey` (or `substring(<that attribute>, 1, 10)`) and the other
 * operand is a [[Literal]] whose string form `parseFormat` can parse. In that
 * case the result is `And(original, partitionComparison)`, where
 * `partitionComparison` compares the concatenation of `partitionReferences`
 * with the matching leading components of the parsed date. In every other
 * case the original comparison is returned unchanged.
 *
 * @param left    left operand of the comparison
 * @param right   right operand of the comparison
 * @param factory rebuilds the comparison node from two operands
 * @return the original comparison, or `And(original, partitionComparison)`
 */
def pushDown(left: Expression, right: Expression, factory: (Expression, Expression) => BinaryExpression): Expression = {
  import scala.util.Try

  val originPlan = factory(left, right)

  // True when `expr` is the partition-key attribute itself, or
  // substring(<partition-key attribute>, 1, 10).
  def targetsPartitionKey(expr: Expression): Boolean = expr match {
    case reference: AttributeReference => reference.name == tablePartitionKey
    case Substring(str: AttributeReference, Literal(1, _), Literal(10, _)) => str.name == tablePartitionKey
    case _ => false
  }

  // Builds And(originPlan, <partition comparison>) from a literal, or falls
  // back to originPlan when the literal cannot be parsed as a date.
  def withPartitionPredicate(literal: Literal, reverse: Boolean): Expression =
    // Try only traps non-fatal failures (bad format, null value, ...); fatal
    // JVM errors and interrupts propagate instead of being swallowed.
    Try(parseFormat.parseDateTime(literal.value.toString)).toOption match {
      case None => originPlan
      case Some(date) =>
        val partitionValueList = date.toString("yyyy-MM-dd").split("-")
        // Descending name order; presumably yields year, month, day for the
        // usual partition column names - TODO confirm against the table schema.
        val partitionKeyList = partitionReferences.toSeq.sortBy(_.name).reverse
        val keyExpr = Concat(partitionKeyList)
        // Keep only as many date components as there are partition columns.
        val valueExpr = Literal(partitionValueList.take(partitionKeyList.size).mkString(""))
        // Preserve operand order of the original comparison.
        val (exprLeft, exprRight) = if (reverse) (valueExpr, keyExpr) else (keyExpr, valueExpr)
        // Strict bounds are relaxed to inclusive ones on the partition side;
        // the original strict comparison is still kept in the conjunction.
        val partitionExpr = originPlan match {
          case GreaterThan(_, _) => GreaterThanOrEqual(exprLeft, exprRight)
          case LessThan(_, _) => LessThanOrEqual(exprLeft, exprRight)
          case _ => factory(exprLeft, exprRight)
        }
        val expr = And(originPlan, partitionExpr)
        // Remember the comparison so it is not rewritten again later.
        resolvedExpr.add(originPlan)
        expr
    }

  // Attempts the rewrite treating `left` as the key side and `right` as the
  // literal side; returns originPlan when the shapes do not match.
  def buildExpr(left: Expression, right: Expression, reverse: Boolean = false): Expression =
    right match {
      case literal: Literal if targetsPartitionKey(left) => withPartitionPredicate(literal, reverse)
      case _ => originPlan
    }

  val nothingToPush =
    partitionReferences.isEmpty ||
      tablePartitionKey.isEmpty ||
      partitionFields.isEmpty ||
      resolvedExpr.contains(originPlan)

  if (nothingToPush) {
    originPlan
  } else {
    val expr = buildExpr(left, right)
    // If the key-on-the-left form did not apply, try the mirrored form.
    if (!originPlan.fastEquals(expr)) expr else buildExpr(right, left, reverse = true)
  }
}
// Route every binary comparison in the plan through pushDown, handing it a
// constructor for the same comparison type; all other expressions pass through.
plan.transformAllExpressions {
  case GreaterThan(lhs, rhs) => pushDown(lhs, rhs, GreaterThan(_, _))
  case GreaterThanOrEqual(lhs, rhs) => pushDown(lhs, rhs, GreaterThanOrEqual(_, _))
  case LessThan(lhs, rhs) => pushDown(lhs, rhs, LessThan(_, _))
  case LessThanOrEqual(lhs, rhs) => pushDown(lhs, rhs, LessThanOrEqual(_, _))
  case EqualTo(lhs, rhs) => pushDown(lhs, rhs, EqualTo(_, _))
  case other => other
}
|