-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Introduce user defined SQL planner API #11180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
a7c9e7f
3c6801c
3362985
42f6414
8ece394
d5d3189
cdf5342
d386702
24372e4
e889009
10477a8
b662b1e
160e032
c4e2b33
dafd53e
5e9af66
a0786eb
5963626
f1f1b4c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,133 @@ | ||||||||||||||||||||||||||||||||||
| // Licensed to the Apache Software Foundation (ASF) under one | ||||||||||||||||||||||||||||||||||
| // or more contributor license agreements. See the NOTICE file | ||||||||||||||||||||||||||||||||||
| // distributed with this work for additional information | ||||||||||||||||||||||||||||||||||
| // regarding copyright ownership. The ASF licenses this file | ||||||||||||||||||||||||||||||||||
| // to you under the Apache License, Version 2.0 (the | ||||||||||||||||||||||||||||||||||
| // "License"); you may not use this file except in compliance | ||||||||||||||||||||||||||||||||||
| // with the License. You may obtain a copy of the License at | ||||||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||||||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||||||
| // Unless required by applicable law or agreed to in writing, | ||||||||||||||||||||||||||||||||||
| // software distributed under the License is distributed on an | ||||||||||||||||||||||||||||||||||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||||||||||||||||||||||||||||||
| // KIND, either express or implied. See the License for the | ||||||||||||||||||||||||||||||||||
| // specific language governing permissions and limitations | ||||||||||||||||||||||||||||||||||
| // under the License. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| //! SQL query planner module | ||||||||||||||||||||||||||||||||||
|
jayzhan211 marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| use std::sync::Arc; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| use arrow::datatypes::{DataType, SchemaRef}; | ||||||||||||||||||||||||||||||||||
| use datafusion_common::{ | ||||||||||||||||||||||||||||||||||
| config::ConfigOptions, file_options::file_type::FileType, not_impl_err, DFSchema, | ||||||||||||||||||||||||||||||||||
| Result, TableReference, | ||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| use crate::{AggregateUDF, Expr, GetFieldAccess, ScalarUDF, TableSource, WindowUDF}; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /// The ContextProvider trait allows the query planner to obtain meta-data about tables and | ||||||||||||||||||||||||||||||||||
| /// functions referenced in SQL statements | ||||||||||||||||||||||||||||||||||
|
jayzhan211 marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||||||||||
| pub trait ContextProvider { | ||||||||||||||||||||||||||||||||||
| /// Getter for a datasource | ||||||||||||||||||||||||||||||||||
| fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>>; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| fn get_file_type(&self, _ext: &str) -> Result<Arc<dyn FileType>> { | ||||||||||||||||||||||||||||||||||
| not_impl_err!("Registered file types are not supported") | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /// Getter for a table function | ||||||||||||||||||||||||||||||||||
| fn get_table_function_source( | ||||||||||||||||||||||||||||||||||
| &self, | ||||||||||||||||||||||||||||||||||
| _name: &str, | ||||||||||||||||||||||||||||||||||
| _args: Vec<Expr>, | ||||||||||||||||||||||||||||||||||
| ) -> Result<Arc<dyn TableSource>> { | ||||||||||||||||||||||||||||||||||
| not_impl_err!("Table Functions are not supported") | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /// This provides a worktable (an intermediate table that is used to store the results of a CTE during execution) | ||||||||||||||||||||||||||||||||||
| /// We don't directly implement this in the logical plan's ['SqlToRel`] | ||||||||||||||||||||||||||||||||||
| /// because the sql code needs access to a table that contains execution-related types that can't be a direct dependency | ||||||||||||||||||||||||||||||||||
| /// of the sql crate (namely, the `CteWorktable`). | ||||||||||||||||||||||||||||||||||
| /// The [`ContextProvider`] provides a way to "hide" this dependency. | ||||||||||||||||||||||||||||||||||
| fn create_cte_work_table( | ||||||||||||||||||||||||||||||||||
| &self, | ||||||||||||||||||||||||||||||||||
| _name: &str, | ||||||||||||||||||||||||||||||||||
| _schema: SchemaRef, | ||||||||||||||||||||||||||||||||||
| ) -> Result<Arc<dyn TableSource>> { | ||||||||||||||||||||||||||||||||||
| not_impl_err!("Recursive CTE is not implemented") | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /// Getter for a UDF description | ||||||||||||||||||||||||||||||||||
| fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>>; | ||||||||||||||||||||||||||||||||||
| /// Getter for a UDAF description | ||||||||||||||||||||||||||||||||||
| fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>>; | ||||||||||||||||||||||||||||||||||
| /// Getter for a UDWF | ||||||||||||||||||||||||||||||||||
| fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>>; | ||||||||||||||||||||||||||||||||||
| /// Getter for system/user-defined variable type | ||||||||||||||||||||||||||||||||||
| fn get_variable_type(&self, variable_names: &[String]) -> Option<DataType>; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /// Get configuration options | ||||||||||||||||||||||||||||||||||
| fn options(&self) -> &ConfigOptions; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /// Get all user defined scalar function names | ||||||||||||||||||||||||||||||||||
| fn udf_names(&self) -> Vec<String>; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /// Get all user defined aggregate function names | ||||||||||||||||||||||||||||||||||
| fn udaf_names(&self) -> Vec<String>; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /// Get all user defined window function names | ||||||||||||||||||||||||||||||||||
| fn udwf_names(&self) -> Vec<String>; | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /// This trait allows users to customize the behavior of the SQL planner | ||||||||||||||||||||||||||||||||||
| pub trait UserDefinedPlanner { | ||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was thinking about different names for this struct that might be more specific to SQL and thus make it easier to find How about |
||||||||||||||||||||||||||||||||||
| /// Plan the binary operation between two expressions, returns OriginalBinaryExpr if not possible | ||||||||||||||||||||||||||||||||||
| fn plan_binary_op( | ||||||||||||||||||||||||||||||||||
| &self, | ||||||||||||||||||||||||||||||||||
| expr: BinaryExpr, | ||||||||||||||||||||||||||||||||||
| _schema: &DFSchema, | ||||||||||||||||||||||||||||||||||
| ) -> Result<PlannerSimplifyResult> { | ||||||||||||||||||||||||||||||||||
| Ok(PlannerSimplifyResult::OriginalBinaryExpr(expr)) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /// Plan the field access expression, returns OriginalFieldAccessExpr if not possible | ||||||||||||||||||||||||||||||||||
| fn plan_field_access( | ||||||||||||||||||||||||||||||||||
| &self, | ||||||||||||||||||||||||||||||||||
| expr: FieldAccessExpr, | ||||||||||||||||||||||||||||||||||
| _schema: &DFSchema, | ||||||||||||||||||||||||||||||||||
| ) -> Result<PlannerSimplifyResult> { | ||||||||||||||||||||||||||||||||||
| Ok(PlannerSimplifyResult::OriginalFieldAccessExpr(expr)) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Plan the array literal, returns OriginalArray if not possible | ||||||||||||||||||||||||||||||||||
| fn plan_array_literal( | ||||||||||||||||||||||||||||||||||
| &self, | ||||||||||||||||||||||||||||||||||
| exprs: Vec<Expr>, | ||||||||||||||||||||||||||||||||||
| _schema: &DFSchema, | ||||||||||||||||||||||||||||||||||
| ) -> Result<PlannerSimplifyResult> { | ||||||||||||||||||||||||||||||||||
| Ok(PlannerSimplifyResult::OriginalArray(exprs)) | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| pub struct BinaryExpr { | ||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it may be confusing if this struct is also called Here is some suggested comments to explain the structure (it would be good to do the same for FieldAccessExpr too):
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also struggle with this naming too 😆 . I will take |
||||||||||||||||||||||||||||||||||
| pub op: sqlparser::ast::BinaryOperator, | ||||||||||||||||||||||||||||||||||
| pub left: Expr, | ||||||||||||||||||||||||||||||||||
| pub right: Expr, | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| pub struct FieldAccessExpr { | ||||||||||||||||||||||||||||||||||
| pub field_access: GetFieldAccess, | ||||||||||||||||||||||||||||||||||
| pub expr: Expr, | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| pub enum PlannerSimplifyResult { | ||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this isn't really "simplifying" anything maybe we could call it |
||||||||||||||||||||||||||||||||||
| /// The function call was simplified to an entirely new Expr | ||||||||||||||||||||||||||||||||||
| Simplified(Expr), | ||||||||||||||||||||||||||||||||||
| /// the function call could not be simplified, and the arguments | ||||||||||||||||||||||||||||||||||
| /// are return unmodified. | ||||||||||||||||||||||||||||||||||
| OriginalBinaryExpr(BinaryExpr), | ||||||||||||||||||||||||||||||||||
| OriginalFieldAccessExpr(FieldAccessExpr), | ||||||||||||||||||||||||||||||||||
| OriginalArray(Vec<Expr>), | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this might be easier to use if it were generic so that the callsites would know exactly the type of the returns Original SO something like
Suggested change
|
||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -52,6 +52,7 @@ datafusion-functions = { workspace = true } | |||
| itertools = { version = "0.12", features = ["use_std"] } | ||||
| log = { workspace = true } | ||||
| paste = "1.0.14" | ||||
| sqlparser = { workspace = true } | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since datafusion-expr already has a explicit dependency on datafusion/datafusion/expr/Cargo.toml Line 49 in d19487c
I think it would be cleaner here to avoid the explicit dependency on SO like in That would ensure the versions always matched (though I think workspace mostly takes care of this) as well as make it clear that the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. btw do we also need to cleanup |
||||
|
|
||||
| [dev-dependencies] | ||||
| criterion = { version = "0.5", features = ["async_tokio"] } | ||||
|
|
||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,173 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| use datafusion_common::{utils::list_ndims, DFSchema, Result}; | ||
|
jayzhan211 marked this conversation as resolved.
|
||
| use datafusion_expr::{ | ||
| planner::{BinaryExpr, FieldAccessExpr, PlannerSimplifyResult, UserDefinedPlanner}, | ||
| AggregateFunction, Expr, ExprSchemable, GetFieldAccess, | ||
| }; | ||
| use datafusion_functions::expr_fn::get_field; | ||
|
|
||
| use crate::{ | ||
| array_has::array_has_all, | ||
| expr_fn::{array_append, array_concat, array_prepend}, | ||
| extract::{array_element, array_slice}, | ||
| make_array::make_array, | ||
| }; | ||
|
|
||
| #[derive(Default)] | ||
| pub struct ArrayFunctionPlanner {} | ||
|
|
||
| impl UserDefinedPlanner for ArrayFunctionPlanner { | ||
| fn plan_binary_op( | ||
| &self, | ||
| expr: BinaryExpr, | ||
| schema: &DFSchema, | ||
| ) -> Result<PlannerSimplifyResult> { | ||
| let BinaryExpr { op, left, right } = expr; | ||
|
|
||
| if op == sqlparser::ast::BinaryOperator::StringConcat { | ||
| let left_type = left.get_type(schema)?; | ||
| let right_type = right.get_type(schema)?; | ||
| let left_list_ndims = list_ndims(&left_type); | ||
| let right_list_ndims = list_ndims(&right_type); | ||
|
|
||
| // Rewrite string concat operator to function based on types | ||
| // if we get list || list then we rewrite it to array_concat() | ||
| // if we get list || non-list then we rewrite it to array_append() | ||
| // if we get non-list || list then we rewrite it to array_prepend() | ||
| // if we get string || string then we rewrite it to concat() | ||
|
|
||
| // We determine the target function to rewrite based on the list n-dimension, the check is not exact but sufficient. | ||
| // The exact validity check is handled in the actual function, so even if there is 3d list appended with 1d list, it is also fine to rewrite. | ||
| if left_list_ndims + right_list_ndims == 0 { | ||
| // TODO: concat function ignore null, but string concat takes null into consideration | ||
| // we can rewrite it to concat if we can configure the behaviour of concat function to the one like `string concat operator` | ||
| } else if left_list_ndims == right_list_ndims { | ||
| return Ok(PlannerSimplifyResult::Simplified(array_concat(vec![ | ||
| left, right, | ||
| ]))); | ||
| } else if left_list_ndims > right_list_ndims { | ||
| return Ok(PlannerSimplifyResult::Simplified(array_append(left, right))); | ||
| } else if left_list_ndims < right_list_ndims { | ||
| return Ok(PlannerSimplifyResult::Simplified(array_prepend( | ||
| left, right, | ||
| ))); | ||
| } | ||
| } else if matches!( | ||
| op, | ||
| sqlparser::ast::BinaryOperator::AtArrow | ||
| | sqlparser::ast::BinaryOperator::ArrowAt | ||
| ) { | ||
| let left_type = left.get_type(schema)?; | ||
| let right_type = right.get_type(schema)?; | ||
| let left_list_ndims = list_ndims(&left_type); | ||
| let right_list_ndims = list_ndims(&right_type); | ||
| // if both are list | ||
| if left_list_ndims > 0 && right_list_ndims > 0 { | ||
| if op == sqlparser::ast::BinaryOperator::AtArrow { | ||
| // array1 @> array2 -> array_has_all(array1, array2) | ||
| return Ok(PlannerSimplifyResult::Simplified(array_has_all( | ||
| left, right, | ||
| ))); | ||
| } else { | ||
| // array1 <@ array2 -> array_has_all(array2, array1) | ||
| return Ok(PlannerSimplifyResult::Simplified(array_has_all( | ||
| right, left, | ||
| ))); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Ok(PlannerSimplifyResult::OriginalBinaryExpr(BinaryExpr { | ||
| op, | ||
| left, | ||
| right, | ||
| })) | ||
| } | ||
|
|
||
| fn plan_array_literal( | ||
| &self, | ||
| exprs: Vec<Expr>, | ||
| _schema: &DFSchema, | ||
| ) -> Result<PlannerSimplifyResult> { | ||
| Ok(PlannerSimplifyResult::Simplified(make_array(exprs))) | ||
| } | ||
| } | ||
|
|
||
| #[derive(Default)] | ||
| pub struct FieldAccessPlanner {} | ||
|
|
||
| impl UserDefinedPlanner for FieldAccessPlanner { | ||
| fn plan_field_access( | ||
| &self, | ||
| expr: FieldAccessExpr, | ||
| _schema: &DFSchema, | ||
| ) -> Result<PlannerSimplifyResult> { | ||
| let FieldAccessExpr { expr, field_access } = expr; | ||
|
|
||
| match field_access { | ||
| // expr["field"] => get_field(expr, "field") | ||
| GetFieldAccess::NamedStructField { name } => { | ||
| Ok(PlannerSimplifyResult::Simplified(get_field(expr, name))) | ||
| } | ||
| // expr[idx] ==> array_element(expr, idx) | ||
| GetFieldAccess::ListIndex { key: index } => { | ||
| match expr { | ||
| // Special case for array_agg(expr)[index] to NTH_VALUE(expr, index) | ||
| Expr::AggregateFunction(agg_func) if is_array_agg(&agg_func) => { | ||
| Ok(PlannerSimplifyResult::Simplified(Expr::AggregateFunction( | ||
| datafusion_expr::expr::AggregateFunction::new( | ||
| AggregateFunction::NthValue, | ||
| agg_func | ||
| .args | ||
| .into_iter() | ||
| .chain(std::iter::once(*index)) | ||
| .collect(), | ||
| agg_func.distinct, | ||
| agg_func.filter, | ||
| agg_func.order_by, | ||
| agg_func.null_treatment, | ||
| ), | ||
| ))) | ||
| } | ||
| _ => Ok(PlannerSimplifyResult::Simplified(array_element( | ||
| expr, *index, | ||
| ))), | ||
| } | ||
| } | ||
| // expr[start, stop, stride] ==> array_slice(expr, start, stop, stride) | ||
| GetFieldAccess::ListRange { | ||
| start, | ||
| stop, | ||
| stride, | ||
| } => Ok(PlannerSimplifyResult::Simplified(array_slice( | ||
| expr, | ||
| *start, | ||
| *stop, | ||
| Some(*stride), | ||
| ))), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn is_array_agg(agg_func: &datafusion_expr::expr::AggregateFunction) -> bool { | ||
| agg_func.func_def | ||
| == datafusion_expr::expr::AggregateFunctionDefinition::BuiltIn( | ||
| AggregateFunction::ArrayAgg, | ||
| ) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