Skip to content

Commit a739c6b

Browse files
924060929lzyy2024
authored and committed
[fix](nereids) fix bucket shuffle join to right compute wrong output property (apache#47888)
Fix the "Illegal bucket shuffle join or colocate join in fragment" error, which is raised because the join output property is computed incorrectly; the bug was introduced by apache#41730. The exception: ``` errCode = 2, detailMessage = Illegal bucket shuffle join or colocate join in fragment ```
1 parent 85b665d commit a739c6b

4 files changed

Lines changed: 132 additions & 11 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -505,32 +505,42 @@ private PhysicalProperties computeShuffleJoinOutputProperties(
505505
return new PhysicalProperties(
506506
DistributionSpecHash.merge(rightHashSpec, leftHashSpec, outputShuffleType)
507507
);
508-
} else {
508+
} else if (shuffleSide == ShuffleSide.RIGHT || shuffleSide == ShuffleSide.NONE) {
509+
return new PhysicalProperties(
510+
DistributionSpecHash.merge(leftHashSpec, rightHashSpec, outputShuffleType)
511+
);
512+
} else if (shuffleSide == ShuffleSide.BOTH) {
509513
return new PhysicalProperties(
510514
DistributionSpecHash.merge(leftHashSpec, rightHashSpec, outputShuffleType)
515+
.withShuffleTypeAndForbidColocateJoin(leftHashSpec.getShuffleType())
511516
);
517+
} else {
518+
throw new AnalysisException("unknown shuffle side " + shuffleSide);
512519
}
513520
case LEFT_SEMI_JOIN:
514521
case LEFT_ANTI_JOIN:
515522
case NULL_AWARE_LEFT_ANTI_JOIN:
516523
case LEFT_OUTER_JOIN:
517-
if (shuffleSide == ShuffleSide.LEFT) {
524+
if (shuffleSide == ShuffleSide.LEFT || shuffleSide == ShuffleSide.BOTH) {
518525
return new PhysicalProperties(
519526
leftHashSpec.withShuffleTypeAndForbidColocateJoin(outputShuffleType)
520527
);
521-
} else {
528+
} else if (shuffleSide == ShuffleSide.RIGHT || shuffleSide == ShuffleSide.NONE) {
522529
return new PhysicalProperties(leftHashSpec);
530+
} else {
531+
throw new AnalysisException("unknown shuffle side " + shuffleSide);
523532
}
524533
case RIGHT_SEMI_JOIN:
525534
case RIGHT_ANTI_JOIN:
526535
case RIGHT_OUTER_JOIN:
527-
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts())) {
536+
if (shuffleSide == ShuffleSide.RIGHT || shuffleSide == ShuffleSide.BOTH) {
537+
return new PhysicalProperties(
538+
rightHashSpec.withShuffleTypeAndForbidColocateJoin(outputShuffleType)
539+
);
540+
} else if (shuffleSide == ShuffleSide.LEFT || shuffleSide == ShuffleSide.NONE) {
528541
return new PhysicalProperties(rightHashSpec);
529542
} else {
530-
// retain left shuffle type, since coordinator use left most node to schedule fragment
531-
// forbid colocate join, since right table already shuffle
532-
return new PhysicalProperties(rightHashSpec.withShuffleTypeAndForbidColocateJoin(
533-
leftHashSpec.getShuffleType()));
543+
throw new AnalysisException("unknown shuffle side " + shuffleSide);
534544
}
535545
case FULL_OUTER_JOIN:
536546
return PhysicalProperties.createAnyFromHash(leftHashSpec, rightHashSpec);

fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.doris.nereids.util.ExpressionUtils;
5656
import org.apache.doris.nereids.util.JoinUtils;
5757
import org.apache.doris.qe.ConnectContext;
58+
import org.apache.doris.qe.SessionVariable;
5859

5960
import com.google.common.collect.ImmutableList;
6061
import com.google.common.collect.Lists;
@@ -390,22 +391,24 @@ ExpressionUtils.EMPTY_CONDITION, ExpressionUtils.EMPTY_CONDITION, new Distribute
390391
GroupExpression groupExpression = new GroupExpression(join);
391392
new Group(null, groupExpression, null);
392393

394+
long leftTableId = 0L;
393395
PhysicalProperties left = new PhysicalProperties(
394396
new DistributionSpecHash(
395397
Lists.newArrayList(new ExprId(0)),
396398
ShuffleType.NATURAL,
397-
0,
399+
leftTableId,
398400
Sets.newHashSet(0L)
399401
),
400402
new OrderSpec(
401403
Lists.newArrayList(new OrderKey(new SlotReference("ignored", IntegerType.INSTANCE),
402404
true, true)))
403405
);
404406

407+
long rightTableId = 1L;
405408
PhysicalProperties right = new PhysicalProperties(new DistributionSpecHash(
406409
Lists.newArrayList(new ExprId(1)),
407410
ShuffleType.NATURAL,
408-
1,
411+
rightTableId,
409412
Sets.newHashSet(1L)
410413
));
411414

@@ -416,8 +419,11 @@ ExpressionUtils.EMPTY_CONDITION, ExpressionUtils.EMPTY_CONDITION, new Distribute
416419
Assertions.assertTrue(result.getOrderSpec().getOrderKeys().isEmpty());
417420
Assertions.assertInstanceOf(DistributionSpecHash.class, result.getDistributionSpec());
418421
DistributionSpecHash actual = (DistributionSpecHash) result.getDistributionSpec();
422+
419423
Assertions.assertEquals(ShuffleType.NATURAL, actual.getShuffleType());
420-
Assertions.assertEquals(-1, actual.getTableId());
424+
Assertions.assertEquals(
425+
SessionVariable.canUseNereidsDistributePlanner() ? rightTableId : -1L, actual.getTableId()
426+
);
421427
// check merged
422428
Assertions.assertEquals(1, actual.getExprIdToEquivalenceSet().size());
423429
Assertions.assertEquals(1, actual.getExprIdToEquivalenceSet().keySet().iterator().next().asInt());
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !bucket_shuffle_to_right --
3+
\N
4+
\N
5+
\N
6+
\N
7+
\N
8+
\N
9+
\N
10+
\N
11+
\N
12+
\N
13+
\N
14+
\N
15+
\N
16+
\N
17+
\N
18+
\N
19+
\N
20+
2
21+
3
22+
6
23+
6
24+
7
25+
7
26+
9
27+
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("bucket_shuffle_to_right") {
19+
multi_sql """
20+
set enable_nereids_distribute_planner=true;
21+
set disable_join_reorder=true;
22+
23+
drop table if exists table_20_undef_partitions2_keys3_properties4_distributed_by53;
24+
create table table_20_undef_partitions2_keys3_properties4_distributed_by53 (
25+
`pk` int,
26+
`col_varchar_10__undef_signed` varchar(10) ,
27+
`col_int_undef_signed` int ,
28+
`col_varchar_1024__undef_signed` varchar(1024)
29+
) engine=olap
30+
DUPLICATE KEY(pk, col_varchar_10__undef_signed)
31+
32+
distributed by hash(pk) buckets 10
33+
properties("replication_num" = "1");
34+
insert into table_20_undef_partitions2_keys3_properties4_distributed_by53(pk,col_int_undef_signed,col_varchar_10__undef_signed,col_varchar_1024__undef_signed) values (0,null,"at",'b'),(1,null,'e',"your"),(2,6,'h',"look"),(3,3,'z',"to"),(4,6,'q',"did"),(5,7,"come",'g'),(6,null,'x',"his"),(7,null,'w','s'),(8,null,"don't",'l'),(9,null,"and","know"),(10,null,'q','c'),(11,null,'u','w'),(12,9,'c','x'),(13,null,"my","or"),(14,null,'a','i'),(15,null,"look",'u'),(16,2,"were","be"),(17,null,"is",'k'),(18,null,'c',"her"),(19,null,"but",'x');
35+
36+
37+
drop table if exists table_23_undef_partitions2_keys3_properties4_distributed_by52;
38+
create table table_23_undef_partitions2_keys3_properties4_distributed_by52 (
39+
`pk` int,
40+
`col_int_undef_signed` int ,
41+
`col_varchar_10__undef_signed` varchar(10) ,
42+
`col_varchar_1024__undef_signed` varchar(1024) MIN
43+
) engine=olap
44+
AGGREGATE KEY(pk, col_int_undef_signed, col_varchar_10__undef_signed)
45+
46+
distributed by hash(pk) buckets 10
47+
properties("replication_num" = "1");
48+
insert into table_23_undef_partitions2_keys3_properties4_distributed_by52(pk,col_int_undef_signed,col_varchar_10__undef_signed,col_varchar_1024__undef_signed) values (0,null,"to","so"),(1,null,'u','j'),(2,null,"say","would"),(3,null,'t',"to"),(4,0,"your",'q'),(5,1,'w',"if"),(6,null,"right",'p'),(7,7,'h',"her"),(8,6,"that",'v'),(9,5,'k',"as"),(10,null,"know","did"),(11,9,"to",'q'),(12,null,"look","don't"),(13,9,"say",'v'),(14,null,'m','j'),(15,9,"i","want"),(16,4,"then","why"),(17,null,"something",'p'),(18,2,'i',"for"),(19,5,"for",'q'),(20,6,"he",'r'),(21,null,'n',"didn't"),(22,null,'n','x');
49+
50+
51+
drop table if exists table_100_undef_partitions2_keys3_properties4_distributed_by5;
52+
create table table_100_undef_partitions2_keys3_properties4_distributed_by5 (
53+
`col_int_undef_signed` int/*agg_type_placeholder*/ ,
54+
`col_varchar_10__undef_signed` varchar(10)/*agg_type_placeholder*/ ,
55+
`col_varchar_1024__undef_signed` varchar(1024)/*agg_type_placeholder*/ ,
56+
`pk` int/*agg_type_placeholder*/
57+
) engine=olap
58+
59+
60+
distributed by hash(pk) buckets 10
61+
properties("replication_num" = "1");
62+
insert into table_100_undef_partitions2_keys3_properties4_distributed_by5(pk,col_int_undef_signed,col_varchar_10__undef_signed,col_varchar_1024__undef_signed) values (0,null,"when","yes"),(1,null,"do",'i'),(2,1,"all","didn't"),(3,null,"don't","who"),(4,9,"your","it"),(5,5,'n','c'),(6,0,"up","it's"),(7,9,'d','a'),(8,3,"yeah",'v'),(9,null,'r','s'),(10,5,'s','n'),(11,null,'w','l'),(12,null,'k',"she"),(13,1,"from","what"),(14,1,'t',"at"),(15,null,"something",'s'),(16,null,'q',"his"),(17,3,'k','d'),(18,6,"you",'m'),(19,null,"something","could"),(20,8,'e','d'),(21,null,"I'm",'j'),(22,8,"get","can"),(23,5,'b',"but"),(24,3,'f','q'),(25,8,"who",'z'),(26,null,"was","her"),(27,5,'q','l'),(28,1,"here","about"),(29,null,'i',"for"),(30,null,"that","ok"),(31,null,'l',"from"),(32,9,"my",'o'),(33,6,'k',"one"),(34,null,"or","yeah"),(35,null,'q',"going"),(36,7,'x','g'),(37,null,'b',"one"),(38,null,'w','l'),(39,9,'q',"that"),(40,8,'x',"the"),(41,1,"was",'y'),(42,9,"his","you"),(43,null,"with","okay"),(44,0,"with",'f'),(45,2,'c',"as"),(46,null,"yes",'l'),(47,null,'a',"yeah"),(48,2,'c','b'),(49,5,'f','j'),(50,null,'k','j'),(51,4,"don't",'b'),(52,1,"that's",'u'),(53,null,"had",'z'),(54,null,"didn't",'o'),(55,null,'t','v'),(56,null,"this",'o'),(57,null,'b','p'),(58,7,'u',"good"),(59,null,"something",'a'),(60,2,"back",'w'),(61,null,"got","he"),(62,null,'i',"back"),(63,5,"why","be"),(64,0,'m',"he's"),(65,null,'j','e'),(66,null,"from",'z'),(67,null,'y','i'),(68,null,"he's",'l'),(69,5,"him",'b'),(70,2,'b','b'),(71,1,'w','u'),(72,8,'n',"he's"),(73,4,"say","you"),(74,null,'y',"as"),(75,null,"he",'h'),(76,5,'k','y'),(77,null,"not","is"),(78,3,"on","you're"),(79,null,'a','r'),(80,null,"just","that"),(81,8,'k',"as"),(82,null,'y',"to"),(83,2,"we","yes"),(84,7,'j',"look"),(85,2,'v','d'),(86,null,"me",'u'),(87,8,"okay",'u'),(88,4,"not",'p'),(89,3,'f',"he's"),(90,null,'g',"what"),(91,3,'l','o'),(92,null,"I'm",'i'),(93,null,"been","back"),(94,null,'w','b'),(95,null,"okay","who"),(96,1,"got","all"),(97,null,"your",'c'),(98,null,'i','u'),(99,null,"ok","they");
63+
"""
64+
65+
order_qt_bucket_shuffle_to_right """
66+
select alias1 . `col_int_undef_signed` AS field1
67+
from table_100_undef_partitions2_keys3_properties4_distributed_by5 as alias3
68+
right outer join
69+
(
70+
select alias1 . `pk`, alias1 . `col_int_undef_signed`
71+
from table_23_undef_partitions2_keys3_properties4_distributed_by52 as alias2
72+
right outer join
73+
table_20_undef_partitions2_keys3_properties4_distributed_by53 as alias1
74+
on alias1 . `pk` = alias2 . `col_int_undef_signed`
75+
) alias1 ON alias1 . `pk` = alias3 . `pk`
76+
WHERE alias1 . `pk` >= 0;
77+
"""
78+
}

0 commit comments

Comments
 (0)