diff --git a/Cargo.lock b/Cargo.lock
index 82bb7454b..a277146f3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -97,7 +97,7 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "arrow"
version = "53.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -117,7 +117,7 @@ dependencies = [
[[package]]
name = "arrow-arith"
version = "53.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -131,7 +131,7 @@ dependencies = [
[[package]]
name = "arrow-array"
version = "53.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a"
dependencies = [
"ahash",
"arrow-buffer",
@@ -147,7 +147,7 @@ dependencies = [
[[package]]
name = "arrow-buffer"
version = "53.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a"
dependencies = [
"bytes",
"half",
@@ -157,7 +157,7 @@ dependencies = [
[[package]]
name = "arrow-cast"
version = "53.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -177,7 +177,7 @@ dependencies = [
[[package]]
name = "arrow-csv"
version = "53.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -195,7 +195,7 @@ dependencies = [
[[package]]
name = "arrow-data"
version = "53.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a"
dependencies = [
"arrow-buffer",
"arrow-schema",
@@ -206,7 +206,7 @@ dependencies = [
[[package]]
name = "arrow-ipc"
version = "53.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -220,7 +220,7 @@ dependencies = [
[[package]]
name = "arrow-json"
version = "53.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -239,7 +239,7 @@ dependencies = [
[[package]]
name = "arrow-ord"
version = "53.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -253,7 +253,7 @@ dependencies = [
[[package]]
name = "arrow-row"
version = "53.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a"
dependencies = [
"ahash",
"arrow-array",
@@ -266,7 +266,7 @@ dependencies = [
[[package]]
name = "arrow-schema"
version = "53.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a"
dependencies = [
"bitflags 2.6.0",
"serde",
@@ -275,7 +275,7 @@ dependencies = [
[[package]]
name = "arrow-select"
version = "53.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a"
dependencies = [
"ahash",
"arrow-array",
@@ -288,7 +288,7 @@ dependencies = [
[[package]]
name = "arrow-string"
version = "53.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -750,7 +750,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"ahash",
"arrow",
@@ -806,7 +806,7 @@ dependencies = [
[[package]]
name = "datafusion-catalog"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"arrow-schema",
"async-trait",
@@ -820,7 +820,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"ahash",
"arrow",
@@ -843,7 +843,7 @@ dependencies = [
[[package]]
name = "datafusion-common-runtime"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"log",
"tokio",
@@ -852,7 +852,7 @@ dependencies = [
[[package]]
name = "datafusion-execution"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"arrow",
"chrono",
@@ -872,7 +872,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"ahash",
"arrow",
@@ -893,7 +893,7 @@ dependencies = [
[[package]]
name = "datafusion-expr-common"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"arrow",
"datafusion-common",
@@ -1012,7 +1012,7 @@ dependencies = [
[[package]]
name = "datafusion-functions"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"arrow",
"arrow-buffer",
@@ -1038,7 +1038,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"ahash",
"arrow",
@@ -1058,7 +1058,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate-common"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"ahash",
"arrow",
@@ -1071,7 +1071,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-nested"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"arrow",
"arrow-array",
@@ -1093,7 +1093,7 @@ dependencies = [
[[package]]
name = "datafusion-functions-window"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"datafusion-common",
"datafusion-expr",
@@ -1104,7 +1104,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"arrow",
"async-trait",
@@ -1123,7 +1123,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"ahash",
"arrow",
@@ -1154,7 +1154,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr-common"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"ahash",
"arrow",
@@ -1167,7 +1167,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-optimizer"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"arrow-schema",
"datafusion-common",
@@ -1180,7 +1180,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"ahash",
"arrow",
@@ -1214,7 +1214,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "42.0.0"
-source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb"
+source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8"
dependencies = [
"arrow",
"arrow-array",
@@ -1945,7 +1945,7 @@ checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775"
[[package]]
name = "orc-rust"
version = "0.4.1"
-source = "git+https://github.com/blaze-init/datafusion-orc.git?rev=9c74ac3#9c74ac3779c2ebb371404d71adb915b49eee1930"
+source = "git+https://github.com/blaze-init/datafusion-orc.git?rev=7833d7d#7833d7dcbb22b613d882b9217a1ca35fb9e9bd70"
dependencies = [
"arrow",
"async-trait",
@@ -2008,7 +2008,7 @@ dependencies = [
[[package]]
name = "parquet"
version = "53.0.0"
-source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f"
+source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a"
dependencies = [
"ahash",
"arrow-array",
diff --git a/Cargo.toml b/Cargo.toml
index 9b7700c25..05e2bbb29 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -66,27 +66,27 @@ serde_json = { version = "1.0.96" }
[patch.crates-io]
# datafusion: branch=v42-blaze
-datafusion = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"}
-datafusion-common = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"}
-datafusion-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"}
-datafusion-execution = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"}
-datafusion-optimizer = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"}
-datafusion-physical-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"}
-orc-rust = { git = "https://github.com/blaze-init/datafusion-orc.git", rev = "9c74ac3"}
+datafusion = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"}
+datafusion-common = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"}
+datafusion-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"}
+datafusion-execution = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"}
+datafusion-optimizer = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"}
+datafusion-physical-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"}
+orc-rust = { git = "https://github.com/blaze-init/datafusion-orc.git", rev = "7833d7d"}
# arrow: branch=v53-blaze
-arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
-arrow-arith = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
-arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
-arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
-arrow-cast = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
-arrow-data = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
-arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
-arrow-row = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
-arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
-arrow-select = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
-arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
-parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"}
+arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
+arrow-arith = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
+arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
+arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
+arrow-cast = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
+arrow-data = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
+arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
+arrow-row = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
+arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
+arrow-select = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
+arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
+parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"}
# serde_json: branch=v1.0.96-blaze
serde_json = { git = "https://github.com/blaze-init/json", branch = "v1.0.96-blaze" }
diff --git a/hadoop-shim/pom.xml b/hadoop-shim/pom.xml
new file mode 100644
index 000000000..3c2f906d4
--- /dev/null
+++ b/hadoop-shim/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.blaze</groupId>
+        <artifactId>blaze-engine</artifactId>
+        <version>${revision}</version>
+        <relativePath>../</relativePath>
+    </parent>
+    <groupId>org.blaze</groupId>
+    <artifactId>hadoop-shim</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang.modules</groupId>
+            <artifactId>scala-java8-compat_2.12</artifactId>
+            <version>0.9.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scalaVersion}</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-api</artifactId>
+            <version>3.4.0</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/hadoop-shim/src/main/scala/org/apache/spark/blaze/FSDataInputWrapper.scala b/hadoop-shim/src/main/scala/org/apache/spark/blaze/FSDataInputWrapper.scala
new file mode 100644
index 000000000..97896eae1
--- /dev/null
+++ b/hadoop-shim/src/main/scala/org/apache/spark/blaze/FSDataInputWrapper.scala
@@ -0,0 +1,58 @@
+package org.apache.spark.blaze
+
+import java.io.EOFException
+import java.nio.ByteBuffer
+import java.nio.channels.Channels
+
+import org.apache.hadoop.fs.FSDataInputStream
+import org.apache.hadoop.fs.StreamCapabilities
+
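+// Read-side interface handed to the native engine: readFully() must fill the
+// whole buffer starting at pos, or fail with EOFException.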
+trait FSDataInputWrapper extends AutoCloseable {
+ def readFully(pos: Long, buf: ByteBuffer): Unit
+}
+
+object FSDataInputWrapper {
+ def wrap(input: FSDataInputStream): FSDataInputWrapper = {
+ if (canUsePositionedReadable(input)) {
+ new PositionedReadableFSDataInputWrapper(input)
+ } else {
+ new SeekableFSDataInputWrapper(input)
+ }
+ }
+
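+ // Positioned reads require both readFully(long, ByteBuffer) on the stream class
+ // and the PREADBYTEBUFFER capability; anything else falls back to seek-and-read.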
+ private def canUsePositionedReadable(input: FSDataInputStream): Boolean = {
+ try {
+ input.getClass.getMethod("readFully", classOf[Long], classOf[ByteBuffer]) != null &&
+ input.hasCapability(StreamCapabilities.PREADBYTEBUFFER)
+ } catch {
+ case _: Throwable => false
+ }
+ }
+}
+
+class PositionedReadableFSDataInputWrapper(input: FSDataInputStream) extends FSDataInputWrapper {
+ override def readFully(pos: Long, buf: ByteBuffer): Unit = {
+ input.readFully(pos, buf)
+ if (buf.hasRemaining) {
+ throw new EOFException(s"cannot read more ${buf.remaining()} bytes")
+ }
+ }
+
+ override def close(): Unit = input.close()
+}
+
+class SeekableFSDataInputWrapper(input: FSDataInputStream) extends FSDataInputWrapper {
+ override def readFully(pos: Long, buf: ByteBuffer): Unit = {
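+ // seek() moves a shared file position, so the whole seek/read sequence is
+ // synchronized to keep concurrent readers from interleaving.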
+ input.synchronized {
+ input.seek(pos)
+ while (buf.hasRemaining) {
+ val channel = Channels.newChannel(input)
+ if (channel.read(buf) == -1) {
+ throw new EOFException(s"cannot read more ${buf.remaining()} bytes")
+ }
+ }
+ }
+ }
+
+ override def close(): Unit = input.close()
+}
\ No newline at end of file
diff --git a/hadoop-shim/src/main/scala/org/apache/spark/blaze/FSDataOutputWrapper.scala b/hadoop-shim/src/main/scala/org/apache/spark/blaze/FSDataOutputWrapper.scala
new file mode 100644
index 000000000..7b1531e9e
--- /dev/null
+++ b/hadoop-shim/src/main/scala/org/apache/spark/blaze/FSDataOutputWrapper.scala
@@ -0,0 +1,30 @@
+package org.apache.spark.blaze
+
+import java.io.EOFException
+import java.nio.ByteBuffer
+import java.nio.channels.Channels
+
+import org.apache.hadoop.fs.FSDataOutputStream
+
+trait FSDataOutputWrapper extends AutoCloseable {
+ def writeFully(buf: ByteBuffer): Unit
+}
+
+object FSDataOutputWrapper {
+ def wrap(output: FSDataOutputStream): FSDataOutputWrapper = {
+ new SeekableFSDataOutputWrapper(output)
+ }
+}
+
+class SeekableFSDataOutputWrapper(output: FSDataOutputStream) extends FSDataOutputWrapper {
+ override def writeFully(buf: ByteBuffer): Unit = {
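+ // A single channel.write() may be partial, so loop until the buffer drains;
+ // writers are serialized on the underlying stream.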
+ output.synchronized {
+ val channel = Channels.newChannel(output)
+ while (buf.hasRemaining) {
+ if (channel.write(buf) == -1) {
+ throw new EOFException("writeFully() got unexpected EOF")
+ }
+ }
+ }
+ }
+
+ override def close(): Unit = output.close()
+}
\ No newline at end of file
diff --git a/native-engine/blaze-jni-bridge/src/jni_bridge.rs b/native-engine/blaze-jni-bridge/src/jni_bridge.rs
index 5e7d2c96d..260e76231 100644
--- a/native-engine/blaze-jni-bridge/src/jni_bridge.rs
+++ b/native-engine/blaze-jni-bridge/src/jni_bridge.rs
@@ -387,7 +387,6 @@ pub struct JavaClasses<'a> {
pub classloader: JObject<'a>,
pub cJniBridge: JniBridge<'a>,
- pub cJniUtil: JniUtil<'a>,
pub cClass: JavaClass<'a>,
pub cJavaThrowable: JavaThrowable<'a>,
pub cJavaRuntimeException: JavaRuntimeException<'a>,
@@ -406,7 +405,6 @@ pub struct JavaClasses<'a> {
pub cHadoopFileSystem: HadoopFileSystem<'a>,
pub cHadoopPath: HadoopPath<'a>,
- pub cHadoopFSDataInputStream: HadoopFSDataInputStream<'a>,
pub cSparkFileSegment: SparkFileSegment<'a>,
pub cSparkSQLMetric: SparkSQLMetric<'a>,
@@ -420,6 +418,8 @@ pub struct JavaClasses<'a> {
pub cBlazeNativeParquetSinkUtils: BlazeNativeParquetSinkUtils<'a>,
pub cBlazeBlockObject: BlazeBlockObject<'a>,
pub cBlazeArrowFFIExporter: BlazeArrowFFIExporter<'a>,
+ pub cBlazeFSDataInputWrapper: BlazeFSDataInputWrapper<'a>,
+ pub cBlazeFSDataOutputWrapper: BlazeFSDataOutputWrapper<'a>,
}
#[allow(clippy::non_send_fields_in_send_ty)]
@@ -447,7 +447,6 @@ impl JavaClasses<'static> {
jvm: env.get_java_vm()?,
classloader: get_global_ref_jobject(env, classloader)?,
cJniBridge: jni_bridge,
- cJniUtil: JniUtil::new(env)?,
cClass: JavaClass::new(env)?,
cJavaThrowable: JavaThrowable::new(env)?,
@@ -467,7 +466,6 @@ impl JavaClasses<'static> {
cHadoopFileSystem: HadoopFileSystem::new(env)?,
cHadoopPath: HadoopPath::new(env)?,
- cHadoopFSDataInputStream: HadoopFSDataInputStream::new(env)?,
cSparkFileSegment: SparkFileSegment::new(env)?,
cSparkSQLMetric: SparkSQLMetric::new(env)?,
@@ -481,6 +479,8 @@ impl JavaClasses<'static> {
cBlazeNativeParquetSinkUtils: BlazeNativeParquetSinkUtils::new(env)?,
cBlazeBlockObject: BlazeBlockObject::new(env)?,
cBlazeArrowFFIExporter: BlazeArrowFFIExporter::new(env)?,
+ cBlazeFSDataInputWrapper: BlazeFSDataInputWrapper::new(env)?,
+ cBlazeFSDataOutputWrapper: BlazeFSDataOutputWrapper::new(env)?,
};
log::info!("Initializing JavaClasses finished");
Ok(java_classes)
@@ -527,6 +527,10 @@ pub struct JniBridge<'a> {
pub method_isTaskRunning_ret: ReturnType,
pub method_isDriverSide: JStaticMethodID,
pub method_isDriverSide_ret: ReturnType,
+ pub method_openFileAsDataInputWrapper: JStaticMethodID,
+ pub method_openFileAsDataInputWrapper_ret: ReturnType,
+ pub method_createFileAsDataOutputWrapper: JStaticMethodID,
+ pub method_createFileAsDataOutputWrapper_ret: ReturnType,
pub method_getDirectMemoryUsed: JStaticMethodID,
pub method_getDirectMemoryUsed_ret: ReturnType,
pub method_getDirectWriteSpillToDiskFile: JStaticMethodID,
@@ -584,6 +588,18 @@ impl<'a> JniBridge<'a> {
method_isTaskRunning: env.get_static_method_id(class, "isTaskRunning", "()Z")?,
method_isTaskRunning_ret: ReturnType::Primitive(Primitive::Boolean),
method_isDriverSide: env.get_static_method_id(class, "isDriverSide", "()Z")?,
+ method_openFileAsDataInputWrapper: env.get_static_method_id(
+ class,
+ "openFileAsDataInputWrapper",
+ "(Lorg/apache/hadoop/fs/FileSystem;Ljava/lang/String;)Lorg/apache/spark/blaze/FSDataInputWrapper;",
+ )?,
+ method_openFileAsDataInputWrapper_ret: ReturnType::Object,
+ method_createFileAsDataOutputWrapper: env.get_static_method_id(
+ class,
+ "createFileAsDataOutputWrapper",
+ "(Lorg/apache/hadoop/fs/FileSystem;Ljava/lang/String;)Lorg/apache/spark/blaze/FSDataOutputWrapper;",
+ )?,
+ method_createFileAsDataOutputWrapper_ret: ReturnType::Object,
method_isDriverSide_ret: ReturnType::Primitive(Primitive::Boolean),
method_getDirectMemoryUsed: env.get_static_method_id(
class,
@@ -601,37 +617,6 @@ impl<'a> JniBridge<'a> {
}
}
-#[allow(non_snake_case)]
-pub struct JniUtil<'a> {
- pub class: JClass<'a>,
- pub method_readFullyFromFSDataInputStream: JStaticMethodID,
- pub method_readFullyFromFSDataInputStream_ret: ReturnType,
- pub method_writeFullyToFSDataOutputStream: JStaticMethodID,
- pub method_writeFullyToFSDataOutputStream_ret: ReturnType,
-}
-impl<'a> JniUtil<'a> {
- pub const SIG_TYPE: &'static str = "org/apache/spark/sql/blaze/JniUtil";
-
- pub fn new(env: &JNIEnv<'a>) -> JniResult> {
- let class = get_global_jclass(env, Self::SIG_TYPE)?;
- Ok(JniUtil {
- class,
- method_readFullyFromFSDataInputStream: env.get_static_method_id(
- class,
- "readFullyFromFSDataInputStream",
- "(Lorg/apache/hadoop/fs/FSDataInputStream;JLjava/nio/ByteBuffer;)V",
- )?,
- method_readFullyFromFSDataInputStream_ret: ReturnType::Primitive(Primitive::Void),
- method_writeFullyToFSDataOutputStream: env.get_static_method_id(
- class,
- "writeFullyToFSDataOutputStream",
- "(Lorg/apache/hadoop/fs/FSDataOutputStream;Ljava/nio/ByteBuffer;)V",
- )?,
- method_writeFullyToFSDataOutputStream_ret: ReturnType::Primitive(Primitive::Void),
- })
- }
-}
-
#[allow(non_snake_case)]
pub struct JavaClass<'a> {
pub class: JClass<'a>,
@@ -1404,6 +1389,50 @@ impl<'a> BlazeArrowFFIExporter<'a> {
}
}
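+// Cached JNI method handles for org.apache.spark.blaze.FSDataInputWrapper,
+// resolved once at startup like the other JavaClasses entries.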
+#[allow(non_snake_case)]
+pub struct BlazeFSDataInputWrapper<'a> {
+ pub class: JClass<'a>,
+ pub method_readFully: JMethodID,
+ pub method_readFully_ret: ReturnType,
+}
+
+impl<'a> BlazeFSDataInputWrapper<'a> {
+ pub const SIG_TYPE: &'static str = "org/apache/spark/blaze/FSDataInputWrapper";
+
+ pub fn new(env: &JNIEnv<'a>) -> JniResult> {
+ let class = get_global_jclass(env, Self::SIG_TYPE)?;
+ Ok(BlazeFSDataInputWrapper {
+ class,
+ method_readFully: env.get_method_id(class, "readFully", "(JLjava/nio/ByteBuffer;)V")?,
+ method_readFully_ret: ReturnType::Primitive(Primitive::Void),
+ })
+ }
+}
+
+#[allow(non_snake_case)]
+pub struct BlazeFSDataOutputWrapper<'a> {
+ pub class: JClass<'a>,
+ pub method_writeFully: JMethodID,
+ pub method_writeFully_ret: ReturnType,
+}
+
+impl<'a> BlazeFSDataOutputWrapper<'a> {
+ pub const SIG_TYPE: &'static str = "org/apache/spark/blaze/FSDataOutputWrapper";
+
+ pub fn new(env: &JNIEnv<'a>) -> JniResult> {
+ let class = get_global_jclass(env, Self::SIG_TYPE)?;
+ Ok(BlazeFSDataOutputWrapper {
+ class,
+ method_writeFully: env.get_method_id(
+ class,
+ "writeFully",
+ "(Ljava/nio/ByteBuffer;)V",
+ )?,
+ method_writeFully_ret: ReturnType::Primitive(Primitive::Void),
+ })
+ }
+}
+
fn get_global_jclass(env: &JNIEnv<'_>, cls: &str) -> JniResult<JClass<'static>> {
let local_jclass = env.find_class(cls)?;
Ok(get_global_ref_jobject(env, local_jclass.into())?.into())
diff --git a/native-engine/datafusion-ext-commons/src/hadoop_fs.rs b/native-engine/datafusion-ext-commons/src/hadoop_fs.rs
index 239e8bef0..fe6353aa8 100644
--- a/native-engine/datafusion-ext-commons/src/hadoop_fs.rs
+++ b/native-engine/datafusion-ext-commons/src/hadoop_fs.rs
@@ -51,87 +51,79 @@ impl Fs {
Ok(())
}
- pub fn open(&self, path: &str) -> Result> {
+ pub fn open(&self, path: &str) -> Result> {
let _timer = self.io_time.timer();
- let path_str = jni_new_string!(path)?;
- let path_uri = jni_new_object!(JavaURI(path_str.as_obj()))?;
- let path = jni_new_object!(HadoopPath(path_uri.as_obj()))?;
- let fin = jni_call!(
- HadoopFileSystem(self.fs.as_obj()).open(path.as_obj()) -> JObject
+ let path = jni_new_string!(path)?;
+ let wrapper = jni_call_static!(
+ JniBridge.openFileAsDataInputWrapper(self.fs.as_obj(), path.as_obj()) -> JObject
)?;
- Ok(Arc::new(FsDataInputStream {
- stream: jni_new_global_ref!(fin.as_obj())?,
+ Ok(Arc::new(FsDataInputWrapper {
+ obj: jni_new_global_ref!(wrapper.as_obj())?,
io_time: self.io_time.clone(),
}))
}
- pub fn create(&self, path: &str) -> Result<Arc<FsDataOutputStream>> {
+ pub fn create(&self, path: &str) -> Result<Arc<FsDataOutputWrapper>> {
let _timer = self.io_time.timer();
- let path_str = jni_new_string!(path)?;
- let path_uri = jni_new_object!(JavaURI(path_str.as_obj()))?;
- let path = jni_new_object!(HadoopPath(path_uri.as_obj()))?;
- let fin = jni_call!(
- HadoopFileSystem(self.fs.as_obj()).create(path.as_obj()) -> JObject
+ let path = jni_new_string!(path)?;
+ let wrapper = jni_call_static!(
+ JniBridge.createFileAsDataOutputWrapper(self.fs.as_obj(), path.as_obj()) -> JObject
)?;
- Ok(Arc::new(FsDataOutputStream {
- stream: jni_new_global_ref!(fin.as_obj())?,
+ Ok(Arc::new(FsDataOutputWrapper {
+ obj: jni_new_global_ref!(wrapper.as_obj())?,
io_time: self.io_time.clone(),
}))
}
}
-pub struct FsDataInputStream {
- stream: GlobalRef,
+pub struct FsDataInputWrapper {
+ obj: GlobalRef,
io_time: Time,
}
-impl FsDataInputStream {
+impl FsDataInputWrapper {
pub fn read_fully(&self, pos: u64, buf: &mut [u8]) -> Result<()> {
let _timer = self.io_time.timer();
let buf = jni_new_direct_byte_buffer!(buf)?;
- jni_call_static!(JniUtil.readFullyFromFSDataInputStream(
- self.stream.as_obj(), pos as i64, buf.as_obj()) -> ()
- )?;
+ jni_call!(BlazeFSDataInputWrapper(self.obj.as_obj())
+ .readFully(pos as i64, buf.as_obj()) -> ())?;
Ok(())
}
}
-impl Drop for FsDataInputStream {
+impl Drop for FsDataInputWrapper {
fn drop(&mut self) {
let _timer = self.io_time.timer();
- if let Err(e) = jni_call!(JavaAutoCloseable(self.stream.as_obj()).close() -> ()) {
+ if let Err(e) = jni_call!(JavaAutoCloseable(self.obj.as_obj()).close() -> ()) {
log::warn!("error closing hadoop FSDataInputStream: {:?}", e);
}
}
}
-pub struct FsDataOutputStream {
- stream: GlobalRef,
+pub struct FsDataOutputWrapper {
+ obj: GlobalRef,
io_time: Time,
}
-impl FsDataOutputStream {
+impl FsDataOutputWrapper {
pub fn write_fully(&self, buf: &[u8]) -> Result<()> {
let _timer = self.io_time.timer();
let buf = jni_new_direct_byte_buffer!(buf)?;
-
- jni_call_static!(JniUtil.writeFullyToFSDataOutputStream(
- self.stream.as_obj(), buf.as_obj()) -> ()
- )?;
+ jni_call!(BlazeFSDataOutputWrapper(self.obj.as_obj()).writeFully(buf.as_obj()) -> ())?;
Ok(())
}
pub fn close(self) -> Result<()> {
- jni_call!(JavaAutoCloseable(self.stream.as_obj()).close() -> ())
+ jni_call!(JavaAutoCloseable(self.obj.as_obj()).close() -> ())
}
}
-impl Drop for FsDataOutputStream {
+impl Drop for FsDataOutputWrapper {
fn drop(&mut self) {
- let _ = jni_call!(JavaAutoCloseable(self.stream.as_obj()).close() -> ());
+ let _ = jni_call!(JavaAutoCloseable(self.obj.as_obj()).close() -> ());
}
}
diff --git a/native-engine/datafusion-ext-plans/src/common/internal_file_reader.rs b/native-engine/datafusion-ext-plans/src/common/internal_file_reader.rs
index 939f8383c..507d2baa2 100644
--- a/native-engine/datafusion-ext-plans/src/common/internal_file_reader.rs
+++ b/native-engine/datafusion-ext-plans/src/common/internal_file_reader.rs
@@ -22,7 +22,7 @@ use bytes::Bytes;
use datafusion::common::Result;
use datafusion_ext_commons::{
df_execution_err,
- hadoop_fs::{Fs, FsDataInputStream, FsProvider},
+ hadoop_fs::{Fs, FsDataInputWrapper, FsProvider},
};
use object_store::ObjectMeta;
use once_cell::sync::OnceCell;
@@ -31,7 +31,7 @@ pub struct InternalFileReader {
fs: Fs,
meta: ObjectMeta,
path: String,
- input: OnceCell>,
+ input: OnceCell>,
}
impl InternalFileReader {
@@ -53,7 +53,7 @@ impl InternalFileReader {
})
}
- fn get_input(&self) -> Result> {
+ fn get_input(&self) -> Result> {
let input = self
.input
.get_or_try_init(|| self.fs.open(&self.path))
diff --git a/native-engine/datafusion-ext-plans/src/parquet_exec.rs b/native-engine/datafusion-ext-plans/src/parquet_exec.rs
index 28e25916c..6ddc51113 100644
--- a/native-engine/datafusion-ext-plans/src/parquet_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/parquet_exec.rs
@@ -30,7 +30,7 @@ use datafusion::{
FileMeta, FileScanConfig, FileStream, OnError, ParquetFileMetrics,
ParquetFileReaderFactory,
},
- error::Result,
+ error::{DataFusionError, Result},
execution::context::TaskContext,
parquet::{
arrow::async_reader::{fetch_parquet_metadata, AsyncFileReader},
@@ -51,6 +51,7 @@ use datafusion::{
use datafusion_ext_commons::{batch_size, hadoop_fs::FsProvider};
use fmt::Debug;
use futures::{future::BoxFuture, stream::once, FutureExt, StreamExt, TryStreamExt};
+use itertools::Itertools;
use object_store::ObjectMeta;
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
@@ -335,6 +336,16 @@ impl ParquetFileReader {
}
impl AsyncFileReader for ParquetFileReaderRef {
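+ // Synchronous fetch path (provided by the patched parquet crate pinned in
+ // Cargo.lock): reads directly through the JNI wrapper without going async.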
+ fn get_bytes_sync(
+ &mut self,
+ range: Range<usize>,
+ ) -> datafusion::parquet::errors::Result<Bytes> {
+ self.0
+ .get_internal_reader()
+ .read_fully(range)
+ .map_err(|e| ParquetError::External(Box::new(e)))
+ }
+
fn get_bytes(
&mut self,
range: Range<usize>,
@@ -354,6 +365,84 @@ impl AsyncFileReader for ParquetFileReaderRef {
.boxed()
}
+ fn get_byte_ranges(
+ &mut self,
+ ranges: Vec>,
+ ) -> BoxFuture<'_, datafusion::parquet::errors::Result>> {
+ const MAX_OVER_READ_SIZE: usize = 16384; // TODO: make it configurable
+ let num_ranges = ranges.len();
+ let (sorted_range_indices, sorted_ranges): (Vec<usize>, Vec<Range<usize>>) = ranges
+ .into_iter()
+ .enumerate()
+ .sorted_unstable_by_key(|(_, r)| r.start)
+ .unzip();
+ let mut merged_ranges = vec![];
+
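+ // Merge ranges separated by at most MAX_OVER_READ_SIZE: over-reading a small
+ // gap is cheaper than issuing another positioned read through JNI.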
+ for range in sorted_ranges.iter().cloned() {
+ if merged_ranges.is_empty() {
+ merged_ranges.push(range);
+ continue;
+ }
+
+ let last_merged_range = merged_ranges.last_mut().unwrap();
+ if range.start <= last_merged_range.end + MAX_OVER_READ_SIZE {
+ last_merged_range.end = range.end.max(last_merged_range.end);
+ } else {
+ merged_ranges.push(range);
+ }
+ }
+
+ async move {
+ let inner = self.0.clone();
+ let merged_bytes = Arc::new(Mutex::new(Vec::with_capacity(merged_ranges.len())));
+ let merged_bytes_cloned = merged_bytes.clone();
+ let merged_ranges_cloned = merged_ranges.clone();
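+ // read_fully() blocks on a JNI call, so run the merged reads on tokio's
+ // blocking pool instead of the async executor.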
+ tokio::task::spawn_blocking(move || {
+ let merged_bytes = &mut *merged_bytes_cloned.lock();
+ for range in merged_ranges_cloned {
+ inner.metrics.bytes_scanned.add(range.len());
+ if range.is_empty() {
+ merged_bytes.push(Bytes::new());
+ continue;
+ }
+ let bytes = inner.get_internal_reader().read_fully(range)?;
+ merged_bytes.push(bytes);
+ }
+ Ok::<_, DataFusionError>(())
+ })
+ .await
+ .expect("tokio spawn_blocking error")
+ .map_err(|e| ParquetError::External(Box::new(e)))?;
+
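+ // Slice each requested range back out of the merged buffers; both lists are
+ // sorted by start offset, so a single cursor `m` suffices.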
+ let merged_bytes = &*merged_bytes.lock();
+ let mut sorted_range_bytes = Vec::with_capacity(num_ranges);
+ let mut m = 0;
+ for range in sorted_ranges {
+ if range.is_empty() {
+ sorted_range_bytes.push(Bytes::new());
+ continue;
+ }
+ while merged_ranges[m].end <= range.start {
+ m += 1;
+ }
+ let len = range.len();
+ if len < merged_ranges[m].len() {
+ let offset = range.start - merged_ranges[m].start;
+ sorted_range_bytes.push(merged_bytes[m].slice(offset..offset + len));
+ } else {
+ sorted_range_bytes.push(merged_bytes[m].clone());
+ }
+ }
+
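+ // Undo the initial sort so results line up with the caller's range order.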
+ let mut range_bytes = Vec::with_capacity(num_ranges);
+ for i in 0..num_ranges {
+ range_bytes.push(sorted_range_bytes[sorted_range_indices[i]].clone());
+ }
+ Ok(range_bytes)
+ }
+ .boxed()
+ }
+
fn get_metadata(
&mut self,
) -> BoxFuture<'_, datafusion::parquet::errors::Result>> {
diff --git a/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs b/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs
index 8b00a7d1c..215fbe025 100644
--- a/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs
@@ -43,7 +43,7 @@ use datafusion_ext_commons::{
array_size::ArraySize,
cast::cast,
df_execution_err,
- hadoop_fs::{FsDataOutputStream, FsProvider},
+ hadoop_fs::{FsDataOutputWrapper, FsProvider},
};
use futures::{stream::once, StreamExt, TryStreamExt};
use once_cell::sync::OnceCell;
@@ -542,12 +542,12 @@ fn get_dyn_part_values(
// Write wrapper for FSDataOutputStream
struct FSDataWriter {
- inner: FsDataOutputStream,
+ inner: FsDataOutputWrapper,
bytes_written: Count,
}
impl FSDataWriter {
- pub fn new(inner: FsDataOutputStream, bytes_written: &Count) -> Self {
+ pub fn new(inner: FsDataOutputWrapper, bytes_written: &Count) -> Self {
Self {
inner,
bytes_written: bytes_written.clone(),
diff --git a/pom.xml b/pom.xml
index ad0f004c0..d847e35a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,6 +9,7 @@
     <modules>
         <module>spark-extension</module>
         <module>${shimPkg}</module>
+        <module>hadoop-shim</module>
         <module>dev/mvn-build-helper/assembly</module>
         <module>dev/mvn-build-helper/proto</module>
     </modules>
@@ -18,7 +19,6 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <arrowVersion>16.0.0</arrowVersion>
        <protobufVersion>3.21.9</protobufVersion>
-        <hadoopClientApiVersion>3.4.0</hadoopClientApiVersion>
@@ -43,11 +43,6 @@
             <artifactId>spark-sql_${scalaVersion}</artifactId>
             <version>${sparkVersion}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client-api</artifactId>
-            <version>${hadoopClientApiVersion}</version>
-        </dependency>
         <dependency>
             <groupId>org.apache.arrow</groupId>
             <artifactId>arrow-c-data</artifactId>
diff --git a/spark-extension/pom.xml b/spark-extension/pom.xml
index 7a58c8cce..88763cd3b 100644
--- a/spark-extension/pom.xml
+++ b/spark-extension/pom.xml
@@ -55,9 +55,9 @@
             <artifactId>arrow-vector</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client-api</artifactId>
-            <scope>provided</scope>
+            <groupId>org.blaze</groupId>
+            <artifactId>hadoop-shim</artifactId>
+            <version>${revision}</version>
         </dependency>
         <dependency>
             <groupId>io.netty</groupId>
diff --git a/spark-extension/src/main/java/org/apache/spark/sql/blaze/JniBridge.java b/spark-extension/src/main/java/org/apache/spark/sql/blaze/JniBridge.java
index f4b024f26..644a22ae9 100644
--- a/spark-extension/src/main/java/org/apache/spark/sql/blaze/JniBridge.java
+++ b/spark-extension/src/main/java/org/apache/spark/sql/blaze/JniBridge.java
@@ -19,9 +19,16 @@
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
+import org.apache.spark.blaze.FSDataInputWrapper;
+import org.apache.spark.blaze.FSDataInputWrapper$;
+import org.apache.spark.blaze.FSDataOutputWrapper;
+import org.apache.spark.blaze.FSDataOutputWrapper$;
import org.apache.spark.sql.blaze.memory.OnHeapSpillManager;
import org.apache.spark.sql.blaze.memory.OnHeapSpillManager$;
@@ -78,6 +85,18 @@ public static boolean isDriverSide() {
return tc == null;
}
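+
+ // Called from native code: open/create files through the hadoop-shim wrappers,
+ // which pick a read strategy compatible with the running Hadoop version.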
+ public static FSDataInputWrapper openFileAsDataInputWrapper(
+ FileSystem fs,
+ String path) throws Exception {
+ return FSDataInputWrapper$.MODULE$.wrap(fs.open(new Path(path)));
+ }
+
+ public static FSDataOutputWrapper createFileAsDataOutputWrapper(
+ FileSystem fs,
+ String path) throws Exception {
+ return FSDataOutputWrapper$.MODULE$.wrap(fs.create(new Path(path)));
+ }
+
private static final List<BufferPoolMXBean> directMXBeans =
ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
diff --git a/spark-extension/src/main/java/org/apache/spark/sql/blaze/JniUtil.java b/spark-extension/src/main/java/org/apache/spark/sql/blaze/JniUtil.java
deleted file mode 100644
index 4ef279954..000000000
--- a/spark-extension/src/main/java/org/apache/spark/sql/blaze/JniUtil.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright 2022 The Blaze Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.blaze;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import org.apache.hadoop.fs.ByteBufferPositionedReadable;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-
-@SuppressWarnings("unused")
-public class JniUtil {
- static ReadImpl readImpl;
- static ReadImpl readFallback = new ReadFailback();
-
- static {
- try {
- readImpl = new ReadUsingByteBufferPositionedReadable();
- } catch (NoClassDefFoundError e) {
- readImpl = readFallback;
- }
- }
-
- public static void readFullyFromFSDataInputStream(FSDataInputStream in, long pos, ByteBuffer buf)
- throws IOException {
- try {
- readFallback.read(in, pos, buf);
- } catch (UnsupportedOperationException e) {
- readImpl.read(in, pos, buf);
- }
- }
-
- public static void writeFullyToFSDataOutputStream(FSDataOutputStream out, ByteBuffer buf) throws IOException {
- synchronized (out) {
- WritableByteChannel channel = Channels.newChannel(out);
-
- while (buf.hasRemaining()) {
- if (channel.write(buf) == -1) {
- throw new EOFException("writeFullyToFSDataOutputStream() got unexpected EOF");
- }
- }
- }
- }
-
- private interface ReadImpl {
- void read(FSDataInputStream in, long pos, ByteBuffer buf) throws IOException;
- }
-
- private static class ReadUsingByteBufferPositionedReadable implements ReadImpl {
- @Override
- public void read(FSDataInputStream in, long pos, ByteBuffer buf) throws IOException {
- while (buf.hasRemaining()) {
- if (((ByteBufferPositionedReadable) in).read(pos, buf) == -1) {
- throw new EOFException("readFullyFromFSDataInputStream() got unexpected EOF");
- }
- pos += buf.position();
- }
- }
- }
-
- private static class ReadFailback implements ReadImpl {
- @Override
- public void read(FSDataInputStream in, long pos, ByteBuffer buf) throws IOException {
- synchronized (in) {
- in.seek(pos);
- while (buf.hasRemaining()) {
- ReadableByteChannel channel = Channels.newChannel(in);
- if (channel.read(buf) == -1) {
- throw new EOFException("readFullyFromFSDataInputStream() got unexpected EOF");
- }
- }
- }
- }
- }
-}
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeCallNativeWrapper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeCallNativeWrapper.scala
index bfd8ef0b5..32f4d27e5 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeCallNativeWrapper.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeCallNativeWrapper.scala
@@ -188,15 +188,12 @@ object BlazeCallNativeWrapper extends Logging {
s"nativeMemory=${NativeHelper.nativeMemory}, " +
s"memoryFraction=${BlazeConf.MEMORY_FRACTION.doubleConf()})")
+ assert(classOf[JniBridge] != null) // preload JNI bridge classes
BlazeCallNativeWrapper.loadLibBlaze()
ShutdownHookManager.addShutdownHook(() => JniBridge.onExit())
}
private def loadLibBlaze(): Unit = {
- // preload JNI bridge classes
- Class.forName(classOf[JniBridge].getName)
- Class.forName(classOf[JniUtil].getName)
-
val libName = System.mapLibraryName("blaze")
try {
val classLoader = classOf[NativeSupports].getClassLoader