From 237a243795aa04860dfe0cf8d48d018c71585979 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Mon, 19 Jan 2026 19:15:51 +0800 Subject: [PATCH 1/2] use the request_async_id in cpp --- src/ffi_client.cpp | 552 +++++++++++++++++++++++++++++++-------------- src/ffi_client.h | 19 +- 2 files changed, 404 insertions(+), 167 deletions(-) diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 811cebb..4ad2cf6 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -15,6 +15,7 @@ */ #include +#include #include "build.h" #include "e2ee.pb.h" @@ -37,6 +38,14 @@ std::string bytesToString(const std::vector &b) { return std::string(reinterpret_cast(b.data()), b.size()); } +// Helper to log errors and conditionally throw in debug mode +inline void logAndThrowInDebug(const std::string &error_msg) { + std::cerr << "LiveKit SDK Error: " << error_msg << std::endl; +#ifndef NDEBUG + throw std::runtime_error(error_msg); +#endif +} + } // namespace FfiClient::~FfiClient() { @@ -150,11 +159,36 @@ void LivekitFfiCallback(const uint8_t *buf, size_t len) { FfiClient::instance().PushEvent(event); } +FfiClient::AsyncId FfiClient::generateAsyncId() { + return nextAsyncId_.fetch_add(1, std::memory_order_relaxed); +} + +bool FfiClient::cancelPendingByAsyncId(AsyncId async_id) { + std::unique_ptr to_cancel; + { + std::lock_guard guard(lock_); + for (auto it = pending_.begin(); it != pending_.end(); ++it) { + if ((*it)->async_id == async_id) { + to_cancel = std::move(*it); + pending_.erase(it); + break; + } + } + } + + if (to_cancel) { + to_cancel->cancel(); + return true; + } + return false; +} + template std::future FfiClient::registerAsync( - std::function match, + AsyncId async_id, std::function match, std::function &)> handler) { auto pending = std::make_unique>(); + pending->async_id = async_id; auto fut = pending->promise.get_future(); pending->match = std::move(match); pending->handler = std::move(handler); @@ -170,10 +204,35 @@ std::future FfiClient::connectAsync(const std::string &url, const std::string &token, const RoomOptions &options) { + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, + // match lambda: is this the connect event with our async_id? + [async_id](const proto::FfiEvent &event) { + return event.has_connect() && event.connect().async_id() == async_id; + }, + // handler lambda: fill the promise with RoomInfo or an exception + [](const proto::FfiEvent &event, + std::promise &pr) { + const auto &connectCb = event.connect(); + if (!connectCb.error().empty()) { + pr.set_exception( + std::make_exception_ptr(std::runtime_error(connectCb.error()))); + return; + } + + pr.set_value(connectCb); + }); + + // Build and send the request proto::FfiRequest req; auto *connect = req.mutable_connect(); connect->set_url(url); connect->set_token(token); + connect->set_request_async_id(async_id); auto *opts = connect->mutable_options(); opts->set_auto_subscribe(options.auto_subscribe); opts->set_dynacast(options.dynacast); @@ -245,50 +304,31 @@ FfiClient::connectAsync(const std::string &url, const std::string &token, } } } - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_connect()) { - throw std::runtime_error("FfiResponse missing connect"); - } - const AsyncId async_id = resp.connect().async_id(); - - // Now we register an async op that completes with RoomInfo - return registerAsync( - // match lambda: is this the connect event with our async_id? - [async_id](const proto::FfiEvent &event) { - return event.has_connect() && event.connect().async_id() == async_id; - }, - // handler lambda: fill the promise with RoomInfo or an exception - [](const proto::FfiEvent &event, - std::promise &pr) { - const auto &connectCb = event.connect(); - if (!connectCb.error().empty()) { - pr.set_exception( - std::make_exception_ptr(std::runtime_error(connectCb.error()))); - return; - } + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_connect()) { + logAndThrowInDebug("FfiResponse missing connect"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } - pr.set_value(connectCb); - }); + return fut; } // Track APIs Implementation std::future> FfiClient::getTrackStatsAsync(uintptr_t track_handle) { - proto::FfiRequest req; - auto *get_stats_req = req.mutable_get_stats(); - get_stats_req->set_track_handle(track_handle); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_get_stats()) { - throw std::runtime_error("FfiResponse missing get_stats"); - } - - const AsyncId async_id = resp.get_stats().async_id(); + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); - // Register pending op: - // - match: event.has_get_stats() && ids equal - // - handler: convert proto stats to C++ wrapper + fulfill promise - return registerAsync>( + // Register the async handler BEFORE sending the request + auto fut = registerAsync>( + async_id, // match [async_id](const proto::FfiEvent &event) { return event.has_get_stats() && @@ -312,6 +352,26 @@ FfiClient::getTrackStatsAsync(uintptr_t track_handle) { } pr.set_value(std::move(stats_vec)); }); + + // Build and send the request + proto::FfiRequest req; + auto *get_stats_req = req.mutable_get_stats(); + get_stats_req->set_track_handle(track_handle); + get_stats_req->set_request_async_id(async_id); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_get_stats()) { + logAndThrowInDebug("FfiResponse missing get_stats"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } // Participant APIs Implementation @@ -319,19 +379,12 @@ std::future FfiClient::publishTrackAsync(std::uint64_t local_participant_handle, std::uint64_t track_handle, const TrackPublishOptions &options) { - proto::FfiRequest req; - auto *msg = req.mutable_publish_track(); - msg->set_local_participant_handle(local_participant_handle); - msg->set_track_handle(track_handle); - auto optionProto = toProto(options); - msg->mutable_options()->CopyFrom(optionProto); + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_publish_track()) { - throw std::runtime_error("FfiResponse missing publish_track"); - } - const AsyncId async_id = resp.publish_track().async_id(); - return registerAsync( + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, // Match: is this our PublishTrackCallback? [async_id](const proto::FfiEvent &event) { return event.has_publish_track() && @@ -358,23 +411,41 @@ FfiClient::publishTrackAsync(std::uint64_t local_participant_handle, proto::OwnedTrackPublication pub = cb.publication(); pr.set_value(std::move(pub)); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_publish_track(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_track_handle(track_handle); + msg->set_request_async_id(async_id); + auto optionProto = toProto(options); + msg->mutable_options()->CopyFrom(optionProto); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_publish_track()) { + logAndThrowInDebug("FfiResponse missing publish_track"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future FfiClient::unpublishTrackAsync(std::uint64_t local_participant_handle, const std::string &track_sid, bool stop_on_unpublish) { - proto::FfiRequest req; - auto *msg = req.mutable_unpublish_track(); - msg->set_local_participant_handle(local_participant_handle); - msg->set_track_sid(track_sid); - msg->set_stop_on_unpublish(stop_on_unpublish); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_unpublish_track()) { - throw std::runtime_error("FfiResponse missing unpublish_track"); - } - const AsyncId async_id = resp.unpublish_track().async_id(); - return registerAsync( + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &event) { return event.has_unpublish_track() && event.unpublish_track().async_id() == async_id; @@ -388,6 +459,28 @@ FfiClient::unpublishTrackAsync(std::uint64_t local_participant_handle, } pr.set_value(); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_unpublish_track(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_track_sid(track_sid); + msg->set_stop_on_unpublish(stop_on_unpublish); + msg->set_request_async_id(async_id); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_unpublish_track()) { + logAndThrowInDebug("FfiResponse missing unpublish_track"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future FfiClient::publishDataAsync( @@ -395,23 +488,12 @@ std::future FfiClient::publishDataAsync( std::uint64_t data_len, bool reliable, const std::vector &destination_identities, const std::string &topic) { - proto::FfiRequest req; - auto *msg = req.mutable_publish_data(); - msg->set_local_participant_handle(local_participant_handle); - msg->set_data_ptr(reinterpret_cast(data_ptr)); - msg->set_data_len(data_len); - msg->set_reliable(reliable); - msg->set_topic(topic); - for (const auto &id : destination_identities) { - msg->add_destination_identities(id); - } + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_publish_data()) { - throw std::runtime_error("FfiResponse missing publish_data"); - } - const AsyncId async_id = resp.publish_data().async_id(); - return registerAsync( + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &event) { return event.has_publish_data() && event.publish_data().async_id() == async_id; @@ -425,26 +507,45 @@ std::future FfiClient::publishDataAsync( } pr.set_value(); }); -} -std::future FfiClient::publishSipDtmfAsync( - std::uint64_t local_participant_handle, std::uint32_t code, - const std::string &digit, - const std::vector &destination_identities) { + // Build and send the request proto::FfiRequest req; - auto *msg = req.mutable_publish_sip_dtmf(); + auto *msg = req.mutable_publish_data(); msg->set_local_participant_handle(local_participant_handle); - msg->set_code(code); - msg->set_digit(digit); + msg->set_data_ptr(reinterpret_cast(data_ptr)); + msg->set_data_len(data_len); + msg->set_reliable(reliable); + msg->set_topic(topic); + msg->set_request_async_id(async_id); for (const auto &id : destination_identities) { msg->add_destination_identities(id); } - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_publish_sip_dtmf()) { - throw std::runtime_error("FfiResponse missing publish_sip_dtmf"); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_publish_data()) { + logAndThrowInDebug("FfiResponse missing publish_data"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; } - const AsyncId async_id = resp.publish_sip_dtmf().async_id(); - return registerAsync( + + return fut; +} + +std::future FfiClient::publishSipDtmfAsync( + std::uint64_t local_participant_handle, std::uint32_t code, + const std::string &digit, + const std::vector &destination_identities) { + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &event) { return event.has_publish_sip_dtmf() && event.publish_sip_dtmf().async_id() == async_id; @@ -458,21 +559,42 @@ std::future FfiClient::publishSipDtmfAsync( } pr.set_value(); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_publish_sip_dtmf(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_code(code); + msg->set_digit(digit); + msg->set_request_async_id(async_id); + for (const auto &id : destination_identities) { + msg->add_destination_identities(id); + } + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_publish_sip_dtmf()) { + logAndThrowInDebug("FfiResponse missing publish_sip_dtmf"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future FfiClient::setLocalMetadataAsync(std::uint64_t local_participant_handle, const std::string &metadata) { - proto::FfiRequest req; - auto *msg = req.mutable_set_local_metadata(); - msg->set_local_participant_handle(local_participant_handle); - msg->set_metadata(metadata); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_set_local_metadata()) { - throw std::runtime_error("FfiResponse missing set_local_metadata"); - } - const AsyncId async_id = resp.set_local_metadata().async_id(); - return registerAsync( + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &event) { return event.has_set_local_metadata() && event.set_local_metadata().async_id() == async_id; @@ -486,24 +608,38 @@ FfiClient::setLocalMetadataAsync(std::uint64_t local_participant_handle, } pr.set_value(); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_set_local_metadata(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_metadata(metadata); + msg->set_request_async_id(async_id); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_set_local_metadata()) { + logAndThrowInDebug("FfiResponse missing set_local_metadata"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future FfiClient::captureAudioFrameAsync(std::uint64_t source_handle, const proto::AudioFrameBufferInfo &buffer) { - proto::FfiRequest req; - auto *msg = req.mutable_capture_audio_frame(); - msg->set_source_handle(source_handle); - msg->mutable_buffer()->CopyFrom(buffer); + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_capture_audio_frame()) { - throw std::runtime_error("FfiResponse missing capture_audio_frame"); - } - - const AsyncId async_id = resp.capture_audio_frame().async_id(); - - return registerAsync( + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, // match predicate [async_id](const proto::FfiEvent &event) { return event.has_capture_audio_frame() && @@ -519,6 +655,27 @@ FfiClient::captureAudioFrameAsync(std::uint64_t source_handle, } pr.set_value(); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_capture_audio_frame(); + msg->set_source_handle(source_handle); + msg->set_request_async_id(async_id); + msg->mutable_buffer()->CopyFrom(buffer); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_capture_audio_frame()) { + logAndThrowInDebug("FfiResponse missing capture_audio_frame"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future @@ -527,21 +684,12 @@ FfiClient::performRpcAsync(std::uint64_t local_participant_handle, const std::string &method, const std::string &payload, std::optional response_timeout_ms) { - proto::FfiRequest req; - auto *msg = req.mutable_perform_rpc(); - msg->set_local_participant_handle(local_participant_handle); - msg->set_destination_identity(destination_identity); - msg->set_method(method); - msg->set_payload(payload); - if (response_timeout_ms.has_value()) { - msg->set_response_timeout_ms(*response_timeout_ms); - } - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_perform_rpc()) { - throw std::runtime_error("FfiResponse missing perform_rpc"); - } - const AsyncId async_id = resp.perform_rpc().async_id(); - return registerAsync( + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, // match predicate [async_id](const proto::FfiEvent &event) { return event.has_perform_rpc() && @@ -558,6 +706,32 @@ FfiClient::performRpcAsync(std::uint64_t local_participant_handle, } pr.set_value(cb.payload()); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_perform_rpc(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_destination_identity(destination_identity); + msg->set_method(method); + msg->set_payload(payload); + msg->set_request_async_id(async_id); + if (response_timeout_ms.has_value()) { + msg->set_response_timeout_ms(*response_timeout_ms); + } + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_perform_rpc()) { + logAndThrowInDebug("FfiResponse missing perform_rpc"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future FfiClient::sendStreamHeaderAsync( @@ -565,22 +739,12 @@ std::future FfiClient::sendStreamHeaderAsync( const proto::DataStream::Header &header, const std::vector &destination_identities, const std::string &sender_identity) { - proto::FfiRequest req; - auto *msg = req.mutable_send_stream_header(); - msg->set_local_participant_handle(local_participant_handle); - *msg->mutable_header() = header; - msg->set_sender_identity(sender_identity); - for (const auto &id : destination_identities) { - msg->add_destination_identities(id); - } + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_send_stream_header()) { - throw std::runtime_error("FfiResponse missing send_stream_header"); - } - const AsyncId async_id = resp.send_stream_header().async_id(); - - return registerAsync( + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &e) { return e.has_send_stream_header() && e.send_stream_header().async_id() == async_id; @@ -594,28 +758,44 @@ std::future FfiClient::sendStreamHeaderAsync( } pr.set_value(); }); -} -std::future FfiClient::sendStreamChunkAsync( - std::uint64_t local_participant_handle, - const proto::DataStream::Chunk &chunk, - const std::vector &destination_identities, - const std::string &sender_identity) { + // Build and send the request proto::FfiRequest req; - auto *msg = req.mutable_send_stream_chunk(); + auto *msg = req.mutable_send_stream_header(); msg->set_local_participant_handle(local_participant_handle); - *msg->mutable_chunk() = chunk; + *msg->mutable_header() = header; msg->set_sender_identity(sender_identity); + msg->set_request_async_id(async_id); for (const auto &id : destination_identities) { msg->add_destination_identities(id); } - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_send_stream_chunk()) { - throw std::runtime_error("FfiResponse missing send_stream_chunk"); + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_send_stream_header()) { + logAndThrowInDebug("FfiResponse missing send_stream_header"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; } - const AsyncId async_id = resp.send_stream_chunk().async_id(); - return registerAsync( + + return fut; +} + +std::future FfiClient::sendStreamChunkAsync( + std::uint64_t local_participant_handle, + const proto::DataStream::Chunk &chunk, + const std::vector &destination_identities, + const std::string &sender_identity) { + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &e) { return e.has_send_stream_chunk() && e.send_stream_chunk().async_id() == async_id; @@ -629,25 +809,43 @@ std::future FfiClient::sendStreamChunkAsync( } pr.set_value(); }); -} -std::future -FfiClient::sendStreamTrailerAsync(std::uint64_t local_participant_handle, - const proto::DataStream::Trailer &trailer, - const std::string &sender_identity) { + // Build and send the request proto::FfiRequest req; - auto *msg = req.mutable_send_stream_trailer(); + auto *msg = req.mutable_send_stream_chunk(); msg->set_local_participant_handle(local_participant_handle); - *msg->mutable_trailer() = trailer; + *msg->mutable_chunk() = chunk; msg->set_sender_identity(sender_identity); + msg->set_request_async_id(async_id); + for (const auto &id : destination_identities) { + msg->add_destination_identities(id); + } - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_send_stream_trailer()) { - throw std::runtime_error("FfiResponse missing send_stream_trailer"); + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_send_stream_chunk()) { + logAndThrowInDebug("FfiResponse missing send_stream_chunk"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; } - const AsyncId async_id = resp.send_stream_trailer().async_id(); - return registerAsync( + return fut; +} + +std::future +FfiClient::sendStreamTrailerAsync(std::uint64_t local_participant_handle, + const proto::DataStream::Trailer &trailer, + const std::string &sender_identity) { + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &e) { return e.has_send_stream_trailer() && e.send_stream_trailer().async_id() == async_id; @@ -661,6 +859,28 @@ FfiClient::sendStreamTrailerAsync(std::uint64_t local_participant_handle, } pr.set_value(); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_send_stream_trailer(); + msg->set_local_participant_handle(local_participant_handle); + *msg->mutable_trailer() = trailer; + msg->set_sender_identity(sender_identity); + msg->set_request_async_id(async_id); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_send_stream_trailer()) { + logAndThrowInDebug("FfiResponse missing send_stream_trailer"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } } // namespace livekit diff --git a/src/ffi_client.h b/src/ffi_client.h index 36cf72c..3fa9818 100644 --- a/src/ffi_client.h +++ b/src/ffi_client.h @@ -17,11 +17,13 @@ #ifndef LIVEKIT_FFI_CLIENT_H #define LIVEKIT_FFI_CLIENT_H +#include #include #include #include #include #include +#include #include #include "livekit/stats.h" @@ -147,9 +149,11 @@ class FfiClient { // Base class for type-erased pending ops struct PendingBase { + AsyncId async_id = 0; // Client-generated async ID for cancellation virtual ~PendingBase() = default; virtual bool matches(const proto::FfiEvent &event) const = 0; virtual void complete(const proto::FfiEvent &event) = 0; + virtual void cancel() = 0; // Cancel the pending operation }; template struct Pending : PendingBase { std::promise promise; @@ -163,17 +167,30 @@ class FfiClient { void complete(const proto::FfiEvent &event) override { handler(event, promise); } + + void cancel() override { + promise.set_exception(std::make_exception_ptr( + std::runtime_error("Async operation cancelled"))); + } }; template std::future registerAsync( - std::function match, + AsyncId async_id, std::function match, std::function &)> handler); + // Generate a unique client-side async ID for request correlation + AsyncId generateAsyncId(); + + // Cancel a pending async operation by its async_id. Returns true if found and + // removed. + bool cancelPendingByAsyncId(AsyncId async_id); + std::unordered_map listeners_; ListenerId nextListenerId = 1; mutable std::mutex lock_; mutable std::vector> pending_; + std::atomic nextAsyncId_{1}; void PushEvent(const proto::FfiEvent &event) const; friend void LivekitFfiCallback(const uint8_t *buf, size_t len); From a4448213b8751e65a9afc694e0b0116ee37304e8 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Mon, 19 Jan 2026 19:40:06 +0800 Subject: [PATCH 2/2] changed the code back to always throw when ffi returns invalid result --- src/ffi_client.cpp | 54 ++++++++++++---------------------------------- 1 file changed, 14 insertions(+), 40 deletions(-) diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 4ad2cf6..ec6a022 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -38,12 +38,10 @@ std::string bytesToString(const std::vector &b) { return std::string(reinterpret_cast(b.data()), b.size()); } -// Helper to log errors and conditionally throw in debug mode -inline void logAndThrowInDebug(const std::string &error_msg) { +// Helper to log errors and throw +inline void logAndThrow(const std::string &error_msg) { std::cerr << "LiveKit SDK Error: " << error_msg << std::endl; -#ifndef NDEBUG throw std::runtime_error(error_msg); -#endif } } // namespace @@ -308,9 +306,7 @@ FfiClient::connectAsync(const std::string &url, const std::string &token, try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_connect()) { - logAndThrowInDebug("FfiResponse missing connect"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing connect"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -362,9 +358,7 @@ FfiClient::getTrackStatsAsync(uintptr_t track_handle) { try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_get_stats()) { - logAndThrowInDebug("FfiResponse missing get_stats"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing get_stats"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -424,9 +418,7 @@ FfiClient::publishTrackAsync(std::uint64_t local_participant_handle, try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_publish_track()) { - logAndThrowInDebug("FfiResponse missing publish_track"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing publish_track"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -471,9 +463,7 @@ FfiClient::unpublishTrackAsync(std::uint64_t local_participant_handle, try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_unpublish_track()) { - logAndThrowInDebug("FfiResponse missing unpublish_track"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing unpublish_track"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -524,9 +514,7 @@ std::future FfiClient::publishDataAsync( try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_publish_data()) { - logAndThrowInDebug("FfiResponse missing publish_data"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing publish_data"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -574,9 +562,7 @@ std::future FfiClient::publishSipDtmfAsync( try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_publish_sip_dtmf()) { - logAndThrowInDebug("FfiResponse missing publish_sip_dtmf"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing publish_sip_dtmf"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -619,9 +605,7 @@ FfiClient::setLocalMetadataAsync(std::uint64_t local_participant_handle, try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_set_local_metadata()) { - logAndThrowInDebug("FfiResponse missing set_local_metadata"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing set_local_metadata"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -666,9 +650,7 @@ FfiClient::captureAudioFrameAsync(std::uint64_t source_handle, try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_capture_audio_frame()) { - logAndThrowInDebug("FfiResponse missing capture_audio_frame"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing capture_audio_frame"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -722,9 +704,7 @@ FfiClient::performRpcAsync(std::uint64_t local_participant_handle, try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_perform_rpc()) { - logAndThrowInDebug("FfiResponse missing perform_rpc"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing perform_rpc"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -773,9 +753,7 @@ std::future FfiClient::sendStreamHeaderAsync( try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_send_stream_header()) { - logAndThrowInDebug("FfiResponse missing send_stream_header"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing send_stream_header"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -824,9 +802,7 @@ std::future FfiClient::sendStreamChunkAsync( try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_send_stream_chunk()) { - logAndThrowInDebug("FfiResponse missing send_stream_chunk"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing send_stream_chunk"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -871,9 +847,7 @@ FfiClient::sendStreamTrailerAsync(std::uint64_t local_participant_handle, try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_send_stream_trailer()) { - logAndThrowInDebug("FfiResponse missing send_stream_trailer"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing send_stream_trailer"); } } catch (...) { cancelPendingByAsyncId(async_id);