Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lightway-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,9 @@ async fn handle_events<A: 'static + Send + EventCallback, ExtAppState: Send + Sy
Event::EncodingStateChanged { enabled } => {
info!("Encoding state changed to {enabled}");
}
Event::DataPathModeChanged(mode) => {
info!(?mode, "Data path mode changed");
}

// Server only events
Event::SessionIdRotationStarted { .. }
Expand Down
4 changes: 3 additions & 1 deletion lightway-client/src/mobile/lightway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,9 @@ async fn handle_events<A: 'static + Send + EventCallback>(
}
continue;
}
Event::FirstPacketReceived | Event::EncodingStateChanged { .. } => (), // will be handled by handle_global_events
Event::FirstPacketReceived
| Event::EncodingStateChanged { .. }
| Event::DataPathModeChanged(_) => (), // will be handled by handle_global_events

// Server-only events
Event::SessionIdRotationAcknowledged { .. }
Expand Down
140 changes: 112 additions & 28 deletions lightway-core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,24 @@ const MAX_RETRANSMISSION_ATTEMPTS: u8 = 5;
/// Maximum number of retransmissions attempts for each encoding request packet.
const ENCODING_REQUEST_PKT_MAX_RETRANSMISSION_ATTEMPTS: u8 = 5;

/// Which mutually exclusive data path optimization is active.
///
/// ExpressLane and InsidePktCodec cannot be active simultaneously.
/// This enum is the single source of truth for which mode governs
/// the data path.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum DataPathMode {
/// Standard data path (i.e. non-ExpressLane, non-InsidePktCodec)
#[default]
Standard,

/// ExpressLane
ExpressLane,

/// InsidePktCodec
InsidePktCodec,
}

/// Connection state
#[derive(Debug, Clone, Copy, PartialEq)]
#[repr(u8)]
Expand Down Expand Up @@ -441,6 +459,9 @@ pub struct Connection<AppState: Send = ()> {

// Expresslane state, config exchange, health monitoring, wire crypto, and callbacks
expresslane: expresslane::Expresslane<AppState>,

/// Which mutually exclusive data path optimization is active
data_path_mode: DataPathMode,
}

/// Information about the new session being established with a new
Expand Down Expand Up @@ -530,6 +551,7 @@ impl<AppState: Send> Connection<AppState> {
args.expresslane_cb,
args.expresslane_metrics,
),
data_path_mode: DataPathMode::Standard,
};

// This will very likely fail since negotiation always needs
Expand Down Expand Up @@ -1648,12 +1670,36 @@ impl<AppState: Send> Connection<AppState> {
}

/// Set the expresslane state and emit the event if the state has changed.
///
/// Activation is blocked while data path mode is [`DataPathMode::InsidePktCodec`].
/// Transitions to/from [`ExpresslaneState::Active`] automatically update
/// the [`DataPathMode`].
fn set_expresslane_state(&mut self, new_state: ExpresslaneState) {
if self.expresslane.state == new_state {
return;
}
if new_state == ExpresslaneState::Active
&& self.data_path_mode == DataPathMode::InsidePktCodec
{
warn!("Blocking expresslane activation: InsidePktCodec is active");
return;
}
self.expresslane.state = new_state;
self.event(Event::ExpresslaneStateChanged(new_state));

if new_state == ExpresslaneState::Active {
self.set_data_path_mode(DataPathMode::ExpressLane);
} else if self.data_path_mode == DataPathMode::ExpressLane {
self.set_data_path_mode(DataPathMode::Standard);
}
}

fn set_data_path_mode(&mut self, new_mode: DataPathMode) {
if self.data_path_mode == new_mode {
return;
}
self.data_path_mode = new_mode;
self.event(Event::DataPathModeChanged(new_mode));
}

/// Encode expresslane metrics as a binary payload.
Expand Down Expand Up @@ -1715,6 +1761,11 @@ impl<AppState: Send> Connection<AppState> {
return Ok(());
}

if self.data_path_mode == DataPathMode::InsidePktCodec {
debug!("Ignoring expresslane config from peer (InsidePktCodec active)");
return Ok(());
}

