Skip to content

Commit 3282b6d

Browse files
Merge d98362a into 601fa99
2 parents 601fa99 + d98362a commit 3282b6d

3 files changed

Lines changed: 377 additions & 118 deletions

File tree

ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,11 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
598598
leftJoinKeys.emplace(leftKey);
599599
}
600600

601+
const bool useStreamIndexLookupJoin = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())
602+
&& kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin
603+
&& supportedStreamJoinKinds.contains(join.JoinType().Value())
604+
&& !indexName;
605+
601606
auto leftRowArg = Build<TCoArgument>(ctx, join.Pos())
602607
.Name("leftRowArg")
603608
.Done();
@@ -677,10 +682,19 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
677682
}
678683
if (canCast) {
679684
DBG("------ cast " << leftDataType->GetName() << " to " << rightDataType->GetName());
680-
member = Build<TCoConvert>(ctx, join.Pos())
681-
.Input(member)
682-
.Type().Build(rightDataType->GetName())
683-
.Done().Ptr();
685+
686+
if (useStreamIndexLookupJoin) {
687+
// For stream lookup join we should cast keys before join
688+
member = Build<TCoSafeCast>(ctx, join.Pos())
689+
.Value(member)
690+
.Type(ExpandType(join.Pos(), *rightType, ctx))
691+
.Done().Ptr();
692+
} else {
693+
member = Build<TCoConvert>(ctx, join.Pos())
694+
.Input(member)
695+
.Type().Build(rightDataType->GetName())
696+
.Done().Ptr();
697+
}
684698
} else {
685699
DBG("------ can not cast " << leftDataType->GetName() << " to " << rightDataType->GetName());
686700
return {};
@@ -704,11 +718,6 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
704718
return {};
705719
}
706720

707-
const bool useStreamIndexLookupJoin = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())
708-
&& kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin
709-
&& supportedStreamJoinKinds.contains(join.JoinType().Value())
710-
&& !indexName;
711-
712721
bool needPrecomputeLeft = (kqpCtx.IsDataQuery() || kqpCtx.IsGenericQuery())
713722
&& !join.LeftInput().Maybe<TCoParameter>()
714723
&& !IsParameterToListOfStructsRepack(join.LeftInput())

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -601,9 +601,18 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
601601
break;
602602
}
603603

604-
UnprocessedRows.pop_front();
604+
auto hasNulls = [](const TOwnedCellVec& cellVec) {
605+
for (const auto& cell : cellVec) {
606+
if (cell.IsNull()) {
607+
return true;
608+
}
609+
}
605610

606-
if (!joinKey.data()->IsNull()) { // don't use nulls as lookup keys, because null != null
611+
return false;
612+
};
613+
614+
UnprocessedRows.pop_front();
615+
if (!hasNulls(joinKey)) { // don't use nulls as lookup keys, because null != null
607616
std::vector <std::pair<ui64, TOwnedTableRange>> partitions;
608617
if (joinKey.size() < KeyColumns.size()) {
609618
// build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf])
@@ -730,7 +739,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
730739
for (size_t joinKeyIdx = 0; joinKeyIdx < LookupKeyColumns.size(); ++joinKeyIdx) {
731740
auto it = ReadColumns.find(LookupKeyColumns[joinKeyIdx]->Name);
732741
YQL_ENSURE(it != ReadColumns.end());
733-
joinKeyCells[joinKeyIdx] = row[std::distance(ReadColumns.begin(), it)];
742+
joinKeyCells[LookupKeyColumns[joinKeyIdx]->KeyOrder] = row[std::distance(ReadColumns.begin(), it)];
734743
}
735744

736745
auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells);

0 commit comments

Comments
 (0)