[WIP]feat(io_uring): add new IO Type: UringIO #1535
LHT129 wants to merge 1 commit into antgroup:main from
Conversation
Code Review
This pull request introduces a new UringIO type for asynchronous I/O using io_uring, which is a great addition for high-performance I/O operations. The changes include CMake integration, dependency updates, and the core implementation of UringIO with its parameters and tests.
My review has identified several critical memory safety issues in the new uring_io.cpp implementation, including a use-after-free, a double-free, and incorrect resource management in the io_uring context pool, which could lead to crashes or unpredictable behavior. I have also provided a suggestion to improve the readability and maintainability of the MultiReadImpl function. Please address these critical issues to ensure the stability and correctness of the new I/O backend.
```cpp
UringIO::DirectReadImpl(uint64_t size, uint64_t offset, bool& need_release) const {
    if (not check_valid_offset(size + offset)) {
        return nullptr;
    }
    need_release = true;
    if (size == 0) {
        return nullptr;
    }
    DirectIOObject obj(size, offset);
    auto ret = pread64(this->rfd_, obj.align_data, obj.size, static_cast<int64_t>(obj.offset));
    if (ret < 0) {
        throw VsagException(ErrorType::INTERNAL_ERROR, fmt::format("pread64 error {}", ret));
    }
    return obj.data;
}
```
There is a critical use-after-free bug in DirectReadImpl. The DirectIOObject obj is allocated on the stack. Its destructor, which frees the allocated buffer align_data, is called when the function returns. However, the function returns obj.data, which is a pointer into this buffer. The caller receives a dangling pointer, leading to undefined behavior when it's accessed.
To fix this, you should manually manage the buffer's lifecycle instead of relying on the RAII behavior of a stack-allocated DirectIOObject.
```cpp
const uint8_t*
UringIO::DirectReadImpl(uint64_t size, uint64_t offset, bool& need_release) const {
    if (not check_valid_offset(size + offset)) {
        return nullptr;
    }
    need_release = true;
    if (size == 0) {
        return nullptr;
    }
    uint64_t align_bit = Options::Instance().direct_IO_object_align_bit();
    uint64_t align_size = 1ULL << align_bit;
    uint64_t align_offset = offset & ~((1ULL << align_bit) - 1);
    uint64_t read_size = (offset - align_offset) + size;
    read_size = (read_size + align_size - 1) & ~(align_size - 1);
    void* buffer = nullptr;
    if (posix_memalign(&buffer, align_size, read_size) != 0) {
        throw std::bad_alloc();
    }
    auto ret = pread64(this->rfd_, buffer, read_size, static_cast<int64_t>(align_offset));
    if (ret < 0) {
        free(buffer);
        throw VsagException(ErrorType::INTERNAL_ERROR, fmt::format("pread64 error {}", ret));
    }
    return static_cast<uint8_t*>(buffer) + (offset - align_offset);
}
```

```cpp
memcpy(obj->dist_data, obj->data, obj->dist_size);
ReleaseImpl(obj->data);
```
There is a critical double-free bug here. ReleaseImpl(obj->data) frees the aligned buffer. However, the UringDirectIOObject instance obj is part of the objs vector which is on the stack. When MultiReadImpl returns, the destructor for UringDirectIOObject will be called, which in turn calls the ~DirectIOObject() destructor, attempting to free the same buffer again.
To prevent this double free, you should nullify the align_data pointer in the obj after manually releasing the memory.
```cpp
memcpy(obj->dist_data, obj->data, obj->dist_size);
ReleaseImpl(obj->data);
obj->align_data = nullptr;  // Prevent double-free in ~DirectIOObject()
```

