
bug: Ollama Stream Error On Multi-line NDJSON lines #1758

@ChadBartley

Description

  • I have looked for existing issues (including closed) about this

Bug Report

Streaming chat completions through the Ollama provider intermittently fail mid-stream with a serde_json "EOF while parsing" error. The failure is timing-dependent and shows up most reliably on responses that contain longer NDJSON lines — for example, agent runs with thinking enabled, or any turn that carries a substantial tool-call argument.

The cause is in crates/rig-core/src/providers/ollama.rs. The streaming loop reads bytes_stream() and splits each chunk on \n before parsing each segment as a CompletionResponse:

while let Some(chunk) = byte_stream.next().await {
    let bytes = chunk?;
    for line in bytes.split(|&b| b == b'\n') {
        if line.is_empty() { continue; }
        let response: CompletionResponse = serde_json::from_slice(line)?;
        // ...
    }
}

bytes_stream() makes no guarantees about chunk boundaries: a single NDJSON line can be split mid-token across two chunks. When that happens, each half is passed to serde_json::from_slice on its own, the first half fails to parse, and the stream terminates with CompletionError::JsonError.
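To show the failure mode without a running server, here is a std-only sketch that applies the same per-chunk split to a two-record body cut at an arbitrary byte offset. `naive_lines` is a hypothetical helper written for illustration, not a function in rig; it mirrors the loop above but collects the segments that would be handed to serde_json::from_slice instead of parsing them.

```rust
/// Hypothetical stand-in for the current loop: split every chunk on b'\n'
/// in isolation and treat each non-empty piece as a complete NDJSON line.
fn naive_lines(chunks: &[&[u8]]) -> Vec<String> {
    let mut out = Vec::new();
    for chunk in chunks {
        for line in chunk.split(|&b| b == b'\n') {
            if line.is_empty() {
                continue;
            }
            out.push(String::from_utf8_lossy(line).into_owned());
        }
    }
    out
}

fn main() {
    // Two NDJSON records; the transport cuts the body inside the first one.
    let body: &[u8] = b"{\"message\":{\"content\":\"Hel\"}}\n{\"message\":{\"content\":\"lo\"}}\n";
    let (first, second) = body.split_at(20);

    // Three segments come out for two records: the first two are halves of
    // one record, and serde_json would reject each of them.
    for line in naive_lines(&[first, second]) {
        println!("{line}");
    }
}
```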

Concrete error captured against a local Ollama server:

Received NDJSON line from Ollama: {"model":"gemma4:26b","created_at":"...","messa
STREAM ERROR: CompletionError: JsonError: EOF while parsing a string at line 1 column 74
source chain:
  - JsonError: EOF while parsing a string at line 1 column 74
  - EOF while parsing a string at line 1 column 74

The NDJSON line is truncated at byte 74 because the chunk boundary landed mid-string.

Reproduction

Runnable example against a local Ollama server (Ollama 0.x with a thinking-capable model such as gemma4:26b). The example chains three tools (search_code, read_source, submit_finding) so the agent emits multiple turns of larger NDJSON lines; combined with think: low, this reliably surfaces the bug for me within a single run.

Reproduction (single-file Cargo example):
use anyhow::Result;
use futures::StreamExt;
use rig::agent::MultiTurnStreamItem;
use rig::client::{CompletionClient, Nothing};
use rig::completion::ToolDefinition;
use rig::providers::ollama;
use rig::streaming::StreamingPrompt;
use rig::tool::Tool;
use serde::{Deserialize, Serialize};
use serde_json::json;

#[derive(Deserialize)]
struct SearchCodeArgs { query: String, #[serde(default)] max_results: Option<usize> }

#[derive(Debug, thiserror::Error)]
#[error("search error")]
struct SearchError;

#[derive(Deserialize, Serialize)]
struct SearchCode;

impl Tool for SearchCode {
    const NAME: &'static str = "search_code";
    type Error = SearchError;
    type Args = SearchCodeArgs;
    type Output = serde_json::Value;

    async fn definition(&self, _: String) -> ToolDefinition {
        ToolDefinition {
            name: "search_code".into(),
            description: "Full-text search across the indexed Rust codebase. Returns ranked matches with file path, line number, and a surrounding snippet.".into(),
            parameters: json!({
                "type": "object",
                "properties": {
                    "query": { "type": "string" },
                    "max_results": { "type": "integer" }
                },
                "required": ["query"]
            }),
        }
    }

    async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
        Ok(json!({
            "query": args.query,
            "results": [
                { "path": "crates/ingest/src/pipeline.rs", "line": 311, "snippet": "    pub async fn flush(&mut self) -> Result<FlushReport, IngestError> {\n        let pending = std::mem::take(&mut self.pending);" },
                { "path": "crates/ingest/src/worker.rs",   "line": 88,  "snippet": "                Some(deadline) if Instant::now() >= deadline => {\n                    self.pipeline.flush().await?;" },
                { "path": "crates/ingest/src/shutdown.rs", "line": 54,  "snippet": "    pipeline.flush().await.map_err(ShutdownError::Drain)?;" },
                { "path": "tests/integration/flush_ordering.rs", "line": 27, "snippet": "    pipeline.flush().await?;" }
            ]
        }))
    }
}

#[derive(Deserialize)]
struct ReadSourceArgs { path: String, start_line: usize, end_line: usize }

#[derive(Debug, thiserror::Error)]
#[error("read error")]
struct ReadError;

#[derive(Deserialize, Serialize)]
struct ReadSource;

impl Tool for ReadSource {
    const NAME: &'static str = "read_source";
    type Error = ReadError;
    type Args = ReadSourceArgs;
    type Output = serde_json::Value;

    async fn definition(&self, _: String) -> ToolDefinition {
        ToolDefinition {
            name: "read_source".into(),
            description: "Read a line range from a source file. Returns the raw source text.".into(),
            parameters: json!({
                "type": "object",
                "properties": {
                    "path": { "type": "string" },
                    "start_line": { "type": "integer" },
                    "end_line":   { "type": "integer" }
                },
                "required": ["path", "start_line", "end_line"]
            }),
        }
    }

    async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
        let source = r#"impl Pipeline {
    pub async fn flush(&mut self) -> Result<FlushReport, IngestError> {
        let pending = std::mem::take(&mut self.pending);
        if pending.is_empty() { return Ok(FlushReport::default()); }
        let payload = encode_records(&pending).map_err(IngestError::Encode)?;
        self.sink.send(payload).await.map_err(IngestError::Sink)?;
        // BUG: buffered_bytes never reset on the happy path.
        // self.buffered_bytes = 0;
        Ok(FlushReport { records_flushed: pending.len(), bytes_flushed: payload.len() })
    }
}"#;
        Ok(json!({ "path": args.path, "start_line": args.start_line, "end_line": args.end_line, "source": source }))
    }
}

#[derive(Deserialize)]
#[allow(dead_code)]
struct SubmitFindingArgs {
    title: String,
    severity: String,
    affected_paths: Vec<String>,
    root_cause: String,
    evidence: String,
    suggested_fix: String,
    test_plan: String,
}

#[derive(Debug, thiserror::Error)]
#[error("submit error")]
struct SubmitError;

#[derive(Deserialize, Serialize)]
struct SubmitFinding;

impl Tool for SubmitFinding {
    const NAME: &'static str = "submit_finding";
    type Error = SubmitError;
    type Args = SubmitFindingArgs;
    type Output = String;

    async fn definition(&self, _: String) -> ToolDefinition {
        ToolDefinition {
            name: "submit_finding".into(),
            description: "Submit your final bug report once you have enough evidence.".into(),
            parameters: json!({
                "type": "object",
                "properties": {
                    "title": { "type": "string" },
                    "severity": { "type": "string", "enum": ["low", "medium", "high", "critical"] },
                    "affected_paths": { "type": "array", "items": { "type": "string" } },
                    "root_cause":     { "type": "string" },
                    "evidence":       { "type": "string" },
                    "suggested_fix":  { "type": "string" },
                    "test_plan":      { "type": "string" }
                },
                "required": ["title", "severity", "affected_paths", "root_cause", "evidence", "suggested_fix", "test_plan"]
            }),
        }
    }

    async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
        Ok(format!("Finding '{}' submitted with severity={}.", args.title, args.severity))
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_target(false)
        .init();

    let client = ollama::Client::builder()
        .api_key(Nothing)
        .base_url("http://localhost:11434")
        .build()?;

    let agent = client
        .agent("gemma4:26b")
        .preamble(
            "You are a senior Rust engineer triaging a production bug. Investigate by \
             chaining tools: first `search_code` to locate the symbol, then \
             `read_source` to inspect each hit, then finally `submit_finding` with a \
             thorough report quoting specific file:line references.",
        )
        .tool(SearchCode)
        .tool(ReadSource)
        .tool(SubmitFinding)
        .additional_params(json!({ "think": "low" }))
        .build();

    let mut stream = agent
        .stream_prompt(
            "The ingest pipeline's `buffered_bytes` counter grows unbounded. \
             Investigate the `flush` path, find the root cause, and file a \
             complete finding via the `submit_finding` tool.",
        )
        .multi_turn(30)
        .await;

    while let Some(item) = stream.next().await {
        match item {
            Ok(MultiTurnStreamItem::FinalResponse(_)) => println!("--- stream finished ok ---"),
            Ok(_) => {}
            Err(e) => {
                eprintln!("STREAM ERROR: {e}");
                let mut source = std::error::Error::source(&e);
                while let Some(s) = source { eprintln!("  - {s}"); source = s.source(); }
                std::process::exit(1);
            }
        }
    }
    Ok(())
}

Expected behavior

A single NDJSON line should always be parsed as a whole, regardless of how bytes_stream() chunks the underlying HTTP body. The stream should run to completion and yield the full sequence of assistant deltas and tool calls without raising JsonError on internal chunk boundaries.

Screenshots

N/A

Additional context

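The proposed fix is a small buffered line reassembler in the Ollama streaming path, which carries any trailing partial line over to the next chunk and only parses complete \n-terminated lines, plus unit tests covering the partial-line case. I'm happy to follow up with a PR linked to this issue.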
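A minimal sketch of what such a reassembler could look like, using only std. `LineBuffer`, `push`, and `finish` are hypothetical names chosen for this illustration; they are not existing rig APIs and not necessarily how the PR will structure it. The buffer works on raw bytes rather than strings, so a chunk boundary that falls inside a multi-byte UTF-8 character is also harmless: decoding only ever happens on a complete line.

```rust
/// Hypothetical NDJSON line reassembler (illustrative names, not rig's API).
/// Chunks are appended as they arrive; only complete '\n'-terminated lines
/// are returned, and a trailing partial line stays buffered for the next chunk.
#[derive(Default)]
struct LineBuffer {
    pending: Vec<u8>,
}

impl LineBuffer {
    /// Append one transport chunk and return every line it completes.
    fn push(&mut self, chunk: &[u8]) -> Vec<Vec<u8>> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            // Remove the line together with its '\n' terminator.
            let mut line: Vec<u8> = self.pending.drain(..=pos).collect();
            line.pop(); // drop the '\n'
            if line.last() == Some(&b'\r') {
                line.pop(); // tolerate CRLF framing
            }
            // Skip blank lines, as the original loop does.
            if !line.is_empty() {
                lines.push(line);
            }
        }
        lines
    }

    /// Return a final line that arrived without a trailing '\n', if any.
    fn finish(self) -> Option<Vec<u8>> {
        if self.pending.is_empty() {
            None
        } else {
            Some(self.pending)
        }
    }
}

fn main() {
    let body: &[u8] = b"{\"done\":false}\n{\"done\":true}\n";
    let mut buf = LineBuffer::default();
    let mut lines = Vec::new();
    // One byte per chunk: the worst case for chunk boundaries.
    for chunk in body.chunks(1) {
        lines.extend(buf.push(chunk));
    }
    lines.extend(buf.finish());
    for line in &lines {
        println!("{}", String::from_utf8_lossy(line));
    }
}
```

Wired into the loop quoted at the top of this issue, each `chunk?` would go through `buf.push(&bytes)` and each returned line into `serde_json::from_slice(&line)`; once `byte_stream` is exhausted, the result of `buf.finish()` would be parsed the same way. A unit test can then assert that every possible split point of a body yields the same lines as the unsplit body.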
