Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .github/workflows/tpcds-reusable.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ jobs:
UNIFFLE_NUMBER="${UNIFFLE_NUMBER#uniffle-}"
fi

CMD="./auron-build.sh --pre --sparkver $SPARK_NUMBER --scalaver ${{ inputs.scalaver }}"
CMD="./auron-build.sh --pre --sparkver $SPARK_NUMBER --scalaver ${{ inputs.scalaver }} --skiptests false"
if [ -n "${{ inputs.celebornver }}" ]; then
CMD="$CMD --celeborn $CELEBORN_NUMBER"
fi
Expand All @@ -186,6 +186,13 @@ jobs:
echo "Running: $CMD"
exec $CMD

# Uploads any Surefire XML reports found under target/ directories as a workflow artifact.
# `if: always()` lets this step run even when an earlier step in the job has failed,
# so reports from a failing build are still collected.
- name: Upload unit test reports
if: always()
uses: actions/upload-artifact@v4
with:
# Artifact name is built from the sparkver/scalaver/javaver inputs, with "-<celebornver>"
# and "-<unifflever>" appended only when those inputs are non-empty.
name: unit-tests-${{ inputs.sparkver }}_${{ inputs.scalaver }}-jdk-${{ inputs.javaver }}${{ inputs.celebornver && format('-{0}', inputs.celebornver) || '' }}${{ inputs.unifflever && format('-{0}', inputs.unifflever) || '' }}
path: "**/target/surefire-reports/*.xml"

- name: Upload auron (Spark ${{ inputs.sparkver }}, Scala ${{ inputs.scalaver }}, JDK ${{ inputs.javaver }})
uses: actions/upload-artifact@v4
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package org.apache.auron

