Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 54 additions & 1 deletion lightway-app-utils/src/tun.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ pub struct TunConfig {
#[cfg(unix)]
#[educe(Default = true)]
pub close_fd_on_drop: bool,
/// Enable TUN offload (`IFF_VNET_HDR`) so reads/writes carry a
/// `virtio_net_hdr` and the kernel performs GRO/GSO across the
/// device. Required for the GSO inside-IO path.
#[cfg(target_os = "linux")]
pub offload: bool,
#[cfg(windows)]
/// Optional wintun file path for Windows TUN interfaces
pub wintun_file: Option<String>,
Expand Down Expand Up @@ -206,6 +211,10 @@ impl TunConfig {
{
builder = builder.associate_route(false);
}
#[cfg(target_os = "linux")]
if self.offload {
builder = builder.offload(true);
}
let device = builder.build_async()?;

if let Some(mtu) = self.mtu {
Expand Down Expand Up @@ -307,6 +316,16 @@ impl Tun {
}
}

/// Raw read from `Tun`, returning the full virtio frame (header + payload).
///
/// Delegates to the `Direct` backend. The `IoUring` backend has no GSO
/// read path, so it reports an `ErrorKind::Unsupported` I/O error instead
/// of panicking.
#[cfg(target_os = "linux")]
pub async fn recv_gso(&self, buf: &mut [u8]) -> IOCallbackResult<usize> {
    match self {
        Tun::Direct(t) => t.recv_gso(buf).await,
        #[cfg(feature = "io-uring")]
        Tun::IoUring(_) => IOCallbackResult::Err(std::io::Error::new(
            std::io::ErrorKind::Unsupported,
            "recv_gso is not implemented for the io-uring TUN backend",
        )),
    }
}

/// Send a packet to `Tun`
pub fn try_send(&self, buf: BytesMut) -> IOCallbackResult<usize> {
match self {
Expand Down Expand Up @@ -363,6 +382,10 @@ pub struct TunDirect {
fd: RawFd,
#[cfg(unix)]
close_fd_on_drop: bool,
/// `IFF_VNET_HDR` enabled — sends must be prefixed with a
/// `virtio_net_hdr` (`tun_rs::VIRTIO_NET_HDR_LEN` bytes), and reads include it.
#[cfg(target_os = "linux")]
vnet_hdr: bool,
}

impl TunDirect {
Expand All @@ -385,6 +408,8 @@ impl TunDirect {
fd,
#[cfg(unix)]
close_fd_on_drop: config.close_fd_on_drop,
#[cfg(target_os = "linux")]
vnet_hdr: config.offload,
})
}

Expand All @@ -406,10 +431,38 @@ impl TunDirect {
}
}

/// Read one raw frame from the TUN device into `buf`.
///
/// The bytes are handed back untouched, i.e. the full virtio frame
/// (header + payload). A zero-length read and an
/// `ErrorKind::WouldBlock` error are both reported as
/// `IOCallbackResult::WouldBlock`; any other error is passed through.
#[cfg(target_os = "linux")]
pub async fn recv_gso(&self, buf: &mut [u8]) -> IOCallbackResult<usize> {
    let device = self.tun.as_ref().unwrap();
    let outcome = device.recv(buf).await;
    match outcome {
        Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => IOCallbackResult::WouldBlock,
        Err(e) => IOCallbackResult::Err(e),
        Ok(0) => IOCallbackResult::WouldBlock,
        Ok(len) => IOCallbackResult::Ok(len),
    }
}

/// Try to write a packet to Tun
pub fn try_send(&self, buf: BytesMut) -> IOCallbackResult<usize> {
let tun = self.tun.as_ref().unwrap();
match tun.try_send(&buf[..]) {
#[cfg(target_os = "linux")]
let res = if self.vnet_hdr {
// IFF_VNET_HDR requires a zeroed `virtio_net_hdr` prefix
// on every write (NEEDS_CSUM=0, GSO_NONE).
let hdr_len = tun_rs::VIRTIO_NET_HDR_LEN;
let mut prefixed = bytes::BytesMut::zeroed(hdr_len);
prefixed.extend_from_slice(&buf[..]);
tun.try_send(&prefixed[..])
.map(|n| n.saturating_sub(hdr_len))
} else {
tun.try_send(&buf[..])
};
#[cfg(not(target_os = "linux"))]
let res = tun.try_send(&buf[..]);
match res {
Ok(nr) => IOCallbackResult::Ok(nr),
Err(err) if matches!(err.kind(), std::io::ErrorKind::WouldBlock) => {
IOCallbackResult::WouldBlock
Expand Down
4 changes: 4 additions & 0 deletions lightway-client/src/io/outside/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ impl OutsideIOSendCallback for Tcp {
}
}

fn send_gso(&self, _buf: &[u8], _segment_size: u16) -> IOCallbackResult<usize> {
todo!()
}

fn peer_addr(&self) -> SocketAddr {
self.peer_addr()
}
Expand Down
4 changes: 4 additions & 0 deletions lightway-client/src/io/outside/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ impl OutsideIOSendCallback for Udp {
}
}

fn send_gso(&self, _buf: &[u8], _segment_size: u16) -> IOCallbackResult<usize> {
todo!()
}

fn peer_addr(&self) -> SocketAddr {
self.peer_addr()
}
Expand Down
1 change: 1 addition & 0 deletions lightway-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ rand.workspace = true
rand_core = "0.10.0"
thiserror.workspace = true
tracing.workspace = true
tun-rs.workspace = true
wolfssl = "6.0.0"

[dev-dependencies]
Expand Down
148 changes: 147 additions & 1 deletion lightway-core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,142 @@ impl<AppState: Send> Connection<AppState> {
}
}

/// Process a GSO superpacket as a single packet through the pipeline.
///
/// Unlike `inside_data_received`, this skips the MTU check (the
/// superpacket is intentionally oversized) and `tcp_clamp_mss`
/// (GSO packets are never SYN). Plugins and encoder see the whole
/// superpacket as one packet.
#[cfg(target_os = "linux")]
pub fn inside_data_received_gso(
&mut self,
pkt: &mut BytesMut,
hdr: &crate::gso::VirtioNetHdr,
) -> ConnectionResult<()> {
use ConnectionError::InvalidInsidePacket;
use InvalidPacketError::InvalidIpv4Packet;

if matches!(self.state, State::Disconnected) {
return Err(ConnectionError::Disconnected);
}

if !matches!(self.state, State::Online) {
return Err(ConnectionError::InvalidState);
}

let Some(inside_io) = &self.inside_io else {
return Err(ConnectionError::InvalidState);
};
let mtu = inside_io.mtu();

// No MTU check — GSO superpacket is intentionally oversized
if !ipv4_is_valid_packet(pkt.as_ref()) {
return Err(InvalidInsidePacket(InvalidIpv4Packet));
}

let _ = self.rotate_expresslane_key();

// No tcp_clamp_mss — GSO packets are never SYN

// Plugins see the whole superpacket as one packet
match self.inside_plugins.do_ingress(pkt) {
PluginResult::Accept => {}
PluginResult::Drop => {
return Ok(());
}
PluginResult::DropWithReply(b) => {
return Err(ConnectionError::PluginDropWithReply(b));
}
PluginResult::Error(e) => {
return Err(ConnectionError::PluginError(e));
}
}

// Encoder sees the whole superpacket
if let Some(encoder) = &mut self.inside_pkt_encoder {
let codec_state = encoder.store(pkt);
match codec_state {
Ok(CodecStatus::PacketAccepted) => return Ok(()),
Ok(CodecStatus::SkipPacket) => {}
Err(e) => return Err(ConnectionError::PacketCodecError(e)),
}
}

self.send_to_outside_gso(pkt, hdr, mtu)
}

/// Split a GSO superpacket into segments, encrypt each, and send
/// the entire wire buffer as one `sendmsg` with `UDP_SEGMENT`.
///
/// For expresslane: encrypts with AES-GCM, frames via `udp_frame`,
/// collects into wire buffer.
///
/// For DTLS: `send_frame_or_drop` triggers wolfssl `try_write` which
/// calls the IO callback `send()`. The IO callback detects `gso_buf`
/// and appends framed wire packets there instead of sending.
///
/// `mtu` is the inside MTU and sizes the scratch buffer each segment is
/// built into. Returns the first error from `send_outside_data`, in which
/// case the partially filled wire buffer is discarded.
#[cfg(target_os = "linux")]
fn send_to_outside_gso(
    &mut self,
    pkt: &mut BytesMut,
    hdr: &crate::gso::VirtioNetHdr,
    mtu: usize,
) -> ConnectionResult<()> {
    use crate::gso;

    let n = gso::segment_count(hdr, pkt.len());
    if n == 0 {
        return Ok(());
    }

    let expresslane = self.expresslane_ready();

    // One reusable, zero-initialised scratch buffer for every segment.
    let mut segment = BytesMut::zeroed(mtu);

    // Enable GSO wire buffer — IO callback will append here.
    // Both DTLS and expresslane paths detect this and append
    // encrypted segments instead of sending immediately.
    self.session.io_cb_mut().gso_buf = Some(BytesMut::with_capacity(n * self.outside_mtu));

    let mut result = Ok(());

    for i in 0..n {
        // Restore the scratch buffer to exactly `mtu` bytes. `resize` is
        // the safe equivalent of `set_len(mtu)`: it stays sound even if
        // `send_outside_data` changed the buffer's length or capacity on
        // the previous iteration.
        segment.resize(mtu, 0);
        let total_len = gso::build_segment(hdr, pkt.as_ref(), &mut segment, i);
        // Keep only the bytes `build_segment` produced. `truncate` is a
        // no-op if `total_len` were ever larger than the buffer, so no
        // uninitialised memory can be exposed.
        segment.truncate(total_len);

        if let Err(e) = self.send_outside_data(&mut segment, false) {
            result = Err(e);
            break;
        }
    }

    self.activity.last_data_traffic_from_peer = Instant::now();

    if result.is_ok() {
        // NOTE(review): the outcome of the batched send is ignored here;
        // confirm that a failed `udp_send_gso` should not be reported.
        let _ = self.session.io_cb_mut().udp_send_gso(n, expresslane);
    } else {
        // Drop whatever was buffered so later sends take the normal path.
        self.session.io_cb_mut().gso_buf.take();
    }

    result
}

/// Send a packet to the outside
pub fn send_to_outside(
&mut self,
Expand Down Expand Up @@ -1874,7 +2010,17 @@ impl<AppState: Send> Connection<AppState> {
.append_to_wire(&mut buf, session_id, data.as_ref(), iv, is_encoded);
self.activity.last_data_traffic_from_peer = Instant::now();

match self.session.io_cb().udp_send(buf.as_ref(), true) {
// GSO path: buffer encrypted data for batch send
let io = self.session.io_cb_mut();
if let Some(gso_buf) = io.gso_buf.as_mut() {
if gso_buf.is_empty() {
io.gso_size = buf.len();
}
gso_buf.extend_from_slice(&buf);
return Ok(());
}

match io.udp_send(buf.as_ref(), true) {
IOCallbackResult::Ok(_) => ConnectionResult::Ok(()),
IOCallbackResult::WouldBlock => ConnectionResult::Ok(()),
IOCallbackResult::Err(_e) => ConnectionResult::Err(ConnectionError::AccessDenied),
Expand Down
4 changes: 4 additions & 0 deletions lightway-core/src/connection/builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ impl<AppState: Send + 'static> ClientConnectionBuilder<AppState> {
io: outside_io,
session_id: SessionId::EMPTY,
outside_plugins: outside_plugins.clone(),
gso_buf: None,
gso_size: 0,
};
let session_config =
wolfssl::SessionConfig::new(io).when(connection_type.is_datagram(), |s| {
Expand Down Expand Up @@ -310,6 +312,8 @@ impl<'a, AppState: Send + 'static> ServerConnectionBuilder<'a, AppState> {
io: outside_io,
session_id,
outside_plugins: outside_plugins.clone(),
gso_buf: None,
gso_size: 0,
};
let session_config =
wolfssl::SessionConfig::new(io).when(connection_type.is_datagram(), |s| {
Expand Down
Loading
Loading