feat(warp): add multi-vector interface and brute force implementation #1602
Conversation
**Summary of Changes**

Hello @inabao, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces the WARP index, a new indexing structure optimized for multi-vector datasets and MaxSim similarity calculations. This enhancement allows the system to efficiently handle complex data, such as ColBERT-style retrieval, by supporting a variable number of vectors per document. The addition of serialization support ensures that these indexes can be persisted and reloaded.
Activity
Code Review
This pull request introduces a new WARP index for multi-vector search, which is a great feature for ColBERT-style retrieval. The implementation looks mostly solid, but I've found a few critical issues. The search logic in warp.cpp incorrectly uses a max-heap instead of a min-heap, which will return the farthest neighbors instead of the nearest. The ground truth calculation for multi-vector tests is also incorrect. Additionally, there are no tests for the new multi-vector functionality. I've also pointed out some maintainability improvements, such as refactoring duplicated code in the Add method. Please address these points to ensure the correctness and robustness of the new feature.
```cpp
// For each query, compute maxsin score for all documents
// Use a heap to maintain top-k results
auto search_func = [&](InnerIdType start_doc, InnerIdType end_doc) -> DistHeapPtr {
    auto heap = DistanceHeap::MakeInstanceBySize<true, true>(this->allocator_, request.topk_);
```
The compute_maxsin_similarity function returns a score that is a sum of distances, where a smaller score is better. However, DistanceHeap::MakeInstanceBySize<true, true> creates a max-heap, which will keep the items with the largest scores (i.e., the farthest neighbors). This is incorrect for a k-NN search. You should use a min-heap to find the nearest neighbors.
```diff
- auto heap = DistanceHeap::MakeInstanceBySize<true, true>(this->allocator_, request.topk_);
+ auto heap = DistanceHeap::MakeInstanceBySize<false, true>(this->allocator_, request.topk_);
```
`src/algorithm/warp.cpp` (Outdated)
```cpp
    limited_size = std::numeric_limits<int64_t>::max();
}

auto heap = std::make_shared<StandardHeap<true, true>>(this->allocator_, limited_size);
```
When a limited_size is provided, RangeSearch should return the items with the smallest distances within the given radius. The current implementation uses StandardHeap<true, true>, which is a max-heap. This will incorrectly keep the items with the largest distances. You should use a min-heap to get the correct behavior.
```diff
- auto heap = std::make_shared<StandardHeap<true, true>>(this->allocator_, limited_size);
+ auto heap = std::make_shared<StandardHeap<false, true>>(this->allocator_, limited_size);
```
```cpp
static float
CalMultiVectorDistance(const vsag::DatasetPtr query,
                       uint64_t query_idx,
                       const vsag::DatasetPtr base,
                       uint64_t base_idx) {
    auto dim = base->GetDim();
    auto* query_counts = query->GetVectorCounts();
    auto* base_counts = base->GetVectorCounts();
    auto* query_vecs = query->GetFloat32Vectors();
    auto* base_vecs = base->GetFloat32Vectors();
    auto dist_func = get_distance_func("ip");

    // Calculate vector start offsets
    uint32_t q_vec_start = 0;
    for (uint64_t i = 0; i < query_idx; ++i) {
        q_vec_start += query_counts[i];
    }
    uint32_t b_vec_start = 0;
    for (uint64_t i = 0; i < base_idx; ++i) {
        b_vec_start += base_counts[i];
    }

    // Calculate max similarity (min distance) across all pairs of vectors
    float min_dist = std::numeric_limits<float>::max();
    for (uint32_t qv = 0; qv < query_counts[query_idx]; ++qv) {
        for (uint32_t bv = 0; bv < base_counts[base_idx]; ++bv) {
            float dist = dist_func(
                query_vecs + (q_vec_start + qv) * dim, base_vecs + (b_vec_start + bv) * dim, dim);
            if (dist < min_dist) {
                min_dist = dist;
            }
        }
    }
    return min_dist;
}
```
The CalMultiVectorDistance function incorrectly calculates the distance for multi-vector documents. It computes the minimum distance over all pairs of vectors between the query and base documents. The correct ColBERT-style "max-sim" score is the sum of maximum similarities for each query vector. This translates to summing the minimum distances for each query vector against all document vectors. This incorrect ground truth calculation will lead to unreliable tests.
```cpp
static float
CalMultiVectorDistance(const vsag::DatasetPtr query,
                       uint64_t query_idx,
                       const vsag::DatasetPtr base,
                       uint64_t base_idx) {
    auto dim = base->GetDim();
    auto* query_counts = query->GetVectorCounts();
    auto* base_counts = base->GetVectorCounts();
    auto* query_vecs = query->GetFloat32Vectors();
    auto* base_vecs = base->GetFloat32Vectors();
    auto dist_func = get_distance_func("ip");
    // Calculate vector start offsets
    uint32_t q_vec_start = 0;
    for (uint64_t i = 0; i < query_idx; ++i) {
        q_vec_start += query_counts[i];
    }
    uint32_t b_vec_start = 0;
    for (uint64_t i = 0; i < base_idx; ++i) {
        b_vec_start += base_counts[i];
    }
    float total_score = 0.0f;
    for (uint32_t qv = 0; qv < query_counts[query_idx]; ++qv) {
        float min_dist_for_qv = std::numeric_limits<float>::max();
        for (uint32_t bv = 0; bv < base_counts[base_idx]; ++bv) {
            float dist = dist_func(
                query_vecs + (q_vec_start + qv) * dim, base_vecs + (b_vec_start + bv) * dim, dim);
            min_dist_for_qv = std::min(min_dist_for_qv, dist);
        }
        total_score += min_dist_for_qv;
    }
    return total_score;
}
```
```cpp
}  // namespace fixtures

TEST_CASE_PERSISTENT_FIXTURE(fixtures::WarpTestIndex, "Warp Add Test", "[ft][warp]") {
```
`src/algorithm/warp.cpp` (Outdated)
```cpp
if (vector_counts == nullptr) {
    auto add_func = [&](const float* data,
                        const int64_t label,
                        const AttributeSet* attr,
                        const char* extra_info) -> std::optional<int64_t> {
        InnerIdType inner_id;
        {
            std::scoped_lock add_lock(this->label_lookup_mutex_, this->add_mutex_);
            if (this->label_table_->CheckLabel(label)) {
                return label;
            }
            inner_id = this->total_count_;
            this->total_count_++;
            this->resize(total_count_);
            this->label_table_->Insert(inner_id, label);
        }
        std::shared_lock global_lock(this->global_mutex_);
        if (use_attribute_filter_ && attr != nullptr) {
            this->attr_filter_index_->Insert(*attr, inner_id);
        }

        this->add_one_doc(data, 1, inner_id);
        return std::nullopt;
    };

    std::vector<std::future<std::optional<int64_t>>> futures;
    const auto total = data->GetNumElements();
    const auto* labels = data->GetIds();
    const auto* vectors = data->GetFloat32Vectors();
    const auto* attrs = data->GetAttributeSets();
    const auto* extra_info = data->GetExtraInfos();
    const auto extra_info_size = data->GetExtraInfoSize();
    for (int64_t j = 0; j < total; ++j) {
        const auto label = labels[j];
        {
            std::lock_guard label_lock(this->label_lookup_mutex_);
            if (this->label_table_->CheckLabel(label)) {
                failed_ids.emplace_back(label);
                continue;
            }
        }
        if (this->thread_pool_ != nullptr) {
            auto future =
                this->thread_pool_->GeneralEnqueue(add_func,
                                                   vectors + j * dim_,
                                                   label,
                                                   attrs == nullptr ? nullptr : attrs + j,
                                                   extra_info + j * extra_info_size);
            futures.emplace_back(std::move(future));
        } else {
            if (auto add_res = add_func(vectors + j * dim_,
                                        label,
                                        attrs == nullptr ? nullptr : attrs + j,
                                        extra_info + j * extra_info_size);
                add_res.has_value()) {
                failed_ids.emplace_back(add_res.value());
            }
        }
    }

    if (this->thread_pool_ != nullptr) {
        for (auto& future : futures) {
            if (auto reply = future.get(); reply.has_value()) {
                failed_ids.emplace_back(reply.value());
            }
        }
    }
} else {
    // Multi-vector document handling
    const auto total = data->GetNumElements();
    const auto* labels = data->GetIds();
    const auto* vectors = data->GetFloat32Vectors();
    const auto* attrs = data->GetAttributeSets();
    const auto* extra_info = data->GetExtraInfos();
    const auto extra_info_size = data->GetExtraInfoSize();

    // Calculate total vectors
    uint64_t total_vectors = 0;
    for (int64_t i = 0; i < total; ++i) {
        total_vectors += vector_counts[i];
    }

    auto add_func = [&](const float* doc_vectors,
                        uint32_t doc_vec_count,
                        const int64_t label,
                        const AttributeSet* attr,
                        const char* extra_info) -> std::optional<int64_t> {
        InnerIdType inner_id;
        {
            std::scoped_lock add_lock(this->label_lookup_mutex_, this->add_mutex_);
            if (this->label_table_->CheckLabel(label)) {
                return label;
            }
            inner_id = this->total_count_;
            this->total_count_++;
            this->resize(total_count_);
            this->label_table_->Insert(inner_id, label);
        }
        std::shared_lock global_lock(this->global_mutex_);
        if (use_attribute_filter_ && attr != nullptr) {
            this->attr_filter_index_->Insert(*attr, inner_id);
        }

        this->add_one_doc(doc_vectors, doc_vec_count, inner_id);
        return std::nullopt;
    };

    std::vector<std::future<std::optional<int64_t>>> futures;
    uint64_t vec_offset = 0;

    for (int64_t j = 0; j < total; ++j) {
        const auto label = labels[j];
        uint32_t doc_vec_count = vector_counts[j];

        {
            std::lock_guard label_lock(this->label_lookup_mutex_);
            if (this->label_table_->CheckLabel(label)) {
                failed_ids.emplace_back(label);
                vec_offset += doc_vec_count;
                continue;
            }
        }

        if (this->thread_pool_ != nullptr) {
            auto future =
                this->thread_pool_->GeneralEnqueue(add_func,
                                                   vectors + vec_offset * dim_,
                                                   doc_vec_count,
                                                   label,
                                                   attrs == nullptr ? nullptr : attrs + j,
                                                   extra_info + j * extra_info_size);
            futures.emplace_back(std::move(future));
        } else {
            if (auto add_res = add_func(vectors + vec_offset * dim_,
                                        doc_vec_count,
                                        label,
                                        attrs == nullptr ? nullptr : attrs + j,
                                        extra_info + j * extra_info_size);
                add_res.has_value()) {
                failed_ids.emplace_back(add_res.value());
            }
        }

        vec_offset += doc_vec_count;
    }

    if (this->thread_pool_ != nullptr) {
        for (auto& future : futures) {
            if (auto reply = future.get(); reply.has_value()) {
                failed_ids.emplace_back(reply.value());
            }
        }
    }
}
```
The Add method contains significant code duplication for handling single-vector and multi-vector cases. The logic inside the if (vector_counts == nullptr) and else blocks is very similar, including the definition of add_func and the loops for enqueuing tasks. This can be refactored to improve maintainability by creating a single, more generic implementation that handles both cases. For example, you could use a single loop and determine the vector count (1 for single-vector) and vector offset inside the loop before calling a common add function.
`src/algorithm/warp.cpp` (Outdated)
```cpp
// Calculate total vectors
uint64_t total_vectors = 0;
for (int64_t i = 0; i < total; ++i) {
    total_vectors += vector_counts[i];
}
```
**Codecov Report**