import org.apache.spark.sql.{AuronQueryTest, Row, SparkSession}
import org.apache.spark.sql.auron.AuronConverters
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.auron.plan.NativeBroadcastExchangeExec
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
import org.apache.spark.sql.test.SharedSparkSession
Expand All @@ -37,6 +36,7 @@ class AuronCheckConvertBroadcastExchangeSuite
.appName("checkConvertToBroadcast")
.config("spark.sql.shuffle.partitions", "4")
.config("spark.sql.autoBroadcastJoinThreshold", -1)
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.extensions", "org.apache.spark.sql.auron.AuronSparkSessionExtension")
.config(
"spark.shuffle.manager",
Expand All @@ -51,14 +51,11 @@ class AuronCheckConvertBroadcastExchangeSuite
spark.sql(
"select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner join broad_cast_table2 b on a.c1 = b.c1")

val plan = executePlan.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
val broadcastExchangeExec =
plan.executedPlan
.collectFirst { case broadcastExchangeExec: BroadcastExchangeExec =>
broadcastExchangeExec
}

val afterConvertPlan = AuronConverters.convertSparkPlan(broadcastExchangeExec.get)
val broadcastExchangeExec = collect(executePlan.queryExecution.executedPlan) {
case broadcastExchangeExec: BroadcastExchangeExec => broadcastExchangeExec
}
assert(broadcastExchangeExec.nonEmpty, "BroadcastExchangeExec not found in plan")
val afterConvertPlan = AuronConverters.convertSparkPlan(broadcastExchangeExec.head)
assert(afterConvertPlan.isInstanceOf[NativeBroadcastExchangeExec])
checkAnswer(executePlan, Seq(Row(1, 2)))
}
Expand All @@ -71,6 +68,7 @@ class AuronCheckConvertBroadcastExchangeSuite
.appName("checkConvertToBroadcast")
.config("spark.sql.shuffle.partitions", "4")
.config("spark.sql.autoBroadcastJoinThreshold", -1)
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.extensions", "org.apache.spark.sql.auron.AuronSparkSessionExtension")
.config(
"spark.shuffle.manager",
Expand All @@ -85,14 +83,11 @@ class AuronCheckConvertBroadcastExchangeSuite
spark.sql(
"select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner join broad_cast_table2 b ")

val plan = executePlan.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
val broadcastExchangeExec =
plan.executedPlan
.collectFirst { case broadcastExchangeExec: BroadcastExchangeExec =>
broadcastExchangeExec
}

val afterConvertPlan = AuronConverters.convertSparkPlan(broadcastExchangeExec.get)
val broadcastExchangeExec = collect(executePlan.queryExecution.executedPlan) {
case broadcastExchangeExec: BroadcastExchangeExec => broadcastExchangeExec
}
assert(broadcastExchangeExec.nonEmpty, "BroadcastExchangeExec not found in plan")
val afterConvertPlan = AuronConverters.convertSparkPlan(broadcastExchangeExec.head)
assert(afterConvertPlan.isInstanceOf[NativeBroadcastExchangeExec])
checkAnswer(executePlan, Seq(Row(1, 2)))
}
Expand All @@ -105,6 +100,7 @@ class AuronCheckConvertBroadcastExchangeSuite
.appName("checkConvertToBroadcast")
.config("spark.sql.shuffle.partitions", "4")
.config("spark.sql.autoBroadcastJoinThreshold", -1)
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.extensions", "org.apache.spark.sql.auron.AuronSparkSessionExtension")
.config(
"spark.shuffle.manager",
Expand All @@ -120,14 +116,11 @@ class AuronCheckConvertBroadcastExchangeSuite
spark.sql(
"select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner join broad_cast_table2 b on a.c1 = b.c1")

val plan = executePlan.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
val broadcastExchangeExec =
plan.executedPlan
.collectFirst { case broadcastExchangeExec: BroadcastExchangeExec =>
broadcastExchangeExec
}

val afterConvertPlan = AuronConverters.convertSparkPlan(broadcastExchangeExec.get)
val broadcastExchangeExec = collect(executePlan.queryExecution.executedPlan) {
case broadcastExchangeExec: BroadcastExchangeExec => broadcastExchangeExec
}
assert(broadcastExchangeExec.nonEmpty, "BroadcastExchangeExec not found in plan")
val afterConvertPlan = AuronConverters.convertSparkPlan(broadcastExchangeExec.head)
assert(afterConvertPlan.isInstanceOf[BroadcastExchangeExec])
checkAnswer(executePlan, Seq(Row(1, 2)))
}
Expand All @@ -140,6 +133,7 @@ class AuronCheckConvertBroadcastExchangeSuite
.appName("checkConvertToBroadcast")
.config("spark.sql.shuffle.partitions", "4")
.config("spark.sql.autoBroadcastJoinThreshold", -1)
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.extensions", "org.apache.spark.sql.auron.AuronSparkSessionExtension")
.config(
"spark.shuffle.manager",
Expand All @@ -155,16 +149,12 @@ class AuronCheckConvertBroadcastExchangeSuite
spark.sql(
"select /*+ broadcast(a)*/ a.c1, a.c2 from broad_cast_table1 a inner join broad_cast_table2 b ")

val plan = executePlan.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
val broadcastExchangeExec =
plan.executedPlan
.collectFirst { case broadcastExchangeExec: BroadcastExchangeExec =>
broadcastExchangeExec
}

val afterConvertPlan = AuronConverters.convertSparkPlan(broadcastExchangeExec.get)
val broadcastExchangeExec = collect(executePlan.queryExecution.executedPlan) {
case broadcastExchangeExec: BroadcastExchangeExec => broadcastExchangeExec
}
assert(broadcastExchangeExec.nonEmpty, "BroadcastExchangeExec not found in plan")
val afterConvertPlan = AuronConverters.convertSparkPlan(broadcastExchangeExec.head)
assert(afterConvertPlan.isInstanceOf[BroadcastExchangeExec])
checkAnswer(executePlan, Seq(Row(1, 2)))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class AuronCheckConvertShuffleExchangeSuite
.appName("checkConvertToNativeShuffleManger")
.config("spark.sql.shuffle.partitions", "4")
.config("spark.sql.autoBroadcastJoinThreshold", -1)
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.extensions", "org.apache.spark.sql.auron.AuronSparkSessionExtension")
.config(
"spark.shuffle.manager",
Expand All @@ -50,12 +51,11 @@ class AuronCheckConvertShuffleExchangeSuite
val executePlan =
spark.sql("select c1, count(1) from test_shuffle group by c1")

val shuffleExchangeExec =
executePlan.queryExecution.executedPlan
.collectFirst { case shuffleExchangeExec: ShuffleExchangeExec =>
shuffleExchangeExec
}
val afterConvertPlan = AuronConverters.convertSparkPlan(shuffleExchangeExec.get)
val shuffleExchangeExec = collect(executePlan.queryExecution.executedPlan) {
case shuffleExchangeExec: ShuffleExchangeExec => shuffleExchangeExec
}
assert(shuffleExchangeExec.nonEmpty, "ShuffleExchangeExec not found in plan")
val afterConvertPlan = AuronConverters.convertSparkPlan(shuffleExchangeExec.head)
assert(afterConvertPlan.isInstanceOf[NativeShuffleExchangeExec])
checkAnswer(executePlan, Seq(Row(1, 1)))
}
Expand All @@ -70,6 +70,7 @@ class AuronCheckConvertShuffleExchangeSuite
.appName("checkConvertToNativeShuffleManger")
.config("spark.sql.shuffle.partitions", "4")
.config("spark.sql.autoBroadcastJoinThreshold", -1)
.config("spark.sql.adaptive.enabled", "true")
.config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
.config("spark.sql.extensions", "org.apache.spark.sql.auron.AuronSparkSessionExtension")
.config("spark.memory.offHeap.enabled", "false")
Expand All @@ -81,12 +82,11 @@ class AuronCheckConvertShuffleExchangeSuite
val executePlan =
spark.sql("select c1, count(1) from test_shuffle group by c1")

val shuffleExchangeExec =
executePlan.queryExecution.executedPlan
.collectFirst { case shuffleExchangeExec: ShuffleExchangeExec =>
shuffleExchangeExec
}
val afterConvertPlan = AuronConverters.convertSparkPlan(shuffleExchangeExec.get)
val shuffleExchangeExec = collect(executePlan.queryExecution.executedPlan) {
case shuffleExchangeExec: ShuffleExchangeExec => shuffleExchangeExec
}
assert(shuffleExchangeExec.nonEmpty, "ShuffleExchangeExec not found in plan")
val afterConvertPlan = AuronConverters.convertSparkPlan(shuffleExchangeExec.head)
assert(afterConvertPlan.isInstanceOf[ShuffleExchangeExec])
checkAnswer(executePlan, Seq(Row(1, 1)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.auron
import java.text.SimpleDateFormat

import org.apache.spark.sql.AuronQueryTest
import org.apache.spark.sql.Row

import org.apache.auron.util.AuronTestUtils

Expand All @@ -34,6 +35,8 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite {
}

test("sha2 function") {
// SPARK-36836: In Spark 3.0/3.1, sha2(..., 224) may produce garbled UTF instead of hex.
// For < 3.2, compare against known hex outputs directly; for >= 3.2, compare to Spark baseline.
withTable("t1") {
sql("create table t1 using parquet as select 'spark' as c1, '3.x' as version")
val functions =
Expand All @@ -46,7 +49,21 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite {
| sha2(concat(c1, version), 512) as sha512
|from t1
|""".stripMargin
checkSparkAnswerAndOperator(functions)
if (AuronTestUtils.isSparkV32OrGreater) {
checkSparkAnswerAndOperator(functions)
} else {
val df = sql(functions)
// Expected hex for input concat('spark','3.x')
val expected = Seq(
Row(
"562d20689257f3f3a04ee9afb86d0ece2af106cf6c6e5e7d266043088ce5fbc0", // sha0 (256)
"562d20689257f3f3a04ee9afb86d0ece2af106cf6c6e5e7d266043088ce5fbc0", // sha256
"d0c8e9ccd5c7b3fdbacd2cfd6b4d65ca8489983b5e8c7c64cd77b634", // sha224
"77c1199808053619c29e9af2656e1ad2614772f6ea605d5757894d6aec2dfaf34ff6fd662def3b79e429e9ae5ecbfed1", // sha384
"c4e27d35517ca62243c1f322d7922dac175830be4668e8a1cf3befdcd287bb5b6f8c5f041c9d89e4609c8cfa242008c7c7133af1685f57bac9052c1212f1d089" // sha512
))
checkAnswer(df, expected)
}
}
}

Expand Down Expand Up @@ -249,7 +266,7 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite {
}

test("test function least") {
withTable("t1") {
withTable("test_least") {
sql(
"create table test_least using parquet as select 1 as c1, 2 as c2, 'a' as c3, 'b' as c4, 'c' as c5")

Expand All @@ -270,13 +287,13 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite {
|select
| least(c4, c3, c5),
| least(c1, c2, 1),
| least(c1, c2, -1),
| least(c1, c2, (-1)),
| least(c4, c5, c3, c3, 'a'),
| least(null, null),
| least(c4, c3, c5, null),
| least(-1.0, 2.5),
| least(-1.0, 2),
| least(-1.0f, 2.5f),
| least((-1.0), 2.5),
| least((-1.0), 2),
| least(CAST(-1.0 AS FLOAT), CAST(2.5 AS FLOAT)),
| least(cast(1 as byte), cast(2 as byte)),
| least('abc', 'aaaa'),
| least(true, false),
Expand Down Expand Up @@ -316,9 +333,9 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite {
| greatest(c4, c5, c3, 'ccc'),
| greatest(null, null),
| greatest(c3, c4, c5, null),
| greatest(-1.0, 2.5),
| greatest(-1, 2),
| greatest(-1.0f, 2.5f),
| greatest((-1.0), 2.5),
| greatest((-1), 2),
| greatest(CAST(-1.0 AS FLOAT), CAST(2.5 AS FLOAT)),
| greatest(${longMax}, ${longMin}),
| greatest(cast(1 as byte), cast(2 as byte)),
| greatest(cast(1 as short), cast(2 as short)),
Expand Down Expand Up @@ -364,23 +381,27 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite {
}
}

// TODO: enable once Spark-compatible isNaN lands https://github.com/apache/auron/issues/1646
// Formerly registered as "test function IsNaN". Kept as a single `ignore` registration:
// a `test(...)` call nested inside an ignored body is never executed, and would fail
// registration if the outer `ignore` were ever switched back to `test`.
ignore("DISABLED: isNaN native semantics mismatch (null -> false)") {
  // Name the table that is actually created below so that withTable cleans up the right one
  // (presumably withTable drops the listed tables after the body runs, as in Spark's SQLTestUtils).
  withTable("test_is_nan") {
    sql(
      "create table test_is_nan using parquet as select cast('NaN' as double) as c1, cast('NaN' as float) as c2, log(-3) as c3, cast(null as double) as c4, 5.5f as c5")
    // Covers a double NaN, a float NaN, log(-3), a NULL double and an ordinary float value.
    val functions =
      """
        |select
        | isnan(c1),
        | isnan(c2),
        | isnan(c3),
        | isnan(c4),
        | isnan(c5)
        |from
        | test_is_nan
      """.stripMargin

    checkSparkAnswerAndOperator(functions)
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ class NativeConvertersSuite
}

// With AuronConf.CAST_STRING_TRIM_ENABLE set to "false", a string-to-integer cast
// must pass assertNonTrimmedCast.
// NOTE(review): only one configuration wrapper is kept so the braces balance; withSQLConf is
// the later of the two wrapper lines on this page — confirm it is the scope the converter reads.
test("cast trim disabled via auron conf") {
  withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "false") {
    assertNonTrimmedCast(" 42 ", IntegerType)
  }
}

// Boolean counterpart of the integer case: with AuronConf.CAST_STRING_TRIM_ENABLE set to
// "false", a string-to-boolean cast must pass assertNonTrimmedCast.
// NOTE(review): only one configuration wrapper is kept so the braces balance; withSQLConf is
// the later of the two wrapper lines on this page — confirm it is the scope the converter reads.
test("cast trim disabled via auron conf for boolean cast") {
  withSQLConf(AuronConf.CAST_STRING_TRIM_ENABLE.key -> "false") {
    assertNonTrimmedCast(" true ", BooleanType)
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,8 @@ object AuronConverters extends Logging {

def convertSparkPlan(exec: SparkPlan): SparkPlan = {
exec match {
case e: ShuffleExchangeExec => tryConvert(e, convertShuffleExchangeExec)
case e: BroadcastExchangeExec if enableBroadcastExchange =>
tryConvert(e, convertBroadcastExchangeExec)
case e: ShuffleExchangeExec if enableExchange => tryConvert(e, convertShuffleExchangeExec)
case e: BroadcastExchangeExec =>
case e: BroadcastExchangeExec if enableBroadcastExchange =>
tryConvert(e, convertBroadcastExchangeExec)
case e: FileSourceScanExec if enableScan => // scan
tryConvert(e, convertFileSourceScanExec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ import org.apache.auron.spark.configuration.SparkAuronConfiguration

object NativeConverters extends Logging {

/**
 * Auron configuration obtained from `AuronAdaptor.getInstance`.
 *
 * Declared as a `def` so the adaptor is consulted on every access, instead of the result
 * being captured once when this object is initialised.
 */
private def sparkAuronConfig: AuronConfiguration =
  AuronAdaptor.getInstance.getAuronConfiguration
/**
 * Reads the boolean setting `spark.auron.udf.enabled` through
 * `AuronConverters.getBooleanConf`, passing `true` as the default value.
 */
def udfEnabled: Boolean = {
  val confKey = "spark.auron.udf.enabled"
  AuronConverters.getBooleanConf(confKey, defaultValue = true)
}
Expand All @@ -99,6 +99,8 @@ object NativeConverters extends Logging {
AuronConverters.getBooleanConf("spark.auron.decimal.arithOp.enabled", defaultValue = false)
/**
 * Reads the boolean setting `spark.auron.datetime.extract.enabled` through
 * `AuronConverters.getBooleanConf`, passing `false` as the default value.
 */
def datetimeExtractEnabled: Boolean = {
  val confKey = "spark.auron.datetime.extract.enabled"
  AuronConverters.getBooleanConf(confKey, defaultValue = false)
}
/**
 * Reads the boolean setting `spark.auron.cast.trimString` through
 * `AuronConverters.getBooleanConf`, passing `true` as the default value.
 */
def castTrimStringEnabled: Boolean = {
  val confKey = "spark.auron.cast.trimString"
  AuronConverters.getBooleanConf(confKey, defaultValue = true)
}

def scalarTypeSupported(dataType: DataType): Boolean = {
dataType match {
Expand Down Expand Up @@ -462,7 +464,7 @@ object NativeConverters extends Logging {
if (cast.child.dataType == StringType &&
(cast.dataType.isInstanceOf[NumericType] || cast.dataType
.isInstanceOf[BooleanType]) &&
sparkAuronConfig.getBoolean(SparkAuronConfiguration.CAST_STRING_TRIM_ENABLE)) {
castTrimStringEnabled) {
// converting Cast(str as num) to StringTrim(Cast(str as num)) if enabled
StringTrim(cast.child)
} else {
Expand Down Expand Up @@ -853,7 +855,7 @@ object NativeConverters extends Logging {
buildExtScalarFunction("Spark_StringLower", e.children, e.dataType)
case e: Upper
if sparkAuronConfig.getBoolean(SparkAuronConfiguration.CASE_CONVERT_FUNCTIONS_ENABLE) =>
buildExtScalarFunction("Spark_StringLower", e.children, e.dataType)
buildExtScalarFunction("Spark_StringUpper", e.children, e.dataType)

case e: StringTrim =>
buildScalarFunction(pb.ScalarFunction.Trim, e.srcStr +: e.trimStr.toSeq, e.dataType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ abstract class NativeParquetInsertIntoHiveTableBase(
.filterKeys(Set("stage_id", "output_rows", "elapsed_compute"))
.toSeq
:+ ("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, "Native.io_time"))
:+ ("bytes_written", SQLMetrics
:+ ("bytes_written",
SQLMetrics
.createSizeMetric(sparkContext, "Native.bytes_written")): _*)

def check(): Unit = {
Expand Down
Loading