```cpp
void
Reset() override{};
```
The Reset() method is empty, which is incorrect for UringIOContext. If an error occurs in UringIO::MultiReadImpl after requests have been submitted to the io_uring ring, the context is returned to the pool in a 'dirty' state with pending operations. The next user of this context will encounter an inconsistent state, leading to unpredictable behavior.
The Reset() method should properly clean up and re-initialize the io_uring instance to ensure it's in a clean state for reuse.
```cpp
void
Reset() override {
    io_uring_queue_exit(&ring_);
    int ret = io_uring_queue_init(RING_SIZE, &ring_, 0);
    if (ret < 0) {
        throw VsagException(ErrorType::INTERNAL_ERROR,
                            fmt::format("io_uring_queue_init in Reset() failed: {}", strerror(-ret)));
    }
}
```
```diff
 AsyncIOParameter::AsyncIOParameter(const vsag::JsonType& json)
-    : IOParameter(IO_TYPE_VALUE_BUFFER_IO) {
+    : IOParameter(IO_TYPE_VALUE_ASYNC_IO) {
```
```cpp
UringIO::MultiReadImpl(uint8_t* datas, uint64_t* sizes, uint64_t* offsets, uint64_t count) const {
    if (count == 0) {
        return true;
    }

    auto ctx = io_context_pool_->TakeOne();
    auto* ring = ctx->ring();

    auto all_count = static_cast<int64_t>(count);

    while (all_count > 0) {
        count = std::min(static_cast<uint64_t>(UringIOContext::RING_SIZE),
                         static_cast<uint64_t>(all_count));
        std::vector<UringDirectIOObject> objs(count);

        for (uint64_t i = 0; i < count; ++i) {
            objs[i].Set(sizes[i], offsets[i], datas);
            datas += sizes[i];

            struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
            if (!sqe) {
                io_context_pool_->ReturnOne(ctx);
                throw VsagException(ErrorType::INTERNAL_ERROR,
                                    "io_uring_get_sqe failed in multi-read");
            }
            io_uring_prep_read(sqe,
                               this->rfd_,
                               objs[i].align_data,
                               objs[i].size,
                               static_cast<int64_t>(objs[i].offset));
            sqe->user_data = reinterpret_cast<uint64_t>(&objs[i]);
        }

        int submitted = io_uring_submit(ring);
        if (submitted < static_cast<int>(count)) {
            io_context_pool_->ReturnOne(ctx);
            throw VsagException(ErrorType::INTERNAL_ERROR, "failed to submit all reads");
        }

        uint64_t completed = 0;
        while (completed < count) {
            struct io_uring_cqe* cqe;
            int ret = io_uring_wait_cqe(ring, &cqe);
            if (ret < 0) {
                io_context_pool_->ReturnOne(ctx);
                throw VsagException(ErrorType::INTERNAL_ERROR, "wait cqe failed");
            }

            if (cqe->res < 0) {
                io_context_pool_->ReturnOne(ctx);
                throw VsagException(ErrorType::INTERNAL_ERROR,
                                    fmt::format("multi-read failed: {}", strerror(-cqe->res)));
            }

            auto* obj = reinterpret_cast<UringDirectIOObject*>(cqe->user_data);

            memcpy(obj->dist_data, obj->data, obj->dist_size);
            ReleaseImpl(obj->data);
            completed++;

            io_uring_cqe_seen(ring, cqe);
        }
        sizes += count;
        offsets += count;
        all_count -= static_cast<int64_t>(count);
    }

    io_context_pool_->ReturnOne(ctx);
    return true;
}
```
The implementation of MultiReadImpl can be made more readable and less error-prone. Currently, it shadows the count parameter and modifies the input pointers sizes and offsets within the loop. This can be confusing and is generally not good practice for const methods.
A clearer approach is to use an index to keep track of the processed requests, which avoids modifying input arguments and shadowing variables.
```cpp
bool
UringIO::MultiReadImpl(uint8_t* datas, uint64_t* sizes, uint64_t* offsets, uint64_t count) const {
    if (count == 0) {
        return true;
    }
    auto ctx = io_context_pool_->TakeOne();
    auto* ring = ctx->ring();
    uint64_t submitted_count = 0;
    while (submitted_count < count) {
        uint64_t batch_size = std::min(static_cast<uint64_t>(UringIOContext::RING_SIZE),
                                       count - submitted_count);
        std::vector<UringDirectIOObject> objs(batch_size);
        for (uint64_t i = 0; i < batch_size; ++i) {
            uint64_t current_idx = submitted_count + i;
            objs[i].Set(sizes[current_idx], offsets[current_idx], datas);
            datas += sizes[current_idx];
            struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
            if (!sqe) {
                io_context_pool_->ReturnOne(ctx);
                throw VsagException(ErrorType::INTERNAL_ERROR,
                                    "io_uring_get_sqe failed in multi-read");
            }
            io_uring_prep_read(sqe,
                               this->rfd_,
                               objs[i].align_data,
                               objs[i].size,
                               static_cast<int64_t>(objs[i].offset));
            sqe->user_data = reinterpret_cast<uint64_t>(&objs[i]);
        }
        int submitted = io_uring_submit(ring);
        if (submitted < static_cast<int>(batch_size)) {
            io_context_pool_->ReturnOne(ctx);
            throw VsagException(ErrorType::INTERNAL_ERROR, "failed to submit all reads");
        }
        uint64_t completed_in_batch = 0;
        while (completed_in_batch < batch_size) {
            struct io_uring_cqe* cqe;
            int ret = io_uring_wait_cqe(ring, &cqe);
            if (ret < 0) {
                io_context_pool_->ReturnOne(ctx);
                throw VsagException(ErrorType::INTERNAL_ERROR, "wait cqe failed");
            }
            if (cqe->res < 0) {
                io_context_pool_->ReturnOne(ctx);
                throw VsagException(ErrorType::INTERNAL_ERROR,
                                    fmt::format("multi-read failed: {}", strerror(-cqe->res)));
            }
            auto* obj = reinterpret_cast<UringDirectIOObject*>(cqe->user_data);
            memcpy(obj->dist_data, obj->data, obj->dist_size);
            ReleaseImpl(obj->data);
            obj->align_data = nullptr;  // Prevent double-free
            completed_in_batch++;
            io_uring_cqe_seen(ring, cqe);
        }
        submitted_count += batch_size;
    }
    io_context_pool_->ReturnOne(ctx);
    return true;
}
```
Codecov Report
❌ Patch coverage is 37.50%, below the target coverage (80.00%). You can increase the patch coverage or adjust the target coverage.
@@ Coverage Diff @@
## main #1535 +/- ##
==========================================
- Coverage 91.24% 91.18% -0.06%
==========================================
Files 329 329
Lines 19396 19403 +7
==========================================
- Hits 17697 17693 -4
- Misses 1699 1710 +11
28f53e3 to c6f92eb
Signed-off-by: LHT129 <tianlan.lht@antgroup.com>
closed: #1533