Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 8 additions & 97 deletions native-engine/blaze-serde/proto/blaze.proto
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@ message PhysicalPlanNode {
}
}

// Join constraint kind with two values: ON (0) and USING (1).
// NOTE(review): presumably mirrors the SQL `JOIN ... ON` / `JOIN ... USING` forms — confirm with the consumer.
enum JoinConstraint {
ON = 0;
USING = 1;
}

// physical expressions
message PhysicalExprNode {
oneof ExprType {
Expand Down Expand Up @@ -387,6 +382,12 @@ message ScanLimit {
uint32 limit = 1;
}

// Column-level statistics: two ScalarValue bounds and two unsigned 32-bit counters.
// NOTE(review): the meaning of each field is inferred from its name only
// (minimum, maximum, number of nulls, number of distinct values) — confirm with the producer.
message ColumnStats {
ScalarValue min_value = 1;
ScalarValue max_value = 2;
uint32 null_count = 3;
uint32 distinct_count = 4;
}

message Statistics {
int64 num_rows = 1;
Expand Down Expand Up @@ -636,7 +637,7 @@ message PhysicalRoundRobinRepartition {
// Range repartitioning description: a sort specification, the number of
// partitions, and a list of scalar values. from_proto.rs converts `list_value`
// into the arrays from which the range-partitioning bound rows are built.
message PhysicalRangeRepartition {
SortExecNode sort_expr = 1;
uint64 partition_count = 2;
// Declared exactly once: a field name and tag may not repeat within a message.
repeated ScalarValue list_value = 3;
}


Expand Down Expand Up @@ -717,60 +718,13 @@ message PartitionId {
uint32 partition_id = 4;
}

// Partition-level statistics: three signed 64-bit counters (rows, batches,
// bytes) followed by a repeated list of ColumnStats entries.
message PartitionStats {
int64 num_rows = 1;
int64 num_batches = 2;
int64 num_bytes = 3;
repeated ColumnStats column_stats = 4;
}

// Column-level statistics: two ScalarValue bounds and two unsigned 32-bit counters.
// NOTE(review): a message named ColumnStats also appears earlier in this listing;
// a .proto file may define it only once, so one of the two copies must go.
// NOTE(review): field meanings are inferred from their names only — confirm with the producer.
message ColumnStats {
ScalarValue min_value = 1;
ScalarValue max_value = 2;
uint32 null_count = 3;
uint32 distinct_count = 4;
}

// Payload of the `running` arm of TaskStatus.status; carries only an executor id string.
message RunningTask {
string executor_id = 1;
}

// Payload of the `failed` arm of TaskStatus.status; carries only an error string.
message FailedTask {
string error = 1;
}

// Payload of the `completed` arm of TaskStatus.status: an executor id string
// plus the list of shuffle-write partitions attached to the task.
message CompletedTask {
string executor_id = 1;
// TODO tasks are currently always shuffle writes but this will not always be the case
// so we might want to think about some refactoring of the task definitions
repeated ShuffleWritePartition partitions = 2;
}

// One entry of CompletedTask.partitions: a partition id, a path string, and
// three unsigned 64-bit counters (batches, rows, bytes).
// NOTE(review): `path` presumably locates the written shuffle data — confirm with the writer.
message ShuffleWritePartition {
uint64 partition_id = 1;
string path = 2;
uint64 num_batches = 3;
uint64 num_rows = 4;
uint64 num_bytes = 5;
}

// Status record for one task: a PartitionId plus at most one of the
// running / failed / completed payloads (protobuf `oneof`).
message TaskStatus {
PartitionId partition_id = 1;
oneof status {
RunningTask running = 2;
FailedTask failed = 3;
CompletedTask completed = 4;
}
}

// Definition of a task: its id (a PartitionId), the physical plan node, and
// the output partitioning.
message TaskDefinition {
PartitionId task_id = 1;
PhysicalPlanNode plan = 2;
// Output partition for shuffle writer
PhysicalRepartition output_partitioning = 3;
}


///////////////////////////////////////////////////////////////////////////////////////////////////
// Arrow Data Types
///////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -853,40 +807,8 @@ message Union {
UnionMode union_mode = 2;
}

// List-valued scalar: a ScalarType descriptor plus the repeated element values.
// Used as the `list_value` arm of ScalarValue.value.
// NOTE(review): `datatype` presumably describes the element type — confirm.
message ScalarListValue {
ScalarType datatype = 1;
repeated ScalarValue values = 2;
}

// Decimal-valued scalar: a Decimal descriptor plus the value stored as a
// signed 64-bit integer. Used as the `decimal_value` arm of ScalarValue.value.
message ScalarDecimalValue {
Decimal decimal = 1;
int64 long_value = 2; // datafusion has i128 decimal value, only use i64 for blaze
}

// A single scalar value.
// NOTE(review): as listed, tag 1 is used by both `bool_value` (inside the oneof)
// and `ipc_bytes`, which protobuf rejects. The two look like the before/after
// sides of one change (typed oneof vs. an opaque byte payload); keep only one.
// NOTE(review): `ipc_bytes` presumably holds the value in Arrow IPC encoding — confirm with the encoder.
message ScalarValue {
oneof value {
bool bool_value = 1;
string utf8_value = 2;
string large_utf8_value = 3;
int32 int8_value = 4;
int32 int16_value = 5;
int32 int32_value = 6;
int64 int64_value = 7;
uint32 uint8_value = 8;
uint32 uint16_value = 9;
uint32 uint32_value = 10;
uint64 uint64_value = 11;
float float32_value = 12;
double float64_value = 13;
int32 date32_value = 14;
int64 timestamp_second_value = 15;
int64 timestamp_millisecond_value = 16;
int64 timestamp_microsecond_value = 17;
int64 timestamp_nanosecond_value = 18;
ScalarListValue list_value = 19;
ScalarDecimalValue decimal_value = 20;
ScalarType null_value = 1000;
}
bytes ipc_bytes = 1;
}

// Contains all valid datafusion scalar type except for
Expand Down Expand Up @@ -917,17 +839,6 @@ enum PrimitiveScalarType {
INTERVAL_DAYTIME = 22;
}

// Type descriptor for a list scalar; holds the ScalarType of its elements.
// Used as the `list` arm of ScalarType.datatype.
message ScalarListType {
ScalarType element_type = 1;
}

// Type descriptor for a scalar: either a PrimitiveScalarType enum value or a
// (recursive) ScalarListType, selected through the `datatype` oneof.
message ScalarType {
oneof datatype {
PrimitiveScalarType scalar = 1;
ScalarListType list = 2;
}
}

// Broken out into multiple message types so that type
// metadata does not need to live in a separate message.
// All types that use the empty message type carry no additional metadata.
Expand Down
37 changes: 12 additions & 25 deletions native-engine/blaze-serde/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ fn try_parse_physical_expr(
let pexpr: Arc<dyn PhysicalExpr> =
match expr_type {
ExprType::Column(c) => Arc::new(Column::new(&c.name, input_schema.index_of(&c.name)?)),
ExprType::Literal(scalar) => Arc::new(Literal::new(convert_required!(scalar.value)?)),
ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
ExprType::BoundReference(bound_reference) => {
let pcol: Column = bound_reference.into();
Arc::new(pcol)
Expand Down Expand Up @@ -1134,7 +1134,11 @@ pub fn parse_protobuf_partitioning(
let sort = range_part.sort_expr.clone().unwrap();
let exprs = try_parse_physical_sort_expr(&input, &sort).unwrap();

let value_list = &range_part.list_value;
let value_list: Vec<ScalarValue> = range_part
.list_value
.iter()
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?;

let sort_row_converter = Arc::new(SyncMutex::new(RowConverter::new(
exprs
Expand All @@ -1151,30 +1155,13 @@ pub fn parse_protobuf_partitioning(
// Turn every range bound into an Arrow array. Each bound is expected to be a
// list scalar; its first list entry is taken as that column's bound values.
// The bounds originate from a decoded protobuf message, so a non-list value is
// reported as a decode error rather than aborting the process via `unreachable!`.
// NOTE(review): `value(0)` still assumes the list scalar holds at least one entry — confirm
// the encoder guarantees this.
let bound_cols: Vec<ArrayRef> = value_list
    .iter()
    .map(|x| match x {
        ScalarValue::List(single) => Ok(single.value(0)),
        _ => Err(proto_error(
            "partition::from_proto() bound_list type error",
        )),
    })
    .collect::<Result<Vec<ArrayRef>, _>>()?;

let bound_rows = sort_row_converter.lock().convert_columns(&bound_cols)?;
Ok(Some(Partitioning::RangePartitioning(
Expand Down
Loading
Loading