Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public enum BlazeConf {
/// partial aggregate skipping ratio
PARTIAL_AGG_SKIPPING_RATIO("spark.blaze.partialAggSkipping.ratio", 0.9),

/// mininum number of rows to trigger partial aggregate skipping
/// minimum number of rows to trigger partial aggregate skipping
PARTIAL_AGG_SKIPPING_MIN_ROWS("spark.blaze.partialAggSkipping.minRows", BATCH_SIZE.intConf() * 5),

/// always skip partial aggregate when triggered spilling
Expand All @@ -69,7 +69,7 @@ public enum BlazeConf {
// parquet enable page filtering
PARQUET_ENABLE_PAGE_FILTERING("spark.blaze.parquet.enable.pageFiltering", false),

// parqeut enable bloom filter
// parquet enable bloom filter
PARQUET_ENABLE_BLOOM_FILTER("spark.blaze.parquet.enable.bloomFilter", false),

// spark io compression codec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ object BlazeConverters extends Logging {
assert(NativeHelper.isNative(left), "broadcast join build side is not native")
}

// reuse NativeBroadcastJoin with empty equility keys
// reuse NativeBroadcastJoin with empty equality keys
Shims.get.createNativeBroadcastJoinExec(
addRenameColumnsExec(convertToNative(left)),
addRenameColumnsExec(convertToNative(right)),
Expand Down Expand Up @@ -738,7 +738,7 @@ object BlazeConverters extends Logging {
} catch {
case e @ (_: NotImplementedError | _: AssertionError | _: Exception) =>
logWarning(
s"Error projecting resultExpressions, failback to non-native projection: " +
s"Error projecting resultExpressions, fallback to non-native projection: " +
s"${e.getMessage}")
val proj = ProjectExec(exec.resultExpressions, nativeAggr)
proj.setTagValue(convertToNonNativeTag, true)
Expand Down Expand Up @@ -795,7 +795,7 @@ object BlazeConverters extends Logging {
} catch {
case e @ (_: NotImplementedError | _: AssertionError | _: Exception) =>
logWarning(
s"Error projecting resultExpressions, failback to non-native projection: " +
s"Error projecting resultExpressions, fallback to non-native projection: " +
s"${e.getMessage}")
val proj = ProjectExec(exec.resultExpressions, nativeAggr)
proj.setTagValue(convertToNonNativeTag, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ case class SparkUDAFWrapperContext[B](serialized: ByteBuffer) extends Logging {
ArrowArray.wrap(importBatchFFIArrayPtr)) { (inputRoot, inputArray) =>
// import into params root
Data.importIntoVectorSchemaRoot(ROOT_ALLOCATOR, inputArray, inputRoot, dictionaryProvider)
val inputRow = ColumnarHelper.rootRowReuseable(inputRoot)
val inputRow = ColumnarHelper.rootRowReusable(inputRoot)

for (zippedIdx <- zippedIndices) {
val rowIdx = ((zippedIdx >> 32) & 0xffffffff).toInt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ case class SparkUDTFWrapperContext(serialized: ByteBuffer) extends Logging {
// evaluate expression and write to output root
val reusedOutputRow = new GenericInternalRow(Array[Any](null, null))
val outputWriter = ArrowWriter.create(outputRoot)
val paramsRow = ColumnarHelper.rootRowReuseable(currentParamsRoot)
val paramsRow = ColumnarHelper.rootRowReusable(currentParamsRoot)

while (currentRowIdx < currentParamsRoot.getRowCount
&& allocator.getAllocatedMemory < maxBatchMemorySize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ case class JsonFallbackWrapper(jsonPath: String) extends Logging {
ArrowArray.wrap(jsonsStrArrayPtr),
ArrowArray.wrap(outputArrayPtr)) { case (inputRoot, outputRoot, inputArray, outputArray) =>
Data.importIntoVectorSchemaRoot(ROOT_ALLOCATOR, inputArray, inputRoot, dictionaryProvider)
val inputRow = ColumnarHelper.rootRowReuseable(inputRoot)
val inputRow = ColumnarHelper.rootRowReusable(inputRoot)
val nullOutputRow = new GenericInternalRow(1)
val outputRow = new GenericInternalRow(1)
val outputWriter = ArrowWriter.create(outputRoot)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import org.apache.arrow.vector.VectorSchemaRoot

object ColumnarHelper {
def rootRowsIter(root: VectorSchemaRoot): Iterator[BlazeColumnarBatchRow] = {
val row = rootRowReuseable(root)
val row = rootRowReusable(root)
val numRows = root.getRowCount
Range(0, numRows).iterator.map { rowId =>
row.rowId = rowId
row
}
}

def rootRowReuseable(root: VectorSchemaRoot): BlazeColumnarBatchRow = {
def rootRowReusable(root: VectorSchemaRoot): BlazeColumnarBatchRow = {
val vectors = root.getFieldVectors.asScala.toArray
new BlazeColumnarBatchRow(
vectors.map(new BlazeArrowColumnVector(_).asInstanceOf[BlazeColumnVector]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ abstract class NativeBroadcastExchangeBase(mode: BroadcastMode, override val chi
.map(_.copy())
.toArray

val broadcast = relationFuture.get // bloadcast must be resolved
val broadcast = relationFuture.get // broadcast must be resolved
val v = mode.transform(dataRows)
val dummyBroadcasted = new Broadcast[Any](-1) {
override protected def getValue(): Any = v
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ abstract class NativeParquetInsertIntoHiveTableBase(
val encryptEnabled: Boolean = hadoopConf.getBoolean("parquet.encrypt.enable", false)

assert(outputFormatClassName.endsWith("mapredparquetoutputformat"), "not parquet format")
assert(!encryptEnabled, "not supported writting encrypted table")
assert(!encryptEnabled, "not supported writing encrypted table")
}
check()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ case class NativePaimonTableScanExec(basedHiveScan: HiveTableScanExec)
def toRow(
values: Seq[String],
partitionSchema: StructType,
defaultTimeZondId: String): InternalRow = {
defaultTimeZoneId: String): InternalRow = {
val caseInsensitiveProperties = CaseInsensitiveMap(
relation.tableMeta.storage.properties)
val timeZoneId =
caseInsensitiveProperties.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
caseInsensitiveProperties.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)
InternalRow.fromSeq(partitionSchema.zipWithIndex.map { case (field, index) =>
val partValue = if (values(index) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
null
Expand Down
Loading