Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 107 additions & 9 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
#include <unordered_map>
#include <regex>

#include <ydb/library/yql/utils/log/log.h>


namespace NKikimr {
namespace NKqp {

Expand Down Expand Up @@ -1015,6 +1018,10 @@ class TxPlanSerializer {
operatorId = Visit(maybeExtend.Cast(), planNode);
} else if (auto maybeToFlow = TMaybeNode<TCoToFlow>(node)) {
operatorId = Visit(maybeToFlow.Cast(), planNode);
} else if (auto maybeAssumeSorted = TMaybeNode<TCoAssumeSorted>(node)) {
operatorId = Visit(maybeAssumeSorted.Cast(), planNode);
} else if (auto maybeMember = TMaybeNode<TCoMember>(node)) {
operatorId = Visit(maybeMember.Cast(), planNode);
} else if (auto maybeIter = TMaybeNode<TCoIterator>(node)) {
operatorId = Visit(maybeIter.Cast(), planNode);
} else if (auto maybePartitionByKey = TMaybeNode<TCoPartitionByKey>(node)) {
Expand Down Expand Up @@ -1183,6 +1190,56 @@ class TxPlanSerializer {
return AddOperator(planNode, "ConstantExpr", std::move(op));
}

// Handles a TCoAssumeSorted node while serializing the plan.
//
// The input expression is pretty-printed and checked with ContainResultBinding:
//  * If it yields a (txId, resId) pair, planNode.CteRefName is set to
//    "precompute_<txId>_<resId>", and an operator with Name "AssumeSorted" and an
//    "AssumeSorted" property holding that name is added via AddOperator under the
//    "ConstantExpr" label.
//  * Otherwise the input expression is visited directly; its single result is
//    passed through, and an empty TMaybe is returned when the visit produced
//    anything other than exactly one result.
TMaybe<std::variant<ui32, TArgContext>> Visit(const TCoAssumeSorted& assumeSorted, TQueryPlanNode& planNode) {
    const auto inputExprStr = NPlanUtils::PrettyExprStr(assumeSorted.Input());
    auto resultBinding = ContainResultBinding(inputExprStr);

    if (!resultBinding) {
        // No result binding: AssumeSorted is transparent here, so defer to its input.
        auto visited = Visit(assumeSorted.Input().Ptr(), planNode);
        if (visited.size() != 1) {
            return TMaybe<std::variant<ui32, TArgContext>> ();
        }
        return visited[0];
    }

    auto [txId, resId] = *resultBinding;
    planNode.CteRefName = TStringBuilder() << "precompute_" << txId << "_" << resId;

    TOperator sortedOp;
    sortedOp.Properties["Name"] = "AssumeSorted";
    sortedOp.Properties["AssumeSorted"] = *planNode.CteRefName;

    return AddOperator(planNode, "ConstantExpr", std::move(sortedOp));
}

// Handles a TCoMember node while serializing the plan.
//
// The struct expression of the member access is pretty-printed and checked with
// ContainResultBinding:
//  * If it yields a (txId, resId) pair, planNode.CteRefName is set to
//    "precompute_<txId>_<resId>" and a "Member" operator referring to that name is
//    prepared. If planNode.Operators already holds a "Member" operator with the
//    same "Member" value, nothing is added and an empty TMaybe is returned;
//    otherwise the operator is added via AddOperator under the "ConstantExpr" label.
//  * Otherwise the struct expression is visited directly; its single result is
//    passed through, and an empty TMaybe is returned when the visit produced
//    anything other than exactly one result.
TMaybe<std::variant<ui32, TArgContext>> Visit(const TCoMember& member, TQueryPlanNode& planNode) {
    const auto memberValue = NPlanUtils::PrettyExprStr(member.Struct());

    TOperator op;
    op.Properties["Name"] = "Member";

    if (auto maybeResultBinding = ContainResultBinding(memberValue)) {
        auto [txId, resId] = *maybeResultBinding;
        planNode.CteRefName = TStringBuilder() << "precompute_" << txId << "_" << resId;
        op.Properties["Member"] = *planNode.CteRefName;
    } else {
        auto inputs = Visit(member.Struct().Ptr(), planNode);
        if (inputs.size() == 1) {
            return inputs[0];
        } else {
            return TMaybe<std::variant<ui32, TArgContext>> ();
        }
    }

    // Skip the operator if this plan node already has a Member operator pointing at
    // the same precompute. The search result must be an explicit yes/no answer:
    // using the iterator returned by std::find_if as the condition does not tell
    // whether a match was found.
    const auto& memberRef = op.Properties.at("Member");
    const bool alreadyPresent = std::any_of(
        planNode.Operators.begin(), planNode.Operators.end(),
        [&memberRef](const auto& existing) {
            return existing.Properties.at("Name") == "Member"
                && existing.Properties.at("Member") == memberRef;
        });
    if (alreadyPresent) {
        return TMaybe<std::variant<ui32, TArgContext>> ();
    }

    return AddOperator(planNode, "ConstantExpr", std::move(op));
}

std::variant<ui32, TArgContext> Visit(const TCoIterator& iter, TQueryPlanNode& planNode) {
const auto iterValue = NPlanUtils::PrettyExprStr(iter.List());

Expand Down Expand Up @@ -1825,6 +1882,8 @@ void BuildPlanIndex(NJson::TJsonValue& plan, THashMap<int, NJson::TJsonValue>& p
auto pos = precomputeName.find("precompute");
if (pos != TString::npos) {
precomputes[precomputeName.substr(pos)] = plan;
} else if (precomputeName.size()>=4 && precomputeName.find("CTE ") != TString::npos) {
precomputes[precomputeName.substr(4)] = plan;
}
}

Expand Down Expand Up @@ -1872,6 +1931,11 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,

NJson::TJsonValue result;
result["PlanNodeId"] = currentNodeId;

//if (plan.GetMapSafe().contains("PlanNodeId")) {
// YQL_CLOG(TRACE, CoreDq) << "Recursed into " << plan.GetMapSafe().at("PlanNodeId").GetIntegerSafe() << ", constructed: " << currentNodeId;
//}

if (plan.GetMapSafe().contains("PlanNodeType")) {
result["PlanNodeType"] = plan.GetMapSafe().at("PlanNodeType").GetStringSafe();
}
Expand All @@ -1889,7 +1953,7 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
return result;
}

if (plan.GetMapSafe().at("Node Type") == "TableLookup") {
if (plan.GetMapSafe().at("Node Type").GetStringSafe() == "TableLookup") {
NJson::TJsonValue newOps;
NJson::TJsonValue op;

Expand All @@ -1905,20 +1969,32 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
}

for (auto p : plan.GetMapSafe().at("Plans").GetArraySafe()) {
if (p.GetMapSafe().at("Node Type").GetStringSafe().find("Precompute") == TString::npos) {
if (!p.GetMapSafe().contains("Operators") && p.GetMapSafe().contains("CTE Name")) {
auto precompute = p.GetMapSafe().at("CTE Name").GetStringSafe();
if (precomputes.contains(precompute)) {
//YQL_CLOG(TRACE, CoreDq) << "Following precompute: " << precompute ;
planInputs.AppendValue(ReconstructQueryPlanRec(precomputes.at(precompute), 0, planIndex, precomputes, nodeCounter));
} //else {
// YQL_CLOG(TRACE, CoreDq) << "Didn't find precompute: " << precompute ;
//}
} else if (p.GetMapSafe().at("Node Type").GetStringSafe().find("Precompute") == TString::npos) {
planInputs.AppendValue(ReconstructQueryPlanRec(p, 0, planIndex, precomputes, nodeCounter));
}
}
result["Plans"] = planInputs;
return result;
}

if (plan.GetMapSafe().contains("CTE Name") && plan.GetMapSafe().at("Node Type") == "ConstantExpr") {
if (plan.GetMapSafe().contains("CTE Name") && plan.GetMapSafe().at("Node Type").GetStringSafe() == "ConstantExpr") {
auto precompute = plan.GetMapSafe().at("CTE Name").GetStringSafe();
if (!precomputes.contains(precompute)) {
result["Node Type"] = "ConstantExpr";
//YQL_CLOG(TRACE, CoreDq) << "Didn't find precompute: " << precompute ;

result["Node Type"] = plan.GetMapSafe().at("Node Type");
return result;
}
//YQL_CLOG(TRACE, CoreDq) << "Following precompute: " << precompute ;

return ReconstructQueryPlanRec(precomputes.at(precompute), 0, planIndex, precomputes, nodeCounter);
}

Expand All @@ -1930,6 +2006,11 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
auto opName = op.GetMapSafe().at("Name").GetStringSafe();

for (auto opInput : op.GetMapSafe().at("Inputs").GetArraySafe()) {
// Sometimes we have inputs for these operators, don't process them
if (opName == "TablePointLookup") {
break;
}

if (opInput.GetMapSafe().contains("ExternalPlanNodeId")) {
auto inputPlanKey = opInput.GetMapSafe().at("ExternalPlanNodeId").GetIntegerSafe();
auto inputPlan = planIndex.at(inputPlanKey);
Expand All @@ -1938,8 +2019,9 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
auto inputPlanId = opInput.GetMapSafe().at("InternalOperatorId").GetIntegerSafe();
planInputs.push_back( ReconstructQueryPlanRec(plan, inputPlanId, planIndex, precomputes, nodeCounter));
}
// temp hack
if (opName == "Filter") {

// Sometimes we have multiple inputs for these operators, break after the first one
if (opName == "Filter" || opName == "TopSort") {
break;
}
}
Expand All @@ -1948,17 +2030,31 @@ NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
op.GetMapSafe().erase("Inputs");
}

if (op.GetMapSafe().contains("Input") || op.GetMapSafe().contains("ToFlow")) {
if (op.GetMapSafe().contains("Input")
|| op.GetMapSafe().contains("ToFlow")
|| op.GetMapSafe().contains("Member")
|| op.GetMapSafe().contains("AssumeSorted")
|| op.GetMapSafe().contains("Iterator")) {

TString maybePrecompute = "";
if (op.GetMapSafe().contains("Input")) {
maybePrecompute = op.GetMapSafe().at("Input").GetStringSafe();
} else if (op.GetMapSafe().contains("ToFlow")) {
maybePrecompute = op.GetMapSafe().at("ToFlow").GetStringSafe();
} else if (op.GetMapSafe().contains("Member")) {
maybePrecompute = op.GetMapSafe().at("Member").GetStringSafe();
} else if (op.GetMapSafe().contains("AssumeSorted")) {
maybePrecompute = op.GetMapSafe().at("AssumeSorted").GetStringSafe();
} else if (op.GetMapSafe().contains("Iterator")) {
maybePrecompute = op.GetMapSafe().at("Iterator").GetStringSafe();
}

if (precomputes.contains(maybePrecompute)) {
//YQL_CLOG(TRACE, CoreDq) << "Following precompute: " << maybePrecompute ;
planInputs.push_back(ReconstructQueryPlanRec(precomputes.at(maybePrecompute), 0, planIndex, precomputes, nodeCounter));
}
} //else {
// YQL_CLOG(TRACE, CoreDq) << "Didn't find precompute: " << maybePrecompute ;
//}
}

result["Node Type"] = opName;
Expand Down Expand Up @@ -1988,7 +2084,9 @@ NJson::TJsonValue SimplifyQueryPlan(NJson::TJsonValue& plan) {
"Stage",
"Iterator",
"PartitionByKey",
"ToFlow"
"ToFlow",
"Member",
"AssumeSorted"
};

THashMap<int, NJson::TJsonValue> planIndex;
Expand Down