Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,23 @@
cargo +nightly rustdoc -p volo-thrift --all-features --config 'build.rustdocflags=["--cfg", "docsrs"]' -- --deny warnings

test-linux:
runs-on: [self-hosted, Linux, amd64]
runs-on: ubuntu-latest

strategy:
matrix:
rust: [nightly, stable]
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@master
with:
components: rustfmt,clippy
toolchain: ${{matrix.rust}}
- name: Run tests
run: |
sudo bash scripts/install-linux-dependencies.sh
bash scripts/clippy-and-test.sh

test-linux-aarch64:

Check warning

Code scanning / CodeQL

Workflow does not contain permissions Medium

Actions job or workflow does not limit the permissions of the GITHUB_TOKEN. Consider setting an explicit permissions block, using the following as a minimal starting point: {contents: read}
runs-on: ubuntu-24.04-arm

strategy:
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
.idea
target
/benchmark/output
.DS_Store
.DS_Store
.trae
1 change: 1 addition & 0 deletions scripts/clippy-and-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ run_clippy() {

run_test() {
echo_command cargo test -p volo-thrift
echo_command cargo test -p volo-thrift --features shmipc
echo_command cargo test -p volo-grpc --features rustls
echo_command cargo test -p volo-http --features client,server,http1,query,form,json,tls,cookie,multipart,ws
echo_command cargo test -p volo-http --features client,server,http2,query,form,json,tls,cookie,multipart,ws
Expand Down
262 changes: 259 additions & 3 deletions volo-thrift/src/codec/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,26 @@ impl<D: ZeroCopyDecoder, R: AsyncRead + AsyncExt + Unpin + Send + Sync + 'static
&mut self,
cx: &mut Cx,
) -> Result<Option<ThriftMessage<Msg>>, ThriftException> {
// just to check if we have reached EOF
if self.reader.fill_buf().await?.is_empty() {
let buf = match self.reader.fill_buf().await {
Ok(buf) => buf,
Err(e) => {
#[cfg(feature = "shmipc")]
{
if e.kind() == std::io::ErrorKind::UnexpectedEof
&& self.shmipc_helper().available()
{
tracing::trace!(
"[VOLO] thrift codec decode message EOF (shmipc), rpcinfo: {:?}",
cx.rpc_info()
);
return Ok(None);
}
}
return Err(e.into());
}
};

if buf.is_empty() {
tracing::trace!(
"[VOLO] thrift codec decode message EOF, rpcinfo: {:?}",
cx.rpc_info()
Expand Down Expand Up @@ -325,12 +343,250 @@ where

#[cfg(test)]
mod tests {
use super::DefaultMakeCodec;
use std::{
io,
pin::Pin,
task::{Context, Poll},
};

use bytes::Bytes;
use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};
use volo::context::RpcInfo;

use super::*;
use crate::ThriftMessage;

#[test]
fn test_mk_codec() {
let _framed = DefaultMakeCodec::framed();
let _ttheader_framed = DefaultMakeCodec::ttheader_framed();
let _buffered = DefaultMakeCodec::buffered();
}

struct MockReader {
eof_behavior: EofBehavior,
#[cfg(feature = "shmipc")]
shmipc_stream: Option<volo::net::shmipc::Stream>,
}

enum EofBehavior {
EmptyBuffer,
UnexpectedEof,
OtherError,
}

impl AsyncRead for MockReader {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
_buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
match self.eof_behavior {
EofBehavior::EmptyBuffer => Poll::Ready(Ok(())),
EofBehavior::UnexpectedEof => Poll::Ready(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected eof",
))),
EofBehavior::OtherError => Poll::Ready(Err(io::Error::new(
io::ErrorKind::ConnectionReset,
"connection reset",
))),
}
}
}

impl AsyncBufRead for MockReader {
fn poll_fill_buf(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
match self.eof_behavior {
EofBehavior::EmptyBuffer => Poll::Ready(Ok(&[])),
EofBehavior::UnexpectedEof => Poll::Ready(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected eof",
))),
EofBehavior::OtherError => Poll::Ready(Err(io::Error::new(
io::ErrorKind::ConnectionReset,
"connection reset",
))),
}
}

fn consume(self: Pin<&mut Self>, _amt: usize) {}
}

impl volo::net::ext::AsyncExt for MockReader {
async fn ready(&self, _interest: tokio::io::Interest) -> io::Result<tokio::io::Ready> {
Ok(tokio::io::Ready::READABLE | tokio::io::Ready::WRITABLE)
}

#[cfg(feature = "shmipc")]
fn shmipc_helper(&self) -> volo::net::shmipc::ShmipcHelper {
if let Some(stream) = &self.shmipc_stream {
stream.helper()
} else {
volo::net::shmipc::ShmipcHelper::none()
}
}
}

