Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/single-pc-connect-perf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Faster initial connect in single peer connection mode by skipping an unnecessary 20ms negotiate debounce"
18 changes: 10 additions & 8 deletions Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,27 @@ let connectionBenchmarks: @Sendable () -> Void = {

benchmark.startMeasurement()
try await room.connect(url: config.url, token: token)
try? await room.waitUntilDataChannelsOpen()
benchmark.stopMeasurement()

// Extract fine-grained timestamps from the completed connect span
if let span = benchmarkTracer.completedSpan("connect") {
let s = span.splitMilliseconds
let wsOpen = s["ws_open"] ?? 0
let joinRecv = s["signal"] ?? s["join_recv"] ?? 0
let answerSent = s["answer_sent"]
// Either side may initiate SDP — answer_sent in dual PC subscriber-primary
// (server-initiated offer), offer_sent in single PC / publisher-primary.
let sdpDispatched = s["answer_sent"] ?? s["offer_sent"]
let pcConnected = s["pc_connected"] ?? 0
let dcOpen = s["dc_open"]

benchmark.measurement(dWs, Int(wsOpen))
benchmark.measurement(dSignal, Int(joinRecv - wsOpen))
benchmark.measurement(dTransport, Int(pcConnected - joinRecv))

if let answerSent {
benchmark.measurement(dIceDtls, Int(pcConnected - answerSent))
if let sdpDispatched {
benchmark.measurement(dIceDtls, Int(pcConnected - sdpDispatched))
}

if let dcOpen {
benchmark.measurement(dDc, Int(dcOpen - pcConnected))
}
Expand Down Expand Up @@ -114,24 +116,24 @@ let connectionBenchmarks: @Sendable () -> Void = {

benchmark.startMeasurement()
try await room.connect(url: config.url, token: token)
try? await room.waitUntilDataChannelsOpen()
benchmark.stopMeasurement()

if let span = benchmarkTracer.completedSpan("connect") {
let s = span.splitMilliseconds
let wsOpen = s["ws_open"] ?? 0
let joinRecv = s["signal"] ?? s["join_recv"] ?? 0
let answerSent = s["answer_sent"]
let sdpDispatched = s["answer_sent"] ?? s["offer_sent"]
let pcConnected = s["pc_connected"] ?? 0
let dcOpen = s["dc_open"]

benchmark.measurement(dWs, Int(wsOpen))
benchmark.measurement(dSignal, Int(joinRecv - wsOpen))
benchmark.measurement(dTransport, Int(pcConnected - joinRecv))

if let answerSent {
benchmark.measurement(dIceDtls, Int(pcConnected - answerSent))
if let sdpDispatched {
benchmark.measurement(dIceDtls, Int(pcConnected - sdpDispatched))
}

if let dcOpen {
benchmark.measurement(dDc, Int(dcOpen - pcConnected))
}
Expand Down
46 changes: 46 additions & 0 deletions Sources/LiveKit/Core/Room+DataChannels.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2026 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Foundation

public extension Room {
/// Waits until the connect-time data channels have opened.
///
/// ``connect(url:token:connectOptions:roomOptions:)`` returns once the
/// primary peer connection reaches DTLS completion. Data channels share
/// the same SCTP transport and typically open within a few milliseconds
/// after, but the open event is observed asynchronously. Await this
/// method when full handshake completion (readiness to send and receive
/// data) must be observed before proceeding.
///
/// Records a `dc_open` event on ``connectSpan`` when the data channels open.
///
/// - Parameters:
/// - timeout: The timeout for the operation.
/// - Throws: `LiveKitError` if data channels do not open within the timeout.
@discardableResult
func waitUntilDataChannelsOpen(timeout: TimeInterval = .defaultPublisherDataChannelOpen) async throws -> Self {
guard let pair = _state.transport?.connectDataChannelPair(
publisher: publisherDataChannel,
subscriber: subscriberDataChannel
) else {
return self
}
try await pair.openCompleter.wait(timeout: timeout)
connectSpan?.record("dc_open")
return self
}
}
19 changes: 13 additions & 6 deletions Sources/LiveKit/Core/Room+Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ extension Room {
}
}

func publisherShouldNegotiate() async throws {
func publisherShouldNegotiate(force: Bool = false) async throws {
log()

let publisher = try requirePublisher()
await publisher.negotiate()
try await publisher.negotiate(force: force)
_state.mutate { $0.hasPublished = true }
}

Expand Down Expand Up @@ -165,6 +165,7 @@ extension Room {
guard let self else { return }
log("Publisher onOffer with offerId: \(offerId), sdp: \(offer.sdp)")
try await signalClient.send(offer: offer, offerId: offerId)
connectSpan?.record("offer_sent")
}

// data over pub channel for backwards compatibility
Expand Down Expand Up @@ -196,10 +197,6 @@ extension Room {
_state.mutate { $0.transport = transport }

log("[Connect] Fast publish enabled: \(joinResponse.fastPublish ? "true" : "false")")
if isSinglePC || !isSubscriberPrimary || joinResponse.fastPublish {
// In single PC mode or when publisher is primary, negotiate immediately
try await publisherShouldNegotiate()
}

} else if case let .reconnect(reconnectResponse) = connectResponse {
log("[Connect] Configuring transports with RECONNECT response...")
Expand Down Expand Up @@ -286,6 +283,16 @@ extension Room {

// Resume after configuring transports...
await signalClient.resumeQueues()
try Task.checkCancellation()

// Eager publisher negotiation must run after `resumeQueues()` —
// offers are not queueable, so sending while suspended drops them.
if case let .join(joinResponse) = connectResponse {
let isSubscriberPrimary = singlePC ? false : joinResponse.subscriberPrimary
if singlePC || !isSubscriberPrimary || joinResponse.fastPublish {
try await publisherShouldNegotiate(force: true)
}
}

// Wait for transport...
try await primaryTransportConnectedCompleter.wait(timeout: _state.connectOptions.primaryTransportConnectTimeout)
Expand Down
4 changes: 0 additions & 4 deletions Sources/LiveKit/Core/Room+TransportDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,6 @@ extension Room: TransportDelegate {
case LKRTCDataChannel.Labels.lossy: subscriberDataChannel.set(lossy: dataChannel)
default: log("Unknown data channel label \(dataChannel.label)", .warning)
}

if subscriberDataChannel.isOpen {
connectSpan?.record("dc_open")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is moved to 769386a to make it consistent with the spec, I'm not sure if we should drop this data point if that's not really awaitable from Swift.

I may re-evaluate the spec (and other SDKs).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now I'm more for dropping it...

}
}

func transportShouldNegotiate(_: Transport) {}
Expand Down
12 changes: 9 additions & 3 deletions Sources/LiveKit/Core/Transport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,15 @@ actor Transport: NSObject, Loggable {
_delegate.add(delegate: delegate)
}

func negotiate() async {
await _debounce.schedule {
try await self.createAndSendOffer()
func negotiate(force: Bool = false) async throws {
if force {
// Cancel any pending debounced negotiation; this call supersedes it.
await _debounce.cancel()
try await createAndSendOffer()
} else {
await _debounce.schedule {
try await self.createAndSendOffer()
}
}
}

Expand Down
10 changes: 10 additions & 0 deletions Sources/LiveKit/Core/TransportMode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ extension TransportMode {
}
}

/// The data-channel pair that opens during the connect handshake:
/// the publisher pair (client-created) in single PC, the subscriber pair
/// (server-pushed) in dual PC.
func connectDataChannelPair(publisher: DataChannelPair, subscriber: DataChannelPair) -> DataChannelPair {
switch self {
case .publisherOnly: publisher
case .subscriberPrimary, .publisherPrimary: subscriber
}
}

/// All distinct transports (one in single PC, two in dual PC).
var allTransports: [Transport] {
switch self {
Expand Down
Loading