Skip to content

Commit 73d2802

Browse files
committed
[Enhancement](multi-catalog) Add PredicateFilterTime, DictFilterRewriteTime, LazyReadFilteredRows profile metrics in parquet/orc profiles. (apache#51248)
[Enhancement] (multi-catalog) Add PredicateFilterTime, DictFilterRewriteTime, LazyReadFilteredRows profile metrics in parquet/orc profiles.
1 parent b2e70e9 commit 73d2802

6 files changed

Lines changed: 160 additions & 138 deletions

File tree

be/src/vec/exec/format/orc/vorc_reader.cpp

Lines changed: 70 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,9 @@ void OrcReader::_collect_profile_before_close() {
199199
COUNTER_UPDATE(_orc_profile.set_fill_column_time, _statistics.set_fill_column_time);
200200
COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time);
201201
COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time);
202-
COUNTER_UPDATE(_orc_profile.filter_block_time, _statistics.filter_block_time);
202+
COUNTER_UPDATE(_orc_profile.predicate_filter_time, _statistics.predicate_filter_time);
203+
COUNTER_UPDATE(_orc_profile.dict_filter_rewrite_time, _statistics.dict_filter_rewrite_time);
204+
COUNTER_UPDATE(_orc_profile.lazy_read_filtered_rows, _statistics.lazy_read_filtered_rows);
203205

204206
if (_file_input_stream != nullptr) {
205207
_file_input_stream->collect_profile_before_close();
@@ -233,8 +235,12 @@ void OrcReader::_init_profile() {
233235
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeValueTime", orc_profile, 1);
234236
_orc_profile.decode_null_map_time =
235237
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeNullMapTime", orc_profile, 1);
236-
_orc_profile.filter_block_time =
237-
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "FilterBlockTime", orc_profile, 1);
238+
_orc_profile.predicate_filter_time =
239+
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PredicateFilterTime", orc_profile, 1);
240+
_orc_profile.dict_filter_rewrite_time =
241+
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DictFilterRewriteTime", orc_profile, 1);
242+
_orc_profile.lazy_read_filtered_rows =
243+
ADD_COUNTER_WITH_LEVEL(_profile, "FilteredRowsByLazyRead", TUnit::UNIT, 1);
238244
}
239245
}
240246

@@ -1713,15 +1719,18 @@ Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof
17131719
*read_rows = 0;
17141720
return Status::OK();
17151721
}
1716-
_execute_filter_position_delete_rowids(*_filter);
17171722
{
1718-
SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
1719-
RETURN_IF_CATCH_EXCEPTION(
1720-
Block::filter_block_internal(block, columns_to_filter, *_filter));
1723+
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
1724+
_execute_filter_position_delete_rowids(*_filter);
1725+
{
1726+
SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
1727+
RETURN_IF_CATCH_EXCEPTION(
1728+
Block::filter_block_internal(block, columns_to_filter, *_filter));
1729+
}
1730+
Block::erase_useless_column(block, column_to_keep);
1731+
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
1732+
*read_rows = block->rows();
17211733
}
1722-
Block::erase_useless_column(block, column_to_keep);
1723-
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
1724-
*read_rows = block->rows();
17251734
} else {
17261735
uint64_t rr;
17271736
SCOPED_RAW_TIMER(&_statistics.column_read_time);
@@ -1798,63 +1807,60 @@ Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof
17981807
return Status::OK();
17991808
}
18001809

