seq搜索引擎优化至少包括那几步?(spark集群部署大数据JUC面试题集群集群的数据生态体系)

优采云 发布时间: 2022-01-30 09:15

  seq搜索引擎优化至少包括那几步?(spark集群部署大数据JUC面试题集群集群的数据生态体系)

  问题

  当使用这个partition BETWEEN 'start' AND 'end' OR (partition = 'other' AND column 'value') 条件查询spark-sql中的数据时,程序会拉取整个分区中的数据。

  解决方案

  前面我们提到在使用spark-sql读取hive分区表的时候,使用了PredicateHelper中的方法,但是增加了一个新的splitPredicates方法,因为PredicateHelper只有splitConjunctivePredicates和splitDisjunctivePredicates方法。

  protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {

condition match {

case And(cond1, cond2) =>

splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)

case other => other :: Nil

}

}

protected def splitDisjunctivePredicates(condition: Expression): Seq[Expression] = {

condition match {

case Or(cond1, cond2) =>

splitDisjunctivePredicates(cond1) ++ splitDisjunctivePredicates(cond2)

case other => other :: Nil

}

}

  在 PhysicalOperation 类中,仅对 Filter 进行如下处理:

  

  可以看出,解析Filter语法树时只调用了splitConjunctivePredicates方法,即只处理AND表达式;

  PruneFileSourcePartitions类匹配PhysicalOperation,生成的过滤器就是上面collectProjectsAndFilters中Filter处理的结果;

  private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {

case op @ PhysicalOperation(projects, filters,

logicalRelation @

LogicalRelation(fsRelation @

HadoopFsRelation(catalogFileIndex: CatalogFileIndex, partitionSchema, _, _, _, _), _, _))

if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined =>

  以下是 PruneFileSourcePartitions 中的原创代码。将这部分代码替换为指定分区数的过滤方法中获取分区表达式的代码即可轻松解决上述问题。

  val sparkSession = fsRelation.sparkSession

val partitionColumns =

logicalRelation.resolve(

partitionSchema, sparkSession.sessionState.analyzer.resolver)

val partitionSet = AttributeSet(partitionColumns)

val partitionKeyFilters =

ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))

  修改后的代码如下:

  val partitionColumns =

logicalRelation.resolve(

partitionSchema,

sparkSession.sessionState.analyzer.resolver)

val partitionSet = AttributeSet(partitionColumns)

val partitionKeyFilters = splitPredicates(normalizedFilters.reduceLeft(And),parti

  大数据与云计算的关系

  大数据技术生态系统

  大数据的切片机制有哪些?

  大数据的Kafka集群部署

  大数据JUC面试题

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线