// Server starts in WaitingForClient. The client's first config
// transitions the server to Pending, which enables
// expresslane_supported() and allows the server to send its own key.
Expand Down Expand Up @@ -1770,39 +1821,40 @@ impl<AppState: Send> Connection<AppState> {
// If updating key failed, send disabled to peer
let enabled = self.expresslane.data.update_next_self_key(key).is_ok();

self.expresslane.config_counter += 1;
let config = wire::ExpresslaneConfig {
enabled,
key,
version: ExpresslaneVersion::Version1,
ack: false,
counter: self.expresslane.config_counter,
};

let msg = wire::Frame::ExpresslaneConfig(config);
let _ = self.send_frame_or_drop(msg);
self.send_expresslane_config(enabled, key);

self.expresslane.last_key_rotation = Some(Instant::now());

// Callback to schedule re-transmission if required
(self.schedule_tick_cb)(
self.expresslane.retransmit_wait_time(),
&mut self.app_state,
TickType::ExpresslaneKeyShareTick(ExpresslaneTickData(config)),
);
Ok(())
}

/// Mark expresslane as [Degraded](ExpresslaneState::Degraded) and notify the peer to also disable
/// Expresslane will not be re-enabled after being degraded
fn set_expresslane_degraded(&mut self) {
self.set_expresslane_state(ExpresslaneState::Degraded);

let key = ExpresslaneKey::INVALID;
let _ = self.expresslane.data.update_next_self_key(key);

self.send_expresslane_config(false, key);
}

/// Mark expresslane as [Inactive](ExpresslaneState::Inactive) and notify the peer to also disable
/// Unlike [`Self::set_expresslane_degraded`], this allows re-activation when InsidePktCodec is later disabled
fn set_expresslane_inactive(&mut self) {
self.set_expresslane_state(ExpresslaneState::Inactive);

let key = ExpresslaneKey::INVALID;
let _ = self.expresslane.data.update_next_self_key(key);

self.send_expresslane_config(false, key);
}