```
@@            Coverage Diff             @@
##             main    #1602      +/-   ##
==========================================
+ Coverage   90.44%   90.69%    +0.25%
==========================================
  Files         330      333        +3
  Lines       19514    19781      +267
==========================================
+ Hits        17649    17940      +291
+ Misses       1865     1841       -24
```

Flags with carried forward coverage won't be shown.
Pull request overview
Adds a new WARP index type to VSAG to support ColBERT-style multi-vector document retrieval (MaxSim / “maxsim”) and introduces a VectorCounts dataset field to represent variable vectors-per-document.
Changes:
- Introduces the `WARP` index implementation, parameters, and factory wiring.
- Extends `vsag::Dataset`/`DatasetImpl` with `VectorCounts` getters/setters plus copy/append/destructor support.
- Adds tests and a C++ example demonstrating multi-vector build/search usage.
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
| `src/algorithm/warp.h` | Declares the new WARP inner index and its multi-vector bookkeeping (`doc_offsets_`). |
| `src/algorithm/warp.cpp` | Implements multi-vector add/search/serialize logic for WARP. |
| `src/algorithm/warp_parameter.h` | Defines WARP build/search parameter types. |
| `src/algorithm/warp_parameter.cpp` | Implements WARP parameter JSON parsing/serialization and compatibility checks. |
| `src/factory/engine.cpp` | Registers the `"warp"` creation path in the engine factory. |
| `src/constants.cpp` | Adds `INDEX_WARP` and the dataset key `VECTOR_COUNTS`. |
| `include/vsag/constants.h` | Exposes the new public constants `INDEX_WARP` and `VECTOR_COUNTS`. |
| `include/vsag/index.h` | Extends the public `IndexType` enum with `WARP`. |
| `include/vsag/dataset.h` | Adds the public dataset API: `VectorCounts()` / `GetVectorCounts()`. |
| `src/dataset_impl.h` | Extends `DatasetImpl` internal variant storage to include `const uint32_t*` vector counts. |
| `src/dataset_impl.cpp` | Adds ownership/deep-copy/append/destructor handling for `VectorCounts`. |
| `src/dataset_impl_test.cpp` | Updates dataset unit tests to include `VectorCounts` in copy/append equality. |
| `tests/fixtures/test_dataset_pool.h` | Adds an `is_multi_vector` option to dataset pool APIs. |
| `tests/fixtures/test_dataset_pool.cpp` | Includes `is_multi_vector` in dataset pool keying and creation. |
| `tests/fixtures/test_dataset.h` | Adds `is_multi_vector` to the fixture dataset creation signature. |
| `tests/fixtures/test_dataset.cpp` | Generates multi-vector fixture datasets and computes MaxSim-style ground truth. |
| `tests/test_index.cpp` | Updates the query slicing helper to support multi-vector queries via `VectorCounts`. |
| `tests/test_simple_index.cpp` | Fixes the dataset creation metric argument used by the simple index test. |
| `tests/test_warp.cpp` | Adds functional tests for WARP add/search/range/serialization. |
| `examples/cpp/CMakeLists.txt` | Adds a build target for the WARP example. |
| `examples/cpp/110_index_warp.cpp` | New example showing multi-vector dataset construction and WARP search. |
| `.claude/skills/commit/SKILL.md` | Adds repository automation documentation for the commit workflow. |
```cpp
#include <iostream>
```
This file uses `std::mt19937` and `std::uniform_*_distribution` but does not include `<random>`, which will fail to compile on stricter standard library implementations. Add the missing include (and any other needed headers, such as `<cstdint>`, if not provided transitively).
```diff
+ #include <cstdint>
  #include <iostream>
+ #include <random>
```
```cpp
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<uint32_t> dist(1, 5);

std::vector<uint32_t> vector_counts(num_elements);
uint64_t total_vectors = 0;
for (uint64_t i = 0; i < num_elements; ++i) {
    vector_counts[i] = dist(gen);
    total_vectors += vector_counts[i];
}

std::iota(ids.begin(), ids.end(), 0);
base->Dim(dim)
    ->Ids(fixtures::CopyVector(ids, allocator))
    ->Paths(paths)
    ->AttributeSets(attr_sets)
    ->NumElements(num_elements)
    ->Distances(fixtures::CopyVector(distances, allocator))
    ->VectorCounts(fixtures::CopyVector(vector_counts, allocator))
    ->Owner(true, allocator);
base->Float32Vectors(fixtures::CopyVector(vecs, allocator))
    ->Int8Vectors(fixtures::CopyVector(vecs_int8, allocator));
```
CreateTestDataset assigns random VectorCounts (1..5) but still generates Float32Vectors/Int8Vectors for only num_elements vectors. That makes the dataset internally inconsistent with the VectorCounts contract (and total_vectors is computed but unused). Consider either keeping VectorCounts all 1 for this unit test, or generating vector storage sized to sum(VectorCounts) and adjusting any size-dependent memcmp logic accordingly.
`src/algorithm/warp.cpp` (Outdated)
```cpp
int64_t num_elements = data->GetNumElements();
```
WARP::Add defines num_elements but never uses it. With builds that enable warnings-as-errors, this can break compilation. Please remove the unused variable or use it (e.g., for validation/logging).
```diff
- int64_t num_elements = data->GetNumElements();
```
`src/algorithm/warp.cpp` (Outdated)
```cpp
void
WARP::Train(const DatasetPtr& data) {
    this->inner_codes_->Train(data->GetFloat32Vectors(), data->GetNumElements());
```
WARP::Train uses data->GetNumElements() as the training count, but for multi-vector datasets Float32Vectors contains the concatenated total vector count (sum of VectorCounts), not the document count. This will under-train (and can mis-train) any quantizer that requires training. Consider computing total_vectors = sum(vector_counts[i]) and passing that to inner_codes_->Train().
```diff
- this->inner_codes_->Train(data->GetFloat32Vectors(), data->GetNumElements());
+ const float* vectors = data->GetFloat32Vectors();
+ CHECK_ARGUMENT(vectors != nullptr, "base.float_vector is nullptr");
+ uint64_t train_count = data->GetNumElements();
+ const uint32_t* vector_counts = data->GetVectorCounts();
+ if (vector_counts != nullptr) {
+     train_count = std::accumulate(
+         vector_counts, vector_counts + data->GetNumElements(), static_cast<uint64_t>(0));
+ }
+ this->inner_codes_->Train(vectors, train_count);
```
```cpp
WARP::resize(uint64_t new_size) {
    uint64_t new_size_power_2 =
        next_multiple_of_power_of_two(new_size, this->resize_increase_count_bit_);
    auto cur_size = this->max_capacity_.load();
    if (cur_size >= new_size_power_2) {
        return;
    }
    std::lock_guard lock(this->global_mutex_);
    cur_size = this->max_capacity_.load();
    if (cur_size < new_size_power_2) {
        this->inner_codes_->Resize(new_size_power_2);
        // doc_offsets_ size is new_size_power_2 + 1 (to store total_vector_count_ at the end)
        doc_offsets_.resize(new_size_power_2 + 1);
```
WARP::resize() resizes inner_codes_ based on the number of documents (new_size), but inner_codes_ stores vectors and add_one_doc inserts at indices up to total_vector_count_. With multi-vector docs, total_vector_count_ can greatly exceed total_count_, leading to out-of-capacity inserts. Resize should be driven by required vector capacity (e.g., total_vector_count_ + vec_count) rather than document count.
```cpp
auto* base_counts = base->GetVectorCounts();
auto* query_vecs = query->GetFloat32Vectors();
auto* base_vecs = base->GetFloat32Vectors();
auto dist_func = get_distance_func("ip");
```
CalMultiVectorDistance hard-codes the distance function to IP via get_distance_func("ip"), ignoring the dataset/index metric. This makes the generated ground truth incorrect for multi-vector tests using metric_type != ip. Consider passing metric_str through and using get_distance_func(metric_str).
```diff
- auto dist_func = get_distance_func("ip");
+ auto dist_func = get_distance_func(base->GetMetricType());
```
`src/algorithm/warp.cpp` (Outdated)
```cpp
int64_t query_num = request.query_->GetNumElements();
uint32_t query_vec_count = query_vec_counts[0];
```
SearchWithRequest stores query_num but never uses it. If warnings are treated as errors (ENABLE_WERROR), this may fail the build. Consider removing the variable or using it to validate expected query batch size (e.g., enforce NumElements()==1 if that's required).
```diff
- int64_t query_num = request.query_->GetNumElements();
  uint32_t query_vec_count = query_vec_counts[0];
```
`src/algorithm/warp.cpp` (Outdated)
```cpp
auto future = this->thread_pool_->GeneralEnqueue(add_func,
                                                 vectors + vec_offset * dim_,
                                                 doc_vec_count,
                                                 label,
                                                 attrs == nullptr ? nullptr : attrs + j,
                                                 extra_info + j * extra_info_size);
futures.emplace_back(std::move(future));
} else {
    if (auto add_res = add_func(vectors + vec_offset * dim_,
                                doc_vec_count,
                                label,
                                attrs == nullptr ? nullptr : attrs + j,
                                extra_info + j * extra_info_size);
```
In Add(), extra_info is fetched from the dataset but may be nullptr when extra_info_size==0 (common). The expression extra_info + j * extra_info_size performs pointer arithmetic on a potentially-null pointer, which is undefined behavior even if the result is never dereferenced. Consider passing nullptr when extra_info_size==0 or extra_info==nullptr (and/or removing the unused extra_info parameter from add_func since it isn’t used).
```diff
- auto future = this->thread_pool_->GeneralEnqueue(add_func,
-                                                  vectors + vec_offset * dim_,
-                                                  doc_vec_count,
-                                                  label,
-                                                  attrs == nullptr ? nullptr : attrs + j,
-                                                  extra_info + j * extra_info_size);
- futures.emplace_back(std::move(future));
- } else {
-     if (auto add_res = add_func(vectors + vec_offset * dim_,
-                                 doc_vec_count,
-                                 label,
-                                 attrs == nullptr ? nullptr : attrs + j,
-                                 extra_info + j * extra_info_size);
+ auto future = this->thread_pool_->GeneralEnqueue(
+     add_func,
+     vectors + vec_offset * dim_,
+     doc_vec_count,
+     label,
+     attrs == nullptr ? nullptr : attrs + j,
+     (extra_info == nullptr || extra_info_size == 0
+          ? nullptr
+          : extra_info + j * extra_info_size));
+ futures.emplace_back(std::move(future));
+ } else {
+     if (auto add_res = add_func(
+             vectors + vec_offset * dim_,
+             doc_vec_count,
+             label,
+             attrs == nullptr ? nullptr : attrs + j,
+             (extra_info == nullptr || extra_info_size == 0
+                  ? nullptr
+                  : extra_info + j * extra_info_size));
```
`src/algorithm/warp.cpp` (Outdated)
```cpp
// Store the document's vectors
for (uint32_t i = 0; i < vec_count; ++i) {
    this->inner_codes_->InsertVector(data + i * dim_, total_vector_count_ + i);
}

// Update document offsets
// doc_offsets_[inner_id] is the starting vector index
// doc_offsets_[inner_id + 1] will be the ending vector index (= total_vector_count_ + vec_count)
doc_offsets_[inner_id] = total_vector_count_;
doc_offsets_[inner_id + 1] = total_vector_count_ + vec_count;

total_vector_count_ += vec_count;
```
add_one_doc updates total_vector_count_ and writes doc_offsets_ without any exclusive synchronization. In Add(), add_one_doc is called under a shared_lock(global_mutex_), so multiple add threads can race and corrupt offsets / vector indices. Consider reserving a vector range per document using an atomic fetch_add on total_vector_count_ (or taking an exclusive lock) and writing doc_offsets_ based on that reserved start index.
```diff
- // Store the document's vectors
- for (uint32_t i = 0; i < vec_count; ++i) {
-     this->inner_codes_->InsertVector(data + i * dim_, total_vector_count_ + i);
- }
- // Update document offsets
- // doc_offsets_[inner_id] is the starting vector index
- // doc_offsets_[inner_id + 1] will be the ending vector index (= total_vector_count_ + vec_count)
- doc_offsets_[inner_id] = total_vector_count_;
- doc_offsets_[inner_id + 1] = total_vector_count_ + vec_count;
- total_vector_count_ += vec_count;
+ // Reserve a unique contiguous range of vector indices for this document.
+ // This avoids races between concurrent add threads when computing offsets.
+ auto start_offset = total_vector_count_.fetch_add(vec_count, std::memory_order_acq_rel);
+ // Store the document's vectors using the reserved index range.
+ for (uint32_t i = 0; i < vec_count; ++i) {
+     this->inner_codes_->InsertVector(data + i * dim_, start_offset + i);
+ }
+ // Update document offsets
+ // doc_offsets_[inner_id] is the starting vector index
+ // doc_offsets_[inner_id + 1] will be the ending vector index (= start_offset + vec_count)
+ doc_offsets_[inner_id] = start_offset;
+ doc_offsets_[inner_id + 1] = start_offset + vec_count;
```
```cpp
float min_sim = std::numeric_limits<float>::max();
Vector<InnerIdType> vec_indices(doc_vec_count, allocator_);
Vector<float> dists(doc_vec_count, allocator_);
std::iota(vec_indices.begin(), vec_indices.end(), doc_start_vec_idx);
this->inner_codes_->Query(dists.data(), computer, vec_indices.data(), doc_vec_count);
for (const auto& sim : dists) {
    min_sim = std::min(min_sim, sim);
}
total_score += min_sim;
```
compute_maxsin_similarity allocates vec_indices and dists (and fills them with iota) for every (query vector, document) pair. In a brute-force scan this allocation work will dominate runtime. Consider reusing buffers (thread-local or passed in) and avoiding vec_indices materialization (e.g., add a Query-by-range path or scan contiguous ids directly).
Force-pushed from e0cc3ab to e0b445a (Compare)
Thank you for the detailed review! I have addressed the following comments in the latest commit (6fd225a):

**Addressed Issues**

1. **WARP::Train using document count instead of vector count (Copilot)** — ✅ Fixed: `Train` now computes the total vector count (accumulating `VectorCounts` when present) instead of using the document count.
2. **WARP::resize based on document count instead of vector count (Copilot)** — ✅ Fixed: `Add` now pre-calculates the total number of vectors and reserves the required vector capacity up front.
3. **Unused variables (Copilot)** — ✅ Fixed: removed the unused `num_elements` and `query_num` variables.
4. **nullptr pointer arithmetic for extra_info (Copilot)** — ✅ Fixed: added a proper nullptr check before the pointer arithmetic.

**Clarification on Heap Usage**

Regarding the comment that the search should switch to a min-heap: the max-heap here is intentional. A fixed-size max-heap of capacity `topk` keeps the `topk` smallest distances by evicting the current largest element whenever a closer candidate arrives. The same pattern is used in other indexes in this codebase.

**Performance Improvement**

The functests now complete in ~1.2 seconds (previously they would time out). The key optimization was pre-calculating total vectors and reserving capacity once, instead of triggering a resize on every document addition.
/dco |
- Add warp algorithm for multi-vector similarity search
- Add warp_parameter for configuration
- Add serialize/deserialize support
- Add test cases for warp index
- Optimize compute_maxsin_similarity by moving vec_indices and dists allocation outside the query loop
- Implement binary tree merge for parallel heap merging in SearchWithRequest
- Add parallel support for RangeSearch using atomic index distribution
- Use BatchInsertVector in add_one_doc for batch insertion
- Simplify serialization/deserialization using WriteVector/ReadVector for doc_offsets
- Fix Train to use total vector count instead of document count
- Pre-calculate and reserve vector capacity in Add to avoid frequent resizes
- Remove unused variables (num_elements, query_num)
- Fix nullptr pointer arithmetic for extra_info

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: jinjiabao.jjb <jinjiabao.jjb@antgroup.com>
link to #1601