Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ object AuronConvertStrategy extends Logging {
val convertibleTag: TreeNodeTag[Boolean] = TreeNodeTag("auron.convertible")
val convertToNonNativeTag: TreeNodeTag[Boolean] = TreeNodeTag("auron.convertToNonNative")
val convertStrategyTag: TreeNodeTag[ConvertStrategy] = TreeNodeTag("auron.convert.strategy")
val neverConvertReasonTag: TreeNodeTag[String] = TreeNodeTag("auron.never.convert.reason")
val childOrderingRequiredTag: TreeNodeTag[Boolean] = TreeNodeTag(
"auron.child.ordering.required")
val joinSmallerSideTag: TreeNodeTag[BuildSide] = TreeNodeTag("auron.join.smallerSide")
Expand All @@ -79,6 +80,9 @@ object AuronConvertStrategy extends Logging {
case _ =>
exec.setTagValue(convertibleTag, false)
exec.setTagValue(convertStrategyTag, NeverConvert)
exec.setTagValue(
neverConvertReasonTag,
s"${exec.getClass.getSimpleName} is not supported yet.")
}
danglingChildren = newDangling :+ converted
}
Expand Down Expand Up @@ -190,6 +194,9 @@ object AuronConvertStrategy extends Logging {
case e =>
// not marked -- default to NeverConvert
e.setTagValue(convertStrategyTag, NeverConvert)
e.setTagValue(
neverConvertReasonTag,
s"${exec.getClass.getSimpleName} not marked, default to NeverConvert.")
}
}

