diff --git a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs index f5be480c9..f5caea0a5 100644 --- a/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs +++ b/native-engine/datafusion-ext-plans/src/flink/kafka_scan_exec.rs @@ -25,7 +25,6 @@ use datafusion::{ common::{DataFusionError, Statistics}, error::Result, execution::TaskContext, - logical_expr::UserDefinedLogicalNode, physical_expr::{EquivalenceProperties, Partitioning::UnknownPartitioning}, physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, @@ -225,8 +224,8 @@ fn read_serialized_records_from_kafka( batch_size: usize, startup_mode: i32, auron_operator_id: String, - data_format: i32, - format_config_json: String, + _data_format: i32, + _format_config_json: String, ) -> Result { let context = CustomContext; // get source json string from jni bridge resource @@ -237,10 +236,6 @@ fn read_serialized_records_from_kafka( .expect("kafka_task_json_java is not valid java string"); let task_json = sonic_rs::from_str::(&kafka_task_json) .expect("source_json_str is not valid json"); - let num_readers = task_json - .get("num_readers") - .as_i64() - .expect("num_readers is not valid json") as i32; let subtask_index = task_json .get("subtask_index") .as_i64() @@ -322,7 +317,7 @@ fn read_serialized_records_from_kafka( } else { offset }; - partition_list.add_partition_offset(&kafka_topic, *partition, partition_offset); + let _ = partition_list.add_partition_offset(&kafka_topic, *partition, partition_offset); } consumer .assign(&partition_list) @@ -389,7 +384,7 @@ fn read_serialized_records_from_kafka( if let Some(obj) = offsets_to_commit.as_object() { if !obj.is_empty() { for (partition, offset) in obj { - partition_list.add_partition_offset( + let _ = partition_list.add_partition_offset( &kafka_topic, partition .parse::() @@ -400,7 +395,7 @@ fn read_serialized_records_from_kafka( ); } log::info!("auron consumer to commit offset: {partition_list:?}"); - consumer.commit(&partition_list, CommitMode::Async); + let _ = consumer.commit(&partition_list, CommitMode::Async); } } } @@ -496,37 +491,3 @@ fn parse_records( }, )) } - -fn java_string_hashcode(s: &str) -> i32 { - let mut hash: i32 = 0; - for c in s.chars() { - let mut buf = [0; 2]; - let encoded = c.encode_utf16(&mut buf); - for code_unit in encoded.iter().cloned() { - hash = hash.wrapping_mul(31).wrapping_add(code_unit as i32); - } - } - hash -} - -fn flink_kafka_partition_assign(topic: String, partition_id: i32, num_readers: i32) -> Result { - if num_readers <= 0 { - return Err(DataFusionError::Execution(format!( - "num_readers must be positive: {num_readers}" - ))); - } - // Java hashcode - let hash_code = java_string_hashcode(&topic); - let start_index = (hash_code.wrapping_mul(31) & i32::MAX) % num_readers; - Ok((start_index + partition_id).rem_euclid(num_readers)) -} - -#[test] -fn test_flink_kafka_partition_assign() { - let topic = "flink_test_topic".to_string(); - let partition_id = 0; - let num_readers = 1000; - // the result same with flink - let result = flink_kafka_partition_assign(topic, partition_id, num_readers); - assert_eq!(result.expect("Error assigning partition"), 471); -} diff --git a/native-engine/datafusion-ext-plans/src/flink/serde/pb_deserializer.rs b/native-engine/datafusion-ext-plans/src/flink/serde/pb_deserializer.rs index 3c4ea4f43..a2ecb417e 100644 --- a/native-engine/datafusion-ext-plans/src/flink/serde/pb_deserializer.rs +++ b/native-engine/datafusion-ext-plans/src/flink/serde/pb_deserializer.rs @@ -29,14 +29,9 @@ use arrow::array::{ }; use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef, TimeUnit}; use bytes::Buf; -use datafusion::{ - common::ExprSchema, error::DataFusionError, logical_expr::UserDefinedLogicalNode, -}; +use datafusion::error::DataFusionError; use datafusion_ext_commons::{df_execution_err, downcast_any}; -use prost::{ - DecodeError, - encoding::{DecodeContext, WireType}, -}; +use prost::encoding::{DecodeContext, WireType}; use prost_reflect::{DescriptorPool, FieldDescriptor, Kind, MessageDescriptor, UnknownField}; use crate::flink::serde::{ @@ -261,7 +256,7 @@ fn transfer_output_schema_to_pb_schema( let mut sub_pb_schema_mapping: HashMap> = HashMap::new(); // To ensure sequential processing, the output schema is used to traverse the // data. - for (output_index, field) in output_schema.fields().iter().enumerate() { + for field in output_schema.fields().iter() { if let Some(pb_nested_msg_name) = nested_msg_mapping.get(field.name()) { let index_start = pb_nested_msg_name.find("."); if let Some(index) = index_start { @@ -279,7 +274,7 @@ fn transfer_output_schema_to_pb_schema( } } let mut msg_set: HashSet = HashSet::new(); - for (index, field) in output_schema.fields().iter().enumerate() { + for field in output_schema.fields().iter() { if let Some(field_name) = nested_msg_mapping.get(field.name()) { let index_start = field_name.find("."); if let Some(index) = index_start { @@ -458,7 +453,7 @@ fn convert_pb_type_to_arrow( Ok(DataType::UInt64) } } - Kind::Enum(enum_descriptor) => { + Kind::Enum(_enum_descriptor) => { // Enum to get the Name, so use String. if is_list { Ok(DataType::List(create_arrow_field_ref( @@ -596,7 +591,7 @@ fn create_output_array_builders( struct_builder, ))); } - DataType::Map(field_ref, boolean) => { + DataType::Map(field_ref, _boolean) => { let field_kind = field_desc.kind(); let sub_msg_desc = field_kind.as_message().expect("map as_message failed"); if let DataType::Struct(fields) = field_ref.data_type() { @@ -686,7 +681,7 @@ fn create_shared_array_builder_by_data_type( struct_builder, ))); } - DataType::Map(field_ref, boolean) => { + DataType::Map(field_ref, _boolean) => { let field_kind = field_desc.kind(); let sub_msg_desc = field_kind.as_message().expect("map as_message failed"); if let DataType::Struct(fields) = field_ref.data_type() { @@ -983,7 +978,7 @@ fn create_value_handler( macro_rules! impl_for_message_builder { ($handle_fn:expr) => {{ - Box::new(move |cursor: &mut Cursor<&[u8]>, tag, wire_type| { + Box::new(move |cursor: &mut Cursor<&[u8]>, _tag, wire_type| { prost::encoding::check_wire_type(WireType::LengthDelimited, wire_type) .or_else(|err| df_execution_err!("{err}"))?; let len = prost::encoding::decode_varint(cursor) @@ -1380,7 +1375,7 @@ fn create_value_handler( output_field.data_type() ))); } - } else if let DataType::Map(struct_fields, boolean) = output_field.data_type() { + } else if let DataType::Map(struct_fields, _boolean) = output_field.data_type() { if let DataType::Struct(sub_fields) = struct_fields.data_type() { let sub_pb_schema = Arc::new(Schema::new(sub_fields.clone())); let sub_tag_to_output_mapping = create_tag_to_output_mapping( @@ -1473,7 +1468,7 @@ fn create_value_handler( })); } } - other => { + _other => { return Err(DataFusionError::Execution(format!( "Failed to create value handler field: {:?}, {}", field.kind(), @@ -1487,29 +1482,37 @@ fn create_value_handler( let mut skip_value = move || { match wire_type { WireType::Varint => { - prost::encoding::decode_varint(cursor)?; + prost::encoding::decode_varint(cursor) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; } WireType::ThirtyTwoBit => { if cursor.remaining() < 4 { - return Err(DecodeError::new("buffer underflow")); + return df_execution_err!("buffer underflow"); } cursor.advance(4); } WireType::SixtyFourBit => { if cursor.remaining() < 8 { - return Err(DecodeError::new("buffer underflow")); + return df_execution_err!("buffer underflow"); } cursor.advance(8); } WireType::LengthDelimited => { - let len = prost::encoding::decode_varint(cursor)? as usize; + let len = prost::encoding::decode_varint(cursor) + .map_err(|e| DataFusionError::Execution(e.to_string()))? + as usize; if cursor.remaining() < len { - return Err(DecodeError::new("buffer underflow")); + return df_execution_err!("buffer underflow"); } cursor.advance(len); } _ => { - UnknownField::decode_value(tag, wire_type, cursor, DecodeContext::default())?; + UnknownField::decode_value(tag, wire_type, cursor, DecodeContext::default()) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to decode unknown value: {e}" + )) + })?; } } Ok(()) diff --git a/native-engine/datafusion-ext-plans/src/flink/serde/shared_map_array_builder.rs b/native-engine/datafusion-ext-plans/src/flink/serde/shared_map_array_builder.rs index b5545e481..184bb9ca5 100644 --- a/native-engine/datafusion-ext-plans/src/flink/serde/shared_map_array_builder.rs +++ b/native-engine/datafusion-ext-plans/src/flink/serde/shared_map_array_builder.rs @@ -119,11 +119,13 @@ impl SharedMapArrayBuilder { } /// Returns the key array builder of the map + #[allow(dead_code)] pub(crate) fn keys(&mut self) -> &mut SharedArrayBuilder { &mut self.key_builder } /// Returns the value array builder of the map + #[allow(dead_code)] pub(crate) fn values(&mut self) -> &mut SharedArrayBuilder { &mut self.value_builder }