perf(index): replace stats_mutex_ with lock-free atomic counter in WindowResultQueue#1721
LHT129 wants to merge 1 commit into antgroup:main
Conversation
Summary of Changes (Gemini Code Assist): This pull request improves performance in high-QPS concurrent search scenarios by replacing an exclusive mutex lock with a lock-free atomic counter for collecting statistics within the `WindowResultQueue`.
Code Review
This pull request replaces the stats_mutex_ lock with a lock-free atomic counter in WindowResultQueue to improve performance in high-QPS concurrent search scenarios. The changes involve converting WindowResultQueue::count_ to std::atomic<uint64_t>, using fetch_add for lock-free Push operation, and removing stats_mutex_ protection in HNSW::GetStats() and search methods, as well as in DiskANN::GetStats() and search methods. The code changes appear to be well-structured and address the identified performance bottleneck. However, I have added a review comment to address a potential issue.
```diff
  uint64_t window_size = queue_.size();
- queue_[count_ % window_size] = value;
- count_++;
+ uint64_t pos = count_.fetch_add(1, std::memory_order_relaxed);
```
The memory_order_relaxed memory order is used for the atomic increment. While this provides the least synchronization overhead, it might lead to reordering issues in some rare cases. Consider whether memory_order_acquire or memory_order_seq_cst would be more appropriate to ensure proper synchronization and prevent potential data races, especially if the value of count_ is used in other parts of the code that require consistent ordering. Since the original implementation used a mutex, a stronger memory order might be needed to maintain the same level of consistency.
Suggested change:

```diff
- uint64_t pos = count_.fetch_add(1, std::memory_order_relaxed);
+ uint64_t pos = count_.fetch_add(1, std::memory_order_relaxed); // Consider using memory_order_acquire or memory_order_seq_cst
```
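For context on this trade-off, here is a hedged sketch (not the PR's code) of the release/acquire pattern the reviewer alludes to. A relaxed `fetch_add` is sufficient for a pure counter, but relaxed ordering alone does not order the non-atomic buffer write relative to the counter update; release/publish does, assuming a single writer. The type and member names here are hypothetical; concurrent overwrites of wrapped slots are out of scope for this sketch.

```cpp
#include <atomic>
#include <cstdint>
#include <vector>

// Single-writer window: write the slot first, then publish the new count
// with release semantics, so a reader that observes the count (acquire)
// also observes the slot's contents.
struct PublishedWindow {
    std::vector<float> buf;
    std::atomic<uint64_t> published{0};

    explicit PublishedWindow(std::size_t n) : buf(n) {}

    void Push(float v) {  // single-writer only
        uint64_t pos = published.load(std::memory_order_relaxed);
        buf[pos % buf.size()] = v;                            // write first...
        published.store(pos + 1, std::memory_order_release);  // ...then publish
    }

    float Avg() const {
        uint64_t n = published.load(std::memory_order_acquire);
        uint64_t cnt = n < buf.size() ? n : buf.size();
        if (cnt == 0) {
            return 0.0F;
        }
        float sum = 0.0F;
        for (uint64_t i = 0; i < cnt; ++i) {
            sum += buf[i];
        }
        return sum / static_cast<float>(cnt);
    }
};
```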
Pull request overview
This PR aims to remove lock contention in stats collection during concurrent searches by replacing the stats_mutex_-guarded counter with a std::atomic counter, and by removing stats_mutex_ locking around stats updates/reads in HNSW and DiskANN.
Changes:
- Convert `WindowResultQueue::count_` to `std::atomic<uint64_t>` and use `fetch_add` in `Push()`.
- Remove `stats_mutex_` locking around stats `Push()` calls in HNSW/DiskANN search paths.
- Remove `stats_mutex_` locking in `HNSW::GetStats()` and `DiskANN::GetStats()` when computing averages.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/utils/window_result_queue.h | Switch count_ to an atomic counter to enable lock-free increments. |
| src/utils/window_result_queue.cpp | Use fetch_add/load for count management in Push() and GetAvgResult(). |
| src/index/hnsw.cpp | Remove stats_mutex_ locking around stats updates and stats reads. |
| src/index/diskann.cpp | Remove stats_mutex_ locking around stats updates and stats reads. |
```diff
  WindowResultQueue::Push(float value) {
      uint64_t window_size = queue_.size();
-     queue_[count_ % window_size] = value;
-     count_++;
+     uint64_t pos = count_.fetch_add(1, std::memory_order_relaxed);
+     queue_[pos % window_size] = value;
  }
```
```cpp
uint64_t statistic_num =
    std::min<uint64_t>(count_.load(std::memory_order_relaxed), queue_.size());
```
src/index/hnsw.cpp
Outdated
```diff
- std::lock_guard<std::mutex> lock(stats_mutex_);
- result_queues_[STATSTIC_KNN_TIME].Push(static_cast<float>(time_cost));
- }
+ result_queues_[STATSTIC_KNN_TIME].Push(static_cast<float>(time_cost));
```
src/index/diskann.cpp
Outdated
```cpp
result_queues_[STATSTIC_KNN_IO_TIME].Push(
    (query_stats[i].io_us / static_cast<float>(query_stats[i].n_ios)) /
    MACRO_TO_MILLI);
```
…ation

- Add mutex protection for queue_ access in WindowResultQueue
- Initialize result_queues_ keys in constructor to avoid concurrent map modification
- Use at() instead of operator[] for thread-safe map access
- Add n_ios > 0 check to prevent division by zero in IO time calculation

Addresses review comments on PR antgroup#1721

Signed-off-by: LHT129 <tianlan.lht@antgroup.com>
Co-authored-by: Kimi-K2.5 <assistant@example.com>
Force-pushed from e75fb2a to d43d56d
Pull request overview
This PR aims to reduce contention in concurrent search stats collection by replacing index-level stats_mutex_ usage with a more concurrent-friendly stats recording approach in WindowResultQueue.
Changes:
- Updated `WindowResultQueue` to use an atomic counter and added internal synchronization around the backing buffer.
- Removed `stats_mutex_`-guarded blocks in HNSW/DiskANN search paths and stats getters; pre-populated required stats queues and switched to `.at()` access.
- Added planning/docs artifacts (`plan.md`, `TASK.md`) describing the intended approach.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| src/utils/window_result_queue.h | Switch count_ to std::atomic and add a per-queue mutex to synchronize the buffer. |
| src/utils/window_result_queue.cpp | Use fetch_add and add locking around buffer writes/reads in Push/GetAvgResult. |
| src/index/hnsw.cpp | Remove stats_mutex_ usage and ensure the required stats queue key is present. |
| src/index/diskann.cpp | Remove stats_mutex_ usage, pre-create stats queues, and avoid divide-by-zero for IO timing stats. |
| plan.md | Documents the intended “lock-free” strategy and rollout steps. |
| TASK.md | Adds a pointer to a local task file path. |
Comments suppressed due to low confidence (1)
TASK.md:1
- This commits an absolute, machine-local filesystem path (including a username) that won't resolve for other developers and may leak environment details. Prefer removing `TASK.md`, or replace it with a repo-relative link/path (or an issue/PR reference) that's valid for all contributors.
```cpp
uint64_t pos = count_.fetch_add(1, std::memory_order_relaxed);
std::lock_guard<std::mutex> lock(queue_mutex_);
queue_[pos % window_size] = value;
```
```cpp
uint64_t statistic_num =
    std::min<uint64_t>(count_.load(std::memory_order_relaxed), queue_.size());
if (statistic_num == 0) {
    return 0.0F;
}
std::lock_guard<std::mutex> lock(queue_mutex_);
```
```cpp
#include <atomic>
#include <mutex>
```

```cpp
std::atomic<uint64_t> count_{0};
std::vector<float> queue_;
mutable std::mutex queue_mutex_;
```
plan.md
Outdated
- **Interface definition**: keep the existing API unchanged (`Push`, `GetAvgResult`)

### How the lock-free Push works
Review Response - Need Discussion

Thank you for the detailed review feedback. I need to address some concerns:

Key Issues Identified

Possible Solutions:
- Option A: Accept current implementation as-is
- Option B: Revert to original mutex implementation
- Option C: True lock-free implementation

I'm leaning toward Option A (rename and proceed) since:

@wxyucs - Could you advise on which direction to take?
LHT129
left a comment
@gemini-code-assist Thanks for the detailed review feedback.

I have updated the PR description to accurately reflect the current implementation: this is a per-queue locking approach, not a fully lock-free one.

On the main concerns:

- Lock-free claim vs. implementation: You are right that the original description was inaccurate. The actual implementation uses a per-queue mutex to protect the queue buffer, which is indeed not lock-free. I have updated the PR description and title to reflect this accurately.

- Why per-queue locking instead of fully lock-free?
  - A fully lock-free design would require unprotected access to a non-atomic float array, which is undefined behavior in C++.
  - Per-queue locking is already a big improvement over the original shared stats_mutex_:
    - It eliminates the single-point bottleneck (each queue has its own lock).
    - Different search keys can access different queues concurrently.
    - The lock granularity is fine (only the queue write/read).

- On the count_ increment ordering: In the current implementation, the atomic count_ increment and the mutex-protected write are separate. Given that the statistics can tolerate small inaccuracies, this design is acceptable.

This approach strikes a balance between performance and correctness. Please see the updated PR description; further suggestions are welcome.
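The per-queue locking idea described above can be sketched as follows. This is an illustrative stand-in, not the PR's actual classes: `StatsQueue`, `Index`, and the key strings are hypothetical names, and the window size of 100 is an arbitrary assumption.

```cpp
#include <cstdint>
#include <map>
#include <mutex>
#include <string>
#include <vector>

// Each queue carries its own mutex, so pushes to different keys never
// contend with each other (unlike a single shared stats_mutex_).
struct StatsQueue {
    std::vector<float> buf = std::vector<float>(100);
    uint64_t count = 0;
    std::mutex mtx;  // fine-grained: guards only this queue's buffer

    void Push(float v) {
        std::lock_guard<std::mutex> lock(mtx);
        buf[count++ % buf.size()] = v;
    }
};

struct Index {
    // Keys are created once in the constructor, so concurrent searches
    // never mutate the map itself; .at() looks up without inserting.
    std::map<std::string, StatsQueue> queues;

    Index() {
        queues["knn_time"];
        queues["knn_io"];
    }

    void RecordSearch(float ms) { queues.at("knn_time").Push(ms); }
};
```

Because the map's key set is fixed after construction, concurrent readers and writers only ever touch existing nodes, which is what makes the unsynchronized `.at()` lookups safe.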
Force-pushed from 18ffa0b to 36a249f, then from 36a249f to 11ffa59
Pull request overview
Reduce stats-recording lock contention during concurrent searches by moving synchronization into each WindowResultQueue and making queue position allocation atomic.
Changes:
- Make `WindowResultQueue::count_` atomic and add a per-queue mutex to protect buffer access.
- Remove shared `stats_mutex_` usage around stats updates and stats reads in HNSW and DiskANN.
- Pre-initialize needed stats queues in constructors and switch hot-path pushes to `.at()` access.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/utils/window_result_queue.h | Add <atomic>/<mutex>, make count_ atomic, and introduce per-queue mutex. |
| src/utils/window_result_queue.cpp | Use atomic fetch-add for indexing and lock queue buffer on push/read. |
| src/index/hnsw.cpp | Remove global stats lock; pre-create queue and push without stats_mutex_. |
| src/index/diskann.cpp | Remove global stats lock; pre-create queues; guard divide-by-zero in IO-time stats. |
```cpp
uint64_t pos = count_.fetch_add(1, std::memory_order_relaxed);
std::lock_guard<std::mutex> lock(queue_mutex_);
queue_[pos % window_size] = value;
```
count_ is incremented before the queued value is written under queue_mutex_. A concurrent GetAvgResult() can observe the incremented count_ and include a slot that has not been updated yet (especially when count_ < window_size), producing incorrect averages. To preserve the “write then publish” behavior that previously existed (and that the old shared mutex implicitly provided), either (mandatory) move the fetch_add into the same critical section as the write, or (alternative) add a separate “committed/published” counter that is updated after the write with release semantics and read with acquire semantics in GetAvgResult().
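The reviewer's "committed/published counter" alternative could look like the following hedged sketch. The class name mirrors the PR's `WindowResultQueue`, but this is an illustration of the suggested fix, not the actual patch: a second counter, `committed_`, is bumped with release semantics only after the slot has been written, so a reader using acquire never counts an unwritten slot.

```cpp
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <mutex>
#include <vector>

class WindowResultQueueSketch {
public:
    explicit WindowResultQueueSketch(std::size_t n) : queue_(n) {}

    void Push(float value) {
        // count_ only allocates a slot; it does not publish the value.
        uint64_t pos = count_.fetch_add(1, std::memory_order_relaxed);
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            queue_[pos % queue_.size()] = value;
        }
        // Publish: readers that observe this increment (acquire) also
        // observe the buffer write above.
        committed_.fetch_add(1, std::memory_order_release);
    }

    float GetAvgResult() const {
        uint64_t n = committed_.load(std::memory_order_acquire);
        std::lock_guard<std::mutex> lock(queue_mutex_);
        uint64_t num = std::min<uint64_t>(n, queue_.size());
        if (num == 0) {
            return 0.0F;
        }
        float sum = 0.0F;
        for (uint64_t i = 0; i < num; ++i) {
            sum += queue_[i];
        }
        return sum / static_cast<float>(num);
    }

private:
    std::vector<float> queue_;
    std::atomic<uint64_t> count_{0};
    std::atomic<uint64_t> committed_{0};
    mutable std::mutex queue_mutex_;
};
```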
```cpp
uint64_t statistic_num =
    std::min<uint64_t>(count_.load(std::memory_order_relaxed), queue_.size());
if (statistic_num == 0) {
    return 0.0F;
}
std::lock_guard<std::mutex> lock(queue_mutex_);
```

statistic_num is computed before acquiring queue_mutex_, so it can be derived from a count_ value that doesn't correspond to the buffer state protected by the mutex (especially given Push() currently increments count_ before writing). Compute statistic_num while holding queue_mutex_ (and, if keeping a separate published counter, base it on that) so the "how many samples are valid" decision is consistent with the snapshot being summed.

Suggested change:

```diff
- uint64_t statistic_num =
-     std::min<uint64_t>(count_.load(std::memory_order_relaxed), queue_.size());
- if (statistic_num == 0) {
-     return 0.0F;
- }
- std::lock_guard<std::mutex> lock(queue_mutex_);
+ std::lock_guard<std::mutex> lock(queue_mutex_);
+ uint64_t statistic_num =
+     std::min<uint64_t>(count_.load(std::memory_order_relaxed), queue_.size());
+ if (statistic_num == 0) {
+     return 0.0F;
+ }
```
```cpp
result_queues_.at(STATSTIC_KNN_IO).Push(static_cast<float>(query_stats[i].n_ios));
result_queues_.at(STATSTIC_KNN_TIME).Push(static_cast<float>(time_cost));
if (query_stats[i].n_ios > 0) {
    result_queues_.at(STATSTIC_KNN_IO_TIME)
        .Push((query_stats[i].io_us / static_cast<float>(query_stats[i].n_ios)) /
              MACRO_TO_MILLI);
} else {
    result_queues_.at(STATSTIC_KNN_IO_TIME).Push(0.0F);
}
```
This hot path performs multiple associative-container lookups via result_queues_.at(...) per query (and per loop iteration), which can add measurable overhead after removing the shared mutex bottleneck. A tangible improvement is to cache references once (e.g., bind auto& knn_io_q = result_queues_.at(STATSTIC_KNN_IO); etc.) and then call Push() on those references, reducing repeated lookups while preserving the “no insertion” behavior of .at().
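The reference-caching suggestion can be illustrated with a minimal stand-in. `CountingQueue`, `RecordBatch`, and the key names here are hypothetical, not the PR's identifiers; the point is simply that each `.at()` lookup happens once, outside the loop.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Stand-in for WindowResultQueue that just counts pushes.
struct CountingQueue {
    int pushes = 0;
    void Push(float) { ++pushes; }
};

std::size_t RecordBatch(std::map<std::string, CountingQueue>& queues,
                        const std::vector<float>& samples) {
    auto& io_q = queues.at("knn_io");      // one lookup each,
    auto& time_q = queues.at("knn_time");  // hoisted out of the loop
    for (float s : samples) {
        io_q.Push(s);
        time_q.Push(s);
    }
    return samples.size();
}
```

Since `.at()` throws rather than inserts, caching the references preserves the "no insertion" behavior while removing the per-iteration map traversal.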
…ultQueue

- Convert count_ to std::atomic<uint64_t> for atomic position allocation
- Add per-queue queue_mutex_ to protect queue buffer access
- Remove shared stats_mutex_ in HNSW::GetStats() and search methods
- Remove shared stats_mutex_ in DiskANN::GetStats() and search methods
- Add division-by-zero protection for IO time statistics

This reduces lock contention in high-concurrency search scenarios by replacing a single shared lock with per-queue locks.

Signed-off-by: LHT129 <tianlan.lht@antgroup.com>
Force-pushed from 11ffa59 to eb32eb5
Summary

Replace the shared exclusive stats_mutex_ lock with per-queue locking in WindowResultQueue to reduce lock contention in high-QPS concurrent search scenarios.

Changes

- Convert WindowResultQueue::count_ to std::atomic<uint64_t> for atomic position allocation
- Add per-queue queue_mutex_ to protect queue buffer access (replacing the shared stats_mutex_)
- Remove stats_mutex_ protection in HNSW::GetStats() and search methods
- Remove stats_mutex_ protection in DiskANN::GetStats() and search methods

Background

The HNSW and DiskANN indices record statistics (search time, IO count, etc.) during each search operation. The previous implementation used a single shared exclusive lock (stats_mutex_) for all queues, which became a performance bottleneck in high-QPS concurrent search scenarios (10,000+ QPS).

This change replaces the shared mutex with per-queue mutexes. Since each search typically uses different queue keys, contention is significantly reduced compared to the previous single-lock approach.

Implementation Notes

Why not fully lock-free?

The initial design considered a fully lock-free approach using atomic operations on the queue buffer. However, this would introduce data races on the non-atomic float elements, which is undefined behavior in C++ even if small inaccuracies are acceptable.

The current "per-queue locking" approach:

- Eliminates the shared stats_mutex_ bottleneck
- Uses atomic count_ for position allocation (no contention)

Testing

Expected Impact

Related
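Putting the pieces together, the final design can be exercised under concurrency with a sketch like the one below. `Window` and `StressPush` are illustrative stand-ins for the PR's `WindowResultQueue`, not its exact code: the atomic counter hands out slots without contention, and the per-queue mutex guards only the buffer write.

```cpp
#include <atomic>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

struct Window {
    std::vector<float> buf;
    std::atomic<uint64_t> count{0};
    std::mutex mtx;

    explicit Window(std::size_t n) : buf(n) {}

    void Push(float v) {
        // Atomic slot allocation: no lock needed for the counter itself.
        uint64_t pos = count.fetch_add(1, std::memory_order_relaxed);
        // Short critical section: only the buffer write is serialized.
        std::lock_guard<std::mutex> lock(mtx);
        buf[pos % buf.size()] = v;
    }
};

// Push from several threads concurrently and return the final count.
uint64_t StressPush(Window& w, int threads, int per_thread) {
    std::vector<std::thread> ts;
    for (int t = 0; t < threads; ++t) {
        ts.emplace_back([&w, per_thread] {
            for (int i = 0; i < per_thread; ++i) {
                w.Push(1.0F);
            }
        });
    }
    for (auto& t : ts) {
        t.join();
    }
    return w.count.load(std::memory_order_relaxed);
}
```

Every push is counted exactly once regardless of interleaving, which is the property the atomic `fetch_add` provides; only the buffer contents need the mutex.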