#[tokio::test]
async fn test_decode_empty_buffer_returns_none() {
let reader = MockReader {
eof_behavior: EofBehavior::EmptyBuffer,
#[cfg(feature = "shmipc")]
shmipc_stream: None,
};
let mut decoder = DefaultDecoder {
decoder: thrift::MakeThriftCodec::default().make_codec().1,
reader: BufReader::new(reader),
};

let mut cx = crate::context::ClientContext::new(
1,
RpcInfo::with_role(volo::context::Role::Client),
pilota::thrift::TMessageType::Call,
);

let result: Result<Option<ThriftMessage<Bytes>>, _> = decoder.decode(&mut cx).await;
assert!(result.is_ok());
assert!(result.unwrap().is_none());
}

#[tokio::test]
async fn test_decode_unexpected_eof_returns_error() {
let reader = MockReader {
eof_behavior: EofBehavior::UnexpectedEof,
#[cfg(feature = "shmipc")]
shmipc_stream: None,
};
let mut decoder = DefaultDecoder {
decoder: thrift::MakeThriftCodec::default().make_codec().1,
reader: BufReader::new(reader),
};

let mut cx = crate::context::ClientContext::new(
1,
RpcInfo::with_role(volo::context::Role::Client),
pilota::thrift::TMessageType::Call,
);

let result: Result<Option<ThriftMessage<Bytes>>, _> = decoder.decode(&mut cx).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.to_string().contains("unexpected eof"));
}

#[cfg(feature = "shmipc")]
struct ShmipcTestEnv {
path: std::path::PathBuf,
}

#[cfg(feature = "shmipc")]
impl ShmipcTestEnv {
async fn new() -> (Self, volo::net::shmipc::Stream) {
use std::{
os::unix::net::SocketAddr,
sync::atomic::{AtomicUsize, Ordering},
};

use motore::service::UnaryService;
use volo::net::shmipc::{
Listener,
addr::{Address, ShmipcMakeTransport},
};

static COUNTER: AtomicUsize = AtomicUsize::new(0);
let id = COUNTER.fetch_add(1, Ordering::Relaxed);

let dir = std::env::temp_dir();
let path = dir.join(format!(
"volo_shmipc_test_{}_{}.sock",
std::process::id(),
id
));
let _ = std::fs::remove_file(&path);

let addr_val = SocketAddr::from_pathname(&path).expect("failed to create socket addr");
let addr = Address::from(addr_val);
let addr_clone = addr.clone();

tokio::spawn(async move {
if let Ok(mut listener) = Listener::listen(addr_clone, None).await {
while let Ok(_stream) = listener.accept().await {}
}
});

// Give listener time to start
tokio::time::sleep(std::time::Duration::from_millis(50)).await;

let svc = ShmipcMakeTransport::new();
let stream = svc
.call(addr)
.await
.expect("failed to connect to shmipc listener");

(Self { path }, stream)
}
}

#[cfg(feature = "shmipc")]
impl Drop for ShmipcTestEnv {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.path);
}
}

#[cfg(feature = "shmipc")]
#[tokio::test]
async fn test_decode_unexpected_eof_returns_none_when_shmipc_available() {
let (_env, stream) = ShmipcTestEnv::new().await;

let reader = MockReader {
eof_behavior: EofBehavior::UnexpectedEof,
shmipc_stream: Some(stream),
};

let mut decoder = DefaultDecoder {
decoder: thrift::MakeThriftCodec::default().make_codec().1,
reader: BufReader::new(reader),
};

let mut cx = crate::context::ClientContext::new(
1,
RpcInfo::with_role(volo::context::Role::Client),
pilota::thrift::TMessageType::Call,
);

let result: Result<Option<ThriftMessage<Bytes>>, _> = decoder.decode(&mut cx).await;

assert!(result.is_ok());
assert!(result.unwrap().is_none());
}

#[cfg(feature = "shmipc")]
#[tokio::test]
async fn test_decode_other_error_returns_error_when_shmipc_available() {
let (_env, stream) = ShmipcTestEnv::new().await;

let reader = MockReader {
eof_behavior: EofBehavior::OtherError,
shmipc_stream: Some(stream),
};

let mut decoder = DefaultDecoder {
decoder: thrift::MakeThriftCodec::default().make_codec().1,
reader: BufReader::new(reader),
};

let mut cx = crate::context::ClientContext::new(
1,
RpcInfo::with_role(volo::context::Role::Client),
pilota::thrift::TMessageType::Call,
);

let result: Result<Option<ThriftMessage<Bytes>>, _> = decoder.decode(&mut cx).await;

assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.to_string().contains("connection reset"));
}
}
Loading