use anyhow::Result;
use futures::StreamExt;
use rig::agent::MultiTurnStreamItem;
use rig::client::{CompletionClient, Nothing};
use rig::completion::ToolDefinition;
use rig::providers::ollama;
use rig::streaming::StreamingPrompt;
use rig::tool::Tool;
use serde::{Deserialize, Serialize};
use serde_json::json;
/// Arguments of the `search_code` tool, deserialized from the tool-call payload.
#[derive(Deserialize)]
struct SearchCodeArgs {
    /// Search text; echoed back under `"query"` in the tool output.
    query: String,
    /// Optional result limit supplied by the caller; `None` when the field is absent.
    #[serde(default)]
    max_results: Option<usize>,
}
/// Error type of the `search_code` tool (used as `Tool::Error` for `SearchCode`).
///
/// Carries no payload; its display text is the fixed message in the attribute below.
#[derive(Debug, thiserror::Error)]
#[error("search error")]
struct SearchError;
/// Stateless unit type that implements the `search_code` tool.
#[derive(Deserialize, Serialize)]
struct SearchCode;
/// Mock `search_code` tool: answers every query with the same canned list of
/// `flush` call sites so the agent has something to follow up on.
impl Tool for SearchCode {
    const NAME: &'static str = "search_code";
    type Error = SearchError;
    type Args = SearchCodeArgs;
    type Output = serde_json::Value;

    /// Returns the tool's name, description, and JSON-schema parameters
    /// (`query` is required, `max_results` is optional).
    ///
    /// The unnamed `String` argument is ignored.
    async fn definition(&self, _: String) -> ToolDefinition {
        ToolDefinition {
            // Reuse the trait constant so the advertised name cannot drift from `NAME`.
            name: Self::NAME.into(),
            description: "Full-text search across the indexed Rust codebase. Returns ranked matches with file path, line number, and a surrounding snippet.".into(),
            parameters: json!({
                "type": "object",
                "properties": {
                    "query": { "type": "string" },
                    "max_results": { "type": "integer" }
                },
                "required": ["query"]
            }),
        }
    }

    /// Returns a JSON object with the echoed `query` and a `results` array of
    /// canned matches (`path`, `line`, `snippet`).
    ///
    /// When `args.max_results` is `Some(n)`, at most `n` matches are returned;
    /// when it is `None`, all canned matches are returned. Never fails.
    async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
        let mut results = vec![
            json!({ "path": "crates/ingest/src/pipeline.rs", "line": 311, "snippet": " pub async fn flush(&mut self) -> Result<FlushReport, IngestError> {\n let pending = std::mem::take(&mut self.pending);" }),
            json!({ "path": "crates/ingest/src/worker.rs", "line": 88, "snippet": " Some(deadline) if Instant::now() >= deadline => {\n self.pipeline.flush().await?;" }),
            json!({ "path": "crates/ingest/src/shutdown.rs", "line": 54, "snippet": " pipeline.flush().await.map_err(ShutdownError::Drain)?;" }),
            json!({ "path": "tests/integration/flush_ordering.rs", "line": 27, "snippet": " pipeline.flush().await?;" }),
        ];

        // Honor the advertised `max_results` cap instead of silently ignoring it.
        if let Some(limit) = args.max_results {
            results.truncate(limit);
        }

