diff --git a/.changes/single-pc-connect-perf b/.changes/single-pc-connect-perf new file mode 100644 index 000000000..d4a74a5c0 --- /dev/null +++ b/.changes/single-pc-connect-perf @@ -0,0 +1 @@ +patch type="fixed" "Faster initial connect in single peer connection mode by skipping an unnecessary 20ms negotiate debounce" diff --git a/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift b/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift index d21c3d369..e531b3ae7 100644 --- a/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift +++ b/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift @@ -32,16 +32,19 @@ private let dWs: BenchmarkMetric = .custom("D_WS_MS", polarity: .prefersSmaller, private let dSignal: BenchmarkMetric = .custom("D_SIGNAL_MS", polarity: .prefersSmaller, useScalingFactor: false) private let dTransport: BenchmarkMetric = .custom("D_TRANSPORT_MS", polarity: .prefersSmaller, useScalingFactor: false) private let dIceDtls: BenchmarkMetric = .custom("D_ICE_DTLS_MS", polarity: .prefersSmaller, useScalingFactor: false) -private let dDc: BenchmarkMetric = .custom("D_DC_MS", polarity: .prefersSmaller, useScalingFactor: false) +// `D_DC_MS` is not collected: the SDK does not block `connect()` on data channels +// opening, so a `dc_open` split would race with `splitMilliseconds` being read +// here. See spec/01-connection-time.md for the spec-defined `T_DC_OPEN`. +// private let dDc: BenchmarkMetric = .custom("D_DC_MS", polarity: .prefersSmaller, useScalingFactor: false) let connectionBenchmarks: @Sendable () -> Void = { // BM-CONN-001: Dual PeerConnection, subscriber-primary (default) Benchmark( "BM-CONN-001-DualPC-SubscriberPrimary", configuration: .init( - metrics: .default + [dWs, dSignal, dTransport, dIceDtls, dDc], + metrics: .default + [dWs, dSignal, dTransport, dIceDtls], timeUnits: .milliseconds, - units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count, dDc: .count], + units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count], warmupIterations: 5, scalingFactor: .one, maxDuration: .seconds(300), @@ -67,21 +70,20 @@ let connectionBenchmarks: @Sendable () -> Void = { let s = span.splitMilliseconds let wsOpen = s["ws_open"] ?? 0 let joinRecv = s["signal"] ?? s["join_recv"] ?? 0 - let answerSent = s["answer_sent"] + // Either side may initiate SDP — answer_sent in dual PC subscriber-primary + // (server-initiated offer), offer_sent in single PC / publisher-primary. + let sdpDispatched = s["answer_sent"] ?? s["offer_sent"] let pcConnected = s["pc_connected"] ?? 0 - let dcOpen = s["dc_open"] + // let dcOpen = s["dc_open"] // see note on `dDc` above benchmark.measurement(dWs, Int(wsOpen)) benchmark.measurement(dSignal, Int(joinRecv - wsOpen)) benchmark.measurement(dTransport, Int(pcConnected - joinRecv)) - if let answerSent { - benchmark.measurement(dIceDtls, Int(pcConnected - answerSent)) - } - - if let dcOpen { - benchmark.measurement(dDc, Int(dcOpen - pcConnected)) + if let sdpDispatched { + benchmark.measurement(dIceDtls, Int(pcConnected - sdpDispatched)) } + // if let dcOpen { benchmark.measurement(dDc, Int(dcOpen - pcConnected)) } } await room.disconnect() @@ -93,9 +95,9 @@ let connectionBenchmarks: @Sendable () -> Void = { Benchmark( "BM-CONN-003-SinglePC", configuration: .init( - metrics: .default + [dWs, dSignal, dTransport, dIceDtls, dDc], + metrics: .default + [dWs, dSignal, dTransport, dIceDtls], timeUnits: .milliseconds, - units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count, dDc: .count], + units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count], warmupIterations: 5, scalingFactor: .one, maxDuration: .seconds(300), @@ -120,21 +122,18 @@ let connectionBenchmarks: @Sendable () -> Void = { let s = span.splitMilliseconds let wsOpen = s["ws_open"] ?? 0 let joinRecv = s["signal"] ?? s["join_recv"] ?? 0 - let answerSent = s["answer_sent"] + let sdpDispatched = s["answer_sent"] ?? s["offer_sent"] let pcConnected = s["pc_connected"] ?? 0 - let dcOpen = s["dc_open"] + // let dcOpen = s["dc_open"] // see note on `dDc` above benchmark.measurement(dWs, Int(wsOpen)) benchmark.measurement(dSignal, Int(joinRecv - wsOpen)) benchmark.measurement(dTransport, Int(pcConnected - joinRecv)) - if let answerSent { - benchmark.measurement(dIceDtls, Int(pcConnected - answerSent)) - } - - if let dcOpen { - benchmark.measurement(dDc, Int(dcOpen - pcConnected)) + if let sdpDispatched { + benchmark.measurement(dIceDtls, Int(pcConnected - sdpDispatched)) } + // if let dcOpen { benchmark.measurement(dDc, Int(dcOpen - pcConnected)) } } await room.disconnect() diff --git a/Sources/LiveKit/Core/Room+Engine.swift b/Sources/LiveKit/Core/Room+Engine.swift index 66a5c8c6c..ac4adf609 100644 --- a/Sources/LiveKit/Core/Room+Engine.swift +++ b/Sources/LiveKit/Core/Room+Engine.swift @@ -53,11 +53,11 @@ extension Room { } } - func publisherShouldNegotiate() async throws { + func publisherShouldNegotiate(force: Bool = false) async throws { log() let publisher = try requirePublisher() - await publisher.negotiate() + try await publisher.negotiate(force: force) _state.mutate { $0.hasPublished = true } } @@ -165,6 +165,7 @@ extension Room { guard let self else { return } log("Publisher onOffer with offerId: \(offerId), sdp: \(offer.sdp)") try await signalClient.send(offer: offer, offerId: offerId) + connectSpan?.record("offer_sent") } // data over pub channel for backwards compatibility @@ -196,10 +197,6 @@ extension Room { _state.mutate { $0.transport = transport } log("[Connect] Fast publish enabled: \(joinResponse.fastPublish ? "true" : "false")") - if isSinglePC || !isSubscriberPrimary || joinResponse.fastPublish { - // In single PC mode or when publisher is primary, negotiate immediately - try await publisherShouldNegotiate() - } } else if case let .reconnect(reconnectResponse) = connectResponse { log("[Connect] Configuring transports with RECONNECT response...") @@ -286,6 +283,16 @@ extension Room { // Resume after configuring transports... await signalClient.resumeQueues() + try Task.checkCancellation() + + // Eager publisher negotiation must run after `resumeQueues()` — + // offers are not queueable, so sending while suspended drops them. + if case let .join(joinResponse) = connectResponse { + let isSubscriberPrimary = singlePC ? false : joinResponse.subscriberPrimary + if singlePC || !isSubscriberPrimary || joinResponse.fastPublish { + try await publisherShouldNegotiate(force: true) + } + } // Wait for transport... try await primaryTransportConnectedCompleter.wait(timeout: _state.connectOptions.primaryTransportConnectTimeout) diff --git a/Sources/LiveKit/Core/Room+TransportDelegate.swift b/Sources/LiveKit/Core/Room+TransportDelegate.swift index de299cb59..a8f6a11c1 100644 --- a/Sources/LiveKit/Core/Room+TransportDelegate.swift +++ b/Sources/LiveKit/Core/Room+TransportDelegate.swift @@ -113,10 +113,6 @@ extension Room: TransportDelegate { case LKRTCDataChannel.Labels.lossy: subscriberDataChannel.set(lossy: dataChannel) default: log("Unknown data channel label \(dataChannel.label)", .warning) } - - if subscriberDataChannel.isOpen { - connectSpan?.record("dc_open") - } } func transportShouldNegotiate(_: Transport) {} diff --git a/Sources/LiveKit/Core/Transport.swift b/Sources/LiveKit/Core/Transport.swift index e5ecff708..7d23f4aaf 100644 --- a/Sources/LiveKit/Core/Transport.swift +++ b/Sources/LiveKit/Core/Transport.swift @@ -98,9 +98,15 @@ actor Transport: NSObject, Loggable { _delegate.add(delegate: delegate) } - func negotiate() async { - await _debounce.schedule { - try await self.createAndSendOffer() + func negotiate(force: Bool = false) async throws { + if force { + // Cancel any pending debounced negotiation; this call supersedes it. + await _debounce.cancel() + try await createAndSendOffer() + } else { + await _debounce.schedule { + try await self.createAndSendOffer() + } } }