feat kafka: add bulk send producer method#1134
feat kafka: add bulk send producer method#1134disaykin wants to merge 4 commits intouserver-framework:developfrom
Conversation
31b4214 to
79d1485
Compare
|
@apolukhin Что нужно сделать, чтобы запустить тестовый пайплайн? |
|
Can somebody help me? |
apolukhin
left a comment
There was a problem hiding this comment.
Nice! A few comments follow
| public: | ||
| using ExceptionMap = std::map<std::size_t, std::exception_ptr>; | ||
|
|
||
| BulkSendException(ExceptionMap nested_exceptions); |
| void Send( | ||
| utils::zstring_view topic_name, | ||
| std::string_view key, | ||
| utils::span<const std::string> messages, |
There was a problem hiding this comment.
utils::span<const std::string_view> messages ?
There was a problem hiding this comment.
Сначала так и сделал, но потом не смог победить. Нужно создать массив вьюх на временные строки, чтобы потом он эффективно преобразовался в span. Временные строки разрушатся до того, как мы сможем передать их в метод Send. Если подскажете, как сделать - с удовольствием переделаю.
There was a problem hiding this comment.
попробовал сделать, чтобы метод Send принимал на вход любой контейнер, имеющий метод size() и оператор индексирования, и который хранит элементы, преобразуемые к std::string_view, так, чтобы это было валидно на C++-17 и чтобы не вытаскивать кишки реализации наружу. Получилось не очень изысканно, прошу совета от гуру, как это лучше сделать
| void SendImpl( | ||
| utils::zstring_view topic_name, | ||
| std::string_view key, | ||
| utils::span<const std::string> messages, |
There was a problem hiding this comment.
utils::span<const std::string_view> messages ?
There was a problem hiding this comment.
аналогично, не знаю, как эффективно преобразовать std::vector<std::string> в utils::span<std::string_view>
| } else if (enqueue_error == RD_KAFKA_RESP_ERR__QUEUE_FULL) { | ||
| LOG_LIMITED_WARNING("Kafka local queue is full"); | ||
| /// waiting for a while for the queue to clear up | ||
| engine::InterruptibleSleepFor(std::chrono::milliseconds{10}); |
There was a problem hiding this comment.
This is too much... How about just an engine::Yield()?
|
|
||
| UEXPECT_THROW(make_send_request(), kafka::QueueFullException); | ||
|
|
||
| /// [Producer retryable error] |
There was a problem hiding this comment.
This example is used in docs as a sample userver/kafka/include/userver/kafka/producer.hpp. Please do not remove it
There was a problem hiding this comment.
Беда, не знал. Как в этом случае поступить? Я переделал сценарий так, чтобы в случае переполнения кода Producer::Send повторял попытки положить сообщение в очередь вплоть до исчерпания delivery_timeout. Либо я добавляю опцию, чтобы Send одного сообщения сразу возвращал ошибку, что очередь полна, либо нужно править документацию.
There was a problem hiding this comment.
Вернул старое поведение для одиночного Send, чтобы тест и комментарий остались валидными. Других восстановимых ошибок (кроме DeliveryTimeoutError) юсервером не поддерживается, поэтому либо внедряем пример из теста в документацию для демонстрации, как обрабатывать IsRetriable, либо мокаем ProducerImpl, чтобы он первое время возвращал DeliveryTimeoutError, либо возвращаем старое поведение. Пока предпочел третий вариант.
There was a problem hiding this comment.
Всё-таки решил переделать на новый лад. Пример использования IsRetriable() вынес из теста в документацию
Сначала отревьюил, потом только заметил что можно писать на русском :) Запустил тестовый пайплайн |
|
при установке кафки на интересно, почему это не привело к фэйлу пайплайна? Это приводит к ошибкам на этапе функциональных тестов кафки: |
|
На |
79d1485 to
bc9c629
Compare
|
@apolukhin нужна помощь с воспроизведением локальной сборки с тестами. Локально на ноуте с Ubuntu 24.04 интеграционные тесты на Кафку не проходят одинаково, что с моими изменениями, что без них |
|
отломалась компиляция на ubuntu-minimal: |
Фикс уже в пути... Там не хватает inlude fmt/ranges.h или чего-то подобного Можете просто взять версию userver до комита 332dcb5 |
b072ef9 to
f6113f4
Compare
Добавлен метод `Producer::Send` для синхронной отправки пачки сообщений в указанный топик Kafka. В случае ошибки отправки выкидывается исключение `BulkSendException`, которое содержит подробности об ошибках отправки сообщений из пачки. `Producer::Send` пытается обработать ошибки переполнения локальной очереди librdkafka в пределах настроенного `delivery.timeout.ms`. В отличие от старых методов `Send` и `SendAsync` метод порождает только одну корутину на всю пачку сообщений.
f6113f4 to
20db89b
Compare
Добавлен метод
Producer::Sendдля синхронной отправки пачки сообщений в указанный топик Kafka. В случае ошибки отправки выкидывается исключениеBulkSendException, которое содержит подробности об ошибках отправки сообщений из пачки.Producer::Sendпытается обработать ошибки переполнения локальной очереди librdkafka в пределах настроенногоdelivery.timeout.ms.В отличие от старых методов
SendиSendAsyncметод порождает только одну корутину на всю пачку сообщений.Note: by creating a PR or an issue you automatically agree to the CLA. See CONTRIBUTING.md. Feel free to remove this note, the agreement holds.