Skip to content

Commit af68d8b

Browse files
authored
[feat](profile) support getting query progress (#51400)
### What problem does this PR solve? Followup #50791 Add a new FE HTTP API: `/rest/v2/manager/query/statistics/trace_id`. This API returns the query runtime statistics corresponding to a given trace id. The query statistics include info such as real-time scan rows/bytes. Internally, Doris gets the query id by trace id from all Frontends, and then fetches the query statistics from BE. Usage pattern: 1. The user sets a custom trace id with: `set session_context="trace_id:my_trace_id"` 2. The user executes a query in the same session 3. Start an HTTP client to get the query statistics in real time while the query is running. ![progress](https://github.com/user-attachments/assets/0a697c7d-d87a-4e9c-8965-c5a2d7d7836e) Also fixes a bug in `CoordinatorContext.java` (introduced by #41730) so that the real host is returned. This PR also changes the column names of the `information_schema.processlist` table to be the same as the column names in `show processlist`.
1 parent 279fa92 commit af68d8b

23 files changed

Lines changed: 438 additions & 119 deletions

be/src/exec/schema_scanner/schema_processlist_scanner.cpp

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,21 @@ namespace doris {
3333
#include "common/compile_check_begin.h"
3434

3535
std::vector<SchemaScanner::ColumnDesc> SchemaProcessListScanner::_s_processlist_columns = {
36-
{"CURRENT_CONNECTED", TYPE_VARCHAR, sizeof(StringRef), false},
37-
{"ID", TYPE_LARGEINT, sizeof(int128_t), false},
38-
{"USER", TYPE_VARCHAR, sizeof(StringRef), false},
39-
{"HOST", TYPE_VARCHAR, sizeof(StringRef), false},
40-
{"LOGIN_TIME", TYPE_DATETIMEV2, sizeof(DateTimeV2ValueType), false},
41-
{"CATALOG", TYPE_VARCHAR, sizeof(StringRef), false},
42-
{"DB", TYPE_VARCHAR, sizeof(StringRef), false},
43-
{"COMMAND", TYPE_VARCHAR, sizeof(StringRef), false},
44-
{"TIME", TYPE_INT, sizeof(int32_t), false},
45-
{"STATE", TYPE_VARCHAR, sizeof(StringRef), false},
46-
{"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
47-
{"INFO", TYPE_VARCHAR, sizeof(StringRef), false},
48-
{"FE", TYPE_VARCHAR, sizeof(StringRef), false},
49-
{"CLOUD_CLUSTER", TYPE_VARCHAR, sizeof(StringRef), false}};
36+
{"CurrentConnected", TYPE_VARCHAR, sizeof(StringRef), false}, // 0
37+
{"Id", TYPE_LARGEINT, sizeof(int128_t), false}, // 1
38+
{"User", TYPE_VARCHAR, sizeof(StringRef), false}, // 2
39+
{"Host", TYPE_VARCHAR, sizeof(StringRef), false}, // 3
40+
{"LoginTime", TYPE_DATETIMEV2, sizeof(DateTimeV2ValueType), false}, // 4
41+
{"Catalog", TYPE_VARCHAR, sizeof(StringRef), false}, // 5
42+
{"Db", TYPE_VARCHAR, sizeof(StringRef), false}, // 6
43+
{"Command", TYPE_VARCHAR, sizeof(StringRef), false}, // 7
44+
{"Time", TYPE_INT, sizeof(int32_t), false}, // 8
45+
{"State", TYPE_VARCHAR, sizeof(StringRef), false}, // 9
46+
{"QueryId", TYPE_VARCHAR, sizeof(StringRef), false}, // 10
47+
{"TraceId", TYPE_VARCHAR, sizeof(StringRef), false}, // 11
48+
{"Info", TYPE_VARCHAR, sizeof(StringRef), false}, // 12
49+
{"FE", TYPE_VARCHAR, sizeof(StringRef), false}, // 13
50+
{"CloudCluster", TYPE_VARCHAR, sizeof(StringRef), false}}; // 14
5051

5152
SchemaProcessListScanner::SchemaProcessListScanner()
5253
: SchemaScanner(_s_processlist_columns, TSchemaTableType::SCH_PROCESSLIST) {}
@@ -62,6 +63,16 @@ Status SchemaProcessListScanner::start(RuntimeState* state) {
6263
TShowProcessListResult tmp_ret;
6364
RETURN_IF_ERROR(
6465
SchemaHelper::show_process_list(fe_addr.hostname, fe_addr.port, request, &tmp_ret));
66+
67+
// Check and adjust the number of columns in each row to ensure 15 columns
68+
// This is compatible with newly added column "trace id". #51400
69+
for (auto& row : tmp_ret.process_list) {
70+
if (row.size() == 14) {
71+
// Insert an empty string at index 11 for the TraceId column
72+
row.insert(row.begin() + 11, "");
73+
}
74+
}
75+
6576
_process_list_result.process_list.insert(_process_list_result.process_list.end(),
6677
tmp_ret.process_list.begin(),
6778
tmp_ret.process_list.end());

be/src/runtime/fragment_mgr.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1382,4 +1382,13 @@ Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
13821382
return Status::OK();
13831383
}
13841384

1385+
Status FragmentMgr::get_query_statistics(const TUniqueId& query_id, TQueryStatistics* query_stats) {
1386+
if (query_stats == nullptr) {
1387+
return Status::InvalidArgument("query_stats is nullptr");
1388+
}
1389+
1390+
return ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_query_statistics(
1391+
print_id(query_id), query_stats);
1392+
}
1393+
13851394
} // namespace doris

be/src/runtime/fragment_mgr.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ class FragmentMgr : public RestMonitorIface {
184184

185185
Status get_realtime_exec_status(const TUniqueId& query_id,
186186
TReportExecStatusParams* exec_status);
187+
// Get the query statistics for a given query id
188+
Status get_query_statistics(const TUniqueId& query_id, TQueryStatistics* query_stats);
187189

188190
std::shared_ptr<QueryContext> get_query_ctx(const TUniqueId& query_id);
189191

be/src/runtime/runtime_query_statistics_mgr.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,19 @@ void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo
511511
}
512512
}
513513

