diff --git a/.github/workflows/build-ce7-releases.yml b/.github/workflows/build-ce7-releases.yml
index 21fb6ded4..d5c1df209 100644
--- a/.github/workflows/build-ce7-releases.yml
+++ b/.github/workflows/build-ce7-releases.yml
@@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- sparkver: [spark303, spark320, spark324, spark333, spark351]
+ sparkver: [spark-3.0, spark-3.1, spark-3.2, spark-3.3, spark-3.5]
blazever: [3.0.1]
steps:
diff --git a/.github/workflows/tpcds.yml b/.github/workflows/tpcds.yml
index de37515bc..f710f3775 100644
--- a/.github/workflows/tpcds.yml
+++ b/.github/workflows/tpcds.yml
@@ -5,44 +5,37 @@ on:
push:
jobs:
- test-spark303:
- name: Test Spark303
+ test-spark-30:
+ name: Test spark-3.0
uses: ./.github/workflows/tpcds-reusable.yml
with:
- sparkver: spark303
+ sparkver: spark-3.0
sparkurl: https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
- test-spark313:
- name: Test Spark313
+ test-spark-31:
+ name: Test spark-3.1
uses: ./.github/workflows/tpcds-reusable.yml
with:
- sparkver: spark313
+ sparkver: spark-3.1
sparkurl: https://archive.apache.org/dist/spark/spark-3.1.3/spark-3.1.3-bin-hadoop2.7.tgz
- test-spark320:
- name: Test Spark320
+ test-spark-32:
+ name: Test spark-3.2
uses: ./.github/workflows/tpcds-reusable.yml
with:
- sparkver: spark320
- sparkurl: https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop2.7.tgz
-
- test-spark324:
- name: Test Spark324
- uses: ./.github/workflows/tpcds-reusable.yml
- with:
- sparkver: spark324
+ sparkver: spark-3.2
sparkurl: https://archive.apache.org/dist/spark/spark-3.2.4/spark-3.2.4-bin-hadoop2.7.tgz
- test-spark333:
- name: Test Spark333
+ test-spark-33:
+ name: Test spark-3.3
uses: ./.github/workflows/tpcds-reusable.yml
with:
- sparkver: spark333
+ sparkver: spark-3.3
sparkurl: https://archive.apache.org/dist/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz
- test-spark351:
- name: Test Spark351
+ test-spark-35:
+ name: Test spark-3.5
uses: ./.github/workflows/tpcds-reusable.yml
with:
- sparkver: spark351
- sparkurl: https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
+ sparkver: spark-3.5
+ sparkurl: https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz
diff --git a/README.md b/README.md
index 8506c8bdf..1526ac31c 100644
--- a/README.md
+++ b/README.md
@@ -76,17 +76,17 @@ Specify shims package of which spark version that you would like to run on.
Currently we have supported these shims:
-* spark303 - for spark3.0.x
-* spark313 - for spark3.1.x
-* spark324 - for spark3.2.x
-* spark333 - for spark3.3.x
-* spark351 - for spark3.5.x.
+* spark-3.0 - for spark3.0.x
+* spark-3.1 - for spark3.1.x
+* spark-3.2 - for spark3.2.x
+* spark-3.3 - for spark3.3.x
+* spark-3.5 - for spark3.5.x
You could either build Blaze in pre mode for debugging or in release mode to unlock the full potential of
Blaze.
```shell
-SHIM=spark333 # or spark303/spark313/spark320/spark324/spark333/spark351
+SHIM=spark-3.3 # or spark-3.0/spark-3.1/spark-3.2/spark-3.3/spark-3.5
MODE=release # or pre
mvn package -P"${SHIM}" -P"${MODE}"
```
@@ -98,7 +98,7 @@ directory.
You can use the following command to build a centos-7 compatible release:
```shell
-SHIM=spark333 MODE=release ./release-docker.sh
+SHIM=spark-3.3 MODE=release ./release-docker.sh
```
## Run Spark Job with Blaze Accelerator
@@ -132,10 +132,10 @@ comparison with vanilla Spark 3.3.3. The benchmark result shows that Blaze save
Stay tuned and join us for more upcoming thrilling numbers.
TPC-DS Query time: ([How can I run TPC-DS benchmark?](./tpcds/README.md))
-
+
TPC-H Query time:
-
+
We also encourage you to benchmark Blaze and share the results with us. 🤗
diff --git a/benchmark-results/20240701-blaze300.md b/benchmark-results/20240701-blaze300.md
index 1106d5451..2a0121ff5 100644
--- a/benchmark-results/20240701-blaze300.md
+++ b/benchmark-results/20240701-blaze300.md
@@ -60,7 +60,7 @@ spark.sql.readSideCharPadding false
### TPC-DS Results
Blaze saved 46% total query time comparing to spark, benchmarks using the above configuration.
Query time comparison (seconds):
-
+
| | Blaze | Spark | Speedup(x) |
| ------ | -------- | -------- | ---------- |
@@ -172,7 +172,7 @@ Query time comparison (seconds):
### TPC-H Results
Blaze saved 55% total query time comparing to spark, benchmarks using the above configuration.
Query time comparison (seconds):
-
+
| | Blaze | Spark | Speedup(x) |
| ------ | ------- | -------- | ---------- |
diff --git a/benchmark-results/spark333-vs-blaze300-query-time-20240701-tpch.png b/benchmark-results/spark-3.3-vs-blaze300-query-time-20240701-tpch.png
similarity index 100%
rename from benchmark-results/spark333-vs-blaze300-query-time-20240701-tpch.png
rename to benchmark-results/spark-3.3-vs-blaze300-query-time-20240701-tpch.png
diff --git a/benchmark-results/spark333-vs-blaze300-query-time-20240701.png b/benchmark-results/spark-3.3-vs-blaze300-query-time-20240701.png
similarity index 100%
rename from benchmark-results/spark333-vs-blaze300-query-time-20240701.png
rename to benchmark-results/spark-3.3-vs-blaze300-query-time-20240701.png
diff --git a/dev/docker-build/docker-compose.yml b/dev/docker-build/docker-compose.yml
index 444d9b4c2..039b08f95 100644
--- a/dev/docker-build/docker-compose.yml
+++ b/dev/docker-build/docker-compose.yml
@@ -11,8 +11,7 @@ services:
- ./../../:/blaze:rw
- ./../../target-docker:/blaze/target:rw
- ./../../target-docker/spark-extension-target:/blaze/spark-extension/target:rw
- - ./../../target-docker/spark-extension-shims-spark303-target:/blaze/spark-extension-shims-spark303/target:rw
- - ./../../target-docker/spark-extension-shims-spark241kwaiae-target:/blaze/spark-extension-shims-spark241kwaiae/target:rw
+ - ./../../target-docker/spark-extension-shims-spark3-target:/blaze/spark-extension-shims-spark3/target:rw
- ./../../target-docker/build-helper-proto-target:/blaze/build-helper/proto/target:rw
- ./../../target-docker/build-helper-assembly-target:/blaze/build-helper/assembly/target:rw
environment:
diff --git a/pom.xml b/pom.xml
index eaab07c43..53ff1a18d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -261,9 +261,9 @@
- spark303
+ spark-3.0
- spark303
+ spark-3.0
spark-extension-shims-spark3
1.8
2.12
@@ -275,9 +275,9 @@
- spark313
+ spark-3.1
- spark313
+ spark-3.1
spark-extension-shims-spark3
1.8
2.12
@@ -289,23 +289,9 @@
- spark320
+ spark-3.2
- spark320
- spark-extension-shims-spark3
- 1.8
- 2.12
- 2.12.15
- 3.2.9
- 3.0.0
- 3.2.0
-
-
-
-
- spark324
-
- spark324
+ spark-3.2
spark-extension-shims-spark3
1.8
2.12
@@ -317,9 +303,9 @@
- spark333
+ spark-3.3
- spark333
+ spark-3.3
spark-extension-shims-spark3
1.8
2.12
@@ -331,16 +317,16 @@
- spark351
+ spark-3.5
- spark351
+ spark-3.5
spark-extension-shims-spark3
1.8
2.12
2.12.15
3.2.9
3.0.0
- 3.5.1
+ 3.5.2
diff --git a/release-docker.sh b/release-docker.sh
index fdf2ac679..12821d630 100755
--- a/release-docker.sh
+++ b/release-docker.sh
@@ -1,6 +1,6 @@
#!/bin/bash
-export SHIM="${SHIM:-spark303}"
+export SHIM="${SHIM:-spark-3.0}"
export MODE="${MODE:-release}"
docker-compose -f dev/docker-build/docker-compose.yml up
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/InterceptedValidateSparkPlan.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/InterceptedValidateSparkPlan.scala
index b34db7c75..590d904d6 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/InterceptedValidateSparkPlan.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/InterceptedValidateSparkPlan.scala
@@ -22,7 +22,7 @@ import com.thoughtworks.enableIf
object InterceptedValidateSparkPlan extends Logging {
- @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
def validate(plan: SparkPlan): Unit = {
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.blaze.plan.NativeRenameColumnsBase
@@ -70,13 +70,12 @@ object InterceptedValidateSparkPlan extends Logging {
}
}
- @enableIf(Seq("spark303", "spark313", "spark320").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
def validate(plan: SparkPlan): Unit = {
- throw new UnsupportedOperationException(
- "validate is not supported in spark 3.0.3 or 3.1.3 or spark 3.2.0")
+ throw new UnsupportedOperationException("validate is not supported in spark 3.0 or 3.1")
}
- @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
private def errorOnInvalidBroadcastQueryStage(plan: SparkPlan): Unit = {
import org.apache.spark.sql.execution.adaptive.InvalidAQEPlanException
throw InvalidAQEPlanException("Invalid broadcast query stage", plan)
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala
index ab187380b..2d554c66d 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/blaze/ShimsImpl.scala
@@ -115,20 +115,18 @@ import com.thoughtworks.enableIf
class ShimsImpl extends Shims with Logging {
- @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
- override def shimVersion: String = "spark303"
- @enableIf(Seq("spark313").contains(System.getProperty("blaze.shim")))
- override def shimVersion: String = "spark313"
- @enableIf(Seq("spark320").contains(System.getProperty("blaze.shim")))
- override def shimVersion: String = "spark320"
- @enableIf(Seq("spark324").contains(System.getProperty("blaze.shim")))
- override def shimVersion: String = "spark324"
- @enableIf(Seq("spark333").contains(System.getProperty("blaze.shim")))
- override def shimVersion: String = "spark333"
- @enableIf(Seq("spark351").contains(System.getProperty("blaze.shim")))
- override def shimVersion: String = "spark351"
-
- @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
+ override def shimVersion: String = "spark-3.0"
+ @enableIf(Seq("spark-3.1").contains(System.getProperty("blaze.shim")))
+ override def shimVersion: String = "spark-3.1"
+ @enableIf(Seq("spark-3.2").contains(System.getProperty("blaze.shim")))
+ override def shimVersion: String = "spark-3.2"
+ @enableIf(Seq("spark-3.3").contains(System.getProperty("blaze.shim")))
+ override def shimVersion: String = "spark-3.3"
+ @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim")))
+ override def shimVersion: String = "spark-3.5"
+
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override def initExtension(): Unit = {
ValidateSparkPlanInjector.inject()
@@ -143,7 +141,7 @@ class ShimsImpl extends Shims with Logging {
}
}
- @enableIf(Seq("spark303", "spark320").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def initExtension(): Unit = {
if (BlazeConf.FORCE_SHUFFLED_HASH_JOIN.booleanConf()) {
logWarning(s"${BlazeConf.FORCE_SHUFFLED_HASH_JOIN.key} is not supported in $shimVersion")
@@ -370,9 +368,7 @@ class ShimsImpl extends Shims with Logging {
length: Long,
numRecords: Long): FileSegment = new FileSegment(file, offset, length)
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override def commit(
dep: ShuffleDependency[_, _, _],
shuffleBlockResolver: IndexShuffleBlockResolver,
@@ -392,7 +388,7 @@ class ShimsImpl extends Shims with Logging {
MapStatus.apply(SparkEnv.get.blockManager.shuffleServerId, partitionLengths, mapId)
}
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def commit(
dep: ShuffleDependency[_, _, _],
shuffleBlockResolver: IndexShuffleBlockResolver,
@@ -513,23 +509,19 @@ class ShimsImpl extends Shims with Logging {
expr.asInstanceOf[AggregateExpression].filter
}
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
private def isAQEShuffleRead(exec: SparkPlan): Boolean = {
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
exec.isInstanceOf[AQEShuffleReadExec]
}
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
private def isAQEShuffleRead(exec: SparkPlan): Boolean = {
import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec
exec.isInstanceOf[CustomShuffleReaderExec]
}
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
private def executeNativeAQEShuffleReader(exec: SparkPlan): NativeRDD = {
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.CoalescedMapperPartitionSpec
@@ -619,7 +611,7 @@ class ShimsImpl extends Shims with Logging {
}
}
- @enableIf(Seq("spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.1").contains(System.getProperty("blaze.shim")))
private def executeNativeAQEShuffleReader(exec: SparkPlan): NativeRDD = {
import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec
@@ -698,7 +690,7 @@ class ShimsImpl extends Shims with Logging {
}
}
- @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
private def executeNativeAQEShuffleReader(exec: SparkPlan): NativeRDD = {
import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec
@@ -787,13 +779,11 @@ class ShimsImpl extends Shims with Logging {
}
}
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override def getSqlContext(sparkPlan: SparkPlan): SQLContext =
sparkPlan.session.sqlContext
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def getSqlContext(sparkPlan: SparkPlan): SQLContext = sparkPlan.sqlContext
override def createNativeExprWrapper(
@@ -804,7 +794,7 @@ class ShimsImpl extends Shims with Logging {
}
@enableIf(
- Seq("spark303", "spark313", "spark320", "spark324", "spark333").contains(
+ Seq("spark-3.0", "spark-3.1", "spark-3.2", "spark-3.3").contains(
System.getProperty("blaze.shim")))
private def convertPromotePrecision(
e: Expression,
@@ -818,13 +808,13 @@ class ShimsImpl extends Shims with Logging {
}
}
- @enableIf(Seq("spark351").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim")))
private def convertPromotePrecision(
e: Expression,
isPruningExpr: Boolean,
fallback: Expression => pb.PhysicalExprNode): Option[pb.PhysicalExprNode] = None
- @enableIf(Seq("spark333", "spark351").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
private def convertBloomFilterAgg(agg: AggregateFunction): Option[pb.PhysicalAggExprNode] = {
import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate
agg match {
@@ -844,12 +834,10 @@ class ShimsImpl extends Shims with Logging {
}
}
- @enableIf(
- Seq("spark303", "spark313", "spark320", "spark324").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1", "spark-3.2").contains(System.getProperty("blaze.shim")))
private def convertBloomFilterAgg(agg: AggregateFunction): Option[pb.PhysicalAggExprNode] = None
- @enableIf(Seq("spark333", "spark351").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
private def convertBloomFilterMightContain(
e: Expression,
isPruningExpr: Boolean,
@@ -870,9 +858,7 @@ class ShimsImpl extends Shims with Logging {
}
}
- @enableIf(
- Seq("spark303", "spark313", "spark320", "spark324").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1", "spark-3.2").contains(System.getProperty("blaze.shim")))
private def convertBloomFilterMightContain(
e: Expression,
isPruningExpr: Boolean,
@@ -883,13 +869,11 @@ class ShimsImpl extends Shims with Logging {
case class ForceNativeExecutionWrapper(override val child: SparkPlan)
extends ForceNativeExecutionWrapperBase(child) {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
}
@@ -900,8 +884,6 @@ case class NativeExprWrapper(
override val nullable: Boolean)
extends NativeExprWrapperBase(nativeExpr, dataType, nullable) {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = copy()
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ConvertToNativeExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ConvertToNativeExec.scala
index 4f5270fa5..e44ea1cdd 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ConvertToNativeExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/ConvertToNativeExec.scala
@@ -21,13 +21,11 @@ import com.thoughtworks.enableIf
case class ConvertToNativeExec(override val child: SparkPlan) extends ConvertToNativeBase(child) {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala
index 11e55ebd9..2f91b1fe4 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala
@@ -48,12 +48,12 @@ case class NativeAggExec(
with BaseAggregateExec {
@enableIf(
- Seq("spark313", "spark320", "spark324", "spark333", "spark351").contains(
+ Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override val requiredChildDistributionExpressions: Option[Seq[Expression]] =
theRequiredChildDistributionExpressions
- @enableIf(Seq("spark333", "spark351").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override val initialInputBufferOffset: Int = theInitialInputBufferOffset
override def output: Seq[Attribute] =
@@ -65,21 +65,19 @@ case class NativeAggExec(
ExprId.apply(NativeAggBase.AGG_BUF_COLUMN_EXPR_ID))
}
- @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override def isStreaming: Boolean = false
- @enableIf(Seq("spark324", "spark333", "spark351").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override def numShufflePartitions: Option[Int] = None
override def resultExpressions: Seq[NamedExpression] = output
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeExec.scala
index e380875f5..8838d8bc8 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeBroadcastExchangeExec.scala
@@ -42,13 +42,11 @@ case class NativeBroadcastExchangeExec(mode: BroadcastMode, override val child:
relationFuturePromise.future
}
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeExpandExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeExpandExec.scala
index 629a52e1a..b04057518 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeExpandExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeExpandExec.scala
@@ -27,13 +27,11 @@ case class NativeExpandExec(
override val child: SparkPlan)
extends NativeExpandBase(projections, output, child) {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterExec.scala
index 1e63d9a1a..2a4b06c6a 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeFilterExec.scala
@@ -23,13 +23,11 @@ import com.thoughtworks.enableIf
case class NativeFilterExec(condition: Expression, override val child: SparkPlan)
extends NativeFilterBase(condition, child) {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGenerateExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGenerateExec.scala
index e5a4a2738..3168036fb 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGenerateExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGenerateExec.scala
@@ -29,13 +29,11 @@ case class NativeGenerateExec(
override val child: SparkPlan)
extends NativeGenerateBase(generator, requiredChildOutput, outer, generatorOutput, child) {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGlobalLimitExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGlobalLimitExec.scala
index 3861a29d3..25c387e06 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGlobalLimitExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeGlobalLimitExec.scala
@@ -22,13 +22,11 @@ import com.thoughtworks.enableIf
case class NativeGlobalLimitExec(limit: Long, override val child: SparkPlan)
extends NativeGlobalLimitBase(limit, child) {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeLocalLimitExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeLocalLimitExec.scala
index 282a0867c..faf3b28b0 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeLocalLimitExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeLocalLimitExec.scala
@@ -22,13 +22,11 @@ import com.thoughtworks.enableIf
case class NativeLocalLimitExec(limit: Long, override val child: SparkPlan)
extends NativeLocalLimitBase(limit, child) {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala
index a6eb27783..00b8e6d1b 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetInsertIntoHiveTableExec.scala
@@ -32,7 +32,7 @@ case class NativeParquetInsertIntoHiveTableExec(
extends NativeParquetInsertIntoHiveTableBase(cmd, child) {
@enableIf(
- Seq("spark303", "spark313", "spark320", "spark324", "spark333").contains(
+ Seq("spark-3.0", "spark-3.1", "spark-3.2", "spark-3.3").contains(
System.getProperty("blaze.shim")))
override protected def getInsertIntoHiveTableCommand(
table: CatalogTable,
@@ -52,7 +52,7 @@ case class NativeParquetInsertIntoHiveTableExec(
metrics)
}
- @enableIf(Seq("spark351").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def getInsertIntoHiveTableCommand(
table: CatalogTable,
partition: Map[String, Option[String]],
@@ -71,18 +71,16 @@ case class NativeParquetInsertIntoHiveTableExec(
metrics)
}
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
@enableIf(
- Seq("spark303", "spark313", "spark320", "spark324", "spark333").contains(
+ Seq("spark-3.0", "spark-3.1", "spark-3.2", "spark-3.3").contains(
System.getProperty("blaze.shim")))
class BlazeInsertIntoHiveTable303(
table: CatalogTable,
@@ -108,7 +106,7 @@ case class NativeParquetInsertIntoHiveTableExec(
super.run(sparkSession, nativeParquetSink)
}
- @enableIf(Seq("spark320", "spark324", "spark333").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3").contains(System.getProperty("blaze.shim")))
override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration) = {
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
@@ -134,7 +132,7 @@ case class NativeParquetInsertIntoHiveTableExec(
}
}
- @enableIf(Seq("spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.1").contains(System.getProperty("blaze.shim")))
override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration) = {
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
@@ -172,7 +170,7 @@ case class NativeParquetInsertIntoHiveTableExec(
}
}
- @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
override def basicWriteJobStatsTracker(hadoopConf: org.apache.hadoop.conf.Configuration) = {
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
@@ -202,7 +200,7 @@ case class NativeParquetInsertIntoHiveTableExec(
}
}
- @enableIf(Seq("spark351").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim")))
class BlazeInsertIntoHiveTable351(
table: CatalogTable,
partition: Map[String, Option[String]],
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkExec.scala
index 0faba5021..aa673db86 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeParquetSinkExec.scala
@@ -30,13 +30,11 @@ case class NativeParquetSinkExec(
override val metrics: Map[String, SQLMetric])
extends NativeParquetSinkBase(sparkSession, table, partition, child, metrics) {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativePartialTakeOrderedExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativePartialTakeOrderedExec.scala
index d54323d77..c30350db1 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativePartialTakeOrderedExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativePartialTakeOrderedExec.scala
@@ -28,13 +28,11 @@ case class NativePartialTakeOrderedExec(
override val metrics: Map[String, SQLMetric])
extends NativePartialTakeOrderedBase(limit, sortOrder, child, metrics) {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectExecProvider.scala
index a1eafbb58..3943516d0 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectExecProvider.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeProjectExecProvider.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.execution.SparkPlan
import com.thoughtworks.enableIf
case object NativeProjectExecProvider {
- @enableIf(Seq("spark351").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim")))
def provide(
projectList: Seq[NamedExpression],
child: SparkPlan,
@@ -49,9 +49,7 @@ case object NativeProjectExecProvider {
NativeProjectExec(projectList, child, addTypeCast)
}
- @enableIf(
- Seq("spark313", "spark320", "spark324", "spark333").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.1", "spark-3.2", "spark-3.3").contains(System.getProperty("blaze.shim")))
def provide(
projectList: Seq[NamedExpression],
child: SparkPlan,
@@ -67,12 +65,11 @@ case object NativeProjectExecProvider {
with AliasAwareOutputPartitioning
with AliasAwareOutputOrdering {
- @enableIf(
- Seq("spark320", "spark324", "spark333").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
@@ -85,7 +82,7 @@ case object NativeProjectExecProvider {
NativeProjectExec(projectList, child, addTypeCast)
}
- @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
def provide(
projectList: Seq[NamedExpression],
child: SparkPlan,
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala
index 5d8e9d30c..853b76686 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeRenameColumnsExecProvider.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.execution.SparkPlan
import com.thoughtworks.enableIf
case object NativeRenameColumnsExecProvider {
- @enableIf(Seq("spark351").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim")))
def provide(child: SparkPlan, renamedColumnNames: Seq[String]): NativeRenameColumnsBase = {
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.expressions.SortOrder
@@ -44,9 +44,7 @@ case object NativeRenameColumnsExecProvider {
NativeRenameColumnsExec(child, renamedColumnNames)
}
- @enableIf(
- Seq("spark313", "spark320", "spark324", "spark333").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.1", "spark-3.2", "spark-3.3").contains(System.getProperty("blaze.shim")))
def provide(child: SparkPlan, renamedColumnNames: Seq[String]): NativeRenameColumnsBase = {
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.expressions.SortOrder
@@ -60,12 +58,11 @@ case object NativeRenameColumnsExecProvider {
with AliasAwareOutputPartitioning
with AliasAwareOutputOrdering {
- @enableIf(
- Seq("spark320", "spark324", "spark333").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
@@ -76,7 +73,7 @@ case object NativeRenameColumnsExecProvider {
NativeRenameColumnsExec(child, renamedColumnNames)
}
- @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
def provide(child: SparkPlan, renamedColumnNames: Seq[String]): NativeRenameColumnsBase = {
case class NativeRenameColumnsExec(
override val child: SparkPlan,
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala
index acd93fe91..78c8dd481 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeShuffleExchangeExec.scala
@@ -148,28 +148,26 @@ case class NativeShuffleExchangeExec(
// for databricks testing
val causedBroadcastJoinBuildOOM = false
- @enableIf(Seq("spark351").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.5").contains(System.getProperty("blaze.shim")))
override def advisoryPartitionSize: Option[Long] = None
// If users specify the num partitions via APIs like `repartition`, we shouldn't change it.
// For `SinglePartition`, it requires exactly one partition and we can't change it either.
- @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
override def canChangeNumPartitions: Boolean =
outputPartitioning != SinglePartition
@enableIf(
- Seq("spark313", "spark320", "spark324", "spark333", "spark351").contains(
+ Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override def shuffleOrigin =
org.apache.spark.sql.execution.exchange.ENSURE_REQUIREMENTS
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeSortExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeSortExec.scala
index 39186b71f..7e569837a 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeSortExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeSortExec.scala
@@ -26,13 +26,11 @@ case class NativeSortExec(
override val child: SparkPlan)
extends NativeSortBase(sortOrder, global, child) {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeTakeOrderedExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeTakeOrderedExec.scala
index fda51d776..03afc5e31 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeTakeOrderedExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeTakeOrderedExec.scala
@@ -26,13 +26,11 @@ case class NativeTakeOrderedExec(
override val child: SparkPlan)
extends NativeTakeOrderedBase(limit, sortOrder, child) {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeUnionExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeUnionExec.scala
index 73aadc5f3..be790c558 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeUnionExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeUnionExec.scala
@@ -22,13 +22,11 @@ import com.thoughtworks.enableIf
case class NativeUnionExec(override val children: Seq[SparkPlan])
extends NativeUnionBase(children) {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan =
copy(children = newChildren)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(children = newChildren)
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowExec.scala
index 235d5c02c..0b2f4a0d9 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeWindowExec.scala
@@ -29,13 +29,11 @@ case class NativeWindowExec(
override val child: SparkPlan)
extends NativeWindowBase(windowExpression, partitionSpec, orderSpec, child) {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(child = newChildren.head)
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReader.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReader.scala
index e9a48bbf7..9d9d10032 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReader.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeBlockStoreShuffleReader.scala
@@ -45,8 +45,7 @@ class BlazeBlockStoreShuffleReader[K, C](
override def readBlocks(): Iterator[(BlockId, InputStream)] = {
@enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
def fetchIterator = new ShuffleBlockFetcherIterator(
context,
blockManager.blockStoreClient,
@@ -67,7 +66,7 @@ class BlazeBlockStoreShuffleReader[K, C](
readMetrics,
fetchContinuousBlocksInBatch).toCompletionIterator
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
def fetchIterator = new ShuffleBlockFetcherIterator(
context,
blockManager.blockStoreClient,
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeRssShuffleManagerBase.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeRssShuffleManagerBase.scala
index 662358544..96a6b3f34 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeRssShuffleManagerBase.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeRssShuffleManagerBase.scala
@@ -76,7 +76,7 @@ abstract class BlazeRssShuffleManagerBase(conf: SparkConf) extends ShuffleManage
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]
@enableIf(
- Seq("spark313", "spark320", "spark324", "spark333", "spark351").contains(
+ Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override def getReader[K, C](
handle: ShuffleHandle,
@@ -108,7 +108,7 @@ abstract class BlazeRssShuffleManagerBase(conf: SparkConf) extends ShuffleManage
}
}
- @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
override def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
@@ -123,7 +123,7 @@ abstract class BlazeRssShuffleManagerBase(conf: SparkConf) extends ShuffleManage
}
}
- @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
override def getReaderForRange[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleManager.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleManager.scala
index 75c74affb..06c8fb2c3 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleManager.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleManager.scala
@@ -48,9 +48,7 @@ class BlazeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
sortShuffleManager.registerShuffle(shuffleId, dependency)
}
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
@@ -63,9 +61,9 @@ class BlazeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
if (isArrowShuffle(handle)) {
val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
- @enableIf(Seq("spark320", "spark324").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2").contains(System.getProperty("blaze.shim")))
def shuffleMergeFinalized = baseShuffleHandle.dependency.shuffleMergeFinalized
- @enableIf(Seq("spark333", "spark351").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
def shuffleMergeFinalized = baseShuffleHandle.dependency.isShuffleMergeFinalizedMarked
val (blocksByAddress, canEnableBatchFetch) =
@@ -108,7 +106,7 @@ class BlazeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
}
}
- @enableIf(Seq("spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.1").contains(System.getProperty("blaze.shim")))
override def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
@@ -145,7 +143,7 @@ class BlazeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
}
}
- @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
override def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
@@ -172,7 +170,7 @@ class BlazeShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
}
}
- @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
override def getReaderForRange[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriter.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriter.scala
index 04043b70d..8ad299f34 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriter.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/BlazeShuffleWriter.scala
@@ -22,8 +22,6 @@ import com.thoughtworks.enableIf
class BlazeShuffleWriter[K, V](metrics: ShuffleWriteMetricsReporter)
extends BlazeShuffleWriterBase[K, V](metrics) {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override def getPartitionLengths(): Array[Long] = partitionLengths
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala
index ec6b88171..45b6f55f8 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeBroadcastJoinExec.scala
@@ -48,7 +48,7 @@ case class NativeBroadcastJoinExec(
override val condition: Option[Expression] = None
@enableIf(
- Seq("spark313", "spark320", "spark324", "spark333", "spark351").contains(
+ Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override def buildSide: org.apache.spark.sql.catalyst.optimizer.BuildSide =
broadcastSide match {
@@ -56,14 +56,14 @@ case class NativeBroadcastJoinExec(
case BroadcastRight => org.apache.spark.sql.catalyst.optimizer.BuildRight
}
- @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
override val buildSide: org.apache.spark.sql.execution.joins.BuildSide = broadcastSide match {
case BroadcastLeft => org.apache.spark.sql.execution.joins.BuildLeft
case BroadcastRight => org.apache.spark.sql.execution.joins.BuildRight
}
@enableIf(
- Seq("spark313", "spark320", "spark324", "spark333", "spark351").contains(
+ Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override def requiredChildDistribution = {
import org.apache.spark.sql.catalyst.plans.physical.BroadcastDistribution
@@ -80,19 +80,19 @@ case class NativeBroadcastJoinExec(
}
@enableIf(
- Seq("spark313", "spark320", "spark324", "spark333", "spark351").contains(
+ Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override def supportCodegen: Boolean = false
@enableIf(
- Seq("spark313", "spark320", "spark324", "spark333", "spark351").contains(
+ Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override def inputRDDs() = {
throw new NotImplementedError("NativeBroadcastJoin dose not support codegen")
}
@enableIf(
- Seq("spark313", "spark320", "spark324", "spark333", "spark351").contains(
+ Seq("spark-3.1", "spark-3.2", "spark-3.3", "spark-3.5").contains(
System.getProperty("blaze.shim")))
override protected def prepareRelation(
ctx: org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext)
@@ -100,15 +100,13 @@ case class NativeBroadcastJoinExec(
throw new NotImplementedError("NativeBroadcastJoin dose not support codegen")
}
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
override protected def withNewChildrenInternal(
newLeft: SparkPlan,
newRight: SparkPlan): SparkPlan =
copy(left = newLeft, right = newRight)
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
copy(left = newChildren(0), right = newChildren(1))
}
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeShuffledHashJoinExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeShuffledHashJoinExecProvider.scala
index 6582154b5..2e04013ac 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeShuffledHashJoinExecProvider.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeShuffledHashJoinExecProvider.scala
@@ -25,9 +25,7 @@ import com.thoughtworks.enableIf
case object NativeShuffledHashJoinExecProvider {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
def provide(
left: SparkPlan,
right: SparkPlan,
@@ -74,7 +72,7 @@ case object NativeShuffledHashJoinExecProvider {
NativeShuffledHashJoinExec(left, right, leftKeys, rightKeys, joinType, buildSide)
}
- @enableIf(Seq("spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.1").contains(System.getProperty("blaze.shim")))
def provide(
left: SparkPlan,
right: SparkPlan,
@@ -120,7 +118,7 @@ case object NativeShuffledHashJoinExecProvider {
NativeShuffledHashJoinExec(left, right, leftKeys, rightKeys, joinType, buildSide)
}
- @enableIf(Seq("spark303").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0").contains(System.getProperty("blaze.shim")))
def provide(
left: SparkPlan,
right: SparkPlan,
diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeSortMergeJoinExecProvider.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeSortMergeJoinExecProvider.scala
index 42b8787f5..50fbd98b5 100644
--- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeSortMergeJoinExecProvider.scala
+++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/joins/blaze/plan/NativeSortMergeJoinExecProvider.scala
@@ -24,9 +24,7 @@ import com.thoughtworks.enableIf
case object NativeSortMergeJoinExecProvider {
- @enableIf(
- Seq("spark320", "spark324", "spark333", "spark351").contains(
- System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.2", "spark-3.3", "spark-3.5").contains(System.getProperty("blaze.shim")))
def provide(
left: SparkPlan,
right: SparkPlan,
@@ -71,7 +69,7 @@ case object NativeSortMergeJoinExecProvider {
NativeSortMergeJoinExec(left, right, leftKeys, rightKeys, joinType)
}
- @enableIf(Seq("spark303", "spark313").contains(System.getProperty("blaze.shim")))
+ @enableIf(Seq("spark-3.0", "spark-3.1").contains(System.getProperty("blaze.shim")))
def provide(
left: SparkPlan,
right: SparkPlan,