Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.Partition
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.sql.auron.AuronConvertStrategy.{childOrderingRequiredTag, convertibleTag, convertStrategyTag, convertToNonNativeTag, isNeverConvert, joinSmallerSideTag, neverConvertReasonTag}
import org.apache.spark.sql.auron.NativeConverters.{roundRobinTypeSupported, scalarTypeSupported, StubExpr}
import org.apache.spark.sql.auron.NativeConverters.{existTimestampType, roundRobinTypeSupported, scalarTypeSupported, StubExpr}
import org.apache.spark.sql.auron.util.AuronLogUtils.logDebugPlanConversion
import org.apache.spark.sql.catalyst.expressions.AggregateWindowFunction
import org.apache.spark.sql.catalyst.expressions.Alias
Expand Down Expand Up @@ -135,8 +135,12 @@ object AuronConverters extends Logging {
getBooleanConf("spark.auron.enable.data.writing", defaultValue = false)
// Master switch for converting Parquet file scans to native scan execs
// (asserted before createNativeParquetScanExec is invoked).
def enableScanParquet: Boolean =
getBooleanConf("spark.auron.enable.scan.parquet", defaultValue = true)
// When false, Parquet scans whose required schema contains a timestamp type
// (checked via existTimestampType) are rejected and fall back to Spark.
def enableScanParquetTimestamp: Boolean =
getBooleanConf("spark.auron.enable.scan.parquet.timestamp", defaultValue = true)
// Master switch for converting ORC file scans to native scan execs
// (asserted before createNativeOrcScanExec is invoked).
def enableScanOrc: Boolean =
getBooleanConf("spark.auron.enable.scan.orc", defaultValue = true)
// When false, ORC scans whose required schema contains a timestamp type
// (checked via existTimestampType) are rejected and fall back to Spark.
def enableScanOrcTimestamp: Boolean =
getBooleanConf("spark.auron.enable.scan.orc.timestamp", defaultValue = true)
// Gates native conversion of broadcast exchange operators.
def enableBroadcastExchange: Boolean =
getBooleanConf("spark.auron.enable.broadcastExchange", defaultValue = true)
def enableShuffleExechange: Boolean =
Expand Down Expand Up @@ -467,9 +471,25 @@ object AuronConverters extends Logging {
relation.fileFormat match {
case p if p.getClass.getName.endsWith("ParquetFileFormat") =>
assert(enableScanParquet)
if (!enableScanParquetTimestamp) {
assert(
!exec.requiredSchema.exists(e => existTimestampType(e.dataType)),
s"Parquet scan with timestamp type is not supported for table: ${tableIdentifier
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spark-sql (default)> set spark.auron.enable.scan.parquet.timestamp=false;
spark.auron.enable.scan.parquet.timestamp	false
Time taken: 0.054 seconds, Fetched 1 row(s)
spark-sql (default)> select * from t3_parquet ;
25/11/27 16:34:36 WARN AuronConverters: Falling back exec: FileSourceScanExec: assertion failed: Parquet scan with timestamp type is not supported for table: `spark_catalog`.`default`.`t3_parquet`. Set spark.auron.enable.scan.parquet.timestamp=true to enable timestamp support or remove timestamp columns from the query.

.getOrElse("unknown")}. " +
"Set spark.auron.enable.scan.parquet.timestamp=true to enable timestamp support " +
"or remove timestamp columns from the query.")
}
addRenameColumnsExec(Shims.get.createNativeParquetScanExec(exec))
case p if p.getClass.getName.endsWith("OrcFileFormat") =>
assert(enableScanOrc)
if (!enableScanOrcTimestamp) {
assert(
!exec.requiredSchema.exists(e => existTimestampType(e.dataType)),
s"ORC scan with timestamp type is not supported for tableIdentifier: ${tableIdentifier
.getOrElse("unknown")}. " +
"Set spark.auron.enable.scan.orc.timestamp=true to enable timestamp support " +
"or remove timestamp columns from the query.")
}
addRenameColumnsExec(Shims.get.createNativeOrcScanExec(exec))
case p =>
throw new NotImplementedError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,19 @@ object NativeConverters extends Logging {
}
}

/**
 * Recursively determines whether a data type contains [[TimestampType]]
 * anywhere in its structure, descending into array element types, map
 * key/value types, and struct fields.
 *
 * @param dataType the type to inspect
 * @return true if a timestamp type is found at any nesting level
 */
def existTimestampType(dataType: DataType): Boolean =
  dataType match {
    case TimestampType => true
    case ArrayType(elementType, _) => existTimestampType(elementType)
    case MapType(keyType, valueType, _) =>
      existTimestampType(keyType) || existTimestampType(valueType)
    case st: StructType =>
      st.fields.exists(field => existTimestampType(field.dataType))
    case _ => false
  }

def roundRobinTypeSupported(dataType: DataType): Boolean = dataType match {
case MapType(_, _, _) => false
case ArrayType(elementType, _) => roundRobinTypeSupported(elementType)
Expand Down
Loading