diff --git a/pom.xml b/pom.xml index 29c65c9f2..e488a2174 100644 --- a/pom.xml +++ b/pom.xml @@ -19,6 +19,7 @@ UTF-8 16.0.0 3.21.9 + 0.9.0 @@ -43,6 +44,11 @@ spark-sql_${scalaVersion} ${sparkVersion} + + org.apache.paimon + paimon-core + ${paimonVersion} + org.apache.arrow arrow-c-data diff --git a/spark-extension/pom.xml b/spark-extension/pom.xml index 0441c165a..83c8c55bb 100644 --- a/spark-extension/pom.xml +++ b/spark-extension/pom.xml @@ -38,6 +38,11 @@ spark-sql_${scalaVersion} provided + + org.apache.paimon + paimon-core + provided + org.apache.arrow arrow-c-data diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConvertStrategy.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConvertStrategy.scala index 596ebcc55..1ef4712a2 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConvertStrategy.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConvertStrategy.scala @@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.blaze.plan.BuildSide import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec +import org.apache.spark.sql.hive.blaze.BlazeHiveConverters object BlazeConvertStrategy extends Logging { import BlazeConverters._ @@ -123,6 +124,8 @@ object BlazeConvertStrategy extends Logging { e.setTagValue(convertStrategyTag, AlwaysConvert) case e: FileSourceScanExec => e.setTagValue(convertStrategyTag, AlwaysConvert) + case e if BlazeHiveConverters.isNativePaimonTableScan(e) => + e.setTagValue(convertStrategyTag, AlwaysConvert) case e: ProjectExec if isNative(e.child) => e.setTagValue(convertStrategyTag, AlwaysConvert) case e: FilterExec if isNative(e.child) => diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala 
b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala index ac656d7aa..f21be318b 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala @@ -76,12 +76,16 @@ import org.apache.spark.sql.execution.blaze.plan.ConvertToNativeBase import org.apache.spark.sql.execution.blaze.plan.NativeOrcScanBase import org.apache.spark.sql.execution.blaze.plan.NativeParquetScanBase import org.apache.spark.sql.execution.blaze.plan.NativeSortBase +import org.apache.spark.sql.hive.blaze.BlazeHiveConverters import org.apache.spark.sql.hive.execution.InsertIntoHiveTable +import org.apache.spark.sql.hive.execution.blaze.plan.NativeHiveTableScanBase import org.apache.spark.sql.types.LongType object BlazeConverters extends Logging { val enableScan: Boolean = SparkEnv.get.conf.getBoolean("spark.blaze.enable.scan", defaultValue = true) + val enablePaimonScan: Boolean = + SparkEnv.get.conf.getBoolean("spark.blaze.enable.paimon.scan", defaultValue = false) val enableProject: Boolean = SparkEnv.get.conf.getBoolean("spark.blaze.enable.project", defaultValue = true) val enableFilter: Boolean = @@ -152,6 +156,9 @@ object BlazeConverters extends Logging { case e: BroadcastExchangeExec => tryConvert(e, convertBroadcastExchangeExec) case e: FileSourceScanExec if enableScan => // scan tryConvert(e, convertFileSourceScanExec) + case e + if enablePaimonScan && BlazeHiveConverters.isNativePaimonTableScan(e) => // scan paimon + tryConvert(e, BlazeHiveConverters.convertPaimonTableScanExec) case e: ProjectExec if enableProject => // project tryConvert(e, convertProjectExec) case e: FilterExec if enableFilter => // filter @@ -787,7 +794,7 @@ object BlazeConverters extends Logging { } catch { case e @ (_: NotImplementedError | _: AssertionError | _: Exception) => logWarning( - s"Error projecting resultExpressions, failback to non-native projection: " + + s"Error 
projecting resultExpressions, fallback to non-native projection: " + s"${e.getMessage}") val proj = ProjectExec(exec.resultExpressions, nativeAggr) proj.setTagValue(convertToNonNativeTag, true) @@ -878,7 +885,9 @@ object BlazeConverters extends Logging { return false } plan match { - case _: NativeParquetScanBase | _: NativeOrcScanBase | _: NativeUnionBase => true + case _: NativeParquetScanBase | _: NativeOrcScanBase | _: NativeHiveTableScanBase | + _: NativeUnionBase => + true case _: ConvertToNativeBase => needRenameColumns(plan.children.head) case exec if NativeHelper.isNative(exec) => NativeHelper.getUnderlyingNativePlan(exec).output != plan.output diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/hive/blaze/BlazeHiveConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/hive/blaze/BlazeHiveConverters.scala new file mode 100644 index 000000000..c2d33e43b --- /dev/null +++ b/spark-extension/src/main/scala/org/apache/spark/sql/hive/blaze/BlazeHiveConverters.scala @@ -0,0 +1,57 @@ +/* + * Copyright 2022 The Blaze Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.hive.blaze + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.blaze.BlazeConverters.addRenameColumnsExec +import org.apache.spark.sql.blaze.Shims +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.hive.execution.HiveTableScanExec +import org.apache.spark.sql.hive.execution.blaze.plan.NativePaimonTableScanExec + +object BlazeHiveConverters extends Logging { + + def isNativePaimonTableScan(exec: SparkPlan): Boolean = { + exec match { + case e: HiveTableScanExec + if e.relation.tableMeta.storage.serde.isDefined + && e.relation.tableMeta.storage.serde.get.contains("Paimon") => + true + case _ => false + } + } + + def convertPaimonTableScanExec(exec: SparkPlan): SparkPlan = { + val hiveExec = exec.asInstanceOf[HiveTableScanExec] + assert( + PaimonUtil.isPaimonCowTable( + PaimonUtil.loadTable(hiveExec.relation.tableMeta.location.toString)), + "paimon MOR/MOW mode is not supported") + val (relation, output, requestedAttributes, partitionPruningPred) = ( + hiveExec.relation, + hiveExec.output, + hiveExec.requestedAttributes, + hiveExec.partitionPruningPred) + logDebug(s"Converting HiveTableScanExec: ${Shims.get.simpleStringWithNodeId(exec)}") + logDebug(s" relation: ${relation.getClass}") + logDebug(s" relation.location: ${relation.tableMeta.location}") + logDebug(s" output: $output") + logDebug(s" requestedAttributes: $requestedAttributes") + logDebug(s" partitionPruningPred: $partitionPruningPred") + + addRenameColumnsExec(NativePaimonTableScanExec(hiveExec)) + } +} diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/hive/blaze/PaimonUtil.scala b/spark-extension/src/main/scala/org/apache/spark/sql/hive/blaze/PaimonUtil.scala new file mode 100644 index 000000000..f0208cc52 --- /dev/null +++ b/spark-extension/src/main/scala/org/apache/spark/sql/hive/blaze/PaimonUtil.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2022 The Blaze Authors + * + * Licensed under the Apache License, 
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.blaze + +import java.util.{HashMap => JHashMap} + +import org.apache.paimon.CoreOptions +import org.apache.paimon.catalog.CatalogContext +import org.apache.paimon.options.Options +import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.table.FileStoreTableFactory +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession + +object PaimonUtil extends Logging { + private val paimonCowOptionKey = "full-compaction.delta-commits" + private val paimonFileFormatOptionKey = "file.format" + val parquetFormat = "parquet" + val orcFormat = "orc" + + def loadTable(path: String): FileStoreTable = { + val parameters = new JHashMap[String, String]() + parameters.put(CoreOptions.PATH.key, path) + val catalogContext = + CatalogContext.create( + Options.fromMap(parameters), + SparkSession.active.sessionState.newHadoopConf()) + FileStoreTableFactory.create(catalogContext) + } + + def isPaimonCowTable(table: FileStoreTable): Boolean = { + // https://paimon.apache.org/docs/master/primary-key-table/table-mode/ + // Paimon COW mode: 'full-compaction.delta-commits' = '1' + table + .options() + .get(paimonCowOptionKey) != null && table.options().get(paimonCowOptionKey).equals("1") + } + + def paimonFileFormat(table: FileStoreTable): String = { + if (table.options().get(paimonFileFormatOptionKey) != null) { + table.options().get(paimonFileFormatOptionKey) + } else { + parquetFormat + } + } +} diff 
--git a/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/blaze/plan/NativeHiveTableScanBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/blaze/plan/NativeHiveTableScanBase.scala new file mode 100644 index 000000000..b3f26de46 --- /dev/null +++ b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/blaze/plan/NativeHiveTableScanBase.scala @@ -0,0 +1,158 @@ +/* + * Copyright 2022 The Blaze Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.hive.execution.blaze.plan + +import scala.collection.immutable.SortedMap +import scala.collection.JavaConverters._ +import scala.collection.Seq + +import org.apache.hadoop.fs.FileSystem +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.blaze.JniBridge +import org.apache.spark.sql.blaze.NativeConverters +import org.apache.spark.sql.blaze.NativeHelper +import org.apache.spark.sql.blaze.NativeSupports +import org.apache.spark.sql.blaze.Shims +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.FilePartition +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.hive.execution.HiveTableScanExec +import org.apache.spark.sql.types.NullType +import org.apache.spark.sql.types.StructField +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration +import org.blaze.{protobuf => pb} + +import java.net.URI +import java.security.PrivilegedExceptionAction + +abstract class NativeHiveTableScanBase(basedHiveScan: HiveTableScanExec) + extends LeafExecNode + with NativeSupports { + + override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map( + NativeHelper + .getDefaultNativeMetrics(sparkContext) + .filterKeys(Set("stage_id", "output_rows", "elapsed_compute")) + .toSeq :+ + ("predicate_evaluation_errors", SQLMetrics + .createMetric(sparkContext, "Native.predicate_evaluation_errors")) :+ + ("row_groups_pruned", SQLMetrics + .createMetric(sparkContext, "Native.row_groups_pruned")) :+ + ("bytes_scanned", 
SQLMetrics.createSizeMetric(sparkContext, "Native.bytes_scanned")) :+ + ("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, "Native.io_time")) :+ + ("io_time_getfs", SQLMetrics + .createNanoTimingMetric(sparkContext, "Native.io_time_getfs")): _*) + + override val output: Seq[Attribute] = basedHiveScan.output + override val outputPartitioning: Partitioning = basedHiveScan.outputPartitioning + + protected val relation: HiveTableRelation = basedHiveScan.relation + protected val partitionSchema: StructType = relation.tableMeta.partitionSchema + protected val tableName: String = relation.tableMeta.identifier.unquotedString + + protected lazy val partitions: Array[FilePartition] = getFilePartitions() + private lazy val fileSizes = partitions + .flatMap(_.files) + .groupBy(_.filePath) + .mapValues(_.map(_.length).sum) + .map(identity) // make this map serializable + + // should not include partition columns + protected def nativeFileSchema: pb.Schema = + NativeConverters.convertSchema(StructType(relation.tableMeta.dataSchema.map { + case field if basedHiveScan.requestedAttributes.exists(_.name == field.name) => + field.copy(nullable = true) + case field => + // avoid converting unsupported type in non-used fields + StructField(field.name, NullType, nullable = true) + })) + + protected def nativePartitionSchema: pb.Schema = + NativeConverters.convertSchema(partitionSchema) + + protected def nativeFileGroups: FilePartition => pb.FileGroup = (partition: FilePartition) => { + // list input file statuses + val nativePartitionedFile = (file: PartitionedFile) => { + val nativePartitionValues = partitionSchema.zipWithIndex.map { case (field, index) => + NativeConverters.convertValue( + file.partitionValues.get(index, field.dataType), + field.dataType) + } + pb.PartitionedFile + .newBuilder() + .setPath(s"${file.filePath}") + .setSize(fileSizes(file.filePath)) + .addAllPartitionValues(nativePartitionValues.asJava) + .setLastModifiedNs(0) + .setRange( + pb.FileRange + 
.newBuilder() + .setStart(file.start) + .setEnd(file.start + file.length) + .build()) + .build() + } + pb.FileGroup + .newBuilder() + .addAllFiles(partition.files.map(nativePartitionedFile).toList.asJava) + .build() + } + + // check whether native converting is supported + nativeFileSchema + nativePartitionSchema + nativeFileGroups + + protected def putJniBridgeResource( + resourceId: String, + broadcastedHadoopConf: Broadcast[SerializableConfiguration]): Unit = { + val sharedConf = broadcastedHadoopConf.value.value + JniBridge.resourcesMap.put( + resourceId, + (location: String) => { + val getFsTimeMetric = metrics("io_time_getfs") + val currentTimeMillis = System.currentTimeMillis() + val fs = NativeHelper.currentUser.doAs(new PrivilegedExceptionAction[FileSystem] { + override def run(): FileSystem = { + FileSystem.get(new URI(location), sharedConf) + } + }) + getFsTimeMetric.add((System.currentTimeMillis() - currentTimeMillis) * 1000000) + fs + }) + } + + protected def broadcastedHadoopConf: Broadcast[SerializableConfiguration] = { + val sparkSession = Shims.get.getSqlContext(basedHiveScan).sparkSession + val hadoopConf = + sparkSession.sessionState.newHadoopConfWithOptions(Map.empty) + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + } + + def getFilePartitions(): Array[FilePartition] + + override protected def doCanonicalize(): SparkPlan = basedHiveScan.canonicalized + + override def simpleString(maxFields: Int): String = + s"$nodeName (${basedHiveScan.simpleString(maxFields)})" +} diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/blaze/plan/NativePaimonTableScanExec.scala b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/blaze/plan/NativePaimonTableScanExec.scala new file mode 100644 index 000000000..ec26379d0 --- /dev/null +++ b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/blaze/plan/NativePaimonTableScanExec.scala @@ -0,0 +1,241 @@ +/* + * Copyright 2022 
The Blaze Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution.blaze.plan + +import java.util.UUID +import scala.collection.JavaConverters._ +import scala.collection.Seq + +import org.apache.paimon.io.DataFileMeta +import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.table.source.DataSplit +import org.apache.paimon.utils.RowDataToObjectArrayConverter +import org.apache.spark.Partition +import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.blaze.MetricNode +import org.apache.spark.sql.blaze.NativeRDD +import org.apache.spark.sql.blaze.Shims +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils +import org.apache.spark.sql.catalyst.expressions.Cast +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.FilePartition +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.hive.blaze.PaimonUtil +import org.apache.spark.sql.hive.execution.HiveTableScanExec +import org.apache.spark.sql.types.StructType +import org.blaze.{protobuf => pb} + +case class NativePaimonTableScanExec(basedHiveScan: HiveTableScanExec) + extends 
NativeHiveTableScanBase(basedHiveScan) + with Logging { + + private lazy val table: FileStoreTable = + PaimonUtil.loadTable(relation.tableMeta.location.toString) + private lazy val fileFormat = PaimonUtil.paimonFileFormat(table) + + override def doExecuteNative(): NativeRDD = { + val nativeMetrics = MetricNode( + metrics, + Nil, + Some({ + case ("bytes_scanned", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incBytesRead(v) + case ("output_rows", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incRecordsRead(v) + case _ => + })) + // val nativePruningPredicateFilters = this.nativePruningPredicateFilters + val nativeFileSchema = this.nativeFileSchema + val nativeFileGroups = this.nativeFileGroups + val nativePartitionSchema = this.nativePartitionSchema + + val projection = schema.map(field => relation.schema.fieldIndex(field.name)) + val broadcastedHadoopConf = this.broadcastedHadoopConf + val numPartitions = partitions.length + + new NativeRDD( + sparkContext, + nativeMetrics, + partitions.asInstanceOf[Array[Partition]], + Nil, + rddShuffleReadFull = true, + (partition, _) => { + val resourceId = s"NativePaimonTableScan:${UUID.randomUUID().toString}" + putJniBridgeResource(resourceId, broadcastedHadoopConf) + + val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition]) + val nativeFileScanConf = pb.FileScanExecConf + .newBuilder() + .setNumPartitions(numPartitions) + .setPartitionIndex(partition.index) + .setStatistics(pb.Statistics.getDefaultInstance) + .setSchema(nativeFileSchema) + .setFileGroup(nativeFileGroup) + .addAllProjection(projection.map(Integer.valueOf).asJava) + .setPartitionSchema(nativePartitionSchema) + .build() + if (fileFormat.equalsIgnoreCase(PaimonUtil.orcFormat)) { + val nativeOrcScanExecBuilder = pb.OrcScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // 
not support this filter + + pb.PhysicalPlanNode + .newBuilder() + .setOrcScan(nativeOrcScanExecBuilder.build()) + .build() + } else { + val nativeParquetScanExecBuilder = pb.ParquetScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter + + pb.PhysicalPlanNode + .newBuilder() + .setParquetScan(nativeParquetScanExecBuilder.build()) + .build() + } + }, + friendlyName = "NativeRDD.PaimonScan") + } + + override val nodeName: String = + s"NativePaimonTableScan $tableName" + + override def getFilePartitions(): Array[FilePartition] = { + val currentTimeMillis = System.currentTimeMillis() + val sparkSession = Shims.get.getSqlContext(basedHiveScan).sparkSession + // TODO: Verify paimon cow table without level0 and deleted row in DataSplit and all DataFileMetas are same level + val splits = + table.newScan().plan().splits().asScala.map(split => split.asInstanceOf[DataSplit]) + logInfo( + s"Get paimon table $tableName splits elapse: ${System.currentTimeMillis() - currentTimeMillis} ms") + + val dataSplitPartitions = if (relation.isPartitioned) { + val rowDataToObjectArrayConverter = new RowDataToObjectArrayConverter( + table.schema().logicalPartitionType()) + val sessionLocalTimeZone = sparkSession.sessionState.conf.sessionLocalTimeZone + if (relation.prunedPartitions.nonEmpty) { + val partitionPathAndValues = + relation.prunedPartitions.get.map { catalogTablePartition => + ( + catalogTablePartition.spec.map { case (k, v) => s"$k=$v" }.mkString("/"), + catalogTablePartition.toRow(partitionSchema, sessionLocalTimeZone)) + }.toMap + val partitionKeys = table.schema().partitionKeys() + // pruning paimon splits + splits + .map { split => + val values = rowDataToObjectArrayConverter.convert(split.partition()) + val partitionPath = values.zipWithIndex + .map { case (v, i) => s"${partitionKeys.get(i)}=${v.toString}" } + .mkString("/") + (split, 
partitionPathAndValues.getOrElse(partitionPath, null)) + } + .filter(_._2 != null) + } else { + // don't prune partitions + // fork {@link CatalogTablePartition#toRow} + def toRow( + values: Seq[String], + partitionSchema: StructType, + defaultTimeZondId: String): InternalRow = { + val caseInsensitiveProperties = CaseInsensitiveMap( + relation.tableMeta.storage.properties) + val timeZoneId = + caseInsensitiveProperties.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId) + InternalRow.fromSeq(partitionSchema.zipWithIndex.map { case (field, index) => + val partValue = if (values(index) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) { + null + } else { + values(index) + } + Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval() + }) + } + splits.map { split => + val values = rowDataToObjectArrayConverter.convert(split.partition()).map(_.toString) + (split, toRow(values, partitionSchema, sessionLocalTimeZone)) + } + } + } else { + splits.map((_, InternalRow.empty)) + } + logInfo( + s"Table: $tableName, total splits: ${splits.length}, selected splits: ${dataSplitPartitions.length}") + + val isSplitable = + fileFormat.equalsIgnoreCase(PaimonUtil.parquetFormat) || fileFormat.equalsIgnoreCase( + PaimonUtil.orcFormat) + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + val maxSplitBytes = getMaxSplitBytes(sparkSession, dataSplitPartitions.map(_._1)) + logInfo( + s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + + s"open cost is considered as scanning $openCostInBytes bytes.") + val partitionedFiles = dataSplitPartitions + .flatMap { partition => + partition._1.dataFiles().asScala.flatMap { dataFileMeta => + val filePath = s"${partition._1.bucketPath()}/${dataFileMeta.fileName()}" + splitFiles(dataFileMeta, filePath, isSplitable, maxSplitBytes, partition._2) + } + } + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + FilePartition.getFilePartitions(sparkSession, partitionedFiles, maxSplitBytes).toArray + } + 
+ // fork {@link PartitionedFileUtil#splitFiles} + private def splitFiles( + dataFileMeta: DataFileMeta, + filePath: String, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { + if (isSplitable) { + (0L until dataFileMeta.fileSize() by maxSplitBytes).map { offset => + val remaining = dataFileMeta.fileSize() - offset + val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining + PartitionedFile(partitionValues, filePath, offset, size) + } + } else { + Seq(PartitionedFile(partitionValues, filePath, 0, dataFileMeta.fileSize())) + } + } + + // fork {@link FilePartition#maxSplitBytes} + private def getMaxSplitBytes( + sparkSession: SparkSession, + selectedSplits: Seq[DataSplit]): Long = { + val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum + .getOrElse(sparkSession.sparkContext.defaultParallelism) + val totalBytes = selectedSplits + .flatMap(_.dataFiles().asScala.map(_.fileSize() + openCostInBytes)) + .sum + val bytesPerCore = totalBytes / minPartitionNum + + Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + } +}