Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/tici
Submodule tici updated from ee7809 to db0a40
9 changes: 6 additions & 3 deletions contrib/tici-search-lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/cxxbridge)

add_custom_command(
OUTPUT ${TICI_LIB}
COMMAND cargo build -p tici-search-lib --release --target-dir ${CMAKE_CURRENT_BINARY_DIR} --manifest-path ${TICI_PROJECT_DIR}/Cargo.toml
COMMAND ${CMAKE_COMMAND} -E env "CC=gcc" cargo build -p tici-search-lib --release --target-dir ${CMAKE_CURRENT_BINARY_DIR} --manifest-path ${TICI_PROJECT_DIR}/Cargo.toml
WORKING_DIRECTORY ${TICI_PROJECT_DIR}
DEPENDS ${LIB_SOURCE_FILES}
COMMENT "Build Rust lib"${CMAKE_CURRENT_BINARY_DIR}
Expand All @@ -26,13 +26,16 @@ target_include_directories(tici_search_lib_static INTERFACE
add_library(tici_search_lib SHARED "${TiFlash_SOURCE_DIR}/libs/libclara-cmake/dummy.cpp")
target_compile_options(tici_search_lib PRIVATE -pthread)
target_link_options(tici_search_lib PRIVATE -pthread)
target_link_libraries(tici_search_lib PRIVATE "$<LINK_LIBRARY:WHOLE_ARCHIVE,tici_search_lib_static>")
if(APPLE)
target_link_libraries(tici_search_lib PRIVATE
target_link_options(tici_search_lib PRIVATE "LINKER:-force_load,${TICI_LIB}")
target_link_libraries(tici_search_lib PRIVATE tici_search_lib_static
"-framework Security"
"-framework CoreFoundation"
"-framework IOKit"
)
else()
target_link_libraries(tici_search_lib PRIVATE
-Wl,--whole-archive tici_search_lib_static -Wl,--no-whole-archive)
endif()

target_include_directories(tici_search_lib INTERFACE
Expand Down
2 changes: 1 addition & 1 deletion contrib/tipb
30 changes: 24 additions & 6 deletions dbms/src/Flash/Coprocessor/TiCIScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,40 @@
#include <cassert>
namespace DB
{

static TiCIQueryMode resolveQueryMode(const tipb::IndexScan & idx_scan)
{
if (idx_scan.has_tici_vector_query_info())
return TiCIQueryMode::Vector;
RUNTIME_CHECK_MSG(idx_scan.has_fts_query_info(), "IndexScan must have either fts_query_info or tici_vector_query_info");
return TiCIQueryMode::FTS;
}

TiCIScan::TiCIScan(const tipb::Executor * tici_scan_, const String & executor_id_, const DAGContext & dag_context)
: tici_scan(tici_scan_)
, executor_id(executor_id_)
, keyspace_id(dag_context.getKeyspaceID())
, table_id(tici_scan->idx_scan().table_id())
, index_id(tici_scan->idx_scan().index_id())
, return_columns(TiDB::toTiDBColumnInfos(tici_scan->idx_scan().columns()))
, query_type(tici_scan->idx_scan().fts_query_info().query_type())
, query_mode(resolveQueryMode(tici_scan->idx_scan()))
, shard_infos(dag_context.query_shard_infos.getTableShardInfosByExecutorID(tici_scan_->executor_id()))
, limit(tici_scan->idx_scan().fts_query_info().top_k())
, limit(
query_mode == TiCIQueryMode::Vector
? tici_scan->idx_scan().tici_vector_query_info().top_k()
: tici_scan->idx_scan().fts_query_info().top_k())
, sort_column_ids(
tici_scan->idx_scan().fts_query_info().sort_column_ids().begin(),
tici_scan->idx_scan().fts_query_info().sort_column_ids().end())
query_mode == TiCIQueryMode::FTS
? std::vector<Int64>(
tici_scan->idx_scan().fts_query_info().sort_column_ids().begin(),
tici_scan->idx_scan().fts_query_info().sort_column_ids().end())
: std::vector<Int64>())
, sort_column_asc(
tici_scan->idx_scan().fts_query_info().sort_column_asc().begin(),
tici_scan->idx_scan().fts_query_info().sort_column_asc().end())
query_mode == TiCIQueryMode::FTS
? std::vector<bool>(
tici_scan->idx_scan().fts_query_info().sort_column_asc().begin(),
tici_scan->idx_scan().fts_query_info().sort_column_asc().end())
: std::vector<bool>())
{}

void TiCIScan::constructTiCIScanForRemoteRead(tipb::IndexScan * tipb_index_scan) const
Expand Down
17 changes: 16 additions & 1 deletion dbms/src/Flash/Coprocessor/TiCIScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ namespace DB
{
class DAGContext;

enum class TiCIQueryMode
{
FTS,
Vector,
};

class TiCIScan
{
public:
Expand All @@ -38,13 +44,22 @@ class TiCIScan
const int & getLimit() const { return limit; }
const tipb::Executor * getTiCIScan() const { return tici_scan; }

TiCIQueryMode getQueryMode() const { return query_mode; }

void constructTiCIScanForRemoteRead(tipb::IndexScan * tipb_index_scan) const;

const ::google::protobuf::RepeatedPtrField<::tipb::Expr> & getMatchExpr() const
{
RUNTIME_CHECK(query_mode == TiCIQueryMode::FTS);
return tici_scan->idx_scan().fts_query_info().match_expr();
}

const tipb::TiCIVectorQueryInfo & getVectorQueryInfo() const
{
RUNTIME_CHECK(query_mode == TiCIQueryMode::Vector);
return tici_scan->idx_scan().tici_vector_query_info();
}

bool isCount() const { return is_count_agg; }

void setIsCountAgg(bool v) { is_count_agg = v; }
Expand All @@ -65,7 +80,7 @@ class TiCIScan
const int index_id;
TiDB::ColumnInfos return_columns;
NamesAndTypes names_and_types;
[[maybe_unused]] tipb::FTSQueryType query_type;
TiCIQueryMode query_mode;
const TableShardInfos shard_infos;
const int limit;
std::vector<Int64> sort_column_ids;
Expand Down
49 changes: 36 additions & 13 deletions dbms/src/Flash/Planner/PhysicalPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,43 @@ void PhysicalPlan::buildTableScan(const String & executor_id, const tipb::Execut

void PhysicalPlan::buildTiCIScan(const String & executor_id, const tipb::Executor * executor)
{
RUNTIME_ASSERT(executor->idx_scan().has_fts_query_info());
RUNTIME_ASSERT(
executor->idx_scan().has_fts_query_info() || executor->idx_scan().has_tici_vector_query_info(),
"IndexScan must have either fts_query_info or tici_vector_query_info");
TiCIScan tici_scan(executor, executor_id, dagContext());
LOG_INFO(
log,
"tici scan: keyspace_id={} table_id={} index_id={} limit={} shard_count={} match_expr_size={} query_type={} "
"start_ts={}",
tici_scan.getKeyspaceID(),
tici_scan.getTableId(),
tici_scan.getIndexId(),
tici_scan.getLimit(),
tici_scan.getShardInfos().shard_info_list.size(),
tici_scan.getMatchExpr().size(),
tipb::FTSQueryType_Name(executor->idx_scan().fts_query_info().query_type()),
context.getSettingsRef().read_tso);
if (tici_scan.getQueryMode() == TiCIQueryMode::Vector)
{
const auto & vqi = executor->idx_scan().tici_vector_query_info();
LOG_INFO(
log,
"tici vector scan: keyspace_id={} table_id={} index_id={} col_id={} distance_metric={} top_k={} "
"dimension={} filter_expr_size={} shard_count={} start_ts={}",
tici_scan.getKeyspaceID(),
tici_scan.getTableId(),
tici_scan.getIndexId(),
vqi.column_id(),
tipb::VectorDistanceMetric_Name(vqi.distance_metric()),
vqi.top_k(),
vqi.dimension(),
vqi.filter_expr_size(),
tici_scan.getShardInfos().shard_info_list.size(),
context.getSettingsRef().read_tso);
}
else
{
LOG_INFO(
log,
"tici scan: keyspace_id={} table_id={} index_id={} limit={} shard_count={} match_expr_size={} "
"query_type={} start_ts={}",
tici_scan.getKeyspaceID(),
tici_scan.getTableId(),
tici_scan.getIndexId(),
tici_scan.getLimit(),
tici_scan.getShardInfos().shard_info_list.size(),
tici_scan.getMatchExpr().size(),
tipb::FTSQueryType_Name(executor->idx_scan().fts_query_info().query_type()),
context.getSettingsRef().read_tso);
}
pushBack(PhysicalTiCIScan::build(executor_id, log, tici_scan));
dagContext().table_scan_executor_id = executor_id;
}
Expand Down
50 changes: 35 additions & 15 deletions dbms/src/Storages/StorageTantivy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,21 +85,41 @@ void StorageTantivy::read(
auto shards_snapshot = std::move(*local_shards_snapshot);
local_shards_snapshot.reset();

auto tici_task_pool = std::make_shared<TS::TiCIReadTaskPool>(
log,
tici_scan.getKeyspaceID(),
tici_scan.getTableId(),
tici_scan.getIndexId(),
local_read,
return_columns,
tici_scan.getLimit(),
tici_scan.getSortColumnIds(),
tici_scan.getSortColumnAsc(),
context.getSettingsRef().read_tso,
tici_scan.getMatchExpr(),
tici_scan.isCount(),
context.getTimezoneInfo(),
std::move(shards_snapshot));
std::shared_ptr<TS::TiCIReadTaskPool> tici_task_pool;
if (tici_scan.getQueryMode() == TiCIQueryMode::Vector)
{
auto vector_state = TS::TiCIReadTaskPool::buildVectorState(
tici_scan.getVectorQueryInfo(),
context.getTimezoneInfo());
tici_task_pool = std::make_shared<TS::TiCIReadTaskPool>(
log,
tici_scan.getKeyspaceID(),
tici_scan.getTableId(),
tici_scan.getIndexId(),
local_read,
return_columns,
context.getSettingsRef().read_tso,
std::move(vector_state),
std::move(shards_snapshot));
}
else
{
tici_task_pool = std::make_shared<TS::TiCIReadTaskPool>(
log,
tici_scan.getKeyspaceID(),
tici_scan.getTableId(),
tici_scan.getIndexId(),
local_read,
return_columns,
tici_scan.getLimit(),
tici_scan.getSortColumnIds(),
tici_scan.getSortColumnAsc(),
context.getSettingsRef().read_tso,
tici_scan.getMatchExpr(),
tici_scan.isCount(),
context.getTimezoneInfo(),
std::move(shards_snapshot));
}

num_streams = std::max(1, std::min(num_streams, local_read.size()));
// local read
Expand Down
Loading