Skip to content

Commit e18ea53

Browse files
authored
[improve](information schema) introduce routine load job system table (#48963)
### What problem does this PR solve? Part IV of #48511 doc apache/doris-website#2196 **Introduce routine load job statistic system table:** ``` mysql> show create table information_schema.routine_load_job\G *************************** 1. row *************************** Table: routine_load_job Create Table: CREATE TABLE `routine_load_job` ( `JOB_ID` text NULL, `JOB_NAME` text NULL, `CREATE_TIME` text NULL, `PAUSE_TIME` text NULL, `END_TIME` text NULL, `DB_NAME` text NULL, `TABLE_NAME` text NULL, `STATE` text NULL, `CURRENT_TASK_NUM` text NULL, `JOB_PROPERTIES` text NULL, `DATA_SOURCE_PROPERTIES` text NULL, `CUSTOM_PROPERTIES` text NULL, `STATISTIC` text NULL, `PROGRESS` text NULL, `LAG` text NULL, `REASON_OF_STATE_CHANGED` text NULL, `ERROR_LOG_URLS` text NULL, `USER_NAME` text NULL, `CURRENT_ABORT_TASK_NUM` int NULL, `IS_ABNORMAL_PAUSE` boolean NULL ) ENGINE=SCHEMA; 1 row in set (0.00 sec) ``` **There are some benefits to empowering jobs with SQL query capability for statistical information:** - It can be used in conjunction with the metrics added through #48209 to roughly locate abnormal jobs when Grafana alarms, using the following SQL: ``` SELECT JOB_NAME FROM information_schema.routine_load_job WHERE CURRENT_ABORT_TASK_NUM > 0 OR IS_ABNORMAL_PAUSE = TRUE; ``` - Users can use `select * from information_schema.routine_load_job` instead of `show routine load`. The advantage is that `show routine load` can only search by name, whereas SQL is very flexible in locating jobs.
1 parent 5666884 commit e18ea53

16 files changed

Lines changed: 584 additions & 26 deletions

be/src/exec/schema_scanner.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#include "exec/schema_scanner/schema_partitions_scanner.h"
4141
#include "exec/schema_scanner/schema_processlist_scanner.h"
4242
#include "exec/schema_scanner/schema_profiling_scanner.h"
43+
#include "exec/schema_scanner/schema_routine_load_job_scanner.h"
4344
#include "exec/schema_scanner/schema_routine_scanner.h"
4445
#include "exec/schema_scanner/schema_rowsets_scanner.h"
4546
#include "exec/schema_scanner/schema_schema_privileges_scanner.h"
@@ -228,6 +229,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
228229
return SchemaCatalogMetaCacheStatsScanner::create_unique();
229230
case TSchemaTableType::SCH_BACKEND_KERBEROS_TICKET_CACHE:
230231
return SchemaBackendKerberosTicketCacheScanner::create_unique();
232+
case TSchemaTableType::SCH_ROUTINE_LOAD_JOB:
233+
return SchemaRoutineLoadJobScanner::create_unique();
231234
default:
232235
return SchemaDummyScanner::create_unique();
233236
break;

be/src/exec/schema_scanner/schema_helper.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,4 +142,13 @@ Status SchemaHelper::show_user(const std::string& ip, const int32_t port,
142142
});
143143
}
144144

145+
Status SchemaHelper::fetch_routine_load_job(const std::string& ip, const int32_t port,
                                            const TFetchRoutineLoadJobRequest& request,
                                            TFetchRoutineLoadJobResult* result) {
    // Ask the FE at ip:port for the routine load job rows backing
    // information_schema.routine_load_job; the FE writes the response into *result.
    auto call = [&request, result](FrontendServiceConnection& client) {
        client->fetchRoutineLoadJob(*result, request);
    };
    return ThriftRpcHelper::rpc<FrontendServiceClient>(ip, port, call);
}
153+
145154
} // namespace doris

