6 changes: 6 additions & 0 deletions pom.xml
@@ -19,6 +19,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrowVersion>16.0.0</arrowVersion>
<protobufVersion>3.21.9</protobufVersion>
<paimonVersion>0.9.0</paimonVersion>
</properties>

<dependencyManagement>
@@ -43,6 +44,11 @@
<artifactId>spark-sql_${scalaVersion}</artifactId>
<version>${sparkVersion}</version>
</dependency>
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-core</artifactId>
<version>${paimonVersion}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
5 changes: 5 additions & 0 deletions spark-extension/pom.xml
@@ -38,6 +38,11 @@
<artifactId>spark-sql_${scalaVersion}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
@@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.blaze.plan.BuildSide
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.hive.blaze.BlazeHiveConverters

object BlazeConvertStrategy extends Logging {
import BlazeConverters._
@@ -123,6 +124,8 @@ object BlazeConvertStrategy extends Logging {
e.setTagValue(convertStrategyTag, AlwaysConvert)
case e: FileSourceScanExec =>
e.setTagValue(convertStrategyTag, AlwaysConvert)
case e if BlazeHiveConverters.isNativePaimonTableScan(e) =>
e.setTagValue(convertStrategyTag, AlwaysConvert)
case e: ProjectExec if isNative(e.child) =>
e.setTagValue(convertStrategyTag, AlwaysConvert)
case e: FilterExec if isNative(e.child) =>
@@ -76,12 +76,16 @@ import org.apache.spark.sql.execution.blaze.plan.ConvertToNativeBase
import org.apache.spark.sql.execution.blaze.plan.NativeOrcScanBase
import org.apache.spark.sql.execution.blaze.plan.NativeParquetScanBase
import org.apache.spark.sql.execution.blaze.plan.NativeSortBase
import org.apache.spark.sql.hive.blaze.BlazeHiveConverters
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
import org.apache.spark.sql.hive.execution.blaze.plan.NativeHiveTableScanBase
import org.apache.spark.sql.types.LongType

object BlazeConverters extends Logging {
val enableScan: Boolean =
SparkEnv.get.conf.getBoolean("spark.blaze.enable.scan", defaultValue = true)
val enablePaimonScan: Boolean =
SparkEnv.get.conf.getBoolean("spark.blaze.enable.paimon.scan", defaultValue = false)
val enableProject: Boolean =
SparkEnv.get.conf.getBoolean("spark.blaze.enable.project", defaultValue = true)
val enableFilter: Boolean =
@@ -152,6 +156,9 @@ object BlazeConverters extends Logging {
case e: BroadcastExchangeExec => tryConvert(e, convertBroadcastExchangeExec)
case e: FileSourceScanExec if enableScan => // scan
tryConvert(e, convertFileSourceScanExec)
case e
if enablePaimonScan && BlazeHiveConverters.isNativePaimonTableScan(e) => // scan paimon
tryConvert(e, BlazeHiveConverters.convertPaimonTableScanExec)
case e: ProjectExec if enableProject => // project
tryConvert(e, convertProjectExec)
case e: FilterExec if enableFilter => // filter
@@ -787,7 +794,7 @@ object BlazeConverters extends Logging {
} catch {
case e @ (_: NotImplementedError | _: AssertionError | _: Exception) =>
logWarning(
s"Error projecting resultExpressions, failback to non-native projection: " +
s"Error projecting resultExpressions, fallback to non-native projection: " +
s"${e.getMessage}")
val proj = ProjectExec(exec.resultExpressions, nativeAggr)
proj.setTagValue(convertToNonNativeTag, true)
@@ -878,7 +885,9 @@ object BlazeConverters extends Logging {
return false
}
plan match {
case _: NativeParquetScanBase | _: NativeOrcScanBase | _: NativeUnionBase => true
case _: NativeParquetScanBase | _: NativeOrcScanBase | _: NativeHiveTableScanBase |
_: NativeUnionBase =>
true
case _: ConvertToNativeBase => needRenameColumns(plan.children.head)
case exec if NativeHelper.isNative(exec) =>
NativeHelper.getUnderlyingNativePlan(exec).output != plan.output
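The new conversion path is gated by spark.blaze.enable.paimon.scan (default false), while ordinary file-source scans stay behind spark.blaze.enable.scan (default true). A minimal sketch of turning both on for a session follows; only the two config keys come from this change, everything else is placeholder.

// Illustrative only: enable Blaze's Paimon scan conversion for a session.
// Only the two spark.blaze.* keys below come from this change; master and
// app name are placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("yarn") // placeholder
  .appName("blaze-paimon-demo") // placeholder
  .config("spark.blaze.enable.scan", "true") // existing flag, default true
  .config("spark.blaze.enable.paimon.scan", "true") // new flag, default false
  .getOrCreate()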
@@ -0,0 +1,57 @@
/*
* Copyright 2022 The Blaze Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.blaze

import org.apache.spark.internal.Logging
import org.apache.spark.sql.blaze.BlazeConverters.addRenameColumnsExec
import org.apache.spark.sql.blaze.Shims
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.hive.execution.blaze.plan.NativePaimonTableScanExec

object BlazeHiveConverters extends Logging {

def isNativePaimonTableScan(exec: SparkPlan): Boolean = {
exec match {
case e: HiveTableScanExec
if e.relation.tableMeta.storage.serde.isDefined
&& e.relation.tableMeta.storage.serde.get.contains("Paimon") =>
true
case _ => false
}
}

def convertPaimonTableScanExec(exec: SparkPlan): SparkPlan = {
val hiveExec = exec.asInstanceOf[HiveTableScanExec]
assert(
PaimonUtil.isPaimonCowTable(
PaimonUtil.loadTable(hiveExec.relation.tableMeta.location.toString)),
"paimon MOR/MOW mode is not supported")
val (relation, output, requestedAttributes, partitionPruningPred) = (
hiveExec.relation,
hiveExec.output,
hiveExec.requestedAttributes,
hiveExec.partitionPruningPred)
logDebug(s"Converting HiveTableScanExec: ${Shims.get.simpleStringWithNodeId(exec)}")
logDebug(s" relation: ${relation.getClass}")
logDebug(s" relation.location: ${relation.tableMeta.location}")
logDebug(s" output: $output")
logDebug(s" requestedAttributes: $requestedAttributes")
logDebug(s" partitionPruningPred: $partitionPruningPred")

addRenameColumnsExec(NativePaimonTableScanExec(hiveExec))
}
}
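convertPaimonTableScanExec only accepts copy-on-write tables: it asserts PaimonUtil.isPaimonCowTable, which per the Paimon table-mode docs means 'full-compaction.delta-commits' = '1'. A hedged DDL sketch of creating such a table through Spark SQL follows; it assumes a Paimon catalog named "paimon" is already registered with Spark, and only the option key and value are taken from this change.

// Hypothetical sketch: a primary-key Paimon table pinned to COW mode via
// 'full-compaction.delta-commits' = '1', the exact option isPaimonCowTable
// checks for. Catalog, database, table and column names are placeholders.
spark.sql("""
  CREATE TABLE paimon.db.orders (id BIGINT, amount DOUBLE)
  TBLPROPERTIES (
    'primary-key' = 'id',
    'full-compaction.delta-commits' = '1'
  )
""")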
@@ -0,0 +1,59 @@
/*
* Copyright 2022 The Blaze Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.blaze

import java.util.{HashMap => JHashMap}

import org.apache.paimon.CoreOptions
import org.apache.paimon.catalog.CatalogContext
import org.apache.paimon.options.Options
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.FileStoreTableFactory
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

object PaimonUtil extends Logging {
private val paimonCowOptionKey = "full-compaction.delta-commits"
private val paimonFileFormatOptionKey = "file.format"
val parquetFormat = "parquet"
val orcFormat = "orc"

def loadTable(path: String): FileStoreTable = {
val parameters = new JHashMap[String, String]()
parameters.put(CoreOptions.PATH.key, path)
val catalogContext =
CatalogContext.create(
Options.fromMap(parameters),
SparkSession.active.sessionState.newHadoopConf())
FileStoreTableFactory.create(catalogContext)
}

def isPaimonCowTable(table: FileStoreTable): Boolean = {
// https://paimon.apache.org/docs/master/primary-key-table/table-mode/
// Paimon COW mode: 'full-compaction.delta-commits' = '1'
table
.options()
.get(paimonCowOptionKey) != null && table.options().get(paimonCowOptionKey).equals("1")
}

def paimonFileFormat(table: FileStoreTable): String = {
if (table.options().get(paimonFileFormatOptionKey) != null) {
table.options().get(paimonFileFormatOptionKey)
} else {
parquetFormat
}
}
}
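A short usage sketch of the helpers above; the table location is a placeholder, and the check mirrors the precondition convertPaimonTableScanExec asserts.

// Illustrative only: load a Paimon table from its location, confirm COW mode,
// then read its data file format (defaults to parquet when unset).
import org.apache.spark.sql.hive.blaze.PaimonUtil

val table = PaimonUtil.loadTable("hdfs:///warehouse/db.db/orders") // placeholder path
if (PaimonUtil.isPaimonCowTable(table)) {
  val format = PaimonUtil.paimonFileFormat(table) // "parquet" or "orc"
  println(s"native Paimon scan eligible, file format: $format")
}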
@@ -0,0 +1,158 @@
/*
* Copyright 2022 The Blaze Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution.blaze.plan

import scala.collection.immutable.SortedMap
import scala.collection.JavaConverters._
import scala.collection.Seq

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.blaze.JniBridge
import org.apache.spark.sql.blaze.NativeConverters
import org.apache.spark.sql.blaze.NativeHelper
import org.apache.spark.sql.blaze.NativeSupports
import org.apache.spark.sql.blaze.Shims
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.types.NullType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
import org.blaze.{protobuf => pb}

import java.net.URI
import java.security.PrivilegedExceptionAction

abstract class NativeHiveTableScanBase(basedHiveScan: HiveTableScanExec)
extends LeafExecNode
with NativeSupports {

override lazy val metrics: Map[String, SQLMetric] = SortedMap[String, SQLMetric]() ++ Map(
NativeHelper
.getDefaultNativeMetrics(sparkContext)
.filterKeys(Set("stage_id", "output_rows", "elapsed_compute"))
.toSeq :+
("predicate_evaluation_errors", SQLMetrics
.createMetric(sparkContext, "Native.predicate_evaluation_errors")) :+
("row_groups_pruned", SQLMetrics
.createMetric(sparkContext, "Native.row_groups_pruned")) :+
("bytes_scanned", SQLMetrics.createSizeMetric(sparkContext, "Native.bytes_scanned")) :+
("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, "Native.io_time")) :+
("io_time_getfs", SQLMetrics
.createNanoTimingMetric(sparkContext, "Native.io_time_getfs")): _*)

override val output: Seq[Attribute] = basedHiveScan.output
override val outputPartitioning: Partitioning = basedHiveScan.outputPartitioning

protected val relation: HiveTableRelation = basedHiveScan.relation
protected val partitionSchema: StructType = relation.tableMeta.partitionSchema
protected val tableName: String = relation.tableMeta.identifier.unquotedString

protected lazy val partitions: Array[FilePartition] = getFilePartitions()
private lazy val fileSizes = partitions
.flatMap(_.files)
.groupBy(_.filePath)
.mapValues(_.map(_.length).sum)
.map(identity) // make this map serializable

// should not include partition columns
protected def nativeFileSchema: pb.Schema =
NativeConverters.convertSchema(StructType(relation.tableMeta.dataSchema.map {
case field if basedHiveScan.requestedAttributes.exists(_.name == field.name) =>
field.copy(nullable = true)
case field =>
// avoid converting unsupported type in non-used fields
StructField(field.name, NullType, nullable = true)
}))

protected def nativePartitionSchema: pb.Schema =
NativeConverters.convertSchema(partitionSchema)

protected def nativeFileGroups: FilePartition => pb.FileGroup = (partition: FilePartition) => {
// list input file statuses
val nativePartitionedFile = (file: PartitionedFile) => {
val nativePartitionValues = partitionSchema.zipWithIndex.map { case (field, index) =>
NativeConverters.convertValue(
file.partitionValues.get(index, field.dataType),
field.dataType)
}
pb.PartitionedFile
.newBuilder()
.setPath(s"${file.filePath}")
.setSize(fileSizes(file.filePath))
.addAllPartitionValues(nativePartitionValues.asJava)
.setLastModifiedNs(0)
.setRange(
pb.FileRange
.newBuilder()
.setStart(file.start)
.setEnd(file.start + file.length)
.build())
.build()
}
pb.FileGroup
.newBuilder()
.addAllFiles(partition.files.map(nativePartitionedFile).toList.asJava)
.build()
}

// check whether native converting is supported
nativeFileSchema
nativePartitionSchema
nativeFileGroups

protected def putJniBridgeResource(
resourceId: String,
broadcastedHadoopConf: Broadcast[SerializableConfiguration]): Unit = {
val sharedConf = broadcastedHadoopConf.value.value
JniBridge.resourcesMap.put(
resourceId,
(location: String) => {
val getFsTimeMetric = metrics("io_time_getfs")
val currentTimeMillis = System.currentTimeMillis()
val fs = NativeHelper.currentUser.doAs(new PrivilegedExceptionAction[FileSystem] {
override def run(): FileSystem = {
FileSystem.get(new URI(location), sharedConf)
}
})
getFsTimeMetric.add((System.currentTimeMillis() - currentTimeMillis) * 1000000)
fs
})
}

protected def broadcastedHadoopConf: Broadcast[SerializableConfiguration] = {
val sparkSession = Shims.get.getSqlContext(basedHiveScan).sparkSession
val hadoopConf =
sparkSession.sessionState.newHadoopConfWithOptions(Map.empty)
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
}

def getFilePartitions(): Array[FilePartition]

override protected def doCanonicalize(): SparkPlan = basedHiveScan.canonicalized

override def simpleString(maxFields: Int): String =
s"$nodeName (${basedHiveScan.simpleString(maxFields)})"
}
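NativeHiveTableScanBase leaves getFilePartitions() abstract; the Paimon subclass (NativePaimonTableScanExec, not shown in this diff) has to enumerate the table's data files itself. Below is a hypothetical sketch of that enumeration using paimon-core's read-builder API, stopping short of building PartitionedFiles because their constructor differs across Spark versions.

// Hypothetical sketch only (not the PR's NativePaimonTableScanExec). Lists
// (fileName, fileSize) pairs from the table's current snapshot; packing them
// into FilePartition/PartitionedFile is left out on purpose.
import scala.collection.JavaConverters._

import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.source.DataSplit

def listPaimonDataFiles(table: FileStoreTable): Seq[(String, Long)] =
  table
    .newReadBuilder()
    .newScan()
    .plan()
    .splits()
    .asScala
    .collect { case split: DataSplit =>
      split.dataFiles().asScala.map(meta => (meta.fileName(), meta.fileSize()))
    }
    .flatten
    .toSeq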