Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions kafka/include/userver/kafka/exceptions.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <cstdint>
#include <exception>
#include <map>
#include <stdexcept>
#include <string_view>

Expand All @@ -27,6 +29,29 @@ class SendException : public std::runtime_error {
const bool is_retryable_{false};
};

/// @brief Base exception thrown by Producer::Send in bulk mode
/// in case of one or more send errors.
class BulkSendException : public std::runtime_error {
    // Fixed grammar of the user-facing what() message ("was" -> "were").
    static constexpr const char* kWhat{"Some messages were not delivered."};

public:
    /// Maps the 0-based index of a message in the input batch to the
    /// exception describing why its delivery failed.
    using ExceptionMap = std::map<std::size_t, std::exception_ptr>;

    explicit BulkSendException(ExceptionMap nested_exceptions);

    /// @return nested errors.
    /// Nested exceptions are subclasses of SendException.
    const ExceptionMap& GetExceptions() const noexcept;

private:
    /// @brief A mapping from the message's index in the bulk send operation
    /// to the exception that occurred during its delivery.
    /// @details Key: 0-based index of the element in the input batch.
    /// Value: Pointer to the exception.
    /// @note Contains only indices that resulted in an error.
    const ExceptionMap nested_exceptions_;
};

class DeliveryTimeoutException final : public SendException {
static constexpr const char* kWhat{
"Message is not delivered after `delivery_timeout` milliseconds. Hint: "
Expand Down
41 changes: 41 additions & 0 deletions kafka/include/userver/kafka/impl/messages.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <cstddef>
#include <cstdint>
#include <string_view>
#include <type_traits>
#include <utility>

USERVER_NAMESPACE_BEGIN

namespace kafka::impl {

/// @brief Type-erased read-only view over a batch of messages: exposes the
/// batch size and indexed access to each message payload.
class Messages {
public:
    Messages() = default;
    virtual ~Messages() = default;

    /// @return number of messages in the batch.
    virtual std::size_t Size() const noexcept = 0;

    /// @return payload of the message at position `index`.
    virtual std::string_view operator[](std::size_t index) const noexcept = 0;
};

/// @brief Adapts any random-access container of string-like elements to the
/// type-erased Messages interface without copying any payload data.
///
/// @warning Stores a reference to `data`: the container must outlive the
/// adapter. Construction from a temporary is therefore disabled.
template<typename Container>
class MessagesAdapter final : public Messages {
public:
    static_assert(std::is_convertible_v<decltype(std::declval<const Container&>()[0]), std::string_view>,
        "Container must support operator[] and return a type convertible to std::string_view");

    static_assert(std::is_integral_v<decltype(std::declval<const Container&>().size())>,
        "Container must support method size() and return an integral type");

    explicit MessagesAdapter(const Container& data) noexcept : data_{data} {}

    /// Binding a temporary would leave `data_` dangling after this
    /// full-expression, so reject rvalue containers at compile time.
    explicit MessagesAdapter(const Container&&) = delete;

    std::size_t Size() const noexcept override { return data_.size(); }
    std::string_view operator[](std::size_t index) const noexcept override { return data_[index]; }

private:
    const Container& data_;  // non-owning; caller guarantees lifetime
};

} // namespace kafka::impl

USERVER_NAMESPACE_END
73 changes: 72 additions & 1 deletion kafka/include/userver/kafka/producer.hpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#pragma once

#include <cstdint>
#include <type_traits>
#include <utility>

#include <userver/engine/task/task_processor_fwd.hpp>
#include <userver/engine/task/task_with_result.hpp>
#include <userver/kafka/exceptions.hpp>
#include <userver/kafka/headers.hpp>
#include <userver/kafka/impl/messages.hpp>
#include <userver/utils/fast_pimpl.hpp>
#include <userver/utils/statistics/writer.hpp>

Expand Down Expand Up @@ -101,7 +104,22 @@ class Producer final {
///
/// @note Use SendException::IsRetryable method to understand whether there is
/// a sense to retry the message sending.
/// @snippet kafka/tests/producer_kafkatest.cpp Producer retryable error
///
/// @code{.cpp}
/// bool delivered{false};
/// while (!delivered && !deadline.IsReached()) {
/// try {
/// producer.Send(topic, key, message);
/// delivered = true;
/// } catch (const kafka::SendException& e) {
/// if (e.IsRetryable()) {
/// engine::InterruptibleSleepFor(std::chrono::milliseconds{10});
/// continue;
/// }
/// break;
/// }
/// }
/// @endcode
void Send(
utils::zstring_view topic_name,
std::string_view key,
Expand All @@ -110,6 +128,43 @@ class Producer final {
HeaderViews headers = {}
) const;

/// @brief Sends the given `messages` to topic `topic_name` with the given
/// `key` and `partition` (if passed); each element of `messages` becomes one
/// message payload. Asynchronously waits until the messages are delivered or
/// a delivery error occurs.
///
/// No payload data is copied. The method holds the data until the messages
/// are delivered.
///
/// Thread-safe and can be called from any number of threads
/// concurrently.
///
/// If `partition` is not passed, the partition is chosen by the internal
/// Kafka partitioner.
///
/// @warning if `enable_idempotence` option is enabled, do not use both
/// explicit partitions and Kafka-chosen ones.
///
/// @throws BulkSendException if some messages were not delivered
/// and acked by the Kafka Broker within the configured timeout.
///
/// @note Use the BulkSendException::GetExceptions method to obtain the
/// nested exceptions that occurred, keyed by message index.
///
/// @note The template parameter name `Messages` shadows `impl::Messages`
/// inside this method; the implementation uses the qualified name.
template<typename Messages>
std::enable_if_t<
    // Enabled only for containers whose operator[] yields something viewable
    // as std::string_view and whose size() returns an integral type.
    std::is_convertible_v<decltype(std::declval<const Messages&>()[0]), std::string_view>
    &&
    std::is_integral_v<decltype(std::declval<const Messages&>().size())>
> Send(
    utils::zstring_view topic_name,
    std::string_view key,
    const Messages& messages,
    std::optional<std::uint32_t> partition = kUnassignedPartition,
    HeaderViews headers = {}
) const {
    // Wraps the container in a non-owning type-erased adapter; the container
    // is an lvalue here, so it outlives the SendWrapper call.
    SendWrapper(topic_name, key, impl::MessagesAdapter{messages}, partition, std::move(headers));
}

/// @brief Same as Producer::Send, but returns the task which can be
/// used to wait the message delivery manually.
///
Expand Down Expand Up @@ -141,6 +196,22 @@ class Producer final {
impl::HeadersHolder&& headers_holder
) const;

void SendImpl(
utils::zstring_view topic_name,
std::string_view key,
const impl::Messages& messages,
std::optional<std::uint32_t> partition,
std::vector<impl::HeadersHolder>&& headers_holders
) const;

void SendWrapper(
utils::zstring_view topic_name,
std::string_view key,
const impl::Messages& messages,
std::optional<std::uint32_t> partition,
HeaderViews headers
) const;

const std::string name_;
engine::TaskProcessor& producer_task_processor_;

Expand Down
7 changes: 7 additions & 0 deletions kafka/src/kafka/exceptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ SendException::SendException(const char* what, bool is_retryable)
is_retryable_(is_retryable)
{}

// Takes ownership of the per-message failure map; the what() text is the
// class-level kWhat constant.
BulkSendException::BulkSendException(BulkSendException::ExceptionMap nested_exceptions)
    : std::runtime_error(kWhat),
      nested_exceptions_(std::move(nested_exceptions))
{}

// Accessor for the stored index -> exception map; no copy is made.
const BulkSendException::ExceptionMap& BulkSendException::GetExceptions() const noexcept { return nested_exceptions_; }

// Marked retryable: a timed-out delivery may succeed if the send is retried.
DeliveryTimeoutException::DeliveryTimeoutException()
    : SendException(kWhat, /*is_retryable=*/true)
{}
Expand Down
60 changes: 52 additions & 8 deletions kafka/src/kafka/impl/producer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,20 +138,56 @@ DeliveryResult ProducerImpl::Send(
HeadersHolder headers_holder
) const {
LOG(operation_log_level_) << fmt::format("Message to topic '{}' is requested to send", topic_name);
auto deadline = engine::Deadline::FromDuration(delivery_timeout_);
auto delivery_result_future =
ScheduleMessageDelivery(topic_name, key, message, partition, std::move(headers_holder));
ScheduleMessageDelivery(topic_name, key, message, partition, std::move(headers_holder), deadline);

WaitUntilDeliveryReported(delivery_result_future);

return delivery_result_future.get();
}

/// Bulk send: schedules every message first so deliveries proceed
/// concurrently, then waits for each delivery report in input order.
std::vector<DeliveryResult> ProducerImpl::Send(
    utils::zstring_view topic_name,
    std::string_view key,
    const Messages& messages,
    std::optional<std::uint32_t> partition,
    std::vector<HeadersHolder> headers_holders
) const {
    UASSERT(messages.Size() == headers_holders.size());

    LOG(operation_log_level_) <<
        fmt::format("Messages {} to topic '{}' are requested to send", messages.Size(), topic_name);

    const std::size_t count = messages.Size();

    // A single deadline bounds the enqueueing of the whole batch.
    auto deadline = engine::Deadline::FromDuration(delivery_timeout_);

    std::vector<engine::Future<DeliveryResult>> futures;
    futures.reserve(count);
    for (std::size_t index = 0; index < count; ++index) {
        futures.push_back(
            ScheduleMessageDelivery(topic_name, key, messages[index], partition, std::move(headers_holders[index]), deadline)
        );
    }

    // Collect the delivery results in the same order the messages were given.
    std::vector<DeliveryResult> results;
    results.reserve(count);
    for (auto& future : futures) {
        WaitUntilDeliveryReported(future);

        results.push_back(future.get());
    }

    return results;
}

engine::Future<DeliveryResult> ProducerImpl::ScheduleMessageDelivery(
utils::zstring_view topic_name,
std::string_view key,
std::string_view message,
std::optional<std::uint32_t> partition,
HeadersHolder headers_holder
HeadersHolder headers_holder,
engine::Deadline deadline
) const {
auto waiter = std::make_unique<DeliveryWaiter>();
auto wait_handle = waiter->GetFuture();
Expand Down Expand Up @@ -181,6 +217,7 @@ engine::Future<DeliveryResult> ProducerImpl::ScheduleMessageDelivery(
///
/// Headers holder **must** be released if `rd_kafka_producev` succeeded.

while (!deadline.IsReached() && !engine::current_task::ShouldCancel()) {
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wgnu-statement-expression"
Expand All @@ -203,12 +240,19 @@ engine::Future<DeliveryResult> ProducerImpl::ScheduleMessageDelivery(
#pragma clang diagnostic pop
#endif

if (enqueue_error == RD_KAFKA_RESP_ERR_NO_ERROR) {
[[maybe_unused]] const auto headers_holder_ptr = headers_holder.release();
[[maybe_unused]] const auto waiter_ptr = waiter.release();
} else {
LOG_WARNING("Failed to enqueue message to Kafka local queue: {}", rd_kafka_err2str(enqueue_error));
waiter->SetDeliveryResult(DeliveryResult{enqueue_error});
if (enqueue_error == RD_KAFKA_RESP_ERR_NO_ERROR) {
[[maybe_unused]] const auto headers_holder_ptr = headers_holder.release();
[[maybe_unused]] const auto waiter_ptr = waiter.release();
} else if (enqueue_error == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
LOG_LIMITED_WARNING("Kafka local queue is full");
/// waiting for a while for the queue to clear up
engine::Yield();
continue;
} else {
LOG_WARNING("Failed to enqueue message to Kafka local queue: {}", rd_kafka_err2str(enqueue_error));
waiter->SetDeliveryResult(DeliveryResult{enqueue_error});
}
break;
}

return wait_handle;
Expand Down
15 changes: 14 additions & 1 deletion kafka/src/kafka/impl/producer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include <librdkafka/rdkafka.h>

#include <userver/engine/deadline.hpp>
#include <userver/kafka/impl/messages.hpp>
#include <userver/kafka/impl/stats.hpp>
#include <userver/logging/level.hpp>
#include <userver/utils/periodic_task.hpp>
Expand Down Expand Up @@ -40,6 +42,16 @@ class ProducerImpl final {
HeadersHolder headers
) const;

    /// @brief Sends messages and waits for their delivery.
    /// While waiting, handles other messages' delivery reports, errors and logs.
[[nodiscard]] std::vector<DeliveryResult> Send(
utils::zstring_view topic_name,
std::string_view key,
const Messages& messages,
std::optional<std::uint32_t> partition,
std::vector<HeadersHolder> headers
) const;

/// @brief Waits until scheduled messages are delivered for
/// at most 2 x `delivery_timeout`.
///
Expand All @@ -58,7 +70,8 @@ class ProducerImpl final {
std::string_view key,
std::string_view message,
std::optional<std::uint32_t> partition,
HeadersHolder headers
HeadersHolder headers,
engine::Deadline deadline
) const;

/// @brief Poll a delivery or error event from producer's queue.
Expand Down
Loading
Loading