-
Notifications
You must be signed in to change notification settings - Fork 770
[CBO] Make optimizer get only columns stats that are used in request #7550
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
pashandor789
merged 20 commits into
ydb-platform:main
from
pashandor789:serious_columns_getter
Aug 16, 2024
Merged
Changes from 16 commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
9d6c60d
[KQP] Added columns getter
pashandor789 9c83d1b
fix
pashandor789 cf09e55
[KQP] Added columns getter
pashandor789 4cad270
[KQP] added request
pashandor789 9adf853
Merge branch 'main' of github.com:ydb-platform/ydb into main
pashandor789 785884b
fix
pashandor789 150d97b
[KQP] Added columns getter
pashandor789 4ad5af9
[KQP] added request
pashandor789 c553268
fix
pashandor789 e37a22e
fix
pashandor789 e037928
[KQP] added request for column stats
pashandor789 f77b55c
Fix conflict
pashandor789 23b423b
Fix clang14
pashandor789 03eafd1
Fix clang14
pashandor789 38bd695
fix
pashandor789 75fd270
Fix use after scope
pashandor789 8ac0d5a
[CBO] fix namings
pashandor789 e07b9ee
[CBO] fix namings
pashandor789 7d45458
[CBO] fix namings
pashandor789 dfc4813
[CBO] fix namings
pashandor789 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,253 @@ | ||
| #include "kqp_columns_getter_transformer.h" | ||
|
|
||
| #include <ydb/library/yql/core/yql_expr_optimize.h> | ||
| #include <ydb/core/statistics/service/service.h> | ||
| #include <ydb/core/statistics/events.h> | ||
| #include <ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h> | ||
| #include <ydb/library/yql/core/yql_statistics.h> | ||
| #include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> | ||
| #include <ydb/library/yql/dq/opt/dq_opt_stat.h> | ||
| #include <ydb/library/yql/utils/log/log.h> | ||
|
|
||
| namespace NKikimr::NKqp { | ||
|
|
||
| using namespace NThreading; | ||
| using namespace NYql; | ||
|
|
||
| void TKqpColumnsGetterTransformer::PropagateTableToLambdaArgument(const TExprNode::TPtr& input) { | ||
| if (input->ChildrenSize() < 2) { | ||
| return; | ||
| } | ||
|
|
||
| auto callableInput = input->ChildRef(0); | ||
|
|
||
|
|
||
| for (size_t i = 1; i < input->ChildrenSize(); ++i) { | ||
| auto maybeLambda = TExprBase(input->ChildRef(i)); | ||
| if (!maybeLambda.Maybe<TCoLambda>()) { | ||
| continue; | ||
| } | ||
|
|
||
| auto lambda = maybeLambda.Cast<TCoLambda>(); | ||
| if (!lambda.Args().Size()){ | ||
| continue; | ||
| } | ||
|
|
||
| if (callableInput->IsList()){ | ||
| for (size_t j = 0; j < callableInput->ChildrenSize(); ++j){ | ||
| TableByExprNode[lambda.Args().Arg(j).Ptr()] = TableByExprNode[callableInput->Child(j)]; | ||
| } | ||
| } else { | ||
| TableByExprNode[lambda.Args().Arg(0).Ptr()] = TableByExprNode[callableInput.Get()]; | ||
|
pashandor789 marked this conversation as resolved.
Outdated
|
||
| } | ||
| } | ||
| } | ||
|
|
||
| IGraphTransformer::TStatus TKqpColumnsGetterTransformer::DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) { | ||
| Y_UNUSED(ctx); | ||
|
|
||
| output = input; | ||
| auto optLvl = Config->CostBasedOptimizationLevel.Get().GetOrElse(TDqSettings::TDefault::CostBasedOptimizationLevel); | ||
| auto enableColumnStats = Config->FeatureFlags.GetEnableColumnStatistics(); | ||
| if (!(optLvl > 0 && enableColumnStats)) { | ||
| return IGraphTransformer::TStatus::Ok; | ||
| } | ||
|
|
||
| VisitExprLambdasLast( | ||
| input, | ||
| [&](const TExprNode::TPtr& input) { | ||
| BeforeLambdas(input) || BeforeLambdasUnmatched(input); | ||
|
|
||
| if (input->IsCallable()) { | ||
| PropagateTableToLambdaArgument(input); | ||
| } | ||
|
|
||
| return true; | ||
| }, | ||
| [&](const TExprNode::TPtr& input) { | ||
| return AfterLambdas(input) || AfterLambdasUnmatched(input); | ||
| } | ||
| ); | ||
|
|
||
| if (ColumnsByTableName.empty()) { | ||
| return IGraphTransformer::TStatus::Ok; | ||
| } | ||
|
|
||
| struct TTableMeta { | ||
| TString TableName; | ||
| THashMap<ui32, TString> ColumnNameByTag; | ||
| }; | ||
| THashMap<TPathId, TTableMeta> tableMetaByPathId; | ||
|
|
||
| // TODO: Add other statistics, not only COUNT_MIN_SKETCH. | ||
| auto getStatisticsRequest = MakeHolder<NStat::TEvStatistics::TEvGetStatistics>(); | ||
| getStatisticsRequest->StatType = NKikimr::NStat::EStatType::COUNT_MIN_SKETCH; | ||
|
|
||
| for (const auto& [table, columns]: ColumnsByTableName) { | ||
| auto tableMeta = Tables.GetTable(Cluster, table).Metadata; | ||
| auto& columnsMeta = tableMeta->Columns; | ||
|
|
||
| auto pathId = TPathId(tableMeta->PathId.OwnerId(), tableMeta->PathId.TableId()); | ||
| for (const auto& column: columns) { | ||
| if (TypesCtx.ColumnStatisticsByTableName.contains(table) && TypesCtx.ColumnStatisticsByTableName[table]->Data.contains(column)) { | ||
| continue; | ||
| } | ||
|
|
||
| if (!columns.contains(column)) { | ||
| YQL_CLOG(DEBUG, ProviderKikimr) << "Table: " + table + " doesn't contain " + column + " to request for column statistics"; | ||
| } | ||
|
|
||
| NKikimr::NStat::TRequest req; | ||
| req.ColumnTag = columnsMeta[column].Id; | ||
| req.PathId = pathId; | ||
| getStatisticsRequest->StatRequests.push_back(req); | ||
|
|
||
| tableMetaByPathId[pathId].TableName = table; | ||
| tableMetaByPathId[pathId].ColumnNameByTag[req.ColumnTag.value()] = column; | ||
| } | ||
| } | ||
|
|
||
| if (getStatisticsRequest->StatRequests.empty()) { | ||
| return IGraphTransformer::TStatus::Ok; | ||
| } | ||
|
|
||
| using TRequest = NStat::TEvStatistics::TEvGetStatistics; | ||
| using TResponse = NStat::TEvStatistics::TEvGetStatisticsResult; | ||
| struct TResult : public NYql::IKikimrGateway::TGenericResult { | ||
| THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName; | ||
| }; | ||
|
|
||
| auto promise = NewPromise<TResult>(); | ||
| auto callback = [tableMetaByPathId = std::move(tableMetaByPathId)] | ||
| (TPromise<TResult> promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable { | ||
| if (!response.Success) { | ||
| promise.SetValue(NYql::NCommon::ResultFromError<TResult>("can't get column statistics!")); | ||
| } | ||
|
|
||
| THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName; | ||
|
|
||
| for (auto&& stat: response.StatResponses) { | ||
| auto meta = tableMetaByPathId[stat.Req.PathId]; | ||
| auto columnName = meta.ColumnNameByTag[stat.Req.ColumnTag.value()]; | ||
| auto& columnStatistics = columnStatisticsByTableName[meta.TableName].Data[columnName]; | ||
| columnStatistics.CountMinSketch = std::move(stat.CountMinSketch.CountMin); | ||
| } | ||
|
|
||
| promise.SetValue(TResult{.columnStatisticsByTableName = std::move(columnStatisticsByTableName)}); | ||
| }; | ||
| auto statServiceId = NStat::MakeStatServiceID(ActorSystem->NodeId); | ||
| IActor* requestHandler = | ||
| new TActorRequestHandler<TRequest, TResponse, TResult>(statServiceId, getStatisticsRequest.Release(), promise, callback); | ||
| auto actorId = ActorSystem | ||
| ->Register(requestHandler, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId); | ||
| Y_UNUSED(actorId); | ||
|
|
||
| auto res = promise.GetFuture().GetValueSync(); | ||
| if (!res.Issues().Empty()) { | ||
| TStringStream ss; | ||
| res.Issues().PrintTo(ss); | ||
| YQL_CLOG(DEBUG, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str(); | ||
| return IGraphTransformer::TStatus::Ok; | ||
| } | ||
|
|
||
| for (auto&& [tableName, columnStatistics]: res.columnStatisticsByTableName) { | ||
| TypesCtx.ColumnStatisticsByTableName.insert( | ||
| {std::move(tableName), new TOptimizerStatistics::TColumnStatMap(std::move(columnStatistics))} | ||
| ); | ||
| } | ||
|
|
||
| return IGraphTransformer::TStatus::Ok; | ||
| } | ||
|
|
||
| bool TKqpColumnsGetterTransformer::BeforeLambdas(const TExprNode::TPtr& input) { | ||
| bool matched = true; | ||
|
|
||
| if (TKqpTable::Match(input.Get())) { | ||
| TableByExprNode[input.Get()] = input.Get(); | ||
| } else if (auto maybeStreamLookup = TExprBase(input).Maybe<TKqpCnStreamLookup>()) { | ||
| TableByExprNode[input.Get()] = maybeStreamLookup.Cast().Table().Ptr(); | ||
| } else { | ||
| matched = false; | ||
| } | ||
|
|
||
| return matched; | ||
| } | ||
|
|
||
| bool TKqpColumnsGetterTransformer::BeforeLambdasUnmatched(const TExprNode::TPtr& input) { | ||
| for (const auto& node: input->Children()) { | ||
| if (TableByExprNode.contains(node)) { | ||
| TableByExprNode[input.Get()] = TableByExprNode[node]; | ||
| return true; | ||
| } | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| bool TKqpColumnsGetterTransformer::AfterLambdas(const TExprNode::TPtr& input) { | ||
| bool matched = true; | ||
|
|
||
| if ( | ||
| TCoFilterBase::Match(input.Get()) || | ||
| TCoFlatMapBase::Match(input.Get()) && IsPredicateFlatMap(TExprBase(input).Cast<TCoFlatMapBase>().Lambda().Body().Ref()) | ||
| ) { | ||
| std::shared_ptr<TOptimizerStatistics> dummyStats = nullptr; | ||
| auto computer = NDq::TPredicateSelectivityComputer(dummyStats, true); | ||
|
|
||
| if (TCoFilterBase::Match(input.Get())) { | ||
| computer.Compute(TExprBase(input).Cast<TCoFilterBase>().Lambda().Body()); | ||
| } else if (TCoFlatMapBase::Match(input.Get())) { | ||
| computer.Compute(TExprBase(input).Cast<TCoFlatMapBase>().Lambda().Body()); | ||
| } else { | ||
| Y_ENSURE(false); | ||
| } | ||
|
|
||
| auto columnStatsUsedMembers = computer.GetColumnStatsUsedMembers(); | ||
| for (const auto& item: columnStatsUsedMembers.Data) { | ||
| auto exprNode = TExprBase(item.Member).Ptr(); | ||
| if (!TableByExprNode.contains(exprNode) || TableByExprNode[exprNode] == nullptr) { | ||
| continue; | ||
| } | ||
|
|
||
| auto table = TExprBase(TableByExprNode[exprNode]).Cast<TKqpTable>().Path().StringValue(); | ||
| auto column = item.Member.Name().StringValue(); | ||
| size_t pointPos = column.find('.'); // table.column | ||
| if (pointPos != TString::npos) { | ||
| column = column.substr(pointPos + 1); | ||
| } | ||
|
|
||
| ColumnsByTableName[table].insert(std::move(column)); | ||
| } | ||
| } else { | ||
| matched = false; | ||
| } | ||
|
|
||
| return matched; | ||
| } | ||
|
|
||
| bool TKqpColumnsGetterTransformer::AfterLambdasUnmatched(const TExprNode::TPtr& input) { | ||
| if (TableByExprNode.contains(input.Get())) { | ||
| return true; | ||
| } | ||
|
|
||
| for (const auto& node: input->Children()) { | ||
| if (TableByExprNode.contains(node)) { | ||
| TableByExprNode[input.Get()] = TableByExprNode[node]; | ||
| return true; | ||
| } | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| TAutoPtr<IGraphTransformer> CreateKqpColumnsGetterTransformer( | ||
| const TKikimrConfiguration::TPtr& config, | ||
| TTypeAnnotationContext& typesCtx, | ||
| TKikimrTablesData& tables, | ||
| TString cluster, | ||
| TActorSystem* actorSystem | ||
| ) { | ||
| return THolder<IGraphTransformer>(new TKqpColumnsGetterTransformer(config, typesCtx, tables, cluster, actorSystem)); | ||
| } | ||
|
|
||
| } // end of NKikimr::NKqp | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.