514+
Status RuntimeQueryStatisticsMgr::get_query_statistics(const std::string& query_id,
515+
TQueryStatistics* query_stats) {
516+
std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
517+
518+
auto resource_ctx = _resource_contexts_map.find(query_id);
519+
if (resource_ctx == _resource_contexts_map.end()) {
520+
return Status::InternalError("failed to find query with id {}", query_id);
521+
}
522+
523+
resource_ctx->second->to_thrift_query_statistics(query_stats);
524+
return Status::OK();
525+
}
526+
514527
void RuntimeQueryStatisticsMgr::get_tasks_resource_context(
515528
std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs) {
516529
std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);

be/src/runtime/runtime_query_statistics_mgr.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class RuntimeQueryStatisticsMgr {
5555

5656
// used for backend_active_tasks
5757
void get_active_be_tasks_block(vectorized::Block* block);
58+
Status get_query_statistics(const std::string& query_id, TQueryStatistics* query_stats);
5859

5960
// used for MemoryReclamation
6061
void get_tasks_resource_context(std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs);
@@ -95,4 +96,4 @@ class RuntimeQueryStatisticsMgr {
9596
std::unique_ptr<ThreadPool> _thread_pool;
9697
};
9798

98-
} // namespace doris
99+
} // namespace doris

be/src/service/backend_service.cpp

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,19 +1321,28 @@ void BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse
13211321

13221322
std::unique_ptr<TReportExecStatusParams> report_exec_status_params =
13231323
std::make_unique<TReportExecStatusParams>();
1324-
Status st = ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status(
1325-
request.id, report_exec_status_params.get());
1324+
std::unique_ptr<TQueryStatistics> query_stats = std::make_unique<TQueryStatistics>();
13261325

1327-
if (!st.ok()) {
1328-
response.__set_status(st.to_thrift());
1329-
return;
1326+
std::string req_type = request.__isset.req_type ? request.req_type : "profile";
1327+
Status st;
1328+
if (req_type == "stats") {
1329+
st = ExecEnv::GetInstance()->fragment_mgr()->get_query_statistics(request.id,
1330+
query_stats.get());
1331+
if (st.ok()) {
1332+
response.__set_query_stats(*query_stats);
1333+
}
1334+
} else {
1335+
// default is "profile"
1336+
st = ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status(
1337+
request.id, report_exec_status_params.get());
1338+
if (st.ok()) {
1339+
response.__set_report_exec_status_params(*report_exec_status_params);
1340+
}
13301341
}
13311342

13321343
report_exec_status_params->__set_query_id(TUniqueId());
13331344
report_exec_status_params->__set_done(false);
1334-
1335-
response.__set_status(Status::OK().to_thrift());
1336-
response.__set_report_exec_status_params(*report_exec_status_params);
1345+
response.__set_status(st.to_thrift());
13371346
}
13381347

