Skip to content

Commit c22db94

Browse files
authored
[opt](mtmv) optimize mtmv rewrite performance (#49514) (#51309)
1 parent ee74a40 commit c22db94

35 files changed

Lines changed: 2252 additions & 470 deletions

fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ public class SummaryProfile {
8787
public static final String NEREIDS_LOCK_TABLE_TIME = "Nereids Lock Table Time";
8888
public static final String NEREIDS_ANALYSIS_TIME = "Nereids Analysis Time";
8989
public static final String NEREIDS_REWRITE_TIME = "Nereids Rewrite Time";
90+
91+
public static final String NEREIDS_COLLECT_TABLE_PARTITION_TIME = "Nereids Collect Table Partition Time";
9092
public static final String NEREIDS_OPTIMIZE_TIME = "Nereids Optimize Time";
9193
public static final String NEREIDS_TRANSLATE_TIME = "Nereids Translate Time";
9294
public static final String NEREIDS_GARBAGE_COLLECT_TIME = "Nereids GarbageCollect Time";
@@ -198,6 +200,8 @@ public class SummaryProfile {
198200
private long parseSqlFinishTime = -1;
199201

200202
private long nereidsLockTableFinishTime = -1;
203+
204+
private long nereidsCollectTablePartitionFinishTime = -1;
201205
private long nereidsAnalysisFinishTime = -1;
202206
private long nereidsRewriteFinishTime = -1;
203207
private long nereidsOptimizeFinishTime = -1;
@@ -318,6 +322,8 @@ private void updateExecutionSummaryProfile() {
318322
executionSummaryProfile.addInfoString(NEREIDS_LOCK_TABLE_TIME, getPrettyNereidsLockTableTime());
319323
executionSummaryProfile.addInfoString(NEREIDS_ANALYSIS_TIME, getPrettyNereidsAnalysisTime());
320324
executionSummaryProfile.addInfoString(NEREIDS_REWRITE_TIME, getPrettyNereidsRewriteTime());
325+
executionSummaryProfile.addInfoString(NEREIDS_COLLECT_TABLE_PARTITION_TIME,
326+
getPrettyNereidsCollectTablePartitionTime());
321327
executionSummaryProfile.addInfoString(NEREIDS_OPTIMIZE_TIME, getPrettyNereidsOptimizeTime());
322328
executionSummaryProfile.addInfoString(NEREIDS_TRANSLATE_TIME, getPrettyNereidsTranslateTime());
323329
executionSummaryProfile.addInfoString(NEREIDS_GARBAGE_COLLECT_TIME, getPrettyNereidsGarbageCollectionTime());
@@ -406,6 +412,10 @@ public void setNereidsLockTableFinishTime() {
406412
this.nereidsLockTableFinishTime = TimeUtils.getStartTimeMs();
407413
}
408414

415+
public void setNereidsCollectTablePartitionFinishTime() {
416+
this.nereidsCollectTablePartitionFinishTime = TimeUtils.getStartTimeMs();
417+
}
418+
409419
public void setNereidsAnalysisTime() {
410420
this.nereidsAnalysisFinishTime = TimeUtils.getStartTimeMs();
411421
}
@@ -663,8 +673,13 @@ public String getPrettyNereidsRewriteTime() {
663673
return getPrettyTime(nereidsRewriteFinishTime, nereidsAnalysisFinishTime, TUnit.TIME_MS);
664674
}
665675

676+
677+
public String getPrettyNereidsCollectTablePartitionTime() {
678+
return getPrettyTime(nereidsCollectTablePartitionFinishTime, nereidsRewriteFinishTime, TUnit.TIME_MS);
679+
}
680+
666681
public String getPrettyNereidsOptimizeTime() {
667-
return getPrettyTime(nereidsOptimizeFinishTime, nereidsRewriteFinishTime, TUnit.TIME_MS);
682+
return getPrettyTime(nereidsOptimizeFinishTime, nereidsCollectTablePartitionFinishTime, TUnit.TIME_MS);
668683
}
669684

670685
public String getPrettyNereidsTranslateTime() {

fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.doris.catalog.Env;
2121
import org.apache.doris.catalog.MTMV;
22+
import org.apache.doris.catalog.Partition;
2223
import org.apache.doris.catalog.Table;
2324
import org.apache.doris.common.AnalysisException;
2425
import org.apache.doris.common.DdlException;
@@ -27,6 +28,7 @@
2728
import org.apache.doris.job.exception.JobException;
2829
import org.apache.doris.job.extensions.mtmv.MTMVTask;
2930
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
31+
import org.apache.doris.nereids.rules.exploration.mv.PartitionCompensator;
3032
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
3133
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
3234
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
@@ -43,6 +45,7 @@
4345
import org.apache.logging.log4j.LogManager;
4446
import org.apache.logging.log4j.Logger;
4547

48+
import java.util.Collection;
4649
import java.util.List;
4750
import java.util.Map;
4851
import java.util.Objects;
@@ -82,13 +85,21 @@ public Set<MTMV> getAvailableMTMVs(List<BaseTableInfo> tableInfos, ConnectContex
8285
boolean forceConsistent, BiPredicate<ConnectContext, MTMV> predicate) {
8386
Set<MTMV> res = Sets.newLinkedHashSet();
8487
Set<BaseTableInfo> mvInfos = getMTMVInfos(tableInfos);
88+
Map<List<String>, Set<String>> queryUsedPartitions = PartitionCompensator.getQueryUsedPartitions(
89+
ctx.getStatementContext());
90+
8591
for (BaseTableInfo tableInfo : mvInfos) {
8692
try {
8793
MTMV mtmv = (MTMV) MTMVUtil.getTable(tableInfo);
8894
if (predicate.test(ctx, mtmv)) {
8995
continue;
9096
}
91-
if (isMVPartitionValid(mtmv, ctx, forceConsistent)) {
97+
if (!mtmv.isUseForRewrite()) {
98+
continue;
99+
}
100+
BaseTableInfo relatedTableInfo = mtmv.getMvPartitionInfo().getRelatedTableInfo();
101+
if (isMVPartitionValid(mtmv, ctx, forceConsistent,
102+
relatedTableInfo == null ? null : queryUsedPartitions.get(relatedTableInfo.toList()))) {
92103
res.add(mtmv);
93104
}
94105
} catch (Exception e) {
@@ -117,10 +128,15 @@ public Set<MTMV> getAllMTMVs(List<BaseTableInfo> tableInfos) {
117128
}
118129

119130
@VisibleForTesting
120-
public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx, boolean forceConsistent) {
131+
public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx, boolean forceConsistent,
132+
Set<String> relatedPartitions) {
121133
long currentTimeMillis = System.currentTimeMillis();
122-
return !CollectionUtils
123-
.isEmpty(MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMillis, forceConsistent));
134+
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil.getMTMVCanRewritePartitions(
135+
mtmv, ctx, currentTimeMillis, forceConsistent, relatedPartitions);
136+
// MTMVRewriteUtil.getMTMVCanRewritePartitions is time-consuming behavior, So record for used later
137+
ctx.getStatementContext().getMvCanRewritePartitionsMap().putIfAbsent(
138+
new BaseTableInfo(mtmv), mtmvCanRewritePartitions);
139+
return !CollectionUtils.isEmpty(mtmvCanRewritePartitions);
124140
}
125141

126142
private Set<BaseTableInfo> getMTMVInfos(List<BaseTableInfo> tableInfos) {

fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,16 @@
2525
import org.apache.doris.qe.ConnectContext;
2626

2727
import com.google.common.collect.Lists;
28+
import com.google.common.collect.Maps;
2829
import com.google.common.collect.Sets;
2930
import org.apache.logging.log4j.LogManager;
3031
import org.apache.logging.log4j.Logger;
3132

3233
import java.util.Collection;
3334
import java.util.List;
35+
import java.util.Map;
36+
import java.util.Map.Entry;
37+
import java.util.Set;
3438

3539
public class MTMVRewriteUtil {
3640
private static final Logger LOG = LogManager.getLogger(MTMVRewriteUtil.class);
@@ -43,7 +47,8 @@ public class MTMVRewriteUtil {
4347
* @return
4448
*/
4549
public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, ConnectContext ctx,
46-
long currentTimeMills, boolean forceConsistent) {
50+
long currentTimeMills, boolean forceConsistent,
51+
Set<String> relatedPartitions) {
4752
List<Partition> res = Lists.newArrayList();
4853
Collection<Partition> allPartitions = mtmv.getPartitions();
4954
MTMVRelation mtmvRelation = mtmv.getRelation();
@@ -55,6 +60,11 @@ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, Conne
5560
if (mtmvStatus.getState() != MTMVState.NORMAL || mtmvStatus.getRefreshState() == MTMVRefreshState.INIT) {
5661
return res;
5762
}
63+
// if relatedPartitions is empty but not null, which means query no partitions
64+
if (relatedPartitions != null && relatedPartitions.size() == 0) {
65+
return res;
66+
}
67+
Set<String> mtmvNeedComparePartitions = null;
5868
MTMVRefreshContext refreshContext = null;
5969
// check gracePeriod
6070
long gracePeriodMills = mtmv.getGracePeriod();
@@ -73,6 +83,14 @@ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, Conne
7383
return res;
7484
}
7585
}
86+
if (mtmvNeedComparePartitions == null) {
87+
mtmvNeedComparePartitions = getMtmvPartitionsByRelatedPartitions(mtmv, refreshContext,
88+
relatedPartitions);
89+
}
90+
// if the partition which query not used, should not compare partition version
91+
if (!mtmvNeedComparePartitions.contains(partition.getName())) {
92+
continue;
93+
}
7694
try {
7795
if (MTMVPartitionUtil.isMTMVPartitionSync(refreshContext, partition.getName(),
7896
mtmvRelation.getBaseTablesOneLevel(),
@@ -86,4 +104,29 @@ public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, Conne
86104
}
87105
return res;
88106
}
107+
108+
private static Set<String> getMtmvPartitionsByRelatedPartitions(MTMV mtmv, MTMVRefreshContext refreshContext,
109+
Set<String> relatedPartitions) {
110+
// if relatedPartitions is null, which means QueryPartitionCollector visitLogicalCatalogRelation can not
111+
// get query used partitions, should get all mtmv partitions
112+
if (relatedPartitions == null) {
113+
return mtmv.getPartitionNames();
114+
}
115+
Set<String> res = Sets.newHashSet();
116+
Map<String, String> relatedToMv = getRelatedToMv(refreshContext.getPartitionMappings());
117+
for (String relatedPartition : relatedPartitions) {
118+
res.add(relatedToMv.get(relatedPartition));
119+
}
120+
return res;
121+
}
122+
123+
private static Map<String, String> getRelatedToMv(Map<String, Set<String>> mvToRelated) {
124+
Map<String, String> res = Maps.newHashMap();
125+
for (Entry<String, Set<String>> entry : mvToRelated.entrySet()) {
126+
for (String relatedPartition : entry.getValue()) {
127+
res.put(relatedPartition, entry.getKey());
128+
}
129+
}
130+
return res;
131+
}
89132
}

fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.doris.nereids.jobs.JobContext;
2525
import org.apache.doris.nereids.jobs.executor.Analyzer;
2626
import org.apache.doris.nereids.jobs.executor.TableCollectAndHookInitializer;
27+
import org.apache.doris.nereids.jobs.executor.TablePartitionCollector;
2728
import org.apache.doris.nereids.jobs.rewrite.RewriteBottomUpJob;
2829
import org.apache.doris.nereids.jobs.rewrite.RewriteTopDownJob;
2930
import org.apache.doris.nereids.jobs.rewrite.RootPlanTreeRewriteJob.RootRewriteJobContext;
@@ -228,6 +229,10 @@ public TableCollectAndHookInitializer newTableCollector() {
228229
return new TableCollectAndHookInitializer(this);
229230
}
230231

232+
public TablePartitionCollector newTablePartitionCollector() {
233+
return new TablePartitionCollector(this);
234+
}
235+
231236
public Analyzer newAnalyzer() {
232237
return new Analyzer(this);
233238
}

fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,22 @@ protected void collectAndLockTable(boolean showPlanProcess) {
344344
}
345345
}
346346

347-
private void analyze(boolean showPlanProcess) {
347+
protected void collectTableUsedPartitions(boolean showPlanProcess) {
348+
if (LOG.isDebugEnabled()) {
349+
LOG.debug("Start to collect table used partition");
350+
}
351+
keepOrShowPlanProcess(showPlanProcess, () -> cascadesContext.newTablePartitionCollector().execute());
352+
NereidsTracer.logImportantTime("EndCollectTablePartitions");
353+
if (LOG.isDebugEnabled()) {
354+
LOG.debug("Start to collect table used partition");
355+
}
356+
if (statementContext.getConnectContext().getExecutor() != null) {
357+
statementContext.getConnectContext().getExecutor().getSummaryProfile()
358+
.setNereidsCollectTablePartitionFinishTime();
359+
}
360+
}
361+
362+
protected void analyze(boolean showPlanProcess) {
348363
if (LOG.isDebugEnabled()) {
349364
LOG.debug("Start analyze plan");
350365
}
@@ -376,6 +391,10 @@ private void rewrite(boolean showPlanProcess) {
376391
if (statementContext.getConnectContext().getExecutor() != null) {
377392
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsRewriteTime();
378393
}
394+
// collect partitions table used, this is for query rewrite by materialized view
395+
// this is needed before init hook
396+
collectTableUsedPartitions(showPlanProcess);
397+
cascadesContext.getStatementContext().getPlannerHooks().forEach(hook -> hook.afterRewrite(this));
379398
}
380399

381400
// DependsRules: EnsureProjectOnTopJoin.class

fe/fe-core/src/main/java/org/apache/doris/nereids/PlannerHook.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,16 @@ default void beforeAnalyze(NereidsPlanner planner) {
3535
*/
3636
default void afterAnalyze(NereidsPlanner planner) {
3737
}
38+
39+
/**
40+
* the hook before rewrite
41+
*/
42+
default void beforeRewrite(NereidsPlanner planner) {
43+
}
44+
45+
/**
46+
* the hook after rewrite
47+
*/
48+
default void afterRewrite(NereidsPlanner planner) {
49+
}
3850
}

fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.doris.analysis.StatementBase;
2121
import org.apache.doris.catalog.Column;
22+
import org.apache.doris.catalog.Partition;
2223
import org.apache.doris.catalog.TableIf;
2324
import org.apache.doris.catalog.View;
2425
import org.apache.doris.catalog.constraint.TableIdentifier;
@@ -29,6 +30,7 @@
2930
import org.apache.doris.datasource.mvcc.MvccSnapshot;
3031
import org.apache.doris.datasource.mvcc.MvccTable;
3132
import org.apache.doris.datasource.mvcc.MvccTableInfo;
33+
import org.apache.doris.mtmv.BaseTableInfo;
3234
import org.apache.doris.nereids.exceptions.AnalysisException;
3335
import org.apache.doris.nereids.hint.Hint;
3436
import org.apache.doris.nereids.memo.Group;
@@ -60,6 +62,7 @@
6062
import com.google.common.base.Supplier;
6163
import com.google.common.base.Suppliers;
6264
import com.google.common.base.Throwables;
65+
import com.google.common.collect.HashMultimap;
6366
import com.google.common.collect.ImmutableList;
6467
import com.google.common.collect.Maps;
6568
import com.google.common.collect.Multimap;
@@ -106,6 +109,7 @@ public enum TableFrom {
106109
private ConnectContext connectContext;
107110

108111
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
112+
private final Stopwatch materializedViewStopwatch = Stopwatch.createUnstarted();
109113

110114
@GuardedBy("this")
111115
private final Map<String, Supplier<Object>> contextCacheMap = Maps.newLinkedHashMap();
@@ -175,7 +179,14 @@ public enum TableFrom {
175179

176180
// tables in this query directly
177181
private final Map<List<String>, TableIf> tables = Maps.newHashMap();
178-
// tables maybe used by mtmv rewritten in this query
182+
// tables maybe used by mtmv rewritten in this query,
183+
// this contains mvs which use table in tables and the tables in mvs
184+
// such as
185+
// mv1 use t1, t2.
186+
// mv2 use mv1, t3, t4.
187+
// mv3 use t3, t4, t5
188+
// if query is: select * from t2 join t5
189+
// mtmvRelatedTables is mv1, mv2, mv3, t1, t2, t3, t4, t5
179190
private final Map<List<String>, TableIf> mtmvRelatedTables = Maps.newHashMap();
180191
// insert into target tables
181192
private final Map<List<String>, TableIf> insertTargetTables = Maps.newHashMap();
@@ -211,6 +222,16 @@ public enum TableFrom {
211222

212223
private boolean privChecked;
213224

225+
// if greater than 0 means the duration has used
226+
private long materializedViewRewriteDuration = 0L;
227+
228+
// Record used table and it's used partitions
229+
private final Multimap<List<String>, Pair<RelationId, Set<String>>> tableUsedPartitionNameMap =
230+
HashMultimap.create();
231+
232+
// Record mtmv and valid partitions map because this is time-consuming behavior
233+
private final Map<BaseTableInfo, Collection<Partition>> mvCanRewritePartitionsMap = new HashMap<>();
234+
214235
public StatementContext() {
215236
this(ConnectContext.get(), null, 0);
216237
}
@@ -339,6 +360,26 @@ public Stopwatch getStopwatch() {
339360
return stopwatch;
340361
}
341362

363+
public Stopwatch getMaterializedViewStopwatch() {
364+
return materializedViewStopwatch;
365+
}
366+
367+
public long getMaterializedViewRewriteDuration() {
368+
return materializedViewRewriteDuration;
369+
}
370+
371+
public void addMaterializedViewRewriteDuration(long millisecond) {
372+
materializedViewRewriteDuration += millisecond;
373+
}
374+
375+
public Multimap<List<String>, Pair<RelationId, Set<String>>> getTableUsedPartitionNameMap() {
376+
return tableUsedPartitionNameMap;
377+
}
378+
379+
public Map<BaseTableInfo, Collection<Partition>> getMvCanRewritePartitionsMap() {
380+
return mvCanRewritePartitionsMap;
381+
}
382+
342383
public void setMaxNAryInnerJoin(int maxNAryInnerJoin) {
343384
if (maxNAryInnerJoin > this.maxNAryInnerJoin) {
344385
this.maxNAryInnerJoin = maxNAryInnerJoin;

0 commit comments

Comments
 (0)