be/src/exec/schema_scanner/schema_helper.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
namespace doris {
2727
class TDescribeTablesParams;
2828
class TDescribeTablesResult;
29+
class TFetchRoutineLoadJobRequest;
30+
class TFetchRoutineLoadJobResult;
2931
class TGetDbsParams;
3032
class TGetDbsResult;
3133
class TGetTablesParams;
@@ -84,6 +86,10 @@ class SchemaHelper {
8486
TShowProcessListResult* result);
8587
static Status show_user(const std::string& ip, const int32_t port,
8688
const TShowUserRequest& request, TShowUserResult* result);
89+
90+
static Status fetch_routine_load_job(const std::string& ip, const int32_t port,
91+
const TFetchRoutineLoadJobRequest& request,
92+
TFetchRoutineLoadJobResult* result);
8793
};
8894

8995
} // namespace doris
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "exec/schema_scanner/schema_routine_load_job_scanner.h"
19+
20+
#include <gen_cpp/Descriptors_types.h>
21+
#include <gen_cpp/FrontendService_types.h>
22+
23+
#include <string>
24+
25+
#include "exec/schema_scanner/schema_helper.h"
26+
#include "runtime/runtime_state.h"
27+
#include "vec/common/string_ref.h"
28+
#include "vec/core/block.h"
29+
#include "vec/data_types/data_type_factory.hpp"
30+
31+
namespace doris {
32+
class RuntimeState;
33+
namespace vectorized {
34+
class Block;
35+
} // namespace vectorized
36+
37+
// Column layout of information_schema.routine_load_job.
// Order is significant: the col_idx switch in _fill_block_impl() and the FE-side
// schema definition (SchemaTable.java "routine_load_job") must match this list.
std::vector<SchemaScanner::ColumnDesc> SchemaRoutineLoadJobScanner::_s_tbls_columns = {
        // name, type, size, is_null
        {"JOB_ID", TYPE_STRING, sizeof(StringRef), true},
        {"JOB_NAME", TYPE_STRING, sizeof(StringRef), true},
        {"CREATE_TIME", TYPE_STRING, sizeof(StringRef), true},
        {"PAUSE_TIME", TYPE_STRING, sizeof(StringRef), true},
        {"END_TIME", TYPE_STRING, sizeof(StringRef), true},
        {"DB_NAME", TYPE_STRING, sizeof(StringRef), true},
        {"TABLE_NAME", TYPE_STRING, sizeof(StringRef), true},
        {"STATE", TYPE_STRING, sizeof(StringRef), true},
        {"CURRENT_TASK_NUM", TYPE_STRING, sizeof(StringRef), true},
        {"JOB_PROPERTIES", TYPE_STRING, sizeof(StringRef), true},
        {"DATA_SOURCE_PROPERTIES", TYPE_STRING, sizeof(StringRef), true},
        {"CUSTOM_PROPERTIES", TYPE_STRING, sizeof(StringRef), true},
        {"STATISTIC", TYPE_STRING, sizeof(StringRef), true},
        {"PROGRESS", TYPE_STRING, sizeof(StringRef), true},
        {"LAG", TYPE_STRING, sizeof(StringRef), true},
        {"REASON_OF_STATE_CHANGED", TYPE_STRING, sizeof(StringRef), true},
        {"ERROR_LOG_URLS", TYPE_STRING, sizeof(StringRef), true},
        {"USER_NAME", TYPE_STRING, sizeof(StringRef), true},
        // The only two non-text columns; handled by the TYPE_INT / TYPE_BOOLEAN
        // branches of _fill_block_impl().
        {"CURRENT_ABORT_TASK_NUM", TYPE_INT, sizeof(int32_t), true},
        {"IS_ABNORMAL_PAUSE", TYPE_BOOLEAN, sizeof(int8_t), true},
};
60+
61+
// Registers the column layout and the schema table type with the base scanner.
SchemaRoutineLoadJobScanner::SchemaRoutineLoadJobScanner()
        : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_ROUTINE_LOAD_JOB) {}