Expand All @@ -206,9 +213,10 @@ object AuronConvertStrategy extends Logging {

while (!finished) {
finished = true
val dontConvertIf = (exec: SparkPlan, condition: Boolean) => {
val dontConvertIf = (exec: SparkPlan, condition: Boolean, neverConvertReason: String) => {
if (condition) {
exec.setTagValue(convertStrategyTag, NeverConvert)
exec.setTagValue(neverConvertReasonTag, neverConvertReason)
finished = false
}
}
Expand All @@ -218,28 +226,41 @@ object AuronConvertStrategy extends Logging {
// don't use NativeFilter because it requires ConvertToNative with a lot of records
if (!isNeverConvert(e) && e.isInstanceOf[FilterExec]) {
val child = e.children.head
dontConvertIf(e, isNeverConvert(child))
dontConvertIf(
e,
isNeverConvert(child),
s"${e.getClass.getSimpleName}, children is not native.")
}

// NonNative -> NativeAgg
// don't use NativeAgg because it requires ConvertToNative with a lot of records
if (!isNeverConvert(e) && isAggregate(e)) {
val child = e.children.head
dontConvertIf(e, isNeverConvert(child))
dontConvertIf(
e,
isNeverConvert(child),
s"${e.getClass.getSimpleName}, children is not native.")
}

// Agg -> NativeShuffle
// don't use NativeShuffle because the next stage is like to use non-native shuffle reader
if (!isNeverConvert(e) && e.isInstanceOf[ShuffleExchangeLike]) {
val child = e.children.head
dontConvertIf(e, isAggregate(child) && isNeverConvert(child))
dontConvertIf(
e,
isAggregate(child) && isNeverConvert(child),
s"${e.getClass.getSimpleName}, children is not native and children is agg.")
}

// NativeExpand -> NonNative
// don't use NativeExpand because it requires C2R with a lot of records
if (isNeverConvert(e)) {
e.children.find(_.isInstanceOf[ExpandExec]) match {
case Some(expand) => dontConvertIf(expand, !isNeverConvert(expand))
case Some(expand) =>
dontConvertIf(
expand,
!isNeverConvert(expand),
s"${e.getClass.getSimpleName}, children is nativeExpand.")
case _ =>
}
}
Expand All @@ -248,7 +269,11 @@ object AuronConvertStrategy extends Logging {
// don't use NativeParquetScan because it requires C2R with a lot of records
if (isNeverConvert(e)) {
e.children.find(_.isInstanceOf[FileSourceScanExec]) match {
case Some(scan) => dontConvertIf(scan, !isNeverConvert(scan))
case Some(scan) =>
dontConvertIf(
scan,
!isNeverConvert(scan),
s"${e.getClass.getSimpleName}, children is nativeParquetScan.")
case _ =>
}
}
Expand All @@ -257,7 +282,10 @@ object AuronConvertStrategy extends Logging {
// don't use native sort
if (isNeverConvert(e)) {
e.children.filter(_.isInstanceOf[SortExec]).foreach { sort =>
dontConvertIf(sort, !isNeverConvert(sort) && isNeverConvert(sort.children.head))
dontConvertIf(
sort,
!isNeverConvert(sort) && isNeverConvert(sort.children.head),
s"${e.getClass.getSimpleName}, children and parent both are not native.")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,7 @@ import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
import org.apache.spark.Partition
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.sql.auron.AuronConvertStrategy.childOrderingRequiredTag
import org.apache.spark.sql.auron.AuronConvertStrategy.convertibleTag
import org.apache.spark.sql.auron.AuronConvertStrategy.convertStrategyTag
import org.apache.spark.sql.auron.AuronConvertStrategy.convertToNonNativeTag
import org.apache.spark.sql.auron.AuronConvertStrategy.isNeverConvert
import org.apache.spark.sql.auron.AuronConvertStrategy.joinSmallerSideTag
import org.apache.spark.sql.auron.AuronConvertStrategy.{childOrderingRequiredTag, convertibleTag, convertStrategyTag, convertToNonNativeTag, isNeverConvert, joinSmallerSideTag, neverConvertReasonTag}
import org.apache.spark.sql.auron.NativeConverters.{roundRobinTypeSupported, scalarTypeSupported, StubExpr}
import org.apache.spark.sql.auron.util.AuronLogUtils.logDebugPlanConversion
import org.apache.spark.sql.catalyst.expressions.AggregateWindowFunction
Expand Down Expand Up @@ -265,16 +260,70 @@ object AuronConverters extends Logging {
if (Shims.get.isNative(exec)) { // for QueryStageInput and CustomShuffleReader
exec.setTagValue(convertibleTag, true)
exec.setTagValue(convertStrategyTag, AlwaysConvert)
exec
} else {
exec.setTagValue(convertibleTag, false)
exec.setTagValue(convertStrategyTag, NeverConvert)
addNeverConvertReasonTag(exec)
}
exec
}
}
}
}

private def addNeverConvertReasonTag(exec: SparkPlan) = {
val neverConvertReason =
exec match {
case _: FileSourceScanExec if !enableScan =>
"Conversion disabled: spark.auron.enable.scan=false."
case _: ProjectExec if !enableProject =>
"Conversion disabled: spark.auron.enable.project=false."
case _: FilterExec if !enableFilter =>
"Conversion disabled: spark.auron.enable.filter=false."
case _: SortExec if !enableSort =>
"Conversion disabled: spark.auron.enable.sort=false."
case _: UnionExec if !enableUnion =>
"Conversion disabled: spark.auron.enable.union=false."
case _: SortMergeJoinExec if !enableSmj =>
"Conversion disabled: spark.auron.enable.smj=false."
case _: ShuffledHashJoinExec if !enableShj =>
"Conversion disabled: spark.auron.enable.shj=false."
case _: BroadcastHashJoinExec if !enableBhj =>
"Conversion disabled: spark.auron.enable.bhj=false."
case _: BroadcastNestedLoopJoinExec if !enableBnlj =>
"Conversion disabled: spark.auron.enable.bnlj=false."
case _: LocalLimitExec if !enableLocalLimit =>
"Conversion disabled: spark.auron.enable.local.limit=false."
case _: GlobalLimitExec if !enableGlobalLimit =>
"Conversion disabled: spark.auron.enable.global.limit=false."
case _: TakeOrderedAndProjectExec if !enableTakeOrderedAndProject =>
"Conversion disabled: spark.auron.enable.take.ordered.and.project=false."
case _: HashAggregateExec if !enableAggr =>
"Conversion disabled: spark.auron.enable.aggr=false."
case _: ObjectHashAggregateExec if !enableAggr =>
"Conversion disabled: spark.auron.enable.aggr=false."
case _: SortAggregateExec if !enableAggr =>
"Conversion disabled: spark.auron.enable.aggr=false."
case _: ExpandExec if !enableExpand =>
"Conversion disabled: spark.auron.enable.expand=false."
case _: WindowExec if !enableWindow =>
"Conversion disabled: spark.auron.enable.window=false."
case _: UnaryExecNode
if exec.getClass.getSimpleName == "WindowGroupLimitExec" && !enableWindowGroupLimit =>
"Conversion disabled: spark.auron.enable.window.group.limit=false."
case _: GenerateExec if !enableGenerate =>
"Conversion disabled: spark.auron.enable.generate=false."
case _: LocalTableScanExec if !enableLocalTableScan =>
"Conversion disabled: spark.auron.enable.local.table.scan=false."
case _: DataWritingCommandExec if !enableDataWriting =>
"Conversion disabled: spark.auron.enable.data.writing=false."
case _ =>
s"${exec.getClass.getSimpleName} is not supported yet."
}
exec.setTagValue(convertibleTag, false)
exec.setTagValue(convertStrategyTag, NeverConvert)
exec.setTagValue(neverConvertReasonTag, neverConvertReason)
exec
}

def tryConvert[T <: SparkPlan](exec: T, convert: T => SparkPlan): SparkPlan = {
try {
exec.setTagValue(convertibleTag, true)
Expand All @@ -283,8 +332,26 @@ object AuronConverters extends Logging {
} catch {
case e @ (_: NotImplementedError | _: AssertionError | _: Exception) =>
logWarning(s"Falling back exec: ${exec.getClass.getSimpleName}: ${e.getMessage}")
val neverConvertReason = e match {
case _: AssertionError =>
exec match {
case _: FileSourceScanExec if enableScan =>
if (!enableScanParquet) {
"Conversion disabled: spark.auron.enable.scan.parquet=false."
} else if (!enableScanOrc) {
"Conversion disabled: spark.auron.enable.scan.orc=false."
} else {
s"Falling back exec: ${exec.getClass.getSimpleName}: ${e.getMessage}"
}
case _ =>
s"Falling back exec: ${exec.getClass.getSimpleName}: ${e.getMessage}"
}
case _ =>
s"Falling back exec: ${exec.getClass.getSimpleName}: ${e.getMessage}"
}
exec.setTagValue(convertibleTag, false)
exec.setTagValue(convertStrategyTag, NeverConvert)
exec.setTagValue(neverConvertReasonTag, neverConvertReason)
exec
}
}
Expand Down