Skip to content

Commit 8c0f1bc

Browse files
authored
Fix YQL-18475 (#4937)
1 parent 785a089 commit 8c0f1bc

3 files changed

Lines changed: 26 additions & 23 deletions

File tree

ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -880,9 +880,8 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
880880

881881
if (LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin) && RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
882882
if (*PartialJoinCompleted) {
883-
while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData)) {
883+
while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData, NextBucketToJoin + 1)) {
884884
UnpackJoinedData(output);
885-
886885
return EFetchResult::One;
887886
}
888887

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -154,21 +154,21 @@ void TTable::ResetIterator() {
154154
}
155155

156156
// Checks if there are more tuples and sets bucketId and tupleId to next valid.
157-
inline bool HasMoreTuples(std::vector<TTableBucketStats> & tableBucketsStats, ui64 & bucketId, ui64 & tupleId ) {
157+
inline bool HasMoreTuples(std::vector<TTableBucketStats> & tableBucketsStats, ui64 & bucketId, ui64 & tupleId, ui64 bucketLimit ) {
158158

159-
if (bucketId >= tableBucketsStats.size()) return false;
159+
if (bucketId >= bucketLimit) return false;
160160

161161
if ( tupleId >= tableBucketsStats[bucketId].TuplesNum ) {
162162
tupleId = 0;
163163
bucketId ++;
164164

165-
if (bucketId == tableBucketsStats.size()) {
165+
if (bucketId == bucketLimit) {
166166
return false;
167167
}
168168

169169
while( tableBucketsStats[bucketId].TuplesNum == 0 ) {
170170
bucketId ++;
171-
if (bucketId == tableBucketsStats.size()) {
171+
if (bucketId == bucketLimit) {
172172
return false;
173173
}
174174
}
@@ -181,7 +181,7 @@ inline bool HasMoreTuples(std::vector<TTableBucketStats> & tableBucketsStats, ui
181181

182182
// Returns value of next tuple. Returs true if there are more tuples
183183
bool TTable::NextTuple(TupleData & td){
184-
if (HasMoreTuples(TableBucketsStats, CurrIterBucket, CurrIterIndex )) {
184+
if (HasMoreTuples(TableBucketsStats, CurrIterBucket, CurrIterIndex, TableBucketsStats.size())) {
185185
GetTupleData(CurrIterBucket, CurrIterIndex, td);
186186
CurrIterIndex++;
187187
return true;
@@ -767,15 +767,15 @@ inline bool HasRightIdMatch(ui64 currId, ui64 & rightIdIter, const std::vector<u
767767
}
768768

769769

770-
bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
770+
bool TTable::NextJoinedData( TupleData & td1, TupleData & td2, ui64 bucketLimit) {
771771

772772
if (JoinKind == EJoinKind::Cross) {
773773

774-
if (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex))
774+
if (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit))
775775
{
776776
JoinTable1->GetTupleData(JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, td1);
777777

778-
if (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex))
778+
if (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, bucketLimit))
779779
{
780780
JoinTable2->GetTupleData(JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, td2);
781781
JoinTable2->CurrIterIndex++;
@@ -786,15 +786,15 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
786786
JoinTable2->CurrIterBucket = 0;
787787
JoinTable2->CurrIterIndex = 0;
788788
JoinTable1->CurrIterIndex++;
789-
return NextJoinedData(td1, td2);
789+
return NextJoinedData(td1, td2, bucketLimit);
790790
}
791791
}
792792
else
793793
return false;
794794
}
795795

796796
if ( JoinKind == EJoinKind::Inner ) {
797-
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
797+
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
798798
ui32 tupleId2;
799799
if (HasJoinedTupleId(JoinTable1, tupleId2))
800800
{
@@ -810,7 +810,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
810810
}
811811

812812
if ( JoinKind == EJoinKind::Left ) {
813-
while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
813+
while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
814814
ui32 tupleId2;
815815
if (HasJoinedTupleId(JoinTable1, tupleId2))
816816
{
@@ -845,7 +845,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
845845
}
846846

847847
if ( JoinKind == EJoinKind::Right ) {
848-
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
848+
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
849849
ui32 tupleId2;
850850
if (HasJoinedTupleId(JoinTable1, tupleId2))
851851
{
@@ -886,7 +886,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
886886
if ( RightTableBatch_ && HasMoreRightTuples_ )
887887
return false;
888888

889-
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
889+
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
890890
ui32 tupleId2;
891891

892892
bool globalMatchedId = false;
@@ -917,7 +917,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
917917
if (LeftTableBatch_ && HasMoreLeftTuples_ )
918918
return false;
919919

920-
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
920+
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
921921
ui32 tupleId2;
922922

923923
bool globalMatchedId = false;
@@ -949,7 +949,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
949949
if (RightTableBatch_ && HasMoreRightTuples_ )
950950
return false;
951951

952-
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
952+
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
953953
ui32 tupleId2;
954954

955955
if ( !RightTableBatch_ && HasJoinedTupleId(JoinTable1, tupleId2))
@@ -983,7 +983,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
983983
if (LeftTableBatch_ && HasMoreLeftTuples_ )
984984
return false;
985985

986-
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
986+
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
987987
ui32 tupleId2;
988988
if ( !LeftTableBatch_ && HasJoinedTupleId(JoinTable1, tupleId2))
989989
{
@@ -1010,7 +1010,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
10101010
}
10111011

10121012
if ( JoinKind == EJoinKind::Full ) {
1013-
if(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
1013+
if(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
10141014
ui32 tupleId2;
10151015
if (HasJoinedTupleId(JoinTable1, tupleId2))
10161016
{
@@ -1036,7 +1036,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
10361036
Table2Initialized_ = true;
10371037
}
10381038

1039-
while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex)) {
1039+
while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, bucketLimit)) {
10401040

10411041
if (CurrIterBucket != JoinTable2->CurrIterBucket) {
10421042
CurrIterBucket = JoinTable2->CurrIterBucket;
@@ -1060,7 +1060,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
10601060
}
10611061

10621062
if ( JoinKind == EJoinKind::Exclusion ) {
1063-
while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
1063+
while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
10641064
ui32 tupleId2;
10651065
if (HasJoinedTupleId(JoinTable1, tupleId2))
10661066
{
@@ -1078,7 +1078,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
10781078

10791079
td1.AllNulls = true;
10801080

1081-
while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex)) {
1081+
while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, bucketLimit)) {
10821082

10831083
if (CurrIterBucket != JoinTable2->CurrIterBucket) {
10841084
CurrIterBucket = JoinTable2->CurrIterBucket;

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,11 @@ class TTable {
275275
void Join(TTable& t1, TTable& t2, EJoinKind joinKind = EJoinKind::Inner, bool hasMoreLeftTuples = false, bool hasMoreRightTuples = false, ui32 fromBucket = 0, ui32 toBucket = NumberOfBuckets);
276276

277277
// Returns next jointed tuple data. Returs true if there are more tuples
278-
bool NextJoinedData(TupleData& td1, TupleData& td2);
278+
bool NextJoinedData(TupleData& td1, TupleData& td2, ui64 bucketLimit);
279+
280+
bool NextJoinedData(TupleData& td1, TupleData& td2) {
281+
return NextJoinedData(td1, td2, JoinTable1->TableBucketsStats.size());
282+
}
279283

280284
// Creates buckets that support spilling.
281285
void InitializeBucketSpillers(ISpiller::TPtr spiller);

0 commit comments

Comments
 (0)