/// Build an ExpresslaneConfig, send it to the peer, and schedule retransmission.
fn send_expresslane_config(&mut self, enabled: bool, key: ExpresslaneKey) {
self.expresslane.config_counter += 1;
let config = wire::ExpresslaneConfig {
enabled: false,
enabled,
key,
version: ExpresslaneVersion::Version1,
ack: false,
Expand All @@ -1812,8 +1864,6 @@ impl<AppState: Send> Connection<AppState> {
let msg = wire::Frame::ExpresslaneConfig(config);
let _ = self.send_frame_or_drop(msg);

// Callback to schedule re-transmission if required
// reuses same retry logic as key rotation
(self.schedule_tick_cb)(
self.expresslane.retransmit_wait_time(),
&mut self.app_state,
Expand Down Expand Up @@ -2123,6 +2173,15 @@ impl<AppState: Send> Connection<AppState> {
// Update the latest acknowledged packet's id
self.encoding_request_states.id_counter = er.id;

if er.enable {
if self.data_path_mode == DataPathMode::ExpressLane {
self.set_expresslane_inactive();
}
self.set_data_path_mode(DataPathMode::InsidePktCodec);
} else {
self.set_data_path_mode(DataPathMode::Standard);
}

encoder.set_encoding_state(er.enable);
debug!(
"Client {:?}: EncodingRequest {} received. encoding state now: {}.",
Expand All @@ -2133,6 +2192,12 @@ impl<AppState: Send> Connection<AppState> {

self.event(Event::EncodingStateChanged { enabled: er.enable });

if !er.enable && self.expresslane_supported() {
// Re-enable ExpressLane
self.expresslane.last_key_rotation = None;
let _ = self.rotate_expresslane_key();
}

// Reply to the client.
let msg = wire::Frame::EncodingResponse(wire::EncodingResponse {
id: er.id,
Expand All @@ -2146,13 +2211,10 @@ impl<AppState: Send> Connection<AppState> {
&mut self,
te: wire::EncodingResponse,
) -> ConnectionResult<()> {
let encoder = match &mut self.inside_pkt_encoder {
Some(encoder) => encoder,
None => {
error!("Received EncodingResponse packet even without an encoder.");
return Err(ConnectionError::PacketCodecDoesNotExist);
}
};
if self.inside_pkt_encoder.is_none() {
error!("Received EncodingResponse packet even without an encoder.");
return Err(ConnectionError::PacketCodecDoesNotExist);
}

if !matches!(self.state, State::Online) {
warn!("Received encoding request packet before state is Online");
Expand Down Expand Up @@ -2186,7 +2248,19 @@ impl<AppState: Send> Connection<AppState> {

let new_setting = te.enable;

encoder.set_encoding_state(new_setting);
if new_setting {
if self.data_path_mode == DataPathMode::ExpressLane {
self.set_expresslane_inactive();
}
self.set_data_path_mode(DataPathMode::InsidePktCodec);
} else {
self.set_data_path_mode(DataPathMode::Standard);
}

self.inside_pkt_encoder
.as_mut()
.unwrap()
.set_encoding_state(new_setting);
info!("inside packet encoding state is now set to {}", new_setting);

self.event(Event::EncodingStateChanged {
Expand All @@ -2196,9 +2270,19 @@ impl<AppState: Send> Connection<AppState> {
// Removes from pending pkt store such that it is no longer retransmitted
self.encoding_request_states.pending_request_pkt = None;

if !new_setting && self.expresslane_supported() {
self.expresslane.last_key_rotation = None;
let _ = self.rotate_expresslane_key();
}

Ok(())
}

/// Get the current data path mode
pub fn data_path_mode(&self) -> DataPathMode {
self.data_path_mode
}

/// Get the current encoding state from the encoder
pub fn is_encoding_enabled(&self) -> bool {
self.inside_pkt_encoder
Expand Down
4 changes: 3 additions & 1 deletion lightway-core/src/connection/event.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::connection::ExpresslaneState;
use crate::connection::{DataPathMode, ExpresslaneState};
use crate::{SessionId, State};

/// A lightway event
Expand Down Expand Up @@ -52,4 +52,6 @@ pub enum Event {
},
/// Expresslane state changed
ExpresslaneStateChanged(ExpresslaneState),
/// Data path mode changed
DataPathModeChanged(DataPathMode),
}
5 changes: 3 additions & 2 deletions lightway-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ pub use builder_predicates::BuilderPredicates;
pub use cipher::Cipher;
pub use connection::{
ClientConnectionBuilder, Connection, ConnectionActivity, ConnectionBuilderError,
ConnectionError, ConnectionResult, Event, EventCallback, EventCallbackArg, ExpresslaneState,
ServerConnectionBuilder, State, dplpmtud::Timer as DplpmtudTimer, expresslane::*,
ConnectionError, ConnectionResult, DataPathMode, Event, EventCallback, EventCallbackArg,
ExpresslaneState, ServerConnectionBuilder, State, dplpmtud::Timer as DplpmtudTimer,
expresslane::*,
};
pub use context::{
ClientContext, ClientContextBuilder, ConnectionType, ContextError, ScheduleTickCb, ServerAuth,
Expand Down
3 changes: 3 additions & 0 deletions lightway-core/tests/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,9 @@ async fn client<S: TestSock>(
Event::EncodingStateChanged { enabled } => {
println!("Encoding state change to {enabled}")
}
Event::DataPathModeChanged(mode) => {
println!("Data path mode change to {mode:?}")
}
}
}
});
Expand Down
1 change: 1 addition & 0 deletions lightway-server/src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ async fn handle_events(
Event::TlsKeysUpdateStart => handle_tls_keys_update_start(&conn),
Event::TlsKeysUpdateCompleted => handle_tls_keys_update_complete(),
Event::ExpresslaneStateChanged(_) => {}
Event::DataPathModeChanged(_) => {}
Event::FirstPacketReceived => {
unreachable!("client only event received");
}
Expand Down
Loading