Skip to content

[WIP]feat(io_uring): add new IO Type: UringIO#1535

Open
LHT129 wants to merge 1 commit intoantgroup:mainfrom
LHT129:io_uring
Open

[WIP]feat(io_uring): add new IO Type: UringIO#1535
LHT129 wants to merge 1 commit intoantgroup:mainfrom
LHT129:io_uring

Conversation

@LHT129
Copy link
Copy Markdown
Collaborator

@LHT129 LHT129 commented Jan 20, 2026

closed: #1533

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello @LHT129, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request integrates liburing to introduce a new UringIO type, enhancing the system's asynchronous I/O capabilities. The changes involve updating the build configuration, managing new dependencies across different environments, and registering the new I/O type within the application's I/O management. It also includes necessary documentation updates and comprehensive unit tests to ensure the stability and correctness of the new functionality.

Highlights

  • New IO Type: UringIO: Introduced a new I/O type, UringIO, which leverages the liburing library for asynchronous I/O operations, aiming for improved performance.
  • Build System Integration: Added CMake options (ENABLE_LIBURING) and dedicated modules (cmake/FindLiburing.cmake) to detect and integrate liburing into the build process, similar to how libaio is handled.
  • Dependency Updates: Updated Dockerfiles and installation scripts (install_deps_centos.sh, install_deps_ubuntu.sh) to include liburing-devel or liburing-dev as a required dependency for various environments.
  • Configuration and Documentation: The hgraph.md documentation has been updated to include uring_io as a valid option for base_io_type and precise_io_type parameters.
  • IO Type Registration and Instantiation: The new UringIO type is registered within the application's I/O framework, allowing it to be dynamically instantiated in bucket_interface.cpp, flatten_interface.cpp, and io_parameter.cpp.
  • Comprehensive Testing: New unit tests for UringIO and UringIOParameter have been added, covering basic read/write, parameter handling, and serialization. Existing test suites (io_array_test.cpp, test_hgraph.cpp) were also updated to include UringIO.
  • AsyncIO Parameter Fix: Corrected a bug in AsyncIOParameter where its constructor was incorrectly initializing with IO_TYPE_VALUE_BUFFER_IO instead of IO_TYPE_VALUE_ASYNC_IO.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new UringIO type for asynchronous I/O using io_uring, which is a great addition for high-performance I/O operations. The changes include CMake integration, dependency updates, and the core implementation of UringIO with its parameters and tests.

My review has identified several critical memory safety issues in the new uring_io.cpp implementation, including a use-after-free, a double-free, and incorrect resource management in the io_uring context pool, which could lead to crashes or unpredictable behavior. I have also provided a suggestion to improve the readability and maintainability of the MultiReadImpl function. Please address these critical issues to ensure the stability and correctness of the new I/O backend.

