diff --git a/Cargo.lock b/Cargo.lock index 82bb7454b..a277146f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,7 +97,7 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" dependencies = [ "arrow-arith", "arrow-array", @@ -117,7 +117,7 @@ dependencies = [ [[package]] name = "arrow-arith" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" dependencies = [ "arrow-array", "arrow-buffer", @@ -131,7 +131,7 @@ dependencies = [ [[package]] name = "arrow-array" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" dependencies = [ "ahash", "arrow-buffer", @@ -147,7 +147,7 @@ dependencies = [ [[package]] name = "arrow-buffer" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" dependencies = [ "bytes", "half", @@ -157,7 +157,7 @@ dependencies = [ [[package]] name = "arrow-cast" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" dependencies = [ "arrow-array", "arrow-buffer", @@ -177,7 +177,7 @@ dependencies = [ [[package]] name = "arrow-csv" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" dependencies = [ "arrow-array", "arrow-buffer", @@ -195,7 +195,7 @@ dependencies = [ [[package]] name = "arrow-data" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" dependencies = [ "arrow-buffer", "arrow-schema", @@ -206,7 +206,7 @@ dependencies = [ [[package]] name = "arrow-ipc" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" dependencies = [ "arrow-array", "arrow-buffer", @@ -220,7 +220,7 @@ dependencies = [ [[package]] name = "arrow-json" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" dependencies = [ "arrow-array", "arrow-buffer", @@ -239,7 +239,7 @@ dependencies = [ 
[[package]] name = "arrow-ord" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" dependencies = [ "arrow-array", "arrow-buffer", @@ -253,7 +253,7 @@ dependencies = [ [[package]] name = "arrow-row" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" dependencies = [ "ahash", "arrow-array", @@ -266,7 +266,7 @@ dependencies = [ [[package]] name = "arrow-schema" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" dependencies = [ "bitflags 2.6.0", "serde", @@ -275,7 +275,7 @@ dependencies = [ [[package]] name = "arrow-select" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" dependencies = [ "ahash", "arrow-array", @@ -288,7 +288,7 @@ dependencies = [ [[package]] name = "arrow-string" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" dependencies = [ "arrow-array", "arrow-buffer", @@ -750,7 +750,7 @@ dependencies = [ [[package]] name = "datafusion" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "ahash", "arrow", @@ -806,7 +806,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "arrow-schema", "async-trait", @@ -820,7 +820,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "ahash", "arrow", @@ -843,7 +843,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "log", "tokio", @@ -852,7 +852,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "42.0.0" -source = 
"git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "arrow", "chrono", @@ -872,7 +872,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "ahash", "arrow", @@ -893,7 +893,7 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "arrow", "datafusion-common", @@ -1012,7 +1012,7 @@ dependencies = [ [[package]] name = "datafusion-functions" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "arrow", "arrow-buffer", @@ -1038,7 +1038,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "ahash", "arrow", @@ -1058,7 +1058,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "ahash", "arrow", @@ -1071,7 +1071,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "arrow", "arrow-array", @@ -1093,7 +1093,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "datafusion-common", "datafusion-expr", @@ -1104,7 +1104,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "arrow", "async-trait", @@ -1123,7 +1123,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "42.0.0" -source 
= "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "ahash", "arrow", @@ -1154,7 +1154,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "ahash", "arrow", @@ -1167,7 +1167,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "arrow-schema", "datafusion-common", @@ -1180,7 +1180,7 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "ahash", "arrow", @@ -1214,7 +1214,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "42.0.0" -source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=9a09e14#9a09e14f674378a0d551ce3bb76471d327bafefb" +source = "git+https://github.com/blaze-init/arrow-datafusion.git?rev=2bc42ea73#2bc42ea73920afa3731f90f4bb8e7372fef370a8" dependencies = [ "arrow", "arrow-array", @@ -1945,7 +1945,7 @@ checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" [[package]] name = "orc-rust" version = "0.4.1" -source = "git+https://github.com/blaze-init/datafusion-orc.git?rev=9c74ac3#9c74ac3779c2ebb371404d71adb915b49eee1930" +source = "git+https://github.com/blaze-init/datafusion-orc.git?rev=7833d7d#7833d7dcbb22b613d882b9217a1ca35fb9e9bd70" dependencies = [ "arrow", "async-trait", @@ -2008,7 +2008,7 @@ dependencies = [ [[package]] name = "parquet" version = "53.0.0" -source = "git+https://github.com/blaze-init/arrow-rs.git?rev=9dbfd9018e#9dbfd9018e2574748000d4ee689e33d946f9671f" +source = "git+https://github.com/blaze-init/arrow-rs.git?rev=91dc27dedf#91dc27dedf0a635e0caaeb8ca3103ef3f496154a" dependencies = [ "ahash", "arrow-array", diff --git a/Cargo.toml b/Cargo.toml index 9b7700c25..05e2bbb29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,27 +66,27 @@ serde_json = { version = "1.0.96" } [patch.crates-io] # datafusion: branch=v42-blaze -datafusion = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"} -datafusion-common = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"} -datafusion-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"} -datafusion-execution = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"} -datafusion-optimizer = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"} -datafusion-physical-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "9a09e14"} -orc-rust = { git = 
"https://github.com/blaze-init/datafusion-orc.git", rev = "9c74ac3"} +datafusion = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"} +datafusion-common = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"} +datafusion-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"} +datafusion-execution = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"} +datafusion-optimizer = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"} +datafusion-physical-expr = { git = "https://github.com/blaze-init/arrow-datafusion.git", rev = "2bc42ea73"} +orc-rust = { git = "https://github.com/blaze-init/datafusion-orc.git", rev = "7833d7d"} # arrow: branch=v53-blaze -arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"} -arrow-arith = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"} -arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"} -arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"} -arrow-cast = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"} -arrow-data = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"} -arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"} -arrow-row = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"} -arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"} -arrow-select = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"} -arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"} -parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "9dbfd9018e"} +arrow = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} +arrow-arith = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} +arrow-array = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} +arrow-buffer = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} +arrow-cast = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} +arrow-data = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} +arrow-ord = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} +arrow-row = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} +arrow-schema = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} +arrow-select = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} +arrow-string = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} +parquet = { git = "https://github.com/blaze-init/arrow-rs.git", rev = "91dc27dedf"} # serde_json: branch=v1.0.96-blaze serde_json = { git = "https://github.com/blaze-init/json", branch = "v1.0.96-blaze" } diff --git a/hadoop-shim/pom.xml b/hadoop-shim/pom.xml new file mode 100644 index 000000000..3c2f906d4 --- /dev/null +++ b/hadoop-shim/pom.xml @@ -0,0 +1,38 @@ + + + 4.0.0 + + + org.blaze + blaze-engine + ${revision} + ../ + + org.blaze + hadoop-shim + jar + + + + org.scala-lang + scala-library + provided + + + org.scala-lang.modules + scala-java8-compat_2.12 + 0.9.1 + + + org.scalatest 
+ <artifactId>scalatest_${scalaVersion}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client-api</artifactId> + <version>3.4.0</version> + <scope>provided</scope> + </dependency> + </dependencies> +</project> diff --git a/hadoop-shim/src/main/scala/org/apache/spark/blaze/FSDataInputWrapper.scala b/hadoop-shim/src/main/scala/org/apache/spark/blaze/FSDataInputWrapper.scala new file mode 100644 index 000000000..97896eae1 --- /dev/null +++ b/hadoop-shim/src/main/scala/org/apache/spark/blaze/FSDataInputWrapper.scala @@ -0,0 +1,58 @@ +package org.apache.spark.blaze + +import java.io.EOFException +import java.nio.ByteBuffer +import java.nio.channels.Channels + +import org.apache.hadoop.fs.FSDataInputStream +import org.apache.hadoop.fs.StreamCapabilities + +trait FSDataInputWrapper extends AutoCloseable { + def readFully(pos: Long, buf: ByteBuffer): Unit +} + +object FSDataInputWrapper { + def wrap(input: FSDataInputStream): FSDataInputWrapper = { + if (canUsePositionedReadable(input)) { + new PositionedReadableFSDataInputWrapper(input) + } else { + new SeekableFSDataInputWrapper(input) + } + } + + private def canUsePositionedReadable(input: FSDataInputStream): Boolean = { + try { + input.getClass.getMethod("readFully", classOf[Long], classOf[ByteBuffer]) != null && + input.hasCapability(StreamCapabilities.PREADBYTEBUFFER) + } catch { + case _: Throwable => false + } + } +} + +class PositionedReadableFSDataInputWrapper(input: FSDataInputStream) extends FSDataInputWrapper { + override def readFully(pos: Long, buf: ByteBuffer): Unit = { + input.readFully(pos, buf) + if (buf.hasRemaining) { + throw new EOFException(s"unexpected EOF: ${buf.remaining()} bytes remaining") + } + } + + override def close(): Unit = input.close() +} + +class SeekableFSDataInputWrapper(input: FSDataInputStream) extends FSDataInputWrapper { + override def readFully(pos: Long, buf: ByteBuffer): Unit = { + input.synchronized { + input.seek(pos) + val channel = Channels.newChannel(input) + while (buf.hasRemaining) { + if (channel.read(buf) == -1) { + throw new EOFException(s"unexpected EOF: ${buf.remaining()} bytes remaining") + } + } + } + } + + override def close(): Unit = input.close() +} \ No newline at end of file diff --git a/hadoop-shim/src/main/scala/org/apache/spark/blaze/FSDataOutputWrapper.scala b/hadoop-shim/src/main/scala/org/apache/spark/blaze/FSDataOutputWrapper.scala new file mode 100644 index 000000000..7b1531e9e --- /dev/null +++ b/hadoop-shim/src/main/scala/org/apache/spark/blaze/FSDataOutputWrapper.scala @@ -0,0 +1,30 @@ +package org.apache.spark.blaze + +import java.io.EOFException +import java.nio.ByteBuffer +import java.nio.channels.Channels + +import org.apache.hadoop.fs.FSDataOutputStream + +trait FSDataOutputWrapper extends AutoCloseable { + def writeFully(buf: ByteBuffer): Unit +} + +object FSDataOutputWrapper { + def wrap(output: FSDataOutputStream): FSDataOutputWrapper = { + new SeekableFSDataOutputWrapper(output) + } +} + +class SeekableFSDataOutputWrapper(output: FSDataOutputStream) extends FSDataOutputWrapper { + override def writeFully(buf: ByteBuffer): Unit = { + output.synchronized { + val channel = Channels.newChannel(output) + while (buf.hasRemaining) if (channel.write(buf) == -1) { + throw new EOFException("writeFully() got unexpected EOF") + } + } + } + + override def close(): Unit = output.close() +} \ No newline at end of file diff --git a/native-engine/blaze-jni-bridge/src/jni_bridge.rs b/native-engine/blaze-jni-bridge/src/jni_bridge.rs index 5e7d2c96d..260e76231 100644 --- a/native-engine/blaze-jni-bridge/src/jni_bridge.rs +++ b/native-engine/blaze-jni-bridge/src/jni_bridge.rs @@ -387,7
+387,6 @@ pub struct JavaClasses<'a> { pub classloader: JObject<'a>, pub cJniBridge: JniBridge<'a>, - pub cJniUtil: JniUtil<'a>, pub cClass: JavaClass<'a>, pub cJavaThrowable: JavaThrowable<'a>, pub cJavaRuntimeException: JavaRuntimeException<'a>, @@ -406,7 +405,6 @@ pub struct JavaClasses<'a> { pub cHadoopFileSystem: HadoopFileSystem<'a>, pub cHadoopPath: HadoopPath<'a>, - pub cHadoopFSDataInputStream: HadoopFSDataInputStream<'a>, pub cSparkFileSegment: SparkFileSegment<'a>, pub cSparkSQLMetric: SparkSQLMetric<'a>, @@ -420,6 +418,8 @@ pub struct JavaClasses<'a> { pub cBlazeNativeParquetSinkUtils: BlazeNativeParquetSinkUtils<'a>, pub cBlazeBlockObject: BlazeBlockObject<'a>, pub cBlazeArrowFFIExporter: BlazeArrowFFIExporter<'a>, + pub cBlazeFSDataInputWrapper: BlazeFSDataInputWrapper<'a>, + pub cBlazeFSDataOutputWrapper: BlazeFSDataOutputWrapper<'a>, } #[allow(clippy::non_send_fields_in_send_ty)] @@ -447,7 +447,6 @@ impl JavaClasses<'static> { jvm: env.get_java_vm()?, classloader: get_global_ref_jobject(env, classloader)?, cJniBridge: jni_bridge, - cJniUtil: JniUtil::new(env)?, cClass: JavaClass::new(env)?, cJavaThrowable: JavaThrowable::new(env)?, @@ -467,7 +466,6 @@ impl JavaClasses<'static> { cHadoopFileSystem: HadoopFileSystem::new(env)?, cHadoopPath: HadoopPath::new(env)?, - cHadoopFSDataInputStream: HadoopFSDataInputStream::new(env)?, cSparkFileSegment: SparkFileSegment::new(env)?, cSparkSQLMetric: SparkSQLMetric::new(env)?, @@ -481,6 +479,8 @@ impl JavaClasses<'static> { cBlazeNativeParquetSinkUtils: BlazeNativeParquetSinkUtils::new(env)?, cBlazeBlockObject: BlazeBlockObject::new(env)?, cBlazeArrowFFIExporter: BlazeArrowFFIExporter::new(env)?, + cBlazeFSDataInputWrapper: BlazeFSDataInputWrapper::new(env)?, + cBlazeFSDataOutputWrapper: BlazeFSDataOutputWrapper::new(env)?, }; log::info!("Initializing JavaClasses finished"); Ok(java_classes) @@ -527,6 +527,10 @@ pub struct JniBridge<'a> { pub method_isTaskRunning_ret: ReturnType, pub method_isDriverSide: JStaticMethodID, pub method_isDriverSide_ret: ReturnType, + pub method_openFileAsDataInputWrapper: JStaticMethodID, + pub method_openFileAsDataInputWrapper_ret: ReturnType, + pub method_createFileAsDataOutputWrapper: JStaticMethodID, + pub method_createFileAsDataOutputWrapper_ret: ReturnType, pub method_getDirectMemoryUsed: JStaticMethodID, pub method_getDirectMemoryUsed_ret: ReturnType, pub method_getDirectWriteSpillToDiskFile: JStaticMethodID, @@ -584,6 +588,18 @@ impl<'a> JniBridge<'a> { method_isTaskRunning: env.get_static_method_id(class, "isTaskRunning", "()Z")?, method_isTaskRunning_ret: ReturnType::Primitive(Primitive::Boolean), method_isDriverSide: env.get_static_method_id(class, "isDriverSide", "()Z")?, + method_openFileAsDataInputWrapper: env.get_static_method_id( + class, + "openFileAsDataInputWrapper", + "(Lorg/apache/hadoop/fs/FileSystem;Ljava/lang/String;)Lorg/apache/spark/blaze/FSDataInputWrapper;", + )?, + method_openFileAsDataInputWrapper_ret: ReturnType::Object, + method_createFileAsDataOutputWrapper: env.get_static_method_id( + class, + "createFileAsDataOutputWrapper", + "(Lorg/apache/hadoop/fs/FileSystem;Ljava/lang/String;)Lorg/apache/spark/blaze/FSDataOutputWrapper;", + )?, + method_createFileAsDataOutputWrapper_ret: ReturnType::Object, method_isDriverSide_ret: ReturnType::Primitive(Primitive::Boolean), method_getDirectMemoryUsed: env.get_static_method_id( class, @@ -601,37 +617,6 @@ impl<'a> JniBridge<'a> { } } -#[allow(non_snake_case)] -pub struct JniUtil<'a> { - pub class: JClass<'a>, - pub 
method_readFullyFromFSDataInputStream: JStaticMethodID, - pub method_readFullyFromFSDataInputStream_ret: ReturnType, - pub method_writeFullyToFSDataOutputStream: JStaticMethodID, - pub method_writeFullyToFSDataOutputStream_ret: ReturnType, -} -impl<'a> JniUtil<'a> { - pub const SIG_TYPE: &'static str = "org/apache/spark/sql/blaze/JniUtil"; - - pub fn new(env: &JNIEnv<'a>) -> JniResult<JniUtil<'a>> { - let class = get_global_jclass(env, Self::SIG_TYPE)?; - Ok(JniUtil { - class, - method_readFullyFromFSDataInputStream: env.get_static_method_id( - class, - "readFullyFromFSDataInputStream", - "(Lorg/apache/hadoop/fs/FSDataInputStream;JLjava/nio/ByteBuffer;)V", - )?, - method_readFullyFromFSDataInputStream_ret: ReturnType::Primitive(Primitive::Void), - method_writeFullyToFSDataOutputStream: env.get_static_method_id( - class, - "writeFullyToFSDataOutputStream", - "(Lorg/apache/hadoop/fs/FSDataOutputStream;Ljava/nio/ByteBuffer;)V", - )?, - method_writeFullyToFSDataOutputStream_ret: ReturnType::Primitive(Primitive::Void), - }) - } -} - #[allow(non_snake_case)] pub struct JavaClass<'a> { pub class: JClass<'a>, @@ -1404,6 +1389,50 @@ impl<'a> BlazeArrowFFIExporter<'a> { } } +#[allow(non_snake_case)] +pub struct BlazeFSDataInputWrapper<'a> { + pub class: JClass<'a>, + pub method_readFully: JMethodID, + pub method_readFully_ret: ReturnType, +} + +impl<'a> BlazeFSDataInputWrapper<'a> { + pub const SIG_TYPE: &'static str = "org/apache/spark/blaze/FSDataInputWrapper"; + + pub fn new(env: &JNIEnv<'a>) -> JniResult<BlazeFSDataInputWrapper<'a>> { + let class = get_global_jclass(env, Self::SIG_TYPE)?; + Ok(BlazeFSDataInputWrapper { + class, + method_readFully: env.get_method_id(class, "readFully", "(JLjava/nio/ByteBuffer;)V")?, + method_readFully_ret: ReturnType::Primitive(Primitive::Void), + }) + } +} + +#[allow(non_snake_case)] +pub struct BlazeFSDataOutputWrapper<'a> { + pub class: JClass<'a>, + pub method_writeFully: JMethodID, + pub method_writeFully_ret: ReturnType, +} + +impl<'a> BlazeFSDataOutputWrapper<'a> { + pub const SIG_TYPE: &'static str = "org/apache/spark/blaze/FSDataOutputWrapper"; + + pub fn new(env: &JNIEnv<'a>) -> JniResult<BlazeFSDataOutputWrapper<'a>> { + let class = get_global_jclass(env, Self::SIG_TYPE)?; + Ok(BlazeFSDataOutputWrapper { + class, + method_writeFully: env.get_method_id( + class, + "writeFully", + "(Ljava/nio/ByteBuffer;)V", + )?, + method_writeFully_ret: ReturnType::Primitive(Primitive::Void), + }) + } +} + fn get_global_jclass(env: &JNIEnv<'_>, cls: &str) -> JniResult<JClass<'static>> { let local_jclass = env.find_class(cls)?; Ok(get_global_ref_jobject(env, local_jclass.into())?.into())
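A note for reviewers: the descriptor strings passed to `get_method_id`/`get_static_method_id` above use the standard JNI type encoding (`J` = long, `V` = void, `Lpkg/Class;` = object type). A minimal sketch decoding the two new bindings (the constant names are illustrative only, not part of this patch):

```rust
// readFully(long, ByteBuffer) -> void on org.apache.spark.blaze.FSDataInputWrapper:
const READ_FULLY_SIG: &str = "(JLjava/nio/ByteBuffer;)V";

// openFileAsDataInputWrapper(FileSystem, String) -> FSDataInputWrapper on JniBridge:
const OPEN_INPUT_SIG: &str =
    "(Lorg/apache/hadoop/fs/FileSystem;Ljava/lang/String;)Lorg/apache/spark/blaze/FSDataInputWrapper;";
```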
diff --git a/native-engine/datafusion-ext-commons/src/hadoop_fs.rs b/native-engine/datafusion-ext-commons/src/hadoop_fs.rs index 239e8bef0..fe6353aa8 100644 --- a/native-engine/datafusion-ext-commons/src/hadoop_fs.rs +++ b/native-engine/datafusion-ext-commons/src/hadoop_fs.rs @@ -51,87 +51,79 @@ impl Fs { Ok(()) } - pub fn open(&self, path: &str) -> Result<Arc<FsDataInputStream>> { + pub fn open(&self, path: &str) -> Result<Arc<FsDataInputWrapper>> { let _timer = self.io_time.timer(); - let path_str = jni_new_string!(path)?; - let path_uri = jni_new_object!(JavaURI(path_str.as_obj()))?; - let path = jni_new_object!(HadoopPath(path_uri.as_obj()))?; - let fin = jni_call!( - HadoopFileSystem(self.fs.as_obj()).open(path.as_obj()) -> JObject + let path = jni_new_string!(path)?; + let wrapper = jni_call_static!( + JniBridge.openFileAsDataInputWrapper(self.fs.as_obj(), path.as_obj()) -> JObject )?; - Ok(Arc::new(FsDataInputStream { - stream: jni_new_global_ref!(fin.as_obj())?, + Ok(Arc::new(FsDataInputWrapper { + obj: jni_new_global_ref!(wrapper.as_obj())?, io_time: self.io_time.clone(), })) } - pub fn create(&self, path: &str) -> Result<Arc<FsDataOutputStream>> { + pub fn create(&self, path: &str) -> Result<Arc<FsDataOutputWrapper>> { let _timer = self.io_time.timer(); - let path_str = jni_new_string!(path)?; - let path_uri = jni_new_object!(JavaURI(path_str.as_obj()))?; - let path = jni_new_object!(HadoopPath(path_uri.as_obj()))?; - let fin = jni_call!( - HadoopFileSystem(self.fs.as_obj()).create(path.as_obj()) -> JObject + let path = jni_new_string!(path)?; + let wrapper = jni_call_static!( + JniBridge.createFileAsDataOutputWrapper(self.fs.as_obj(), path.as_obj()) -> JObject )?; - Ok(Arc::new(FsDataOutputStream { - stream: jni_new_global_ref!(fin.as_obj())?, + Ok(Arc::new(FsDataOutputWrapper { + obj: jni_new_global_ref!(wrapper.as_obj())?, io_time: self.io_time.clone(), })) } } -pub struct FsDataInputStream { - stream: GlobalRef, +pub struct FsDataInputWrapper { + obj: GlobalRef, io_time: Time, } -impl FsDataInputStream { +impl FsDataInputWrapper { pub fn read_fully(&self, pos: u64, buf: &mut [u8]) -> Result<()> { let _timer = self.io_time.timer(); let buf = jni_new_direct_byte_buffer!(buf)?; - jni_call_static!(JniUtil.readFullyFromFSDataInputStream( - self.stream.as_obj(), pos as i64, buf.as_obj()) -> () - )?; + jni_call!(BlazeFSDataInputWrapper(self.obj.as_obj()) + .readFully(pos as i64, buf.as_obj()) -> ())?; Ok(()) } } -impl Drop for FsDataInputStream { +impl Drop for FsDataInputWrapper { fn drop(&mut self) { let _timer = self.io_time.timer(); - if let Err(e) = jni_call!(JavaAutoCloseable(self.stream.as_obj()).close() -> ()) { + if let Err(e) = jni_call!(JavaAutoCloseable(self.obj.as_obj()).close() -> ()) { log::warn!("error closing hadoop FSDataInputStream: {:?}", e); } } } -pub struct FsDataOutputStream { - stream: GlobalRef, +pub struct FsDataOutputWrapper { + obj: GlobalRef, io_time: Time, } -impl FsDataOutputStream { +impl FsDataOutputWrapper { pub fn write_fully(&self, buf: &[u8]) -> Result<()> { let _timer = self.io_time.timer(); let buf = jni_new_direct_byte_buffer!(buf)?; - - jni_call_static!(JniUtil.writeFullyToFSDataOutputStream( - self.stream.as_obj(), buf.as_obj()) -> () - )?; + jni_call!(BlazeFSDataOutputWrapper(self.obj.as_obj()).writeFully(buf.as_obj()) -> ())?; Ok(()) } pub fn close(self) -> Result<()> { - jni_call!(JavaAutoCloseable(self.stream.as_obj()).close() -> ()) + jni_call!(JavaAutoCloseable(self.obj.as_obj()).close() -> ()) } } -impl Drop for FsDataOutputStream { +impl Drop for FsDataOutputWrapper { fn drop(&mut self) { - let _ = jni_call!(JavaAutoCloseable(self.stream.as_obj()).close() -> ()); + let _ = jni_call!(JavaAutoCloseable(self.obj.as_obj()).close() -> ()); } }
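With this change the native side no longer builds Hadoop `Path`/`URI` objects itself; it hands a plain path string to the two new `JniBridge` entry points and talks to the returned wrapper object. A minimal usage sketch under the new API (the `read_tail` helper and its arguments are hypothetical; `Fs`, `FsDataInputWrapper`, and `Result` are the types used above):

```rust
// Read the last `n` bytes of a file through the new wrapper API.
fn read_tail(fs: &Fs, path: &str, file_len: u64, n: usize) -> Result<Vec<u8>> {
    let input = fs.open(path)?; // JniBridge.openFileAsDataInputWrapper on the JVM side
    let mut buf = vec![0u8; n];
    // Single positioned read: FSDataInputWrapper.readFully either fills the
    // direct ByteBuffer completely or throws, which surfaces here as Err.
    input.read_fully(file_len - n as u64, &mut buf)?;
    Ok(buf)
}
```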
diff --git a/native-engine/datafusion-ext-plans/src/common/internal_file_reader.rs b/native-engine/datafusion-ext-plans/src/common/internal_file_reader.rs index 939f8383c..507d2baa2 100644 --- a/native-engine/datafusion-ext-plans/src/common/internal_file_reader.rs +++ b/native-engine/datafusion-ext-plans/src/common/internal_file_reader.rs @@ -22,7 +22,7 @@ use bytes::Bytes; use datafusion::common::Result; use datafusion_ext_commons::{ df_execution_err, - hadoop_fs::{Fs, FsDataInputStream, FsProvider}, + hadoop_fs::{Fs, FsDataInputWrapper, FsProvider}, }; use object_store::ObjectMeta; use once_cell::sync::OnceCell; @@ -31,7 +31,7 @@ pub struct InternalFileReader { fs: Fs, meta: ObjectMeta, path: String, - input: OnceCell<Arc<FsDataInputStream>>, + input: OnceCell<Arc<FsDataInputWrapper>>, } impl InternalFileReader { @@ -53,7 +53,7 @@ }) } - fn get_input(&self) -> Result<Arc<FsDataInputStream>> { + fn get_input(&self) -> Result<Arc<FsDataInputWrapper>> { let input = self .input .get_or_try_init(|| self.fs.open(&self.path)) diff --git a/native-engine/datafusion-ext-plans/src/parquet_exec.rs b/native-engine/datafusion-ext-plans/src/parquet_exec.rs index 28e25916c..6ddc51113 100644 --- a/native-engine/datafusion-ext-plans/src/parquet_exec.rs +++ b/native-engine/datafusion-ext-plans/src/parquet_exec.rs @@ -30,7 +30,7 @@ use datafusion::{ FileMeta, FileScanConfig, FileStream, OnError, ParquetFileMetrics, ParquetFileReaderFactory, }, - error::Result, + error::{DataFusionError, Result}, execution::context::TaskContext, parquet::{ arrow::async_reader::{fetch_parquet_metadata, AsyncFileReader}, @@ -51,6 +51,7 @@ use datafusion::{ use datafusion_ext_commons::{batch_size, hadoop_fs::FsProvider}; use fmt::Debug; use futures::{future::BoxFuture, stream::once, FutureExt, StreamExt, TryStreamExt}; +use itertools::Itertools; use object_store::ObjectMeta; use once_cell::sync::OnceCell; use parking_lot::Mutex; @@ -335,6 +336,16 @@ impl ParquetFileReader { } impl AsyncFileReader for ParquetFileReaderRef { + fn get_bytes_sync( + &mut self, + range: Range<usize>, + ) -> datafusion::parquet::errors::Result<Bytes> { + self.0 + .get_internal_reader() + .read_fully(range) + .map_err(|e| ParquetError::External(Box::new(e))) + } + fn get_bytes( &mut self, range: Range<usize>, @@ -354,6 +365,84 @@ .boxed() } + fn get_byte_ranges( + &mut self, + ranges: Vec<Range<usize>>, + ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Vec<Bytes>>> { + const MAX_OVER_READ_SIZE: usize = 16384; // TODO: make it configurable + let num_ranges = ranges.len(); + let (sorted_range_indices, sorted_ranges): (Vec<usize>, Vec<Range<usize>>) = ranges + .into_iter() + .enumerate() + .sorted_unstable_by_key(|(_, r)| r.start) + .unzip(); + let mut merged_ranges = vec![]; + + for range in sorted_ranges.iter().cloned() { + if merged_ranges.is_empty() { + merged_ranges.push(range); + continue; + } + + let last_merged_range = merged_ranges.last_mut().unwrap(); + if range.start <= last_merged_range.end + MAX_OVER_READ_SIZE { + last_merged_range.end = range.end.max(last_merged_range.end); + } else { + merged_ranges.push(range); + } + } + + async move { + let inner = self.0.clone(); + let merged_bytes = Arc::new(Mutex::new(Vec::with_capacity(merged_ranges.len()))); + let merged_bytes_cloned = merged_bytes.clone(); + let merged_ranges_cloned = merged_ranges.clone(); + tokio::task::spawn_blocking(move || { + let merged_bytes = &mut *merged_bytes_cloned.lock(); + for range in merged_ranges_cloned { + inner.metrics.bytes_scanned.add(range.len()); + if range.is_empty() { + merged_bytes.push(Bytes::new()); + continue; + } + let bytes = inner.get_internal_reader().read_fully(range)?; + merged_bytes.push(bytes); + } + Ok::<_, DataFusionError>(()) + }) + .await + .expect("tokio spawn_blocking error") + .map_err(|e| ParquetError::External(Box::new(e)))?; + + let merged_bytes = &*merged_bytes.lock(); + let mut sorted_range_bytes = Vec::with_capacity(num_ranges); + let mut m = 0; + for range in sorted_ranges { + if range.is_empty() { + sorted_range_bytes.push(Bytes::new()); + continue; + } + while merged_ranges[m].end <= range.start { + m += 1; + } + let len = range.len(); + if len < merged_ranges[m].len() { + let offset = range.start - merged_ranges[m].start; + sorted_range_bytes.push(merged_bytes[m].slice(offset..offset + len)); + } else { + sorted_range_bytes.push(merged_bytes[m].clone()); + } + } + + // restore the caller's ordering by inverting the sort permutation: + // sorted_range_indices[k] is the original index of the k-th sorted range + let mut range_bytes = vec![Bytes::new(); num_ranges]; + for (sorted_idx, orig_idx) in sorted_range_indices.into_iter().enumerate() { + range_bytes[orig_idx] = sorted_range_bytes[sorted_idx].clone(); + } + Ok(range_bytes) + } + .boxed() + } + fn get_metadata( &mut self, ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Arc<ParquetMetaData>>> {
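The interesting part of `get_byte_ranges()` above is the coalescing pass: ranges are sorted by start offset, then any range that begins within `MAX_OVER_READ_SIZE` bytes of the previous merged range's end is folded into it, trading a bounded over-read for fewer positioned reads against the filesystem. A standalone sketch of that strategy (illustrative only; the real code above additionally tracks original ordering and scan metrics):

```rust
use std::ops::Range;

/// Coalesce byte ranges, tolerating up to `max_over_read` wasted bytes
/// between neighbors so nearby column chunks share one positioned read.
fn merge_ranges(mut ranges: Vec<Range<usize>>, max_over_read: usize) -> Vec<Range<usize>> {
    ranges.sort_unstable_by_key(|r| r.start);
    let mut merged: Vec<Range<usize>> = Vec::new();
    for range in ranges {
        match merged.last_mut() {
            // Close enough to the previous read: widen it. The `max` matters
            // because a later range can be fully contained in an earlier one.
            Some(last) if range.start <= last.end + max_over_read => {
                last.end = last.end.max(range.end);
            }
            _ => merged.push(range),
        }
    }
    merged
}

// merge_ranges(vec![0..10, 20..30, 100_000..100_010], 16384)
//   -> [0..30, 100_000..100_010]
// The 10-byte gap is over-read once; the distant range stays a separate request.
```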
diff --git a/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs b/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs index 8b00a7d1c..215fbe025 100644 --- a/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs +++ b/native-engine/datafusion-ext-plans/src/parquet_sink_exec.rs @@ -43,7 +43,7 @@ use datafusion_ext_commons::{ array_size::ArraySize, cast::cast, df_execution_err, - hadoop_fs::{FsDataOutputStream, FsProvider}, + hadoop_fs::{FsDataOutputWrapper, FsProvider}, }; use futures::{stream::once, StreamExt, TryStreamExt}; use once_cell::sync::OnceCell; @@ -542,12 +542,12 @@ fn get_dyn_part_values( // Write wrapper for FSDataOutputStream struct FSDataWriter { - inner: FsDataOutputStream, + inner: FsDataOutputWrapper, bytes_written: Count, } impl FSDataWriter { - pub fn new(inner: FsDataOutputStream, bytes_written: &Count) -> Self { + pub fn new(inner: FsDataOutputWrapper, bytes_written: &Count) -> Self { Self { inner, bytes_written: bytes_written.clone(), diff --git a/pom.xml b/pom.xml index ad0f004c0..d847e35a1 100644 --- a/pom.xml +++ b/pom.xml @@ -9,6 +9,7 @@ <module>spark-extension</module> <module>${shimPkg}</module> + <module>hadoop-shim</module> <module>dev/mvn-build-helper/assembly</module> <module>dev/mvn-build-helper/proto</module> @@ -18,7 +19,6 @@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <arrowVersion>16.0.0</arrowVersion> <protobufVersion>3.21.9</protobufVersion> - <hadoopClientApiVersion>3.4.0</hadoopClientApiVersion> @@ -43,11 +43,6 @@ <artifactId>spark-sql_${scalaVersion}</artifactId> <version>${sparkVersion}</version> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client-api</artifactId> - <version>${hadoopClientApiVersion}</version> - </dependency> <dependency> <groupId>org.apache.arrow</groupId> <artifactId>arrow-c-data</artifactId> diff --git a/spark-extension/pom.xml b/spark-extension/pom.xml index 7a58c8cce..88763cd3b 100644 --- a/spark-extension/pom.xml +++ b/spark-extension/pom.xml @@ -55,9 +55,9 @@ <artifactId>arrow-vector</artifactId> </dependency> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client-api</artifactId> - <scope>provided</scope> + <groupId>org.blaze</groupId> + <artifactId>hadoop-shim</artifactId> + <version>${revision}</version> </dependency> <dependency> <groupId>io.netty</groupId> diff --git a/spark-extension/src/main/java/org/apache/spark/sql/blaze/JniBridge.java b/spark-extension/src/main/java/org/apache/spark/sql/blaze/JniBridge.java index f4b024f26..644a22ae9 100644 --- a/spark-extension/src/main/java/org/apache/spark/sql/blaze/JniBridge.java +++ b/spark-extension/src/main/java/org/apache/spark/sql/blaze/JniBridge.java @@ -19,9 +19,16 @@ import java.lang.management.ManagementFactory; import java.util.List; import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.spark.SparkEnv; import org.apache.spark.TaskContext; import org.apache.spark.TaskContext$; +import org.apache.spark.blaze.FSDataInputWrapper; +import org.apache.spark.blaze.FSDataInputWrapper$; +import org.apache.spark.blaze.FSDataOutputWrapper; +import org.apache.spark.blaze.FSDataOutputWrapper$; import org.apache.spark.sql.blaze.memory.OnHeapSpillManager; import org.apache.spark.sql.blaze.memory.OnHeapSpillManager$; @@ -78,6 +85,18 @@ public static boolean isDriverSide() { return tc == null; } + public static FSDataInputWrapper openFileAsDataInputWrapper( + FileSystem fs, + String path) throws Exception { + return FSDataInputWrapper$.MODULE$.wrap(fs.open(new Path(path))); + } + + public static FSDataOutputWrapper createFileAsDataOutputWrapper( + FileSystem fs, + String path) throws Exception { + return FSDataOutputWrapper$.MODULE$.wrap(fs.create(new Path(path))); + } + private static final List<BufferPoolMXBean> directMXBeans = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class); diff --git a/spark-extension/src/main/java/org/apache/spark/sql/blaze/JniUtil.java
b/spark-extension/src/main/java/org/apache/spark/sql/blaze/JniUtil.java deleted file mode 100644 index 4ef279954..000000000 --- a/spark-extension/src/main/java/org/apache/spark/sql/blaze/JniUtil.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright 2022 The Blaze Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.blaze; - -import java.io.EOFException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; -import org.apache.hadoop.fs.ByteBufferPositionedReadable; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; - -@SuppressWarnings("unused") -public class JniUtil { - static ReadImpl readImpl; - static ReadImpl readFallback = new ReadFailback(); - - static { - try { - readImpl = new ReadUsingByteBufferPositionedReadable(); - } catch (NoClassDefFoundError e) { - readImpl = readFallback; - } - } - - public static void readFullyFromFSDataInputStream(FSDataInputStream in, long pos, ByteBuffer buf) - throws IOException { - try { - readFallback.read(in, pos, buf); - } catch (UnsupportedOperationException e) { - readImpl.read(in, pos, buf); - } - } - - public static void writeFullyToFSDataOutputStream(FSDataOutputStream out, ByteBuffer buf) throws IOException { - synchronized (out) { - WritableByteChannel channel = Channels.newChannel(out); - - while (buf.hasRemaining()) { - if (channel.write(buf) == -1) { - throw new EOFException("writeFullyToFSDataOutputStream() got unexpected EOF"); - } - } - } - } - - private interface ReadImpl { - void read(FSDataInputStream in, long pos, ByteBuffer buf) throws IOException; - } - - private static class ReadUsingByteBufferPositionedReadable implements ReadImpl { - @Override - public void read(FSDataInputStream in, long pos, ByteBuffer buf) throws IOException { - while (buf.hasRemaining()) { - if (((ByteBufferPositionedReadable) in).read(pos, buf) == -1) { - throw new EOFException("readFullyFromFSDataInputStream() got unexpected EOF"); - } - pos += buf.position(); - } - } - } - - private static class ReadFailback implements ReadImpl { - @Override - public void read(FSDataInputStream in, long pos, ByteBuffer buf) throws IOException { - synchronized (in) { - in.seek(pos); - while (buf.hasRemaining()) { - ReadableByteChannel channel = Channels.newChannel(in); - if (channel.read(buf) == -1) { - throw new EOFException("readFullyFromFSDataInputStream() got unexpected EOF"); - } - } - } - } - } -} diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeCallNativeWrapper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeCallNativeWrapper.scala index bfd8ef0b5..32f4d27e5 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeCallNativeWrapper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeCallNativeWrapper.scala @@ -188,15 +188,12 @@ object 
BlazeCallNativeWrapper extends Logging { s"nativeMemory=${NativeHelper.nativeMemory}, " + s"memoryFraction=${BlazeConf.MEMORY_FRACTION.doubleConf()})") + assert(classOf[JniBridge] != null) // preload JNI bridge classes BlazeCallNativeWrapper.loadLibBlaze() ShutdownHookManager.addShutdownHook(() => JniBridge.onExit()) } private def loadLibBlaze(): Unit = { - // preload JNI bridge classes - Class.forName(classOf[JniBridge].getName) - Class.forName(classOf[JniUtil].getName) - val libName = System.mapLibraryName("blaze") try { val classLoader = classOf[NativeSupports].getClassLoader