From 1e4cc5e99e5bf6f7d3f7e31e7f20c406ef6932dc Mon Sep 17 00:00:00 2001 From: Samuel Tam Date: Mon, 4 May 2026 18:42:02 +0800 Subject: [PATCH 1/7] core: add GSO segment fixup module Add the `gso` module to lightway-core with VirtioNetHdr definition, checksum helpers, and segment build/count functions for splitting GSO superpackets into individual segments with correct per-segment header fixups (IP ID, TCP seq, checksums). Also add tun-rs workspace dependency to lightway-core and lightway-server Cargo.toml. --- Cargo.lock | 2 + lightway-core/Cargo.toml | 1 + lightway-core/src/gso.rs | 265 +++++++++++++++++++++++++++++++++++++ lightway-core/src/lib.rs | 2 + lightway-server/Cargo.toml | 1 + 5 files changed, 271 insertions(+) create mode 100644 lightway-core/src/gso.rs diff --git a/Cargo.lock b/Cargo.lock index a19785ec..38dfc549 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1704,6 +1704,7 @@ dependencies = [ "tokio", "tokio-stream", "tracing", + "tun-rs", "wolfssl", ] @@ -1748,6 +1749,7 @@ dependencies = [ "tracing", "tracing-log", "tracing-subscriber", + "tun-rs", ] [[package]] diff --git a/lightway-core/Cargo.toml b/lightway-core/Cargo.toml index 42f36819..2b21c804 100644 --- a/lightway-core/Cargo.toml +++ b/lightway-core/Cargo.toml @@ -35,6 +35,7 @@ rand.workspace = true rand_core = "0.10.0" thiserror.workspace = true tracing.workspace = true +tun-rs.workspace = true wolfssl = "6.0.0" [dev-dependencies] diff --git a/lightway-core/src/gso.rs b/lightway-core/src/gso.rs new file mode 100644 index 00000000..26a9d122 --- /dev/null +++ b/lightway-core/src/gso.rs @@ -0,0 +1,265 @@ +//! GSO (Generic Segmentation Offload) segment fixup functions. +//! +//! When a GSO superpacket is processed as a single packet through +//! plugins/encoder, the individual segments need per-segment header +//! fixups (IP ID, TCP seq, checksums) before encryption and wire send. +//! +//! All functions take `&VirtioNetHdr` directly for metadata. + + +/// Virtio network header for GSO/checksum offload. +/// +/// This is a local copy of the kernel `virtio_net_hdr` structure, since +/// tun-rs defines this type internally but does not re-export it. +#[repr(C)] +#[derive(Debug, Default, Clone, Copy)] +pub struct VirtioNetHdr { + /// Flags (e.g. VIRTIO_NET_HDR_F_NEEDS_CSUM). + pub flags: u8, + /// GSO type (e.g. GSO_NONE, GSO_TCPV4, GSO_TCPV6, GSO_UDP_L4). + pub gso_type: u8, + /// Ethernet + IP + transport header length in bytes. + pub hdr_len: u16, + /// Bytes per GSO segment (payload only). + pub gso_size: u16, + /// Offset from packet start where checksum computation begins. + pub csum_start: u16, + /// Offset from csum_start to the checksum field. + pub csum_offset: u16, +} + +/// Size of the VirtioNetHdr in bytes. +pub const VIRTIO_NET_HDR_LEN: usize = std::mem::size_of::(); + +/// GSO type: not a GSO frame. +pub const VIRTIO_NET_HDR_GSO_NONE: u8 = 0; +/// Flag: checksum needs to be computed. +pub const VIRTIO_NET_HDR_F_NEEDS_CSUM: u8 = 1; + +impl VirtioNetHdr { + /// Interpret the first [`VIRTIO_NET_HDR_LEN`] bytes of `buf` as a + /// `&VirtioNetHdr` without copying. + /// + /// # Requirements + /// `buf` must be at least `VIRTIO_NET_HDR_LEN` bytes and 2-byte + /// aligned (any `Vec` or heap allocation satisfies this). + #[allow(unsafe_code)] + pub fn from_bytes(buf: &[u8]) -> std::io::Result<&Self> { + if buf.len() < VIRTIO_NET_HDR_LEN { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "buffer too short for VirtioNetHdr", + )); + } + let ptr = buf.as_ptr(); + assert!( + ptr.align_offset(std::mem::align_of::()) == 0, + "buffer is not aligned for VirtioNetHdr" + ); + // SAFETY: We verified length and alignment. VirtioNetHdr is repr(C) + // with no padding, and the returned lifetime is tied to `buf`. + unsafe { Ok(&*(ptr as *const VirtioNetHdr)) } + } +} + +/// Compute and fill the transport-layer checksum for a non-GSO packet +/// that has `VIRTIO_NET_HDR_F_NEEDS_CSUM` set. +/// +/// The kernel deposits the pseudo-header partial sum (src + dst + proto + +/// transport_len) at `[csum_start + csum_offset]` before delivering the +/// packet. We seed our sum with that value, then sum from `csum_start` +/// to end and complement. +pub fn gso_none_checksum(buf: &mut [u8], csum_start: u16, csum_offset: u16) { + let start = csum_start as usize; + let offset = csum_offset as usize; + let at = start + offset; + if at + 2 > buf.len() || start > buf.len() { + return; + } + + // Read the kernel-deposited pseudo-header partial, then zero the + // field so it doesn't double-count when we sum the segment. + let initial = read_u16(&buf[at..at + 2]) as u64; + buf[at] = 0; + buf[at + 1] = 0; + + let csum = !checksum(&buf[start..], initial); + buf[at] = (csum >> 8) as u8; + buf[at + 1] = csum as u8; +} + +fn read_u16(b: &[u8]) -> u16 { + u16::from_be_bytes([b[0], b[1]]) +} + +fn write_u16(b: &mut [u8], v: u16) { + b[..2].copy_from_slice(&v.to_be_bytes()); +} + +fn read_u32(b: &[u8]) -> u32 { + u32::from_be_bytes([b[0], b[1], b[2], b[3]]) +} + +fn write_u32(b: &mut [u8], v: u32) { + b[..4].copy_from_slice(&v.to_be_bytes()); +} + +fn checksum_no_fold(mut b: &[u8], initial: u64) -> u64 { + let mut acc = initial; + while b.len() >= 4 { + acc += read_u32(&b[..4]) as u64; + b = &b[4..]; + } + if b.len() >= 2 { + acc += read_u16(&b[..2]) as u64; + b = &b[2..]; + } + if let Some(&byte) = b.first() { + acc += (byte as u64) << 8; + } + acc +} + +fn checksum(b: &[u8], initial: u64) -> u16 { + let mut acc = checksum_no_fold(b, initial); + while acc > 0xFFFF { + acc = (acc >> 16) + (acc & 0xFFFF); + } + acc as u16 +} + +fn pseudo_header_checksum_no_fold( + protocol: u8, + src_addr: &[u8], + dst_addr: &[u8], + total_len: u16, +) -> u64 { + let sum = checksum_no_fold(src_addr, 0); + let sum = checksum_no_fold(dst_addr, sum); + let len_bytes = total_len.to_be_bytes(); + checksum_no_fold(&[0, protocol, len_bytes[0], len_bytes[1]], sum) +} + +const TCP_FLAGS_OFFSET: usize = 13; +const TCP_FLAG_FIN: u8 = 0x01; +const TCP_FLAG_PSH: u8 = 0x08; +const IPV4_SRC_ADDR_OFFSET: usize = 12; +const IPV6_SRC_ADDR_OFFSET: usize = 8; +const IPV6_FIXED_HDR_LEN: usize = 40; + +const VIRTIO_NET_HDR_GSO_TCPV4: u8 = 1; +const VIRTIO_NET_HDR_GSO_TCPV6: u8 = 4; + +fn is_v6(pkt: &[u8]) -> bool { + pkt[0] >> 4 == 6 +} + +fn is_tcp(hdr: &VirtioNetHdr) -> bool { + hdr.gso_type == VIRTIO_NET_HDR_GSO_TCPV4 || hdr.gso_type == VIRTIO_NET_HDR_GSO_TCPV6 +} + +fn addr_info(pkt: &[u8]) -> (usize, usize) { + if is_v6(pkt) { + (IPV6_SRC_ADDR_OFFSET, 16) + } else { + (IPV4_SRC_ADDR_OFFSET, 4) + } +} + +fn transport_pseudo_csum(hdr: &VirtioNetHdr, pkt: &[u8], segment_data_len: usize) -> u64 { + let (src_offset, addr_len) = addr_info(pkt); + let transport_header_len = (hdr.hdr_len - hdr.csum_start) as usize; + const IPPROTO_TCP: u8 = 6; + const IPPROTO_UDP: u8 = 17; + let protocol = if is_tcp(hdr) { + IPPROTO_TCP + } else { + IPPROTO_UDP + }; + pseudo_header_checksum_no_fold( + protocol, + &pkt[src_offset..src_offset + addr_len], + &pkt[src_offset + addr_len..src_offset + 2 * addr_len], + (transport_header_len + segment_data_len) as u16, + ) +} + +/// Number of segments in a GSO superpacket. +pub(crate) fn segment_count(hdr: &VirtioNetHdr, pkt_len: usize) -> usize { + let payload_len = pkt_len.saturating_sub(hdr.hdr_len as usize); + (payload_len + hdr.gso_size as usize - 1) / hdr.gso_size as usize +} + +/// Build segment `index` from the superpacket into `out`. +/// +/// Copies the header template + payload slice, then applies all +/// per-segment fixups (IP ID, TCP seq, checksums). +/// Returns the total byte length written to `out`. +pub(crate) fn build_segment( + hdr: &VirtioNetHdr, + pkt: &[u8], + out: &mut [u8], + index: usize, +) -> usize { + let hdr_len = hdr.hdr_len as usize; + let gso_size = hdr.gso_size as usize; + let csum_start = hdr.csum_start as usize; + let csum_offset = hdr.csum_offset as usize; + let transport_csum_at = csum_start + csum_offset; + let v6 = is_v6(pkt); + + // Compute this segment's payload range + let data_offset = hdr_len + index * gso_size; + let data_end = std::cmp::min(data_offset + gso_size, pkt.len()); + let segment_data_len = data_end - data_offset; + let total_len = hdr_len + segment_data_len; + let is_last = data_end == pkt.len(); + + // Copy header template + payload + out[..hdr_len].copy_from_slice(&pkt[..hdr_len]); + out[hdr_len..total_len].copy_from_slice(&pkt[data_offset..data_end]); + + // Clear checksums before recomputation + if !v6 { + out[10] = 0; + out[11] = 0; + } + out[transport_csum_at] = 0; + out[transport_csum_at + 1] = 0; + + // IP fixups + if !v6 { + if index > 0 { + let id = read_u16(&out[4..6]).wrapping_add(index as u16); + write_u16(&mut out[4..6], id); + } + write_u16(&mut out[2..4], total_len as u16); + let ip_csum = !checksum(&out[..csum_start], 0); + write_u16(&mut out[10..12], ip_csum); + } else { + write_u16(&mut out[4..6], (total_len - IPV6_FIXED_HDR_LEN) as u16); + } + + // Transport fixups + if is_tcp(hdr) { + let first_seq = read_u32(&pkt[csum_start + 4..]); + let seq = first_seq.wrapping_add(gso_size as u32 * index as u32); + write_u32(&mut out[csum_start + 4..csum_start + 8], seq); + if !is_last { + out[csum_start + TCP_FLAGS_OFFSET] &= !(TCP_FLAG_FIN | TCP_FLAG_PSH); + } + } else { + let transport_header_len = hdr_len - csum_start; + write_u16( + &mut out[csum_start + 4..csum_start + 6], + (transport_header_len + segment_data_len) as u16, + ); + } + + // Transport checksum + let pseudo = transport_pseudo_csum(hdr, pkt, segment_data_len); + let csum = !checksum(&out[csum_start..total_len], pseudo); + write_u16(&mut out[transport_csum_at..transport_csum_at + 2], csum); + + total_len +} diff --git a/lightway-core/src/lib.rs b/lightway-core/src/lib.rs index 229153e2..e1ffb01c 100644 --- a/lightway-core/src/lib.rs +++ b/lightway-core/src/lib.rs @@ -9,6 +9,8 @@ mod connection; mod context; mod encoding_request_states; mod features; +#[cfg(target_os = "linux")] +pub mod gso; mod io; #[cfg(feature = "postquantum")] mod keyshare; diff --git a/lightway-server/Cargo.toml b/lightway-server/Cargo.toml index c70899db..12f947c9 100644 --- a/lightway-server/Cargo.toml +++ b/lightway-server/Cargo.toml @@ -47,6 +47,7 @@ strum = { version = "0.28.0", features = ["derive"] } thiserror.workspace = true time = "0.3.29" tokio.workspace = true +tun-rs.workspace = true tokio-stream = { workspace = true, features = ["time"] } tracing.workspace = true tracing-log = "0.2.0" From 0e96076ce0e9d715f99e3d4077ff95ef94e4ed88 Mon Sep 17 00:00:00 2001 From: Samuel Tam Date: Mon, 4 May 2026 18:42:22 +0800 Subject: [PATCH 2/7] core: add send_gso to OutsideIOSendCallback trait Add the `send_gso` method to the OutsideIOSendCallback trait for sending concatenated wire packets via kernel GSO (UDP_SEGMENT). Include todo!() stub implementations in client TCP/UDP, server TCP, and test harnesses to satisfy the trait contract. --- lightway-client/src/io/outside/tcp.rs | 4 ++++ lightway-client/src/io/outside/udp.rs | 4 ++++ lightway-core/src/connection/io_adapter.rs | 4 ++++ lightway-core/src/io.rs | 4 ++++ lightway-core/tests/connection.rs | 8 ++++++++ lightway-server/src/io/outside/tcp.rs | 4 ++++ lightway-server/src/io/outside/udp.rs | 4 ++++ 7 files changed, 32 insertions(+) diff --git a/lightway-client/src/io/outside/tcp.rs b/lightway-client/src/io/outside/tcp.rs index 7c78749b..d897692d 100644 --- a/lightway-client/src/io/outside/tcp.rs +++ b/lightway-client/src/io/outside/tcp.rs @@ -88,6 +88,10 @@ impl OutsideIOSendCallback for Tcp { } } + fn send_gso(&self, _buf: &[u8], _segment_size: u16) -> IOCallbackResult { + todo!() + } + fn peer_addr(&self) -> SocketAddr { self.peer_addr() } diff --git a/lightway-client/src/io/outside/udp.rs b/lightway-client/src/io/outside/udp.rs index 76ba7495..94bee57a 100644 --- a/lightway-client/src/io/outside/udp.rs +++ b/lightway-client/src/io/outside/udp.rs @@ -202,6 +202,10 @@ impl OutsideIOSendCallback for Udp { } } + fn send_gso(&self, _buf: &[u8], _segment_size: u16) -> IOCallbackResult { + todo!() + } + fn peer_addr(&self) -> SocketAddr { self.peer_addr() } diff --git a/lightway-core/src/connection/io_adapter.rs b/lightway-core/src/connection/io_adapter.rs index 406bc24f..b7d9f212 100644 --- a/lightway-core/src/connection/io_adapter.rs +++ b/lightway-core/src/connection/io_adapter.rs @@ -356,6 +356,10 @@ mod tests { } } + fn send_gso(&self, _buf: &[u8], _segment_size: u16) -> IOCallbackResult { + todo!() + } + fn peer_addr(&self) -> std::net::SocketAddr { std::unreachable!("Should not be testing peer_addr"); } diff --git a/lightway-core/src/io.rs b/lightway-core/src/io.rs index c50e75a0..70e32e9d 100644 --- a/lightway-core/src/io.rs +++ b/lightway-core/src/io.rs @@ -60,6 +60,10 @@ pub trait OutsideIOSendCallback { fn disable_pmtud_probe(&self) -> std::io::Result<()> { Err(std::io::Error::other("pmtud probe not supported")) } + + /// Send concatenated wire packets via kernel GSO (`UDP_SEGMENT`). + /// `segment_size` is the wire size of each non-last packet. + fn send_gso(&self, buf: &[u8], segment_size: u16) -> IOCallbackResult; } /// Convenience type to use as function arguments diff --git a/lightway-core/tests/connection.rs b/lightway-core/tests/connection.rs index 2110dcc9..3a060741 100644 --- a/lightway-core/tests/connection.rs +++ b/lightway-core/tests/connection.rs @@ -145,6 +145,10 @@ impl OutsideIOSendCallback for TestDatagramSock { } } + fn send_gso(&self, _buf: &[u8], _segment_size: u16) -> IOCallbackResult { + todo!() + } + fn peer_addr(&self) -> SocketAddr { todo!() } @@ -194,6 +198,10 @@ impl OutsideIOSendCallback for TestStreamSock { } } + fn send_gso(&self, _buf: &[u8], _segment_size: u16) -> IOCallbackResult { + todo!() + } + fn peer_addr(&self) -> SocketAddr { todo!() } diff --git a/lightway-server/src/io/outside/tcp.rs b/lightway-server/src/io/outside/tcp.rs index c4a1547c..bbe30ee0 100644 --- a/lightway-server/src/io/outside/tcp.rs +++ b/lightway-server/src/io/outside/tcp.rs @@ -31,6 +31,10 @@ impl OutsideIOSendCallback for TcpStream { } } + fn send_gso(&self, _buf: &[u8], _segment_size: u16) -> IOCallbackResult { + todo!() + } + fn peer_addr(&self) -> SocketAddr { self.peer_addr } diff --git a/lightway-server/src/io/outside/udp.rs b/lightway-server/src/io/outside/udp.rs index 7b6eebf1..c08b3572 100644 --- a/lightway-server/src/io/outside/udp.rs +++ b/lightway-server/src/io/outside/udp.rs @@ -99,6 +99,10 @@ impl OutsideIOSendCallback for UdpSocket { send_to_socket(&self.sock, buf, &peer_addr.1, self.reply_pktinfo) } + fn send_gso(&self, _buf: &[u8], _segment_size: u16) -> IOCallbackResult { + todo!() + } + fn peer_addr(&self) -> SocketAddr { self.peer_addr.read().unwrap().0 } From 406c95c7764a54303a9563610b2f79071aff8ddd Mon Sep 17 00:00:00 2001 From: Samuel Tam Date: Mon, 4 May 2026 18:42:40 +0800 Subject: [PATCH 3/7] core: add GSO buffering to WolfSSL IO adapter Refactor udp_send into udp_frame + udp_send to enable reuse of wire framing logic. Add gso_buf/gso_size fields to WolfSSLIOAdapter so the wolfssl send() callback can buffer raw encrypted segments during GSO processing. Add udp_send_gso to wrap buffered segments with wire headers and send via send_gso. --- lightway-core/src/connection/builders.rs | 4 + lightway-core/src/connection/io_adapter.rs | 106 ++++++++++++++++++--- 2 files changed, 97 insertions(+), 13 deletions(-) diff --git a/lightway-core/src/connection/builders.rs b/lightway-core/src/connection/builders.rs index 2d382e32..acf2adf2 100644 --- a/lightway-core/src/connection/builders.rs +++ b/lightway-core/src/connection/builders.rs @@ -88,6 +88,8 @@ impl ClientConnectionBuilder { io: outside_io, session_id: SessionId::EMPTY, outside_plugins: outside_plugins.clone(), + gso_buf: None, + gso_size: 0, }; let session_config = wolfssl::SessionConfig::new(io).when(connection_type.is_datagram(), |s| { @@ -310,6 +312,8 @@ impl<'a, AppState: Send + 'static> ServerConnectionBuilder<'a, AppState> { io: outside_io, session_id, outside_plugins: outside_plugins.clone(), + gso_buf: None, + gso_size: 0, }; let session_config = wolfssl::SessionConfig::new(io).when(connection_type.is_datagram(), |s| { diff --git a/lightway-core/src/connection/io_adapter.rs b/lightway-core/src/connection/io_adapter.rs index b7d9f212..2f4b140c 100644 --- a/lightway-core/src/connection/io_adapter.rs +++ b/lightway-core/src/connection/io_adapter.rs @@ -101,6 +101,16 @@ pub(crate) struct WolfSSLIOAdapter { /// This buffer will be used to save the remaining data, to be sent in next call. pub(crate) send_buf: SendBuffer, + /// When `Some`, the wolfssl IO callback `send()` appends raw + /// encrypted segments here instead of sending to the socket. + /// After all segments are collected, `udp_send_gso` wraps each + /// with `wire::Header`, runs plugins, and sends via `send_gso`. + pub(crate) gso_buf: Option, + + /// Size of the first encrypted segment appended to `gso_buf`. + /// Used as the stride to split the buffer back into segments. + pub(crate) gso_size: usize, + /// Application provided object used to send data. pub(crate) io: OutsideIOSendCallbackArg, @@ -137,9 +147,13 @@ impl WolfSSLIOAdapter { } } - pub(crate) fn udp_send(&self, buf: &[u8], expresslane_data: bool) -> IOCallbackResult { - // Prepend our `wire::Header` to the data we've been asked to - // send. + /// Prepend wire header, run egress plugins. Returns `None` if + /// the plugins dropped the packet (caller should treat as success). + fn udp_frame( + &self, + buf: &[u8], + expresslane_data: bool, + ) -> Result> { let h = wire::Header { version: self.protocol_version, aggressive_mode: false, @@ -147,25 +161,27 @@ impl WolfSSLIOAdapter { session: self.session_id, }; - // Allocate max space let mut b = BytesMut::with_capacity(self.outside_mtu); h.append_to_wire(&mut b); b.extend_from_slice(buf); match self.outside_plugins.do_egress(&mut b) { - PluginResult::Accept => {} - PluginResult::Drop => { - return IOCallbackResult::Ok(buf.len()); - } - // Outside plugins cannot drop with reply - PluginResult::DropWithReply(_) => { - return IOCallbackResult::Ok(buf.len()); + PluginResult::Accept => Ok(b), + PluginResult::Drop | PluginResult::DropWithReply(_) => { + Err(IOCallbackResult::Ok(buf.len())) } PluginResult::Error(e) => { use std::io::Error; - return IOCallbackResult::Err(Error::other(e)); + Err(IOCallbackResult::Err(Error::other(e))) } } + } + + pub(crate) fn udp_send(&self, buf: &[u8], expresslane_data: bool) -> IOCallbackResult { + let b = match self.udp_frame(buf, expresslane_data) { + Ok(b) => b, + Err(r) => return r, + }; // Send header + buf. If we are in aggressive mode we send it // a total of three times. On any send error we return @@ -206,6 +222,56 @@ impl WolfSSLIOAdapter { } } + /// Take the raw encrypted segments from `gso_buf`, wrap each with + /// `wire::Header` + plugins, and send as one `sendmsg` with + /// `UDP_SEGMENT`. Clears `gso_buf` regardless of outcome. + pub(crate) fn udp_send_gso( + &mut self, + segment_count: usize, + expresslane_data: bool, + ) -> IOCallbackResult { + let Some(raw) = self.gso_buf.take() else { + return IOCallbackResult::Ok(0); + }; + let stride = self.gso_size; + self.gso_size = 0; + + if raw.is_empty() || segment_count == 0 || stride == 0 { + return IOCallbackResult::Ok(0); + } + + let mut wire_buf = BytesMut::with_capacity(segment_count * self.outside_mtu); + let mut wire_segment_size = 0; + + let mut offset = 0; + for i in 0..segment_count { + let end = if i == segment_count - 1 { + raw.len() + } else { + offset + stride + }; + let framed = match self.udp_frame(&raw[offset..end], expresslane_data) { + Ok(f) => f, + Err(IOCallbackResult::Ok(_)) => { + offset = end; + continue; + } + Err(other) => return other, + }; + if i == 0 { + wire_segment_size = framed.len(); + } + wire_buf.extend_from_slice(&framed); + offset = end; + } + + if wire_buf.is_empty() { + return IOCallbackResult::Ok(0); + } + + self.io.send_gso(&wire_buf, wire_segment_size as u16) + } + // In general, TCP send can succeed even for partial data and the caller // has to call send again with remaining data. // This api tries to hide the partial send behavior by buffering it. @@ -292,7 +358,19 @@ impl wolfssl::IOCallbacks for WolfSSLIOAdapter { fn send(&mut self, buf: &[u8]) -> IOCallbackResult { match self.connection_type { ConnectionType::Stream => self.tcp_send(buf), - ConnectionType::Datagram => self.udp_send(buf, false), + ConnectionType::Datagram => { + if let Some(gso_buf) = self.gso_buf.as_mut() { + // GSO: buffer raw encrypted data; udp_send_gso + // will wrap each segment with wire::Header later. + if gso_buf.is_empty() { + self.gso_size = buf.len(); + } + gso_buf.extend_from_slice(buf); + IOCallbackResult::Ok(buf.len()) + } else { + self.udp_send(buf, false) + } + } } } } @@ -380,6 +458,8 @@ mod tests { io, session_id: SessionId::from_const(*b"\xde\xad\xbe\xef\xde\xad\xbe\xef"), outside_plugins: outside_plugins.into(), + gso_buf: None, + gso_size: 0, } } From 595acd1cca1d50de8d0e19e792d588d633248253 Mon Sep 17 00:00:00 2001 From: Samuel Tam Date: Mon, 4 May 2026 18:43:09 +0800 Subject: [PATCH 4/7] core: add inside_data_received_gso for GSO superpacket processing Add inside_data_received_gso and send_to_outside_gso methods to Connection. These process a GSO superpacket as a single packet through plugins/encoder, then split into per-segment encrypted frames and collect into a wire buffer for batch send via UDP_SEGMENT. --- lightway-core/src/connection.rs | 148 +++++++++++++++++++++++++++++++- 1 file changed, 147 insertions(+), 1 deletion(-) diff --git a/lightway-core/src/connection.rs b/lightway-core/src/connection.rs index 0b522109..feb459e1 100644 --- a/lightway-core/src/connection.rs +++ b/lightway-core/src/connection.rs @@ -988,6 +988,142 @@ impl Connection { } } + /// Process a GSO superpacket as a single packet through the pipeline. + /// + /// Unlike `inside_data_received`, this skips the MTU check (the + /// superpacket is intentionally oversized) and `tcp_clamp_mss` + /// (GSO packets are never SYN). Plugins and encoder see the whole + /// superpacket as one packet. + #[cfg(target_os = "linux")] + pub fn inside_data_received_gso( + &mut self, + pkt: &mut BytesMut, + hdr: &crate::gso::VirtioNetHdr, + ) -> ConnectionResult<()> { + use ConnectionError::InvalidInsidePacket; + use InvalidPacketError::InvalidIpv4Packet; + + if matches!(self.state, State::Disconnected) { + return Err(ConnectionError::Disconnected); + } + + if !matches!(self.state, State::Online) { + return Err(ConnectionError::InvalidState); + } + + let Some(inside_io) = &self.inside_io else { + return Err(ConnectionError::InvalidState); + }; + let mtu = inside_io.mtu(); + + // No MTU check — GSO superpacket is intentionally oversized + if !ipv4_is_valid_packet(pkt.as_ref()) { + return Err(InvalidInsidePacket(InvalidIpv4Packet)); + } + + let _ = self.rotate_expresslane_key(); + + // No tcp_clamp_mss — GSO packets are never SYN + + // Plugins see the whole superpacket as one packet + match self.inside_plugins.do_ingress(pkt) { + PluginResult::Accept => {} + PluginResult::Drop => { + return Ok(()); + } + PluginResult::DropWithReply(b) => { + return Err(ConnectionError::PluginDropWithReply(b)); + } + PluginResult::Error(e) => { + return Err(ConnectionError::PluginError(e)); + } + } + + // Encoder sees the whole superpacket + if let Some(encoder) = &mut self.inside_pkt_encoder { + let codec_state = encoder.store(pkt); + match codec_state { + Ok(CodecStatus::PacketAccepted) => return Ok(()), + Ok(CodecStatus::SkipPacket) => {} + Err(e) => return Err(ConnectionError::PacketCodecError(e)), + } + } + + self.send_to_outside_gso(pkt, hdr, mtu) + } + + /// Split a GSO superpacket into segments, encrypt each, and send + /// the entire wire buffer as one `sendmsg` with `UDP_SEGMENT`. + /// + /// For expresslane: encrypts with AES-GCM, frames via `udp_frame`, + /// collects into wire buffer. + /// + /// For DTLS: `send_frame_or_drop` triggers wolfssl `try_write` which + /// calls the IO callback `send()`. The IO callback detects `gso_buf` + /// and appends framed wire packets there instead of sending. + #[cfg(target_os = "linux")] + fn send_to_outside_gso( + &mut self, + pkt: &mut BytesMut, + hdr: &crate::gso::VirtioNetHdr, + mtu: usize, + ) -> ConnectionResult<()> { + use crate::gso; + + let n = gso::segment_count(hdr, pkt.len()); + if n == 0 { + return Ok(()); + } + + let expresslane = self.expresslane_ready(); + + // One reusable segment buffer: zero-init to `mtu` once so each + // iteration can `set_len(mtu)` (capacity bytes are initialized) + // before `build_segment` overwrites them. + let mut segment = BytesMut::zeroed(mtu); + + // Enable GSO wire buffer — IO callback will append here. + // Both DTLS and expresslane paths detect this and append + // encrypted segments instead of sending immediately. + self.session + .io_cb_mut() + .gso_buf = Some(BytesMut::with_capacity(n * self.outside_mtu)); + + let mut result = Ok(()); + + for i in 0..n { + // Build segment in-place into `segment`. `set_len(mtu)` is + // sound because all `mtu` bytes were initialized at alloc + // (and only ever overwritten or shrunk thereafter). + // SAFETY: capacity ≥ mtu and bytes 0..mtu are initialized. + #[allow(unsafe_code)] + unsafe { + segment.set_len(mtu); + } + let total_len = gso::build_segment(hdr, pkt.as_ref(), &mut segment, i); + // SAFETY: build_segment wrote 0..total_len; total_len ≤ mtu. + #[allow(unsafe_code)] + unsafe { + segment.set_len(total_len); + } + + if let Err(e) = self.send_outside_data(&mut segment, false) { + result = Err(e); + break; + } + } + + self.activity.last_data_traffic_from_peer = Instant::now(); + + if result.is_ok() { + let _ = self.session.io_cb_mut().udp_send_gso(n, expresslane); + } else { + self.session.io_cb_mut().gso_buf.take(); + } + + result + } + /// Send a packet to the outside pub fn send_to_outside( &mut self, @@ -1874,7 +2010,17 @@ impl Connection { .append_to_wire(&mut buf, session_id, data.as_ref(), iv, is_encoded); self.activity.last_data_traffic_from_peer = Instant::now(); - match self.session.io_cb().udp_send(buf.as_ref(), true) { + // GSO path: buffer encrypted data for batch send + let io = self.session.io_cb_mut(); + if let Some(gso_buf) = io.gso_buf.as_mut() { + if gso_buf.is_empty() { + io.gso_size = buf.len(); + } + gso_buf.extend_from_slice(&buf); + return Ok(()); + } + + match io.udp_send(buf.as_ref(), true) { IOCallbackResult::Ok(_) => ConnectionResult::Ok(()), IOCallbackResult::WouldBlock => ConnectionResult::Ok(()), IOCallbackResult::Err(_e) => ConnectionResult::Err(ConnectionError::AccessDenied), From 8ec7da552d36c1fe5d1a80360902d10d626106a1 Mon Sep 17 00:00:00 2001 From: Samuel Tam Date: Mon, 4 May 2026 18:43:43 +0800 Subject: [PATCH 5/7] app-utils: add TUN offload (IFF_VNET_HDR) and GSO recv support Add offload config field to TunConfig to enable IFF_VNET_HDR on TUN devices. Add recv_gso for raw reads that include the virtio_net_hdr prefix, and prepend a zeroed virtio header on try_send when offload is enabled. --- lightway-app-utils/src/tun.rs | 55 ++++++++++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/lightway-app-utils/src/tun.rs b/lightway-app-utils/src/tun.rs index ba67b172..489a9bc8 100644 --- a/lightway-app-utils/src/tun.rs +++ b/lightway-app-utils/src/tun.rs @@ -49,6 +49,11 @@ pub struct TunConfig { #[cfg(unix)] #[educe(Default = true)] pub close_fd_on_drop: bool, + /// Enable TUN offload (`IFF_VNET_HDR`) so reads/writes carry a + /// `virtio_net_hdr` and the kernel performs GRO/GSO across the + /// device. Required for the GSO inside-IO path. + #[cfg(target_os = "linux")] + pub offload: bool, #[cfg(windows)] /// Optional wintun file path for Windows TUN interfaces pub wintun_file: Option, @@ -206,6 +211,10 @@ impl TunConfig { { builder = builder.associate_route(false); } + #[cfg(target_os = "linux")] + if self.offload { + builder = builder.offload(true); + } let device = builder.build_async()?; if let Some(mtu) = self.mtu { @@ -307,6 +316,16 @@ impl Tun { } } + /// Raw read from `Tun`, returning the full virtio frame (header + payload). + #[cfg(target_os = "linux")] + pub async fn recv_gso(&self, buf: &mut [u8]) -> IOCallbackResult { + match self { + Tun::Direct(t) => t.recv_gso(buf).await, + #[cfg(feature = "io-uring")] + Tun::IoUring(_) => todo!(), + } + } + /// Send a packet to `Tun` pub fn try_send(&self, buf: BytesMut) -> IOCallbackResult { match self { @@ -363,6 +382,10 @@ pub struct TunDirect { fd: RawFd, #[cfg(unix)] close_fd_on_drop: bool, + /// `IFF_VNET_HDR` enabled — sends must be prefixed with a 12-byte + /// `virtio_net_hdr`, reads include it. + #[cfg(target_os = "linux")] + vnet_hdr: bool, } impl TunDirect { @@ -385,6 +408,8 @@ impl TunDirect { fd, #[cfg(unix)] close_fd_on_drop: config.close_fd_on_drop, + #[cfg(target_os = "linux")] + vnet_hdr: config.offload, }) } @@ -406,10 +431,38 @@ impl TunDirect { } } + /// Raw read from Tun, returning the full virtio frame (header + payload). + #[cfg(target_os = "linux")] + pub async fn recv_gso(&self, buf: &mut [u8]) -> IOCallbackResult { + let tun = self.tun.as_ref().unwrap(); + match tun.recv(buf).await { + Ok(0) => IOCallbackResult::WouldBlock, + Ok(n) => IOCallbackResult::Ok(n), + Err(err) if matches!(err.kind(), std::io::ErrorKind::WouldBlock) => { + IOCallbackResult::WouldBlock + } + Err(err) => IOCallbackResult::Err(err), + } + } + /// Try write from Tun pub fn try_send(&self, buf: BytesMut) -> IOCallbackResult { let tun = self.tun.as_ref().unwrap(); - match tun.try_send(&buf[..]) { + #[cfg(target_os = "linux")] + let res = if self.vnet_hdr { + // IFF_VNET_HDR requires a zeroed `virtio_net_hdr` prefix + // on every write (NEEDS_CSUM=0, GSO_NONE). + let hdr_len = tun_rs::VIRTIO_NET_HDR_LEN; + let mut prefixed = bytes::BytesMut::zeroed(hdr_len); + prefixed.extend_from_slice(&buf[..]); + tun.try_send(&prefixed[..]) + .map(|n| n.saturating_sub(hdr_len)) + } else { + tun.try_send(&buf[..]) + }; + #[cfg(not(target_os = "linux"))] + let res = tun.try_send(&buf[..]); + match res { Ok(nr) => IOCallbackResult::Ok(nr), Err(err) if matches!(err.kind(), std::io::ErrorKind::WouldBlock) => { IOCallbackResult::WouldBlock From 6d41fe0e16663c9f19b20f53859fa21f2919a032 Mon Sep 17 00:00:00 2001 From: Samuel Tam Date: Mon, 4 May 2026 18:44:30 +0800 Subject: [PATCH 6/7] server: implement GSO send via UDP_SEGMENT Extend send_to_socket to accept an optional gso_size parameter and build UDP_SEGMENT cmsg for kernel-level segmentation. Implement the real send_gso on UdpSocket using this path. --- lightway-server/src/io/outside/udp.rs | 54 +++++++++++++++++++-------- 1 file changed, 39 insertions(+), 15 deletions(-) diff --git a/lightway-server/src/io/outside/udp.rs b/lightway-server/src/io/outside/udp.rs index c08b3572..ef0865b7 100644 --- a/lightway-server/src/io/outside/udp.rs +++ b/lightway-server/src/io/outside/udp.rs @@ -51,26 +51,43 @@ fn send_to_socket( buf: &[u8], peer_addr: &SockAddr, pktinfo: Option, + gso_size: Option, ) -> IOCallbackResult { + #[cfg(target_vendor = "apple")] + const IP_PKTINFO_LEVEL: libc::c_int = libc::IPPROTO_IP; + #[cfg(not(target_vendor = "apple"))] + const IP_PKTINFO_LEVEL: libc::c_int = libc::SOL_IP; + + const CMSG_SIZE: usize = + cmsg::Message::space::() + cmsg::Message::space::(); + let res = sock.try_io(Interest::WRITABLE, || { let sock = SockRef::from(sock.as_ref()); let bufs = [std::io::IoSlice::new(buf)]; - - let msghdr = MsgHdr::new().with_addr(peer_addr).with_buffers(&bufs); - - const CMSG_SIZE: usize = cmsg::Message::space::(); let mut cmsg = cmsg::BufferMut::::zeroed(); - let msghdr = if let Some(pktinfo) = pktinfo { + // Track actual used bytes so we don't pass trailing zeroes + // that the kernel would interpret as a malformed cmsg header. + let mut cmsg_len: usize = 0; + if pktinfo.is_some() || gso_size.is_some() { let mut builder = cmsg.builder(); - #[cfg(target_vendor = "apple")] - let (cmsg_level, cmsg_type) = (libc::IPPROTO_IP, libc::IP_PKTINFO); - #[cfg(not(target_vendor = "apple"))] - let (cmsg_level, cmsg_type) = (libc::SOL_IP, libc::IP_PKTINFO); + if let Some(pi) = pktinfo { + builder.fill_next(IP_PKTINFO_LEVEL, libc::IP_PKTINFO, pi)?; + cmsg_len += cmsg::Message::space::(); + } + #[cfg(target_os = "linux")] + if let Some(size) = gso_size { + builder.fill_next(libc::SOL_UDP, libc::UDP_SEGMENT, size)?; + cmsg_len += cmsg::Message::space::(); + } + } - builder.fill_next(cmsg_level, cmsg_type, pktinfo)?; + let msghdr = MsgHdr::new() + .with_addr(peer_addr) + .with_buffers(&bufs); - msghdr.with_control(cmsg.as_ref()) + let msghdr = if cmsg_len > 0 { + msghdr.with_control(&cmsg.as_ref()[..cmsg_len]) } else { msghdr }; @@ -96,11 +113,18 @@ struct UdpSocket { impl OutsideIOSendCallback for UdpSocket { fn send(&self, buf: &[u8]) -> IOCallbackResult { let peer_addr = self.peer_addr.read().unwrap(); - send_to_socket(&self.sock, buf, &peer_addr.1, self.reply_pktinfo) + send_to_socket(&self.sock, buf, &peer_addr.1, self.reply_pktinfo, None) } - fn send_gso(&self, _buf: &[u8], _segment_size: u16) -> IOCallbackResult { - todo!() + fn send_gso(&self, buf: &[u8], segment_size: u16) -> IOCallbackResult { + let peer_addr = self.peer_addr.read().unwrap(); + send_to_socket( + &self.sock, + buf, + &peer_addr.1, + self.reply_pktinfo, + Some(segment_size), + ) } fn peer_addr(&self) -> SocketAddr { @@ -272,7 +296,7 @@ impl UdpServer { msg.append_to_wire(&mut buf); // Ignore failure to send. - let _ = send_to_socket(&self.sock, &buf, &peer_addr, reply_pktinfo); + let _ = send_to_socket(&self.sock, &buf, &peer_addr, reply_pktinfo, None); } } From ed9e4752d308b15ebba96a153dffbc1c0194d12a Mon Sep 17 00:00:00 2001 From: Samuel Tam Date: Mon, 4 May 2026 18:45:01 +0800 Subject: [PATCH 7/7] server: integrate GSO inside IO loop and configuration Add enable_tun_offload config option and wire it through ServerConfig to main. Extract the default inside IO loop into its own function and add inside_io_loop_gso that reads virtio-framed superpackets from TUN, dispatches GSO vs single-packet paths, and sets gso_max_size on the TUN device. --- lightway-server/src/config.rs | 7 + lightway-server/src/connection.rs | 2 + lightway-server/src/io/inside.rs | 4 + lightway-server/src/io/inside/tun.rs | 16 ++ lightway-server/src/lib.rs | 234 +++++++++++++++++++++++---- lightway-server/src/main.rs | 2 + 6 files changed, 232 insertions(+), 33 deletions(-) diff --git a/lightway-server/src/config.rs b/lightway-server/src/config.rs index e24d519d..e6977d3f 100644 --- a/lightway-server/src/config.rs +++ b/lightway-server/src/config.rs @@ -88,6 +88,12 @@ pub struct Config { #[patch(attribute(doc = "Enable Post Quantum Crypto"))] pub enable_pqc: bool, + #[patch(attribute(clap(long)))] + #[patch(empty_value = false)] + #[patch(attribute(serde(default)))] + #[patch(attribute(doc = "Enable TUN offload (GRO/GSO) for batch packet processing"))] + pub enable_tun_offload: bool, + #[patch(attribute(clap(long)))] #[patch(empty_value = false)] #[patch(attribute(serde(default)))] @@ -171,6 +177,7 @@ impl Default for Config { lightway_dns_ip: Ipv4Addr::new(10, 125, 0, 1), enable_expresslane: false, enable_pqc: false, + enable_tun_offload: false, enable_tun_iouring: false, iouring_entry_count: 1024, iouring_sqpoll_idle_time: Duration::from_std_duration(StdDuration::from_millis(100)), diff --git a/lightway-server/src/connection.rs b/lightway-server/src/connection.rs index d54d9988..5fb22cb1 100644 --- a/lightway-server/src/connection.rs +++ b/lightway-server/src/connection.rs @@ -113,6 +113,8 @@ impl Connection { pub fn outside_data_received(&self, buf: OutsidePacket) -> ConnectionResult; pub fn inside_data_received(&self, pkt: &mut BytesMut) -> ConnectionResult<()>; + #[cfg(target_os = "linux")] + pub fn inside_data_received_gso(&self, pkt: &mut BytesMut, hdr: &lightway_core::gso::VirtioNetHdr) -> ConnectionResult<()>; } } diff --git a/lightway-server/src/io/inside.rs b/lightway-server/src/io/inside.rs index 600343f4..972aec71 100644 --- a/lightway-server/src/io/inside.rs +++ b/lightway-server/src/io/inside.rs @@ -11,6 +11,10 @@ use std::sync::Arc; pub trait InsideIORecv: Sync + Send { async fn recv_buf(&self, buf: &mut bytes::BytesMut) -> IOCallbackResult; + /// Raw read from inside IO, returning the full virtio frame (header + payload). + #[cfg(target_os = "linux")] + async fn recv_gso(&self, buf: &mut [u8]) -> IOCallbackResult; + fn into_io_send_callback(self: Arc) -> InsideIOSendCallbackArg; } diff --git a/lightway-server/src/io/inside/tun.rs b/lightway-server/src/io/inside/tun.rs index 70b8fc57..db4bb259 100644 --- a/lightway-server/src/io/inside/tun.rs +++ b/lightway-server/src/io/inside/tun.rs @@ -33,6 +33,11 @@ impl Tun { let tun = AppUtilsTun::iouring(tun, ring_size, sqpoll_idle_time).await?; Ok(Tun(tun)) } + + /// Get the interface name of the underlying TUN device. + pub fn name(&self) -> std::io::Result { + self.0.name() + } } impl AsRawFd for Tun { @@ -53,6 +58,17 @@ impl InsideIORecv for Tun { } } + #[cfg(target_os = "linux")] + async fn recv_gso(&self, buf: &mut [u8]) -> IOCallbackResult { + match self.0.recv_gso(buf).await { + IOCallbackResult::Ok(n) => { + metrics::tun_to_client(n); + IOCallbackResult::Ok(n) + } + e => e, + } + } + fn into_io_send_callback(self: Arc) -> InsideIOSendCallbackArg { self } diff --git a/lightway-server/src/lib.rs b/lightway-server/src/lib.rs index c657df00..fc335ede 100644 --- a/lightway-server/src/lib.rs +++ b/lightway-server/src/lib.rs @@ -5,6 +5,8 @@ mod ip_manager; pub mod metrics; mod statistics; +#[cfg(target_os = "linux")] +use bytes::Buf; use bytesize::ByteSize; use connection::Connection; // re-export so server app does not need to depend on lightway-core @@ -194,6 +196,10 @@ pub struct ServerConfig ServerAuth>> { /// Enable Post Quantum Crypto pub enable_pqc: bool, + /// Enable TUN offload (GRO/GSO) for batch packet processing + #[cfg(target_os = "linux")] + pub enable_tun_offload: bool, + #[cfg(feature = "io-uring")] /// Enable IO-uring interface for Tunnel pub enable_tun_iouring: bool, @@ -258,6 +264,163 @@ pub(crate) fn handle_inside_io_error(conn: Arc, result: ConnectionRe } } +async fn inside_io_loop_default( + inside_io: Arc, + ip_manager: Arc>>, + lightway_client_ip: Ipv4Addr, +) -> anyhow::Result<()> { + let mtu = inside_io.mtu(); + let mut buf = BytesMut::with_capacity(mtu); + loop { + buf.clear(); + buf.resize(mtu, 0); + match inside_io.recv_buf(&mut buf).await { + IOCallbackResult::Ok(_n) => {} + IOCallbackResult::WouldBlock => continue, + IOCallbackResult::Err(err) => { + break Err(anyhow!(err).context("InsideIO recv buf error")); + } + }; + + let packet = Ipv4Packet::new(buf.as_ref()); + let Some(packet) = packet else { + eprintln!("Invalid inside packet size (less than Ipv4 header)!"); + continue; + }; + let conn = ip_manager.find_connection(packet.get_destination()); + + ipv4_update_destination(buf.as_mut(), lightway_client_ip); + + if let Some(conn) = conn { + let result = conn.inside_data_received(&mut buf); + handle_inside_io_error(conn, result); + } else { + metrics::tun_rejected_packet_no_connection(); + } + } +} + +#[cfg(target_os = "linux")] +async fn inside_io_loop_gso( + inside_io: Arc, + ip_manager: Arc>>, + lightway_client_ip: Ipv4Addr, +) -> anyhow::Result<()> { + use lightway_core::gso::{ + VIRTIO_NET_HDR_F_NEEDS_CSUM, VIRTIO_NET_HDR_GSO_NONE, VIRTIO_NET_HDR_LEN, VirtioNetHdr, + gso_none_checksum, + }; + + // Receive buffer reused across iterations. Allocate once and + // recv directly into it (no per-packet `BytesMut::from(&...)` + // copy + alloc). + // + // Mental model: `BytesMut` is a (ptr, len, cap) *window* into a + // backing slab. `cap` is the distance from `ptr` to the end of + // the slab — NOT the slab size. Every `advance(N)` below slides + // `ptr += N` and shrinks `cap -= N`; the slab itself doesn't + // change. Without intervention, the window crawls toward the + // end of the slab and `cap` decays. + // + // `pkt.reserve(initial_cap)` below is the "slide back" call: if + // the window can already hold `initial_cap` more bytes after + // `len`, it's a free no-op; otherwise BytesMut compacts the + // window back to the start of the slab (with `len = 0` after + // `clear()`, this is just a pointer reset — no memcpy). + let initial_cap = VIRTIO_NET_HDR_LEN + 65535; + let mut pkt = bytes::BytesMut::zeroed(initial_cap); + + loop { + // Reset the window to start of slab (cheap; no-op while still + // at slab start, pointer-only reset after `advance()` has + // drifted us). + pkt.reserve(initial_cap); + + // Expose the full slab to `recv_gso` as `&mut [u8]`. + // SAFETY: every byte of the slab was zero-initialized at + // construction; subsequent iters only ever shrunk `len` or + // overwrote bytes. We never hand out uninitialized memory. + #[allow(unsafe_code)] + unsafe { + pkt.set_len(pkt.capacity()); + } + + let len = match inside_io.recv_gso(pkt.as_mut()).await { + IOCallbackResult::Ok(n) => n, + IOCallbackResult::WouldBlock => continue, + IOCallbackResult::Err(err) => { + break Err(anyhow!(err).context("InsideIO recv gso error")); + } + }; + + // SAFETY: `recv_gso` wrote `len` bytes; `len ≤ pkt.capacity()`. + #[allow(unsafe_code)] + unsafe { + pkt.set_len(len); + } + + if len <= VIRTIO_NET_HDR_LEN { + tracing::warn!("TUN read too short ({len} <= {VIRTIO_NET_HDR_LEN})"); + pkt.clear(); + continue; + } + + let hdr = match VirtioNetHdr::from_bytes(&pkt[..VIRTIO_NET_HDR_LEN]) { + Ok(hdr) => *hdr, + Err(err) => { + tracing::warn!("Failed to decode virtio header: {err}"); + pkt.clear(); + continue; + } + }; + // Strip the virtio header — `pkt` is now the IP payload. + pkt.advance(VIRTIO_NET_HDR_LEN); + + if hdr.gso_type == VIRTIO_NET_HDR_GSO_NONE { + // Single packet fast path + if hdr.flags & VIRTIO_NET_HDR_F_NEEDS_CSUM != 0 { + gso_none_checksum(pkt.as_mut(), hdr.csum_start, hdr.csum_offset); + } + + let packet = Ipv4Packet::new(pkt.as_ref()); + let Some(packet) = packet else { + pkt.clear(); + continue; + }; + let conn = ip_manager.find_connection(packet.get_destination()); + + ipv4_update_destination(pkt.as_mut(), lightway_client_ip); + + if let Some(conn) = conn { + let result = conn.inside_data_received(&mut pkt); + handle_inside_io_error(conn, result); + } else { + metrics::tun_rejected_packet_no_connection(); + } + } else { + // GSO superpacket path — process as one big packet + let packet = Ipv4Packet::new(pkt.as_ref()); + let Some(packet) = packet else { + pkt.clear(); + continue; + }; + let conn = ip_manager.find_connection(packet.get_destination()); + + // Rewrite destination on the superpacket before processing + ipv4_update_destination(pkt.as_mut(), lightway_client_ip); + + if let Some(conn) = conn { + let result = conn.inside_data_received_gso(&mut pkt, &hdr); + handle_inside_io_error(conn, result); + } else { + metrics::tun_rejected_packet_no_connection(); + } + } + + pkt.clear(); + } +} + pub async fn server ServerAuth> + Sync + Send + 'static>( mut config: ServerConfig, ) -> Result<()> { @@ -302,6 +465,10 @@ pub async fn server ServerAuth> + Sync + Send + 'stati Some(io) => io, None => { use io::inside::Tun; + #[cfg(target_os = "linux")] + if config.enable_tun_offload { + config.tun_config.offload = true; + } #[cfg(not(feature = "io-uring"))] let tun = Tun::new(&config.tun_config).await; #[cfg(feature = "io-uring")] @@ -318,6 +485,18 @@ pub async fn server ServerAuth> + Sync + Send + 'stati let tun = tun.context("Tun creation")?; + #[cfg(target_os = "linux")] + if config.enable_tun_offload { + // Cap gso_max_size so the wire superframe stays within 65535 + // after per-segment overhead: 65535 - 64 * 56 = 61951 + const MAX_GSO_BUF_SIZE: u32 = 61951; + if let Ok(name) = tun.name() { + let _ = std::process::Command::new("ip") + .args(["link", "set", &name, "gso_max_size", &MAX_GSO_BUF_SIZE.to_string()]) + .output(); + } + } + Arc::new(tun) } }; @@ -369,39 +548,28 @@ pub async fn server ServerAuth> + Sync + Send + 'stati ), }; - let inside_io_loop: JoinHandle> = tokio::spawn(async move { - let mtu = inside_io.mtu(); - let mut buf = BytesMut::with_capacity(mtu); - loop { - buf.clear(); - buf.resize(mtu, 0); - match inside_io.recv_buf(&mut buf).await { - IOCallbackResult::Ok(_n) => {} - IOCallbackResult::WouldBlock => continue, // Spuriously failed to read, keep waiting - IOCallbackResult::Err(err) => { - break Err(anyhow!(err).context("InsideIO recv buf error")); - } - }; - - // Find connection based on client ip (dest ip) and forward packet - let packet = Ipv4Packet::new(buf.as_ref()); - let Some(packet) = packet else { - eprintln!("Invalid inside packet size (less than Ipv4 header)!"); - continue; - }; - let conn = ip_manager.find_connection(packet.get_destination()); - - // Update destination IP address to client's ip - ipv4_update_destination(buf.as_mut(), config.lightway_client_ip); - - if let Some(conn) = conn { - let result = conn.inside_data_received(&mut buf); - handle_inside_io_error(conn, result); - } else { - metrics::tun_rejected_packet_no_connection(); - } + let inside_io_loop: JoinHandle> = { + #[cfg(target_os = "linux")] + if config.enable_tun_offload { + tokio::spawn(inside_io_loop_gso( + inside_io, + ip_manager.clone(), + config.lightway_client_ip, + )) + } else { + tokio::spawn(inside_io_loop_default( + inside_io, + ip_manager.clone(), + config.lightway_client_ip, + )) } - }); + #[cfg(not(target_os = "linux"))] + tokio::spawn(inside_io_loop_default( + inside_io, + ip_manager.clone(), + config.lightway_client_ip, + )) + }; let (ctrlc_tx, ctrlc_rx) = tokio::sync::oneshot::channel(); @@ -434,7 +602,7 @@ pub async fn server ServerAuth> + Sync + Send + 'stati tokio::select! { err = server.run() => err.context("Outside IO loop exited"), - io = inside_io_loop => io.map_err(|e| anyhow!(e).context("Inside IO loop panicked"))?.context("Inside IO loop exited"), + io = inside_io_loop => io.map_err(|e| anyhow!(e).context("Inside IO loop panicked"))?.context("Inside IO loop exited"), _ = ctrlc_rx => { info!("Sigterm or Sigint received"); conn_manager.shutdown(); diff --git a/lightway-server/src/main.rs b/lightway-server/src/main.rs index 4b403d40..289a3f2e 100644 --- a/lightway-server/src/main.rs +++ b/lightway-server/src/main.rs @@ -175,6 +175,8 @@ async fn main() -> Result<()> { expresslane_metrics: None, event_cb: None, enable_pqc: config.enable_pqc, + #[cfg(target_os = "linux")] + enable_tun_offload: config.enable_tun_offload, #[cfg(feature = "io-uring")] enable_tun_iouring: config.enable_tun_iouring, #[cfg(feature = "io-uring")]