Comment on lines +95 to +109
UringIO::DirectReadImpl(uint64_t size, uint64_t offset, bool& need_release) const {
if (not check_valid_offset(size + offset)) {
return nullptr;
}
need_release = true;
if (size == 0) {
return nullptr;
}
DirectIOObject obj(size, offset);
auto ret = pread64(this->rfd_, obj.align_data, obj.size, static_cast<int64_t>(obj.offset));
if (ret < 0) {
throw VsagException(ErrorType::INTERNAL_ERROR, fmt::format("pread64 error {}", ret));
}
return obj.data;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There is a critical use-after-free bug in DirectReadImpl. The DirectIOObject obj is allocated on the stack. Its destructor, which frees the allocated buffer align_data, is called when the function returns. However, the function returns obj.data, which is a pointer into this buffer. The caller receives a dangling pointer, leading to undefined behavior when it's accessed.

To fix this, you should manually manage the buffer's lifecycle instead of relying on the RAII behavior of a stack-allocated DirectIOObject.

const uint8_t*
UringIO::DirectReadImpl(uint64_t size, uint64_t offset, bool& need_release) const {
    if (not check_valid_offset(size + offset)) {
        return nullptr;
    }
    need_release = true;
    if (size == 0) {
        return nullptr;
    }

    uint64_t align_bit = Options::Instance().direct_IO_object_align_bit();
    uint64_t align_size = 1ULL << align_bit;
    uint64_t align_offset = offset & ~((1ULL << align_bit) - 1);
    uint64_t read_size = (offset - align_offset) + size;
    read_size = (read_size + align_size - 1) & ~(align_size - 1);

    void* buffer = nullptr;
    if (posix_memalign(&buffer, align_size, read_size) != 0) {
        throw std::bad_alloc();
    }

    auto ret = pread64(this->rfd_, buffer, read_size, static_cast<int64_t>(align_offset));
    if (ret < 0) {
        free(buffer);
        throw VsagException(ErrorType::INTERNAL_ERROR, fmt::format("pread64 error {}", ret));
    }

    return static_cast<uint8_t*>(buffer) + (offset - align_offset);
}

Comment on lines +196 to +197
memcpy(obj->dist_data, obj->data, obj->dist_size);
ReleaseImpl(obj->data);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There is a critical double-free bug here. ReleaseImpl(obj->data) frees the aligned buffer. However, the UringDirectIOObject instance obj is part of the objs vector which is on the stack. When MultiReadImpl returns, the destructor for UringDirectIOObject will be called, which in turn calls the ~DirectIOObject() destructor, attempting to free the same buffer again.

To prevent this double free, you should nullify the align_data pointer in the obj after manually releasing the memory.

            memcpy(obj->dist_data, obj->data, obj->dist_size);
            ReleaseImpl(obj->data);
            obj->align_data = nullptr; // Prevent double-free in ~DirectIOObject()

Comment on lines +44 to +45
void
Reset() override{};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The Reset() method is empty, which is incorrect for UringIOContext. If an error occurs in UringIO::MultiReadImpl after requests have been submitted to the io_uring ring, the context is returned to the pool in a 'dirty' state with pending operations. The next user of this context will encounter an inconsistent state, leading to unpredictable behavior.

The Reset() method should properly clean up and re-initialize the io_uring instance to ensure it's in a clean state for reuse.

Suggested change
void
Reset() override{};
void
Reset() override {
io_uring_queue_exit(&ring_);
int ret = io_uring_queue_init(RING_SIZE, &ring_, 0);
if (ret < 0) {
throw VsagException(ErrorType::INTERNAL_ERROR,
fmt::format("io_uring_queue_init in Reset() failed: {}", strerror(-ret)));
}
};


AsyncIOParameter::AsyncIOParameter(const vsag::JsonType& json)
: IOParameter(IO_TYPE_VALUE_BUFFER_IO) {
: IOParameter(IO_TYPE_VALUE_ASYNC_IO) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This is a good bug fix. The constructor was previously passing the wrong IO type string (IO_TYPE_VALUE_BUFFER_IO) to the base class, which would cause issues with parameter handling and type identification.

Comment on lines +140 to +209
UringIO::MultiReadImpl(uint8_t* datas, uint64_t* sizes, uint64_t* offsets, uint64_t count) const {
if (count == 0) {
return true;
}

auto ctx = io_context_pool_->TakeOne();
auto* ring = ctx->ring();

auto all_count = static_cast<int64_t>(count);

while (all_count > 0) {
count = std::min(static_cast<uint64_t>(UringIOContext::RING_SIZE),
static_cast<uint64_t>(all_count));
std::vector<UringDirectIOObject> objs(count);

for (uint64_t i = 0; i < count; ++i) {
objs[i].Set(sizes[i], offsets[i], datas);
datas += sizes[i];

struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
if (!sqe) {
io_context_pool_->ReturnOne(ctx);
throw VsagException(ErrorType::INTERNAL_ERROR,
"io_uring_get_sqe failed in multi-read");
}
io_uring_prep_read(sqe,
this->rfd_,
objs[i].align_data,
objs[i].size,
static_cast<int64_t>(objs[i].offset));
sqe->user_data = reinterpret_cast<uint64_t>(&objs[i]);
}

int submitted = io_uring_submit(ring);
if (submitted < static_cast<int>(count)) {
io_context_pool_->ReturnOne(ctx);
throw VsagException(ErrorType::INTERNAL_ERROR, "failed to submit all reads");
}

uint64_t completed = 0;
while (completed < count) {
struct io_uring_cqe* cqe;
int ret = io_uring_wait_cqe(ring, &cqe);
if (ret < 0) {
io_context_pool_->ReturnOne(ctx);
throw VsagException(ErrorType::INTERNAL_ERROR, "wait cqe failed");
}

if (cqe->res < 0) {
io_context_pool_->ReturnOne(ctx);
throw VsagException(ErrorType::INTERNAL_ERROR,
fmt::format("multi-read failed: {}", strerror(-cqe->res)));
}

auto* obj = reinterpret_cast<UringDirectIOObject*>(cqe->user_data);

memcpy(obj->dist_data, obj->data, obj->dist_size);
ReleaseImpl(obj->data);
completed++;

io_uring_cqe_seen(ring, cqe);
}
sizes += count;
offsets += count;
all_count -= static_cast<int64_t>(count);
}

io_context_pool_->ReturnOne(ctx);
return true;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The implementation of MultiReadImpl can be made more readable and less error-prone. Currently, it shadows the count parameter and modifies the input pointers sizes and offsets within the loop. This can be confusing and is generally not good practice for const methods.

A clearer approach is to use an index to keep track of the processed requests, which avoids modifying input arguments and shadowing variables.

bool
UringIO::MultiReadImpl(uint8_t* datas, uint64_t* sizes, uint64_t* offsets, uint64_t count) const {
    if (count == 0) {
        return true;
    }

    auto ctx = io_context_pool_->TakeOne();
    auto* ring = ctx->ring();

    uint64_t submitted_count = 0;
    while (submitted_count < count) {
        uint64_t batch_size = std::min(static_cast<uint64_t>(UringIOContext::RING_SIZE),
                                     count - submitted_count);
        std::vector<UringDirectIOObject> objs(batch_size);

        for (uint64_t i = 0; i < batch_size; ++i) {
            uint64_t current_idx = submitted_count + i;
            objs[i].Set(sizes[current_idx], offsets[current_idx], datas);
            datas += sizes[current_idx];

            struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
            if (!sqe) {
                io_context_pool_->ReturnOne(ctx);
                throw VsagException(ErrorType::INTERNAL_ERROR,
                                    "io_uring_get_sqe failed in multi-read");
            }
            io_uring_prep_read(sqe,
                               this->rfd_,
                               objs[i].align_data,
                               objs[i].size,
                               static_cast<int64_t>(objs[i].offset));
            sqe->user_data = reinterpret_cast<uint64_t>(&objs[i]);
        }

        int submitted = io_uring_submit(ring);
        if (submitted < static_cast<int>(batch_size)) {
            io_context_pool_->ReturnOne(ctx);
            throw VsagException(ErrorType::INTERNAL_ERROR, "failed to submit all reads");
        }

        uint64_t completed_in_batch = 0;
        while (completed_in_batch < batch_size) {
            struct io_uring_cqe* cqe;
            int ret = io_uring_wait_cqe(ring, &cqe);
            if (ret < 0) {
                io_context_pool_->ReturnOne(ctx);
                throw VsagException(ErrorType::INTERNAL_ERROR, "wait cqe failed");
            }

            if (cqe->res < 0) {
                io_context_pool_->ReturnOne(ctx);
                throw VsagException(ErrorType::INTERNAL_ERROR,
                                    fmt::format("multi-read failed: {}", strerror(-cqe->res)));
            }

            auto* obj = reinterpret_cast<UringDirectIOObject*>(cqe->user_data);

            memcpy(obj->dist_data, obj->data, obj->dist_size);
            ReleaseImpl(obj->data);
            obj->align_data = nullptr; // Prevent double-free
            completed_in_batch++;

            io_uring_cqe_seen(ring, cqe);
        }
        submitted_count += batch_size;
    }

    io_context_pool_->ReturnOne(ctx);
    return true;
}

@codecov
Copy link
Copy Markdown

codecov bot commented Jan 20, 2026

Codecov Report

❌ Patch coverage is 37.50000% with 5 lines in your changes missing coverage. Please review.

❌ Your patch check has failed because the patch coverage (37.50%) is below the target coverage (80.00%). You can increase the patch coverage or adjust the target coverage.

@@            Coverage Diff             @@
##             main    #1535      +/-   ##
==========================================
- Coverage   91.24%   91.18%   -0.06%     
==========================================
  Files         329      329              
  Lines       19396    19403       +7     
==========================================
- Hits        17697    17693       -4     
- Misses       1699     1710      +11     
Flag Coverage Δ
cpp 91.18% <37.50%> (-0.06%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Components Coverage Δ
common 85.81% <ø> (ø)
datacell 93.22% <75.00%> (-0.56%) ⬇️
index 90.66% <ø> (+0.04%) ⬆️
simd 100.00% <ø> (ø)

Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 3dd2923...c5f107f. Read the comment docs.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@LHT129 LHT129 force-pushed the io_uring branch 2 times, most recently from 28f53e3 to c6f92eb Compare January 28, 2026 07:17
Signed-off-by: LHT129 <tianlan.lht@antgroup.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support iouring for vsag

1 participant