Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions native-engine/blaze-jni-bridge/src/jni_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,8 @@ pub struct JavaBuffer<'a> {
pub class: JClass<'a>,
pub method_hasRemaining: JMethodID,
pub method_hasRemaining_ret: ReturnType,
pub method_arrayOffset: JMethodID,
pub method_arrayOffset_ret: ReturnType,
pub method_position: JMethodID,
pub method_position_ret: ReturnType,
pub method_remaining: JMethodID,
Expand All @@ -832,6 +834,8 @@ impl<'a> JavaBuffer<'a> {
class,
method_hasRemaining: env.get_method_id(class, "hasRemaining", "()Z")?,
method_hasRemaining_ret: ReturnType::Primitive(Primitive::Boolean),
method_arrayOffset: env.get_method_id(class, "arrayOffset", "()I")?,
method_arrayOffset_ret: ReturnType::Primitive(Primitive::Int),
method_position: env.get_method_id(class, "position", "()I")?,
method_position_ret: ReturnType::Primitive(Primitive::Int),
method_remaining: env.get_method_id(class, "remaining", "()I")?,
Expand Down
12 changes: 7 additions & 5 deletions native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,14 @@ impl HeapByteBufferReader {
pub fn try_new(block: JObject, byte_buffer: JObject) -> Result<Self> {
let block_global_ref = jni_new_global_ref!(block)?;
let byte_array = jni_call!(JavaBuffer(byte_buffer).array() -> JObject)?;
let array_offset = jni_call!(JavaBuffer(byte_buffer).arrayOffset() -> i32)? as usize;
let pos = jni_call!(JavaBuffer(byte_buffer).position() -> i32)? as usize;
let remaining = jni_call!(JavaBuffer(byte_buffer).remaining() -> i32)? as usize;
let byet_array_global_ref = jni_new_global_ref!(byte_array.as_obj())?;
Ok(Self {
block: block_global_ref,
byte_array: byet_array_global_ref,
pos,
pos: array_offset + pos,
remaining,
})
}
Expand All @@ -414,10 +415,11 @@ impl HeapByteBufferReader {

fn read_impl(&mut self, buf: &mut [u8]) -> Result<usize> {
let read_len = buf.len().min(self.remaining);

jni_get_byte_array_region!(self.byte_array.as_obj(), self.pos, &mut buf[..read_len])?;
self.pos += read_len;
self.remaining -= read_len;
if read_len > 0 {
jni_get_byte_array_region!(self.byte_array.as_obj(), self.pos, &mut buf[..read_len])?;
self.pos += read_len;
self.remaining -= read_len;
}
Ok(read_len)
}
}
Expand Down
Loading