Skip to content

Commit 5a4f7c6

Browse files
authored
[fix](Nereids) fix new coordinator compute a wrong scanRangeNum (#43850)
fix new coordinator compute a wrong scanRangeNum, introduced by #41730 This bug will show a wrong progress in s3 load: ``` Progress: 0.00%(73/2147483647) ```
1 parent 6c2c36d commit 5a4f7c6

3 files changed

Lines changed: 144 additions & 2 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/qe/CoordinatorContext.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030
import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
3131
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
3232
import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
33+
import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
34+
import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
35+
import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanRanges;
36+
import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource;
3337
import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
3438
import org.apache.doris.planner.DataSink;
3539
import org.apache.doris.planner.PlanFragment;
@@ -40,12 +44,18 @@
4044
import org.apache.doris.resource.workloadgroup.QueryQueue;
4145
import org.apache.doris.resource.workloadgroup.QueueToken;
4246
import org.apache.doris.service.ExecuteEnv;
47+
import org.apache.doris.thrift.TBrokerScanRange;
4348
import org.apache.doris.thrift.TDescriptorTable;
49+
import org.apache.doris.thrift.TExternalScanRange;
50+
import org.apache.doris.thrift.TFileScanRange;
4451
import org.apache.doris.thrift.TNetworkAddress;
52+
import org.apache.doris.thrift.TPaloScanRange;
4553
import org.apache.doris.thrift.TPipelineWorkloadGroup;
4654
import org.apache.doris.thrift.TQueryGlobals;
4755
import org.apache.doris.thrift.TQueryOptions;
4856
import org.apache.doris.thrift.TResourceLimit;
57+
import org.apache.doris.thrift.TScanRange;
58+
import org.apache.doris.thrift.TScanRangeParams;
4959
import org.apache.doris.thrift.TUniqueId;
5060

5161
import com.google.common.base.Suppliers;
@@ -418,9 +428,57 @@ private TNetworkAddress computeDirectConnectCoordinator() {
418428

419429
private int getScanRangeNum() {
420430
int scanRangeNum = 0;
421-
for (ScanNode scanNode : scanNodes) {
422-
scanRangeNum += scanNode.getScanRangeNum();
431+
for (PipelineDistributedPlan distributedPlan : distributedPlans) {
432+
for (AssignedJob instanceJob : distributedPlan.getInstanceJobs()) {
433+
ScanSource scanSource = instanceJob.getScanSource();
434+
if (scanSource instanceof BucketScanSource) {
435+
BucketScanSource bucketScanSource = (BucketScanSource) scanSource;
436+
for (Map<ScanNode, ScanRanges> kv : bucketScanSource.bucketIndexToScanNodeToTablets.values()) {
437+
for (ScanRanges scanRanges : kv.values()) {
438+
for (TScanRangeParams param : scanRanges.params) {
439+
scanRangeNum += computeScanRangeNumByScanRange(param);
440+
}
441+
}
442+
}
443+
} else {
444+
DefaultScanSource defaultScanSource = (DefaultScanSource) scanSource;
445+
for (ScanRanges scanRanges : defaultScanSource.scanNodeToScanRanges.values()) {
446+
for (TScanRangeParams param : scanRanges.params) {
447+
scanRangeNum += computeScanRangeNumByScanRange(param);
448+
}
449+
}
450+
}
451+
}
452+
}
453+
return scanRangeNum;
454+
}
455+
456+
private int computeScanRangeNumByScanRange(TScanRangeParams param) {
457+
int scanRangeNum = 0;
458+
TScanRange scanRange = param.getScanRange();
459+
if (scanRange == null) {
460+
return scanRangeNum;
461+
}
462+
TBrokerScanRange brokerScanRange = scanRange.getBrokerScanRange();
463+
if (brokerScanRange != null) {
464+
scanRangeNum += brokerScanRange.getRanges().size();
465+
}
466+
TExternalScanRange externalScanRange = scanRange.getExtScanRange();
467+
if (externalScanRange != null) {
468+
TFileScanRange fileScanRange = externalScanRange.getFileScanRange();
469+
if (fileScanRange != null) {
470+
if (fileScanRange.isSetRanges()) {
471+
scanRangeNum += fileScanRange.getRanges().size();
472+
} else if (fileScanRange.isSetSplitSource()) {
473+
scanRangeNum += fileScanRange.getSplitSource().getNumSplits();
474+
}
475+
}
476+
}
477+
TPaloScanRange paloScanRange = scanRange.getPaloScanRange();
478+
if (paloScanRange != null) {
479+
scanRangeNum += 1;
423480
}
481+
// TODO: more ranges?
424482
return scanRangeNum;
425483
}
426484
}

fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,17 @@ public PlanChecker optimize() {
270270
return this;
271271
}
272272

273+
public NereidsPlanner plan(String sql) {
274+
StatementContext statementContext = new StatementContext(connectContext, new OriginStatement(sql, 0));
275+
connectContext.setStatementContext(statementContext);
276+
NereidsPlanner planner = new NereidsPlanner(statementContext);
277+
LogicalPlan parsedPlan = new NereidsParser().parseSingle(sql);
278+
LogicalPlanAdapter parsedPlanAdaptor = new LogicalPlanAdapter(parsedPlan, statementContext);
279+
statementContext.setParsedStatement(parsedPlanAdaptor);
280+
planner.planWithLock(parsedPlanAdaptor);
281+
return planner;
282+
}
283+
273284
public PlanChecker dpHypOptimize() {
274285
double now = System.currentTimeMillis();
275286
cascadesContext.getStatementContext().setDpHyp(true);
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.qe;
19+
20+
import org.apache.doris.catalog.EnvFactory;
21+
import org.apache.doris.common.FeConstants;
22+
import org.apache.doris.nereids.NereidsPlanner;
23+
import org.apache.doris.nereids.util.PlanChecker;
24+
import org.apache.doris.thrift.TUniqueId;
25+
import org.apache.doris.utframe.TestWithFeService;
26+
27+
import org.junit.jupiter.api.Assertions;
28+
import org.junit.jupiter.api.BeforeAll;
29+
import org.junit.jupiter.api.Test;
30+
31+
import java.io.IOException;
32+
import java.util.UUID;
33+
34+
public class NereidsCoordinatorTest extends TestWithFeService {
35+
@BeforeAll
36+
public void init() throws Exception {
37+
FeConstants.runningUnitTest = true;
38+
39+
createDatabase("test");
40+
useDatabase("test");
41+
42+
createTable("create table tbl(id int) distributed by hash(id) buckets 10 properties('replication_num' = '1');");
43+
}
44+
45+
@Test
46+
public void testNereidsCoordinatorScanRangeNum() throws IOException {
47+
NereidsPlanner planner = plan("select * from test.tbl");
48+
NereidsCoordinator coordinator = (NereidsCoordinator) EnvFactory.getInstance()
49+
.createCoordinator(connectContext, null, planner, null);
50+
int scanRangeNum = coordinator.getScanRangeNum();
51+
Assertions.assertEquals(10, scanRangeNum);
52+
}
53+
54+
@Test
55+
public void testNereidsCoordinatorScanRangeNum2() throws IOException {
56+
NereidsPlanner planner = plan("select * from information_schema.columns");
57+
NereidsCoordinator coordinator = (NereidsCoordinator) EnvFactory.getInstance()
58+
.createCoordinator(connectContext, null, planner, null);
59+
int scanRangeNum = coordinator.getScanRangeNum();
60+
Assertions.assertEquals(0, scanRangeNum);
61+
}
62+
63+
private NereidsPlanner plan(String sql) throws IOException {
64+
ConnectContext connectContext = createDefaultCtx();
65+
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION,OLAP_SCAN_TABLET_PRUNE");
66+
connectContext.setThreadLocalInfo();
67+
68+
UUID uuid = UUID.randomUUID();
69+
connectContext.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()));
70+
NereidsPlanner planner = PlanChecker.from(connectContext).plan(sql);
71+
return planner;
72+
}
73+
}

0 commit comments

Comments
 (0)