63+
64+
// Nothing to clean up beyond the members' own destructors; prefer the
// compiler-generated destructor over a user-provided empty body.
SchemaRoutineLoadJobScanner::~SchemaRoutineLoadJobScanner() = default;
65+
66+
// Fetches every routine load job from the FE in a single RPC and caches the
// response in _result; rows are later emitted by get_next_block_internal().
Status SchemaRoutineLoadJobScanner::start(RuntimeState* state) {
    if (!_is_init) {
        return Status::InternalError("used before initialized.");
    }
    TFetchRoutineLoadJobRequest request;
    const std::string& fe_ip = *(_param->common_param->ip);
    const int32_t fe_port = _param->common_param->port;
    return SchemaHelper::fetch_routine_load_job(fe_ip, fe_port, request, &_result);
}
75+
76+
// Emits all cached jobs as one block. This scanner is single-batch: *eos is
// always set to true on the first call.
Status SchemaRoutineLoadJobScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
    if (!_is_init) {
        return Status::InternalError("call this before initial.");
    }
    if (block == nullptr || eos == nullptr) {
        return Status::InternalError("invalid parameter.");
    }

    *eos = true;
    // Nothing fetched by start() -> empty result set.
    return _result.routineLoadJobs.empty() ? Status::OK() : _fill_block_impl(block);
}
91+
92+
// Converts _result.routineLoadJobs into the columns of `block`, one column at a
// time. Text columns render unset thrift fields as ""; CURRENT_ABORT_TASK_NUM
// defaults to 0 and IS_ABNORMAL_PAUSE to false when unset.
Status SchemaRoutineLoadJobScanner::_fill_block_impl(vectorized::Block* block) {
    SCOPED_TIMER(_fill_block_timer);

    const auto& jobs_info = _result.routineLoadJobs;
    size_t row_num = jobs_info.size();
    if (row_num == 0) {
        return Status::OK();
    }

    // Yields the thrift field value when its __isset flag is true, otherwise an
    // empty string. Factored out to avoid repeating the ternary for each of the
    // 18 optional text columns below.
    auto value_or_empty = [](bool is_set, const std::string& value) -> std::string {
        return is_set ? value : "";
    };

    for (size_t col_idx = 0; col_idx < _s_tbls_columns.size(); ++col_idx) {
        const auto& col_desc = _s_tbls_columns[col_idx];

        // Per-column scratch buffers. `datas` holds the per-row cell pointers
        // consumed by fill_dest_column_for_range; `column_values` owns the
        // string storage that the StringRefs in `str_refs` point into, so it
        // must stay alive until the fill call below.
        std::vector<StringRef> str_refs(row_num);
        std::vector<int32_t> int_vals(row_num);
        std::vector<int8_t> bool_vals(row_num);
        std::vector<void*> datas(row_num);
        std::vector<std::string> column_values(row_num);

        for (size_t row_idx = 0; row_idx < row_num; ++row_idx) {
            const auto& job_info = jobs_info[row_idx];
            std::string& column_value = column_values[row_idx];

            if (col_desc.type == TYPE_STRING) {
                // The case numbers must match the order of _s_tbls_columns.
                switch (col_idx) {
                case 0: // JOB_ID
                    column_value = value_or_empty(job_info.__isset.job_id, job_info.job_id);
                    break;
                case 1: // JOB_NAME
                    column_value = value_or_empty(job_info.__isset.job_name, job_info.job_name);
                    break;
                case 2: // CREATE_TIME
                    column_value =
                            value_or_empty(job_info.__isset.create_time, job_info.create_time);
                    break;
                case 3: // PAUSE_TIME
                    column_value =
                            value_or_empty(job_info.__isset.pause_time, job_info.pause_time);
                    break;
                case 4: // END_TIME
                    column_value = value_or_empty(job_info.__isset.end_time, job_info.end_time);
                    break;
                case 5: // DB_NAME
                    column_value = value_or_empty(job_info.__isset.db_name, job_info.db_name);
                    break;
                case 6: // TABLE_NAME
                    column_value =
                            value_or_empty(job_info.__isset.table_name, job_info.table_name);
                    break;
                case 7: // STATE
                    column_value = value_or_empty(job_info.__isset.state, job_info.state);
                    break;
                case 8: // CURRENT_TASK_NUM
                    column_value = value_or_empty(job_info.__isset.current_task_num,
                                                  job_info.current_task_num);
                    break;
                case 9: // JOB_PROPERTIES
                    column_value = value_or_empty(job_info.__isset.job_properties,
                                                  job_info.job_properties);
                    break;
                case 10: // DATA_SOURCE_PROPERTIES
                    column_value = value_or_empty(job_info.__isset.data_source_properties,
                                                  job_info.data_source_properties);
                    break;
                case 11: // CUSTOM_PROPERTIES
                    column_value = value_or_empty(job_info.__isset.custom_properties,
                                                  job_info.custom_properties);
                    break;
                case 12: // STATISTIC
                    column_value = value_or_empty(job_info.__isset.statistic, job_info.statistic);
                    break;
                case 13: // PROGRESS
                    column_value = value_or_empty(job_info.__isset.progress, job_info.progress);
                    break;
                case 14: // LAG
                    column_value = value_or_empty(job_info.__isset.lag, job_info.lag);
                    break;
                case 15: // REASON_OF_STATE_CHANGED
                    column_value = value_or_empty(job_info.__isset.reason_of_state_changed,
                                                  job_info.reason_of_state_changed);
                    break;
                case 16: // ERROR_LOG_URLS
                    column_value = value_or_empty(job_info.__isset.error_log_urls,
                                                  job_info.error_log_urls);
                    break;
                case 17: // USER_NAME
                    column_value = value_or_empty(job_info.__isset.user_name, job_info.user_name);
                    break;
                }

                str_refs[row_idx] =
                        StringRef(column_values[row_idx].data(), column_values[row_idx].size());
                datas[row_idx] = &str_refs[row_idx];
            } else if (col_desc.type == TYPE_INT) {
                // CURRENT_ABORT_TASK_NUM: 0 when the FE did not set the field.
                int_vals[row_idx] = job_info.__isset.current_abort_task_num
                                            ? job_info.current_abort_task_num
                                            : 0;
                datas[row_idx] = &int_vals[row_idx];
            } else if (col_desc.type == TYPE_BOOLEAN) {
                // IS_ABNORMAL_PAUSE: false when the FE did not set the field.
                bool_vals[row_idx] =
                        job_info.__isset.is_abnormal_pause ? job_info.is_abnormal_pause : false;
                datas[row_idx] = &bool_vals[row_idx];
            }
        }

        RETURN_IF_ERROR(fill_dest_column_for_range(block, col_idx, datas));
    }

    return Status::OK();
}
198+
199+
} // namespace doris
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <gen_cpp/FrontendService_types.h>
21+
22+
#include <vector>
23+
24+
#include "common/status.h"
25+
#include "exec/schema_scanner.h"
26+
27+
namespace doris {
28+
class RuntimeState;
29+
namespace vectorized {
30+
class Block;
31+
} // namespace vectorized
32+
33+
// Scanner backing the information_schema.routine_load_job system table: fetches
// routine load job statistics from the FE via RPC and emits them as one block.
class SchemaRoutineLoadJobScanner : public SchemaScanner {
    ENABLE_FACTORY_CREATOR(SchemaRoutineLoadJobScanner);

public:
    SchemaRoutineLoadJobScanner();
    ~SchemaRoutineLoadJobScanner() override;

