diff --git a/pom.xml b/pom.xml
index 29c65c9f2..e488a2174 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,6 +19,7 @@
UTF-8
16.0.0
3.21.9
+ 0.9.0
@@ -43,6 +44,11 @@
spark-sql_${scalaVersion}
${sparkVersion}
+
+ org.apache.paimon
+ paimon-core
+ ${paimonVersion}
+
org.apache.arrow
arrow-c-data
diff --git a/spark-extension/pom.xml b/spark-extension/pom.xml
index 0441c165a..83c8c55bb 100644
--- a/spark-extension/pom.xml
+++ b/spark-extension/pom.xml
@@ -38,6 +38,11 @@
spark-sql_${scalaVersion}
provided
+
+ org.apache.paimon
+ paimon-core
+ provided
+
org.apache.arrow
arrow-c-data
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConvertStrategy.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConvertStrategy.scala
index 596ebcc55..1ef4712a2 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConvertStrategy.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConvertStrategy.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.blaze.plan.BuildSide
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
+import org.apache.spark.sql.hive.blaze.BlazeHiveConverters
object BlazeConvertStrategy extends Logging {
import BlazeConverters._
@@ -123,6 +124,8 @@ object BlazeConvertStrategy extends Logging {
e.setTagValue(convertStrategyTag, AlwaysConvert)
case e: FileSourceScanExec =>
e.setTagValue(convertStrategyTag, AlwaysConvert)
+ case e if BlazeHiveConverters.isNativePaimonTableScan(e) =>
+ e.setTagValue(convertStrategyTag, AlwaysConvert)
case e: ProjectExec if isNative(e.child) =>
e.setTagValue(convertStrategyTag, AlwaysConvert)
case e: FilterExec if isNative(e.child) =>
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala
index ac656d7aa..f21be318b 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala
@@ -76,12 +76,16 @@ import org.apache.spark.sql.execution.blaze.plan.ConvertToNativeBase
import org.apache.spark.sql.execution.blaze.plan.NativeOrcScanBase
import org.apache.spark.sql.execution.blaze.plan.NativeParquetScanBase
import org.apache.spark.sql.execution.blaze.plan.NativeSortBase
+import org.apache.spark.sql.hive.blaze.BlazeHiveConverters
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
+import org.apache.spark.sql.hive.execution.blaze.plan.NativeHiveTableScanBase
import org.apache.spark.sql.types.LongType
object BlazeConverters extends Logging {
val enableScan: Boolean =
SparkEnv.get.conf.getBoolean("spark.blaze.enable.scan", defaultValue = true)
+ val enablePaimonScan: Boolean =
+ SparkEnv.get.conf.getBoolean("spark.blaze.enable.paimon.scan", defaultValue = false)
val enableProject: Boolean =
SparkEnv.get.conf.getBoolean("spark.blaze.enable.project", defaultValue = true)
val enableFilter: Boolean =
@@ -152,6 +156,9 @@ object BlazeConverters extends Logging {
case e: BroadcastExchangeExec => tryConvert(e, convertBroadcastExchangeExec)
case e: FileSourceScanExec if enableScan => // scan
tryConvert(e, convertFileSourceScanExec)
+ case e
+ if enablePaimonScan && BlazeHiveConverters.isNativePaimonTableScan(e) => // scan paimon
+ tryConvert(e, BlazeHiveConverters.convertPaimonTableScanExec)
case e: ProjectExec if enableProject => // project
tryConvert(e, convertProjectExec)
case e: FilterExec if enableFilter => // filter
@@ -787,7 +794,7 @@ object BlazeConverters extends Logging {
} catch {
case e @ (_: NotImplementedError | _: AssertionError | _: Exception) =>
logWarning(
- s"Error projecting resultExpressions, failback to non-native projection: " +
+ s"Error projecting resultExpressions, fallback to non-native projection: " +
s"${e.getMessage}")
val proj = ProjectExec(exec.resultExpressions, nativeAggr)
proj.setTagValue(convertToNonNativeTag, true)
@@ -878,7 +885,9 @@ object BlazeConverters extends Logging {
return false
}
plan match {
- case _: NativeParquetScanBase | _: NativeOrcScanBase | _: NativeUnionBase => true
+ case _: NativeParquetScanBase | _: NativeOrcScanBase | _: NativeHiveTableScanBase |
+ _: NativeUnionBase =>
+ true
case _: ConvertToNativeBase => needRenameColumns(plan.children.head)
case exec if NativeHelper.isNative(exec) =>
NativeHelper.getUnderlyingNativePlan(exec).output != plan.output
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/hive/blaze/BlazeHiveConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/hive/blaze/BlazeHiveConverters.scala
new file mode 100644
index 000000000..c2d33e43b
--- /dev/null
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/hive/blaze/BlazeHiveConverters.scala
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2022 The Blaze Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.blaze
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.blaze.BlazeConverters.addRenameColumnsExec
+import org.apache.spark.sql.blaze.Shims
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.hive.execution.blaze.plan.NativePaimonTableScanExec
+
+object BlazeHiveConverters extends Logging {
+
+ def isNativePaimonTableScan(exec: SparkPlan): Boolean = {
+ exec match {
+ case e: HiveTableScanExec
+ if e.relation.tableMeta.storage.serde.isDefined
+ && e.relation.tableMeta.storage.serde.get.contains("Paimon") =>
+ true
+ case _ => false
+ }
+ }
+
+ def convertPaimonTableScanExec(exec: SparkPlan): SparkPlan = {
+ val hiveExec = exec.asInstanceOf[HiveTableScanExec]
+ assert(
+ PaimonUtil.isPaimonCowTable(
+ PaimonUtil.loadTable(hiveExec.relation.tableMeta.location.toString)),
+ "paimon MOR/MOW mode is not supported")
+ val (relation, output, requestedAttributes, partitionPruningPred) = (
+ hiveExec.relation,
+ hiveExec.output,
+ hiveExec.requestedAttributes,
+ hiveExec.partitionPruningPred)
+ logDebug(s"Converting HiveTableScanExec: ${Shims.get.simpleStringWithNodeId(exec)}")
+ logDebug(s" relation: ${relation.getClass}")
+ logDebug(s" relation.location: ${relation.tableMeta.location}")
+ logDebug(s" output: $output")
+ logDebug(s" requestedAttributes: $requestedAttributes")
+ logDebug(s" partitionPruningPred: $partitionPruningPred")
+
+ addRenameColumnsExec(NativePaimonTableScanExec(hiveExec))
+ }
+}
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/hive/blaze/PaimonUtil.scala b/spark-extension/src/main/scala/org/apache/spark/sql/hive/blaze/PaimonUtil.scala
new file mode 100644
index 000000000..f0208cc52
--- /dev/null
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/hive/blaze/PaimonUtil.scala
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2022 The Blaze Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.blaze
+
+import java.util.{HashMap => JHashMap}
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.catalog.CatalogContext
+import org.apache.paimon.options.Options
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.FileStoreTableFactory
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+object PaimonUtil extends Logging {
+ private val paimonCowOptionKey = "full-compaction.delta-commits"
+ private val paimonFileFormatOptionKey = "file.format"
+ val parquetFormat = "parquet"
+ val orcFormat = "orc"
+
+ def loadTable(path: String): FileStoreTable = {
+ val parameters = new JHashMap[String, String]()
+ parameters.put(CoreOptions.PATH.key, path)
+ val catalogContext =
+ CatalogContext.create(
+ Options.fromMap(parameters),
+ SparkSession.active.sessionState.newHadoopConf())
+ FileStoreTableFactory.create(catalogContext)
+ }
+
+ def isPaimonCowTable(table: FileStoreTable): Boolean = {
+ // https://paimon.apache.org/docs/master/primary-key-table/table-mode/
+ // Paimon COW mode: 'full-compaction.delta-commits' = '1'
+ table
+ .options()
+ .get(paimonCowOptionKey) != null && table.options().get(paimonCowOptionKey).equals("1")
+ }
+
+ def paimonFileFormat(table: FileStoreTable): String = {
+ if (table.options().get(paimonFileFormatOptionKey) != null) {
+ table.options().get(paimonFileFormatOptionKey)
+ } else {
+ parquetFormat
+ }
+ }
+}
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/blaze/plan/NativeHiveTableScanBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/blaze/plan/NativeHiveTableScanBase.scala
new file mode 100644
index 000000000..b3f26de46
--- /dev/null
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/blaze/plan/NativeHiveTableScanBase.scala
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2022 The Blaze Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.blaze.plan
+
+import scala.collection.immutable.SortedMap
+import scala.collection.JavaConverters._
+import scala.collection.Seq
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.blaze.JniBridge
+import org.apache.spark.sql.blaze.NativeConverters
+import org.apache.spark.sql.blaze.NativeHelper
+import org.apache.spark.sql.blaze.NativeSupports
+import org.apache.spark.sql.blaze.Shims
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.FilePartition
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.types.NullType
+import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+import org.blaze.{protobuf => pb}
+
+import java.net.URI
+import java.security.PrivilegedExceptionAction
+
+abstract class NativeHiveTableScanBase(basedHiveScan: HiveTableScanExec)
+ extends LeafExecNode
+ with NativeSupports {
+
+ override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
+ NativeHelper
+ .getDefaultNativeMetrics(sparkContext)
+ .filterKeys(Set("stage_id", "output_rows", "elapsed_compute"))
+ .toSeq :+
+ ("predicate_evaluation_errors", SQLMetrics
+ .createMetric(sparkContext, "Native.predicate_evaluation_errors")) :+
+ ("row_groups_pruned", SQLMetrics
+ .createMetric(sparkContext, "Native.row_groups_pruned")) :+
+ ("bytes_scanned", SQLMetrics.createSizeMetric(sparkContext, "Native.bytes_scanned")) :+
+ ("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, "Native.io_time")) :+
+ ("io_time_getfs", SQLMetrics
+ .createNanoTimingMetric(sparkContext, "Native.io_time_getfs")): _*)
+
+ override val output: Seq[Attribute] = basedHiveScan.output
+ override val outputPartitioning: Partitioning = basedHiveScan.outputPartitioning
+
+ protected val relation: HiveTableRelation = basedHiveScan.relation
+ protected val partitionSchema: StructType = relation.tableMeta.partitionSchema
+ protected val tableName: String = relation.tableMeta.identifier.unquotedString
+
+ protected lazy val partitions: Array[FilePartition] = getFilePartitions()
+ private lazy val fileSizes = partitions
+ .flatMap(_.files)
+ .groupBy(_.filePath)
+ .mapValues(_.map(_.length).sum)
+ .map(identity) // make this map serializable
+
+ // should not include partition columns
+ protected def nativeFileSchema: pb.Schema =
+ NativeConverters.convertSchema(StructType(relation.tableMeta.dataSchema.map {
+ case field if basedHiveScan.requestedAttributes.exists(_.name == field.name) =>
+ field.copy(nullable = true)
+ case field =>
+ // avoid converting unsupported type in non-used fields
+ StructField(field.name, NullType, nullable = true)
+ }))
+
+ protected def nativePartitionSchema: pb.Schema =
+ NativeConverters.convertSchema(partitionSchema)
+
+ protected def nativeFileGroups: FilePartition => pb.FileGroup = (partition: FilePartition) => {
+ // list input file statuses
+ val nativePartitionedFile = (file: PartitionedFile) => {
+ val nativePartitionValues = partitionSchema.zipWithIndex.map { case (field, index) =>
+ NativeConverters.convertValue(
+ file.partitionValues.get(index, field.dataType),
+ field.dataType)
+ }
+ pb.PartitionedFile
+ .newBuilder()
+ .setPath(s"${file.filePath}")
+ .setSize(fileSizes(file.filePath))
+ .addAllPartitionValues(nativePartitionValues.asJava)
+ .setLastModifiedNs(0)
+ .setRange(
+ pb.FileRange
+ .newBuilder()
+ .setStart(file.start)
+ .setEnd(file.start + file.length)
+ .build())
+ .build()
+ }
+ pb.FileGroup
+ .newBuilder()
+ .addAllFiles(partition.files.map(nativePartitionedFile).toList.asJava)
+ .build()
+ }
+
+ // force evaluation of the three conversions below at construction time, so an
+ // unsupported schema/partition/file-group conversion fails fast at planning
+ nativeFileSchema
+ nativePartitionSchema
+ nativeFileGroups
+
+ protected def putJniBridgeResource(
+ resourceId: String,
+ broadcastedHadoopConf: Broadcast[SerializableConfiguration]): Unit = {
+ val sharedConf = broadcastedHadoopConf.value.value
+ JniBridge.resourcesMap.put(
+ resourceId,
+ (location: String) => {
+ val getFsTimeMetric = metrics("io_time_getfs")
+ val currentTimeMillis = System.currentTimeMillis()
+ val fs = NativeHelper.currentUser.doAs(new PrivilegedExceptionAction[FileSystem] {
+ override def run(): FileSystem = {
+ FileSystem.get(new URI(location), sharedConf)
+ }
+ })
+ getFsTimeMetric.add((System.currentTimeMillis() - currentTimeMillis) * 1000000)
+ fs
+ })
+ }
+
+ protected def broadcastedHadoopConf: Broadcast[SerializableConfiguration] = {
+ val sparkSession = Shims.get.getSqlContext(basedHiveScan).sparkSession
+ val hadoopConf =
+ sparkSession.sessionState.newHadoopConfWithOptions(Map.empty)
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+ }
+
+ def getFilePartitions(): Array[FilePartition]
+
+ override protected def doCanonicalize(): SparkPlan = basedHiveScan.canonicalized
+
+ override def simpleString(maxFields: Int): String =
+ s"$nodeName (${basedHiveScan.simpleString(maxFields)})"
+}
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/blaze/plan/NativePaimonTableScanExec.scala b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/blaze/plan/NativePaimonTableScanExec.scala
new file mode 100644
index 000000000..ec26379d0
--- /dev/null
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/blaze/plan/NativePaimonTableScanExec.scala
@@ -0,0 +1,241 @@
+/*
+ * Copyright 2022 The Blaze Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution.blaze.plan
+
+import java.util.UUID
+import scala.collection.JavaConverters._
+import scala.collection.Seq
+
+import org.apache.paimon.io.DataFileMeta
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.source.DataSplit
+import org.apache.paimon.utils.RowDataToObjectArrayConverter
+import org.apache.spark.Partition
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.blaze.MetricNode
+import org.apache.spark.sql.blaze.NativeRDD
+import org.apache.spark.sql.blaze.Shims
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.catalyst.expressions.Cast
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.FilePartition
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.hive.blaze.PaimonUtil
+import org.apache.spark.sql.hive.execution.HiveTableScanExec
+import org.apache.spark.sql.types.StructType
+import org.blaze.{protobuf => pb}
+
+case class NativePaimonTableScanExec(basedHiveScan: HiveTableScanExec)
+ extends NativeHiveTableScanBase(basedHiveScan)
+ with Logging {
+
+ private lazy val table: FileStoreTable =
+ PaimonUtil.loadTable(relation.tableMeta.location.toString)
+ private lazy val fileFormat = PaimonUtil.paimonFileFormat(table)
+
+ override def doExecuteNative(): NativeRDD = {
+ val nativeMetrics = MetricNode(
+ metrics,
+ Nil,
+ Some({
+ case ("bytes_scanned", v) =>
+ val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+ inputMetric.incBytesRead(v)
+ case ("output_rows", v) =>
+ val inputMetric = TaskContext.get.taskMetrics().inputMetrics
+ inputMetric.incRecordsRead(v)
+ case _ =>
+ }))
+ // TODO: generate native pruning predicate filters once predicate pushdown is supported
+ val nativeFileSchema = this.nativeFileSchema
+ val nativeFileGroups = this.nativeFileGroups
+ val nativePartitionSchema = this.nativePartitionSchema
+
+ val projection = schema.map(field => relation.schema.fieldIndex(field.name))
+ val broadcastedHadoopConf = this.broadcastedHadoopConf
+ val numPartitions = partitions.length
+
+ new NativeRDD(
+ sparkContext,
+ nativeMetrics,
+ partitions.asInstanceOf[Array[Partition]],
+ Nil,
+ rddShuffleReadFull = true,
+ (partition, _) => {
+ val resourceId = s"NativePaimonTableScan:${UUID.randomUUID().toString}"
+ putJniBridgeResource(resourceId, broadcastedHadoopConf)
+
+ val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition])
+ val nativeFileScanConf = pb.FileScanExecConf
+ .newBuilder()
+ .setNumPartitions(numPartitions)
+ .setPartitionIndex(partition.index)
+ .setStatistics(pb.Statistics.getDefaultInstance)
+ .setSchema(nativeFileSchema)
+ .setFileGroup(nativeFileGroup)
+ .addAllProjection(projection.map(Integer.valueOf).asJava)
+ .setPartitionSchema(nativePartitionSchema)
+ .build()
+ if (fileFormat.equalsIgnoreCase(PaimonUtil.orcFormat)) {
+ val nativeOrcScanExecBuilder = pb.OrcScanExecNode
+ .newBuilder()
+ .setBaseConf(nativeFileScanConf)
+ .setFsResourceId(resourceId)
+ .addAllPruningPredicates(new java.util.ArrayList()) // predicate pushdown not supported yet
+
+ pb.PhysicalPlanNode
+ .newBuilder()
+ .setOrcScan(nativeOrcScanExecBuilder.build())
+ .build()
+ } else {
+ val nativeParquetScanExecBuilder = pb.ParquetScanExecNode
+ .newBuilder()
+ .setBaseConf(nativeFileScanConf)
+ .setFsResourceId(resourceId)
+ .addAllPruningPredicates(new java.util.ArrayList()) // predicate pushdown not supported yet
+
+ pb.PhysicalPlanNode
+ .newBuilder()
+ .setParquetScan(nativeParquetScanExecBuilder.build())
+ .build()
+ }
+ },
+ friendlyName = "NativeRDD.PaimonScan")
+ }
+
+ override val nodeName: String =
+ s"NativePaimonTableScan $tableName"
+
+ override def getFilePartitions(): Array[FilePartition] = {
+ val currentTimeMillis = System.currentTimeMillis()
+ val sparkSession = Shims.get.getSqlContext(basedHiveScan).sparkSession
+ // TODO: verify that the Paimon COW table has no level-0 files and no deleted rows in any DataSplit, and that all DataFileMetas are at the same level
+ val splits =
+ table.newScan().plan().splits().asScala.map(split => split.asInstanceOf[DataSplit])
+ logInfo(
+ s"Get paimon table $tableName splits elapse: ${System.currentTimeMillis() - currentTimeMillis} ms")
+
+ val dataSplitPartitions = if (relation.isPartitioned) {
+ val rowDataToObjectArrayConverter = new RowDataToObjectArrayConverter(
+ table.schema().logicalPartitionType())
+ val sessionLocalTimeZone = sparkSession.sessionState.conf.sessionLocalTimeZone
+ if (relation.prunedPartitions.nonEmpty) {
+ val partitionPathAndValues =
+ relation.prunedPartitions.get.map { catalogTablePartition =>
+ (
+ catalogTablePartition.spec.map { case (k, v) => s"$k=$v" }.mkString("/"),
+ catalogTablePartition.toRow(partitionSchema, sessionLocalTimeZone))
+ }.toMap
+ val partitionKeys = table.schema().partitionKeys()
+ // keep only the Paimon splits whose partition path matches one of Hive's pruned partitions
+ splits
+ .map { split =>
+ val values = rowDataToObjectArrayConverter.convert(split.partition())
+ val partitionPath = values.zipWithIndex
+ .map { case (v, i) => s"${partitionKeys.get(i)}=${v.toString}" }
+ .mkString("/")
+ (split, partitionPathAndValues.getOrElse(partitionPath, null))
+ }
+ .filter(_._2 != null)
+ } else {
+ // don't prune partitions
+ // forked from {@link CatalogTablePartition#toRow} — builds the partition-values row directly from path values
+ def toRow(
+ values: Seq[String],
+ partitionSchema: StructType,
+ defaultTimeZondId: String): InternalRow = {
+ val caseInsensitiveProperties = CaseInsensitiveMap(
+ relation.tableMeta.storage.properties)
+ val timeZoneId =
+ caseInsensitiveProperties.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
+ InternalRow.fromSeq(partitionSchema.zipWithIndex.map { case (field, index) =>
+ val partValue = if (values(index) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
+ null
+ } else {
+ values(index)
+ }
+ Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval()
+ })
+ }
+ splits.map { split =>
+ val values = rowDataToObjectArrayConverter.convert(split.partition()).map(_.toString)
+ (split, toRow(values, partitionSchema, sessionLocalTimeZone))
+ }
+ }
+ } else {
+ splits.map((_, InternalRow.empty))
+ }
+ logInfo(
+ s"Table: $tableName, total splits: ${splits.length}, selected splits: ${dataSplitPartitions.length}")
+
+ val isSplitable =
+ fileFormat.equalsIgnoreCase(PaimonUtil.parquetFormat) || fileFormat.equalsIgnoreCase(
+ PaimonUtil.orcFormat)
+ val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+ val maxSplitBytes = getMaxSplitBytes(sparkSession, dataSplitPartitions.map(_._1))
+ logInfo(
+ s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+ s"open cost is considered as scanning $openCostInBytes bytes.")
+ val partitionedFiles = dataSplitPartitions
+ .flatMap { partition =>
+ partition._1.dataFiles().asScala.flatMap { dataFileMeta =>
+ val filePath = s"${partition._1.bucketPath()}/${dataFileMeta.fileName()}"
+ splitFiles(dataFileMeta, filePath, isSplitable, maxSplitBytes, partition._2)
+ }
+ }
+ .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+ FilePartition.getFilePartitions(sparkSession, partitionedFiles, maxSplitBytes).toArray
+ }
+
+ // fork {@link PartitionedFileUtil#splitFiles}
+ private def splitFiles(
+ dataFileMeta: DataFileMeta,
+ filePath: String,
+ isSplitable: Boolean,
+ maxSplitBytes: Long,
+ partitionValues: InternalRow): Seq[PartitionedFile] = {
+ if (isSplitable) {
+ (0L until dataFileMeta.fileSize() by maxSplitBytes).map { offset =>
+ val remaining = dataFileMeta.fileSize() - offset
+ val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
+ PartitionedFile(partitionValues, filePath, offset, size)
+ }
+ } else {
+ Seq(PartitionedFile(partitionValues, filePath, 0, dataFileMeta.fileSize()))
+ }
+ }
+
+ // fork {@link FilePartition#maxSplitBytes}
+ private def getMaxSplitBytes(
+ sparkSession: SparkSession,
+ selectedSplits: Seq[DataSplit]): Long = {
+ val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
+ val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
+ val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
+ .getOrElse(sparkSession.sparkContext.defaultParallelism)
+ val totalBytes = selectedSplits
+ .flatMap(_.dataFiles().asScala.map(_.fileSize() + openCostInBytes))
+ .sum
+ val bytesPerCore = totalBytes / minPartitionNum
+
+ Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+ }
+}