13391348
void BaseBackendService::get_dictionary_status(TDictionaryStatusList& result,

fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -523,22 +523,24 @@ public class SchemaTable extends Table {
523523
.column("IS_MUTABLE", ScalarType.createType(PrimitiveType.BOOLEAN))
524524
.build()))
525525
.put("processlist",
526+
// ATTN, the column name should be compatible with MySQL
527+
// See: https://dev.mysql.com/doc/refman/8.4/en/show-processlist.html
526528
new SchemaTable(SystemIdGenerator.getNextId(), "processlist", TableType.SCHEMA,
527-
builder().column("CURRENT_CONNECTED", ScalarType.createVarchar(16))
528-
.column("ID", ScalarType.createType(PrimitiveType.LARGEINT))
529-
.column("USER", ScalarType.createVarchar(32))
530-
.column("HOST", ScalarType.createVarchar(261))
531-
.column("LOGIN_TIME", ScalarType.createType(PrimitiveType.DATETIMEV2))
532-
.column("CATALOG", ScalarType.createVarchar(64))
533-
.column("DB", ScalarType.createVarchar(64))
534-
.column("COMMAND", ScalarType.createVarchar(16))
535-
.column("TIME", ScalarType.createType(PrimitiveType.INT))
536-
.column("STATE", ScalarType.createVarchar(64))
537-
.column("QUERY_ID", ScalarType.createVarchar(256))
538-
.column("INFO", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
539-
.column("FE",
540-
ScalarType.createVarchar(64))
541-
.column("CLOUD_CLUSTER", ScalarType.createVarchar(64)).build(), true))
529+
builder().column("CurrentConnected", ScalarType.createVarchar(16))
530+
.column("Id", ScalarType.createType(PrimitiveType.LARGEINT))
531+
.column("User", ScalarType.createVarchar(32))
532+
.column("Host", ScalarType.createVarchar(261))
533+
.column("LoginTime", ScalarType.createType(PrimitiveType.DATETIMEV2))
534+
.column("Catalog", ScalarType.createVarchar(64))
535+
.column("Db", ScalarType.createVarchar(64))
536+
.column("Command", ScalarType.createVarchar(16))
537+
.column("Time", ScalarType.createType(PrimitiveType.INT))
538+
.column("State", ScalarType.createVarchar(64))
539+
.column("QueryId", ScalarType.createVarchar(256))
540+
.column("TraceId", ScalarType.createVarchar(256))
541+
.column("Info", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
542+
.column("FE", ScalarType.createVarchar(64))
543+
.column("CloudCluster", ScalarType.createVarchar(64)).build(), true))
542544
.put("workload_policy",
543545
new SchemaTable(SystemIdGenerator.getNextId(), "workload_policy", TableType.SCHEMA,
544546
builder().column("ID", ScalarType.createType(PrimitiveType.BIGINT))

fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@
3232
import org.apache.doris.thrift.TGetRealtimeExecStatusRequest;
3333
import org.apache.doris.thrift.TGetRealtimeExecStatusResponse;
3434
import org.apache.doris.thrift.TNetworkAddress;
35+
import org.apache.doris.thrift.TQueryStatistics;
3536
import org.apache.doris.thrift.TStatusCode;
3637
import org.apache.doris.thrift.TUniqueId;
3738

39+
import com.google.common.base.Preconditions;
3840
import com.google.common.base.Strings;
3941
import com.google.common.collect.Lists;
4042
import com.google.common.collect.Maps;
@@ -253,7 +255,7 @@ public List<List<String>> getQueryInfoByColumnNameList(List<String> columnNameLi
253255
}
254256

255257
private static TGetRealtimeExecStatusResponse getRealtimeQueryProfile(
256-
TUniqueId queryID, TNetworkAddress targetBackend) {
258+
TUniqueId queryID, String reqType, TNetworkAddress targetBackend) {
257259
TGetRealtimeExecStatusResponse resp = null;
258260
BackendService.Client client = null;
259261

@@ -268,6 +270,7 @@ private static TGetRealtimeExecStatusResponse getRealtimeQueryProfile(
268270
try {
269271
TGetRealtimeExecStatusRequest req = new TGetRealtimeExecStatusRequest();
270272
req.setId(queryID);
273+
req.setReqType(reqType);
271274
resp = client.getRealtimeExecStatus(req);
272275
} catch (TException e) {
273276
LOG.warn("Got exception when getRealtimeExecStatus, query {} backend {}",
@@ -293,16 +296,16 @@ private static TGetRealtimeExecStatusResponse getRealtimeQueryProfile(
293296
return null;
294297
}
295298

296-
if (!resp.isSetReportExecStatusParams()) {
297-
LOG.warn("Invalid GetRealtimeExecStatusResponse, query {}",
299+
if (!resp.isSetReportExecStatusParams() && !resp.isSetQueryStats()) {
300+
LOG.warn("Invalid GetRealtimeExecStatusResponse, missing both exec status and query stats. query {}",
298301
DebugUtil.printId(queryID));
299302
return null;
300303
}
301304

302305
return resp;
303306
}
304307

305-
private List<Future<TGetRealtimeExecStatusResponse>> createFetchRealTimeProfileTasks(String id) {
308+
private List<Future<TGetRealtimeExecStatusResponse>> createFetchRealTimeProfileTasks(String id, String reqType) {
306309
// For query, id is queryId, for load, id is LoadLoadingTaskId
307310
class QueryIdAndAddress {
308311
public TUniqueId id;
@@ -365,18 +368,66 @@ class QueryIdAndAddress {
365368
}
366369

367370
for (QueryIdAndAddress idAndAddress : involvedBackends) {
368-
Callable<TGetRealtimeExecStatusResponse> task = () -> {
369-
return getRealtimeQueryProfile(idAndAddress.id, idAndAddress.beAddress);
370-
};
371+
Callable<TGetRealtimeExecStatusResponse> task = () -> getRealtimeQueryProfile(idAndAddress.id,
372+
reqType, idAndAddress.beAddress);
371373
Future<TGetRealtimeExecStatusResponse> future = fetchRealTimeProfileExecutor.submit(task);
372374
futures.add(future);
373375
}
374376

375377
return futures;
376378
}
377379

380+
public TQueryStatistics getQueryStatistic(String queryId) throws Exception {
381+
List<Future<TGetRealtimeExecStatusResponse>> futures = createFetchRealTimeProfileTasks(queryId,
382+
"stats");
383+
List<TQueryStatistics> queryStatisticsList = Lists.newArrayList();
384+
for (Future<TGetRealtimeExecStatusResponse> future : futures) {
385+
try {
386+
TGetRealtimeExecStatusResponse resp = future.get(5, TimeUnit.SECONDS);
387+
if (resp != null && resp.getStatus().status_code == TStatusCode.OK && resp.isSetQueryStats()) {
388+
queryStatisticsList.add(resp.getQueryStats());
389+
} else {
390+
LOG.warn("Failed to get real-time query stats, id {}, resp is {}",
391+
queryId, resp == null ? "null" : resp.toString());
392+
throw new Exception("Failed to get realtime query stats: " + resp.toString());
393+
}
394+
} catch (Exception e) {
395+
LOG.warn("Failed to get real-time query stats, id {}, error: {}", queryId, e.getMessage(), e);
396+
throw new Exception("Failed to get realtime query stats: " + e.getMessage());
397+
}
398+
}
399+
Preconditions.checkState(!queryStatisticsList.isEmpty() && queryStatisticsList.size() == futures.size(),
400+
String.format("Failed to get real-time stats, id %s, "
401+
+ "queryStatisticsList size %d != futures size %d",
402+
queryId, queryStatisticsList.size(), futures.size()));
403+
404+
TQueryStatistics summary = new TQueryStatistics();
405+
for (TQueryStatistics queryStats : queryStatisticsList) {
406+
// sum all the statistics
407+
summary.setScanRows(summary.getScanRows() + queryStats.getScanRows());
408+
summary.setScanBytes(summary.getScanBytes() + queryStats.getScanBytes());
409+
summary.setReturnedRows(summary.getReturnedRows() + queryStats.getReturnedRows());
410+
summary.setCpuMs(summary.getCpuMs() + queryStats.getCpuMs());
411+
summary.setMaxPeakMemoryBytes(Math.max(summary.getMaxPeakMemoryBytes(),
412+
queryStats.getMaxPeakMemoryBytes()));
413+
summary.setCurrentUsedMemoryBytes(Math.max(summary.getCurrentUsedMemoryBytes(),
414+
queryStats.getCurrentUsedMemoryBytes()));
415+
summary.setShuffleSendBytes(summary.getShuffleSendBytes() + queryStats.getShuffleSendBytes());
416+
summary.setShuffleSendRows(summary.getShuffleSendRows() + queryStats.getShuffleSendRows());
417+
summary.setScanBytesFromLocalStorage(
418+
summary.getScanBytesFromLocalStorage() + queryStats.getScanBytesFromLocalStorage());
419+
summary.setScanBytesFromRemoteStorage(
420+
summary.getScanBytesFromRemoteStorage() + queryStats.getScanBytesFromRemoteStorage());
421+
summary.setSpillWriteBytesToLocalStorage(
422+
summary.getSpillWriteBytesToLocalStorage() + queryStats.getSpillWriteBytesToLocalStorage());
423+
summary.setSpillReadBytesFromLocalStorage(
424+
summary.getSpillReadBytesFromLocalStorage() + queryStats.getSpillReadBytesFromLocalStorage());
425+
}
426+
return summary;
427+
}
428+
378429
public String getProfile(String id) {
379-
List<Future<TGetRealtimeExecStatusResponse>> futures = createFetchRealTimeProfileTasks(id);
430+
List<Future<TGetRealtimeExecStatusResponse>> futures = createFetchRealTimeProfileTasks(id, "profile");
380431
// beAddr of reportExecStatus of QeProcessorImpl is meaningless, so assign a dummy address
381432
// to avoid compile failing.
382433
TNetworkAddress dummyAddr = new TNetworkAddress();
@@ -1057,3 +1108,4 @@ public void removeProfile(String profileId) {
10571108
}
10581109
}
10591110
}
1111+

fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.doris.httpv2.controller;
1919

2020
import org.apache.doris.catalog.Env;
21+
import org.apache.doris.catalog.SchemaTable;
22+
import org.apache.doris.catalog.Table;
2123
import org.apache.doris.httpv2.entity.ResponseBody;
2224
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
2325
import org.apache.doris.httpv2.rest.RestBaseController;
@@ -56,20 +58,8 @@ public class SessionController extends RestBaseController {
5658
private static final Logger LOG = LogManager.getLogger(SessionController.class);
5759

5860
static {
59-
SESSION_TABLE_HEADER.add("CurrentConnected");
60-
SESSION_TABLE_HEADER.add("Id");
61-
SESSION_TABLE_HEADER.add("User");
62-
SESSION_TABLE_HEADER.add("Host");
63-
SESSION_TABLE_HEADER.add("LoginTime");
64-
SESSION_TABLE_HEADER.add("Catalog");
65-
SESSION_TABLE_HEADER.add("Db");
66-
SESSION_TABLE_HEADER.add("Command");
67-
SESSION_TABLE_HEADER.add("Time");
68-
SESSION_TABLE_HEADER.add("State");
69-
SESSION_TABLE_HEADER.add("QueryId");
70-
SESSION_TABLE_HEADER.add("Info");
71-
SESSION_TABLE_HEADER.add("FE");
72-
SESSION_TABLE_HEADER.add("CloudCluster");
61+
Table tbl = SchemaTable.TABLE_MAP.get("processlist");
62+
tbl.getBaseSchema().stream().forEach(column -> SESSION_TABLE_HEADER.add(column.getName()));
7363
}
7464

7565
@RequestMapping(path = "/session/all", method = RequestMethod.GET)

fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.doris.httpv2.entity.ResponseBody;
2424
import org.apache.doris.persist.gson.GsonUtils;
2525
import org.apache.doris.system.Frontend;
26+
import org.apache.doris.system.SystemInfoService.HostInfo;
2627

2728
import com.google.common.base.Strings;
2829
import com.google.gson.reflect.TypeToken;
@@ -57,6 +58,11 @@ static List<Pair<String, Integer>> getFeList() {
5758
.collect(Collectors.toList());
5859
}
5960

61+
static boolean isCurrentFe(String ip, int port) {
62+
HostInfo hostInfo = Env.getCurrentEnv().getSelfNode();
63+
return hostInfo.isSame(new HostInfo(ip, port));
64+
}
65+
6066
static String concatUrl(Pair<String, Integer> ipPort, String path, Map<String, String> arguments) {
6167
StringBuilder url = new StringBuilder("http://")
6268
.append(ipPort.first).append(":").append(ipPort.second).append(path);

0 commit comments

Comments
 (0)