Skip to content

Commit 59595a8

Browse files
committed
dynamic: read spark.blaze.enable.* flags from SQLConf at query time (def) instead of caching them once from SparkEnv at startup (val)
1 parent acf5d0e commit 59595a8

1 file changed

Lines changed: 55 additions & 46 deletions

File tree

spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala

Lines changed: 55 additions & 46 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,6 @@ import scala.collection.mutable
2020

2121
import org.apache.commons.lang3.reflect.MethodUtils
2222
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
23-
import org.apache.spark.SparkEnv
2423
import org.apache.spark.broadcast.Broadcast
2524
import org.apache.spark.internal.Logging
2625
import org.apache.spark.sql.blaze.BlazeConvertStrategy.childOrderingRequiredTag
@@ -88,54 +87,54 @@ import org.blaze.protobuf.EmptyPartitionsExecNode
8887
import org.blaze.protobuf.PhysicalPlanNode
8988
import org.apache.spark.sql.catalyst.expressions.WindowExpression
9089
import org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition
90+
import org.apache.spark.sql.internal.SQLConf
9191
import org.blaze.sparkver
9292

9393
object BlazeConverters extends Logging {
94-
val enableScan: Boolean =
95-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.scan", defaultValue = true)
96-
val enablePaimonScan: Boolean =
97-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.paimon.scan", defaultValue = false)
98-
val enableProject: Boolean =
99-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.project", defaultValue = true)
100-
val enableFilter: Boolean =
101-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.filter", defaultValue = true)
102-
val enableSort: Boolean =
103-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.sort", defaultValue = true)
104-
val enableUnion: Boolean =
105-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.union", defaultValue = true)
106-
val enableSmj: Boolean =
107-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.smj", defaultValue = true)
108-
val enableShj: Boolean =
109-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.shj", defaultValue = true)
110-
val enableBhj: Boolean =
111-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.bhj", defaultValue = true)
112-
val enableBnlj: Boolean =
113-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.bnlj", defaultValue = true)
114-
val enableLocalLimit: Boolean =
115-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.local.limit", defaultValue = true)
116-
val enableGlobalLimit: Boolean =
117-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.global.limit", defaultValue = true)
118-
val enableTakeOrderedAndProject: Boolean =
119-
SparkEnv.get.conf
120-
.getBoolean("spark.blaze.enable.take.ordered.and.project", defaultValue = true)
121-
val enableAggr: Boolean =
122-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.aggr", defaultValue = true)
123-
val enableExpand: Boolean =
124-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.expand", defaultValue = true)
125-
val enableWindow: Boolean =
126-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.window", defaultValue = true)
127-
val enableWindowGroupLimit: Boolean =
128-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.window.group.limit", defaultValue = true)
129-
val enableGenerate: Boolean =
130-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.generate", defaultValue = true)
131-
val enableLocalTableScan: Boolean =
132-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.local.table.scan", defaultValue = true)
133-
val enableDataWriting: Boolean =
134-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.data.writing", defaultValue = false)
135-
val enableScanParquet: Boolean =
136-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.scan.parquet", defaultValue = true)
137-
val enableScanOrc: Boolean =
138-
SparkEnv.get.conf.getBoolean("spark.blaze.enable.scan.orc", defaultValue = true)
94+
def enableScan: Boolean =
95+
getBooleanConf("spark.blaze.enable.scan", defaultValue = true)
96+
def enablePaimonScan: Boolean =
97+
getBooleanConf("spark.blaze.enable.paimon.scan", defaultValue = false)
98+
def enableProject: Boolean =
99+
getBooleanConf("spark.blaze.enable.project", defaultValue = true)
100+
def enableFilter: Boolean =
101+
getBooleanConf("spark.blaze.enable.filter", defaultValue = true)
102+
def enableSort: Boolean =
103+
getBooleanConf("spark.blaze.enable.sort", defaultValue = true)
104+
def enableUnion: Boolean =
105+
getBooleanConf("spark.blaze.enable.union", defaultValue = true)
106+
def enableSmj: Boolean =
107+
getBooleanConf("spark.blaze.enable.smj", defaultValue = true)
108+
def enableShj: Boolean =
109+
getBooleanConf("spark.blaze.enable.shj", defaultValue = true)
110+
def enableBhj: Boolean =
111+
getBooleanConf("spark.blaze.enable.bhj", defaultValue = true)
112+
def enableBnlj: Boolean =
113+
getBooleanConf("spark.blaze.enable.bnlj", defaultValue = true)
114+
def enableLocalLimit: Boolean =
115+
getBooleanConf("spark.blaze.enable.local.limit", defaultValue = true)
116+
def enableGlobalLimit: Boolean =
117+
getBooleanConf("spark.blaze.enable.global.limit", defaultValue = true)
118+
def enableTakeOrderedAndProject: Boolean =
119+
getBooleanConf("spark.blaze.enable.take.ordered.and.project", defaultValue = true)
120+
def enableAggr: Boolean =
121+
getBooleanConf("spark.blaze.enable.aggr", defaultValue = true)
122+
def enableExpand: Boolean =
123+
getBooleanConf("spark.blaze.enable.expand", defaultValue = true)
124+
def enableWindow: Boolean =
125+
getBooleanConf("spark.blaze.enable.window", defaultValue = true)
126+
def enableWindowGroupLimit: Boolean =
127+
getBooleanConf("spark.blaze.enable.window.group.limit", defaultValue = true)
128+
def enableGenerate: Boolean =
129+
getBooleanConf("spark.blaze.enable.generate", defaultValue = true)
130+
def enableLocalTableScan: Boolean =
131+
getBooleanConf("spark.blaze.enable.local.table.scan", defaultValue = true)
132+
def enableDataWriting: Boolean =
133+
getBooleanConf("spark.blaze.enable.data.writing", defaultValue = false)
134+
def enableScanParquet: Boolean =
135+
getBooleanConf("spark.blaze.enable.scan.parquet", defaultValue = true)
136+
def enableScanOrc: Boolean =
137+
getBooleanConf("spark.blaze.enable.scan.orc", defaultValue = true)
139138

140139
import org.apache.spark.sql.catalyst.plans._
141140
import org.apache.spark.sql.catalyst.optimizer._
@@ -1084,6 +1083,16 @@ object BlazeConverters extends Logging {
10841083
projections.map(kv => Alias(kv._1, kv._2.name)(kv._2.exprId)).toList))
10851084
}
10861085

1086+
/**
 * Looks up a boolean configuration value in the current session's SQLConf.
 *
 * The lookup goes through `SQLConf.get`, so the result reflects per-session
 * `SET` commands rather than a value captured at startup.
 *
 * @param key configuration key to read
 * @param defaultValue value used when the key is not set
 * @return the parsed boolean value
 * @throws IllegalArgumentException if the configured value is not a valid
 *                                  boolean literal
 */
private def getBooleanConf(key: String, defaultValue: Boolean): Boolean = {
  val configured = SQLConf.get.getConfString(key, defaultValue.toString)
  // `String.toBoolean` accepts "true"/"false" case-insensitively; mirror that
  // with an explicit match so the failure path is obvious. The error message
  // intentionally echoes the raw (untrimmed) configured string.
  configured.trim.toLowerCase match {
    case "true" => true
    case "false" => false
    case _ =>
      throw new IllegalArgumentException(s"$key should be boolean, but was $configured")
  }
}
1095+
10871096
abstract class ForceNativeExecutionWrapperBase(override val child: SparkPlan)
10881097
extends UnaryExecNode
10891098
with NativeSupports {

0 commit comments

Comments
 (0)