        Ok(json!({
            "query": args.query,
            "results": results
        }))
    }
}
/// Arguments of the `read_source` tool, deserialized from the tool-call payload.
///
/// All three fields are echoed back unchanged in the tool output.
#[derive(Deserialize)]
struct ReadSourceArgs {
    /// Path of the file the caller wants to read.
    path: String,
    /// Start of the requested line range.
    start_line: usize,
    /// End of the requested line range.
    end_line: usize,
}
/// Error type of the `read_source` tool (used as `Tool::Error` for `ReadSource`).
///
/// Carries no payload; its display text is the fixed message in the attribute below.
#[derive(Debug, thiserror::Error)]
#[error("read error")]
struct ReadError;
/// Stateless unit type that implements the `read_source` tool.
#[derive(Deserialize, Serialize)]
struct ReadSource;
impl Tool for ReadSource {
const NAME: &'static str = "read_source";
type Error = ReadError;
type Args = ReadSourceArgs;
type Output = serde_json::Value;
async fn definition(&self, _: String) -> ToolDefinition {
ToolDefinition {
name: "read_source".into(),
description: "Read a line range from a source file. Returns the raw source text.".into(),
parameters: json!({
"type": "object",
"properties": {
"path": { "type": "string" },
"start_line": { "type": "integer" },
"end_line": { "type": "integer" }
},
"required": ["path", "start_line", "end_line"]
}),
}
}
async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
let source = r#"impl Pipeline {
pub async fn flush(&mut self) -> Result<FlushReport, IngestError> {
let pending = std::mem::take(&mut self.pending);
if pending.is_empty() { return Ok(FlushReport::default()); }
let payload = encode_records(&pending).map_err(IngestError::Encode)?;
self.sink.send(payload).await.map_err(IngestError::Sink)?;
// BUG: buffered_bytes never reset on the happy path.
// self.buffered_bytes = 0;
Ok(FlushReport { records_flushed: pending.len(), bytes_flushed: payload.len() })
}
}"#;
Ok(json!({ "path": args.path, "start_line": args.start_line, "end_line": args.end_line, "source": source }))
}
}
/// Arguments of the `submit_finding` tool, deserialized from the tool-call payload.
///
/// Every field is listed as required in the tool's JSON schema.
#[derive(Deserialize)]
// Only `title` and `severity` are read by `SubmitFinding::call`; the remaining
// fields are deserialized but otherwise unused, hence the lint suppression.
#[allow(dead_code)]
struct SubmitFindingArgs {
    /// Short headline of the finding; included in the confirmation message.
    title: String,
    /// Severity label; the schema lists `low`, `medium`, `high`, `critical`,
    /// but this type accepts any string.
    severity: String,
    /// Paths of the files the finding concerns.
    affected_paths: Vec<String>,
    /// Free-text root-cause description.
    root_cause: String,
    /// Free-text supporting evidence.
    evidence: String,
    /// Free-text proposed fix.
    suggested_fix: String,
    /// Free-text plan for testing the fix.
    test_plan: String,
}
/// Error type of the `submit_finding` tool (used as `Tool::Error` for `SubmitFinding`).
///
/// Carries no payload; its display text is the fixed message in the attribute below.
#[derive(Debug, thiserror::Error)]
#[error("submit error")]
struct SubmitError;
/// Stateless unit type that implements the `submit_finding` tool.
#[derive(Deserialize, Serialize)]
struct SubmitFinding;
impl Tool for SubmitFinding {
const NAME: &'static str = "submit_finding";
type Error = SubmitError;
type Args = SubmitFindingArgs;
type Output = String;
async fn definition(&self, _: String) -> ToolDefinition {
ToolDefinition {
name: "submit_finding".into(),
description: "Submit your final bug report once you have enough evidence.".into(),
parameters: json!({
"type": "object",
"properties": {
"title": { "type": "string" },
"severity": { "type": "string", "enum": ["low", "medium", "high", "critical"] },
"affected_paths": { "type": "array", "items": { "type": "string" } },
"root_cause": { "type": "string" },
"evidence": { "type": "string" },
"suggested_fix": { "type": "string" },
"test_plan": { "type": "string" }
},
"required": ["title", "severity", "affected_paths", "root_cause", "evidence", "suggested_fix", "test_plan"]
}),
}
}
async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
Ok(format!("Finding '{}' submitted with severity={}.", args.title, args.severity))
}
}
/// Reproduction driver.
///
/// Installs a DEBUG-level tracing subscriber, builds an Ollama client pointed
/// at `http://localhost:11434`, and creates a `gemma4:26b` agent with the
/// three mock tools and the extra `think: low` parameter. It then streams one
/// multi-turn prompt (`multi_turn(30)`; presumably the turn limit, confirm
/// against the rig docs) and drains the stream.
///
/// Prints a marker line to stdout when the final response arrives. On a
/// stream error it prints the error and its `source()` chain to stderr and
/// terminates the process with exit status 1.
///
/// # Errors
///
/// Returns an error if the Ollama client cannot be built.
#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_target(false)
        .init();

    let ollama_client = ollama::Client::builder()
        .api_key(Nothing)
        .base_url("http://localhost:11434")
        .build()?;

    let triage_agent = ollama_client
        .agent("gemma4:26b")
        .preamble(
            "You are a senior Rust engineer triaging a production bug. Investigate by \
             chaining tools: first `search_code` to locate the symbol, then \
             `read_source` to inspect each hit, then finally `submit_finding` with a \
             thorough report quoting specific file:line references.",
        )
        .tool(SearchCode)
        .tool(ReadSource)
        .tool(SubmitFinding)
        .additional_params(json!({ "think": "low" }))
        .build();

    let mut events = triage_agent
        .stream_prompt(
            "The ingest pipeline's `buffered_bytes` counter grows unbounded. \
             Investigate the `flush` path, find the root cause, and file a \
             complete finding via the `submit_finding` tool.",
        )
        .multi_turn(30)
        .await;

    while let Some(outcome) = events.next().await {
        let event = match outcome {
            Ok(event) => event,
            Err(e) => {
                eprintln!("STREAM ERROR: {e}");
                // Walk the full cause chain so the underlying parse error is visible.
                let causes = std::iter::successors(std::error::Error::source(&e), |cause| {
                    (*cause).source()
                });
                for s in causes {
                    eprintln!(" - {s}");
                }
                std::process::exit(1);
            }
        };

        // Intermediate items are drained silently; only completion is reported.
        if let MultiTurnStreamItem::FinalResponse(_) = event {
            println!("--- stream finished ok ---");
        }
    }

    Ok(())
}
Bug Report
Streaming chat completions through the Ollama provider intermittently fail mid-stream with a `serde_json` "EOF while parsing" error. The failure is timing-dependent and shows up most reliably on responses that contain longer NDJSON lines — for example, agent runs with thinking enabled, or any turn that carries a substantial tool-call argument.
The cause is in `crates/rig-core/src/providers/ollama.rs`. The streaming loop reads `bytes_stream()` and splits each chunk on `\n` before parsing each segment as a `CompletionResponse`. However, `bytes_stream()` makes no guarantees about chunk boundaries — a single NDJSON line can be split mid-token across two chunks. When that happens, both halves are fed to `serde_json::from_slice` independently and the first half fails parsing, terminating the stream with `CompletionError::JsonError`.
In a concrete error captured against a local Ollama server, the NDJSON line is truncated at byte 74 because the chunk boundary landed mid-string.
Reproduction
Runnable example against a local Ollama server (Ollama 0.x with a thinking-capable model such as `gemma4:26b`). The example chains four tools so the agent emits multiple turns of larger NDJSON lines; combined with `think: low`, this surfaces the bug reliably for me within a single run.
Click to expand reproduction (single-file Cargo example)
Expected behavior
A single NDJSON line should always be parsed as a whole, regardless of how `bytes_stream()` chunks the underlying HTTP body. The stream should run to completion and yield the full sequence of assistant deltas and tool calls without raising `JsonError` on internal chunk boundaries.
Screenshots
NA
Additional context
The fix is a small buffered line reassembler in the Ollama streaming path, plus unit tests covering the partial-line case. Happy to follow up with a PR linked to this issue.