1801-
_build_delete_row_filter(block, _batch->numElements);
1802-
1803-
std::vector<uint32_t> columns_to_filter;
1804-
int column_to_keep = block->columns();
1805-
columns_to_filter.resize(column_to_keep);
1806-
for (uint32_t i = 0; i < column_to_keep; ++i) {
1807-
columns_to_filter[i] = i;
1808-
}
1809-
if (!_lazy_read_ctx.conjuncts.empty()) {
1810-
VExprContextSPtrs filter_conjuncts;
1811-
filter_conjuncts.insert(filter_conjuncts.end(), _filter_conjuncts.begin(),
1812-
_filter_conjuncts.end());
1813-
for (auto& conjunct : _dict_filter_conjuncts) {
1814-
filter_conjuncts.emplace_back(conjunct);
1815-
}
1816-
for (auto& conjunct : _non_dict_filter_conjuncts) {
1817-
filter_conjuncts.emplace_back(conjunct);
1818-
}
1819-
std::vector<IColumn::Filter*> filters;
1820-
if (_delete_rows_filter_ptr) {
1821-
filters.push_back(_delete_rows_filter_ptr.get());
1822-
}
1823-
IColumn::Filter result_filter(block->rows(), 1);
1824-
bool can_filter_all = false;
1825-
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
1826-
filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
1827-
if (can_filter_all) {
1828-
for (auto& col : columns_to_filter) {
1829-
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
1810+
{
1811+
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
1812+
_build_delete_row_filter(block, _batch->numElements);
1813+
1814+
std::vector<uint32_t> columns_to_filter;
1815+
int column_to_keep = block->columns();
1816+
columns_to_filter.resize(column_to_keep);
1817+
for (uint32_t i = 0; i < column_to_keep; ++i) {
1818+
columns_to_filter[i] = i;
1819+
}
1820+
if (!_lazy_read_ctx.conjuncts.empty()) {
1821+
VExprContextSPtrs filter_conjuncts;
1822+
filter_conjuncts.insert(filter_conjuncts.end(), _filter_conjuncts.begin(),
1823+
_filter_conjuncts.end());
1824+
for (auto& conjunct : _dict_filter_conjuncts) {
1825+
filter_conjuncts.emplace_back(conjunct);
18301826
}
1831-
Block::erase_useless_column(block, column_to_keep);
1832-
return _convert_dict_cols_to_string_cols(block, &batch_vec);
1833-
}
1834-
_execute_filter_position_delete_rowids(result_filter);
1835-
{
1836-
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
1827+
for (auto& conjunct : _non_dict_filter_conjuncts) {
1828+
filter_conjuncts.emplace_back(conjunct);
1829+
}
1830+
std::vector<IColumn::Filter*> filters;
1831+
if (_delete_rows_filter_ptr) {
1832+
filters.push_back(_delete_rows_filter_ptr.get());
1833+
}
1834+
IColumn::Filter result_filter(block->rows(), 1);
1835+
bool can_filter_all = false;
1836+
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
1837+
filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
1838+
if (can_filter_all) {
1839+
for (auto& col : columns_to_filter) {
1840+
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
1841+
}
1842+
Block::erase_useless_column(block, column_to_keep);
1843+
return _convert_dict_cols_to_string_cols(block, &batch_vec);
1844+
}
1845+
_execute_filter_position_delete_rowids(result_filter);
18371846
RETURN_IF_CATCH_EXCEPTION(
18381847
Block::filter_block_internal(block, columns_to_filter, result_filter));
1839-
}
1840-
Block::erase_useless_column(block, column_to_keep);
1841-
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
1842-
} else {
1843-
if (_delete_rows_filter_ptr) {
1844-
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
1845-
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
1846-
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter,
1847-
(*_delete_rows_filter_ptr)));
1848+
Block::erase_useless_column(block, column_to_keep);
18481849
} else {
1849-
std::unique_ptr<IColumn::Filter> filter(new IColumn::Filter(block->rows(), 1));
1850-
_execute_filter_position_delete_rowids(*filter);
1851-
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
1852-
RETURN_IF_CATCH_EXCEPTION(
1853-
Block::filter_block_internal(block, columns_to_filter, (*filter)));
1850+
if (_delete_rows_filter_ptr) {
1851+
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
1852+
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
1853+
block, columns_to_filter, (*_delete_rows_filter_ptr)));
1854+
} else {
1855+
std::unique_ptr<IColumn::Filter> filter(new IColumn::Filter(block->rows(), 1));
1856+
_execute_filter_position_delete_rowids(*filter);
1857+
RETURN_IF_CATCH_EXCEPTION(
1858+
Block::filter_block_internal(block, columns_to_filter, (*filter)));
1859+
}
1860+
Block::erase_useless_column(block, column_to_keep);
18541861
}
1855-
Block::erase_useless_column(block, column_to_keep);
1856-
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
18571862
}
1863+
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
18581864
*read_rows = block->rows();
18591865
}
18601866
return Status::OK();
@@ -1898,6 +1904,7 @@ void OrcReader::_build_delete_row_filter(const Block* block, size_t rows) {
18981904
}
18991905

19001906
Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg) {
1907+
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
19011908
Block* block = (Block*)arg;
19021909
size_t origin_column_num = block->columns();
19031910

@@ -1998,6 +2005,7 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
19982005
sel[new_size] = i;
19992006
new_size += result_filter_data[i] ? 1 : 0;
20002007
}
2008+
_statistics.lazy_read_filtered_rows += static_cast<int64_t>(size - new_size);
20012009
data.numElements = new_size;
20022010
return Status::OK();
20032011
}
@@ -2071,6 +2079,7 @@ bool OrcReader::_can_filter_by_dict(int slot_id) {
20712079
Status OrcReader::on_string_dicts_loaded(
20722080
std::unordered_map<std::string, orc::StringDictionary*>& file_column_name_to_dict_map,
20732081
bool* is_stripe_filtered) {
2082+
SCOPED_RAW_TIMER(&_statistics.dict_filter_rewrite_time);
20742083
*is_stripe_filtered = false;
20752084
for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) {
20762085
std::string& dict_filter_col_name = it->first;

be/src/vec/exec/format/orc/vorc_reader.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,9 @@ class OrcReader : public GenericReader {
128128
int64_t set_fill_column_time = 0;
129129
int64_t decode_value_time = 0;
130130
int64_t decode_null_map_time = 0;
131-
int64_t filter_block_time = 0;
131+
int64_t predicate_filter_time = 0;
132+
int64_t dict_filter_rewrite_time = 0;
133+
int64_t lazy_read_filtered_rows = 0;
132134
};
133135

134136
OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params,
@@ -227,6 +229,9 @@ class OrcReader : public GenericReader {
227229
RuntimeProfile::Counter* decode_value_time = nullptr;
228230
RuntimeProfile::Counter* decode_null_map_time = nullptr;
229231
RuntimeProfile::Counter* filter_block_time = nullptr;
232+
RuntimeProfile::Counter* predicate_filter_time = nullptr;
233+
RuntimeProfile::Counter* dict_filter_rewrite_time = nullptr;
234+
RuntimeProfile::Counter* lazy_read_filtered_rows = nullptr;
230235
};
231236

232237
class ORCFilterImpl : public orc::ORCFilter {

0 commit comments

Comments
 (0)