    // Fetches all routine load jobs from the FE and caches them in _result.
    Status start(RuntimeState* state) override;
    // Emits every cached job as rows of `block`; single-batch, so *eos is set
    // to true on the first call.
    Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

private:
    // Converts _result.routineLoadJobs into the columns of `block`.
    Status _fill_block_impl(vectorized::Block* block);

    // RPC response cached by start(); consumed by get_next_block_internal().
    TFetchRoutineLoadJobResult _result;
    // Column layout (name/type/size/nullability) of the system table; must stay
    // in sync with the FE-side schema definition.
    static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
};
49+
50+
} // namespace doris

fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ public enum SchemaTableType {
9090
SCH_CATALOG_META_CACHE_STATISTICS("CATALOG_META_CACHE_STATISTICS", "CATALOG_META_CACHE_STATISTICS",
9191
TSchemaTableType.SCH_CATALOG_META_CACHE_STATISTICS),
9292
SCH_BACKEND_KERBEROS_TICKET_CACHE("BACKEND_KERBEROS_TICKET_CACHE", "BACKEND_KERBEROS_TICKET_CACHE",
93-
TSchemaTableType.SCH_BACKEND_KERBEROS_TICKET_CACHE);
93+
TSchemaTableType.SCH_BACKEND_KERBEROS_TICKET_CACHE),
94+
SCH_ROUTINE_LOAD_JOB("ROUTINE_LOAD_JOB", "ROUTINE_LOAD_JOB",
95+
TSchemaTableType.SCH_ROUTINE_LOAD_JOB);
9496

