Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 5 additions & 44 deletions native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use datafusion::{
common::{DataFusionError, Statistics},
error::Result,
execution::TaskContext,
logical_expr::UserDefinedLogicalNode,
physical_expr::{EquivalenceProperties, Partitioning::UnknownPartitioning},
physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
Expand Down Expand Up @@ -225,8 +224,8 @@ fn read_serialized_records_from_kafka(
batch_size: usize,
startup_mode: i32,
auron_operator_id: String,
data_format: i32,
format_config_json: String,
_data_format: i32,
_format_config_json: String,
) -> Result<SendableRecordBatchStream> {
let context = CustomContext;
// get source json string from jni bridge resource
Expand All @@ -237,10 +236,6 @@ fn read_serialized_records_from_kafka(
.expect("kafka_task_json_java is not valid java string");
let task_json = sonic_rs::from_str::<sonic_rs::Value>(&kafka_task_json)
.expect("source_json_str is not valid json");
let num_readers = task_json
.get("num_readers")
.as_i64()
.expect("num_readers is not valid json") as i32;
let subtask_index = task_json
.get("subtask_index")
.as_i64()
Expand Down Expand Up @@ -322,7 +317,7 @@ fn read_serialized_records_from_kafka(
} else {
offset
};
partition_list.add_partition_offset(&kafka_topic, *partition, partition_offset);
let _ = partition_list.add_partition_offset(&kafka_topic, *partition, partition_offset);
}
consumer
.assign(&partition_list)
Expand Down Expand Up @@ -389,7 +384,7 @@ fn read_serialized_records_from_kafka(
if let Some(obj) = offsets_to_commit.as_object() {
if !obj.is_empty() {
for (partition, offset) in obj {
partition_list.add_partition_offset(
let _ = partition_list.add_partition_offset(
&kafka_topic,
partition
.parse::<i32>()
Expand All @@ -400,7 +395,7 @@ fn read_serialized_records_from_kafka(
);
}
log::info!("auron consumer to commit offset: {partition_list:?}");
consumer.commit(&partition_list, CommitMode::Async);
let _ = consumer.commit(&partition_list, CommitMode::Async);
}
}
}
Expand Down Expand Up @@ -496,37 +491,3 @@ fn parse_records(
},
))
}

/// Computes the hash of `s` exactly as Java's `String.hashCode()` does:
/// `h = 31 * h + c` over the string's UTF-16 code units, using `i32`
/// wrapping arithmetic to reproduce Java `int` overflow semantics.
///
/// Needed so the Rust side derives the same topic hash as the Flink/Java
/// side when assigning Kafka partitions to readers.
fn java_string_hashcode(s: &str) -> i32 {
    // `str::encode_utf16` yields the same code-unit sequence a Java String
    // stores, including surrogate pairs for characters outside the BMP —
    // no manual per-char encode buffer required.
    s.encode_utf16().fold(0i32, |hash, code_unit| {
        hash.wrapping_mul(31).wrapping_add(code_unit as i32)
    })
}

/// Assigns a Kafka partition to a reader subtask with the same formula as
/// Flink's `KafkaTopicPartitionAssigner.assign()`, so this engine agrees
/// with the Java side about which subtask owns which partition.
///
/// Returns `Err(DataFusionError::Execution)` when `num_readers` is not
/// positive (the modulo below would otherwise divide by zero or flip sign).
fn flink_kafka_partition_assign(topic: String, partition_id: i32, num_readers: i32) -> Result<i32> {
    if num_readers <= 0 {
        return Err(DataFusionError::Execution(format!(
            "num_readers must be positive: {num_readers}"
        )));
    }
    // Same as Java: `(topic.hashCode() * 31) & 0x7FFFFFFF` — the mask forces a
    // non-negative start index; wrapping mul mirrors Java int overflow.
    let hash_code = java_string_hashcode(&topic);
    let start_index = (hash_code.wrapping_mul(31) & i32::MAX) % num_readers;
    // `wrapping_add` mirrors Java's int addition — plain `+` would panic on
    // overflow in debug builds for a large `partition_id`, diverging from the
    // Flink formula. `rem_euclid` keeps the result in [0, num_readers).
    Ok(start_index.wrapping_add(partition_id).rem_euclid(num_readers))
}

#[test]
fn test_flink_kafka_partition_assign() {
    // Known-answer check: the assignment must match what Flink's Java-side
    // KafkaTopicPartitionAssigner produces for the same inputs.
    let assigned = flink_kafka_partition_assign("flink_test_topic".to_string(), 0, 1000)
        .expect("Error assigning partition");
    assert_eq!(assigned, 471);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,9 @@ use arrow::array::{
};
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef, TimeUnit};
use bytes::Buf;
use datafusion::{
common::ExprSchema, error::DataFusionError, logical_expr::UserDefinedLogicalNode,
};
use datafusion::error::DataFusionError;
use datafusion_ext_commons::{df_execution_err, downcast_any};
use prost::{
DecodeError,
encoding::{DecodeContext, WireType},
};
use prost::encoding::{DecodeContext, WireType};
use prost_reflect::{DescriptorPool, FieldDescriptor, Kind, MessageDescriptor, UnknownField};

use crate::flink::serde::{
Expand Down Expand Up @@ -261,7 +256,7 @@ fn transfer_output_schema_to_pb_schema(
let mut sub_pb_schema_mapping: HashMap<String, Vec<Field>> = HashMap::new();
// To ensure sequential processing, the output schema is used to traverse the
// data.
for (output_index, field) in output_schema.fields().iter().enumerate() {
for field in output_schema.fields().iter() {
if let Some(pb_nested_msg_name) = nested_msg_mapping.get(field.name()) {
let index_start = pb_nested_msg_name.find(".");
if let Some(index) = index_start {
Expand All @@ -279,7 +274,7 @@ fn transfer_output_schema_to_pb_schema(
}
}
let mut msg_set: HashSet<String> = HashSet::new();
for (index, field) in output_schema.fields().iter().enumerate() {
for field in output_schema.fields().iter() {
if let Some(field_name) = nested_msg_mapping.get(field.name()) {
let index_start = field_name.find(".");
if let Some(index) = index_start {
Expand Down Expand Up @@ -458,7 +453,7 @@ fn convert_pb_type_to_arrow(
Ok(DataType::UInt64)
}
}
Kind::Enum(enum_descriptor) => {
Kind::Enum(_enum_descriptor) => {
// Enum to get the Name, so use String.
if is_list {
Ok(DataType::List(create_arrow_field_ref(
Expand Down Expand Up @@ -596,7 +591,7 @@ fn create_output_array_builders(
struct_builder,
)));
}
DataType::Map(field_ref, boolean) => {
DataType::Map(field_ref, _boolean) => {
let field_kind = field_desc.kind();
let sub_msg_desc = field_kind.as_message().expect("map as_message failed");
if let DataType::Struct(fields) = field_ref.data_type() {
Expand Down Expand Up @@ -686,7 +681,7 @@ fn create_shared_array_builder_by_data_type(
struct_builder,
)));
}
DataType::Map(field_ref, boolean) => {
DataType::Map(field_ref, _boolean) => {
let field_kind = field_desc.kind();
let sub_msg_desc = field_kind.as_message().expect("map as_message failed");
if let DataType::Struct(fields) = field_ref.data_type() {
Expand Down Expand Up @@ -983,7 +978,7 @@ fn create_value_handler(

macro_rules! impl_for_message_builder {
($handle_fn:expr) => {{
Box::new(move |cursor: &mut Cursor<&[u8]>, tag, wire_type| {
Box::new(move |cursor: &mut Cursor<&[u8]>, _tag, wire_type| {
prost::encoding::check_wire_type(WireType::LengthDelimited, wire_type)
.or_else(|err| df_execution_err!("{err}"))?;
let len = prost::encoding::decode_varint(cursor)
Expand Down Expand Up @@ -1380,7 +1375,7 @@ fn create_value_handler(
output_field.data_type()
)));
}
} else if let DataType::Map(struct_fields, boolean) = output_field.data_type() {
} else if let DataType::Map(struct_fields, _boolean) = output_field.data_type() {
if let DataType::Struct(sub_fields) = struct_fields.data_type() {
let sub_pb_schema = Arc::new(Schema::new(sub_fields.clone()));
let sub_tag_to_output_mapping = create_tag_to_output_mapping(
Expand Down Expand Up @@ -1473,7 +1468,7 @@ fn create_value_handler(
}));
}
}
other => {
_other => {
return Err(DataFusionError::Execution(format!(
"Failed to create value handler field: {:?}, {}",
field.kind(),
Expand All @@ -1487,29 +1482,37 @@ fn create_value_handler(
let mut skip_value = move || {
match wire_type {
WireType::Varint => {
prost::encoding::decode_varint(cursor)?;
prost::encoding::decode_varint(cursor)
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
}
WireType::ThirtyTwoBit => {
if cursor.remaining() < 4 {
return Err(DecodeError::new("buffer underflow"));
return df_execution_err!("buffer underflow");
}
cursor.advance(4);
}
WireType::SixtyFourBit => {
if cursor.remaining() < 8 {
return Err(DecodeError::new("buffer underflow"));
return df_execution_err!("buffer underflow");
}
cursor.advance(8);
}
WireType::LengthDelimited => {
let len = prost::encoding::decode_varint(cursor)? as usize;
let len = prost::encoding::decode_varint(cursor)
.map_err(|e| DataFusionError::Execution(e.to_string()))?
as usize;
if cursor.remaining() < len {
return Err(DecodeError::new("buffer underflow"));
return df_execution_err!("buffer underflow");
}
cursor.advance(len);
}
_ => {
UnknownField::decode_value(tag, wire_type, cursor, DecodeContext::default())?;
UnknownField::decode_value(tag, wire_type, cursor, DecodeContext::default())
.map_err(|e| {
DataFusionError::Execution(format!(
"Failed to decode unknown value: {e}"
))
})?;
}
}
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,13 @@ impl SharedMapArrayBuilder {
}

/// Returns a mutable reference to the builder for the map's key array.
// NOTE(review): `dead_code` is allowed because no in-crate caller uses this
// accessor yet; presumably kept for API symmetry with `values()` — confirm
// before removing.
#[allow(dead_code)]
pub(crate) fn keys(&mut self) -> &mut SharedArrayBuilder {
    &mut self.key_builder
}

/// Returns a mutable reference to the builder for the map's value array.
// NOTE(review): `dead_code` is allowed because no in-crate caller uses this
// accessor yet; presumably kept for API symmetry with `keys()` — confirm
// before removing.
#[allow(dead_code)]
pub(crate) fn values(&mut self) -> &mut SharedArrayBuilder {
    &mut self.value_builder
}
Expand Down
Loading