9597
private static final String dbName = "INFORMATION_SCHEMA";
9698
private static SelectList fullSelectLists;

fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,30 @@ public class SchemaTable extends Table {
597597
.column("REFRESH_INTERVAL_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
598598
.build())
599599
)
600+
.put("routine_load_job",
601+
new SchemaTable(SystemIdGenerator.getNextId(), "routine_load_job", TableType.SCHEMA,
602+
builder().column("JOB_ID", ScalarType.createStringType())
603+
.column("JOB_NAME", ScalarType.createStringType())
604+
.column("CREATE_TIME", ScalarType.createStringType())
605+
.column("PAUSE_TIME", ScalarType.createStringType())
606+
.column("END_TIME", ScalarType.createStringType())
607+
.column("DB_NAME", ScalarType.createStringType())
608+
.column("TABLE_NAME", ScalarType.createStringType())
609+
.column("STATE", ScalarType.createStringType())
610+
.column("CURRENT_TASK_NUM", ScalarType.createStringType())
611+
.column("JOB_PROPERTIES", ScalarType.createStringType())
612+
.column("DATA_SOURCE_PROPERTIES", ScalarType.createStringType())
613+
.column("CUSTOM_PROPERTIES", ScalarType.createStringType())
614+
.column("STATISTIC", ScalarType.createStringType())
615+
.column("PROGRESS", ScalarType.createStringType())
616+
.column("LAG", ScalarType.createStringType())
617+
.column("REASON_OF_STATE_CHANGED", ScalarType.createStringType())
618+
.column("ERROR_LOG_URLS", ScalarType.createStringType())
619+
.column("USER_NAME", ScalarType.createStringType())
620+
.column("CURRENT_ABORT_TASK_NUM", ScalarType.createType(PrimitiveType.INT))
621+
.column("IS_ABNORMAL_PAUSE", ScalarType.createType(PrimitiveType.BOOLEAN))
622+
.build())
623+
)
600624
.build();
601625

602626
private boolean fetchAllFe = false;

fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ protected boolean needAutoResume() {
474474
}
475475

476476
@Override
477-
protected String getStatistic() {
477+
public String getStatistic() {
478478
Map<String, Object> summary = this.jobStatistic.summary();
479479
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
480480
return gson.toJson(summary);
@@ -635,7 +635,7 @@ private void setCustomKafkaProperties(Map<String, String> kafkaProperties) {
635635
}
636636

637637
@Override
638-
protected String dataSourcePropertiesJsonToString() {
638+
public String dataSourcePropertiesJsonToString() {
639639
Map<String, String> dataSourceProperties = Maps.newHashMap();
640640
dataSourceProperties.put("brokerList", brokerList);
641641
dataSourceProperties.put("topic", topic);
@@ -647,21 +647,21 @@ protected String dataSourcePropertiesJsonToString() {
647647
}
648648

649649
@Override
650-
protected String customPropertiesJsonToString() {
650+
public String customPropertiesJsonToString() {
651651
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
652652
return gson.toJson(customProperties);
653653
}
654654

655655
@Override
656-
protected Map<String, String> getDataSourceProperties() {
656+
public Map<String, String> getDataSourceProperties() {
657657
Map<String, String> dataSourceProperties = Maps.newHashMap();
658658
dataSourceProperties.put("kafka_broker_list", brokerList);
659659
dataSourceProperties.put("kafka_topic", topic);
660660
return dataSourceProperties;
661661
}
662662

663663
@Override
664-
protected Map<String, String> getCustomProperties() {
664+
public Map<String, String> getCustomProperties() {
665665
Map<String, String> ret = new HashMap<>();
666666
customProperties.forEach((k, v) -> ret.put("property." + k, v));
667667
return ret;
@@ -910,7 +910,7 @@ public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> partitionIdT
910910
}
911911

912912
@Override
913-
protected String getLag() {
913+
public String getLag() {
914914
Map<Integer, Long> partitionIdToOffsetLag = ((KafkaProgress) progress).getLag(cachedPartitionWithLatestOffsets);
915915
Gson gson = new Gson();
916916
return gson.toJson(partitionIdToOffsetLag);

0 commit comments

Comments
 (0)